Loading blockconnection.go +9 −9 Original line number Diff line number Diff line Loading @@ -33,7 +33,7 @@ func (conn *BlockConnection) NoOp() (Status, error) { return callback.Status(), nil } func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) (Record, Status, error) { func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) (*Record, Status, error) { callback := &GetCallback{} h := NewResponseHandler(callback) Loading @@ -47,25 +47,25 @@ func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) err = conn.nbc.GetNext(key, h) } if err != nil { return Record{}, callback.Status(), err return nil, callback.Status(), err } for callback.Done() == false { conn.nbc.Run() } return callback.Entry, callback.Status(), nil return &callback.Entry, callback.Status(), nil } func (conn *BlockConnection) Get(key []byte) (Record, Status, error) { func (conn *BlockConnection) Get(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GET) } func (conn *BlockConnection) GetNext(key []byte) (Record, Status, error) { func (conn *BlockConnection) GetNext(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GETNEXT) } func (conn *BlockConnection) GetPrevious(key []byte) (Record, Status, error) { func (conn *BlockConnection) GetPrevious(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GETPREVIOUS) } Loading Loading @@ -159,19 +159,19 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, return callback.Statuses, callback.Status(), nil } func (conn *BlockConnection) GetLog(logs []LogType) (Log, Status, error) { func (conn *BlockConnection) GetLog(logs []LogType) (*Log, Status, error) { callback := &GetLogCallback{} h := NewResponseHandler(callback) err := conn.nbc.GetLog(logs, h) if err != nil { return Log{}, callback.Status(), err return nil, callback.Status(), err } for callback.Done() == false { conn.nbc.Run() } return callback.Logs, callback.Status(), nil return &callback.Logs, callback.Status(), nil } func (conn *BlockConnection) pinop(pin []byte, op kproto.Command_PinOperation_PinOpType) (Status, error) { Loading connection_test.go +57 −1 Original line number Diff line number Diff line Loading @@ -114,7 +114,37 @@ func TestNonBlockGetKeyRange(t *testing.T) { } } func TestNonBlockGetLog(t *testing.T) { func TestNonBlockGetLogCapacity(t *testing.T) { logs := []LogType{ LOG_CAPACITIES, } klogs, status, err := blockConn.GetLog(logs) if err != nil || status.Code != OK { t.Fatal("Nonblocking GetLog Failure") } if !(klogs.Capacity.CapacityInBytes > 0 && klogs.Capacity.PortionFull > 0) { t.Logf("%#v", klogs.Capacity) t.Fatal("Nonblocking GetLog for Capacity Failure") } } func TestNonBlockGetLogLimit(t *testing.T) { logs := []LogType{ LOG_LIMITS, } klogs, status, err := blockConn.GetLog(logs) if err != nil || status.Code != OK { t.Fatal("Nonblocking GetLog Failure") } if klogs.Limits.MaxKeySize != 4096 || klogs.Limits.MaxValueSize != 1024*1024 { t.Logf("%#v", klogs.Limits) t.Fatal("Nonblocking GetLog for Limits Failure") } } func TestNonBlockGetLogAll(t *testing.T) { logs := []LogType{ LOG_UTILIZATIONS, LOG_TEMPERATURES, Loading @@ -130,3 +160,29 @@ func TestNonBlockGetLog(t *testing.T) { } t.Logf("GetLog %+v", klogs) } func TestNonBlockMediaScan(t *testing.T) { op := MediaOperation{ StartKey: []byte("object000"), EndKey: []byte("object999"), StartKeyInclusive: true, EndKeyInclusive: true, } status, err := blockConn.MediaScan(&op, PRIORITY_NORMAL) if err != nil || status.Code != OK { t.Fatal("Nonblocking MediaScan Failure: ", status.Error()) } } func TestNonBlockMediaOptimize(t *testing.T) { op := MediaOperation{ StartKey: []byte("object000"), EndKey: []byte("object999"), StartKeyInclusive: true, EndKeyInclusive: true, } status, err := blockConn.MediaOptimize(&op, PRIORITY_NORMAL) if err != nil || status.Code != OK { t.Fatal("Nonblocking MediaOptimize Failure: ", status.Error()) } } service.go +9 −5 Original line number Diff line number Diff line Loading @@ -52,12 +52,14 @@ func newNetworkService(op ClientOptions) (*networkService, error) { return nil, err } ns := &networkService{conn: conn, ns := &networkService{ conn: conn, seq: 0, connId: 0, option: op, hmap: make(map[int64]*ResponseHandler), fatal: false} fatal: false, } _, _, _, err = ns.receive() if err != nil { Loading @@ -68,7 +70,8 @@ func newNetworkService(op ClientOptions) (*networkService, error) { return ns, nil } // When client network service has error, call error handling from all Messagehandler current in Queue. // When client network service has error, call error handling // from all Messagehandler current in Queue. func (ns *networkService) clientError(s Status, mh *ResponseHandler) { for ack, h := range ns.hmap { if h.callback != nil { Loading Loading @@ -140,7 +143,7 @@ func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value err = ns.send(msg, value) if err != nil { return nil return err } klog.Info("Kinetic message send ", cmd.GetHeader().GetMessageType().String()) Loading @@ -151,9 +154,10 @@ func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value } // update sequence number // TODO: Need mutex protection here ns.seq++ return err return nil } func (ns *networkService) send(msg *kproto.Message, value []byte) error { Loading utility.go 0 → 100644 +92 −0 Original line number Diff line number Diff line package kinetic import ( "fmt" "io" "io/ioutil" "os" ) func UpdateFirmware(conn *BlockConnection, file string) error { _, err := os.Stat(file) if err != nil { if os.IsNotExist(err) { klog.Errorf("Update firmware fail, file %s not exist", file) } return err } data, err := ioutil.ReadFile(file) if err != nil { klog.Errorf("Update firmware fail, file %s can't read", file) return err } status, err := conn.UpdateFirmware(data) if err != nil || status.Code != OK { klog.Errorf("Update firmware fail : %s\n", status.Error()) } return err } func UploadAppletFile(conn *BlockConnection, file, key string) ([][]byte, error) { info, err := os.Stat(file) if err != nil { if os.IsNotExist(err) { klog.Errorf("Upload applet fail, file %s not exist", file) } return nil, err } fileSize := info.Size() var chunkSize int64 = 1024 * 1024 chunks := fileSize / chunkSize if fileSize%chunkSize != 0 { chunks++ } keys := make([][]byte, chunks) f, err := os.Open(file) if err != nil { return nil, err } defer f.Close() buf := make([]byte, chunkSize) var n int var offset, cnt int = 0, 0 for { n, err = f.Read(buf) if err != nil { if err == io.EOF { break } else { // TODO: Should delete already PUT objects??? return nil, err } } keys[cnt] = []byte(fmt.Sprintf("%s-%010d", key, offset)) entry := Record{ Key: keys[cnt], Value: buf[:n], Tag: []byte(""), Sync: SYNC_WRITETHROUGH, Algo: ALGO_SHA1, Force: true, } status, err := conn.Put(&entry) if err != nil || status.Code != OK { klog.Errorf("Upload applet fail for chunk[%02d], key[%s] : %s\n", cnt, keys[cnt], status.Error()) // TODO: Should delete already PUT objects??? return nil, err } offset += n cnt++ } return keys, nil } utility_test.go 0 → 100644 +22 −0 Original line number Diff line number Diff line package kinetic import ( "testing" ) func TestUploadAppletFile(t *testing.T) { file := "not/exist/applet/javapplet.jar" keys, err := UploadAppletFile(blockConn, file, "test-applet") if err != nil || len(keys) <= 0 { t.Fatal("Upload applet file fail: ", file) } } func TestUpdateFirmware(t *testing.T) { //file := "K:\\srv\\tftp\\AD-installer-v44.01.03.slod" file := "not/exist/firmare/unknown-version.slod" err := UpdateFirmware(blockConn, file) if err != nil { t.Fatal("Firmware update fail: ", file) } } Loading
blockconnection.go +9 −9 Original line number Diff line number Diff line Loading @@ -33,7 +33,7 @@ func (conn *BlockConnection) NoOp() (Status, error) { return callback.Status(), nil } func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) (Record, Status, error) { func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) (*Record, Status, error) { callback := &GetCallback{} h := NewResponseHandler(callback) Loading @@ -47,25 +47,25 @@ func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) err = conn.nbc.GetNext(key, h) } if err != nil { return Record{}, callback.Status(), err return nil, callback.Status(), err } for callback.Done() == false { conn.nbc.Run() } return callback.Entry, callback.Status(), nil return &callback.Entry, callback.Status(), nil } func (conn *BlockConnection) Get(key []byte) (Record, Status, error) { func (conn *BlockConnection) Get(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GET) } func (conn *BlockConnection) GetNext(key []byte) (Record, Status, error) { func (conn *BlockConnection) GetNext(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GETNEXT) } func (conn *BlockConnection) GetPrevious(key []byte) (Record, Status, error) { func (conn *BlockConnection) GetPrevious(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GETPREVIOUS) } Loading Loading @@ -159,19 +159,19 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, return callback.Statuses, callback.Status(), nil } func (conn *BlockConnection) GetLog(logs []LogType) (Log, Status, error) { func (conn *BlockConnection) GetLog(logs []LogType) (*Log, Status, error) { callback := &GetLogCallback{} h := NewResponseHandler(callback) err := conn.nbc.GetLog(logs, h) if err != nil { return Log{}, callback.Status(), err return nil, callback.Status(), err } for callback.Done() == false { conn.nbc.Run() } return callback.Logs, callback.Status(), nil return &callback.Logs, callback.Status(), nil } func (conn *BlockConnection) pinop(pin []byte, op kproto.Command_PinOperation_PinOpType) (Status, error) { Loading
connection_test.go +57 −1 Original line number Diff line number Diff line Loading @@ -114,7 +114,37 @@ func TestNonBlockGetKeyRange(t *testing.T) { } } func TestNonBlockGetLog(t *testing.T) { func TestNonBlockGetLogCapacity(t *testing.T) { logs := []LogType{ LOG_CAPACITIES, } klogs, status, err := blockConn.GetLog(logs) if err != nil || status.Code != OK { t.Fatal("Nonblocking GetLog Failure") } if !(klogs.Capacity.CapacityInBytes > 0 && klogs.Capacity.PortionFull > 0) { t.Logf("%#v", klogs.Capacity) t.Fatal("Nonblocking GetLog for Capacity Failure") } } func TestNonBlockGetLogLimit(t *testing.T) { logs := []LogType{ LOG_LIMITS, } klogs, status, err := blockConn.GetLog(logs) if err != nil || status.Code != OK { t.Fatal("Nonblocking GetLog Failure") } if klogs.Limits.MaxKeySize != 4096 || klogs.Limits.MaxValueSize != 1024*1024 { t.Logf("%#v", klogs.Limits) t.Fatal("Nonblocking GetLog for Limits Failure") } } func TestNonBlockGetLogAll(t *testing.T) { logs := []LogType{ LOG_UTILIZATIONS, LOG_TEMPERATURES, Loading @@ -130,3 +160,29 @@ func TestNonBlockGetLog(t *testing.T) { } t.Logf("GetLog %+v", klogs) } func TestNonBlockMediaScan(t *testing.T) { op := MediaOperation{ StartKey: []byte("object000"), EndKey: []byte("object999"), StartKeyInclusive: true, EndKeyInclusive: true, } status, err := blockConn.MediaScan(&op, PRIORITY_NORMAL) if err != nil || status.Code != OK { t.Fatal("Nonblocking MediaScan Failure: ", status.Error()) } } func TestNonBlockMediaOptimize(t *testing.T) { op := MediaOperation{ StartKey: []byte("object000"), EndKey: []byte("object999"), StartKeyInclusive: true, EndKeyInclusive: true, } status, err := blockConn.MediaOptimize(&op, PRIORITY_NORMAL) if err != nil || status.Code != OK { t.Fatal("Nonblocking MediaOptimize Failure: ", status.Error()) } }
service.go +9 −5 Original line number Diff line number Diff line Loading @@ -52,12 +52,14 @@ func newNetworkService(op ClientOptions) (*networkService, error) { return nil, err } ns := &networkService{conn: conn, ns := &networkService{ conn: conn, seq: 0, connId: 0, option: op, hmap: make(map[int64]*ResponseHandler), fatal: false} fatal: false, } _, _, _, err = ns.receive() if err != nil { Loading @@ -68,7 +70,8 @@ func newNetworkService(op ClientOptions) (*networkService, error) { return ns, nil } // When client network service has error, call error handling from all Messagehandler current in Queue. // When client network service has error, call error handling // from all Messagehandler current in Queue. func (ns *networkService) clientError(s Status, mh *ResponseHandler) { for ack, h := range ns.hmap { if h.callback != nil { Loading Loading @@ -140,7 +143,7 @@ func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value err = ns.send(msg, value) if err != nil { return nil return err } klog.Info("Kinetic message send ", cmd.GetHeader().GetMessageType().String()) Loading @@ -151,9 +154,10 @@ func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value } // update sequence number // TODO: Need mutex protection here ns.seq++ return err return nil } func (ns *networkService) send(msg *kproto.Message, value []byte) error { Loading
utility.go 0 → 100644 +92 −0 Original line number Diff line number Diff line package kinetic import ( "fmt" "io" "io/ioutil" "os" ) func UpdateFirmware(conn *BlockConnection, file string) error { _, err := os.Stat(file) if err != nil { if os.IsNotExist(err) { klog.Errorf("Update firmware fail, file %s not exist", file) } return err } data, err := ioutil.ReadFile(file) if err != nil { klog.Errorf("Update firmware fail, file %s can't read", file) return err } status, err := conn.UpdateFirmware(data) if err != nil || status.Code != OK { klog.Errorf("Update firmware fail : %s\n", status.Error()) } return err } func UploadAppletFile(conn *BlockConnection, file, key string) ([][]byte, error) { info, err := os.Stat(file) if err != nil { if os.IsNotExist(err) { klog.Errorf("Upload applet fail, file %s not exist", file) } return nil, err } fileSize := info.Size() var chunkSize int64 = 1024 * 1024 chunks := fileSize / chunkSize if fileSize%chunkSize != 0 { chunks++ } keys := make([][]byte, chunks) f, err := os.Open(file) if err != nil { return nil, err } defer f.Close() buf := make([]byte, chunkSize) var n int var offset, cnt int = 0, 0 for { n, err = f.Read(buf) if err != nil { if err == io.EOF { break } else { // TODO: Should delete already PUT objects??? return nil, err } } keys[cnt] = []byte(fmt.Sprintf("%s-%010d", key, offset)) entry := Record{ Key: keys[cnt], Value: buf[:n], Tag: []byte(""), Sync: SYNC_WRITETHROUGH, Algo: ALGO_SHA1, Force: true, } status, err := conn.Put(&entry) if err != nil || status.Code != OK { klog.Errorf("Upload applet fail for chunk[%02d], key[%s] : %s\n", cnt, keys[cnt], status.Error()) // TODO: Should delete already PUT objects??? return nil, err } offset += n cnt++ } return keys, nil }
utility_test.go 0 → 100644 +22 −0 Original line number Diff line number Diff line package kinetic import ( "testing" ) func TestUploadAppletFile(t *testing.T) { file := "not/exist/applet/javapplet.jar" keys, err := UploadAppletFile(blockConn, file, "test-applet") if err != nil || len(keys) <= 0 { t.Fatal("Upload applet file fail: ", file) } } func TestUpdateFirmware(t *testing.T) { //file := "K:\\srv\\tftp\\AD-installer-v44.01.03.slod" file := "not/exist/firmare/unknown-version.slod" err := UpdateFirmware(blockConn, file) if err != nil { t.Fatal("Firmware update fail: ", file) } }