Commit ec4bd046 authored by Paul Dardeau's avatar Paul Dardeau
Browse files

Added support for batch operations

parent e5ac8c76
Loading
Loading
Loading
Loading
+42 −3
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
from kinetic.baseclient import BaseClient
from kinetic import operations
from kinetic import kinetic_pb2 as messages
from kinetic import batch
import logging

LOG = logging.getLogger(__name__)
@@ -29,13 +30,24 @@ class BlockingClient(BaseClient):
        super(BlockingClient, self).__init__(*args, **kwargs)

    def _process(self, op, *args, **kwargs):
        if 'no_ack' in kwargs:
            send_no_ack = True
            del kwargs['no_ack']
        else:
            send_no_ack = False
        header,value = op.build(*args, **kwargs)
        try:
            with self:
                # update header
                self.update_header(header)
                # send message synchronously
                if send_no_ack:
                    self.send_no_ack(header, value)
                else:
                    _, cmd, value = self.send(header, value)
            if send_no_ack:
                return None
            else:
                operations._check_status(cmd)
                return op.parse(cmd, value)
        except Exception as e:
@@ -85,6 +97,33 @@ class BlockingClient(BaseClient):
    def flush(self, *args, **kwargs):
        return self._process(operations.Flush(), *args, **kwargs)

    # @RequiresProtocol('3.0.6')
    def begin_batch(self, *args, **kwargs):
        self.batch_begin(*args, **kwargs)
        return batch.Batch(self, self.next_batch_id())

    # @RequiresProtocol('3.0.6')
    def batch_begin(self, *args, **kwargs):
        return self._process(operations.StartBatch(), *args, **kwargs)

    # @RequiresProtocol('3.0.6')
    def batch_put(self, *args, **kwargs):
        kwargs['no_ack'] = True
        return self.put(*args, **kwargs)

    # @RequiresProtocol('3.0.6')
    def batch_delete(self, *args, **kwargs):
        kwargs['no_ack'] = True
        return self.delete(*args, **kwargs)

    # @RequiresProtocol('3.0.6')
    def batch_commit(self, *args, **kwargs):
        return self._process(operations.EndBatch(), *args, **kwargs)

    # @RequiresProtocol('3.0.6')
    def batch_abort(self, *args, **kwargs):
        return self._process(operations.AbortBatch(), *args, **kwargs)

    # @RequiresProtocol('3.0.0')
    def mediaScan(self, *args, **kwargs):
        return self._process(operations.MediaScan(), *args, **kwargs)