Commit cddae1a9 authored by Ignacio Corderi's avatar Ignacio Corderi
Browse files

Added Batch support

parents b0c811d8 82a27a73
Loading
Loading
Loading
Loading
Compare 6b5c98a2 to 1c522532
Original line number Diff line number Diff line
Subproject commit 6b5c98a2298353713b8f16679f8c9dc7b2e543b7
Subproject commit 1c522532aba40b229f2b0b7e27bb5383243c149f
+5 −1
Original line number Diff line number Diff line
@@ -58,3 +58,7 @@ from common import KineticMessageException
AsyncClient = Client
from kinetic.deprecated.adminclient import AdminClient
from kinetic import greenclient as client
# Fake old asyncclient module 
class AsyncClientCompat(object):
    AsyncClient = Client   
asyncclient = AsyncClientCompat()
 No newline at end of file
+11 −4
Original line number Diff line number Diff line
@@ -132,7 +132,7 @@ class BaseAsync(deprecated.BlockingClient):
    ###


    def sendAsync(self, command, value, onSuccess, onError):
    def sendAsync(self, command, value, onSuccess, onError, no_ack=False):
        if self.faulted: # TODO(Nacho): should we fault through onError on fault or bow up on the callers face?
            self._raise(common.ConnectionFaulted("Can't send message when connection is on a faulted state."), onError)
            return #skip the rest
@@ -152,6 +152,7 @@ class BaseAsync(deprecated.BlockingClient):
        # get sequence
        self.update_header(command)

        if not no_ack:
            # add callback to pending dictionary
            self._pending[command.header.sequence] = (innerSuccess, onError)

@@ -177,8 +178,14 @@ class BaseAsync(deprecated.BlockingClient):
            except Exception as e2:
                onError(e2)

        if 'no_ack' in kwargs:
            send_no_ack = True
            del kwargs['no_ack']
        else:
            send_no_ack = False

        header, value = op.build(*args, **kwargs)
        self.sendAsync(header, value, innerSuccess, innerError)
        self.sendAsync(header, value, innerSuccess, innerError, send_no_ack)


    def putAsync(self, onSuccess, onError, *args, **kwargs):
+8 −0
Original line number Diff line number Diff line
@@ -130,6 +130,7 @@ class BaseClient(object):
            self._socket.settimeout(self.socket_timeout)

            self._sequence = itertools.count()
            self._batch_id = itertools.count()
            self._closed = False
        except:
            self._socket = None
@@ -180,6 +181,7 @@ class BaseClient(object):
        self._socket = None
        self.connection_id = None
        self._sequence = itertools.count()
        self._batch_id = itertools.count()


    def update_header(self, command):
@@ -363,6 +365,12 @@ class BaseClient(object):
            else: done = True
        return m,cmd,value

    def send_no_ack(self, header, value):
        self.network_send(header, value)

    def next_batch_id(self):
        return self._batch_id.next()

    ### with statement support ###

    def __enter__(self):

kinetic/batch.py

0 → 100644
+160 −0
Original line number Diff line number Diff line
# Copyright (C) 2015 Seagate Technology.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

#@author: Paul Dardeau

import common
import logging
import operations

LOG = logging.getLogger(__name__)

class Batch(object):
    """
    The Batch class is used for grouping a set of put and/or delete operations
    so that all are committed as one unit or all of them are canceled
    (aborted).

    A Batch object is obtained by calling :func:`~baseclient.BaseClient.begin_batch`.
    Once all relevant put and delete calls are made, 'commit' should be called
    to apply all of the operations, or 'abort' to cancel (abort) them.

    A Batch object cannot be reused for subsequent batches. After the 'commit'
    or 'abort' has completed successfully, a new Batch object should be
    requested for the next batch operation.
    """


    def __init__(self, client, batch_id):
        """
        Initialize instance with Kinetic client and batch identifier.

        Args:
            client: the Kinetic client to use for batch operations.
            batch_id: the batch identifier to be used for client connection.
        """
        self._client = client
        self._batch_id = batch_id
        self._op_count = 0
        self._batch_completed = False   # to detect attempted reuse
  
    def put(self, *args, **kwargs):
        """
        Put an entry within the batch operation.

        The command is not committed until :func:`~batch.Batch.commit` is
        called and returns successfully. If a version is specified, it must
        match the one stored in the persistent storage. Otherwise, a
        KineticException is raised.

        Args:

        Kwargs:

        Raises:
            KineticException: if any internal error occurs.
        """
        if self._batch_completed:
            raise common.BatchCompletedException()

        self._op_count += 1        
        kwargs['batch_id'] = self._batch_id
        kwargs['no_ack'] = True
        
        self._client.put(*args, **kwargs)

    def delete(self, *args, **kwargs):
        """
        Delete the entry associated with the specified key.

        The command is not committed until :func:`~batch.Batch.commit` is
        called and returns successfully. If a version is specified, it must
        match the one stored in persistent storage. Otherwise, a
        KineticException is raised.

        Args:

        Kwargs:

        Raises:
            KineticException: if any internal error occurs.
        """
        if self._batch_completed:
            raise common.BatchCompletedException()

        self._op_count += 1
        kwargs['batch_id'] = self._batch_id
        kwargs['no_ack'] = True
        
        self._client.delete(*args, **kwargs)

    def commit(self, *args, **kwargs):
        """
        Commit the current batch operation.

        When this call returned successfully, all the commands performed in the
        current batch are executed and committed to store successfully.

        Raises:
            KineticException: if any internal error occurred. The batch may
                or may not be committed. If committed, all commands are
                committed. Otherwise, no messages are committed.
            BatchAbortedException: if the commit failed. No messages within
                the batch were committed to the store. 
        """
        if self._batch_completed:
            raise common.BatchCompletedException()

        kwargs['batch_id'] = self._batch_id
        kwargs['batch_op_count'] = self._op_count
        try:
            self._client._process(operations.EndBatch(), *args, **kwargs)
            self._batch_completed = True
        except BatchAbortedException:
            self._batch_completed = True
            raise

    def abort(self, *args, **kwargs):
        """
        Abort the current batch operation.

        When this call returned successfully, all the commands queued in the
        current batch are aborted. Resources related to the current batch are
        cleaned up and released.

        Raises:
            KineticException: if any internal error occurred.
        """
        if self._batch_completed:
            raise common.BatchCompletedException()

        kwargs['batch_id'] = self._batch_id
        self._client._process(operations.AbortBatch(), *args, **kwargs)
        self._batch_completed = True

    def is_completed(self):
        """
        Return boolean indicating whether the batch is completed (either
        committed or aborted)
        """
        return self._batch_completed

    def __len__(self):
        """
        Return the number of operations that have been included in the batch.
        """
        return self._op_count
Loading