Commit 9ac2e25e authored by Ignacio Corderi's avatar Ignacio Corderi
Browse files

Added timeout, time_quanta, priority and early_exit support

parent 1fa94ca6
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -2,6 +2,9 @@ Changes since 0.8.0
===========================
This section will document changes to the library since the last release

## New features
- Added timeout, time_quanta, priority and early_exit support

Changes from 0.7.3 to 0.8.0
===========================

+13 −13
Original line number Diff line number Diff line
@@ -182,41 +182,41 @@ class BaseAsync(Client):


    def putAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.Put, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.Put(), onSuccess, onError, *args, **kwargs)

    def getAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.Get, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.Get(), onSuccess, onError, *args, **kwargs)

    def getMetadataAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.GetMetadata, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.GetMetadata(), onSuccess, onError, *args, **kwargs)

    def deleteAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.Delete, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.Delete(), onSuccess, onError, *args, **kwargs)

    def getNextAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.GetNext, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.GetNext(), onSuccess, onError, *args, **kwargs)

    def getPreviousAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.GetPrevious, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.GetPrevious(), onSuccess, onError, *args, **kwargs)

    def getKeyRangeAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.GetKeyRange, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.GetKeyRange(), onSuccess, onError, *args, **kwargs)

    def getVersionAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.GetVersion, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.GetVersion(), onSuccess, onError, *args, **kwargs)

    def flushAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.Flush, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.Flush(), onSuccess, onError, *args, **kwargs)

    def noopAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.Noop, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.Noop(), onSuccess, onError, *args, **kwargs)

    def mediaScanAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.MediaScan, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.MediaScan(), onSuccess, onError, *args, **kwargs)

    def mediaOptimizeAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.MediaOptimize, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.MediaOptimize(), onSuccess, onError, *args, **kwargs)

    def getLogAsync(self, onSuccess, onError, *args, **kwargs):
        self._processAsync(operations.GetLog, onSuccess, onError, *args, **kwargs)
        self._processAsync(operations.GetLog(), onSuccess, onError, *args, **kwargs)
+0 −1
Original line number Diff line number Diff line
@@ -27,7 +27,6 @@ import struct
import common
import kinetic_pb2 as messages
import ssl
import operations

ss = socket

+15 −15
Original line number Diff line number Diff line
@@ -42,37 +42,37 @@ class Client(BaseClient):
            return op.onError(e)

    def noop(self, *args, **kwargs):
        return self._process(operations.Noop, *args, **kwargs)
        return self._process(operations.Noop(), *args, **kwargs)

    def put(self, *args, **kwargs):
        return self._process(operations.Put, *args, **kwargs)
        return self._process(operations.Put(), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self._process(operations.Get, *args, **kwargs)
        return self._process(operations.Get(), *args, **kwargs)

    def getMetadata(self, *args, **kwargs):
        return self._process(operations.GetMetadata, *args, **kwargs)
        return self._process(operations.GetMetadata(), *args, **kwargs)

    def delete(self, *args, **kwargs):
        return self._process(operations.Delete, *args, **kwargs)
        return self._process(operations.Delete(), *args, **kwargs)

    def getNext(self, *args, **kwargs):
        return self._process(operations.GetNext, *args, **kwargs)
        return self._process(operations.GetNext(), *args, **kwargs)

    def getPrevious(self, *args, **kwargs):
        return self._process(operations.GetPrevious, *args, **kwargs)
        return self._process(operations.GetPrevious(), *args, **kwargs)

    def getKeyRange(self, *args, **kwargs):
        return self._process(operations.GetKeyRange, *args, **kwargs)
        return self._process(operations.GetKeyRange(), *args, **kwargs)

    def getRange(self, startKey, endKey, startKeyInclusive=True, endKeyInclusive=True, prefetch=64):
        return KineticRangeIter(self, startKey, endKey, startKeyInclusive, endKeyInclusive, prefetch)

    def push(self, *args, **kwargs):
        return self._process(operations.P2pPush, *args, **kwargs)
        return self._process(operations.P2pPush(), *args, **kwargs)

    def pipedPush(self, *args, **kwargs):
        return self._process(operations.P2pPipedPush, *args, **kwargs)
        return self._process(operations.P2pPipedPush(), *args, **kwargs)


    def getVersion(self, *args, **kwargs):
@@ -80,23 +80,23 @@ class Client(BaseClient):
            Arguments: key -> The key you are seeking version information for.
            Returns a protobuf object with the version property that determines the pair's current version.
        """
        return self._process(operations.GetVersion, *args, **kwargs)
        return self._process(operations.GetVersion(), *args, **kwargs)


    # @RequiresProtocol('2.0.3')
    def flush(self, *args, **kwargs):
        return self._process(operations.Flush, *args, **kwargs)
        return self._process(operations.Flush(), *args, **kwargs)

    # @RequiresProtocol('3.0.0')
    def mediaScan(self, *args, **kwargs):
        return self._process(operations.MediaScan, *args, **kwargs)
        return self._process(operations.MediaScan(), *args, **kwargs)

    # @RequiresProtocol('3.0.0')
    def mediaOptimize(self, *args, **kwargs):
        return self._process(operations.MediaOptimize, *args, **kwargs)
        return self._process(operations.MediaOptimize(), *args, **kwargs)

    def getLog(self, *args, **kwargs):
        return self._process(operations.GetLog, *args, **kwargs)
        return self._process(operations.GetLog(), *args, **kwargs)

class KineticRangeIter(object):

+125 −268
Original line number Diff line number Diff line
@@ -35,9 +35,8 @@ def _check_status(command):
        raise KineticMessageException(command.status)


def _buildMessage(messageType, key, data=None, version='', new_version='',
def _buildMessage(m, messageType, key, data=None, version='', new_version='',
                  force=False, tag=None, algorithm=None, synchronization=None):
    m = messages.Command()
    m.header.messageType = messageType
    if len(key) > common.MAX_KEY_SIZE: raise common.KineticClientException("Key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))
    m.body.keyValue.key = key
@@ -61,118 +60,108 @@ def _buildMessage(messageType, key, data=None, version='', new_version='',

    return (m,data)

class Noop(object):

    @staticmethod
    def build():
        m = messages.Command()
        m.header.messageType = messages.Command.NOOP
        return (m, None)
class BaseOperation(object):

    @staticmethod
    def parse(m, value):
        return
    def __init__(self):
        self.m = None

    @staticmethod
    def onError(e):
        raise e
    def _build(): pass

    def build(self, *args, **kwargs):
        self.m = messages.Command()

        if 'timeout' in kwargs:
            self.m.header.timeout = kwargs['timeout']
            del kwargs['timeout']

class Put(object):
        if 'priority' in kwargs:
            self.m.header.priority = kwargs['priority']
            del kwargs['priority']

    @staticmethod
    def build(key, data, version="", new_version="", **kwargs):
        return _buildMessage(messages.Command.PUT, key, data, version, new_version, **kwargs)
        if 'early_exit' in kwargs:
            self.m.header.earlyExit = kwargs['early_exit']
            del kwargs['early_exit']

    @staticmethod
    def parse(m, value):
        if 'time_quanta' in kwargs:
            self.m.header.TimeQuanta = kwargs['time_quanta']
            del kwargs['time_quanta']

        return self._build(*args, **kwargs)

    def parse(self, m, value):
        return

    @staticmethod
    def onError(e):
    def onError(self, e):
        raise e

class Get(object):
class Noop(BaseOperation):

    def _build(self):
        m = self.m
        m.header.messageType = messages.Command.NOOP
        return (m, None)

    @staticmethod
    def build(key):
        return _buildMessage(messages.Command.GET, key)

    @staticmethod
    def parse(m, value):
class Put(BaseOperation):

    def _build(self, key, data, version="", new_version="", **kwargs):
        return _buildMessage(self.m, messages.Command.PUT, key, data, version, new_version, **kwargs)


class Get(BaseOperation):

    def _build(self, key):
        return _buildMessage(self.m, messages.Command.GET, key)

    def parse(self, m, value):
        return Entry.fromResponse(m, value)

    @staticmethod
    def onError(e):
    def onError(self, e):
        if isinstance(e,KineticMessageException):
            if e.code and e.code == 'NOT_FOUND':
                return None
        raise e

class GetMetadata(object):

    @staticmethod
    def build(key):
        (m,_) = _buildMessage(messages.Command.GET, key)
class GetMetadata(Get):

    def _build(self, key):
        (m,_) = _buildMessage(self.m, messages.Command.GET, key)
        m.body.keyValue.metadataOnly = True
        return (m, None)

    @staticmethod
    def parse(m, value):
        return Get.parse(m, value)

    @staticmethod
    def onError(e):
        return Get.onError(e)

class Delete(object):
class Delete(BaseOperation):

    @staticmethod
    def build(key, version="", **kwargs):
        return _buildMessage(messages.Command.DELETE, key, version=version, **kwargs)
    def _build(self, key, version="", **kwargs):
        return _buildMessage(self.m, messages.Command.DELETE, key, version=version, **kwargs)

    @staticmethod
    def parse(m, value):
    def parse(self, m, value):
        return True

    @staticmethod
    def onError(e):
    def onError(self, e):
        if isinstance(e,KineticMessageException):
            if e.code and e.code == 'NOT_FOUND':
                return False
        raise e

class GetNext(object):

    @staticmethod
    def build(key):
        return _buildMessage(messages.Command.GETNEXT, key)

    @staticmethod
    def parse(m, value):
        return Get.parse(m, value)
class GetNext(Get):

    @staticmethod
    def onError(e):
        return Get.onError(e)
    def _build(self, key):
        return _buildMessage(self.m, messages.Command.GETNEXT, key)

class GetPrevious(object):

    @staticmethod
    def build(key):
        return _buildMessage(messages.Command.GETPREVIOUS, key)
class GetPrevious(Get):

    @staticmethod
    def parse(m, value):
        return Get.parse(m, value)
    def _build(self, key):
        return _buildMessage(self.m, messages.Command.GETPREVIOUS, key)

    @staticmethod
    def onError(e):
        return Get.onError(e)

class GetKeyRange(object):
class GetKeyRange(BaseOperation):

    @staticmethod
    def build(startKey=None, endKey=None, startKeyInclusive=True, endKeyInclusive=True, maxReturned=200):
    def _build(self, startKey=None, endKey=None, startKeyInclusive=True, endKeyInclusive=True, maxReturned=200):
        if not startKey:
            startKey = ''
        if not endKey:
@@ -181,7 +170,7 @@ class GetKeyRange(object):
        if len(startKey) > common.MAX_KEY_SIZE: raise common.KineticClientException("Start key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))
        if len(endKey) > common.MAX_KEY_SIZE: raise common.KineticClientException("End key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))

        m = messages.Command()
        m = self.m
        m.header.messageType = messages.Command.GETKEYRANGE

        kr = m.body.range
@@ -193,37 +182,29 @@ class GetKeyRange(object):

        return (m, None)

    @staticmethod
    def parse(m, value):
    def parse(self, m, value):
        return [k for k in m.body.range.keys] # key is actually a set of keys

    @staticmethod
    def onError(e):
        raise e

class GetVersion(object):
class GetVersion(BaseOperation):

    @staticmethod
    def build(key):
        (m,_) = _buildMessage(messages.Command.GETVERSION, key)
    def _build(self, key):
        (m,_) = _buildMessage(self.m, messages.Command.GETVERSION, key)
        return (m, None)

    @staticmethod
    def parse(m, value):
    def parse(self, m, value):
        return m.body.keyValue.dbVersion

    @staticmethod
    def onError(e):
    def onError(self, e):
        if isinstance(e,KineticMessageException):
            if e.code and e.code == 'NOT_FOUND':
                return None
        raise e

class P2pPush(object):
class P2pPush(BaseOperation):

    @staticmethod
    def build(keys, hostname='localhost', port=8123, tls=False):
        m = messages.Command()
    def _build(self, keys, hostname='localhost', port=8123, tls=False):
        m = self.m
        m.header.messageType = messages.Command.PEER2PEERPUSH
        m.body.p2pOperation.peer.hostname = hostname
        m.body.p2pOperation.peer.port = port
@@ -248,19 +229,14 @@ class P2pPush(object):

        return (m, None)

    @staticmethod
    def parse(m, value):
    def parse(self, m, value):
        return [op for op in m.body.p2pOperation.operation]

    @staticmethod
    def onError(e):
        raise e

class P2pPipedPush(object):
class P2pPipedPush(BaseOperation):

    @staticmethod
    def build(keys, targets):
        m = messages.Command()
    def _build(self, keys, targets):
        m = self.m
        m.header.messageType = messages.Command.PEER2PEERPUSH
        m.body.p2pOperation.peer.hostname = targets[0].hostname
        m.body.p2pOperation.peer.port = targets[0].port
@@ -311,62 +287,25 @@ class P2pPipedPush(object):

        return (m, None)

    @staticmethod
    def parse(m, value):
    def parse(self, m, value):
        return [op for op in m.body.p2pOperation.operation]

    @staticmethod
    def onError(e):
        raise e

class PushKeys(object):

    @staticmethod
    def build(keys, hostname='localhost', port=8123, **kwargs):
        m = messages.Command()
        m.header.messageType = messages.Command.PEER2PEERPUSH
        m.body.p2pOperation.peer.hostname = hostname
        m.body.p2pOperation.peer.port = port

        m.body.p2pOperation.operation.extend([
            messages.Command.P2POperation.Operation(key=key) for key in keys
        ])

        return (m, None)

    @staticmethod
    def parse(m, value):
        return [op for op in m.body.p2pOperation.operation]

    @staticmethod
    def onError(e):
        raise e

class Flush(object):
class Flush(BaseOperation):

    @staticmethod
    def build():
        m = messages.Command()
    def _build(self):
        m = self.m
        m.header.messageType = messages.Command.FLUSHALLDATA

        return (m, None)

    @staticmethod
    def parse(m, value):
        return

    @staticmethod
    def onError(e):
        raise e


### Admin Operations ###

class GetLog(object):
class GetLog(BaseOperation):

    @staticmethod
    def build(types, device=None):
        m = messages.Command()
    def _build(self, types, device=None):
        m = self.m
        m.header.messageType = messages.Command.GETLOG

        log = m.body.getLog
@@ -377,15 +316,13 @@ class GetLog(object):

        return (m, None)

    @staticmethod
    def parse(m, value):
    def parse(self, m, value):
        if value:
            return (m.body.getLog, value)
        else:
            return m.body.getLog

    @staticmethod
    def onError(e):
    def onError(self, e):
        if isinstance(e,KineticMessageException):
            if e.code and e.code == 'NOT_FOUND':
                return None
@@ -395,54 +332,35 @@ class GetLog(object):
######################
#  Setup operations  #
######################
class SetClusterVersion(object):
class SetClusterVersion(BaseOperation):

    @staticmethod
    def build(version):
        m = messages.Command()
    def _build(self, version):
        m = self.m
        m.header.messageType = messages.Command.SETUP

        m.body.setup.newClusterVersion = version

        return (m, None)

    @staticmethod
    def parse(m, value):
        return

    @staticmethod
    def onError(e):
        raise e


class UpdateFirmware(object):
class UpdateFirmware(BaseOperation):

    @staticmethod
    def build(firmware):
        m = messages.Command()
    def _build(self, firmware):
        m = self.m
        m.header.messageType = messages.Command.SETUP

        m.body.setup.firmwareDownload = True

        return (m, firmware)

    @staticmethod
    def parse(m, value):
        return

    @staticmethod
    def onError(e):
        raise e


########################
#  Security operations #
########################
class Security(object):
class Security(BaseOperation):

    @staticmethod
    def build(acls=None, old_erase_pin=None, new_erase_pin=None, old_lock_pin=None, new_lock_pin=None):
        m = messages.Command()
    def _build(self, acls=None, old_erase_pin=None, new_erase_pin=None, old_lock_pin=None, new_lock_pin=None):
        m = self.m
        m.header.messageType = messages.Command.SECURITY
        op = m.body.security

@@ -482,21 +400,13 @@ class Security(object):

        return (m, None)

    @staticmethod
    def parse(m, value):
        return

    @staticmethod
    def onError(e):
        raise e

###########################
#  Background operations  #
###########################
class MediaScan(object):
class MediaScan(BaseOperation):

    @staticmethod
    def build(startKey=None, endKey=None, startKeyInclusive=True, endKeyInclusive=True, maxReturned=200):
    def _build(self, startKey=None, endKey=None, startKeyInclusive=True, endKeyInclusive=True, maxReturned=200):
        if not startKey:
            startKey = ''
        if not endKey:
@@ -505,7 +415,7 @@ class MediaScan(object):
        if len(startKey) > common.MAX_KEY_SIZE: raise common.KineticClientException("Start key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))
        if len(endKey) > common.MAX_KEY_SIZE: raise common.KineticClientException("End key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))

        m = messages.Command()
        m = self.m

        m.header.messageType = messages.Command.BACKOP

@@ -521,19 +431,14 @@ class MediaScan(object):

        return (m, None)

    @staticmethod
    def parse(m, value):
    def parse(self, m, value):
        r = m.body.backgroundOperation.range
        return ([k for k in r.keys], r.endKey)

    @staticmethod
    def onError(e):
        raise e

class MediaOptimize(object):
class MediaOptimize(BaseOperation):

    @staticmethod
    def build(startKey=None, endKey=None, startKeyInclusive=True, endKeyInclusive=True, maxReturned=200):
    def _build(self, startKey=None, endKey=None, startKeyInclusive=True, endKeyInclusive=True, maxReturned=200):
        if not startKey:
            startKey = ''
        if not endKey:
@@ -542,7 +447,7 @@ class MediaOptimize(object):
        if len(startKey) > common.MAX_KEY_SIZE: raise common.KineticClientException("Start key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))
        if len(endKey) > common.MAX_KEY_SIZE: raise common.KineticClientException("End key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))

        m = messages.Command()
        m = self.m

        m.header.messageType = messages.Command.BACKOP

@@ -558,97 +463,49 @@ class MediaOptimize(object):

        return (m, None)

    @staticmethod
    def parse(m, value):
    def parse(self, m, value):
        r = m.body.backgroundOperation.range
        return ([k for k in r.keys], r.endKey)

    @staticmethod
    def onError(e):
        raise e

####################
#  Pin operations  #
####################
class UnlockDevice(object):
class BasePinOperation(BaseOperation):

    @staticmethod
    def build():
        m = messages.Command()
    def __init__(self):
        super(BaseOperation, self).__init__()
        self.pin_op_type = None

    def _build(self):
        m = self.m
        m.header.messageType = messages.Command.PINOP

        m.body.pinOp.pinOpType = messages.Command.PinOperation.UNLOCK_PINOP

        m.body.pinOp.pinOpType = self.pin_op_type
        return (m, None)

    @staticmethod
    def parse(m, value):
        return

    @staticmethod
    def onError(e):
        raise e

class UnlockDevice(BasePinOperation):

class LockDevice(object):
    def __init__(self):
        super(UnlockDevice, self).__init__()
        self.pin_op_type = messages.Command.PinOperation.UNLOCK_PINOP

    @staticmethod
    def build():
        m = messages.Command()

        m.header.messageType = messages.Command.PINOP
class LockDevice(BaseOperation):

        m.body.pinOp.pinOpType = messages.Command.PinOperation.LOCK_PINOP
    def __init__(self):
        super(LockDevice, self).__init__()
        self.pin_op_type = messages.Command.PinOperation.LOCK_PINOP

        return (m, None)

    @staticmethod
    def parse(m, value):
        return

    @staticmethod
    def onError(e):
        raise e
class EraseDevice(BaseOperation):

    def __init__(self):
        super(EraseDevice, self).__init__()
        self.pin_op_type = messages.Command.PinOperation.ERASE_PINOP

class EraseDevice(object):

    @staticmethod
    def build():
        m = messages.Command()
class SecureEraseDevice(BaseOperation):

        m.header.messageType = messages.Command.PINOP

        m.body.pinOp.pinOpType = messages.Command.PinOperation.ERASE_PINOP

        return (m, None)

    @staticmethod
    def parse(m, value):
        return

    @staticmethod
    def onError(e):
        raise e


class SecureEraseDevice(object):

    @staticmethod
    def build():
        m = messages.Command()

        m.header.messageType = messages.Command.PINOP

        m.body.pinOp.pinOpType = messages.Command.PinOperation.SECURE_ERASE_PINOP

        return (m, None)

    @staticmethod
    def parse(m, value):
        return

    @staticmethod
    def onError(e):
        raise e
    def __init__(self):
        super(SecureEraseDevice, self).__init__()
        self.pin_op_type = messages.Command.PinOperation.SECURE_ERASE_PINOP