Commit 4cc7039a authored by Paul Dardeau's avatar Paul Dardeau
Browse files

Added support for no_ack sends

parent dded7963
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -78,7 +78,7 @@ class Client(baseasync.BaseAsync):
        self.writer_thread = None
        self.reader_thread = None

    def sendAsync(self, header, value, onSuccess, onError):
    def sendAsync(self, header, value, onSuccess, onError, no_ack=False):
        if self.closing:
            raise common.ConnectionClosed("Client is closing, can't queue more operations.")

@@ -93,7 +93,7 @@ class Client(baseasync.BaseAsync):

        if LOG.isEnabledFor(logging.DEBUG):
            LOG.debug("Queue: {0}".format(self.queue.qsize()))
        self.queue.put((header, value, onSuccess, onError))
        self.queue.put((header, value, onSuccess, onError, no_ack))
        eventlet.sleep(0)

    def wait(self):
@@ -125,8 +125,8 @@ class Client(baseasync.BaseAsync):
            try:
                while len(self._pending) > self.max_pending:
                    eventlet.sleep(0)
                (header, value, onSuccess, onError) = self.queue.get()
                super(Client, self).sendAsync(header, value, onSuccess, onError)
                (header, value, onSuccess, onError, no_ack) = self.queue.get()
                super(Client, self).sendAsync(header, value, onSuccess, onError, no_ack)
            except common.ConnectionFaulted: pass
            except common.ConnectionClosed: pass
            except Exception as ex: