Commit 8412edd1 authored by Ignacio Corderi's avatar Ignacio Corderi
Browse files

Moved some thigns around

parent 1c8d6c6e
Loading
Loading
Loading
Loading
+1 −66
Original line number Diff line number Diff line
@@ -28,68 +28,3 @@ public protocol KineticChannel {
    func clone() -> KineticSession
    func close()
}
 No newline at end of file

protocol StreamChannel {
    var inp: NSInputStream? { get set }
    var out: NSOutputStream? { get set }
}

extension StreamChannel {        
    
    func rawSend(proto: NSData, value: Bytes?) throws {
        // Prepare 9 bytes header
        // 1 byte - magic number | 4 bytes - proto length | 4 bytes - value length
        var headerBuffer = Bytes(count: 9, repeatedValue: 0)
        headerBuffer[0] = 70 // Magic
        copyFromUInt32(&headerBuffer, offset: 1, value: UInt32(proto.length))
        if value != nil {
            copyFromUInt32(&headerBuffer, offset: 5, value: UInt32(value!.count))
        }
        
        // Send header, proto and value
        let outputStream = self.out!
        outputStream.write(headerBuffer, maxLength: headerBuffer.count)
        var array = Bytes(count: proto.length, repeatedValue: 0)
        // TODO: make sure this is a non-memcopy operation
        proto.getBytes(&array, length: proto.length)
        outputStream.write(array, maxLength: array.count)
        if value != nil {
            outputStream.write(value!, maxLength: value!.count)
        }
    }
    
    func rawReceive() throws -> (Message, Bytes) {
        let inputStream = self.inp!
        
        var headerBuffer = Bytes(count:9, repeatedValue: 0)
        
        // TODO: what are the semantics of read in swift? does it read all?
        let _ = inputStream.read(&headerBuffer, maxLength: headerBuffer.count)
        
        if headerBuffer[0] != 70 {
            throw KineticConnectionErrors.InvalidMagicNumber
        }
        
        let protoLength = Int(bytesToUInt32(headerBuffer, offset: 1))
        let valueLength = Int(bytesToUInt32(headerBuffer, offset: 5))
        
        var protoBuffer = Array<UInt8>(count:protoLength, repeatedValue: 0)
        // TODO: what are the semantics of read in swift? does it read all?
        let _ = inputStream.read(&protoBuffer, maxLength: protoBuffer.count)
        
        let proto = NSData(bytes: &protoBuffer, length: protoLength)
        let msg = try Message.parseFromData(proto)
        // TODO: verify HMAC
        
        if valueLength > 0 {
            var value = Bytes(count:valueLength, repeatedValue: 0)
            // TODO: what are the semantics of read in swift? does it read all?
            let _ = inputStream.read(&value, maxLength: value.count)
            
            return (msg, value)
        } else {
            return (msg, [])
        }
    }
}
+64 −0
Original line number Diff line number Diff line
@@ -22,6 +22,70 @@

public let connect = NetworkChannel.connect

protocol StreamChannel {
    var inp: NSInputStream? { get set }
    var out: NSOutputStream? { get set }
}

extension StreamChannel {
    
    func rawSend(proto: NSData, value: Bytes?) throws {
        // Prepare 9 bytes header
        // 1 byte - magic number | 4 bytes - proto length | 4 bytes - value length
        var headerBuffer = Bytes(count: 9, repeatedValue: 0)
        headerBuffer[0] = 70 // Magic
        copyFromUInt32(&headerBuffer, offset: 1, value: UInt32(proto.length))
        if value != nil {
            copyFromUInt32(&headerBuffer, offset: 5, value: UInt32(value!.count))
        }
        
        // Send header, proto and value
        let outputStream = self.out!
        outputStream.write(headerBuffer, maxLength: headerBuffer.count)
        var array = Bytes(count: proto.length, repeatedValue: 0)
        // TODO: make sure this is a non-memcopy operation
        proto.getBytes(&array, length: proto.length)
        outputStream.write(array, maxLength: array.count)
        if value != nil {
            outputStream.write(value!, maxLength: value!.count)
        }
    }
    
    func rawReceive() throws -> (Message, Bytes) {
        let inputStream = self.inp!
        
        var headerBuffer = Bytes(count:9, repeatedValue: 0)
        
        // TODO: what are the semantics of read in swift? does it read all?
        let _ = inputStream.read(&headerBuffer, maxLength: headerBuffer.count)
        
        if headerBuffer[0] != 70 {
            throw KineticConnectionErrors.InvalidMagicNumber
        }
        
        let protoLength = Int(bytesToUInt32(headerBuffer, offset: 1))
        let valueLength = Int(bytesToUInt32(headerBuffer, offset: 5))
        
        var protoBuffer = Array<UInt8>(count:protoLength, repeatedValue: 0)
        // TODO: what are the semantics of read in swift? does it read all?
        let _ = inputStream.read(&protoBuffer, maxLength: protoBuffer.count)
        
        let proto = NSData(bytes: &protoBuffer, length: protoLength)
        let msg = try Message.parseFromData(proto)
        // TODO: verify HMAC
        
        if valueLength > 0 {
            var value = Bytes(count:valueLength, repeatedValue: 0)
            // TODO: what are the semantics of read in swift? does it read all?
            let _ = inputStream.read(&value, maxLength: value.count)
            
            return (msg, value)
        } else {
            return (msg, [])
        }
    }
}

public class NetworkChannel: CustomStringConvertible, KineticChannel, StreamChannel {
    
    public let host: String