Commit 0c516add authored by Ignacio Corderi's avatar Ignacio Corderi
Browse files

AsyncClient and Client working

parent 3b7bffc0
Loading
Loading
Loading
Loading
+11 −11
Original line number Diff line number Diff line
@@ -48,15 +48,15 @@ class Entry(object):
    #the subclass on a fromMessage. I suspect you always want to generate Entry objects,
    #in which case, a staticmethod is appropriate as a factory.
    @staticmethod
    def fromMessage(header, value):
        if not header: return None
        return Entry(header.command.body.keyValue.key, value, EntryMetadata.fromMessage(header))
    def fromMessage(command, value):
        if not command: return None
        return Entry(command.body.keyValue.key, value, EntryMetadata.fromMessage(command))

    @staticmethod
    def fromResponse(header, value):
        if (header.command.status.code == messages.Message.Status.SUCCESS):
            return Entry.fromMessage(header, value)
        elif (header.command.status.code == messages.Message.Status.NOT_FOUND):
    def fromResponse(response, value):
        if (response.status.code == messages.Command.Status.SUCCESS):
            return Entry.fromMessage(response, value)
        elif (response.status.code == messages.Command.Status.NOT_FOUND):
            return None
        else:
            raise KineticClientException("Invalid response status, can' build entry from response.")
@@ -76,10 +76,10 @@ class Entry(object):
class EntryMetadata(object):

    @staticmethod
    def fromMessage(msg):
        if not msg: return None
        return EntryMetadata(msg.command.body.keyValue.dbVersion, msg.command.body.keyValue.tag,
                             msg.command.body.keyValue.algorithm)
    def fromMessage(command):
        if not command: return None
        return EntryMetadata(command.body.keyValue.dbVersion, command.body.keyValue.tag,
                             command.body.keyValue.algorithm)

    def __init__(self, version=None, tag=None, algorithm=None):
        self.version = version
+73 −73
Original line number Diff line number Diff line
@@ -26,38 +26,38 @@ import logging
LOG = logging.getLogger(__name__)


def _check_status(proto):
    if (proto.command.status.code == messages.Message.Status.SUCCESS):
def _check_status(command):
    if (command.status.code == messages.Command.Status.SUCCESS):
        return
    elif(proto.command.status.code == messages.Message.Status.VERSION_FAILURE):
        raise common.ClusterVersionFailureException(proto.command.status, proto.command.header.clusterVersion)
    elif(command.status.code == messages.Command.Status.VERSION_FAILURE):
        raise common.ClusterVersionFailureException(command.status, command.header.clusterVersion)
    else:
        raise KineticMessageException(proto.command.status)
        raise KineticMessageException(command.status)


def _buildMessage(messageType, key, data=None, version='', new_version='',
                  force=False, tag=None, algorithm=None, synchronization=None):
    m = messages.Message()
    m.command.header.messageType = messageType
    m = messages.Command()
    m.header.messageType = messageType
    if len(key) > common.MAX_KEY_SIZE: raise common.KineticClientException("Key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))
    m.command.body.keyValue.key = key
    m.body.keyValue.key = key
    if data:
        if len(data) > common.MAX_VALUE_SIZE: raise common.KineticClientException("Value exceeds maximum size of {0} bytes.".format(common.MAX_VALUE_SIZE))

    if tag and algorithm:
        m.command.body.keyValue.tag = tag
        m.command.body.keyValue.algorithm = algorithm
    elif messageType == messages.Message.PUT:
        m.command.body.keyValue.tag = 'l337'
        m.command.body.keyValue.algorithm = 1 # nacho: should be change to a value over 100
        m.body.keyValue.tag = tag
        m.body.keyValue.algorithm = algorithm
    elif messageType == messages.Command.PUT:
        m.body.keyValue.tag = 'l337'
        m.body.keyValue.algorithm = 1 # nacho: should be change to a value over 100
    if synchronization:
        m.command.body.keyValue.synchronization = synchronization
        m.body.keyValue.synchronization = synchronization
    if version:
        m.command.body.keyValue.dbVersion = version
        m.body.keyValue.dbVersion = version
    if new_version:
        m.command.body.keyValue.newVersion = new_version
        m.body.keyValue.newVersion = new_version
    if force:
        m.command.body.keyValue.force = True
        m.body.keyValue.force = True

    return (m,data)

@@ -65,8 +65,8 @@ class Noop(object):

    @staticmethod
    def build():
        m = messages.Message()
        m.command.header.messageType = messages.Message.NOOP
        m = messages.Command()
        m.header.messageType = messages.Command.NOOP
        return (m, None)

    @staticmethod
@@ -81,7 +81,7 @@ class Put(object):

    @staticmethod
    def build(key, data, version="", new_version="", **kwargs):
        return _buildMessage(messages.Message.PUT, key, data, version, new_version, **kwargs)
        return _buildMessage(messages.Command.PUT, key, data, version, new_version, **kwargs)

    @staticmethod
    def parse(m, value):
@@ -95,7 +95,7 @@ class Get(object):

    @staticmethod
    def build(key):
        return _buildMessage(messages.Message.GET, key)
        return _buildMessage(messages.Command.GET, key)

    @staticmethod
    def parse(m, value):
@@ -112,8 +112,8 @@ class GetMetadata(object):

    @staticmethod
    def build(key):
        (m,_) = _buildMessage(messages.Message.GET, key)
        m.command.body.keyValue.metadataOnly = True
        (m,_) = _buildMessage(messages.Command.GET, key)
        m.body.keyValue.metadataOnly = True
        return (m, None)

    @staticmethod
@@ -128,7 +128,7 @@ class Delete(object):

    @staticmethod
    def build(key, version="", **kwargs):
        return _buildMessage(messages.Message.DELETE, key, version=version, **kwargs)
        return _buildMessage(messages.Command.DELETE, key, version=version, **kwargs)

    @staticmethod
    def parse(m, value):
@@ -145,7 +145,7 @@ class GetNext(object):

    @staticmethod
    def build(key):
        return _buildMessage(messages.Message.GETNEXT, key)
        return _buildMessage(messages.Command.GETNEXT, key)

    @staticmethod
    def parse(m, value):
@@ -159,7 +159,7 @@ class GetPrevious(object):

    @staticmethod
    def build(key):
        return _buildMessage(messages.Message.GETPREVIOUS, key)
        return _buildMessage(messages.Command.GETPREVIOUS, key)

    @staticmethod
    def parse(m, value):
@@ -181,10 +181,10 @@ class GetKeyRange(object):
        if len(startKey) > common.MAX_KEY_SIZE: raise common.KineticClientException("Start key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))
        if len(endKey) > common.MAX_KEY_SIZE: raise common.KineticClientException("End key exceeds maximum size of {0} bytes.".format(common.MAX_KEY_SIZE))

        m = messages.Message()
        m.command.header.messageType = messages.Message.GETKEYRANGE
        m = messages.Command()
        m.header.messageType = messages.Command.GETKEYRANGE

        kr = m.command.body.range
        kr = m.body.range
        kr.startKey = startKey
        kr.endKey = endKey
        kr.startKeyInclusive = startKeyInclusive
@@ -195,7 +195,7 @@ class GetKeyRange(object):

    @staticmethod
    def parse(m, value):
        return [k for k in m.command.body.range.key] # key is actually a set of keys
        return [k for k in m.body.range.keys] # key is actually a set of keys

    @staticmethod
    def onError(e):
@@ -205,12 +205,12 @@ class GetVersion(object):

    @staticmethod
    def build(key):
        (m,_) = _buildMessage(messages.Message.GETVERSION, key)
        (m,_) = _buildMessage(messages.Command.GETVERSION, key)
        return (m, None)

    @staticmethod
    def parse(m, value):
        return m.command.body.keyValue.dbVersion
        return m.body.keyValue.dbVersion

    @staticmethod
    def onError(e):
@@ -223,19 +223,19 @@ class P2pPush(object):

    @staticmethod
    def build(keys, hostname='localhost', port=8123, tls=False):
        m = messages.Message()
        m.command.header.messageType = messages.Message.PEER2PEERPUSH
        m.command.body.p2pOperation.peer.hostname = hostname
        m.command.body.p2pOperation.peer.port = port
        m.command.body.p2pOperation.peer.tls = tls
        m = messages.Command()
        m.header.messageType = messages.Command.PEER2PEERPUSH
        m.body.p2pOperation.peer.hostname = hostname
        m.body.p2pOperation.peer.port = port
        m.body.p2pOperation.peer.tls = tls

        operations = []
        for k in keys:
            op = None
            if isinstance(k, str):
                op = messages.Message.P2POperation.Operation(key=k)
                op = messages.Command.P2POperation.Operation(key=k)
            elif isinstance(k, common.P2pOp):
                op = messages.Message.P2POperation.Operation(key=k.key)
                op = messages.Command.P2POperation.Operation(key=k.key)
                if k.version:
                    op.version = k.version
                if k.newKey:
@@ -244,13 +244,13 @@ class P2pPush(object):
                    op.force = k.force
            operations.append(op)

        m.command.body.p2pOperation.operation.extend(operations)
        m.body.p2pOperation.operation.extend(operations)

        return (m, None)

    @staticmethod
    def parse(m, value):
        return [op for op in m.command.body.p2pOperation.operation]
        return [op for op in m.body.p2pOperation.operation]

    @staticmethod
    def onError(e):
@@ -260,12 +260,12 @@ class P2pPipedPush(object):

    @staticmethod
    def build(keys, targets):
        m = messages.Message()
        m.command.header.messageType = messages.Message.PEER2PEERPUSH
        m.command.body.p2pOperation.peer.hostname = targets[0].hostname
        m.command.body.p2pOperation.peer.port = targets[0].port
        m = messages.Command()
        m.header.messageType = messages.Command.PEER2PEERPUSH
        m.body.p2pOperation.peer.hostname = targets[0].hostname
        m.body.p2pOperation.peer.port = targets[0].port
        if targets[0].tls:
            m.command.body.p2pOperation.peer.tls = targets[0].tls
            m.body.p2pOperation.peer.tls = targets[0].tls


        def rec(targets, op):
@@ -275,7 +275,7 @@ class P2pPipedPush(object):
                op.p2pop.peer.port = target.port
                if target.tls:
                    op.p2pop.peer.tls = target.tls
                innerop = messages.Message.P2POperation.Operation(key=op.key)
                innerop = messages.Command.P2POperation.Operation(key=op.key)
                if op.version:
                    innerop.version = op.version
                if op.force:
@@ -290,9 +290,9 @@ class P2pPipedPush(object):
        for k in keys:
            op = None
            if isinstance(k, str):
                op = messages.Message.P2POperation.Operation(key=k)
                op = messages.Command.P2POperation.Operation(key=k)
            elif isinstance(k, common.P2pOp):
                op = messages.Message.P2POperation.Operation(key=k.key)
                op = messages.Command.P2POperation.Operation(key=k.key)
                if k.version:
                    op.version = k.version
                if k.newKey:
@@ -307,13 +307,13 @@ class P2pPipedPush(object):
        if warn_newKey:
            LOG.warn("Setting new key on piped push is not currently supported.")

        m.command.body.p2pOperation.operation.extend(operations)
        m.body.p2pOperation.operation.extend(operations)

        return (m, None)

    @staticmethod
    def parse(m, value):
        return [op for op in m.command.body.p2pOperation.operation]
        return [op for op in m.body.p2pOperation.operation]

    @staticmethod
    def onError(e):
@@ -323,20 +323,20 @@ class PushKeys(object):

    @staticmethod
    def build(keys, hostname='localhost', port=8123, **kwargs):
        m = messages.Message()
        m.command.header.messageType = messages.Message.PEER2PEERPUSH
        m.command.body.p2pOperation.peer.hostname = hostname
        m.command.body.p2pOperation.peer.port = port
        m = messages.Command()
        m.header.messageType = messages.Command.PEER2PEERPUSH
        m.body.p2pOperation.peer.hostname = hostname
        m.body.p2pOperation.peer.port = port

        m.command.body.p2pOperation.operation.extend([
            messages.Message.P2POperation.Operation(key=key) for key in keys
        m.body.p2pOperation.operation.extend([
            messages.Command.P2POperation.Operation(key=key) for key in keys
        ])

        return (m, None)

    @staticmethod
    def parse(m, value):
        return [op for op in m.command.body.p2pOperation.operation]
        return [op for op in m.body.p2pOperation.operation]

    @staticmethod
    def onError(e):
@@ -346,8 +346,8 @@ class Flush(object):

    @staticmethod
    def build():
        m = messages.Message()
        m.command.header.messageType = messages.Message.FLUSHALLDATA
        m = messages.Command()
        m.header.messageType = messages.Command.FLUSHALLDATA

        return (m, None)

@@ -366,10 +366,10 @@ class GetLog(object):

    @staticmethod
    def build(types, device=None):
        m = messages.Message()
        m.command.header.messageType = messages.Message.GETLOG
        m = messages.Command()
        m.header.messageType = messages.Command.GETLOG

        log = m.command.body.getLog
        log = m.body.getLog
        log.type.extend(types) #type is actually a repeatable field

        if device:
@@ -380,9 +380,9 @@ class GetLog(object):
    @staticmethod
    def parse(m, value):
        if value:
            return (m.command.body.getLog, value)
            return (m.body.getLog, value)
        else:
            return m.command.body.getLog
            return m.body.getLog

    @staticmethod
    def onError(e):
@@ -395,10 +395,10 @@ class Setup(object):

    @staticmethod
    def build(**kwargs):
        m = messages.Message()
        m.command.header.messageType = messages.Message.SETUP
        m = messages.Command()
        m.header.messageType = messages.Command.SETUP

        op = m.command.body.setup
        op = m.body.setup

        value = None

@@ -432,20 +432,20 @@ class Security(object):

    @staticmethod
    def build(acls):
        m = messages.Message()
        m.command.header.messageType = messages.Message.SECURITY
        m = messages.Command()
        m.header.messageType = messages.Command.SECURITY

        proto_acls = []

        for acl in acls:
            proto_acl = messages.Message.Security.ACL(identity=acl.identity,
            proto_acl = messages.Command.Security.ACL(identity=acl.identity,
                                                      key=acl.key,
                                                      hmacAlgorithm=acl.hmacAlgorithm)

            proto_domains = []

            for domain in acl.domains:
                proto_d = messages.Message.Security.ACL.Scope(
                proto_d = messages.Command.Security.ACL.Scope(
                            TlsRequired=domain.tlsRequired)

                proto_d.permission.extend(domain.roles)
@@ -460,7 +460,7 @@ class Security(object):
            proto_acl.scope.extend(proto_domains)
            proto_acls.append(proto_acl)

        m.command.body.security.acl.extend(proto_acls)
        m.body.security.acl.extend(proto_acls)

        return (m, None)