Loading src/lib/bus/listener_io.c +14 −28 Original line number Diff line number Diff line Loading @@ -28,8 +28,6 @@ #include "syscall.h" #include "util.h" #define YELP(...) //fprintf(stderr, __VA_ARGS__) void ListenerIO_AttemptRecv(listener *l, int available) { /* --> failure --> set 'closed' error on socket, don't die */ struct bus *b = l->bus; Loading @@ -46,19 +44,7 @@ void ListenerIO_AttemptRecv(listener *l, int available) { * (POLLHUP | POLLERR | POLLNVAL), so if we get a status message * with a reason for a hangup we can still pass it along. */ YELP("checking l->fds[%d]: revents 0x%04x\n", i + 1, l->fds[i + 1].revents); if (fd->revents & (POLLERR | POLLNVAL)) { read_from++; BUS_LOG(b, 2, LOG_LISTENER, "pollfd: socket error (POLLERR | POLLNVAL)", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLERR); } else if (fd->revents & POLLHUP) { read_from++; BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP); } else if (fd->revents & POLLIN) { if (fd->revents & POLLIN) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "reading %zd bytes from socket (buf is %zd)", ci->to_read_size, l->read_buf_size); Loading @@ -76,13 +62,24 @@ void ListenerIO_AttemptRecv(listener *l, int available) { BUS_ASSERT(b, b->udata, false); } } if (fd->revents & (POLLERR | POLLNVAL)) { read_from++; BUS_LOG(b, 2, LOG_LISTENER, "pollfd: socket error (POLLERR | POLLNVAL)", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLERR); } else if (fd->revents & POLLHUP) { read_from++; BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP); } } } static bool socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) { for (;;) { ssize_t size = syscall_read(ci->fd, l->read_buf, ci->to_read_size); YELP("read %zd on fd %d, to_read_size %zd\n", size, ci->fd, ci->to_read_size); if (size == -1) { if (util_is_resumable_io_error(errno)) { errno = 0; Loading Loading @@ -192,11 +189,9 @@ static bool sink_socket_read(struct bus *b, bus_unlock_log(b); #endif YELP("sinking read, %zd bytes\n", size); bus_sink_cb_res_t sres = b->sink_cb(l->read_buf, size, ci->udata); if (sres.full_msg_buffer) { BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata); YELP("calling unpack_cb\n"); bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata); BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "process_unpacked_message: ok? %d, seq_id:%lld", Loading @@ -210,7 +205,6 @@ static bool sink_socket_read(struct bus *b, "expecting next read to have %zd bytes", ci->to_read_size); /* Grow read buffer if necessary. */ YELP("grow buffer? %zd => %zd\n", ci->to_read_size, l->read_buf_size); if (ci->to_read_size > l->read_buf_size) { if (!ListenerTask_GrowReadBuf(l, ci->to_read_size)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, Loading @@ -227,7 +221,6 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "set_error_for_socket %d, err %d", fd, err); YELP("set_error_for_socket: fd %d, error %d\n", fd, err); for (int i = 0; i <= l->rx_info_max_used; i++) { rx_info_t *info = &l->rx_info[i]; Loading Loading @@ -264,20 +257,13 @@ static void process_unpacked_message(listener *l, /* NOTE: message may be an unsolicited status message */ YELP("processing unpacked message. ok? %d\n", result.ok); if (result.ok) { int64_t seq_id = result.u.success.seq_id; void *opaque_msg = result.u.success.msg; YELP("msg: seq_id %lld, opaque_msg %p\n", seq_id, opaque_msg); rx_info_t *info = listener_helper_find_info_by_sequence_id(l, ci->fd, seq_id); YELP("got: %p (sz %zd)\n", info, sizeof(info)); YELP("listener_helper_find_info_by_sequence_id <fd:%d, seq_id:%lld> => %p\n", ci->fd, seq_id, info); if (info) { YELP("info->state %d\n", info->state); switch (info->state) { case RIS_HOLD: Loading test/unit/test_bus_listener_io.c +0 −7 Original line number Diff line number Diff line Loading @@ -170,7 +170,6 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, result = the_result; } size_t next_read = pi->to_read - pi->read; fprintf(stderr, "next read: %zd\n", next_read); bus_sink_cb_res_t res = { .next_read = next_read, .full_msg_buffer = result, Loading Loading @@ -290,8 +289,6 @@ void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack } void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack_message_in_multiple_pieces(void) { fprintf(stderr, "\n\ntest_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack_message_in_multiple_pieces\n"); l->fds[0 + INCOMING_MSG_PIPE].fd = 5; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; l->fds[0 + INCOMING_MSG_PIPE].revents = POLLIN; Loading Loading @@ -461,9 +458,6 @@ void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack } void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack_message_in_multiple_pieces_over_SSL(void) { TEST_IGNORE_MESSAGE("work in progress"); fprintf(stderr, "\n\ntest_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack_message_in_multiple_pieces\n"); l->fds[0 + INCOMING_MSG_PIPE].fd = 5; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; l->fds[0 + INCOMING_MSG_PIPE].revents = POLLIN; Loading Loading @@ -491,7 +485,6 @@ void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack info->u.expect.box = box; syscall_SSL_read_ExpectAndReturn(ci.ssl, l->read_buf, ci.to_read_size, ci.to_read_size - 1); ListenerIO_AttemptRecv(l, 1); syscall_SSL_read_ExpectAndReturn(ci.ssl, l->read_buf, 1, 1); Loading Loading
src/lib/bus/listener_io.c +14 −28 Original line number Diff line number Diff line Loading @@ -28,8 +28,6 @@ #include "syscall.h" #include "util.h" #define YELP(...) //fprintf(stderr, __VA_ARGS__) void ListenerIO_AttemptRecv(listener *l, int available) { /* --> failure --> set 'closed' error on socket, don't die */ struct bus *b = l->bus; Loading @@ -46,19 +44,7 @@ void ListenerIO_AttemptRecv(listener *l, int available) { * (POLLHUP | POLLERR | POLLNVAL), so if we get a status message * with a reason for a hangup we can still pass it along. */ YELP("checking l->fds[%d]: revents 0x%04x\n", i + 1, l->fds[i + 1].revents); if (fd->revents & (POLLERR | POLLNVAL)) { read_from++; BUS_LOG(b, 2, LOG_LISTENER, "pollfd: socket error (POLLERR | POLLNVAL)", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLERR); } else if (fd->revents & POLLHUP) { read_from++; BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP); } else if (fd->revents & POLLIN) { if (fd->revents & POLLIN) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "reading %zd bytes from socket (buf is %zd)", ci->to_read_size, l->read_buf_size); Loading @@ -76,13 +62,24 @@ void ListenerIO_AttemptRecv(listener *l, int available) { BUS_ASSERT(b, b->udata, false); } } if (fd->revents & (POLLERR | POLLNVAL)) { read_from++; BUS_LOG(b, 2, LOG_LISTENER, "pollfd: socket error (POLLERR | POLLNVAL)", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLERR); } else if (fd->revents & POLLHUP) { read_from++; BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP); } } } static bool socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) { for (;;) { ssize_t size = syscall_read(ci->fd, l->read_buf, ci->to_read_size); YELP("read %zd on fd %d, to_read_size %zd\n", size, ci->fd, ci->to_read_size); if (size == -1) { if (util_is_resumable_io_error(errno)) { errno = 0; Loading Loading @@ -192,11 +189,9 @@ static bool sink_socket_read(struct bus *b, bus_unlock_log(b); #endif YELP("sinking read, %zd bytes\n", size); bus_sink_cb_res_t sres = b->sink_cb(l->read_buf, size, ci->udata); if (sres.full_msg_buffer) { BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata); YELP("calling unpack_cb\n"); bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata); BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "process_unpacked_message: ok? %d, seq_id:%lld", Loading @@ -210,7 +205,6 @@ static bool sink_socket_read(struct bus *b, "expecting next read to have %zd bytes", ci->to_read_size); /* Grow read buffer if necessary. */ YELP("grow buffer? %zd => %zd\n", ci->to_read_size, l->read_buf_size); if (ci->to_read_size > l->read_buf_size) { if (!ListenerTask_GrowReadBuf(l, ci->to_read_size)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, Loading @@ -227,7 +221,6 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "set_error_for_socket %d, err %d", fd, err); YELP("set_error_for_socket: fd %d, error %d\n", fd, err); for (int i = 0; i <= l->rx_info_max_used; i++) { rx_info_t *info = &l->rx_info[i]; Loading Loading @@ -264,20 +257,13 @@ static void process_unpacked_message(listener *l, /* NOTE: message may be an unsolicited status message */ YELP("processing unpacked message. ok? %d\n", result.ok); if (result.ok) { int64_t seq_id = result.u.success.seq_id; void *opaque_msg = result.u.success.msg; YELP("msg: seq_id %lld, opaque_msg %p\n", seq_id, opaque_msg); rx_info_t *info = listener_helper_find_info_by_sequence_id(l, ci->fd, seq_id); YELP("got: %p (sz %zd)\n", info, sizeof(info)); YELP("listener_helper_find_info_by_sequence_id <fd:%d, seq_id:%lld> => %p\n", ci->fd, seq_id, info); if (info) { YELP("info->state %d\n", info->state); switch (info->state) { case RIS_HOLD: Loading
test/unit/test_bus_listener_io.c +0 −7 Original line number Diff line number Diff line Loading @@ -170,7 +170,6 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, result = the_result; } size_t next_read = pi->to_read - pi->read; fprintf(stderr, "next read: %zd\n", next_read); bus_sink_cb_res_t res = { .next_read = next_read, .full_msg_buffer = result, Loading Loading @@ -290,8 +289,6 @@ void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack } void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack_message_in_multiple_pieces(void) { fprintf(stderr, "\n\ntest_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack_message_in_multiple_pieces\n"); l->fds[0 + INCOMING_MSG_PIPE].fd = 5; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; l->fds[0 + INCOMING_MSG_PIPE].revents = POLLIN; Loading Loading @@ -461,9 +458,6 @@ void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack } void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack_message_in_multiple_pieces_over_SSL(void) { TEST_IGNORE_MESSAGE("work in progress"); fprintf(stderr, "\n\ntest_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack_message_in_multiple_pieces\n"); l->fds[0 + INCOMING_MSG_PIPE].fd = 5; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; l->fds[0 + INCOMING_MSG_PIPE].revents = POLLIN; Loading Loading @@ -491,7 +485,6 @@ void test_ListenerIO_AttemptRecv_should_handle_successful_socket_read_and_unpack info->u.expect.box = box; syscall_SSL_read_ExpectAndReturn(ci.ssl, l->read_buf, ci.to_read_size, ci.to_read_size - 1); ListenerIO_AttemptRecv(l, 1); syscall_SSL_read_ExpectAndReturn(ci.ssl, l->read_buf, 1, 1); Loading