Loading kinetic/batch.py 0 → 100644 +155 −0 Original line number Diff line number Diff line # Copyright (C) 2015 Seagate Technology. # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA #@author: Paul Dardeau import common import logging LOG = logging.getLogger(__name__) class Batch(object): """ The Batch class is used for grouping a set of put and/or delete operations so that all are committed as one unit or all of them are canceled (aborted). A Batch object is obtained by calling :func:`~baseclient.BaseClient.begin_batch`. Once all relevant put and delete calls are made, 'commit' should be called to apply all of the operations, or 'abort' to cancel (abort) them. A Batch object cannot be reused for subsequent batches. After the 'commit' or 'abort' has completed successfully, a new Batch object should be requested for the next batch operation. """ MSG_BATCH_COMPLETED = "batch completed. no more operations are permitted within this batch." def __init__(self, client, batch_id): """ Initialize instance with Kinetic client and batch identifier. Args: client: the Kinetic client to use for batch operations. batch_id: the batch identifier to be used for client connection. """ self._client = client self._batch_id = batch_id self._op_count = 0 self._batch_completed = False # to detect attempted reuse def put(self, *args, **kwargs): """ Put an entry within the batch operation. The command is not committed until :func:`~batch.Batch.commit` is called and returns successfully. If a version is specified, it must match the one stored in the persistent storage. Otherwise, a KineticException is raised. Args: Kwargs: Raises: KineticException: if any internal error occurs. """ if self._batch_completed: raise common.KineticException(MSG_BATCH_COMPLETED) self._op_count += 1 #TODO: add batch put logging self._client.batch_put(self._batch_id, *args, **kwargs) def delete(self, *args, **kwargs): """ Delete the entry associated with the specified key. The command is not committed until :func:`~batch.Batch.commit` is called and returns successfully. If a version is specified, it must match the one stored in persistent storage. Otherwise, a KineticException is raised. Args: Kwargs: Raises: KineticExcpetion: if any internal error occurs. """ if self._batch_completed: raise common.KineticException(MSG_BATCH_COMPLETED) self._op_count += 1 #TODO: add batch delete logging self._client.batch_delete(self._batch_id, *args, **kwargs) def commit(self): """ Commit the current batch operation. When this call returned successfully, all the commands performed in the current batch are executed and committed to store successfully. Raises: KineticException: if any internal error occurred. The batch may or may not be committed. If committed, all commands are committed. Otherwise, no messages are committed. BatchAbortedException: if the commit failed. No messages within the batch were committed to the store. """ if self._batch_completed: raise common.KineticException(MSG_BATCH_COMPLETED) #TODO: add batch commit logging try: self._client.batch_commit(self._batch_id, self._op_count) except BatchAbortedException: self._batch_completed = True raise def abort(self): """ Abort the current batch operation. When this call returned successfully, all the commands queued in the current batch are aborted. Resources related to the current batch are cleaned up and released. Raises: KineticException: if any internal error occurred. """ if self._batch_completed: raise common.KineticException(MSG_BATCH_COMPLETED) #TODO: add batch abort logging self._client.batch_abort(self._batch_id) self._batch_completed = True def is_completed(self): """ Return boolean indicating whether the batch is completed (either committed or aborted) """ return self._batch_completed def operation_count(self): """ Return the number of operations that have been included in the batch. """ return self._op_count test/test_batch.py 0 → 100644 +196 −0 Original line number Diff line number Diff line # Copyright (C) 2015 Seagate Technology. # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA #@author: Paul Dardeau import unittest from kinetic import Client from kinetic import Batch from common import BatchAbortedExcpetion from base import BaseTestCase class BatchTestCase(BaseTestCase): def setUp(self): super(BatchTestCase, self).setUp() self.client = Client(self.host, self.port) self.client.connect() self.create_new_batch() def create_new_batch(self): self.batch = self.client.begin_batch() def test_batch_initial_state(self): self.assertFalse(self.batch.is_completed()) self.assertEquals(self.batch.operation_count(), 0) def test_batch_operation_count(self): key1 = 'test_batch_operation_count_1' key2 = 'test_batch_operation_count_2' key3 = 'test_batch_operation_count_3' self.batch.put(key1, '') self.assertEquals(self.batch.operation_count(), 1) self.batch.put(key2, '') self.assertEquals(self.batch.operation_count(), 2) self.batch.delete(key3) self.assertEquals(self.batch.operation_count(), 3) self.batch.abort() def test_batch_is_completed(self): key1 = 'test_batch_is_completed_1' key2 = 'test_batch_is_completed_2' self.assertFalse(self.batch.is_completed()) self.batch.put(key1, '') self.batch.delete(key2) self.assertFalse(self.batch.is_completed()) self.batch.commit() self.assertTrue(self.batch.is_completed()) # do it again, but abort this time self.create_new_batch() self.assertFalse(self.batch.is_completed()) self.batch.put(key1, '') self.batch.delete(key2) self.assertFalse(self.batch.is_completed()) self.batch.abort() self.assertTrue(self.batch.is_completed()) def test_batch_abort(self): self.abort() # abort with no operations in batch self.create_new_batch() key = 'key_should_not_exist' self.batch.put(key, '') self.abort() self.assertNone(self.client.get(key)) def test_batch_commit(self): self.commit() # commit with no operations in batch self.create_new_batch() key = 'key_should_exist' self.batch.put(key, '') self.batch.commit() self.assertNotNone(self.client.get(key)) def test_batch_delete_commit(self): # put an entry key = 'test_batch_delete_commit' self.client.put(key, '') self.batch.delete(key) self.batch.commit() self.assertNone(self.client.get(key)) def test_batch_delete_abort(self): # put an entry key = 'test_batch_delete_abort' self.client.put(key, '') self.batch.delete(key) self.batch.abort() self.assertNotNone(self.client.get(key)) def test_batch_put_commit(self): key = 'test_batch_put_commit' self.batch.put(key, '') self.batch.commit() self.assertNotNone(self.client.get(key)) def test_batch_put_abort(self): key = 'test_batch_put_abort' self.batch.put(key, '') self.batch.abort() self.assertNone(self.client.get(key)) def test_batch_multiple_put_commit(self): key1 = 'test_batch_multiple_put_commit_1' key2 = 'test_batch_multiple_put_commit_2' self.batch.put(key1, '') self.batch.put(key2, '') self.batch.commit() self.assertNotNone(self.client.get(key1)) self.assertNotNone(self.client.get(key2)) def test_batch_multiple_put_abort(self): key1 = 'test_batch_multiple_put_abort_1' key2 = 'test_batch_multiple_put_abort_2' self.batch.put(key1, '') self.batch.put(key2, '') self.batch.abort() self.assertNone(self.client.get(key1)) self.assertNone(self.client.get(key2)) def test_batch_multiple_delete_commit(self): key1 = 'test_batch_multiple_delete_commit_1' key2 = 'test_batch_multiple_delete_commit_2' self.client.put(key1, '') self.client.put(key2, '') self.batch.delete(key1) self.batch.delete(key2) self.batch.commit() self.assertNone(self.client.get(key1)) self.assertNone(self.client.get(key2)) def test_batch_multiple_delete_abort(self): key1 = 'test_batch_multiple_delete_abort_1' key2 = 'test_batch_multiple_delete_abort_2' self.client.put(key1, '') self.client.put(key2, '') self.batch.delete(key1) self.batch.delete(key2) self.batch.abort() self.assertNotNone(self.client.get(key1)) self.assertNotNone(self.client.get(key2)) def test_batch_mixed_commit(self): key1 = 'test_batch_mixed_commit_1' key2 = 'test_batch_mixed_commit_2' self.client.put(key1, '') self.batch.delete(key1) self.batch.put(key2, '') self.batch.commit() self.assertNone(self.client.get(key1)) self.assertNotNone(self.client.get(key2)) def test_batch_mixed_abort(self): key1 = 'test_batch_mixed_abort_1' key2 = 'test_batch_mixed_abort_2' self.client.put(key1, '') self.batch.delete(key1) self.batch.put(key2, '') self.batch.abort() self.assertNotNone(self.client.get(key1)) self.assertNone(self.client.get(key2)) def test_batch_reuse(self): key = 'test_batch_reuse' self.batch.put(key, '') self.batch.commit() self.assertRaises(common.BatchAbortedException, self.batch.delete(key)) if __name__ == '__main__': unittest.main() Loading
kinetic/batch.py 0 → 100644 +155 −0 Original line number Diff line number Diff line # Copyright (C) 2015 Seagate Technology. # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA #@author: Paul Dardeau import common import logging LOG = logging.getLogger(__name__) class Batch(object): """ The Batch class is used for grouping a set of put and/or delete operations so that all are committed as one unit or all of them are canceled (aborted). A Batch object is obtained by calling :func:`~baseclient.BaseClient.begin_batch`. Once all relevant put and delete calls are made, 'commit' should be called to apply all of the operations, or 'abort' to cancel (abort) them. A Batch object cannot be reused for subsequent batches. After the 'commit' or 'abort' has completed successfully, a new Batch object should be requested for the next batch operation. """ MSG_BATCH_COMPLETED = "batch completed. no more operations are permitted within this batch." def __init__(self, client, batch_id): """ Initialize instance with Kinetic client and batch identifier. Args: client: the Kinetic client to use for batch operations. batch_id: the batch identifier to be used for client connection. """ self._client = client self._batch_id = batch_id self._op_count = 0 self._batch_completed = False # to detect attempted reuse def put(self, *args, **kwargs): """ Put an entry within the batch operation. The command is not committed until :func:`~batch.Batch.commit` is called and returns successfully. If a version is specified, it must match the one stored in the persistent storage. Otherwise, a KineticException is raised. Args: Kwargs: Raises: KineticException: if any internal error occurs. """ if self._batch_completed: raise common.KineticException(MSG_BATCH_COMPLETED) self._op_count += 1 #TODO: add batch put logging self._client.batch_put(self._batch_id, *args, **kwargs) def delete(self, *args, **kwargs): """ Delete the entry associated with the specified key. The command is not committed until :func:`~batch.Batch.commit` is called and returns successfully. If a version is specified, it must match the one stored in persistent storage. Otherwise, a KineticException is raised. Args: Kwargs: Raises: KineticExcpetion: if any internal error occurs. """ if self._batch_completed: raise common.KineticException(MSG_BATCH_COMPLETED) self._op_count += 1 #TODO: add batch delete logging self._client.batch_delete(self._batch_id, *args, **kwargs) def commit(self): """ Commit the current batch operation. When this call returned successfully, all the commands performed in the current batch are executed and committed to store successfully. Raises: KineticException: if any internal error occurred. The batch may or may not be committed. If committed, all commands are committed. Otherwise, no messages are committed. BatchAbortedException: if the commit failed. No messages within the batch were committed to the store. """ if self._batch_completed: raise common.KineticException(MSG_BATCH_COMPLETED) #TODO: add batch commit logging try: self._client.batch_commit(self._batch_id, self._op_count) except BatchAbortedException: self._batch_completed = True raise def abort(self): """ Abort the current batch operation. When this call returned successfully, all the commands queued in the current batch are aborted. Resources related to the current batch are cleaned up and released. Raises: KineticException: if any internal error occurred. """ if self._batch_completed: raise common.KineticException(MSG_BATCH_COMPLETED) #TODO: add batch abort logging self._client.batch_abort(self._batch_id) self._batch_completed = True def is_completed(self): """ Return boolean indicating whether the batch is completed (either committed or aborted) """ return self._batch_completed def operation_count(self): """ Return the number of operations that have been included in the batch. """ return self._op_count
test/test_batch.py 0 → 100644 +196 −0 Original line number Diff line number Diff line # Copyright (C) 2015 Seagate Technology. # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA #@author: Paul Dardeau import unittest from kinetic import Client from kinetic import Batch from common import BatchAbortedExcpetion from base import BaseTestCase class BatchTestCase(BaseTestCase): def setUp(self): super(BatchTestCase, self).setUp() self.client = Client(self.host, self.port) self.client.connect() self.create_new_batch() def create_new_batch(self): self.batch = self.client.begin_batch() def test_batch_initial_state(self): self.assertFalse(self.batch.is_completed()) self.assertEquals(self.batch.operation_count(), 0) def test_batch_operation_count(self): key1 = 'test_batch_operation_count_1' key2 = 'test_batch_operation_count_2' key3 = 'test_batch_operation_count_3' self.batch.put(key1, '') self.assertEquals(self.batch.operation_count(), 1) self.batch.put(key2, '') self.assertEquals(self.batch.operation_count(), 2) self.batch.delete(key3) self.assertEquals(self.batch.operation_count(), 3) self.batch.abort() def test_batch_is_completed(self): key1 = 'test_batch_is_completed_1' key2 = 'test_batch_is_completed_2' self.assertFalse(self.batch.is_completed()) self.batch.put(key1, '') self.batch.delete(key2) self.assertFalse(self.batch.is_completed()) self.batch.commit() self.assertTrue(self.batch.is_completed()) # do it again, but abort this time self.create_new_batch() self.assertFalse(self.batch.is_completed()) self.batch.put(key1, '') self.batch.delete(key2) self.assertFalse(self.batch.is_completed()) self.batch.abort() self.assertTrue(self.batch.is_completed()) def test_batch_abort(self): self.abort() # abort with no operations in batch self.create_new_batch() key = 'key_should_not_exist' self.batch.put(key, '') self.abort() self.assertNone(self.client.get(key)) def test_batch_commit(self): self.commit() # commit with no operations in batch self.create_new_batch() key = 'key_should_exist' self.batch.put(key, '') self.batch.commit() self.assertNotNone(self.client.get(key)) def test_batch_delete_commit(self): # put an entry key = 'test_batch_delete_commit' self.client.put(key, '') self.batch.delete(key) self.batch.commit() self.assertNone(self.client.get(key)) def test_batch_delete_abort(self): # put an entry key = 'test_batch_delete_abort' self.client.put(key, '') self.batch.delete(key) self.batch.abort() self.assertNotNone(self.client.get(key)) def test_batch_put_commit(self): key = 'test_batch_put_commit' self.batch.put(key, '') self.batch.commit() self.assertNotNone(self.client.get(key)) def test_batch_put_abort(self): key = 'test_batch_put_abort' self.batch.put(key, '') self.batch.abort() self.assertNone(self.client.get(key)) def test_batch_multiple_put_commit(self): key1 = 'test_batch_multiple_put_commit_1' key2 = 'test_batch_multiple_put_commit_2' self.batch.put(key1, '') self.batch.put(key2, '') self.batch.commit() self.assertNotNone(self.client.get(key1)) self.assertNotNone(self.client.get(key2)) def test_batch_multiple_put_abort(self): key1 = 'test_batch_multiple_put_abort_1' key2 = 'test_batch_multiple_put_abort_2' self.batch.put(key1, '') self.batch.put(key2, '') self.batch.abort() self.assertNone(self.client.get(key1)) self.assertNone(self.client.get(key2)) def test_batch_multiple_delete_commit(self): key1 = 'test_batch_multiple_delete_commit_1' key2 = 'test_batch_multiple_delete_commit_2' self.client.put(key1, '') self.client.put(key2, '') self.batch.delete(key1) self.batch.delete(key2) self.batch.commit() self.assertNone(self.client.get(key1)) self.assertNone(self.client.get(key2)) def test_batch_multiple_delete_abort(self): key1 = 'test_batch_multiple_delete_abort_1' key2 = 'test_batch_multiple_delete_abort_2' self.client.put(key1, '') self.client.put(key2, '') self.batch.delete(key1) self.batch.delete(key2) self.batch.abort() self.assertNotNone(self.client.get(key1)) self.assertNotNone(self.client.get(key2)) def test_batch_mixed_commit(self): key1 = 'test_batch_mixed_commit_1' key2 = 'test_batch_mixed_commit_2' self.client.put(key1, '') self.batch.delete(key1) self.batch.put(key2, '') self.batch.commit() self.assertNone(self.client.get(key1)) self.assertNotNone(self.client.get(key2)) def test_batch_mixed_abort(self): key1 = 'test_batch_mixed_abort_1' key2 = 'test_batch_mixed_abort_2' self.client.put(key1, '') self.batch.delete(key1) self.batch.put(key2, '') self.batch.abort() self.assertNotNone(self.client.get(key1)) self.assertNone(self.client.get(key2)) def test_batch_reuse(self): key = 'test_batch_reuse' self.batch.put(key, '') self.batch.commit() self.assertRaises(common.BatchAbortedException, self.batch.delete(key)) if __name__ == '__main__': unittest.main()