Commit 1ac29430 authored by Paul Dardeau's avatar Paul Dardeau
Browse files

Added StartBatch, EndBatch, AbortBatch; added support for setting batch_id.

parent 7b2117dc
Loading
Loading
Loading
Loading
+31 −0
Original line number Diff line number Diff line
@@ -97,6 +97,10 @@ class BaseOperation(object):
            self.m.header.TimeQuanta = kwargs['time_quanta']
            del kwargs['time_quanta']

        if 'batch_id' in kwargs:
            self.m.header.batchID = kwargs['batch_id']
            del kwargs['batch_id']

        return self._build(*args, **kwargs)

    def parse(self, m, value):
@@ -301,6 +305,33 @@ class P2pPipedPush(BaseOperation):
        return [op for op in m.body.p2pOperation.operation]


class StartBatch(BaseOperation):

    def _build(self):
        return _buildMessage(self.m, messages.Command.START_BATCH, '')


class EndBatch(BaseOperation):

    def _build(self, **kwargs):
        # batch_op_count
        (m,_) = _buildMessage(self.m, messages.Command.END_BATCH, '')
        m.body.batch.count = kwargs['batch_op_count']
        del kwargs['batch_op_count']
        return (m, None)

    def onError(self, e):
        if isinstance(e,common.KineticException):
            if e.code and e.code == 'INVALID_BATCH':
                return common.BatchAbortedException(e.value)
        raise e

class AbortBatch(BaseOperation):

    def _build(self):
        return _buildMessage(self.m, messages.Command.ABORT_BATCH, '')


class Flush(BaseOperation):

    def _build(self):