Commit 097e6a6c authored by Ignacio Corderi's avatar Ignacio Corderi
Browse files

Added support for Batch operations

parent fbc6b067
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
		3EDAAB4D1B66D32D00F30808 /* KineticTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3EDAAB4C1B66D32D00F30808 /* KineticTests.swift */; };
		3EDAAB581B66D47200F30808 /* Session.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3EDAAB571B66D47200F30808 /* Session.swift */; };
		3EDB8E881B87198900503F9A /* Extensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3EDB8E871B87198900503F9A /* Extensions.swift */; };
		3EDB8E8A1B871C5C00503F9A /* Batch.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3EDB8E891B871C5C00503F9A /* Batch.swift */; };
		3EFB7F1C1B7A55D100988886 /* Core.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3EFB7F1B1B7A55D100988886 /* Core.swift */; };
		3EFB7F1E1B7A6A9800988886 /* Put.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3EFB7F1D1B7A6A9800988886 /* Put.swift */; };
		3EFB7F201B7A6C3300988886 /* Errors.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3EFB7F1F1B7A6C3300988886 /* Errors.swift */; };
@@ -53,6 +54,8 @@
		3EDAAB4E1B66D32D00F30808 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
		3EDAAB571B66D47200F30808 /* Session.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Session.swift; sourceTree = "<group>"; };
		3EDB8E871B87198900503F9A /* Extensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Extensions.swift; sourceTree = "<group>"; };
		3EDB8E891B871C5C00503F9A /* Batch.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Batch.swift; sourceTree = "<group>"; };
		3EDB8E8B1B87299700503F9A /* Example 4 - Batch.playground */ = {isa = PBXFileReference; lastKnownFileType = file.playground; path = "Example 4 - Batch.playground"; sourceTree = "<group>"; };
		3EF4F82D1B7BE11400E4EB73 /* Example 1 - Hello World.playground */ = {isa = PBXFileReference; lastKnownFileType = file.playground; path = "Example 1 - Hello World.playground"; sourceTree = "<group>"; };
		3EFB7F1B1B7A55D100988886 /* Core.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Core.swift; sourceTree = "<group>"; };
		3EFB7F1D1B7A6A9800988886 /* Put.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Put.swift; sourceTree = "<group>"; };
@@ -151,6 +154,7 @@
				3EF4F82D1B7BE11400E4EB73 /* Example 1 - Hello World.playground */,
				3E386E6A1B7F85720040D363 /* Example 2 - Basics.playground */,
				3ED681EE1B7D0EC600AFDF79 /* Example 3 - Async.playground */,
				3EDB8E8B1B87299700503F9A /* Example 4 - Batch.playground */,
			);
			path = examples;
			sourceTree = "<group>";
@@ -164,6 +168,7 @@
				3EFB7F261B7AA45A00988886 /* Get.swift */,
				3EFB7F281B7AB06800988886 /* Delete.swift */,
				3EA070361B846C3800E4821E /* KeyRange.swift */,
				3EDB8E891B871C5C00503F9A /* Batch.swift */,
			);
			path = Commands;
			sourceTree = "<group>";
@@ -328,6 +333,7 @@
				3EFB7F291B7AB06800988886 /* Delete.swift in Sources */,
				3EFB7F2D1B7BAF8D00988886 /* NetworkChannel.swift in Sources */,
				3EDAAB581B66D47200F30808 /* Session.swift in Sources */,
				3EDB8E8A1B871C5C00503F9A /* Batch.swift in Sources */,
				3EFB7F271B7AA45A00988886 /* Get.swift in Sources */,
				3E07EA7F1B78E3B500DAB3F1 /* Kinetic.proto.swift in Sources */,
				3EDB8E881B87198900503F9A /* Extensions.swift in Sources */,
+211 −0
Original line number Diff line number Diff line
// Copyright (c) 2015 Seagate Technology

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// @author: Ignacio Corderi

internal struct NoResponse : ChannelResponse {
    typealias ContextType = Void
    let success: Bool
    let error: KineticRemoteError?
    
    init() {
        self.success = true
        self.error = nil
    }
    
    static func parse(raw: RawResponse, context: Void) -> NoResponse {
        return NoResponse()
    }
}

class BatchBeginCommand : ChannelCommand {
    
    typealias ResponseType = VoidResponse
    
    private static var nextBatchId: UInt32 = 1
    
    private let batchId: UInt32
    
    internal init() {
        self.batchId = BatchBeginCommand.nextBatchId++
    }
    
    func build(builder: Builder, device: KineticDevice) {
        builder.header.messageType = .StartBatch
        builder.header.batchId = self.batchId
    }
    
}

class BatchPutCommand : ChannelCommand {
    
    typealias ResponseType = NoResponse
    
    let batchId: UInt32
    let key: KeyType
    let value: ValueType
    
    internal init(batchId: UInt32, key: KeyType, value: ValueType) {
        self.batchId = batchId
        self.key = key
        self.value = value
    }
    
    func build(builder: Builder, device: KineticDevice) {
        builder.header.messageType = .Put
        builder.header.batchId = self.batchId
        builder.keyValue.key = self.key.toData()
        builder.keyValue.tag = "1337".toData()
        builder.keyValue.algorithm = .Sha1
        builder.value = value.toBytes()
    }
    
}

class BatchDeleteCommand : ChannelCommand {
    
    typealias ResponseType = NoResponse
    
    let batchId: UInt32
    let key: KeyType
    
    internal init(batchId: UInt32, key: KeyType) {
        self.batchId = batchId
        self.key = key
    }
    
    func build(builder: Builder, device: KineticDevice) {
        builder.header.messageType = .Delete
        builder.header.batchId = self.batchId
        builder.keyValue.key = self.key.toData()
    }
    
}

class BatchCommitCommand : ChannelCommand {
    
    typealias ResponseType = VoidResponse // TODO: create a BatchCommitResponse
    
    let batchId: UInt32
    let count: Int32
    
    internal init(batchId: UInt32, count: Int32) {
        self.batchId = batchId
        self.count = count
    }
    
    func build(builder: Builder, device: KineticDevice) {
        builder.header.messageType = .EndBatch
        builder.header.batchId = self.batchId
        builder.batch.count = self.count
    }
    
}

class BatchAbortCommand : ChannelCommand {
    
    typealias ResponseType = VoidResponse
    
    let batchId: UInt32
    
    internal init(batchId: UInt32) {
        self.batchId = batchId
    }
    
    func build(builder: Builder, device: KineticDevice) {
        builder.header.messageType = .AbortBatch
        builder.header.batchId = self.batchId
    }
    
}

public enum BatchErrors: KineticError {
    case BatchIsNotActive
}

public class Batch {
    
    public unowned let session: KineticSession
    
    public let id: UInt32
    public private(set) var count: UInt
    public private(set) var aborted: Bool
    public private(set) var commited: Bool
    
    public var active: Bool { return !self.aborted  && !self.commited }
    
    internal init(_ session: KineticSession, id: UInt32) {
        self.session = session
        self.id = id
        self.count = 0
        self.aborted = false
        self.commited = false 
    }
    
    public func put(key: KeyType, value: ValueType) throws {
        guard self.active else { throw BatchErrors.BatchIsNotActive }
        
        let cmd = BatchPutCommand(batchId: self.id, key: key, value: value)
        try self.session.send(cmd)
        self.count += 1
    }
    
    public func delete(key: KeyType) throws {
        guard self.active else { throw BatchErrors.BatchIsNotActive }
        
        let cmd = BatchDeleteCommand(batchId: self.id, key: key)
        try self.session.send(cmd)
        self.count += 1
    }
    
    public func commit() throws {
        guard self.active else { throw BatchErrors.BatchIsNotActive }
        
        let cmd = BatchCommitCommand(batchId: self.id, count: Int32(self.count))
        try self.session.send(cmd)
        self.commited = true
    }
    
    public func abort() throws {
        guard self.active else { throw BatchErrors.BatchIsNotActive }
        
        let cmd = BatchAbortCommand(batchId: self.id)
        try self.session.send(cmd)
        self.aborted = true
    }
    
}

public extension KineticSession {
    func beginBatch() throws -> Batch {
        let cmd = BatchBeginCommand()
        try self.send(cmd)
        return Batch(self, id: cmd.batchId)
    }
    
    func swap(a: KeyType, _ b: KeyType) throws {
        let ar = try self.get(a)
        let br = try self.get(b)
        let batch = try self.beginBatch()
        try batch.put("a", value: br.value!)
        try batch.put("b", value: ar.value!)
        try batch.commit()
    }
}
 No newline at end of file
+8 −8
Original line number Diff line number Diff line
@@ -20,13 +20,13 @@

// @author: Ignacio Corderi

public struct NoResponse : ChannelResponse {
public struct VoidResponse : ChannelResponse {
    public typealias ContextType = Void
    public let success: Bool
    public let error: KineticRemoteError?
    
    public static func parse(raw: RawResponse, context: Void) -> NoResponse {
        return NoResponse(success: raw.command.status.code == .Success,
    public static func parse(raw: RawResponse, context: Void) -> VoidResponse {
        return VoidResponse(success: raw.command.status.code == .Success,
            error: KineticRemoteError.fromStatus(raw.command.status))
    }
}
@@ -35,20 +35,20 @@ public struct ValueResponse : ChannelResponse {
    public typealias ContextType = Void
    public let success: Bool
    public let error: KineticRemoteError?
    public let value: Bytes?
    public let value: ValueType?
    public let exists: Bool
    public var hasValue: Bool { return value != nil && value!.count > 0 }
    public var hasValue: Bool { return value != nil && value!.length > 0 }
    
    public static func parse(raw: RawResponse, context: Void) -> ValueResponse {
        switch raw.command.status.code {
        case .Success:
            return ValueResponse(success: true, error: nil, value: raw.value, exists: true)
            return ValueResponse(success: true, error: nil, value: NSData.fromBytes(raw.value), exists: true)
        case .NotFound:
            return ValueResponse(success: true, error: nil, value: nil, exists: false)
        default:
            return ValueResponse(success: false,
            error: KineticRemoteError.fromStatus(raw.command.status),
            value: raw.value,
            value: NSData.fromBytes(raw.value),
            exists: false)
        }
    }
@@ -57,7 +57,7 @@ public struct ValueResponse : ChannelResponse {
        get {
            if self.success {
                if self.hasValue {
                    return "Success (length: \(self.value!.count))"
                    return "Success (length: \(self.value!.length))"
                } else {
                    return "Success (Empty)"
                }
+1 −1
Original line number Diff line number Diff line
@@ -81,7 +81,7 @@ public struct EmptyResponse : ChannelResponse {
extension DeleteCommand: CustomStringConvertible {
    public var description: String {
        get {
            return "Delete (key: \(self.key.toUtf8()))"
            return "Delete (key: \(self.key.toUtf8String()))"
        }
    }
}
+2 −1
Original line number Diff line number Diff line
@@ -27,7 +27,8 @@ extension KineticSession {
        let r = try self.get(key)
        var v: UInt32 = 0
        if r.exists {
            v = bytesToUInt32(r.value!, offset: 0)
            let raw = r.value!.toBytes()
            v = bytesToUInt32(raw, offset: 0)
        }
        v += value
        try self.put(key, value: v)
Loading