Publisher and Subscriber

This commit is contained in:
2024-03-10 17:15:47 +05:30
parent 60786b94e2
commit edb640b492
11 changed files with 687 additions and 30 deletions

7
compile.sh Normal file
View File

@@ -0,0 +1,7 @@
gcc -g -c glthread/glthread.c -o glthread/glthread.o
gcc -g -c notif.c -o notif.o
gcc -g -c rtm_publisher.c -o rtm_publisher.o
gcc -g -c rt.c -o rt.o
gcc -g -c threaded_subsciber.c -o threaded_subsciber.o
gcc -g rtm_publisher.o threaded_subsciber.o rt.o notif.o glthread/glthread.o -o main.exe -lpthread
rm *.o glthread/*.o

BIN
glthread/.glthread.h.un~ Normal file

Binary file not shown.

View File

@@ -44,7 +44,7 @@ glthread_add_last(glthread_t *base_glthread, glthread_t *new_glthread);
#define IS_GLTHREAD_LIST_EMPTY(glthreadptr) \ #define IS_GLTHREAD_LIST_EMPTY(glthreadptr) \
((glthreadptr)->right == 0 && (glthreadptr)->left == 0) ((glthreadptr)->right == 0 && (glthreadptr)->left == 0)
#define GLTHREAD_TO_STRUCT(fn_name, structure_name, field_name,glthreadptr) \ #define GLTHREAD_TO_STRUCT(fn_name, structure_name, field_name) \
static inline structure_name * fn_name(glthread_t *glthreadptr){ \ static inline structure_name * fn_name(glthread_t *glthreadptr){ \
if(glthreadptr)\ if(glthreadptr)\
return (structure_name *)((char *)(glthreadptr) - (char *)&(((structure_name *)0)->field_name)); \ return (structure_name *)((char *)(glthreadptr) - (char *)&(((structure_name *)0)->field_name)); \

95
glthread/glthread.h~ Normal file
View File

@@ -0,0 +1,95 @@
/*
* =====================================================================================
*
* Filename: glthread.h
*
* Description:
*
* Version: 1.0
* Created: 09/03/24 07:30:40 PM IST
* Revision: none
* Compiler: gcc
*
* Author: YOUR NAME (),
* Organization:
*
* =====================================================================================
*/
#ifndef __GLUETHREAD__
#define __GLUETHREAD__
typedef struct _glthread{
struct _glthread *left;
struct _glthread *right;
} glthread_t;
void
glthread_add_next(glthread_t *base_glthread, glthread_t *new_glthread);
void
glthread_add_before(glthread_t *base_glthread, glthread_t *new_glthread);
void
remove_glthread(glthread_t *glthread);
void
init_glthread(glthread_t *glthread);
void
glthread_add_last(glthread_t *base_glthread, glthread_t *new_glthread);
#define IS_GLTHREAD_LIST_EMPTY(glthreadptr) \
((glthreadptr)->right == 0 && (glthreadptr)->left == 0)
#define GLTHREAD_TO_STRUCT(fn_name, structure_name, field_name,glthreadptr) \
static inline structure_name * fn_name(glthread_t *glthreadptr){ \
if(glthreadptr)\
return (structure_name *)((char *)(glthreadptr) - (char *)&(((structure_name *)0)->field_name)); \
else return NULL;\
}
/* delete safe loop*/
/*Normal continue and break can be used with this loop macro*/
#define BASE(glthreadptr) ((glthreadptr)->right)
#define ITERATE_GLTHREAD_BEGIN(glthreadptrstart, glthreadptr) \
{ \
glthread_t *_glthread_ptr = NULL; \
glthreadptr = BASE(glthreadptrstart); \
for(; glthreadptr!= NULL; glthreadptr = _glthread_ptr){ \
_glthread_ptr = (glthreadptr)->right;
#define ITERATE_GLTHREAD_END(glthreadptrstart, glthreadptr) \
}}
#define GLTHREAD_GET_USER_DATA_FROM_OFFSET(glthreadptr, offset) \
(void *)((char *)(glthreadptr) - offset)
void
delete_glthread_list(glthread_t *base_glthread);
unsigned int
get_glthread_list_count(glthread_t *base_glthread);
void
glthread_priority_insert(glthread_t *base_glthread,
glthread_t *glthread,
int (*comp_fn)(void *, void *),
int offset);
glthread_t *
dequeue_glthread_first(glthread_t *base_glthread);
#if 0
void *
gl_thread_search(glthread_t *base_glthread,
void *(*thread_to_struct_fn)(glthread_t *),
void *key,
int (*comparison_fn)(void *, void *));
#endif
#endif /* __GLUETHREAD__ */

BIN
main.exe Executable file

Binary file not shown.

36
notif.c
View File

@@ -43,6 +43,20 @@ nfc_register_notif_chain(notif_chain_t *nfc,
glthread_add_next(&nfc->notif_chain_head, &new_nfce->glue); glthread_add_next(&nfc->notif_chain_head, &new_nfce->glue);
} }
void
nfc_delete_all_nfce(notif_chain_t *nfc){
glthread_t *curr;
notif_chain_elem_t *nfce;
ITERATE_GLTHREAD_BEGIN(&nfc->notif_chain_head, curr){
nfce = glthread_glue_to_notif_chain_elem(curr);
remove_glthread(&nfce->glue);
free(nfce);
} ITERATE_GLTHREAD_END(&nfc->notif_chain_head, curr);
}
void void
nfc_invoke_notif_chain(notif_chain_t *nfc, nfc_invoke_notif_chain(notif_chain_t *nfc,
void *arg, size_t arg_size, void *arg, size_t arg_size,
@@ -58,8 +72,6 @@ nfc_invoke_notif_chain(notif_chain_t *nfc,
assert(key_size <= MAX_NOTIF_KEY_SIZE); assert(key_size <= MAX_NOTIF_KEY_SIZE);
char *nfc_op_s = nfc_get_str_op_code(nfc_op_code);
ITERATE_GLTHREAD_BEGIN(&nfc->notif_chain_head, curr){ ITERATE_GLTHREAD_BEGIN(&nfc->notif_chain_head, curr){
nfce = glthread_glue_to_notif_chain_elem(curr); nfce = glthread_glue_to_notif_chain_elem(curr);
@@ -67,30 +79,14 @@ nfc_invoke_notif_chain(notif_chain_t *nfc,
if(!(key && key_size && if(!(key && key_size &&
nfce->is_key_set && (key_size == nfce->key_size))){ nfce->is_key_set && (key_size == nfce->key_size))){
nfce->app_cb(arg, arg_size, nfc_op_s, nfce->subs_id); nfce->app_cb(arg, arg_size, nfc_op_code, nfce->subs_id);
} }
else { else {
if(memcmp(key, nfce->key, key_size) == 0) { if(memcmp(key, nfce->key, key_size) == 0) {
nfce->app_cb(arg, arg_size, nfc_op_s, nfce->subs_id); nfce->app_cb(arg, arg_size, nfc_op_code, nfce->subs_id);
} }
} }
}ITERATE_GLTHREAD_END(&nfc->notif_chain_head, curr); }ITERATE_GLTHREAD_END(&nfc->notif_chain_head, curr);
} }
void
nfc_delete_all_nfce(notif_chain_t *nfc){
glthread *curr;
notif_chain_elem_t *nfc_e;
ITERATE_GLTHREAD_BEGIN(&nfc->notif_chain_head,curr){
nfc_e = glthread_glue_to_notif_chain_elem(curr);
remove_glthread(&nfc_e->glue);
free(nfc_e);
}ITERATE_GLTHREAD_END(&nfc->notif_chain_head,curr);
}

11
notif.h
View File

@@ -15,14 +15,13 @@
* *
* ===================================================================================== * =====================================================================================
*/ */
#ifndef __NOTIF_CHAIN_ #ifndef __NOTIF_CHAIN_
#define __NOTIF_CHAIN_ #define __NOTIF_CHAIN_
#include <stddef.h> /* for size_t */ #include <stddef.h> /* for size_t */
#include "utils.h" #include "glthread/glthread.h"
#include "gluethread/glthread.h" #include <stdint.h>
#include <stdbool.h>
#define MAX_NOTIF_KEY_SIZE 64 #define MAX_NOTIF_KEY_SIZE 64
typedef enum{ typedef enum{
@@ -57,14 +56,14 @@ nfc_get_str_op_code(nfc_op_t nfc_op_code) {
} }
typedef void (*nfc_app_cb)(void *, size_t, char *, uint32_t); typedef void (*nfc_app_cb)(void *, size_t, nfc_op_t, uint32_t);
typedef struct notif_chain_elem_{ typedef struct notif_chain_elem_{
char key[MAX_NOTIF_KEY_SIZE]; char key[MAX_NOTIF_KEY_SIZE];
size_t key_size; size_t key_size;
uint32_t subs_id; uint32_t subs_id;
bool_t is_key_set; bool is_key_set;
nfc_app_cb app_cb; nfc_app_cb app_cb;
glthread_t glue; glthread_t glue;
} notif_chain_elem_t; } notif_chain_elem_t;

217
rt.c Normal file
View File

@@ -0,0 +1,217 @@
/*
* =====================================================================================
*
* Filename: rt.c
*
* Description:
*
* Version: 1.0
* Created: 10/03/24 12:55:35 PM IST
* Revision: none
* Compiler: gcc
*
* Author: YOUR NAME (),
* Organization:
*
* =====================================================================================
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <memory.h>
#include <assert.h>
#include "rt.h"
void
rt_init_rt_table(rt_table_t *rt_table){
rt_table->head = NULL;
}
rt_entry_t *
rt_add_or_update_rt_entry(rt_table_t *rt_table,
char *dest,
char mask,
char *gw_ip,
char *oif){
bool new_entry;
rt_entry_t *head = NULL;
rt_entry_t *rt_entry = NULL;
new_entry = false;
rt_entry = rt_look_up_rt_entry(rt_table, dest, mask);
if(!rt_entry) {
rt_entry = calloc(1, sizeof(rt_entry_t));
strncpy(rt_entry->rt_entry_keys.dest, dest,
sizeof(rt_entry->rt_entry_keys.dest));
rt_entry->rt_entry_keys.mask = mask;
rt_entry->nfc = nfc_create_new_notif_chain(0);
new_entry = true;
}
if(gw_ip)
strncpy(rt_entry->gw_ip, gw_ip, sizeof(rt_entry->gw_ip));
if(oif)
strncpy(rt_entry->oif, oif, sizeof(rt_entry->oif));
if (new_entry) {
head = rt_table->head;
rt_table->head = rt_entry;
rt_entry->prev = 0;
rt_entry->next = head;
if(head)
head->prev = rt_entry;
}
if (gw_ip || oif) {
/* Entry is being updated by the publisher, send
* notification to all subscribers*/
nfc_invoke_notif_chain(rt_entry->nfc,
(char *)rt_entry, sizeof(rt_entry_t),
0, 0, NFC_MOD);
}
return rt_entry;
}
bool
rt_delete_rt_entry(rt_table_t *rt_table,
char *dest, char mask){
rt_entry_t *rt_entry = NULL;
ITERTAE_RT_TABLE_BEGIN(rt_table, rt_entry){
if(strncmp(rt_entry->rt_entry_keys.dest,
dest, sizeof(rt_entry->rt_entry_keys.dest)) == 0 &&
rt_entry->rt_entry_keys.mask == mask){
rt_entry_remove(rt_table, rt_entry);
nfc_invoke_notif_chain(rt_entry->nfc,
(char *)rt_entry, sizeof(rt_entry_t),
0, 0, NFC_DEL);
nfc_delete_all_nfce(rt_entry->nfc);
free(rt_entry->nfc);
rt_entry->nfc = NULL;
free(rt_entry->nfc);
free(rt_entry);
return true;
}
} ITERTAE_RT_TABLE_END(rt_table, curr);
return false;
}
void
rt_clear_rt_table(rt_table_t *rt_table){
}
void
rt_free_rt_table(rt_table_t *rt_table){
}
void
rt_dump_rt_table(rt_table_t *rt_table){
rt_entry_t *rt_entry = NULL;
ITERTAE_RT_TABLE_BEGIN(rt_table, rt_entry){
printf("%-20s %-4d %-20s %s\n",
rt_entry->rt_entry_keys.dest,
rt_entry->rt_entry_keys.mask,
rt_entry->gw_ip,
rt_entry->oif);
printf("\tPrinting Subscribers : ");
glthread_t *curr;
notif_chain_elem_t *nfce;
ITERATE_GLTHREAD_BEGIN(&rt_entry->nfc->notif_chain_head, curr){
nfce = glthread_glue_to_notif_chain_elem(curr);
printf("%u ", nfce->subs_id);
} ITERATE_GLTHREAD_END(&rt_entry->nfc->notif_chain_head, curr)
printf("\n");
} ITERTAE_RT_TABLE_END(rt_table, rt_entry);
}
rt_entry_t *
rt_look_up_rt_entry(rt_table_t *rt_table,
char *dest, char mask) {
rt_entry_t *rt_entry = NULL;
ITERTAE_RT_TABLE_BEGIN(rt_table, rt_entry) {
if ((strncmp(rt_entry->rt_entry_keys.dest,
dest, sizeof(rt_entry->rt_entry_keys.dest)) == 0) &&
rt_entry->rt_entry_keys.mask == mask) {
return rt_entry;
}
} ITERTAE_RT_TABLE_END(rt_table, rt_entry);
return NULL;
}
void
rt_table_register_for_notification(
rt_table_t *rt_table,
rt_entry_keys_t *key,
size_t key_size,
nfc_app_cb app_cb,
uint32_t subs_id) {
bool new_entry_created;
rt_entry_t *rt_entry;
notif_chain_elem_t nfce;
new_entry_created = false;
rt_entry = rt_look_up_rt_entry(rt_table, key->dest, key->mask);
if (!rt_entry) {
/* rt_entry was not existing before, but we are
* creating it because subscriber is interested in notif
* for this entry. Create such an entry without data. Later
* When publisher would actually cate this entry, all registered
* subscriber should be notified
* */
rt_entry = rt_add_or_update_rt_entry(
rt_table, key->dest, key->mask, 0, 0);
new_entry_created = true;
}
memset(&nfce, 0, sizeof(notif_chain_elem_t));
assert(key_size <= MAX_NOTIF_KEY_SIZE);
/* No need to keep keys as nfce;s are tied to
* routing table entry which contain keys*/
//memcpy(nfce.key, (char *)key, key_size);
//nfce.key_size = key_size;
nfce.app_cb = app_cb;
nfce.subs_id = subs_id;
nfc_register_notif_chain(rt_entry->nfc, &nfce);
/* Subscriber subscribes to already existing rt entry,
* immediately send notif to Subscriber with opcode
* NFC_ADD*/
if (!new_entry_created) {
app_cb(rt_entry, sizeof(rt_entry_t), NFC_ADD, subs_id);
}
}

122
rt.h Normal file
View File

@@ -0,0 +1,122 @@
/*
* =====================================================================================
*
* Filename: rt.h
*
* Description:
*
* Version: 1.0
* Created: 10/03/24 12:47:20 PM IST
* Revision: none
* Compiler: gcc
*
* Author: YOUR NAME (),
* Organization:
*
* =====================================================================================
*/
#ifndef __RT__
#define __RT__
#include <stdbool.h>
#include "notif.h"
typedef struct rt_entry_keys_{
char dest[16];
char mask;
} rt_entry_keys_t;
typedef struct rt_entry_{
/* A Structure which represnets only the keys of the
* Routing Table.*/
rt_entry_keys_t rt_entry_keys;
char gw_ip[16];
char oif[32];
bool created;
struct rt_entry_ *prev;
struct rt_entry_ *next;
notif_chain_t *nfc;
} rt_entry_t;
typedef struct rt_table_{
rt_entry_t *head;
} rt_table_t;
void
rt_init_rt_table(rt_table_t *rt_table);
rt_entry_t *
rt_add_or_update_rt_entry(rt_table_t *rt_table,
char *dest_ip, char mask, char *gw_ip, char *oif);
bool
rt_delete_rt_entry(rt_table_t *rt_table,
char *dest_ip, char mask);
bool
rt_update_rt_entry(rt_table_t *rt_table,
char *dest_ip, char mask,
char *new_gw_ip, char *new_oif);
rt_entry_t *
rt_look_up_rt_entry(rt_table_t *rt_table,
char *dest, char mask);
void
rt_clear_rt_table(rt_table_t *rt_table);
void
rt_free_rt_table(rt_table_t *rt_table);
void
rt_dump_rt_table(rt_table_t *rt_table);
static inline void
rt_entry_remove(rt_table_t *rt_table,
rt_entry_t *rt_entry){
if(!rt_entry->prev){
if(rt_entry->next){
rt_entry->next->prev = 0;
rt_table->head = rt_entry->next;
rt_entry->next = 0;
return;
}
rt_table->head = 0;
return;
}
if(!rt_entry->next){
rt_entry->prev->next = 0;
rt_entry->prev = 0;
return;
}
rt_entry->prev->next = rt_entry->next;
rt_entry->next->prev = rt_entry->prev;
rt_entry->prev = 0;
rt_entry->next = 0;
}
#define ITERTAE_RT_TABLE_BEGIN(rt_table_ptr, rt_entry_ptr) \
{ \
rt_entry_t *_next_rt_entry; \
for((rt_entry_ptr) = (rt_table_ptr)->head; \
(rt_entry_ptr); \
(rt_entry_ptr) = _next_rt_entry) { \
_next_rt_entry = (rt_entry_ptr)->next;
#define ITERTAE_RT_TABLE_END(rt_table_ptr, rt_entry_ptr) }}
void
rt_table_register_for_notification(
rt_table_t *rt_table,
rt_entry_keys_t *key,
size_t key_size,
nfc_app_cb app_cb,
uint32_t subs_id);
#endif /* __RT__ */

128
rtm_publisher.c Normal file
View File

@@ -0,0 +1,128 @@
/*
* =====================================================================================
*
* Filename: rtm_publisher.c
*
* Description:
*
* Version: 1.0
* Created: 10/03/24 02:01:50 PM IST
* Revision: none
* Compiler: gcc
*
* Author: YOUR NAME (),
* Organization:
*
* =====================================================================================
*/
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "rt.h"
extern void
create_subscriber_thread(uint32_t client_id);
rt_table_t publisher_rt_table;
rt_table_t *
publisher_get_rt_table() {
return &publisher_rt_table;
}
void
main_menu() {
int choice;
while(1){
printf("Publisher Menu\n");
printf("1. Add/Update rt table entry\n");
printf("2. Delete rt table entry\n");
printf("3. Dump rt table\n");
printf("Enter Choice :");
choice = 0;
scanf("%d", &choice);
switch(choice){
case 1:
{
char dest[16];
char mask;
char oif[32];
char gw[16];
printf("Enter Destination :");
scanf("%s", dest);
printf("Mask : ");
scanf("%d", &mask);
printf("Enter oif name :");
scanf("%s", oif);
printf("Enter Gateway IP :");
scanf("%s", gw);
rt_add_or_update_rt_entry(publisher_get_rt_table(),
dest, mask, gw, oif);
}
break;
case 2:
/* Implement your self */
break;
case 3:
rt_dump_rt_table(publisher_get_rt_table());
break;
default: ;
}
}
}
void *
publisher_thread_fn(void *arg) {
/* Add some default entries in rt table */
rt_add_or_update_rt_entry(
publisher_get_rt_table(),
"122.1.1.1", 32, "10.1.1.1", "eth1");
rt_add_or_update_rt_entry(
publisher_get_rt_table(),
"122.1.1.2", 32, "10.1.1.2", "eth1");
rt_add_or_update_rt_entry(
publisher_get_rt_table(),
"122.1.1.3", 32, "10.1.1.3", "eth1");
rt_dump_rt_table(publisher_get_rt_table());
main_menu();
}
void
create_publisher_thread() {
pthread_attr_t attr;
pthread_t pub_thread;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&pub_thread, &attr,
publisher_thread_fn,
(void *)0);
}
int
main(int argc, char **argv) {
rt_init_rt_table(&publisher_rt_table);
/* Create Subscriber threads */
create_subscriber_thread(1);
sleep(1);
create_subscriber_thread(2);
sleep(1);
create_subscriber_thread(3);
sleep(1);
/* Create publisher thread*/
create_publisher_thread();
printf("Publisher thread created\n");
pthread_exit(0);
return 0;
}

93
threaded_subsciber.c Normal file
View File

@@ -0,0 +1,93 @@
/*
* =====================================================================================
*
* Filename: threaded_subsciber.c
*
* Description:
*
* Version: 1.0
* Created: 10/03/24 02:45:48 PM IST
* Revision: none
* Compiler: gcc
*
* Author: YOUR NAME (),
* Organization:
*
* =====================================================================================
*/
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <sys/types.h> /*for pid_t*/
#include <unistd.h> /*for getpid()*/
#include <pthread.h>
#include <stdbool.h>
#include "notif.h"
#include "rt.h"
void
create_subscriber_thread();
extern rt_table_t *
publisher_get_rt_table();
static void
test_cb(void *arg, size_t arg_size,
nfc_op_t nfc_op_code,
uint32_t client_id) {
rt_entry_t *rt_entry;
printf("%s() client : %u, Notified with opcode %s\n",
__FUNCTION__, client_id, nfc_get_str_op_code(nfc_op_code));
rt_entry = (rt_entry_t *) arg;
printf("%-20s %-4d %-20s %s\n",
rt_entry->rt_entry_keys.dest,
rt_entry->rt_entry_keys.mask,
rt_entry->gw_ip,
rt_entry->oif);
printf("\n");
}
static void *
subscriber_thread_fn(void *arg){
rt_entry_keys_t rt_entry_keys;
/* register for Notif 122.1.1.1/32 */
memset(&rt_entry_keys, 0, sizeof(rt_entry_keys_t));
strncpy(rt_entry_keys.dest, "122.1.1.1", 16);
rt_entry_keys.mask = 32;
rt_table_register_for_notification(publisher_get_rt_table(), &rt_entry_keys, sizeof(rt_entry_keys_t), test_cb, *((uint32_t *)arg));
/* register for Notif 122.1.1.2/32 */
memset(&rt_entry_keys, 0, sizeof(rt_entry_keys_t));
strncpy(rt_entry_keys.dest, "122.1.1.5", 16);
rt_entry_keys.mask = 32;
rt_table_register_for_notification(publisher_get_rt_table(), &rt_entry_keys, sizeof(rt_entry_keys_t), test_cb, *((uint32_t *)arg));
/* register for Notif 122.1.1.3/32 */
memset(&rt_entry_keys, 0, sizeof(rt_entry_keys_t));
strncpy(rt_entry_keys.dest, "122.1.1.6", 16);
rt_entry_keys.mask = 32;
rt_table_register_for_notification(publisher_get_rt_table(), &rt_entry_keys, sizeof(rt_entry_keys_t), test_cb, *((uint32_t *)arg));
pause();
return NULL;
}
void
create_subscriber_thread(uint32_t client_id){
pthread_attr_t attr;
pthread_t subs_thread;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&subs_thread, &attr,
subscriber_thread_fn,
(void *)&client_id);
printf("Subscriber %u created\n", client_id);
}