Loading CHANGES.md +1 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ Changes since 0.7.1 This section will document changes to the library since the last release ## Bug Fixes - Fixex bug that caused close() and connect() to fail on a connection that faulted (Issue #7) - Fixed a bug that caused the AsyncClient to crash when calling close() Changes from 0.7.0 to 0.7.1 Loading kinetic/asyncclient.py +10 −0 Original line number Diff line number Diff line Loading @@ -76,6 +76,16 @@ class AsyncClient(baseasync.BaseAsync): def sendAsync(self, header, value, onSuccess, onError): if self.closing: raise common.ConnectionClosed("Client is closing, can't queue more operations.") if self.faulted: self._raise(common.ConnectionFaulted("Can't send message when connection is on a faulted state."), onError) return #skip the rest # fail fast on NotConnected if not self.isConnected: self._raise(common.NotConnected("Not connected."), onError) return #skip the rest if LOG.isEnabledFor(logging.DEBUG): LOG.debug("Queue: {0}".format(self.queue.qsize())) self.queue.put((header, value, onSuccess, onError)) Loading kinetic/baseclient.py +22 −8 Original line number Diff line number Diff line Loading @@ -111,14 +111,20 @@ class BaseClient(object): def connect(self): if self._socket: raise common.AlreadyConnected("Client is already connected.") self._sequence = itertools.count() self._socket = self.build_socket() self._socket.settimeout(self.connect_timeout) # Stage socket on a local variable first s = self.build_socket() s.settimeout(self.connect_timeout) if self.socket_address: LOG.debug("Client local port address bound to " + self.socket_address) self._socket.bind((self.socket_address, self.socket_port)) self._socket.connect((self.hostname, self.port)) self._socket.settimeout(self.socket_timeout) s.bind((self.socket_address, self.socket_port)) # if connect fails, there is nothing to clean up s.connect((self.hostname, self.port)) s.settimeout(self.socket_timeout) # We are connected now, update attributes self._socket = s self._sequence = itertools.count() self.connection_id = int(time.time()) self._closed = False Loading @@ -129,8 +135,10 @@ class BaseClient(object): def close(self): self._closed = True if self._socket: try: self._socket.shutdown(socket.SHUT_RDWR) self._socket.close() except: pass # if socket wasnt connected, keep going self._buff = '' self._socket = None self.connection_id = None Loading Loading @@ -247,6 +255,9 @@ class BaseClient(object): proto = messages.Message() proto.ParseFromString(str(raw_proto)) if self.debug: print proto return (proto, value) Loading Loading @@ -289,6 +300,9 @@ class BaseClient(object): resp = messages.Message() resp.ParseFromString(raw_proto) if self.debug: print resp if header[2] > 0: resp.value = buff Loading Loading
CHANGES.md +1 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ Changes since 0.7.1 This section will document changes to the library since the last release ## Bug Fixes - Fixex bug that caused close() and connect() to fail on a connection that faulted (Issue #7) - Fixed a bug that caused the AsyncClient to crash when calling close() Changes from 0.7.0 to 0.7.1 Loading
kinetic/asyncclient.py +10 −0 Original line number Diff line number Diff line Loading @@ -76,6 +76,16 @@ class AsyncClient(baseasync.BaseAsync): def sendAsync(self, header, value, onSuccess, onError): if self.closing: raise common.ConnectionClosed("Client is closing, can't queue more operations.") if self.faulted: self._raise(common.ConnectionFaulted("Can't send message when connection is on a faulted state."), onError) return #skip the rest # fail fast on NotConnected if not self.isConnected: self._raise(common.NotConnected("Not connected."), onError) return #skip the rest if LOG.isEnabledFor(logging.DEBUG): LOG.debug("Queue: {0}".format(self.queue.qsize())) self.queue.put((header, value, onSuccess, onError)) Loading
kinetic/baseclient.py +22 −8 Original line number Diff line number Diff line Loading @@ -111,14 +111,20 @@ class BaseClient(object): def connect(self): if self._socket: raise common.AlreadyConnected("Client is already connected.") self._sequence = itertools.count() self._socket = self.build_socket() self._socket.settimeout(self.connect_timeout) # Stage socket on a local variable first s = self.build_socket() s.settimeout(self.connect_timeout) if self.socket_address: LOG.debug("Client local port address bound to " + self.socket_address) self._socket.bind((self.socket_address, self.socket_port)) self._socket.connect((self.hostname, self.port)) self._socket.settimeout(self.socket_timeout) s.bind((self.socket_address, self.socket_port)) # if connect fails, there is nothing to clean up s.connect((self.hostname, self.port)) s.settimeout(self.socket_timeout) # We are connected now, update attributes self._socket = s self._sequence = itertools.count() self.connection_id = int(time.time()) self._closed = False Loading @@ -129,8 +135,10 @@ class BaseClient(object): def close(self): self._closed = True if self._socket: try: self._socket.shutdown(socket.SHUT_RDWR) self._socket.close() except: pass # if socket wasnt connected, keep going self._buff = '' self._socket = None self.connection_id = None Loading Loading @@ -247,6 +255,9 @@ class BaseClient(object): proto = messages.Message() proto.ParseFromString(str(raw_proto)) if self.debug: print proto return (proto, value) Loading Loading @@ -289,6 +300,9 @@ class BaseClient(object): resp = messages.Message() resp.ParseFromString(raw_proto) if self.debug: print resp if header[2] > 0: resp.value = buff Loading