Loading src/main/socket_wrapper.cc +93 −58 Original line number Diff line number Diff line Loading @@ -24,15 +24,21 @@ #include <netinet/in.h> #include <arpa/inet.h> #include <netdb.h> #include <unistd.h> #include <string> #include "glog/logging.h" #include "socket_wrapper.h" using kinetic::SocketWrapper; namespace kinetic { using std::string; SocketWrapper::SocketWrapper(const std::string& host, int port, bool nonblocking) : host_(host), port_(port), nonblocking_(nonblocking), fd_(-1) {} : host_(host), port_(port), nonblocking_(nonblocking), fd_(-1) { } SocketWrapper::~SocketWrapper() { if (fd_ == -1) { Loading @@ -46,42 +52,58 @@ SocketWrapper::SocketWrapper(const std::string &host, int port, bool nonblocking } bool SocketWrapper::Connect() { LOG(INFO) << "Connecting to " << host_ << ":" << port_; LOG(INFO) << "Connecting to " << host_ << ":" << port_; struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); // could be inet or inet6 hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_NUMERICSERV; struct addrinfo* result; // Lookup host IP struct hostent *he; struct in_addr **addr_list; string port_str = std::to_string(port_); if (!(he = gethostbyname(host_.c_str()))) { PLOG(ERROR) << "Unable to resolve host " << host_; if (int res = getaddrinfo(host_.c_str(), port_str.c_str(), &hints, &result) != 0) { LOG(ERROR) << "Could not resolve host " << host_ << " port " << port_ << ": " << gai_strerror(res); return false; } addr_list = (struct in_addr**)he->h_addr_list; if (!addr_list[0]) { PLOG(ERROR) << "Unable to resolve host " << host_; return false; struct addrinfo* ai; int socket_fd; for (ai = result; ai != NULL; ai = ai->ai_next) { char host[NI_MAXHOST]; char service[NI_MAXSERV]; if (int res = getnameinfo(ai->ai_addr, ai->ai_addrlen, host, sizeof(host), service, sizeof(service), NI_NUMERICHOST | NI_NUMERICSERV) != 0) { LOG(ERROR) << "Could not get name info: " << gai_strerror(res); continue; } else { LOG(INFO) << "Trying to connect to " << string(host) << " on " << string(service); } // Establish the connection struct sockaddr_in server_name; socket_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); fd_ = socket(AF_INET, SOCK_STREAM, 0); if (fd_ == -1) { PLOG(ERROR) << "Unable to create socket"; return false; if (socket_fd == -1) { LOG(WARNING) << "Could not create socket"; continue; } // os x won't let us set close-on-exec when creating the socket, so set it separately int current_fd_flags = fcntl(fd_, F_GETFD); int current_fd_flags = fcntl(socket_fd, F_GETFD); if (current_fd_flags == -1) { PLOG(ERROR) << "Failed to get socket fd flags"; return false; close(socket_fd); continue; } if (fcntl(fd_, F_SETFD, current_fd_flags | FD_CLOEXEC) == -1) { if (fcntl(socket_fd, F_SETFD, current_fd_flags | FD_CLOEXEC) == -1) { PLOG(ERROR) << "Failed to set socket close-on-exit"; return false; close(socket_fd); continue; } // On BSD-like systems we can set SO_NOSIGPIPE on the socket to prevent it from sending a Loading @@ -89,31 +111,44 @@ bool SocketWrapper::Connect() { // forcibly #ifdef SO_NOSIGPIPE int set = 1; int setsockopt_result = setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); int setsockopt_result = setsockopt(socket_fd, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); // Allow ENOTSOCK because it allows tests to use pipes instead of real sockets if (setsockopt_result != 0 && setsockopt_result != ENOTSOCK) { PLOG(ERROR) << "Failed to set SO_NOSIGPIPE on socket"; return false; close(socket_fd); continue; } #endif server_name.sin_family = AF_INET; server_name.sin_addr = *addr_list[0]; server_name.sin_port = htons(port_); if (connect(fd_, (struct sockaddr*)&server_name, sizeof(server_name)) == -1) { if (connect(socket_fd, ai->ai_addr, ai->ai_addrlen) == -1) { PLOG(WARNING) << "Unable to connect"; return false; close(socket_fd); continue; } if (nonblocking_ && fcntl(fd_, F_SETFL, O_NONBLOCK) != 0) { if (nonblocking_ && fcntl(socket_fd, F_SETFL, O_NONBLOCK) != 0) { PLOG(ERROR) << "Failed to set socket nonblocking"; close(socket_fd); continue; } break; } freeaddrinfo(result); if (ai == NULL) { // we went through all addresses without finding one we could bind to LOG(ERROR) << "Could not connect to " << host_ << " on port " << port_; return false; } fd_ = socket_fd; return true; } int SocketWrapper::fd() { return fd_; } } // namespace kinetic Loading
src/main/socket_wrapper.cc +93 −58 Original line number Diff line number Diff line Loading @@ -24,15 +24,21 @@ #include <netinet/in.h> #include <arpa/inet.h> #include <netdb.h> #include <unistd.h> #include <string> #include "glog/logging.h" #include "socket_wrapper.h" using kinetic::SocketWrapper; namespace kinetic { using std::string; SocketWrapper::SocketWrapper(const std::string& host, int port, bool nonblocking) : host_(host), port_(port), nonblocking_(nonblocking), fd_(-1) {} : host_(host), port_(port), nonblocking_(nonblocking), fd_(-1) { } SocketWrapper::~SocketWrapper() { if (fd_ == -1) { Loading @@ -46,42 +52,58 @@ SocketWrapper::SocketWrapper(const std::string &host, int port, bool nonblocking } bool SocketWrapper::Connect() { LOG(INFO) << "Connecting to " << host_ << ":" << port_; LOG(INFO) << "Connecting to " << host_ << ":" << port_; struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); // could be inet or inet6 hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_NUMERICSERV; struct addrinfo* result; // Lookup host IP struct hostent *he; struct in_addr **addr_list; string port_str = std::to_string(port_); if (!(he = gethostbyname(host_.c_str()))) { PLOG(ERROR) << "Unable to resolve host " << host_; if (int res = getaddrinfo(host_.c_str(), port_str.c_str(), &hints, &result) != 0) { LOG(ERROR) << "Could not resolve host " << host_ << " port " << port_ << ": " << gai_strerror(res); return false; } addr_list = (struct in_addr**)he->h_addr_list; if (!addr_list[0]) { PLOG(ERROR) << "Unable to resolve host " << host_; return false; struct addrinfo* ai; int socket_fd; for (ai = result; ai != NULL; ai = ai->ai_next) { char host[NI_MAXHOST]; char service[NI_MAXSERV]; if (int res = getnameinfo(ai->ai_addr, ai->ai_addrlen, host, sizeof(host), service, sizeof(service), NI_NUMERICHOST | NI_NUMERICSERV) != 0) { LOG(ERROR) << "Could not get name info: " << gai_strerror(res); continue; } else { LOG(INFO) << "Trying to connect to " << string(host) << " on " << string(service); } // Establish the connection struct sockaddr_in server_name; socket_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); fd_ = socket(AF_INET, SOCK_STREAM, 0); if (fd_ == -1) { PLOG(ERROR) << "Unable to create socket"; return false; if (socket_fd == -1) { LOG(WARNING) << "Could not create socket"; continue; } // os x won't let us set close-on-exec when creating the socket, so set it separately int current_fd_flags = fcntl(fd_, F_GETFD); int current_fd_flags = fcntl(socket_fd, F_GETFD); if (current_fd_flags == -1) { PLOG(ERROR) << "Failed to get socket fd flags"; return false; close(socket_fd); continue; } if (fcntl(fd_, F_SETFD, current_fd_flags | FD_CLOEXEC) == -1) { if (fcntl(socket_fd, F_SETFD, current_fd_flags | FD_CLOEXEC) == -1) { PLOG(ERROR) << "Failed to set socket close-on-exit"; return false; close(socket_fd); continue; } // On BSD-like systems we can set SO_NOSIGPIPE on the socket to prevent it from sending a Loading @@ -89,31 +111,44 @@ bool SocketWrapper::Connect() { // forcibly #ifdef SO_NOSIGPIPE int set = 1; int setsockopt_result = setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); int setsockopt_result = setsockopt(socket_fd, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); // Allow ENOTSOCK because it allows tests to use pipes instead of real sockets if (setsockopt_result != 0 && setsockopt_result != ENOTSOCK) { PLOG(ERROR) << "Failed to set SO_NOSIGPIPE on socket"; return false; close(socket_fd); continue; } #endif server_name.sin_family = AF_INET; server_name.sin_addr = *addr_list[0]; server_name.sin_port = htons(port_); if (connect(fd_, (struct sockaddr*)&server_name, sizeof(server_name)) == -1) { if (connect(socket_fd, ai->ai_addr, ai->ai_addrlen) == -1) { PLOG(WARNING) << "Unable to connect"; return false; close(socket_fd); continue; } if (nonblocking_ && fcntl(fd_, F_SETFL, O_NONBLOCK) != 0) { if (nonblocking_ && fcntl(socket_fd, F_SETFL, O_NONBLOCK) != 0) { PLOG(ERROR) << "Failed to set socket nonblocking"; close(socket_fd); continue; } break; } freeaddrinfo(result); if (ai == NULL) { // we went through all addresses without finding one we could bind to LOG(ERROR) << "Could not connect to " << host_ << " on port " << port_; return false; } fd_ = socket_fd; return true; } int SocketWrapper::fd() { return fd_; } } // namespace kinetic