Loading kinetic/client.go +18 −2 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ import ( "crypto/sha1" "encoding/binary" "net" "fmt" "github.com/seagate/kinetic-go/kinetic/network" kproto "github.com/seagate/kinetic-go/kinetic/proto" Loading @@ -53,6 +54,14 @@ func calculate_hmac(secret []byte, bytes []byte) []byte { return mac.Sum(nil) } type RemoteError struct { status kproto.Command_Status } func (e RemoteError) Error() string { return fmt.Sprintf("%v: %v", e.status.Code, *e.status.StatusMessage) } type Client interface { Put (key []byte, value []byte) ((<-chan error), error) Loading Loading @@ -109,6 +118,11 @@ func (self *NetworkClient) listen(notifications <-chan PendingOperation) { break } var response error if *cmd.Status.Code != kproto.Command_Status_SUCCESS { response = RemoteError { status: *cmd.Status } } // Notify caller // Seems more complicated than it should, but we are optimizing // for when we receive in order Loading @@ -116,14 +130,14 @@ func (self *NetworkClient) listen(notifications <-chan PendingOperation) { op := <-notifications // Chances are, it's in order if op.sequence == *cmd.Header.AckSequence { op.receiver <- nil // TODO: send back the actual response op.receiver <- response // TODO: send back the actual response break } else { // Either we missed it or it hasnt arrived yet. pending[op.sequence] = op op, ok := pending[*cmd.Header.AckSequence] if ok { // this is the case where we missed it op.receiver <- nil op.receiver <- response delete(pending, op.sequence) break } Loading @@ -146,6 +160,8 @@ func(self *NetworkClient) Put(key []byte, value []byte) ((<-chan error), error) Body: &kproto.Command_Body { KeyValue: &kproto.Command_KeyValue { Key: key, Algorithm: kproto.Command_SHA1.Enum(), Tag: make([]byte, 0), Synchronization: kproto.Command_WRITEBACK.Enum(), }, }, Loading main.go +17 −2 Original line number Diff line number Diff line Loading @@ -25,11 +25,26 @@ package main import "github.com/seagate/kinetic-go/kinetic" import "strconv" import "fmt" func main() { client, _ := kinetic.Connect("localhost:8123") defer client.Close() rx, _ := client.Put([]byte("from-go"), []byte("refactored!")) <-rx // wait for it count := 10 rxs := make([]<-chan error, count) for i := 0; i < count; i++ { rxs[i], _ = client.Put([]byte("from-go-" + strconv.Itoa(i)), []byte("refactored 2.0!")) } // wait for all for i := 0; i < count; i++ { err := <-rxs[i] if err != nil { fmt.Println(err) } } } No newline at end of file Loading
kinetic/client.go +18 −2 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ import ( "crypto/sha1" "encoding/binary" "net" "fmt" "github.com/seagate/kinetic-go/kinetic/network" kproto "github.com/seagate/kinetic-go/kinetic/proto" Loading @@ -53,6 +54,14 @@ func calculate_hmac(secret []byte, bytes []byte) []byte { return mac.Sum(nil) } type RemoteError struct { status kproto.Command_Status } func (e RemoteError) Error() string { return fmt.Sprintf("%v: %v", e.status.Code, *e.status.StatusMessage) } type Client interface { Put (key []byte, value []byte) ((<-chan error), error) Loading Loading @@ -109,6 +118,11 @@ func (self *NetworkClient) listen(notifications <-chan PendingOperation) { break } var response error if *cmd.Status.Code != kproto.Command_Status_SUCCESS { response = RemoteError { status: *cmd.Status } } // Notify caller // Seems more complicated than it should, but we are optimizing // for when we receive in order Loading @@ -116,14 +130,14 @@ func (self *NetworkClient) listen(notifications <-chan PendingOperation) { op := <-notifications // Chances are, it's in order if op.sequence == *cmd.Header.AckSequence { op.receiver <- nil // TODO: send back the actual response op.receiver <- response // TODO: send back the actual response break } else { // Either we missed it or it hasnt arrived yet. pending[op.sequence] = op op, ok := pending[*cmd.Header.AckSequence] if ok { // this is the case where we missed it op.receiver <- nil op.receiver <- response delete(pending, op.sequence) break } Loading @@ -146,6 +160,8 @@ func(self *NetworkClient) Put(key []byte, value []byte) ((<-chan error), error) Body: &kproto.Command_Body { KeyValue: &kproto.Command_KeyValue { Key: key, Algorithm: kproto.Command_SHA1.Enum(), Tag: make([]byte, 0), Synchronization: kproto.Command_WRITEBACK.Enum(), }, }, Loading
main.go +17 −2 Original line number Diff line number Diff line Loading @@ -25,11 +25,26 @@ package main import "github.com/seagate/kinetic-go/kinetic" import "strconv" import "fmt" func main() { client, _ := kinetic.Connect("localhost:8123") defer client.Close() rx, _ := client.Put([]byte("from-go"), []byte("refactored!")) <-rx // wait for it count := 10 rxs := make([]<-chan error, count) for i := 0; i < count; i++ { rxs[i], _ = client.Put([]byte("from-go-" + strconv.Itoa(i)), []byte("refactored 2.0!")) } // wait for all for i := 0; i < count; i++ { err := <-rxs[i] if err != nil { fmt.Println(err) } } } No newline at end of file