Loading src/lib/bus/Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ LIB_PATH= ../../../obj OPT= -O3 LIB_INC = -I${SOCKET99_PATH} -I${THREADPOOL_PATH} -I${OPENSSL_PATH}/include CFLAGS += -std=c99 ${OPT} -Wall -g ${LIB_INC} LDFLAGS += -L. -lsocket99 -L${LIB_PATH} -L${THREADPOOL_PATH} -L${OPENSSL_PATH}/lib -lssl -lcrypto LDFLAGS += -L. -L${LIB_PATH} -L${THREADPOOL_PATH} -L${OPENSSL_PATH}/lib -L${SOCKET99_PATH} -lsocket99 -lssl -lcrypto BUS_OBJS = \ bus.o \ Loading src/lib/bus/bus_example.c +42 −10 Original line number Diff line number Diff line Loading @@ -87,7 +87,7 @@ static time_t get_cur_second(void); static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) { example_state *s = (example_state *)udata; const char *event_str = bus_log_event_str(event); fprintf(stderr, "%ld -- %s[%d] -- %s\n", fprintf(/*stderr*/stdout, "%ld -- %s[%d] -- %s\n", s->last_second, event_str, log_level, msg); } Loading @@ -103,6 +103,7 @@ static bus_sink_cb_res_t reset_transfer(socket_info *si) { }; si->state = STATE_AWAITING_HEADER; si->used = 0; return res; } Loading @@ -121,9 +122,38 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, case STATE_AWAITING_HEADER: { bool valid_header = true; bool split_header = false; size_t header_rem = sizeof(prot_header_t) - si->used; if (read_size > header_rem) { printf("surplus read_size %zd\n", read_size); printf("header_rem %zd (sizeof(prot_header_t) %zd)\n", header_rem, sizeof(prot_header_t)); assert(false); } else if (read_size < sizeof(prot_header_t)) { //printf("split header, %zd\n", read_size); split_header = true; } size_t copied = read_size; if (copied > header_rem) { copied = header_rem; } memcpy(&si->buf[si->used], read_buf, copied); si->used += copied; if (si->used < sizeof(prot_header_t)) { bus_sink_cb_res_t res = { .next_read = sizeof(prot_header_t) - si->used, }; si->state = STATE_AWAITING_HEADER; return res; } assert(si->used == sizeof(prot_header_t)); prot_header_t *header = (prot_header_t *)&si->buf[0]; prot_header_t *header = (prot_header_t *)read_buf; if (read_size != sizeof(prot_header_t)) { if (si->used < sizeof(prot_header_t)) { printf("INVALID HEADER A: read_size %zd\n", si->used); valid_header = false; } else if (header->magic_number != MAGIC_NUMBER) { printf("INVALID HEADER B: magic number 0x%08x\n", header->magic_number); Loading @@ -131,18 +161,17 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, } if (valid_header) { uint8_t *buf = si->buf; prot_header_t *header = (prot_header_t *)read_buf; prot_header_t *header = (prot_header_t *)&si->buf[0]; si->cur_payload_size = header->size; memcpy(buf, header, sizeof(*header)); si->used = sizeof(*header); memcpy(&si->buf[si->used], read_buf + copied, read_size - copied); si->used += read_size - copied; bus_sink_cb_res_t res = { .next_read = header->size, }; si->state = STATE_AWAITING_BODY; return res; } else { assert(false); return reset_transfer(si); } Loading @@ -153,6 +182,7 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, assert(DEFAULT_BUF_SIZE - si->used >= read_size); memcpy(&si->buf[si->used], read_buf, read_size); si->used += read_size; assert(si->used <= si->cur_payload_size + sizeof(prot_header_t)); size_t rem = si->cur_payload_size + sizeof(prot_header_t) - si->used; if (rem == 0) { Loading Loading @@ -202,6 +232,8 @@ static void unexpected_msg_cb(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata) { printf("\n\n\nUNEXPECTED MESSAGE: %p, seq_id %lld, bus_udata %p, socket_udata %p\n\n\n\n", msg, seq_id, bus_udata, socket_udata); assert(false); } int main(int argc, char **argv) { Loading Loading @@ -327,7 +359,7 @@ static void open_sockets(example_state *s) { socket99_result res; if (!socket99_open(&cfg, &res)) { // socket99_fprintf(stderr, &res); socket99_fprintf(stderr, &res); exit(1); } Loading Loading @@ -446,7 +478,7 @@ static void run_bus(example_state *s, struct bus *b) { if (should_send) { should_send = false; size_t msg_size = construct_msg(msg_buf, buf_size, 10 * payload_size /* * 1024L*/, seq_id); 100 * /*payload_size * */ 1024L, seq_id); LOG(3, " @@ sending message with %zd bytes\n", msg_size); bus_user_msg msg = { .fd = s->sockets[cur_socket_i], Loading src/lib/bus/casq.c +1 −0 Original line number Diff line number Diff line Loading @@ -76,6 +76,7 @@ static void reverse(struct casq *q) { casq_link *to_reverse = NULL; for (;;) { /* spin, unlink */ to_reverse = q->accum; if (q->accum == NULL) { return; } if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->accum, to_reverse, NULL)) { break; } Loading src/lib/bus/echosrv.c +4 −1 Original line number Diff line number Diff line Loading @@ -119,6 +119,7 @@ static void parse_args(int argc, char **argv, config *cfg) { if (cfg->port_low == 0) { cfg->port_low = cfg->port_high; } if (cfg->port_high == 0) { cfg->port_high = cfg->port_low; } if (cfg->port_high < cfg->port_low || cfg->port_low == 0) { usage(); } if (cfg->verbosity > 0) { printf("verbosity: %d\n", cfg->verbosity); } } int main(int argc, char **argv) { Loading Loading @@ -178,6 +179,7 @@ static void open_ports(config *cfg) { } else { cfg->accept_fds[i].fd = res.fd; cfg->accept_fds[i].events = (POLLIN); LOG(2, " -- Accepting on %s:%d\n", scfg.host, scfg.port); } } } Loading Loading @@ -336,6 +338,7 @@ static void handle_client_io(config *cfg, int available) { LOG(4, "fd[%d]->events 0x%08x ==> revents: 0x%08x\n", i, fd->events, fd->revents); if ((fd->revents & POLLERR) || (fd->revents & POLLHUP)) { LOG(3, "Disconnecting client %d\n", fd->fd); disconnect_client(cfg, fd->fd); } else if (fd->revents & POLLOUT) { checked++; Loading Loading @@ -380,7 +383,7 @@ static void handle_client_io(config *cfg, int available) { cfg->last_second, rres); enqueue_write(cfg, buf->fd, read_buf, rres); } else { LOG(2, "else, rres %zd\n", rres); } } } Loading src/lib/bus/listener.c +466 −152 File changed.Preview size limit exceeded, changes collapsed. Show changes Loading
src/lib/bus/Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ LIB_PATH= ../../../obj OPT= -O3 LIB_INC = -I${SOCKET99_PATH} -I${THREADPOOL_PATH} -I${OPENSSL_PATH}/include CFLAGS += -std=c99 ${OPT} -Wall -g ${LIB_INC} LDFLAGS += -L. -lsocket99 -L${LIB_PATH} -L${THREADPOOL_PATH} -L${OPENSSL_PATH}/lib -lssl -lcrypto LDFLAGS += -L. -L${LIB_PATH} -L${THREADPOOL_PATH} -L${OPENSSL_PATH}/lib -L${SOCKET99_PATH} -lsocket99 -lssl -lcrypto BUS_OBJS = \ bus.o \ Loading
src/lib/bus/bus_example.c +42 −10 Original line number Diff line number Diff line Loading @@ -87,7 +87,7 @@ static time_t get_cur_second(void); static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) { example_state *s = (example_state *)udata; const char *event_str = bus_log_event_str(event); fprintf(stderr, "%ld -- %s[%d] -- %s\n", fprintf(/*stderr*/stdout, "%ld -- %s[%d] -- %s\n", s->last_second, event_str, log_level, msg); } Loading @@ -103,6 +103,7 @@ static bus_sink_cb_res_t reset_transfer(socket_info *si) { }; si->state = STATE_AWAITING_HEADER; si->used = 0; return res; } Loading @@ -121,9 +122,38 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, case STATE_AWAITING_HEADER: { bool valid_header = true; bool split_header = false; size_t header_rem = sizeof(prot_header_t) - si->used; if (read_size > header_rem) { printf("surplus read_size %zd\n", read_size); printf("header_rem %zd (sizeof(prot_header_t) %zd)\n", header_rem, sizeof(prot_header_t)); assert(false); } else if (read_size < sizeof(prot_header_t)) { //printf("split header, %zd\n", read_size); split_header = true; } size_t copied = read_size; if (copied > header_rem) { copied = header_rem; } memcpy(&si->buf[si->used], read_buf, copied); si->used += copied; if (si->used < sizeof(prot_header_t)) { bus_sink_cb_res_t res = { .next_read = sizeof(prot_header_t) - si->used, }; si->state = STATE_AWAITING_HEADER; return res; } assert(si->used == sizeof(prot_header_t)); prot_header_t *header = (prot_header_t *)&si->buf[0]; prot_header_t *header = (prot_header_t *)read_buf; if (read_size != sizeof(prot_header_t)) { if (si->used < sizeof(prot_header_t)) { printf("INVALID HEADER A: read_size %zd\n", si->used); valid_header = false; } else if (header->magic_number != MAGIC_NUMBER) { printf("INVALID HEADER B: magic number 0x%08x\n", header->magic_number); Loading @@ -131,18 +161,17 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, } if (valid_header) { uint8_t *buf = si->buf; prot_header_t *header = (prot_header_t *)read_buf; prot_header_t *header = (prot_header_t *)&si->buf[0]; si->cur_payload_size = header->size; memcpy(buf, header, sizeof(*header)); si->used = sizeof(*header); memcpy(&si->buf[si->used], read_buf + copied, read_size - copied); si->used += read_size - copied; bus_sink_cb_res_t res = { .next_read = header->size, }; si->state = STATE_AWAITING_BODY; return res; } else { assert(false); return reset_transfer(si); } Loading @@ -153,6 +182,7 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, assert(DEFAULT_BUF_SIZE - si->used >= read_size); memcpy(&si->buf[si->used], read_buf, read_size); si->used += read_size; assert(si->used <= si->cur_payload_size + sizeof(prot_header_t)); size_t rem = si->cur_payload_size + sizeof(prot_header_t) - si->used; if (rem == 0) { Loading Loading @@ -202,6 +232,8 @@ static void unexpected_msg_cb(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata) { printf("\n\n\nUNEXPECTED MESSAGE: %p, seq_id %lld, bus_udata %p, socket_udata %p\n\n\n\n", msg, seq_id, bus_udata, socket_udata); assert(false); } int main(int argc, char **argv) { Loading Loading @@ -327,7 +359,7 @@ static void open_sockets(example_state *s) { socket99_result res; if (!socket99_open(&cfg, &res)) { // socket99_fprintf(stderr, &res); socket99_fprintf(stderr, &res); exit(1); } Loading Loading @@ -446,7 +478,7 @@ static void run_bus(example_state *s, struct bus *b) { if (should_send) { should_send = false; size_t msg_size = construct_msg(msg_buf, buf_size, 10 * payload_size /* * 1024L*/, seq_id); 100 * /*payload_size * */ 1024L, seq_id); LOG(3, " @@ sending message with %zd bytes\n", msg_size); bus_user_msg msg = { .fd = s->sockets[cur_socket_i], Loading
src/lib/bus/casq.c +1 −0 Original line number Diff line number Diff line Loading @@ -76,6 +76,7 @@ static void reverse(struct casq *q) { casq_link *to_reverse = NULL; for (;;) { /* spin, unlink */ to_reverse = q->accum; if (q->accum == NULL) { return; } if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->accum, to_reverse, NULL)) { break; } Loading
src/lib/bus/echosrv.c +4 −1 Original line number Diff line number Diff line Loading @@ -119,6 +119,7 @@ static void parse_args(int argc, char **argv, config *cfg) { if (cfg->port_low == 0) { cfg->port_low = cfg->port_high; } if (cfg->port_high == 0) { cfg->port_high = cfg->port_low; } if (cfg->port_high < cfg->port_low || cfg->port_low == 0) { usage(); } if (cfg->verbosity > 0) { printf("verbosity: %d\n", cfg->verbosity); } } int main(int argc, char **argv) { Loading Loading @@ -178,6 +179,7 @@ static void open_ports(config *cfg) { } else { cfg->accept_fds[i].fd = res.fd; cfg->accept_fds[i].events = (POLLIN); LOG(2, " -- Accepting on %s:%d\n", scfg.host, scfg.port); } } } Loading Loading @@ -336,6 +338,7 @@ static void handle_client_io(config *cfg, int available) { LOG(4, "fd[%d]->events 0x%08x ==> revents: 0x%08x\n", i, fd->events, fd->revents); if ((fd->revents & POLLERR) || (fd->revents & POLLHUP)) { LOG(3, "Disconnecting client %d\n", fd->fd); disconnect_client(cfg, fd->fd); } else if (fd->revents & POLLOUT) { checked++; Loading Loading @@ -380,7 +383,7 @@ static void handle_client_io(config *cfg, int available) { cfg->last_second, rres); enqueue_write(cfg, buf->fd, read_buf, rres); } else { LOG(2, "else, rres %zd\n", rres); } } } Loading
src/lib/bus/listener.c +466 −152 File changed.Preview size limit exceeded, changes collapsed. Show changes