Commit 45f8b372 authored by Ignacio Corderi's avatar Ignacio Corderi
Browse files

Removed deprecated clients

parent 0c516add
Loading
Loading
Loading
Loading
+12 −9
Original line number Diff line number Diff line
@@ -27,8 +27,7 @@ import socket
import sys

import kinetic
from kinetic import client
from kinetic import PipelinedClient
from kinetic import AsyncClient

preparser = argparse.ArgumentParser(add_help=False)
preparser.add_argument('-H', '--hostname', default='localhost')
@@ -155,8 +154,9 @@ class Cmd(cmd.Cmd, object):

        hostname = options.get('hostname', 'localhost')
        port = options.get('port', 8123)
        self.client = PipelinedClient(hostname, port)
        self.verbose = options.get('verbose') or 0
        self.client = AsyncClient(hostname, port)
        self.client.connect()

    def do_verbose(self, line):
        """Set active verbosity level [0-3]"""
@@ -230,10 +230,10 @@ class Cmd(cmd.Cmd, object):

    @add_parser(getr_parser)
    def do_getr(self, args):
        keys = self._list(args)
        if not keys:
            return 1
        for entry in self.client.gets(keys):
        # behave like a prefix
        if not args.end:
            args.end = args.start + '\xff'
        for entry in self.client.getRange(args.start, args.end):
            if args.verbose:
                print >> sys.stderr, 'key:', entry.key
            sys.stdout.write(entry.value)
@@ -241,9 +241,12 @@ class Cmd(cmd.Cmd, object):

    @add_parser(deleter_parser)
    def do_deleter(self, args):
        def on_success(m): pass
        def on_error(ex): pass
        keys = self._list(args)
        if not self.client.deletes(keys):
            return 1
        for k in keys:
            self.client.deleteAsync(on_success, on_error, k, force=True)
        self.client.wait()


def handle_loop(**options):

kinetic/greenclient.py

deleted100644 → 0
+0 −189
Original line number Diff line number Diff line
# Copyright (C) 2014 Seagate Technology.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

#@author: Clayg

from collections import deque

from kinetic import operations
from baseasync import BaseAsync
import common

import eventlet
eventlet.monkey_patch()

DEFAULT_DEPTH = 16

class Response(object):
    """A one-shot promise backed by an eventlet Event.

    setResponse()/setError() fulfil the promise exactly once; wait()
    blocks until fulfilment and either returns the value or re-raises
    the stored error.
    """

    def __init__(self):
        # Underlying eventlet event; fires exactly once.
        self.resp = eventlet.event.Event()
        self._hasError = False

    def setResponse(self, v):
        """Fulfil the promise with a successful value."""
        self.resp.send(v)

    def setError(self, e):
        """Fulfil the promise with an exception for wait() to re-raise."""
        self._hasError = True
        self.resp.send(e)

    def wait(self):
        """Block until fulfilled; return the value or raise the error."""
        outcome = self.resp.wait()
        if not self._hasError:
            return outcome
        raise outcome

    def ready(self):
        """True once the promise has been fulfilled (success or error)."""
        return self.resp.ready()

class GreenClient(BaseAsync):
    """Asynchronous kinetic client built on eventlet green threads.

    A single dispatcher greenthread (started by connect()) drains
    responses for all outstanding requests; each request returns a
    Response promise the caller can wait() on.
    """

    def __init__(self, *args, **kwargs):
        super(GreenClient, self).__init__(*args, **kwargs)
        # Toggled by connect()/close(); the dispatcher loops while True.
        self._running = False

    def connect(self):
        """Open the connection and start the response-dispatcher greenthread."""
        super(GreenClient, self).connect()
        self._dispatcher = eventlet.spawn(self._run)
        self._running = True

    def close(self, flush=True, timeout=None):
        """Stop the dispatcher and close the connection.

        :param flush: when True, wait for the dispatcher to drain any
            pending responses before killing it
        :param timeout: max seconds to wait for the flush; None waits
            indefinitely (eventlet.Timeout(None, False) never fires)
        """
        if self._running:
            self._running = False
            # Timeout(..., False) suppresses the timeout exception, so a
            # slow flush simply falls through to the kill() below.
            with eventlet.Timeout(timeout, False):
                if flush:
                    self._dispatcher.wait()
            if not self._dispatcher.dead:
                self._dispatcher.kill()
        super(GreenClient, self).close()

    def _flush(self):
        # Drain pending responses inline; _wait() presumably blocks for one
        # response at a time (defined in the base class -- not visible here).
        while self._pending:
            self._wait()

    def wait(self):
        """Cooperatively block until no requests are outstanding."""
        while self._pending:
            eventlet.sleep(0.05)

    def _run(self):
        # Dispatcher loop: receive responses while running, yielding to
        # other greenthreads when idle; after shutdown, drain what's left.
        while self._running:
            if len(self._pending) > 0:
                self._async_recv()
            else:
                eventlet.sleep(0.1)
        while len(self._pending) > 0:
            self._async_recv()

    def submit(self, op, *args, **kwargs):
        """Issue operation *op* asynchronously; return a Response promise."""
        promise = Response()
        self._processAsync(op, promise.setResponse, promise.setError, *args, **kwargs)
        return promise

    def put(self, key, data, *args, **kwargs):
        """Asynchronous put; returns a Response promise."""
        return self.submit(operations.Put, key, data, *args, **kwargs)

    def get(self, key, *args, **kwargs):
        """Asynchronous get; returns a Response promise."""
        return self.submit(operations.Get, key, *args, **kwargs)

    def delete(self, key, *args, **kwargs):
        """Asynchronous delete; returns a Response promise."""
        return self.submit(operations.Delete, key, *args, **kwargs)

    def getPrevious(self, *args, **kwargs):
        """Asynchronous getPrevious; returns a Response promise."""
        return self.submit(operations.GetPrevious, *args, **kwargs)

    def getKeyRange(self, *args, **kwargs):
        """Asynchronous getKeyRange; returns a Response promise."""
        return self.submit(operations.GetKeyRange, *args, **kwargs)

    def __enter__(self):
        # Context-manager support: connect on entry, close on exit.
        self.connect()
        return self

    def __exit__(self, t, v, tb):
        self.close()

    def put_entries(self, entries, depth=DEFAULT_DEPTH):
        """Write entries with at most *depth* requests in flight.

        Writes use WRITEBACK synchronization; all outstanding writes are
        waited on before returning.
        """
        pending = deque()
        for entry in entries:
            if len(pending) >= depth:
                # we have to wait on something, may as well be the oldest?
                pending.popleft().wait()
            pending.append(self.put(entry.key, entry.value, synchronization=common.Synchronization.WRITEBACK))
        for resp in pending:
            resp.wait()

    def get_keys(self, keys, depth=DEFAULT_DEPTH):
        """Yield the result for each key, pipelining up to *depth* requests.

        Results are yielded in the same order the keys were supplied.
        """
        pending = deque()
        for key in keys:
            if len(pending) >= depth:
                yield pending.popleft().wait()
            pending.append(self.get(key))
        for resp in pending:
            yield resp.wait()

    def delete_keys(self, keys, depth=DEFAULT_DEPTH):
        """
        Delete a number of keys out of kinetic with pipelined requests.

        :param keys: an iterable of keys

        If keys is a generator, its yield will be sent a boolean indicating
        if a key was missing.

        :param depth: max number of outstanding requests

        :returns: True if at least one delete's promise resolved truthy
        """
        keys = iter(keys)
        if hasattr(keys, 'send'):
            # yield responses to keys if it's a generator
            iter_method = keys.send
        else:
            # otherwise, swallow deleted and consume
            iter_method = lambda deleted: keys.next()
        pending = deque()
        any_deleted = False
        missing = None
        while True:
            try:
                key = iter_method(missing)
            except StopIteration:
                break
            pending.append(self.delete(key))
            if len(pending) >= depth:
                # a falsy wait() result is treated as "key was missing"
                missing = not pending.popleft().wait()
                any_deleted |= not missing
        # drain remaining requests
        for resp in pending:
            found = resp.wait()
            any_deleted |= found
        return any_deleted

    def push_keys(self, target, keys, batch=16):
        """Push keys to another drive at "host:port" *target*, *batch* at a time.

        :returns: the concatenated results of each PushKeys operation
        """
        host, port = target.split(':')
        port = int(port)
        key_batch = []
        results = []
        for key in keys:
            key_batch.append(key)
            if len(key_batch) < batch:
                continue
            # send a batch
            resp = self.submit(operations.PushKeys, key_batch, host, port)
            results.extend(resp.wait())
            key_batch = []
        if key_batch:
            # flush the final partial batch
            resp = self.submit(operations.PushKeys, key_batch, host, port)
            results.extend(resp.wait())
        return results

kinetic/pipelinedclient.py

deleted100644 → 0
+0 −183
Original line number Diff line number Diff line
# Copyright (C) 2014 Seagate Technology.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

#@author: Clayg

import logging
import kinetic_pb2 as messages
import operations
from client import Client
from common import Entry
from collections import deque

LOG = logging.getLogger(__name__)

DEFAULT_PIPELINE_DEPTH = 16

class PipelinedClient(Client):
    """Synchronous kinetic client that pipelines multiple requests.

    Tracks outstanding sequence numbers so bulk operations (gets, puts,
    deletes) can keep up to ``depth`` requests in flight at once.
    """

    def __init__(self, *args, **kwargs):
        # Pop our own keyword BEFORE delegating, so the base Client never
        # receives an unexpected 'depth' kwarg (previously popped after).
        self.depth = kwargs.pop('depth', DEFAULT_PIPELINE_DEPTH)
        super(PipelinedClient, self).__init__(*args, **kwargs)
        # sequence numbers of requests sent but not yet acknowledged
        self._pending = set()

    @property
    def num_pending(self):
        """Number of requests sent but not yet acknowledged."""
        return len(self._pending)

    ### Override BaseClient Methods ###

    def close(self):
        super(PipelinedClient, self).close()
        # any still-pending sequence numbers are meaningless once closed
        self._pending = set()

    def network_send(self, message, value):
        super(PipelinedClient, self).network_send(message, value)
        self._pending.add(message.command.header.sequence)
        LOG.debug('Pending sequence: %r' % self._pending)

    def network_recv(self):
        (resp, value) = super(PipelinedClient, self).network_recv()
        self._pending.remove(resp.command.header.ackSequence)
        LOG.debug('Pending sequence: %r' % self._pending)
        return (resp, value)

    ###

    def gets(self, keys, depth=None, **kwargs):
        """
        Retrieve multiple values out of kinetic in an ordered fashion
        with pipelined requests.

        :param keys: an iterable of keys
        :param depth: the maximum number of outstanding requests

        :returns: a generator of Entry instances for the passed in keys
        """
        depth = kwargs.get('read_depth', depth) or self.depth
        seq = deque()   # sequence ids still owed to the caller, in order
        ack_map = {}    # out-of-order responses: seq id -> (resp, value)
        with self:
            for key in keys:
                while len(ack_map) + len(seq) >= depth:
                    # need to bring in some outstanding requests
                    (resp, value) = self.network_recv()
                    seq_id = resp.command.header.ackSequence
                    if seq_id != seq[0]:
                        # Arrived out of order: stash the (resp, value)
                        # pair and try to pull the one we actually need.
                        # Storing the tuple keeps each value paired with
                        # its own response; previously only resp was
                        # stashed, so a stale value could be yielded and
                        # the drain loop's tuple-unpack would crash.
                        ack_map[seq_id] = (resp, value)
                        try:
                            (resp, value) = ack_map.pop(seq[0])
                        except KeyError:
                            # maybe next time
                            continue
                    yield Entry.fromResponse(resp, value)
                    seq.popleft()
                # always send!
                m, v = operations.Get.build(key)
                self.update_header(m)
                self.network_send(m, v)
                seq.append(m.command.header.sequence)
            # drain remaining requests, yielding in send order
            for next_id in seq:
                try:
                    (resp, value) = ack_map.pop(next_id)
                except KeyError:
                    for (resp, value) in self:
                        seq_id = resp.command.header.ackSequence
                        if seq_id == next_id:
                            # this is the one we're looking for!
                            break
                        # ain't it - throw it in the map
                        ack_map[seq_id] = (resp, value)
                yield Entry.fromResponse(resp, value)

    ### Iterator support ###

    def __iter__(self):
        return self

    def next(self):
        # Python 2 iterator protocol: yield raw (resp, value) pairs until
        # no requests remain outstanding.
        if self.num_pending <= 0:
            raise StopIteration()
        return self.network_recv()

    ###

    def puts(self, entries, depth=None, **kwargs):
        """
        Load a number of entries into kinetic with pipelined requests.

        :param entries: an iterable producing Entry objects
        :param depth: the maximum number of outstanding requests
        """
        depth = kwargs.get('write_depth', depth) or self.depth
        # TODO: try to yield an associated entry?
        # the interface feels better if rv is None, I guess i never liked the
        # rv from put either :\
        with self:
            for entry in entries:
                if self.num_pending >= depth:
                    self.network_recv() # TODO What is this for!? if there are operations pending the results will be lost
                m, v = operations.Put.build(entry.key,
                                            data=entry.value,
                                            new_version=entry.metadata.version)
                self.update_header(m)
                self.network_send(m, v)
            # drain remaining requests
            for r in self:
                pass

    def deletes(self, keys, depth=None):
        """
        Delete a number of keys out of kinetic with pipelined requests.

        :param keys: an iterable of keys

        If keys is a generator, its yield will be sent a boolean indicating
        if a key was missing.

        :param depth: max number of outstanding requests

        :returns: True if at least one key was actually deleted
        """
        depth = depth or self.depth
        keys = iter(keys)
        if hasattr(keys, 'send'):
            # yield responses to keys if it's a generator
            iter_method = keys.send
        else:
            # otherwise, swallow deleted and consume
            iter_method = lambda deleted: keys.next()
        any_deleted = False
        missing = None
        with self:
            while True:
                try:
                    # TODO: with an ack_map we could send the missing key
                    key = iter_method(missing)
                except StopIteration:
                    break
                if self.num_pending >= depth:
                    (resp, value) = self.network_recv()
                    # fixed: was 'vaue' (NameError once the pipeline filled)
                    missing = Entry.fromResponse(resp, value) is None
                    any_deleted |= not missing
                #TODO(clayg): add support for versioned deletes
                # TODO: hey @clayg you are not going to support keys with versions?
                m, v = operations.Delete.build(key)
                self.update_header(m)
                self.network_send(m, v)
            # drain remaining requests
            for (resp, value) in self:
                any_deleted |= Entry.fromResponse(resp, value) is not None
        return any_deleted
+5 −5
Original line number Diff line number Diff line
@@ -257,7 +257,7 @@ class TestCommand(BaseCommandTestCase):
        # put something there
        self.client.put(self.test_key, 'myvalue')
        # try a short offset to value from command line
        args = '-v next %s' % self.test_key[:-1]
        args = '-vb next %s' % self.test_key[:-1]
        with self.capture_stdio() as stdio:
            errorcode = cmd.main(self.conn_args + args)
            stdout, stderr = stdio
@@ -331,7 +331,7 @@ class TestCommand(BaseCommandTestCase):
        # put something there
        self.client.put(self.test_key, 'myvalue')
        # try a short offset to value from command line
        args = '-v prev %s~' % self.test_key
        args = '-vb prev %s~' % self.test_key
        with self.capture_stdio() as stdio:
            errorcode = cmd.main(self.conn_args + args)
            stdout, stderr = stdio
@@ -358,8 +358,8 @@ class TestGetRangeCommand(BaseCommandTestCase):
    def test_missing_keys(self):
        args = 'getr %s' % self.test_key
        errorcode, output = self.run_cmd(args)
        self.assertTrue(errorcode)
        self.assertEquals('', output)
        self.assertFalse(errorcode)
        self.assertEquals('\n', output)

    def test_explicit_range(self):
        num_keys = 10
@@ -394,7 +394,7 @@ class TestDeleteRangeCommand(BaseCommandTestCase):
    def test_missing_keys(self):
        args = 'deleter %s' % self.test_key
        errorcode, output = self.run_cmd(args)
        self.assertTrue(errorcode)
        self.assertFalse(errorcode)
        self.assertEquals('', output)

    def test_explicit_range(self):

test/test_greenclient.py

deleted100644 → 0
+0 −527

File deleted.

Preview size limit exceeded, changes collapsed.