Loading src/lib/kinetic_socket.c +40 −24 Original line number Diff line number Diff line Loading @@ -103,30 +103,17 @@ int KineticSocket_Connect(char* host, int port, bool nonBlocking) for (ai = ai_result; ai != NULL; ai = ai->ai_next) { // OSX won't let us set close-on-exec when creating the socket, // so set it separately int current_fd_flags = fcntl(result.fd, F_GETFD); if (current_fd_flags == -1) { LOG("Failed to get socket fd flags"); close(result.fd); continue; } if (fcntl(result.fd, F_SETFD, current_fd_flags | FD_CLOEXEC) == -1) { LOG("Failed to set socket close-on-exit"); close(result.fd); continue; } int setsockopt_result; int enable = 1; int buffer_size = PDU_VALUE_MAX_LEN; #if defined(SO_NOSIGPIPE) && !defined(__APPLE__) // On BSD-like systems we can set SO_NOSIGPIPE on the socket to // prevent it from sending a PIPE signal and bringing down the whole // application if the server closes the socket forcibly { int set = 1; int setsockopt_result = setsockopt(result.fd, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); setsockopt_result = setsockopt(result.fd, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(enable)); // Allow ENOTSOCK because it allows tests to use pipes instead of // real sockets if (setsockopt_result != 0 && setsockopt_result != ENOTSOCK) Loading @@ -135,9 +122,32 @@ int KineticSocket_Connect(char* host, int port, bool nonBlocking) close(result.fd); continue; } } #endif // Increase send buffer to PDU_VALUE_MAX_LEN // Note: OS allocates 2x this value for its overhead setsockopt_result = setsockopt(result.fd, SOL_SOCKET, SO_SNDBUF, &buffer_size, sizeof(buffer_size)); if (setsockopt_result == -1) { LOG("Error setting socket send buffer size"); close(result.fd); continue; } // Increase receive buffer to PDU_VALUE_MAX_LEN // Note: OS allocates 2x this value for its overheadbuffer_size setsockopt_result = setsockopt(result.fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof(buffer_size)); if (setsockopt_result == -1) { LOG("Error setting socket receive buffer size"); close(result.fd); continue; } break; } Loading Loading @@ -218,7 +228,13 @@ bool KineticSocket_Read(int socketDescriptor, // The socket is ready for reading status = read(socketDescriptor, &buffer.data[count], buffer.len - count); if (status == -1 && errno == EINTR) // Retry if no data yet... if (status == -1 && ( (errno == EINTR) || (errno == EAGAIN) || (errno == EWOULDBLOCK) ) ) { continue; } Loading Loading
src/lib/kinetic_socket.c +40 −24 Original line number Diff line number Diff line Loading @@ -103,30 +103,17 @@ int KineticSocket_Connect(char* host, int port, bool nonBlocking) for (ai = ai_result; ai != NULL; ai = ai->ai_next) { // OSX won't let us set close-on-exec when creating the socket, // so set it separately int current_fd_flags = fcntl(result.fd, F_GETFD); if (current_fd_flags == -1) { LOG("Failed to get socket fd flags"); close(result.fd); continue; } if (fcntl(result.fd, F_SETFD, current_fd_flags | FD_CLOEXEC) == -1) { LOG("Failed to set socket close-on-exit"); close(result.fd); continue; } int setsockopt_result; int enable = 1; int buffer_size = PDU_VALUE_MAX_LEN; #if defined(SO_NOSIGPIPE) && !defined(__APPLE__) // On BSD-like systems we can set SO_NOSIGPIPE on the socket to // prevent it from sending a PIPE signal and bringing down the whole // application if the server closes the socket forcibly { int set = 1; int setsockopt_result = setsockopt(result.fd, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); setsockopt_result = setsockopt(result.fd, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(enable)); // Allow ENOTSOCK because it allows tests to use pipes instead of // real sockets if (setsockopt_result != 0 && setsockopt_result != ENOTSOCK) Loading @@ -135,9 +122,32 @@ int KineticSocket_Connect(char* host, int port, bool nonBlocking) close(result.fd); continue; } } #endif // Increase send buffer to PDU_VALUE_MAX_LEN // Note: OS allocates 2x this value for its overhead setsockopt_result = setsockopt(result.fd, SOL_SOCKET, SO_SNDBUF, &buffer_size, sizeof(buffer_size)); if (setsockopt_result == -1) { LOG("Error setting socket send buffer size"); close(result.fd); continue; } // Increase receive buffer to PDU_VALUE_MAX_LEN // Note: OS allocates 2x this value for its overheadbuffer_size setsockopt_result = setsockopt(result.fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof(buffer_size)); if (setsockopt_result == -1) { LOG("Error setting socket receive buffer size"); close(result.fd); continue; } break; } Loading Loading @@ -218,7 +228,13 @@ bool KineticSocket_Read(int socketDescriptor, // The socket is ready for reading status = read(socketDescriptor, &buffer.data[count], buffer.len - count); if (status == -1 && errno == EINTR) // Retry if no data yet... if (status == -1 && ( (errno == EINTR) || (errno == EAGAIN) || (errno == EWOULDBLOCK) ) ) { continue; } Loading