Commit 871ad97d authored by Ignacio Corderi's avatar Ignacio Corderi
Browse files

Background workers inital refactor

parent 2bc5fabf
Loading
Loading
Loading
Loading
+56 −3
Original line number Diff line number Diff line
@@ -47,6 +47,9 @@ public class KineticSession {
    var sequence: Int64
    var channel: KineticChannel
    
    private var writerQueue :  dispatch_queue_t
    private var pending: [Int64: (RawResponse) -> ()]
    
    public private(set) var device: KineticDevice?
    public private(set) var credentials: AuthenticationCredential
    
@@ -67,6 +70,35 @@ public class KineticSession {
        self.sequence = 0
        self.channel = channel
        self.device = device
        self.writerQueue = dispatch_get_main_queue()
        self.pending = [:]
        
        // Reader
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)) {
            print("Background reader for \(self.connectionId!) is active.")
            while self.connected {
                do {
                    print("waiting...")
                    let raw = try self.channel.receive()
                    print("Background loops seems to work... Ack:\(raw.command.header.ackSequence)")
                    if let x = self.pending[raw.command.header.ackSequence] {
                        Queue.global.async { x(raw) }
                    } else {
                        print("Oops: Unsolicited or unexpected ACK :/")
                    }
                } catch {
                    // TODO: fault the session, close the channel
                    print("Oops: please code me...")
                }
            }
        }
        
        // We only need the writer queue if connection was ok
        if self.device != nil {
            self.writerQueue = dispatch_queue_create(
                "KineticSession (wwn: \(self.device!.wwn), connection:\(self.connectionId!))"
                , DISPATCH_QUEUE_SERIAL)
        }
    }
    
    internal convenience init(withCredentials channel: KineticChannel, device: KineticDevice?,
@@ -118,9 +150,30 @@ public class KineticSession {
        
        self.credentials.authenticate(builder)
        
        // Send & receive
        // Prepare promise
        let promise = Promise<RawResponse, NoError>()
        self.pending[builder.command.header.sequence] = { r in
            do {
                try promise.success(r)
            } catch {
                print("Mmm... when does this happen?")
            }
            self.pending[builder.command.header.sequence] = nil
        }
        
        // Send & Receive
        dispatch_async(self.writerQueue) {
            do {
                print("Sending seq:\(builder.command.header.sequence)")
                try self.channel.send(builder)
        let r = try self.channel.receive()
            } catch {
                // TODO: write me!
                print("Sending failed :/ what a bummer")
            }
        }
        
        // TODO: all this !bangs are ugly, what can go wrong?
        let r = promise.future.forced()!.value!
        
        return C.ResponseType.parse(r)
    }