From 3ae6a02a3e14c504086d474a9d981829ed99a0c0 Mon Sep 17 00:00:00 2001 From: Hizenberg Date: Sat, 19 Oct 2024 15:45:40 +0530 Subject: [PATCH] Project Finished --- CMakeLists.txt | 3 +- asyncproject/CMakeLists.txt | 56 +++++++ asyncproject/network_utils.c | 156 ++++++++++++++++++ asyncproject/network_utils.h | 48 ++++++ asyncproject/rt.c | 209 ++++++++++++++++++++++++ asyncproject/rt.h | 63 ++++++++ asyncproject/stp.c | 138 ++++++++++++++++ asyncproject/stp_el.c | 250 +++++++++++++++++++++++++++++ asyncproject/stp_el.h | 34 ++++ asyncproject/timerlib.c | 301 +++++++++++++++++++++++++++++++++++ asyncproject/timerlib.h | 126 +++++++++++++++ asyncproject/udp_client.c | 30 ++++ asyncproject/utils.c | 26 +++ asyncproject/utils.h | 8 + include/event_loop.h | 27 +++- src/event_loop.c | 68 ++++++-- test/CMakeLists.txt | 12 ++ test/evloop_app.c | 11 +- test/evloop_concurreny_app.c | 59 +++++++ 19 files changed, 1601 insertions(+), 24 deletions(-) create mode 100644 asyncproject/CMakeLists.txt create mode 100644 asyncproject/network_utils.c create mode 100644 asyncproject/network_utils.h create mode 100644 asyncproject/rt.c create mode 100644 asyncproject/rt.h create mode 100644 asyncproject/stp.c create mode 100644 asyncproject/stp_el.c create mode 100644 asyncproject/stp_el.h create mode 100644 asyncproject/timerlib.c create mode 100644 asyncproject/timerlib.h create mode 100644 asyncproject/udp_client.c create mode 100644 asyncproject/utils.c create mode 100644 asyncproject/utils.h create mode 100644 test/evloop_concurreny_app.c diff --git a/CMakeLists.txt b/CMakeLists.txt index aa412f9..dc1d830 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,4 +34,5 @@ set(SOURCE_DIR ${CMAKE_SOURCE_DIR}/src) #Adding Sub-directory having CMakeLists.txt... add_subdirectory(src) -add_subdirectory(test) \ No newline at end of file +add_subdirectory(test) +add_subdirectory(asyncproject) \ No newline at end of file diff --git a/asyncproject/CMakeLists.txt b/asyncproject/CMakeLists.txt new file mode 100644 index 0000000..8f24716 --- /dev/null +++ b/asyncproject/CMakeLists.txt @@ -0,0 +1,56 @@ +set(NETWORK_UTILS "network_utils.c") +set(RT "rt.c") +set(STP "stp.c") +set(UDP_CLIENT "udp_client.c") +set(UTILS "utils.c") +set(STP_EL "stp_el.c") +set(TIMER_LIB "timerlib.c") + +add_executable(stp_exe + ${STP} + ${RT} + ${NETWORK_UTILS} + ${UTILS} + ${STP_EL} + ${TIMER_LIB}) + +add_executable(udp_client + ${UDP_CLIENT} + ${NETWORK_UTILS} + ${UTILS}) + + +#Linking dependent library... + +if( NOT (CMAKE_SYSTEM_NAME STREQUAL "Linux") ) + message(FATAL_ERROR "It's not a Unix-based system.\n \ + POSIX Library will not compile in this project.\n") +endif() +set(THREADS_PREFER_PTHREAD_FLAG ON) +find_package(Threads REQUIRED) + +# Find the rt library +find_library(RT_LIB rt) + +#Below snippet is important for POSIX timer library in WSL2... +target_compile_definitions(stp_exe PRIVATE _POSIX_C_SOURCE=199309L) + +# Check if the rt library is found +if (RT_LIB) + message(STATUS "librt found: ${RT_LIB}") +else() + message(FATAL_ERROR "librt not found") +endif() + + +target_link_libraries(stp_exe PUBLIC + Threads::Threads + ${RT_LIB} + ${EVENT_LOOP_LIB}) + +target_link_libraries(udp_client PUBLIC + Threads::Threads + ${RT_LIB}) + +target_include_directories(stp_exe PUBLIC + ${HEADER_DIR}) \ No newline at end of file diff --git a/asyncproject/network_utils.c b/asyncproject/network_utils.c new file mode 100644 index 0000000..ce856f6 --- /dev/null +++ b/asyncproject/network_utils.c @@ -0,0 +1,156 @@ +#include "network_utils.h" + +/* UDP Server code*/ + +typedef struct thread_arg_pkg_ { + + char ip_addr[16]; + uint32_t port_no; + int comm_fd; + recv_fn_cb recv_fn; + pthread_t* thread; + char* recv_buffer; +} thread_arg_pkg_t; + + +static void* +_udp_server_create_and_start(void* arg) { + + thread_arg_pkg_t* thread_arg_pkg = + (thread_arg_pkg_t*)arg; + + char ip_addr[16]; + strncpy(ip_addr, thread_arg_pkg->ip_addr, 16); + uint32_t port_no = thread_arg_pkg->port_no; + recv_fn_cb recv_fn = thread_arg_pkg->recv_fn; + free(thread_arg_pkg); + thread_arg_pkg = NULL; + + int udp_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + + if (udp_sock_fd == -1) { + printf("Socket Creation Failed\n"); + return 0; + } + + struct sockaddr_in server_addr; + server_addr.sin_family = AF_INET; + server_addr.sin_port = port_no; + server_addr.sin_addr.s_addr = INADDR_ANY; + + if (bind(udp_sock_fd, (struct sockaddr*)&server_addr, + sizeof(struct sockaddr)) == -1) { + printf("Error : UDP socket bind failed\n"); + return 0; + } + + char* recv_buffer = calloc(1, MAX_PACKET_BUFFER_SIZE); + + fd_set active_sock_fd_set, + backup_sock_fd_set; + + FD_ZERO(&active_sock_fd_set); + FD_ZERO(&backup_sock_fd_set); + + struct sockaddr_in client_addr; + FD_SET(udp_sock_fd, &backup_sock_fd_set); + int bytes_recvd = 0, + addr_len = sizeof(client_addr); + + while (1) { + + memcpy(&active_sock_fd_set, &backup_sock_fd_set, sizeof(fd_set)); + select(udp_sock_fd + 1, &active_sock_fd_set, NULL, NULL, NULL); + + if (FD_ISSET(udp_sock_fd, &active_sock_fd_set)) { + + memset(recv_buffer, 0, MAX_PACKET_BUFFER_SIZE); + bytes_recvd = recvfrom(udp_sock_fd, recv_buffer, + MAX_PACKET_BUFFER_SIZE, 0, + (struct sockaddr*)&client_addr, &addr_len); + + recv_fn(recv_buffer, bytes_recvd, + network_covert_ip_n_to_p( + (uint32_t)htonl(client_addr.sin_addr.s_addr), 0), + client_addr.sin_port, udp_sock_fd); + } + } + return 0; +} + + +void +udp_server_create_and_start( + char* ip_addr, + uint32_t udp_port_no, + recv_fn_cb recv_fn) { + + pthread_attr_t attr; + pthread_t recv_pkt_thread; + thread_arg_pkg_t* thread_arg_pkg; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + thread_arg_pkg = calloc(1, sizeof(thread_arg_pkg_t)); + strncpy(thread_arg_pkg->ip_addr, ip_addr, 16); + thread_arg_pkg->port_no = udp_port_no; + thread_arg_pkg->recv_fn = recv_fn; + + pthread_create(&recv_pkt_thread, &attr, + _udp_server_create_and_start, + (void*)thread_arg_pkg); +} + +int +send_udp_msg(char* dest_ip_addr, + uint32_t dest_port_no, + char* msg, + uint32_t msg_size, + int sock_fd) { + + struct sockaddr_in dest; + + dest.sin_family = AF_INET; + dest.sin_port = dest_port_no; + struct hostent* host = (struct hostent*)gethostbyname(dest_ip_addr); + dest.sin_addr = *((struct in_addr*)host->h_addr_list); + int addr_len = sizeof(struct sockaddr); + + if (sock_fd < 0) { + + sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + + if (sock_fd < 0) { + printf("socket creation failed, errno = %d\n", errno); + return -1; + } + } + sendto(sock_fd, msg, msg_size, + 0, (struct sockaddr*)&dest, + sizeof(struct sockaddr)); + return sock_fd; +} + +char* +network_covert_ip_n_to_p(uint32_t ip_addr, + char* output_buffer) { + + char* out = NULL; + static char str_ip[16]; + out = !output_buffer ? str_ip : output_buffer; + memset(out, 0, 16); + ip_addr = htonl(ip_addr); + inet_ntop(AF_INET, &ip_addr, out, 16); + out[15] = '\0'; + return out; +} + +uint32_t +network_covert_ip_p_to_n(char* ip_addr) { + + uint32_t binary_prefix = 0; + inet_pton(AF_INET, ip_addr, &binary_prefix); + binary_prefix = htonl(binary_prefix); + return binary_prefix; +} \ No newline at end of file diff --git a/asyncproject/network_utils.h b/asyncproject/network_utils.h new file mode 100644 index 0000000..2090336 --- /dev/null +++ b/asyncproject/network_utils.h @@ -0,0 +1,48 @@ +#ifndef __NETWORK_UTILS__ +#define __NETWORK_UTILS__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAX_PACKET_BUFFER_SIZE 1024 + +typedef void (*recv_fn_cb)(char*, /* msg recvd */ + uint32_t, /* recvd msg size */ + char*, /* Sender's IP address */ + uint32_t, /* Sender's Port number */ + uint32_t); /* Sender Communication FD , only for tcp*/ + +void +udp_server_create_and_start( + char* ip_addr, + uint32_t udp_port_no, + recv_fn_cb recv_fn); + +int +send_udp_msg(char* dest_ip_addr, + uint32_t udp_port_no, + char* msg, + uint32_t msg_size, + int sock_fd); + +/* General Nw utilities */ +char* +network_covert_ip_n_to_p(uint32_t ip_addr, + char* output_buffer); + +uint32_t +network_covert_ip_p_to_n(char* ip_addr); + +#endif /* __NETWORK_UTILS__ */ \ No newline at end of file diff --git a/asyncproject/rt.c b/asyncproject/rt.c new file mode 100644 index 0000000..971fa63 --- /dev/null +++ b/asyncproject/rt.c @@ -0,0 +1,209 @@ +#include +#include +#include +#include +#include +#include "utils.h" +#include "rt.h" +#include "stp_el.h" +#include "timerlib.h" + +rt_table_t* +rt_create_new_rt_table(char* name) { + + rt_table_t* rtable = calloc(1, sizeof(rt_table_t)); + strncpy(rtable->rt_table_name, name, 32); + return rtable; +} + +static void +rt_entry_exp_timer_cbk(Timer_t* timer, void* arg) { + rt_table_t* rt_table; + rt_table_entry_t* rt_table_entry = (rt_table_entry_t*)arg; + + rt_table = rt_table_entry->rt_table; + + /* Handover the deletion of routing table entry to event loop thread */ + el_stp_update_routing_table(rt_table, ROUTE_DELETE, rt_table_entry); +} + + +int /*0 on success, -1 on failure*/ +rt_insert_new_entry(rt_table_t* rt, + char* dest, char mask, + char* gw, char* oif, + int exp_timer_in_millisec) { + + rt_table_entry_t* rt_table_entry = rt_look_up_rt_table_entry( + rt, dest, mask); + + if (rt_table_entry) { + printf("Error : Entry already exist for key %s/%d\n", dest, mask); + return -1; + } + + rt_table_entry = + calloc(1, sizeof(rt_table_entry_t)); + + rt_table_entry->rt_table = rt; + + + strncpy(rt_table_entry->dest, dest, 16); + rt_table_entry->mask = mask; + strncpy(rt_table_entry->gw, gw, 16); + strncpy(rt_table_entry->oif, oif, 32); + rt_table_entry->next = 0; + rt_table_entry->prev = 0; + time(&rt_table_entry->last_updated_time); + + + if (exp_timer_in_millisec) { + rt_table_entry->exp_timer = setup_timer( + rt_entry_exp_timer_cbk, + exp_timer_in_millisec, 0, 0, + (void*)rt_table_entry, false); + + start_timer(rt_table_entry->exp_timer); + } + + /* Now insert the new rt_table_entry at the beginnig of the + * list*/ + if (!rt->head) { + rt->head = rt_table_entry; + rt->count++; + return 0; + } + + rt_table_entry->next = rt->head; + rt_table_entry->prev = 0; + rt->head->prev = rt_table_entry; + rt->head = rt_table_entry; + rt->count++; + return 0; +} + +int /*0 on success, -1 on failure*/ +rt_delete_rt_entry(rt_table_t* rt, + char* dest, char mask) { + + rt_table_entry_t* rt_table_entry = rt_look_up_rt_table_entry( + rt, dest, mask); + + if (!rt_table_entry) + return -1; + + if (rt_table_entry->exp_timer) { + delete_timer(rt_table_entry->exp_timer); + rt_table_entry->exp_timer = NULL; + } + + /*Now delete it*/ + if (rt->head == rt_table_entry) { + rt->head = rt_table_entry->next; + if (rt->head) + rt->head->prev = NULL; + free(rt_table_entry); + rt->count--; + return 0; + } + + if (!rt_table_entry->prev) { + if (rt_table_entry->next) { + rt_table_entry->next->prev = NULL; + rt_table_entry->next = 0; + rt->count--; + free(rt_table_entry); + return 0; + } + rt->count--; + free(rt_table_entry); + return 0; + } + if (!rt_table_entry->next) { + rt_table_entry->prev->next = NULL; + rt_table_entry->prev = NULL; + rt->count--; + free(rt_table_entry); + return 0; + } + + rt_table_entry->prev->next = rt_table_entry->next; + rt_table_entry->next->prev = rt_table_entry->prev; + rt_table_entry->prev = 0; + rt_table_entry->next = 0; + rt->count--; + free(rt_table_entry); + return 0; +} + +int /*0 on success, -1 on failure*/ +rt_update_rt_entry(rt_table_t* rt, + char* dest, char mask, + char* new_gw, char* new_oif) { + + rt_table_entry_t* rt_table_entry = rt_look_up_rt_table_entry( + rt, dest, mask); + + if (!rt_table_entry) + return -1; + + if (strncmp(rt_table_entry->dest, dest, 16) == 0 && + rt_table_entry->mask == mask && + strncmp(rt_table_entry->gw, new_gw, 16) == 0 && + strncmp(rt_table_entry->oif, new_oif, 32) == 0) { + return -1; + } + + strncpy(rt_table_entry->dest, dest, 16); + rt_table_entry->mask = mask; + strncpy(rt_table_entry->gw, new_gw, 16); + strncpy(rt_table_entry->oif, new_oif, 32); + time(&rt_table_entry->last_updated_time); + return 0; +} + +void +rt_display_rt_table(rt_table_t* rt) { + + int i = 1; + rt_table_entry_t* rt_table_entry = NULL; + time_t curr_time = time(NULL); + unsigned int uptime_in_seconds = 0; + + printf("# count = %u\n", rt->count); + + for (rt_table_entry = rt->head; rt_table_entry; + rt_table_entry = rt_table_entry->next) { + + uptime_in_seconds = (unsigned int)difftime( + curr_time, rt_table_entry->last_updated_time); + + printf("%d. %-18s %-4d %-18s %-18s ", i, + rt_table_entry->dest, + rt_table_entry->mask, + rt_table_entry->gw, + rt_table_entry->oif); + printf("Last updated : %s\n", hrs_min_sec_format(uptime_in_seconds)); + + printf("Exp time : %lu\n", + rt_table_entry->exp_timer ? \ + timer_get_time_remaining_in_mill_sec(rt_table_entry->exp_timer) : 0); + i++; + } +} + +rt_table_entry_t* +rt_look_up_rt_table_entry(rt_table_t* rt, + char* dest, char mask) { + + rt_table_entry_t* rt_table_entry = NULL; + + for (rt_table_entry = rt->head; rt_table_entry; + rt_table_entry = rt_table_entry->next) { + + if (strncmp(rt_table_entry->dest, dest, 16) == 0 && + rt_table_entry->mask == mask) + return rt_table_entry; + } + return NULL; +} \ No newline at end of file diff --git a/asyncproject/rt.h b/asyncproject/rt.h new file mode 100644 index 0000000..ed1c6d6 --- /dev/null +++ b/asyncproject/rt.h @@ -0,0 +1,63 @@ +#ifndef __RT__ +#define __RT__ + +/*Opaque Data structures*/ +typedef struct rt_table_ rt_table_t; +typedef struct rt_table_entry_ rt_table_entry_t; +typedef struct Timer_ Timer_t; + + +#define RT_ENTRY_EXP_TIMER 30 // 30 sec + +#define ROUTE_CREATE 1 +#define ROUTE_UPDATE 2 +#define ROUTE_DELETE 3 + +struct rt_table_entry_ { + + char dest[16]; + char mask; + char gw[16]; + char oif[32]; + time_t last_updated_time; + struct rt_table_entry_* next; + struct rt_table_entry_* prev; + rt_table_t* rt_table; /* back ptr to owning rt table*/ + + Timer_t* exp_timer; + int exp_timer_msec; +}; + +struct rt_table_ { + + char rt_table_name[32]; + struct rt_table_entry_* head; + uint32_t count; +}; + +rt_table_t* +rt_create_new_rt_table(char* name); + +int /*0 on success, -1 on failure*/ +rt_insert_new_entry(rt_table_t* rt, + char* dest, char mask, + char* gw, char* oif, + int exp_timer_in_millisec); + +int /*0 on success, -1 on failure*/ +rt_delete_rt_entry(rt_table_t* rt, + char* dest, char mask); + +int /*0 on success, -1 on failure*/ +rt_update_rt_entry(rt_table_t* rt, + char* dest, char mask, + char* new_gw, char* new_oif); + +void +rt_display_rt_table(rt_table_t* rt); + +rt_table_entry_t* +rt_look_up_rt_table_entry(rt_table_t* rt, + char* dest, char mask); + +#endif /* __RT__ */ \ No newline at end of file diff --git a/asyncproject/stp.c b/asyncproject/stp.c new file mode 100644 index 0000000..4690da9 --- /dev/null +++ b/asyncproject/stp.c @@ -0,0 +1,138 @@ +#include +#include +#include + +#include "network_utils.h" +#include "rt.h" +#include "stp_el.h" + + +extern event_loop_t el; +rt_table_t* rt_table = NULL; + +int +stp_update_routing_table(rt_table_t* rt_table, uint32_t cmd_code, rt_table_entry_t* rt_entry) { + + int rc; + printf(" Code = %u Route update recvd : %s\n", cmd_code, rt_entry->dest); + switch (cmd_code) { + case ROUTE_CREATE: + rc = rt_insert_new_entry(rt_table, rt_entry->dest, 32, rt_entry->gw, rt_entry->oif, rt_entry->exp_timer_msec); + break; + case ROUTE_UPDATE: + rc = rt_update_rt_entry(rt_table, rt_entry->dest, 32, rt_entry->gw, rt_entry->oif); + break; + case ROUTE_DELETE: + rc = rt_delete_rt_entry(rt_table, rt_entry->dest, 32); + break; + default:; + } + return rc; +} + +static void +pkt_process_fn(char* msg_recvd, uint32_t msg_size, char* sender_ip, uint32_t port_no, uint32_t fd) { + + printf("route recvd on port no %d from IP %s\n", port_no, sender_ip); + uint32_t* cmd_code = (uint32_t*)msg_recvd; + rt_table_entry_t* rt_entry = (rt_table_entry_t*)(cmd_code + 1); + rt_entry->exp_timer_msec = RT_ENTRY_EXP_TIMER * 1000; + + el_stp_update_routing_table(rt_table, *cmd_code, rt_entry); +} + +static void +cli_handler(int choice) +{ + switch (choice) + { + case 1: + //rt_display_rt_table(rt_table); + + rt_display_rt_table_preemption_context_save(rt_table); + printf("\n\n"); + break; + case 2: + { + rt_table_entry_t rt_entry; + printf("Enter Dest Address : "); + scanf("%s", rt_entry.dest); + printf("Enter Gateway Address : "); + scanf("%s", rt_entry.gw); + printf("Enter OIF name : "); + scanf("%s", rt_entry.oif); + el_stp_update_routing_table(rt_table, ROUTE_CREATE, &rt_entry); + } + break; + case 3: + // do self + break; + case 4: + { + rt_table_entry_t rt_entry; + printf("Enter Dest Address : "); + scanf("%s", rt_entry.dest); + if (el_stp_update_routing_table(rt_table, ROUTE_DELETE, &rt_entry)) + { + printf("No Such entry\n"); + } + } + break; + case 5: + { + int portno; + printf("Listening port no ? "); + scanf("%d", &portno); + udp_server_create_and_start( + "127.0.0.1", portno, pkt_process_fn); + } + break; + case 6: + { + rt_table_entry_t rt_entry_template; + printf("Entry Dest Address : "); + scanf("%s", rt_entry_template.dest); + el_stp_serialize_and_send_rt_entry(rt_table, &rt_entry_template); + } + break; + case 7: + { + el_stp_delete_rt_table(rt_table); + } + break; + case 8: + exit(0); + default: + break; + } +} + +int +main(int argc, char** argv) { + + if (!rt_table) { + rt_table = + rt_create_new_rt_table("Table1"); + printf(" New Routing Table Created\n"); + } + + for (;;) { + + printf("Main Menu\n"); + printf("\t 1. : Display Routing Table\n"); + printf("\t 2. : Create New RT Entry\n"); + printf("\t 3. : Update Existing RT Entry\n"); + printf("\t 4. : Delete RT entry\n"); + printf("\t 5. : Start Pkt Listener thread\n"); + printf("\t 6. : Serialize and send RT entry\n"); + printf("\t 7. : Delete RT Table\n"); + printf("\t 8. : exit\n"); + + int choice; + printf("Enter Choice : "); + scanf("%d", &choice); + + cli_handler(choice); + } + return 0; +} \ No newline at end of file diff --git a/asyncproject/stp_el.c b/asyncproject/stp_el.c new file mode 100644 index 0000000..317888b --- /dev/null +++ b/asyncproject/stp_el.c @@ -0,0 +1,250 @@ +#include +#include +#include +#include +#include +#include +#include "utils.h" +#include "network_utils.h" +#include "stp_el.h" +#include "timerlib.h" + +/* Take Event loop global variable */ +event_loop_t el; + +/* Import external function */ +extern int +stp_update_routing_table(rt_table_t* rt_table, uint32_t cmd_code, rt_table_entry_t* rt_entry); + +void +stp_init_el(event_loop_t* el) { + event_loop_init(el); + event_loop_run(el); +} + +static EL_RES_T +el_stp_update_routing_table_cbk(void* arg) { + + el_rt_table_update_data_t* el_rt_table_update_data = + (el_rt_table_update_data_t*)arg; + + stp_update_routing_table(el_rt_table_update_data->rt_table, + el_rt_table_update_data->cmd_code, + el_rt_table_update_data->rt_entry); + + free(el_rt_table_update_data->rt_entry); + free(el_rt_table_update_data); + return EL_FINISH; +} + +task_t* +el_stp_update_routing_table(rt_table_t* rt, int cmd_code, rt_table_entry_t* rt_entry) { + el_rt_table_update_data_t* el_rt_table_update_data = + (el_rt_table_update_data_t*)calloc(1, sizeof(el_rt_table_update_data_t)); + + el_rt_table_update_data->rt_table = rt; + el_rt_table_update_data->cmd_code = cmd_code; + + el_rt_table_update_data->rt_entry = (rt_table_entry_t*)calloc(1, sizeof(rt_table_entry_t)); + + memcpy((char*)el_rt_table_update_data->rt_entry, + rt_entry, sizeof(*rt_entry)); + + task_t* task = task_create_new_job(&el, + el_stp_update_routing_table_cbk, + (void*)el_rt_table_update_data, cmd_code == ROUTE_DELETE ? TASK_PRIORITY_LOW : TASK_PRIORITY_MEDIUM); + + return task; +} + + +typedef struct rt_table_print_cntxt_ { + int i; + rt_table_entry_t* rt_table_entry; +}rt_table_print_cntxt_t; + +static EL_RES_T +rt_display_rt_table_preemption_context_save_cbk(void* arg) { + + time_t curr_time = time(NULL); + unsigned int uptime_in_seconds = 0; + rt_table_print_cntxt_t* cntxt = (rt_table_print_cntxt_t*)arg; + + rt_table_entry_t* rt_table_entry = cntxt->rt_table_entry; + int i = cntxt->i; + + for (; rt_table_entry; + rt_table_entry = rt_table_entry->next) { + + + uptime_in_seconds = (unsigned int)difftime( + curr_time, rt_table_entry->last_updated_time); + + printf("%d. %-18s %-4d %18s %-18s", i, + rt_table_entry->dest, + rt_table_entry->mask, + rt_table_entry->gw, + rt_table_entry->oif); + + printf("Last updated : %s ", hrs_min_sec_format(uptime_in_seconds)); + + printf("Exp time : %lu\n", + rt_table_entry->exp_timer ? \ + timer_get_time_remaining_in_mill_sec(rt_table_entry->exp_timer) : 0); + i++; + + /* Save the context */ + if (i % 10 == 0 && rt_table_entry->next) { + cntxt->rt_table_entry = rt_table_entry->next; + cntxt->i = i++; + return EL_CONTINUE; + } + } + + free(cntxt); + return EL_FINISH; +} + +void +rt_display_rt_table_preemption_context_save(rt_table_t* rt) { + + rt_table_entry_t* rt_table_entry = rt->head; + + if (!rt_table_entry) return; + + printf("# count = %u\n", rt->count); + + rt_table_print_cntxt_t* cntxt = + (rt_table_print_cntxt_t*)calloc(1, sizeof(rt_table_print_cntxt_t)); + + cntxt->i = 1; + cntxt->rt_table_entry = rt_table_entry; + + task_create_new_job(&el, + rt_display_rt_table_preemption_context_save_cbk, + (void*)cntxt, TASK_PRIORITY_HIGH); +} + + +static EL_RES_T +rt_entry_serialize_and_send_task_cbk(void* arg) { + + rt_table_entry_t* rt_entry = (rt_table_entry_t*)arg; + + /* look up RT Entry in RT table */ + rt_table_entry_t* actual_rt_entry = rt_look_up_rt_table_entry( + rt_entry->rt_table, + rt_entry->dest, + rt_entry->mask + ); + + if (!actual_rt_entry) { + printf("Serialize Task : RT Entry do not exist\n"); + free(rt_entry); + return EL_FINISH; + } + + int udp_sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + + if (udp_sockfd < 0) { + printf("Error : Failed to create UDP socket\n"); + free(rt_entry); + return EL_FINISH; + } + + int msg_size = sizeof(uint32_t) + sizeof(rt_table_entry_t); + uint32_t* msg_to_send = (uint32_t*)calloc(1, msg_size); + *msg_to_send = ROUTE_CREATE; + memcpy((char*)(msg_to_send + 1), (char*)actual_rt_entry, sizeof(rt_table_entry_t)); + + send_udp_msg("127.0.0.1", + 50000, + (char*)msg_to_send, msg_size, + udp_sockfd); + + close(udp_sockfd); + free(msg_to_send); + free(rt_entry); + return EL_FINISH; + +} + +void +el_stp_serialize_and_send_rt_entry(rt_table_t* rt, rt_table_entry_t* rt_entry_template) { + + /* This new rt_entry serves the purpose of context-save structure for task */ + + rt_table_entry_t* rt_entry = (rt_table_entry_t*)calloc(1, sizeof(rt_table_entry_t)); + + strncpy(rt_entry->dest, rt_entry_template->dest, 16); + + rt_entry->mask = 32; + rt_entry->rt_table = rt; + + task_create_new_job(&el, + rt_entry_serialize_and_send_task_cbk, + (void*)rt_entry, TASK_PRIORITY_MEDIUM); +} + + +typedef struct rt_table_delete_context_ { + rt_table_entry_t* rt_entry; +}rt_table_delete_context_t; + +EL_RES_T +rt_table_delete_cbk(void* arg) { + int i = 0; + + rt_table_delete_context_t* cntxt = (rt_table_delete_context_t*)arg; + + rt_table_entry_t* rt_entry = cntxt->rt_entry; + rt_table_entry_t* rt_entry_next; + + printf("Resuming rt_table deletion Task\n"); + + do { + rt_entry_next = rt_entry->next; + + if (rt_entry->exp_timer) { + delete_timer(rt_entry->exp_timer); + rt_entry->exp_timer = NULL; + } + + printf("Deleting rt_entry = %s/%d\n", rt_entry->dest, rt_entry->mask); + free(rt_entry); + i++; + + if (i % 10 == 0 && rt_entry_next) { + cntxt->rt_entry = rt_entry_next; + printf("Preempting rt_table deletion Task\n"); + return EL_CONTINUE; + } + + + rt_entry = rt_entry_next; + } while (rt_entry); + + free(cntxt); + printf("rt_table deletion Task Finished\n"); + return EL_FINISH; +} + +void +el_stp_delete_rt_table(rt_table_t* rt_table) { + + /* This is the case when we need to delete a container data structures. + Container data structures must be isolated from remaining application data structures first */ + + if (!rt_table) return; + if (!rt_table->head) return; + + /* Isolate RT Table */ + rt_table_entry_t* rt_entry = rt_table->head; + rt_table->head = NULL; + + rt_table_delete_context_t* cntxt = (rt_table_delete_context_t*)calloc(1, sizeof(rt_table_delete_context_t)); + cntxt->rt_entry = rt_entry; + + task_create_new_job(&el, rt_table_delete_cbk, + (void*)cntxt, TASK_PRIORITY_LOW); +} \ No newline at end of file diff --git a/asyncproject/stp_el.h b/asyncproject/stp_el.h new file mode 100644 index 0000000..957f6b5 --- /dev/null +++ b/asyncproject/stp_el.h @@ -0,0 +1,34 @@ +#ifndef __STP_EL__ +#define __STP_EL__ + +#include + +#include "rt.h" + +/* Data structure to pack all argument to one single argument */ + +typedef struct el_rt_table_update_data_ { + + rt_table_t* rt_table; + int cmd_code; + rt_table_entry_t* rt_entry; +}el_rt_table_update_data_t; + +void +stp_init_el(event_loop_t* el); + +/* Now create APIs' to update routing table using Event loop */ + +task_t* +el_stp_update_routing_table(rt_table_t* rt, int cmd_code, rt_table_entry_t* rt_entry); + +void +rt_display_rt_table_preemption_context_save(rt_table_t* rt); + +void +el_stp_serialize_and_send_rt_entry(rt_table_t* rt, rt_table_entry_t* rt_entry_template); + +void +el_stp_delete_rt_table(rt_table_t* rt_table); + +#endif \ No newline at end of file diff --git a/asyncproject/timerlib.c b/asyncproject/timerlib.c new file mode 100644 index 0000000..796f74c --- /dev/null +++ b/asyncproject/timerlib.c @@ -0,0 +1,301 @@ +#include +#include +#include +#include +#include +#include "timerlib.h" + +unsigned long +timespec_to_millisec( + struct timespec* time) { + + unsigned long milli_sec = 0; + + milli_sec = time->tv_sec * 1000; + milli_sec += time->tv_nsec / 1000000; + return milli_sec; +} + +void +timer_fill_itimerspec(struct timespec* ts, + unsigned long msec) { + + memset(ts, 0, sizeof(struct timespec)); + + if (!msec) return; + + unsigned long sec = msec / 1000; + ts->tv_sec = sec; + + unsigned long remaining_msec = msec % 1000; + + ts->tv_nsec = remaining_msec * (1000000); +} + +static void +timer_callback_wrapper(union sigval arg) { + + Timer_t* timer = (Timer_t*)(arg.sival_ptr); + + timer->invocation_counter++; + + if (timer->thresdhold && + (timer->invocation_counter > timer->thresdhold)) { + cancel_timer(timer); + return; + } + + (timer->cb)(timer, timer->user_arg); + + if (timer->exponential_backoff) { + + assert(timer->exp_back_off_time); + reschedule_timer(timer, + timer->exp_back_off_time *= 2, 0); + } + else if (timer->timer_state == TIMER_RESUMED) { + + reschedule_timer(timer, + timer->exp_timer, timer->sec_exp_timer); + } +} + + +/* Returns NULL in timer creation fails, else + * return a pointer to Timer object*/ +Timer_t* +setup_timer( + void (*timer_cb)(Timer_t*, void*), /* Timer Callback with user data*/ + unsigned long exp_timer, /* First expiration time interval in msec */ + unsigned long sec_exp_timer, /* Subsequent expiration time interval in msec */ + uint32_t threshold, /* Max no of expirations, 0 for infinite*/ + void* user_arg, /* Arg to timer callback */ + bool exponential_backoff) { /* Is Timer Exp backoff*/ + + Timer_t* timer = calloc(1, sizeof(Timer_t)); + timer->posix_timer = calloc(1, sizeof(timer_t)); + + timer->user_arg = user_arg; + timer->exp_timer = exp_timer; + timer->sec_exp_timer = sec_exp_timer; + timer->cb = timer_cb; + timer->thresdhold = threshold; + timer_set_state(timer, TIMER_INIT); + timer->exponential_backoff = exponential_backoff; + + /* Sanity checks */ + assert(timer->cb); /* Mandatory */ + + + struct sigevent evp; + memset(&evp, 0, sizeof(struct sigevent)); + + evp.sigev_value.sival_ptr = (void*)(timer); + evp.sigev_notify = SIGEV_THREAD; + evp.sigev_notify_function = timer_callback_wrapper; + + int rc = timer_create(CLOCK_REALTIME, + &evp, timer->posix_timer); + + assert(rc >= 0); + + timer_fill_itimerspec(&timer->ts.it_value, timer->exp_timer); + + if (!timer->exponential_backoff) { + timer_fill_itimerspec(&timer->ts.it_interval, timer->sec_exp_timer); + timer->exp_back_off_time = 0; + } + else { + timer->exp_back_off_time = timespec_to_millisec(&timer->ts.it_value); + timer_fill_itimerspec(&timer->ts.it_interval, 0); + } + return timer; +} + +void +resurrect_timer(Timer_t* timer) { + + int rc; + rc = timer_settime(*(timer->posix_timer), 0, &timer->ts, NULL); + assert(rc >= 0); +} + +void +start_timer(Timer_t* timer) { + + resurrect_timer(timer); + timer_set_state(timer, TIMER_RUNNING); +} + +void +delete_timer(Timer_t* timer) { + + int rc; + rc = timer_delete(*(timer->posix_timer)); + assert(rc >= 0); + timer->user_arg = NULL; /* User arg need to be freed by Appln */ + timer_set_state(timer, TIMER_DELETED); + free(timer->posix_timer); + free(timer); +} + +void +cancel_timer(Timer_t* timer) { + + TIMER_STATE_T timer_curr_state; + + timer_curr_state = timer_get_current_state(timer); + + if (timer_curr_state == TIMER_INIT || + timer_curr_state == TIMER_DELETED) { + + return; /* No-Operation */ + } + + /* Only Paused or running timer can be cancelled */ + timer_fill_itimerspec(&timer->ts.it_value, 0); + timer_fill_itimerspec(&timer->ts.it_interval, 0); + timer->time_remaining = 0; + timer->invocation_counter = 0; + resurrect_timer(timer); + timer_set_state(timer, TIMER_CANCELLED); +} + + +void +pause_timer(Timer_t* timer) { + + if (timer_get_current_state(timer) == TIMER_PAUSED) + return; + + timer->time_remaining = + timer_get_time_remaining_in_mill_sec(timer); + + timer_fill_itimerspec(&timer->ts.it_value, 0); + timer_fill_itimerspec(&timer->ts.it_interval, 0); + + resurrect_timer(timer); + + timer_set_state(timer, TIMER_PAUSED); +} + + +void +resume_timer(Timer_t* timer) { + + assert(timer_get_current_state(timer) == TIMER_PAUSED); + + timer_fill_itimerspec(&timer->ts.it_value, timer->time_remaining); + timer_fill_itimerspec(&timer->ts.it_interval, timer->sec_exp_timer); + timer->time_remaining = 0; + + resurrect_timer(timer); + timer_set_state(timer, TIMER_RESUMED); +} + +unsigned long +timer_get_time_remaining_in_mill_sec(Timer_t* timer) { + + struct itimerspec remaining_time; + + switch (timer->timer_state) { + + case TIMER_INIT: + break; + case TIMER_DELETED: + return ~0; + case TIMER_PAUSED: + break; + case TIMER_CANCELLED: + return ~0; + case TIMER_RUNNING: + break; + default:; + } + + memset(&remaining_time, 0, sizeof(struct itimerspec)); + + timer_gettime(*timer->posix_timer, &remaining_time); + + return timespec_to_millisec(&remaining_time.it_value); +} + +void +restart_timer(Timer_t* timer) { + + assert(timer->timer_state != TIMER_DELETED); + + cancel_timer(timer); + + timer_fill_itimerspec(&timer->ts.it_value, timer->exp_timer); + + if (!timer->exponential_backoff) + timer_fill_itimerspec(&timer->ts.it_interval, timer->sec_exp_timer); + else + timer_fill_itimerspec(&timer->ts.it_interval, 0); + + timer->invocation_counter = 0; + timer->time_remaining = 0; + timer->exp_back_off_time = timer->exp_timer; + resurrect_timer(timer); + timer_set_state(timer, TIMER_RUNNING); +} + + +void +reschedule_timer(Timer_t* timer, + unsigned long exp_ti, + unsigned long sec_exp_ti) { + + uint32_t invocation_counter; + TIMER_STATE_T timer_state; + + timer_state = timer_get_current_state(timer); + + if (timer_state == TIMER_DELETED) assert(0); + + invocation_counter = timer->invocation_counter; + + if (timer_state != TIMER_CANCELLED) { + cancel_timer(timer); + } + + timer->invocation_counter = invocation_counter; + + timer_fill_itimerspec(&timer->ts.it_value, exp_ti); + + if (!timer->exponential_backoff) { + timer_fill_itimerspec(&timer->ts.it_interval, sec_exp_ti); + } + else { + timer_fill_itimerspec(&timer->ts.it_interval, 0); + timer->exp_back_off_time = exp_ti; + } + + timer->time_remaining = 0; + resurrect_timer(timer); + timer_set_state(timer, TIMER_RUNNING); +} + +void +print_timer(Timer_t* timer) { + + printf("Counter = %u, time remaining = %lu, state = %d\n", + timer->invocation_counter, + timer_get_time_remaining_in_mill_sec(timer), + timer_get_current_state(timer)); +} + +bool +is_timer_running(Timer_t* timer) { + + TIMER_STATE_T timer_state; + + timer_state = timer_get_current_state(timer); + + if (timer_state == TIMER_RUNNING || + timer_state == TIMER_RESUMED) { + return true; + } + return false; +} \ No newline at end of file diff --git a/asyncproject/timerlib.h b/asyncproject/timerlib.h new file mode 100644 index 0000000..51c3d72 --- /dev/null +++ b/asyncproject/timerlib.h @@ -0,0 +1,126 @@ +#ifndef __TIMER_WRAP__ +#define __TIMER_WRAP__ + +#include +#include +#include +#include +#include +#include + +typedef enum { + + TIMER_INIT, + TIMER_DELETED, + TIMER_PAUSED, + TIMER_CANCELLED, + TIMER_RESUMED, + TIMER_RUNNING, +} TIMER_STATE_T; + +typedef struct Timer_ { + + /* Timer config */ + timer_t* posix_timer; + void* user_arg; + unsigned long exp_timer; /* in milli-sec */ + unsigned long sec_exp_timer; /* in milli-sec */ + uint32_t thresdhold; /* No of times to invoke the timer callback */ + void (*cb)(struct Timer_*, void*); /* Timer Callback */ + bool exponential_backoff; + + /* place holder value to store + * dynamic attributes of timer */ + unsigned long time_remaining; /* Time left for paused timer for next expiration */ + uint32_t invocation_counter; + struct itimerspec ts; + unsigned long exp_back_off_time; + TIMER_STATE_T timer_state; +} Timer_t; + +/* Returns NULL in timer creation fails, else + * return a pointer to Timer object*/ +Timer_t* +setup_timer( + /* Timer Callback with user data and user size*/ + void (*)(Timer_t*, void*), + /* First expiration time interval in msec */ + unsigned long, + /* Subsequent expiration time interval in msec */ + unsigned long, + /* Max no of expirations, 0 for infinite*/ + uint32_t, + /* Arg to timer callback */ + void*, + /* Is timer Exp back off */ + bool); + +static inline void +timer_delete_user_data(Timer_t* timer) { + + if (timer->user_arg) { + free(timer->user_arg); + timer->user_arg = NULL; + } +} + +static inline TIMER_STATE_T +timer_get_current_state(Timer_t* timer) { + + return timer->timer_state; +} + +static inline void +timer_set_state(Timer_t* timer, TIMER_STATE_T timer_state) { + + timer->timer_state = timer_state; +} + +void +resurrect_timer(Timer_t* timer); + +void +start_timer(Timer_t* timer); + +void +delete_timer(Timer_t* timer); + +void +cancel_timer(Timer_t* timer); + +void +pause_timer(Timer_t* timer); + +void +resume_timer(Timer_t* timer); + +int +execute_timer(Timer_t* timer, TIMER_STATE_T action); + +/* get remaining time in msec */ +unsigned long +timer_get_time_remaining_in_mill_sec(Timer_t* timer); + +void +restart_timer(Timer_t* timer); + +void +reschedule_timer(Timer_t* timer, + unsigned long exp_ti, + unsigned long sec_exp_ti); + +void +print_timer(Timer_t* timer); + +unsigned long +timespec_to_millisec( + struct timespec* time); + +void +timer_fill_itimerspec(struct timespec* ts, + unsigned long msec); + +bool +is_timer_running(Timer_t* timer); + +#endif /* __TIMER_WRAP__ */ \ No newline at end of file diff --git a/asyncproject/udp_client.c b/asyncproject/udp_client.c new file mode 100644 index 0000000..2f329b1 --- /dev/null +++ b/asyncproject/udp_client.c @@ -0,0 +1,30 @@ +#include +#include +#include +#include "rt.h" +#include "network_utils.h" + +int +main(int argc, char** argv) { + + if (argc != 7) { + printf("Insufficient Argument\n"); + return 0; + } + + char* dest_ip = argv[1]; + uint32_t dest_port_no = atoi(argv[2]); + + rt_table_entry_t rt_table_entry; + + memcpy(rt_table_entry.dest, argv[3], sizeof(rt_table_entry.dest)); + memcpy(rt_table_entry.oif, argv[4], sizeof(rt_table_entry.oif)); + memcpy(rt_table_entry.gw, argv[5], sizeof(rt_table_entry.gw)); + int msg_size = sizeof(uint32_t) + sizeof(rt_table_entry_t); + uint32_t* msg_to_send = (uint32_t*)calloc(1, msg_size); + *msg_to_send = atoi(argv[6]); + memcpy((char*)(msg_to_send + 1), (char*)&rt_table_entry, sizeof(rt_table_entry_t)); + send_udp_msg(dest_ip, dest_port_no, (char*)msg_to_send, msg_size, -1); + free(msg_to_send); + return 0; +} \ No newline at end of file diff --git a/asyncproject/utils.c b/asyncproject/utils.c new file mode 100644 index 0000000..98967be --- /dev/null +++ b/asyncproject/utils.c @@ -0,0 +1,26 @@ +#include "utils.h" +#include "memory.h" +#include +#include + +char* +hrs_min_sec_format(unsigned int seconds) { + + static char time_f[16]; + unsigned int hrs = 0, + min = 0, sec = 0; + + if (seconds > 3600) { + min = seconds / 60; + sec = seconds % 60; + hrs = min / 60; + min = min % 60; + } + else { + min = seconds / 60; + sec = seconds % 60; + } + memset(time_f, 0, sizeof(time_f)); + sprintf(time_f, "%u::%u::%u", hrs, min, sec); + return time_f; +} \ No newline at end of file diff --git a/asyncproject/utils.h b/asyncproject/utils.h new file mode 100644 index 0000000..0a4f052 --- /dev/null +++ b/asyncproject/utils.h @@ -0,0 +1,8 @@ +#ifndef __UTILS__ +#define __UTILS__ + + +char* +hrs_min_sec_format(unsigned int seconds); + +#endif /* __UTILS__ */ \ No newline at end of file diff --git a/include/event_loop.h b/include/event_loop.h index de6e601..41c3b58 100644 --- a/include/event_loop.h +++ b/include/event_loop.h @@ -3,17 +3,33 @@ #include +#define MAX_PRIORITY_LEVEL 4 typedef struct task_ task_t; typedef struct event_loop_ event_loop_t; -typedef void (*event_cbk)(void*); + +typedef enum EL_RES_ { + EL_CONTINUE, + EL_FINISH +} EL_RES_T; + +typedef enum TASK_PRIORITY { + TASK_PRIORITY_HIGH, + TASK_PRIORITY_MEDIUM, + TASK_PRIORITY_LOW, + TASK_PRIORITY_MAX +}TASK_PRIORITY_T; + +typedef EL_RES_T (*event_cbk)(void*); + struct task_ { event_cbk cbk; void* arg; struct task_* left, * right; + TASK_PRIORITY_T priority; }; typedef enum { @@ -25,7 +41,7 @@ typedef enum { struct event_loop_ { /* head to the start of the task array */ - struct task_* task_array_head; + struct task_* task_array_head[MAX_PRIORITY_LEVEL]; /* Mutex to enforce Mutual exclusion enqueue/Deque * Operation in task array. Also used to update event loop * attributes in mutual exclusive way @@ -49,6 +65,11 @@ void event_loop_run(event_loop_t* el); task_t* -task_create_new_job(event_loop_t* el, event_cbk cbk, void* arg); +task_create_new_job(event_loop_t* el, event_cbk cbk, void* arg, TASK_PRIORITY_T priority); + +void +task_cancel_job(event_loop_t* el, task_t* task); + + #endif \ No newline at end of file diff --git a/src/event_loop.c b/src/event_loop.c index 4f16dcf..907fa7a 100644 --- a/src/event_loop.c +++ b/src/event_loop.c @@ -11,16 +11,21 @@ static bool el_debug = true; static task_t* event_loop_get_next_task_to_run(event_loop_t* el) { - task_t* task; - if (!el->task_array_head) return NULL; - task = el->task_array_head; - el->task_array_head = task->right; - if (el->task_array_head) { - el->task_array_head->left = NULL; + task_t* task = NULL; + + for (int i = 0; i < MAX_PRIORITY_LEVEL; i++) { + if (!el->task_array_head[i]) continue; + task = el->task_array_head[i]; + el->task_array_head[i] = task->right; + if (el->task_array_head[i]) { + el->task_array_head[i]->left = NULL; + } + task->left = NULL; + task->right = NULL; + return task; } - task->left = NULL; - task->right = NULL; - return task; + + return NULL; } static void @@ -32,7 +37,7 @@ event_loop_add_task_in_task_array( prev_task = NULL; - task = el->task_array_head; + task = el->task_array_head[new_task->priority]; while (task) { prev_task = task; task = task->right; @@ -42,7 +47,7 @@ event_loop_add_task_in_task_array( new_task->left = prev_task; } else { - el->task_array_head = new_task; + el->task_array_head[new_task->priority] = new_task; } } @@ -57,8 +62,8 @@ task_is_present_in_task_array(task_t* task) { static void event_loop_remove_task_from_task_array(event_loop_t* el, task_t* task) { - if (el->task_array_head == task) { - el->task_array_head = task->right; + if (el->task_array_head[task->priority] == task) { + el->task_array_head[task->priority] = task->right; } if (!task->left) { @@ -84,7 +89,9 @@ event_loop_remove_task_from_task_array(event_loop_t* el, task_t* task) { void event_loop_init(event_loop_t* el) { - el->task_array_head = NULL; + for (int i = 0; i < MAX_PRIORITY_LEVEL; i++) { + el->task_array_head[i] = NULL; + } pthread_mutex_init(&el->ev_loop_mutex, NULL); el->ev_loop_state = EV_LOOP_IDLE; pthread_cond_init(&el->ev_loop_cv, NULL); @@ -111,6 +118,7 @@ event_loop_schedule_task(event_loop_t* el, task_t* task) { static void* event_loop_thread(void* arg) { task_t* task; + EL_RES_T res; event_loop_t* el = (event_loop_t*)arg; while (1) { /* Lock the event Loop Mutex */ @@ -140,21 +148,30 @@ static void* event_loop_thread(void* arg) { /* Fire the task now */ el->current_task = task; - task->cbk(task->arg); + res = task->cbk(task->arg); el->current_task = NULL; + + if (res == EL_CONTINUE) { + /* Re-schedule same task */ + event_loop_schedule_task(el, task); + } + else { + free(task); + } } return NULL; } task_t* -task_create_new_job(event_loop_t* el, event_cbk cbk, void* arg) { +task_create_new_job(event_loop_t* el, event_cbk cbk, void* arg, TASK_PRIORITY_T priority) { task_t* task = (task_t*)calloc(1, sizeof(task_t)); task->arg = arg; task->cbk = cbk; task->left = NULL; task->right = NULL; + task->priority = priority; event_loop_schedule_task(el, task); return task; } @@ -177,3 +194,22 @@ event_loop_run(event_loop_t* el) { } +void +task_cancel_job(event_loop_t* el, task_t* task) { + + /* Dont kill yourself while you are still executing */ + + if (el->current_task == task) { + return; + } + + pthread_mutex_lock(&el->ev_loop_mutex); + + if ( task_is_present_in_task_array(task) ) + event_loop_remove_task_from_task_array(el, task); + + pthread_mutex_unlock(&el->ev_loop_mutex); + + free(task); +} + diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 5419696..0bc60c7 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,11 +1,23 @@ set(EVENT_LOOP_APP evloop_app) +set(EVENT_LOOP_CONCURRENCY_APP evloop_concurrency_app) set(EV_LOOP_APP_SRC "evloop_app.c") +set(EV_LOOP_CONCURRENCY_APP_SRC "evloop_concurreny_app.c") add_executable(${EVENT_LOOP_APP} ${EV_LOOP_APP_SRC}) +add_executable(${EVENT_LOOP_CONCURRENCY_APP} + ${EV_LOOP_CONCURRENCY_APP_SRC}) + target_link_libraries(${EVENT_LOOP_APP} PUBLIC ${EVENT_LOOP_LIB}) +target_link_libraries(${EVENT_LOOP_CONCURRENCY_APP} PUBLIC + ${EVENT_LOOP_LIB}) + target_link_directories(${EVENT_LOOP_APP} PUBLIC + ${HEADER_DIR}) + + +target_link_directories(${EVENT_LOOP_CONCURRENCY_APP} PUBLIC ${HEADER_DIR}) \ No newline at end of file diff --git a/test/evloop_app.c b/test/evloop_app.c index 68d3753..aa97805 100644 --- a/test/evloop_app.c +++ b/test/evloop_app.c @@ -33,18 +33,21 @@ typedef struct arg_obj_ { } arg_obj_t; -void +EL_RES_T sum_wrapper(void* arg) { arg_obj_t* arg_obj = (arg_obj_t*)arg; printf("sum = %d\n", sum(arg_obj->arr, arg_obj->n)); + return EL_FINISH; } -void +EL_RES_T mul_wrapper(void* arg) { arg_obj_t* arg_obj = (arg_obj_t*)arg; printf("mul = %d\n", mul(arg_obj->arr, arg_obj->n)); + + return EL_FINISH; } @@ -58,13 +61,13 @@ main(int argc, char** argv) { arg_obj_t* arg_obj1 = (arg_obj_t*)calloc(1, sizeof(arg_obj_t)); arg_obj1->arr = arr; arg_obj1->n = sizeof(arr) / sizeof(arr[0]); - task_t* task_sum = task_create_new_job(&el, sum_wrapper, (void*)arg_obj1); + task_t* task_sum = task_create_new_job(&el, sum_wrapper, (void*)arg_obj1, TASK_PRIORITY_LOW); arg_obj_t* arg_obj2 = (arg_obj_t*)calloc(1, sizeof(arg_obj_t)); arg_obj2->arr = arr; arg_obj2->n = sizeof(arr) / sizeof(arr[0]); - task_t* task_mul = task_create_new_job(&el, mul_wrapper, (void*)arg_obj2); + task_t* task_mul = task_create_new_job(&el, mul_wrapper, (void*)arg_obj2, TASK_PRIORITY_LOW); printf("End of main\n"); diff --git a/test/evloop_concurreny_app.c b/test/evloop_concurreny_app.c new file mode 100644 index 0000000..8b87cce --- /dev/null +++ b/test/evloop_concurreny_app.c @@ -0,0 +1,59 @@ +#include +#include +#include +#include +#include "event_loop.h" + +event_loop_t el; +static int upload = 0; +static int download = 0; + +static EL_RES_T +upload_fn(void* arg) { + + while (upload < 100) { + + upload += 2; + //sleep(4); + printf("upload percent-age = %d\n", upload); + if (upload % 10 == 0 && upload != 100) { + + return EL_CONTINUE; + /* task_create_new_job(&el, upload_fn, NULL); + return ;*/ + } + } + return EL_FINISH; +} + +static EL_RES_T +download_fn(void* arg) { + + while (download < 100) { + + download += 2; + printf("download percent-age = %d\n", download); + if (download % 10 == 0 && download != 100) { + + return EL_CONTINUE; + /*task_create_new_job(&el, download_fn, NULL); + return;*/ + } + } + return EL_FINISH; +} + +int +main(int argc, char** argv) { + + event_loop_init(&el); + event_loop_run(&el); + sleep(1); + + task_create_new_job(&el, upload_fn, NULL, TASK_PRIORITY_LOW); + task_create_new_job(&el, download_fn, NULL, TASK_PRIORITY_HIGH); + + printf("End of main\n"); + scanf("\n"); + return 0; +} \ No newline at end of file