diff --git a/compile.sh b/compile.sh new file mode 100644 index 0000000..263201a --- /dev/null +++ b/compile.sh @@ -0,0 +1,7 @@ +gcc -g -c glthread/glthread.c -o glthread/glthread.o +gcc -g -c notif.c -o notif.o +gcc -g -c rtm_publisher.c -o rtm_publisher.o +gcc -g -c rt.c -o rt.o +gcc -g -c threaded_subsciber.c -o threaded_subsciber.o +gcc -g rtm_publisher.o threaded_subsciber.o rt.o notif.o glthread/glthread.o -o main.exe -lpthread +rm *.o glthread/*.o diff --git a/glthread/.glthread.h.un~ b/glthread/.glthread.h.un~ new file mode 100644 index 0000000..249653d Binary files /dev/null and b/glthread/.glthread.h.un~ differ diff --git a/glthread/glthread.h b/glthread/glthread.h index e221b99..1e85273 100644 --- a/glthread/glthread.h +++ b/glthread/glthread.h @@ -44,7 +44,7 @@ glthread_add_last(glthread_t *base_glthread, glthread_t *new_glthread); #define IS_GLTHREAD_LIST_EMPTY(glthreadptr) \ ((glthreadptr)->right == 0 && (glthreadptr)->left == 0) -#define GLTHREAD_TO_STRUCT(fn_name, structure_name, field_name,glthreadptr) \ +#define GLTHREAD_TO_STRUCT(fn_name, structure_name, field_name) \ static inline structure_name * fn_name(glthread_t *glthreadptr){ \ if(glthreadptr)\ return (structure_name *)((char *)(glthreadptr) - (char *)&(((structure_name *)0)->field_name)); \ diff --git a/glthread/glthread.h~ b/glthread/glthread.h~ new file mode 100644 index 0000000..e221b99 --- /dev/null +++ b/glthread/glthread.h~ @@ -0,0 +1,95 @@ +/* + * ===================================================================================== + * + * Filename: glthread.h + * + * Description: + * + * Version: 1.0 + * Created: 09/03/24 07:30:40 PM IST + * Revision: none + * Compiler: gcc + * + * Author: YOUR NAME (), + * Organization: + * + * ===================================================================================== + */ + + +#ifndef __GLUETHREAD__ +#define __GLUETHREAD__ + +typedef struct _glthread{ + + struct _glthread *left; + struct _glthread *right; +} glthread_t; + +void +glthread_add_next(glthread_t *base_glthread, glthread_t *new_glthread); + +void +glthread_add_before(glthread_t *base_glthread, glthread_t *new_glthread); + +void +remove_glthread(glthread_t *glthread); + +void +init_glthread(glthread_t *glthread); + +void +glthread_add_last(glthread_t *base_glthread, glthread_t *new_glthread); + +#define IS_GLTHREAD_LIST_EMPTY(glthreadptr) \ + ((glthreadptr)->right == 0 && (glthreadptr)->left == 0) + +#define GLTHREAD_TO_STRUCT(fn_name, structure_name, field_name,glthreadptr) \ + static inline structure_name * fn_name(glthread_t *glthreadptr){ \ + if(glthreadptr)\ + return (structure_name *)((char *)(glthreadptr) - (char *)&(((structure_name *)0)->field_name)); \ + else return NULL;\ + } + +/* delete safe loop*/ +/*Normal continue and break can be used with this loop macro*/ + +#define BASE(glthreadptr) ((glthreadptr)->right) + +#define ITERATE_GLTHREAD_BEGIN(glthreadptrstart, glthreadptr) \ +{ \ + glthread_t *_glthread_ptr = NULL; \ + glthreadptr = BASE(glthreadptrstart); \ + for(; glthreadptr!= NULL; glthreadptr = _glthread_ptr){ \ + _glthread_ptr = (glthreadptr)->right; + +#define ITERATE_GLTHREAD_END(glthreadptrstart, glthreadptr) \ + }} + +#define GLTHREAD_GET_USER_DATA_FROM_OFFSET(glthreadptr, offset) \ + (void *)((char *)(glthreadptr) - offset) + +void +delete_glthread_list(glthread_t *base_glthread); + +unsigned int +get_glthread_list_count(glthread_t *base_glthread); + +void +glthread_priority_insert(glthread_t *base_glthread, + glthread_t *glthread, + int (*comp_fn)(void *, void *), + int offset); + +glthread_t * +dequeue_glthread_first(glthread_t *base_glthread); + +#if 0 +void * +gl_thread_search(glthread_t *base_glthread, + void *(*thread_to_struct_fn)(glthread_t *), + void *key, + int (*comparison_fn)(void *, void *)); + +#endif +#endif /* __GLUETHREAD__ */ diff --git a/main.exe b/main.exe new file mode 100755 index 0000000..c0cf06b Binary files /dev/null and b/main.exe differ diff --git a/notif.c b/notif.c index ce88d23..2e6499b 100644 --- a/notif.c +++ b/notif.c @@ -43,6 +43,20 @@ nfc_register_notif_chain(notif_chain_t *nfc, glthread_add_next(&nfc->notif_chain_head, &new_nfce->glue); } +void +nfc_delete_all_nfce(notif_chain_t *nfc){ + + glthread_t *curr; + notif_chain_elem_t *nfce; + + ITERATE_GLTHREAD_BEGIN(&nfc->notif_chain_head, curr){ + + nfce = glthread_glue_to_notif_chain_elem(curr); + remove_glthread(&nfce->glue); + free(nfce); + } ITERATE_GLTHREAD_END(&nfc->notif_chain_head, curr); +} + void nfc_invoke_notif_chain(notif_chain_t *nfc, void *arg, size_t arg_size, @@ -58,39 +72,21 @@ nfc_invoke_notif_chain(notif_chain_t *nfc, assert(key_size <= MAX_NOTIF_KEY_SIZE); - char *nfc_op_s = nfc_get_str_op_code(nfc_op_code); - ITERATE_GLTHREAD_BEGIN(&nfc->notif_chain_head, curr){ nfce = glthread_glue_to_notif_chain_elem(curr); - if(!(key && key_size && + if(!(key && key_size && nfce->is_key_set && (key_size == nfce->key_size))){ - - nfce->app_cb(arg, arg_size, nfc_op_s, nfce->subs_id); + + nfce->app_cb(arg, arg_size, nfc_op_code, nfce->subs_id); } else { - + if(memcmp(key, nfce->key, key_size) == 0) { - nfce->app_cb(arg, arg_size, nfc_op_s, nfce->subs_id); + nfce->app_cb(arg, arg_size, nfc_op_code, nfce->subs_id); } } }ITERATE_GLTHREAD_END(&nfc->notif_chain_head, curr); } - -void -nfc_delete_all_nfce(notif_chain_t *nfc){ - - glthread *curr; - notif_chain_elem_t *nfc_e; - - ITERATE_GLTHREAD_BEGIN(&nfc->notif_chain_head,curr){ - - nfc_e = glthread_glue_to_notif_chain_elem(curr); - remove_glthread(&nfc_e->glue); - free(nfc_e); - }ITERATE_GLTHREAD_END(&nfc->notif_chain_head,curr); - - -} diff --git a/notif.h b/notif.h index 0910e32..c9f9ad8 100644 --- a/notif.h +++ b/notif.h @@ -15,14 +15,13 @@ * * ===================================================================================== */ - #ifndef __NOTIF_CHAIN_ #define __NOTIF_CHAIN_ #include /* for size_t */ -#include "utils.h" -#include "gluethread/glthread.h" - +#include "glthread/glthread.h" +#include +#include #define MAX_NOTIF_KEY_SIZE 64 typedef enum{ @@ -57,14 +56,14 @@ nfc_get_str_op_code(nfc_op_t nfc_op_code) { } -typedef void (*nfc_app_cb)(void *, size_t, char *, uint32_t); +typedef void (*nfc_app_cb)(void *, size_t, nfc_op_t, uint32_t); typedef struct notif_chain_elem_{ char key[MAX_NOTIF_KEY_SIZE]; size_t key_size; uint32_t subs_id; - bool_t is_key_set; + bool is_key_set; nfc_app_cb app_cb; glthread_t glue; } notif_chain_elem_t; diff --git a/rt.c b/rt.c new file mode 100644 index 0000000..d37b10a --- /dev/null +++ b/rt.c @@ -0,0 +1,217 @@ +/* + * ===================================================================================== + * + * Filename: rt.c + * + * Description: + * + * Version: 1.0 + * Created: 10/03/24 12:55:35 PM IST + * Revision: none + * Compiler: gcc + * + * Author: YOUR NAME (), + * Organization: + * + * ===================================================================================== + */ +#include +#include +#include +#include +#include +#include "rt.h" + +void +rt_init_rt_table(rt_table_t *rt_table){ + + rt_table->head = NULL; +} + +rt_entry_t * +rt_add_or_update_rt_entry(rt_table_t *rt_table, + char *dest, + char mask, + char *gw_ip, + char *oif){ + + bool new_entry; + rt_entry_t *head = NULL; + rt_entry_t *rt_entry = NULL; + + new_entry = false; + + rt_entry = rt_look_up_rt_entry(rt_table, dest, mask); + + if(!rt_entry) { + + rt_entry = calloc(1, sizeof(rt_entry_t)); + + strncpy(rt_entry->rt_entry_keys.dest, dest, + sizeof(rt_entry->rt_entry_keys.dest)); + rt_entry->rt_entry_keys.mask = mask; + + rt_entry->nfc = nfc_create_new_notif_chain(0); + new_entry = true; + } + + if(gw_ip) + strncpy(rt_entry->gw_ip, gw_ip, sizeof(rt_entry->gw_ip)); + if(oif) + strncpy(rt_entry->oif, oif, sizeof(rt_entry->oif)); + + if (new_entry) { + head = rt_table->head; + rt_table->head = rt_entry; + rt_entry->prev = 0; + rt_entry->next = head; + if(head) + head->prev = rt_entry; + } + + if (gw_ip || oif) { + + /* Entry is being updated by the publisher, send + * notification to all subscribers*/ + nfc_invoke_notif_chain(rt_entry->nfc, + (char *)rt_entry, sizeof(rt_entry_t), + 0, 0, NFC_MOD); + } + + return rt_entry; +} + +bool +rt_delete_rt_entry(rt_table_t *rt_table, + char *dest, char mask){ + + rt_entry_t *rt_entry = NULL; + + ITERTAE_RT_TABLE_BEGIN(rt_table, rt_entry){ + + if(strncmp(rt_entry->rt_entry_keys.dest, + dest, sizeof(rt_entry->rt_entry_keys.dest)) == 0 && + rt_entry->rt_entry_keys.mask == mask){ + + rt_entry_remove(rt_table, rt_entry); + + nfc_invoke_notif_chain(rt_entry->nfc, + (char *)rt_entry, sizeof(rt_entry_t), + 0, 0, NFC_DEL); + nfc_delete_all_nfce(rt_entry->nfc); + free(rt_entry->nfc); + rt_entry->nfc = NULL; + free(rt_entry->nfc); + + free(rt_entry); + return true; + } + } ITERTAE_RT_TABLE_END(rt_table, curr); + + return false; +} + +void +rt_clear_rt_table(rt_table_t *rt_table){ + + +} + +void +rt_free_rt_table(rt_table_t *rt_table){ + + +} + +void +rt_dump_rt_table(rt_table_t *rt_table){ + + rt_entry_t *rt_entry = NULL; + + ITERTAE_RT_TABLE_BEGIN(rt_table, rt_entry){ + + printf("%-20s %-4d %-20s %s\n", + rt_entry->rt_entry_keys.dest, + rt_entry->rt_entry_keys.mask, + rt_entry->gw_ip, + rt_entry->oif); + printf("\tPrinting Subscribers : "); + + glthread_t *curr; + notif_chain_elem_t *nfce; + + ITERATE_GLTHREAD_BEGIN(&rt_entry->nfc->notif_chain_head, curr){ + + nfce = glthread_glue_to_notif_chain_elem(curr); + + printf("%u ", nfce->subs_id); + + } ITERATE_GLTHREAD_END(&rt_entry->nfc->notif_chain_head, curr) + printf("\n"); + } ITERTAE_RT_TABLE_END(rt_table, rt_entry); +} + +rt_entry_t * +rt_look_up_rt_entry(rt_table_t *rt_table, + char *dest, char mask) { + + rt_entry_t *rt_entry = NULL; + + ITERTAE_RT_TABLE_BEGIN(rt_table, rt_entry) { + if ((strncmp(rt_entry->rt_entry_keys.dest, + dest, sizeof(rt_entry->rt_entry_keys.dest)) == 0) && + rt_entry->rt_entry_keys.mask == mask) { + + return rt_entry; + } + } ITERTAE_RT_TABLE_END(rt_table, rt_entry); + return NULL; +} + +void +rt_table_register_for_notification( + rt_table_t *rt_table, + rt_entry_keys_t *key, + size_t key_size, + nfc_app_cb app_cb, + uint32_t subs_id) { + + bool new_entry_created; + rt_entry_t *rt_entry; + notif_chain_elem_t nfce; + + new_entry_created = false; + + rt_entry = rt_look_up_rt_entry(rt_table, key->dest, key->mask); + + if (!rt_entry) { + /* rt_entry was not existing before, but we are + * creating it because subscriber is interested in notif + * for this entry. Create such an entry without data. Later + * When publisher would actually cate this entry, all registered + * subscriber should be notified + * */ + rt_entry = rt_add_or_update_rt_entry( + rt_table, key->dest, key->mask, 0, 0); + new_entry_created = true; + } + + memset(&nfce, 0, sizeof(notif_chain_elem_t)); + + assert(key_size <= MAX_NOTIF_KEY_SIZE); + + /* No need to keep keys as nfce;s are tied to + * routing table entry which contain keys*/ + //memcpy(nfce.key, (char *)key, key_size); + //nfce.key_size = key_size; + nfce.app_cb = app_cb; + nfce.subs_id = subs_id; + nfc_register_notif_chain(rt_entry->nfc, &nfce); + + /* Subscriber subscribes to already existing rt entry, + * immediately send notif to Subscriber with opcode + * NFC_ADD*/ + if (!new_entry_created) { + app_cb(rt_entry, sizeof(rt_entry_t), NFC_ADD, subs_id); + } +} diff --git a/rt.h b/rt.h new file mode 100644 index 0000000..19aba01 --- /dev/null +++ b/rt.h @@ -0,0 +1,122 @@ +/* + * ===================================================================================== + * + * Filename: rt.h + * + * Description: + * + * Version: 1.0 + * Created: 10/03/24 12:47:20 PM IST + * Revision: none + * Compiler: gcc + * + * Author: YOUR NAME (), + * Organization: + * + * ===================================================================================== + */ +#ifndef __RT__ +#define __RT__ + +#include +#include "notif.h" + +typedef struct rt_entry_keys_{ + + char dest[16]; + char mask; +} rt_entry_keys_t; + +typedef struct rt_entry_{ + + /* A Structure which represnets only the keys of the + * Routing Table.*/ + rt_entry_keys_t rt_entry_keys; + + char gw_ip[16]; + char oif[32]; + bool created; + struct rt_entry_ *prev; + struct rt_entry_ *next; + notif_chain_t *nfc; +} rt_entry_t; + +typedef struct rt_table_{ + + rt_entry_t *head; +} rt_table_t; + +void +rt_init_rt_table(rt_table_t *rt_table); + +rt_entry_t * +rt_add_or_update_rt_entry(rt_table_t *rt_table, + char *dest_ip, char mask, char *gw_ip, char *oif); + +bool +rt_delete_rt_entry(rt_table_t *rt_table, + char *dest_ip, char mask); + +bool +rt_update_rt_entry(rt_table_t *rt_table, + char *dest_ip, char mask, + char *new_gw_ip, char *new_oif); + +rt_entry_t * +rt_look_up_rt_entry(rt_table_t *rt_table, + char *dest, char mask); + +void +rt_clear_rt_table(rt_table_t *rt_table); + +void +rt_free_rt_table(rt_table_t *rt_table); + +void +rt_dump_rt_table(rt_table_t *rt_table); + +static inline void +rt_entry_remove(rt_table_t *rt_table, + rt_entry_t *rt_entry){ + + if(!rt_entry->prev){ + if(rt_entry->next){ + rt_entry->next->prev = 0; + rt_table->head = rt_entry->next; + rt_entry->next = 0; + return; + } + rt_table->head = 0; + return; + } + if(!rt_entry->next){ + rt_entry->prev->next = 0; + rt_entry->prev = 0; + return; + } + + rt_entry->prev->next = rt_entry->next; + rt_entry->next->prev = rt_entry->prev; + rt_entry->prev = 0; + rt_entry->next = 0; +} + +#define ITERTAE_RT_TABLE_BEGIN(rt_table_ptr, rt_entry_ptr) \ +{ \ + rt_entry_t *_next_rt_entry; \ + for((rt_entry_ptr) = (rt_table_ptr)->head; \ + (rt_entry_ptr); \ + (rt_entry_ptr) = _next_rt_entry) { \ + _next_rt_entry = (rt_entry_ptr)->next; + +#define ITERTAE_RT_TABLE_END(rt_table_ptr, rt_entry_ptr) }} + +void +rt_table_register_for_notification( + rt_table_t *rt_table, + rt_entry_keys_t *key, + size_t key_size, + nfc_app_cb app_cb, + uint32_t subs_id); + +#endif /* __RT__ */ diff --git a/rtm_publisher.c b/rtm_publisher.c new file mode 100644 index 0000000..787c5f3 --- /dev/null +++ b/rtm_publisher.c @@ -0,0 +1,128 @@ +/* + * ===================================================================================== + * + * Filename: rtm_publisher.c + * + * Description: + * + * Version: 1.0 + * Created: 10/03/24 02:01:50 PM IST + * Revision: none + * Compiler: gcc + * + * Author: YOUR NAME (), + * Organization: + * + * ===================================================================================== + */ +#include +#include +#include +#include "rt.h" + +extern void +create_subscriber_thread(uint32_t client_id); + +rt_table_t publisher_rt_table; + +rt_table_t * +publisher_get_rt_table() { + + return &publisher_rt_table; +} + +void +main_menu() { + + int choice; + while(1){ + printf("Publisher Menu\n"); + printf("1. Add/Update rt table entry\n"); + printf("2. Delete rt table entry\n"); + printf("3. Dump rt table\n"); + printf("Enter Choice :"); + choice = 0; + scanf("%d", &choice); + switch(choice){ + case 1: + { + char dest[16]; + char mask; + char oif[32]; + char gw[16]; + printf("Enter Destination :"); + scanf("%s", dest); + printf("Mask : "); + scanf("%d", &mask); + printf("Enter oif name :"); + scanf("%s", oif); + printf("Enter Gateway IP :"); + scanf("%s", gw); + rt_add_or_update_rt_entry(publisher_get_rt_table(), + dest, mask, gw, oif); + } + break; + case 2: + /* Implement your self */ + break; + case 3: + rt_dump_rt_table(publisher_get_rt_table()); + break; + default: ; + } + } +} + +void * +publisher_thread_fn(void *arg) { + + /* Add some default entries in rt table */ + rt_add_or_update_rt_entry( + publisher_get_rt_table(), + "122.1.1.1", 32, "10.1.1.1", "eth1"); + + rt_add_or_update_rt_entry( + publisher_get_rt_table(), + "122.1.1.2", 32, "10.1.1.2", "eth1"); + + rt_add_or_update_rt_entry( + publisher_get_rt_table(), + "122.1.1.3", 32, "10.1.1.3", "eth1"); + + rt_dump_rt_table(publisher_get_rt_table()); + main_menu(); +} + +void +create_publisher_thread() { + + pthread_attr_t attr; + pthread_t pub_thread; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + pthread_create(&pub_thread, &attr, + publisher_thread_fn, + (void *)0); +} + +int +main(int argc, char **argv) { + + rt_init_rt_table(&publisher_rt_table); + /* Create Subscriber threads */ + create_subscriber_thread(1); + sleep(1); + + create_subscriber_thread(2); + sleep(1); + + create_subscriber_thread(3); + sleep(1); + /* Create publisher thread*/ + create_publisher_thread(); + printf("Publisher thread created\n"); + pthread_exit(0); + return 0; +} diff --git a/threaded_subsciber.c b/threaded_subsciber.c new file mode 100644 index 0000000..67ac2d4 --- /dev/null +++ b/threaded_subsciber.c @@ -0,0 +1,93 @@ +/* + * ===================================================================================== + * + * Filename: threaded_subsciber.c + * + * Description: + * + * Version: 1.0 + * Created: 10/03/24 02:45:48 PM IST + * Revision: none + * Compiler: gcc + * + * Author: YOUR NAME (), + * Organization: + * + * ===================================================================================== + */ +#include +#include +#include +#include /*for pid_t*/ +#include /*for getpid()*/ +#include +#include +#include "notif.h" +#include "rt.h" + +void +create_subscriber_thread(); + +extern rt_table_t * +publisher_get_rt_table(); + +static void +test_cb(void *arg, size_t arg_size, + nfc_op_t nfc_op_code, + uint32_t client_id) { + + rt_entry_t *rt_entry; + printf("%s() client : %u, Notified with opcode %s\n", + __FUNCTION__, client_id, nfc_get_str_op_code(nfc_op_code)); + + rt_entry = (rt_entry_t *) arg; + + printf("%-20s %-4d %-20s %s\n", + rt_entry->rt_entry_keys.dest, + rt_entry->rt_entry_keys.mask, + rt_entry->gw_ip, + rt_entry->oif); + printf("\n"); +} + +static void * +subscriber_thread_fn(void *arg){ + + rt_entry_keys_t rt_entry_keys; + + /* register for Notif 122.1.1.1/32 */ + memset(&rt_entry_keys, 0, sizeof(rt_entry_keys_t)); + strncpy(rt_entry_keys.dest, "122.1.1.1", 16); + rt_entry_keys.mask = 32; + rt_table_register_for_notification(publisher_get_rt_table(), &rt_entry_keys, sizeof(rt_entry_keys_t), test_cb, *((uint32_t *)arg)); + + /* register for Notif 122.1.1.2/32 */ + memset(&rt_entry_keys, 0, sizeof(rt_entry_keys_t)); + strncpy(rt_entry_keys.dest, "122.1.1.5", 16); + rt_entry_keys.mask = 32; + rt_table_register_for_notification(publisher_get_rt_table(), &rt_entry_keys, sizeof(rt_entry_keys_t), test_cb, *((uint32_t *)arg)); + + /* register for Notif 122.1.1.3/32 */ + memset(&rt_entry_keys, 0, sizeof(rt_entry_keys_t)); + strncpy(rt_entry_keys.dest, "122.1.1.6", 16); + rt_entry_keys.mask = 32; + rt_table_register_for_notification(publisher_get_rt_table(), &rt_entry_keys, sizeof(rt_entry_keys_t), test_cb, *((uint32_t *)arg)); + + pause(); + return NULL; +} + +void +create_subscriber_thread(uint32_t client_id){ + + pthread_attr_t attr; + pthread_t subs_thread; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + pthread_create(&subs_thread, &attr, + subscriber_thread_fn, + (void *)&client_id); + printf("Subscriber %u created\n", client_id); +}