Commit 54f6e0db authored by plensing's avatar plensing
Browse files

Merge pull request #28 from thaimai/master

add support for non-blocking sockets with time out in ReaderWriter class 
parents 961b3a42 67ea8b77
Loading
Loading
Loading
Loading
+27 −9
Original line number Diff line number Diff line
@@ -25,50 +25,68 @@

#include "glog/logging.h"

const uint32_t SOCKET_TIMEOUT = 20000;  //10 seconds

namespace kinetic {

ReaderWriter::ReaderWriter(int fd) : fd_(fd) {}

bool ReaderWriter::Read(void *buf, size_t n, int* err) {
    size_t bytes_read = 0;
    while (bytes_read < n) {
    uint32_t socket_timeout = 0;
    while (bytes_read < n && socket_timeout < SOCKET_TIMEOUT) {
        int status = read(fd_, reinterpret_cast<char *>(buf) + bytes_read, n - bytes_read);
        if (status == -1 && errno == EINTR) {
            continue;
        }
        if (status < 0) {
        } else if (status == -1 && (errno == EAGAIN || errno == EWOULDBLOCK )) {
	    //Wait for 500us;
	    usleep(500);
            socket_timeout++;
            continue;
        } else if (status < 0) {
            *err = errno;
            PLOG(WARNING) << "Failed to read from socket";
            return false;
        }
        if (status == 0) {
            LOG(WARNING) << "Failed to read from socket";
            LOG(WARNING) << "Unexpected EOF. Socket (TX) may be closed by Peer";
            return false;
        }
        bytes_read += status;
    }

    if (socket_timeout >= SOCKET_TIMEOUT) {
        LOG(INFO) << "Peer is slow to transmit";
        return false;
    }
    return true;
}

bool ReaderWriter::Write(const void *buf, size_t n) {
    size_t bytes_written = 0;
    while (bytes_written < n) {
    uint32_t socket_timeout = 0;
    while (bytes_written < n && socket_timeout < SOCKET_TIMEOUT) {
        int status = write(fd_, reinterpret_cast<const char *>(buf) + bytes_written,
            n - bytes_written);
        if (status == -1 && errno == EINTR) {
            continue;
        }
        if (status < 0) {
        } else if (status == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
	    usleep(500);
	    socket_timeout++;
            continue;
        } else if (status < 0) {	
            PLOG(WARNING) << "Failed to write to socket";
            return false;
        }
        if (status == 0) {
            LOG(WARNING) << "Failed to write to socket";
            LOG(WARNING) << "Unexpected EOF, Socket(RX) may be closed by Peer";
            return false;
        }
        bytes_written += status;
    }
    if (socket_timeout >= SOCKET_TIMEOUT) {
        LOG(INFO) << " Peer is slow to receive";
        return false;
    }

    return true;
}