Loading blockconnection.go +62 −4 Original line number Diff line number Diff line Loading @@ -4,11 +4,15 @@ import ( kproto "github.com/yongzhy/kinetic-go/proto" ) // BlockConnection sends kinetic message to devices and wait for response message from device. // For all API fucntions, it will only return after response from kinetic device handled. // If no data required from kinetic device, API function will return Status and error. // If any data required from kinetic device, the data will be one of the return values. type BlockConnection struct { nbc *NonBlockConnection } // Helper function to establish block connection to device. // NewBlockConnection is helper function to establish block connection to device. func NewBlockConnection(op ClientOptions) (*BlockConnection, error) { nbc, err := NewNonBlockConnection(op) if err != nil { Loading @@ -19,6 +23,8 @@ func NewBlockConnection(op ClientOptions) (*BlockConnection, error) { return &BlockConnection{nbc: nbc}, err } // NoOp does nothing but wait for drive to return response. // On success, Status.Code will be OK func (conn *BlockConnection) NoOp() (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -36,7 +42,7 @@ func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) callback := &GetCallback{} h := NewResponseHandler(callback) var err error = nil var err error switch getCmd { case kproto.Command_GET: err = conn.nbc.Get(key, h) Loading @@ -54,18 +60,26 @@ func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) return &callback.Entry, callback.Status(), err } // Get gets the object from kinetic drive with key. // On success, object Record will return and Status.Code = OK func (conn *BlockConnection) Get(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GET) } // GetNext gets the next object with key after the passed in key. // On success, object Record will return and Status.Code = OK func (conn *BlockConnection) GetNext(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GETNEXT) } // GetPrevious gets the previous object with key before the passed in key. // On success, object Record will return and Status.Code = OK func (conn *BlockConnection) GetPrevious(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GETPREVIOUS) } // GetKeyRange gets list of objects' keys, which meet the criteria defined by KeyRange. // On success, list of objects's keys returned, and Status.Code = OK func (conn *BlockConnection) GetKeyRange(r *KeyRange) ([][]byte, Status, error) { callback := &GetKeyRangeCallback{} h := NewResponseHandler(callback) Loading @@ -79,6 +93,8 @@ func (conn *BlockConnection) GetKeyRange(r *KeyRange) ([][]byte, Status, error) return callback.Keys, callback.Status(), err } // GetVersion gets object DB version information. // On success, version information will return and Status.Code = OK func (conn *BlockConnection) GetVersion(key []byte) ([]byte, Status, error) { callback := &GetVersionCallback{} h := NewResponseHandler(callback) Loading @@ -92,6 +108,8 @@ func (conn *BlockConnection) GetVersion(key []byte) ([]byte, Status, error) { return callback.Version, callback.Status(), err } // Flush requests kinetic device to write all cached data to persistent media. // On success, Status.Code = OK func (conn *BlockConnection) Flush() (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -105,6 +123,8 @@ func (conn *BlockConnection) Flush() (Status, error) { return callback.Status(), err } // Delete deletes object from kinetic device. // On success, Status.Code = OK func (conn *BlockConnection) Delete(entry *Record) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -118,6 +138,8 @@ func (conn *BlockConnection) Delete(entry *Record) (Status, error) { return callback.Status(), err } // Put store object to kinetic device. // On success, Status.Code = OK func (conn *BlockConnection) Put(entry *Record) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -131,6 +153,7 @@ func (conn *BlockConnection) Put(entry *Record) (Status, error) { return callback.Status(), err } // P2Push func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, error) { callback := &P2PPushCallback{} h := NewResponseHandler(callback) Loading @@ -144,6 +167,8 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, return callback.Statuses, callback.Status(), err } // GetLog gets kinetic device Log information. Can request single LogType or multiple LogType. // On success, device Log information will return, and Status.Code = OK func (conn *BlockConnection) GetLog(logs []LogType) (*Log, Status, error) { callback := &GetLogCallback{} h := NewResponseHandler(callback) Loading @@ -161,7 +186,7 @@ func (conn *BlockConnection) pinop(pin []byte, op kproto.Command_PinOperation_Pi callback := &GenericCallback{} h := NewResponseHandler(callback) var err error = nil var err error switch op { case kproto.Command_PinOperation_SECURE_ERASE_PINOP: err = conn.nbc.SecureErase(pin, h) Loading @@ -181,23 +206,38 @@ func (conn *BlockConnection) pinop(pin []byte, op kproto.Command_PinOperation_Pi return callback.Status(), err } // SecureErase request kinetic device to perform secure erase. // SSL connection is requested to perform this operation, and the erase pin is needed. // On success, Status.Code = OK func (conn *BlockConnection) SecureErase(pin []byte) (Status, error) { return conn.pinop(pin, kproto.Command_PinOperation_SECURE_ERASE_PINOP) } // InstantErase request kinetic device to perform instant erase. // SSL connection is requested to perform this operation, and the erase pin is needed. // On success, Status.Code = OK func (conn *BlockConnection) InstantErase(pin []byte) (Status, error) { return conn.pinop(pin, kproto.Command_PinOperation_ERASE_PINOP) } // LockDevice locks the kinetic device. // SSL connection is requested to perform this operation, and the lock pin is needed. // On success, Status.Code = OK func (conn *BlockConnection) LockDevice(pin []byte) (Status, error) { return conn.pinop(pin, kproto.Command_PinOperation_LOCK_PINOP) } // UnlockDevice unlocks the kinetic device. // SSL connection is requested to perform this operation, and the lock pin is needed. // On success, Status.Code = OK func (conn *BlockConnection) UnlockDevice(pin []byte) (Status, error) { return conn.pinop(pin, kproto.Command_PinOperation_UNLOCK_PINOP) } // UpdateFirmware requests to update kientic device firmware. // Status.OK will return if firmware data received by kinetic device. // Then drive will reboot and perform the firmware update process. func (conn *BlockConnection) UpdateFirmware(code []byte) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -211,6 +251,8 @@ func (conn *BlockConnection) UpdateFirmware(code []byte) (Status, error) { return callback.Status(), err } // SetClusterVersion sets the cluster version on kinetic drive. // On success, Status.Code = OK. func (conn *BlockConnection) SetClusterVersion(version int64) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -224,6 +266,14 @@ func (conn *BlockConnection) SetClusterVersion(version int64) (Status, error) { return callback.Status(), err } // SetClientClusterVersion sets the cluster version for all following message to kinetic device. func (conn *BlockConnection) SetClientClusterVersion(version int64) { conn.nbc.SetClientClusterVersion(version) } // SetLockPin changes kinetic device lock pin. Both current pin and new pin needed. // SSL connection is required to perform this operation. // On success, Status.Code = OK. func (conn *BlockConnection) SetLockPin(currentPin []byte, newPin []byte) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -237,6 +287,9 @@ func (conn *BlockConnection) SetLockPin(currentPin []byte, newPin []byte) (Statu return callback.Status(), err } // SetErasePin changes kinetic device erase pin. Both current pin and new pin needed. // SSL connection is required to perform this operation. // On success, Status.Code = OK. func (conn *BlockConnection) SetErasePin(currentPin []byte, newPin []byte) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -250,7 +303,9 @@ func (conn *BlockConnection) SetErasePin(currentPin []byte, newPin []byte) (Stat return callback.Status(), err } func (conn *BlockConnection) SetACL(acls []SecurityACL) (Status, error) { // SetACL sets Permission for particular user Identify. // On success, Status.Code = OK. func (conn *BlockConnection) SetACL(acls []ACL) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) err := conn.nbc.SetACL(acls, h) Loading @@ -263,6 +318,7 @@ func (conn *BlockConnection) SetACL(acls []SecurityACL) (Status, error) { return callback.Status(), err } // MediaScan func (conn *BlockConnection) MediaScan(op *MediaOperation, pri Priority) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -276,6 +332,7 @@ func (conn *BlockConnection) MediaScan(op *MediaOperation, pri Priority) (Status return callback.Status(), err } // MediaOptimize func (conn *BlockConnection) MediaOptimize(op *MediaOperation, pri Priority) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -289,6 +346,7 @@ func (conn *BlockConnection) MediaOptimize(op *MediaOperation, pri Priority) (St return callback.Status(), err } // Close the connection to kientic device func (conn *BlockConnection) Close() { conn.nbc.Close() } callback.go +14 −7 Original line number Diff line number Diff line Loading @@ -15,31 +15,34 @@ type Callback interface { Status() Status } // Generic Callback, can be used for all MessageType which doesn't require data from Kinetic drive. // GenericCallback can be used for all MessageType which doesn't require data from Kinetic drive. // And for MessageType that require data from drive, a new struct need to be defined GenericCallback type GenericCallback struct { status Status } // Success is called by ResponseHandler when response message received from kinetic device has OK status. func (c *GenericCallback) Success(resp *kproto.Command, value []byte) { c.status = Status{Code: OK} } // Failure is called ResponseHandler when response message received from kinetic device with status code other than OK. func (c *GenericCallback) Failure(status Status) { c.status = status } // Status returns the status after ResponseHandler processed response message from kinetic device. func (c *GenericCallback) Status() Status { return c.status } // Callback for Command_GET Message // GetCallback is the Callback for Command_GET Message type GetCallback struct { GenericCallback Entry Record // Entity information } // Success function extracts object information from kinetic message protocol and // Success function extracts object information from response message and // store into GetCallback.Entry. func (c *GetCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) Loading @@ -51,34 +54,37 @@ func (c *GetCallback) Success(resp *kproto.Command, value []byte) { c.Entry.Value = value } // Callback for Command_GETKEYRANGE Message // GetKeyRangeCallback is the Callback for Command_GETKEYRANGE Message type GetKeyRangeCallback struct { GenericCallback Keys [][]byte // List of objects' keys within range, get from device } // Success extracts objects' keys within range, from response message. func (c *GetKeyRangeCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) c.Keys = resp.GetBody().GetRange().GetKeys() } // Callback for Command_GETVERSION Message // GetVersionCallback is the Callback for Command_GETVERSION Message type GetVersionCallback struct { GenericCallback Version []byte // Version of the object on device } // Success extracts object's version information from response message. func (c *GetVersionCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) c.Version = resp.GetBody().GetKeyValue().GetDbVersion() } // Callback for Command_PEER2PEERPUSH // P2PPushCallback is the Callback for Command_PEER2PEERPUSH type P2PPushCallback struct { GenericCallback Statuses []Status } // Success extracts P2Push operation status from response message. func (c *P2PPushCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) c.Statuses = make([]Status, len(resp.GetBody().GetP2POperation().GetOperation())) Loading @@ -88,12 +94,13 @@ func (c *P2PPushCallback) Success(resp *kproto.Command, value []byte) { } } // Callback for Command_GETLOG Message // GetLogCallback is the Callback for Command_GETLOG Message type GetLogCallback struct { GenericCallback Logs Log // Device log information } // Success extracts kientic device's Log information from response message. func (c *GetLogCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) c.Logs = getLogFromProto(resp) Loading connection_test.go +55 −5 Original line number Diff line number Diff line Loading @@ -6,17 +6,21 @@ import ( ) var ( blockConn *BlockConnection = nil nonblockConn *NonBlockConnection = nil blockConn *BlockConnection nonblockConn *NonBlockConnection ) var option = ClientOptions{ Host: "127.0.0.1", Port: 8123, //Port: 8443, // For SSL connection User: 1, Hmac: []byte("asdfasdf")} Hmac: []byte("asdfasdf"), //UseSSL: true, } func TestMain(m *testing.M) { SetLogLevel(LogLevelDebug) blockConn, _ = NewBlockConnection(option) if blockConn != nil { code := m.Run() Loading Loading @@ -72,7 +76,7 @@ func TestBlockFlush(t *testing.T) { func TestBlockPut(t *testing.T) { entry := Record{ Key: []byte("object001"), Key: []byte("object000"), Value: []byte("ABCDEFG"), Sync: SYNC_WRITETHROUGH, Algo: ALGO_SHA1, Loading @@ -87,7 +91,7 @@ func TestBlockPut(t *testing.T) { func TestBlockDelete(t *testing.T) { entry := Record{ Key: []byte("object001"), Key: []byte("object000"), Sync: SYNC_WRITETHROUGH, Algo: ALGO_SHA1, Force: true, Loading Loading @@ -196,3 +200,49 @@ func TestBlockMediaOptimize(t *testing.T) { t.Fatal("Blocking MediaOptimize Failure: ", err, status.String()) } } func TestBlockSetClusterVersion(t *testing.T) { status, err := blockConn.SetClusterVersion(1) if err != nil || status.Code != OK { t.Fatal("Blocking SetClusterVersion Failure: ", err, status.String()) } blockConn.SetClientClusterVersion(2) _, status, err = blockConn.Get([]byte("object000")) if err != nil || status.Code != REMOTE_CLUSTER_VERSION_MISMATCH { t.Fatal("Blocking Get expected REMOTE_CLUSTER_VERSION_MISMATCH. ", err, status.String()) } t.Log(status.String()) } func TestBlockInstantErase(t *testing.T) { t.Skip("Danger: Skip InstanceErase Test") status, err := blockConn.InstantErase([]byte("PIN")) if err != nil || status.Code != OK { t.Fatal("Blocking InstantErase Failure: ", err, status.String()) } } func TestBlockSecureErase(t *testing.T) { t.Skip("Danger: Skip SecureErase Test") status, err := blockConn.SecureErase([]byte("")) if err != nil || status.Code != OK { t.Fatal("Blocking SecureErase Failure: ", err, status.String()) } } func TestBlockSetErasePin(t *testing.T) { t.Skip("Danger: Skip SetErasePin Test") status, err := blockConn.SetErasePin([]byte(""), []byte("PIN")) if err != nil || status.Code != OK { t.Fatal("Blocking SetErasePin Failure: ", err, status.String()) } } func TestBlockSetLockPin(t *testing.T) { t.Skip("Danger: Skip SetLockPin Test") status, err := blockConn.SetLockPin([]byte(""), []byte("PIN")) if err != nil || status.Code != OK { t.Fatal("Blocking SetLockPin Failure: ", err, status.String()) } } getlog.go +51 −48 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import ( kproto "github.com/yongzhy/kinetic-go/proto" ) // LogType defines what type of information to retrieve by GetLog. type LogType int32 const ( Loading Loading @@ -83,25 +84,28 @@ func convertLogTypeFromProto(l kproto.Command_GetLog_Type) LogType { return ret } // UtilizationLog for kinetic drive utilization information // UtilizationLog for kinetic device utilization information. type UtilizationLog struct { Name string // Name of the device utlity Value float32 // Value of device utility } // TemperatureLog for kinetic device tempture. type TemperatureLog struct { Name string // Name of the drive Name string // Name of the device Current float32 // Current Temperature Minimum float32 // Minimum Temperature for drive Maximum float32 // Maximum Tempture for drive Target float32 // Target Temperature for drive } // CapacityLog for kinetic device capacity information. type CapacityLog struct { CapacityInBytes uint64 // total capacity of hard disk, in bytes PortionFull float32 // remaining capacity of hard disk } // ConfigurationInterface for kinetic device network interfaces information. type ConfigurationInterface struct { Name string // network device name MAC []byte // network device mac address Loading @@ -109,6 +113,7 @@ type ConfigurationInterface struct { Ipv6Addr []byte // network device ipv6 address } // ConfigurationLog for kinetic device configuration information. type ConfigurationLog struct { Vendor string // Vendor name Model string // Device model Loading @@ -125,7 +130,7 @@ type ConfigurationLog struct { TlsPort int32 // TLS service port } // Statistic information for each type of MessageType. // StatisticsLog information for each type of MessageType. // Count is total number of Type message processed. // Bytes is the sum of the data that is in the data portion. // This does not include the command description. Loading @@ -137,6 +142,7 @@ type StatisticsLog struct { Bytes uint64 } // LimitsLog defines max values. type LimitsLog struct { MaxKeySize uint32 // max key size MaxValueSize uint32 // max value size Loading @@ -157,39 +163,40 @@ type DeviceLog struct { Name []byte } // Log is the top level structure that groups all the log information type Log struct { Utilizations []UtilizationLog // List of utilization information of the drive Temperatures []TemperatureLog // List of tempeture inforamtion of the drive Capacity CapacityLog // Capacity information of the drive Configuration ConfigurationLog // Configuration information of the drive Capacity *CapacityLog // Capacity information of the drive Configuration *ConfigurationLog // Configuration information of the drive Statistics []StatisticsLog // List of statistic information from the drive Messages []byte // Kinetic log messages from the drive Limits LimitsLog // Limits information from the drive Device DeviceLog Limits *LimitsLog // Limits information from the drive Device *DeviceLog } func getUtilizationLogFromProto(getlog *kproto.Command_GetLog) []UtilizationLog { func getUtilizationLogFromProto(getlog *kproto.Command_GetLog) (log []UtilizationLog) { log = nil utils := getlog.GetUtilizations() if utils != nil { ulog := make([]UtilizationLog, len(utils)) log = make([]UtilizationLog, len(utils)) for k, v := range utils { ulog[k] = UtilizationLog{ log[k] = UtilizationLog{ Name: v.GetName(), Value: v.GetValue(), } } return ulog } else { return nil } return } func getTemperatureLogFromProto(getlog *kproto.Command_GetLog) []TemperatureLog { func getTemperatureLogFromProto(getlog *kproto.Command_GetLog) (log []TemperatureLog) { log = nil temps := getlog.GetTemperatures() if temps != nil { templog := make([]TemperatureLog, len(temps)) log = make([]TemperatureLog, len(temps)) for k, v := range temps { templog[k] = TemperatureLog{ log[k] = TemperatureLog{ Name: v.GetName(), Current: v.GetCurrent(), Minimum: v.GetMinimum(), Loading @@ -197,28 +204,27 @@ func getTemperatureLogFromProto(getlog *kproto.Command_GetLog) []TemperatureLog Target: v.GetTarget(), } } return templog } else { return nil } return } func getCapacityLogFromProto(getlog *kproto.Command_GetLog) CapacityLog { var log CapacityLog func getCapacityLogFromProto(getlog *kproto.Command_GetLog) (log *CapacityLog) { log = nil capacity := getlog.GetCapacity() if capacity != nil { log = CapacityLog{ log = &CapacityLog{ CapacityInBytes: capacity.GetNominalCapacityInBytes(), PortionFull: capacity.GetPortionFull(), } } return log return } func getConfigurationInterfaceFromProto(conf *kproto.Command_GetLog_Configuration) []ConfigurationInterface { func getConfigurationInterfaceFromProto(conf *kproto.Command_GetLog_Configuration) (inf []ConfigurationInterface) { inf = nil pinf := conf.GetInterface() if pinf != nil { inf := make([]ConfigurationInterface, len(pinf)) inf = make([]ConfigurationInterface, len(pinf)) for k, v := range pinf { inf[k] = ConfigurationInterface{ Name: v.GetName(), Loading @@ -227,17 +233,15 @@ func getConfigurationInterfaceFromProto(conf *kproto.Command_GetLog_Configuratio Ipv6Addr: v.GetIpv6Address(), } } return inf } else { return nil } return } func getConfigurationLogFromProto(getlog *kproto.Command_GetLog) ConfigurationLog { var log ConfigurationLog func getConfigurationLogFromProto(getlog *kproto.Command_GetLog) (log *ConfigurationLog) { log = nil conf := getlog.GetConfiguration() if conf != nil { log = ConfigurationLog{ log = &ConfigurationLog{ Vendor: conf.GetVendor(), Model: conf.GetModel(), SerialNumber: conf.GetSerialNumber(), Loading @@ -253,35 +257,34 @@ func getConfigurationLogFromProto(getlog *kproto.Command_GetLog) ConfigurationLo TlsPort: conf.GetTlsPort(), } } return log return } func getStatisticsLogFromProto(getlog *kproto.Command_GetLog) []StatisticsLog { func getStatisticsLogFromProto(getlog *kproto.Command_GetLog) (log []StatisticsLog) { log = nil statics := getlog.GetStatistics() if statics != nil { slog := make([]StatisticsLog, len(statics)) log := make([]StatisticsLog, len(statics)) for k, v := range statics { slog[k] = StatisticsLog{ log[k] = StatisticsLog{ Type: convertMessageTypeFromProto(v.GetMessageType()), Count: v.GetCount(), Bytes: v.GetBytes(), } } return slog } else { return nil } return } func getLogMessageFromProto(getlog *kproto.Command_GetLog) []byte { return getlog.GetMessages() } func getLimitsLogFromProto(getlog *kproto.Command_GetLog) LimitsLog { var log LimitsLog func getLimitsLogFromProto(getlog *kproto.Command_GetLog) (log *LimitsLog) { log = nil limits := getlog.GetLimits() if limits != nil { log = LimitsLog{ log = &LimitsLog{ MaxKeySize: limits.GetMaxKeySize(), MaxValueSize: limits.GetMaxValueSize(), MaxVersionSize: limits.GetMaxVersionSize(), Loading @@ -297,12 +300,12 @@ func getLimitsLogFromProto(getlog *kproto.Command_GetLog) LimitsLog { MaxBatchCountPerDevice: limits.GetMaxBatchCountPerDevice(), } } return log return } func getDeviceLogFromProto(getlog *kproto.Command_GetLog) DeviceLog { func getDeviceLogFromProto(getlog *kproto.Command_GetLog) *DeviceLog { //TODO: Need more details return DeviceLog{ return &DeviceLog{ Name: getlog.GetDevice().GetName(), } } Loading handler.go +3 −2 Original line number Diff line number Diff line package kinetic import ( kproto "github.com/yongzhy/kinetic-go/proto" "sync" kproto "github.com/yongzhy/kinetic-go/proto" ) // ResponseHandler is the handler for XXXXX_RESPONSE message from drive. Loading Loading @@ -52,7 +53,7 @@ func (h *ResponseHandler) wait() { h.cond.L.Unlock() } // Helper function to build a ResponseHandler with call as the Callback. // NewResponseHandler is helper function to build a ResponseHandler with call as the Callback. // For each operation, a unique ResponseHandler is requried func NewResponseHandler(call Callback) *ResponseHandler { h := &ResponseHandler{callback: call, done: false, cond: sync.NewCond(&sync.Mutex{})} Loading Loading
blockconnection.go +62 −4 Original line number Diff line number Diff line Loading @@ -4,11 +4,15 @@ import ( kproto "github.com/yongzhy/kinetic-go/proto" ) // BlockConnection sends kinetic message to devices and wait for response message from device. // For all API fucntions, it will only return after response from kinetic device handled. // If no data required from kinetic device, API function will return Status and error. // If any data required from kinetic device, the data will be one of the return values. type BlockConnection struct { nbc *NonBlockConnection } // Helper function to establish block connection to device. // NewBlockConnection is helper function to establish block connection to device. func NewBlockConnection(op ClientOptions) (*BlockConnection, error) { nbc, err := NewNonBlockConnection(op) if err != nil { Loading @@ -19,6 +23,8 @@ func NewBlockConnection(op ClientOptions) (*BlockConnection, error) { return &BlockConnection{nbc: nbc}, err } // NoOp does nothing but wait for drive to return response. // On success, Status.Code will be OK func (conn *BlockConnection) NoOp() (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -36,7 +42,7 @@ func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) callback := &GetCallback{} h := NewResponseHandler(callback) var err error = nil var err error switch getCmd { case kproto.Command_GET: err = conn.nbc.Get(key, h) Loading @@ -54,18 +60,26 @@ func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) return &callback.Entry, callback.Status(), err } // Get gets the object from kinetic drive with key. // On success, object Record will return and Status.Code = OK func (conn *BlockConnection) Get(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GET) } // GetNext gets the next object with key after the passed in key. // On success, object Record will return and Status.Code = OK func (conn *BlockConnection) GetNext(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GETNEXT) } // GetPrevious gets the previous object with key before the passed in key. // On success, object Record will return and Status.Code = OK func (conn *BlockConnection) GetPrevious(key []byte) (*Record, Status, error) { return conn.get(key, kproto.Command_GETPREVIOUS) } // GetKeyRange gets list of objects' keys, which meet the criteria defined by KeyRange. // On success, list of objects's keys returned, and Status.Code = OK func (conn *BlockConnection) GetKeyRange(r *KeyRange) ([][]byte, Status, error) { callback := &GetKeyRangeCallback{} h := NewResponseHandler(callback) Loading @@ -79,6 +93,8 @@ func (conn *BlockConnection) GetKeyRange(r *KeyRange) ([][]byte, Status, error) return callback.Keys, callback.Status(), err } // GetVersion gets object DB version information. // On success, version information will return and Status.Code = OK func (conn *BlockConnection) GetVersion(key []byte) ([]byte, Status, error) { callback := &GetVersionCallback{} h := NewResponseHandler(callback) Loading @@ -92,6 +108,8 @@ func (conn *BlockConnection) GetVersion(key []byte) ([]byte, Status, error) { return callback.Version, callback.Status(), err } // Flush requests kinetic device to write all cached data to persistent media. // On success, Status.Code = OK func (conn *BlockConnection) Flush() (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -105,6 +123,8 @@ func (conn *BlockConnection) Flush() (Status, error) { return callback.Status(), err } // Delete deletes object from kinetic device. // On success, Status.Code = OK func (conn *BlockConnection) Delete(entry *Record) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -118,6 +138,8 @@ func (conn *BlockConnection) Delete(entry *Record) (Status, error) { return callback.Status(), err } // Put store object to kinetic device. // On success, Status.Code = OK func (conn *BlockConnection) Put(entry *Record) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -131,6 +153,7 @@ func (conn *BlockConnection) Put(entry *Record) (Status, error) { return callback.Status(), err } // P2Push func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, error) { callback := &P2PPushCallback{} h := NewResponseHandler(callback) Loading @@ -144,6 +167,8 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, return callback.Statuses, callback.Status(), err } // GetLog gets kinetic device Log information. Can request single LogType or multiple LogType. // On success, device Log information will return, and Status.Code = OK func (conn *BlockConnection) GetLog(logs []LogType) (*Log, Status, error) { callback := &GetLogCallback{} h := NewResponseHandler(callback) Loading @@ -161,7 +186,7 @@ func (conn *BlockConnection) pinop(pin []byte, op kproto.Command_PinOperation_Pi callback := &GenericCallback{} h := NewResponseHandler(callback) var err error = nil var err error switch op { case kproto.Command_PinOperation_SECURE_ERASE_PINOP: err = conn.nbc.SecureErase(pin, h) Loading @@ -181,23 +206,38 @@ func (conn *BlockConnection) pinop(pin []byte, op kproto.Command_PinOperation_Pi return callback.Status(), err } // SecureErase request kinetic device to perform secure erase. // SSL connection is requested to perform this operation, and the erase pin is needed. // On success, Status.Code = OK func (conn *BlockConnection) SecureErase(pin []byte) (Status, error) { return conn.pinop(pin, kproto.Command_PinOperation_SECURE_ERASE_PINOP) } // InstantErase request kinetic device to perform instant erase. // SSL connection is requested to perform this operation, and the erase pin is needed. // On success, Status.Code = OK func (conn *BlockConnection) InstantErase(pin []byte) (Status, error) { return conn.pinop(pin, kproto.Command_PinOperation_ERASE_PINOP) } // LockDevice locks the kinetic device. // SSL connection is requested to perform this operation, and the lock pin is needed. // On success, Status.Code = OK func (conn *BlockConnection) LockDevice(pin []byte) (Status, error) { return conn.pinop(pin, kproto.Command_PinOperation_LOCK_PINOP) } // UnlockDevice unlocks the kinetic device. // SSL connection is requested to perform this operation, and the lock pin is needed. // On success, Status.Code = OK func (conn *BlockConnection) UnlockDevice(pin []byte) (Status, error) { return conn.pinop(pin, kproto.Command_PinOperation_UNLOCK_PINOP) } // UpdateFirmware requests to update kientic device firmware. // Status.OK will return if firmware data received by kinetic device. // Then drive will reboot and perform the firmware update process. func (conn *BlockConnection) UpdateFirmware(code []byte) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -211,6 +251,8 @@ func (conn *BlockConnection) UpdateFirmware(code []byte) (Status, error) { return callback.Status(), err } // SetClusterVersion sets the cluster version on kinetic drive. // On success, Status.Code = OK. func (conn *BlockConnection) SetClusterVersion(version int64) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -224,6 +266,14 @@ func (conn *BlockConnection) SetClusterVersion(version int64) (Status, error) { return callback.Status(), err } // SetClientClusterVersion sets the cluster version for all following message to kinetic device. func (conn *BlockConnection) SetClientClusterVersion(version int64) { conn.nbc.SetClientClusterVersion(version) } // SetLockPin changes kinetic device lock pin. Both current pin and new pin needed. // SSL connection is required to perform this operation. // On success, Status.Code = OK. func (conn *BlockConnection) SetLockPin(currentPin []byte, newPin []byte) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -237,6 +287,9 @@ func (conn *BlockConnection) SetLockPin(currentPin []byte, newPin []byte) (Statu return callback.Status(), err } // SetErasePin changes kinetic device erase pin. Both current pin and new pin needed. // SSL connection is required to perform this operation. // On success, Status.Code = OK. func (conn *BlockConnection) SetErasePin(currentPin []byte, newPin []byte) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -250,7 +303,9 @@ func (conn *BlockConnection) SetErasePin(currentPin []byte, newPin []byte) (Stat return callback.Status(), err } func (conn *BlockConnection) SetACL(acls []SecurityACL) (Status, error) { // SetACL sets Permission for particular user Identify. // On success, Status.Code = OK. func (conn *BlockConnection) SetACL(acls []ACL) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) err := conn.nbc.SetACL(acls, h) Loading @@ -263,6 +318,7 @@ func (conn *BlockConnection) SetACL(acls []SecurityACL) (Status, error) { return callback.Status(), err } // MediaScan func (conn *BlockConnection) MediaScan(op *MediaOperation, pri Priority) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -276,6 +332,7 @@ func (conn *BlockConnection) MediaScan(op *MediaOperation, pri Priority) (Status return callback.Status(), err } // MediaOptimize func (conn *BlockConnection) MediaOptimize(op *MediaOperation, pri Priority) (Status, error) { callback := &GenericCallback{} h := NewResponseHandler(callback) Loading @@ -289,6 +346,7 @@ func (conn *BlockConnection) MediaOptimize(op *MediaOperation, pri Priority) (St return callback.Status(), err } // Close the connection to kientic device func (conn *BlockConnection) Close() { conn.nbc.Close() }
callback.go +14 −7 Original line number Diff line number Diff line Loading @@ -15,31 +15,34 @@ type Callback interface { Status() Status } // Generic Callback, can be used for all MessageType which doesn't require data from Kinetic drive. // GenericCallback can be used for all MessageType which doesn't require data from Kinetic drive. // And for MessageType that require data from drive, a new struct need to be defined GenericCallback type GenericCallback struct { status Status } // Success is called by ResponseHandler when response message received from kinetic device has OK status. func (c *GenericCallback) Success(resp *kproto.Command, value []byte) { c.status = Status{Code: OK} } // Failure is called ResponseHandler when response message received from kinetic device with status code other than OK. func (c *GenericCallback) Failure(status Status) { c.status = status } // Status returns the status after ResponseHandler processed response message from kinetic device. func (c *GenericCallback) Status() Status { return c.status } // Callback for Command_GET Message // GetCallback is the Callback for Command_GET Message type GetCallback struct { GenericCallback Entry Record // Entity information } // Success function extracts object information from kinetic message protocol and // Success function extracts object information from response message and // store into GetCallback.Entry. func (c *GetCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) Loading @@ -51,34 +54,37 @@ func (c *GetCallback) Success(resp *kproto.Command, value []byte) { c.Entry.Value = value } // Callback for Command_GETKEYRANGE Message // GetKeyRangeCallback is the Callback for Command_GETKEYRANGE Message type GetKeyRangeCallback struct { GenericCallback Keys [][]byte // List of objects' keys within range, get from device } // Success extracts objects' keys within range, from response message. func (c *GetKeyRangeCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) c.Keys = resp.GetBody().GetRange().GetKeys() } // Callback for Command_GETVERSION Message // GetVersionCallback is the Callback for Command_GETVERSION Message type GetVersionCallback struct { GenericCallback Version []byte // Version of the object on device } // Success extracts object's version information from response message. func (c *GetVersionCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) c.Version = resp.GetBody().GetKeyValue().GetDbVersion() } // Callback for Command_PEER2PEERPUSH // P2PPushCallback is the Callback for Command_PEER2PEERPUSH type P2PPushCallback struct { GenericCallback Statuses []Status } // Success extracts P2Push operation status from response message. func (c *P2PPushCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) c.Statuses = make([]Status, len(resp.GetBody().GetP2POperation().GetOperation())) Loading @@ -88,12 +94,13 @@ func (c *P2PPushCallback) Success(resp *kproto.Command, value []byte) { } } // Callback for Command_GETLOG Message // GetLogCallback is the Callback for Command_GETLOG Message type GetLogCallback struct { GenericCallback Logs Log // Device log information } // Success extracts kientic device's Log information from response message. func (c *GetLogCallback) Success(resp *kproto.Command, value []byte) { c.GenericCallback.Success(resp, value) c.Logs = getLogFromProto(resp) Loading
connection_test.go +55 −5 Original line number Diff line number Diff line Loading @@ -6,17 +6,21 @@ import ( ) var ( blockConn *BlockConnection = nil nonblockConn *NonBlockConnection = nil blockConn *BlockConnection nonblockConn *NonBlockConnection ) var option = ClientOptions{ Host: "127.0.0.1", Port: 8123, //Port: 8443, // For SSL connection User: 1, Hmac: []byte("asdfasdf")} Hmac: []byte("asdfasdf"), //UseSSL: true, } func TestMain(m *testing.M) { SetLogLevel(LogLevelDebug) blockConn, _ = NewBlockConnection(option) if blockConn != nil { code := m.Run() Loading Loading @@ -72,7 +76,7 @@ func TestBlockFlush(t *testing.T) { func TestBlockPut(t *testing.T) { entry := Record{ Key: []byte("object001"), Key: []byte("object000"), Value: []byte("ABCDEFG"), Sync: SYNC_WRITETHROUGH, Algo: ALGO_SHA1, Loading @@ -87,7 +91,7 @@ func TestBlockPut(t *testing.T) { func TestBlockDelete(t *testing.T) { entry := Record{ Key: []byte("object001"), Key: []byte("object000"), Sync: SYNC_WRITETHROUGH, Algo: ALGO_SHA1, Force: true, Loading Loading @@ -196,3 +200,49 @@ func TestBlockMediaOptimize(t *testing.T) { t.Fatal("Blocking MediaOptimize Failure: ", err, status.String()) } } func TestBlockSetClusterVersion(t *testing.T) { status, err := blockConn.SetClusterVersion(1) if err != nil || status.Code != OK { t.Fatal("Blocking SetClusterVersion Failure: ", err, status.String()) } blockConn.SetClientClusterVersion(2) _, status, err = blockConn.Get([]byte("object000")) if err != nil || status.Code != REMOTE_CLUSTER_VERSION_MISMATCH { t.Fatal("Blocking Get expected REMOTE_CLUSTER_VERSION_MISMATCH. ", err, status.String()) } t.Log(status.String()) } func TestBlockInstantErase(t *testing.T) { t.Skip("Danger: Skip InstanceErase Test") status, err := blockConn.InstantErase([]byte("PIN")) if err != nil || status.Code != OK { t.Fatal("Blocking InstantErase Failure: ", err, status.String()) } } func TestBlockSecureErase(t *testing.T) { t.Skip("Danger: Skip SecureErase Test") status, err := blockConn.SecureErase([]byte("")) if err != nil || status.Code != OK { t.Fatal("Blocking SecureErase Failure: ", err, status.String()) } } func TestBlockSetErasePin(t *testing.T) { t.Skip("Danger: Skip SetErasePin Test") status, err := blockConn.SetErasePin([]byte(""), []byte("PIN")) if err != nil || status.Code != OK { t.Fatal("Blocking SetErasePin Failure: ", err, status.String()) } } func TestBlockSetLockPin(t *testing.T) { t.Skip("Danger: Skip SetLockPin Test") status, err := blockConn.SetLockPin([]byte(""), []byte("PIN")) if err != nil || status.Code != OK { t.Fatal("Blocking SetLockPin Failure: ", err, status.String()) } }
getlog.go +51 −48 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import ( kproto "github.com/yongzhy/kinetic-go/proto" ) // LogType defines what type of information to retrieve by GetLog. type LogType int32 const ( Loading Loading @@ -83,25 +84,28 @@ func convertLogTypeFromProto(l kproto.Command_GetLog_Type) LogType { return ret } // UtilizationLog for kinetic drive utilization information // UtilizationLog for kinetic device utilization information. type UtilizationLog struct { Name string // Name of the device utlity Value float32 // Value of device utility } // TemperatureLog for kinetic device tempture. type TemperatureLog struct { Name string // Name of the drive Name string // Name of the device Current float32 // Current Temperature Minimum float32 // Minimum Temperature for drive Maximum float32 // Maximum Tempture for drive Target float32 // Target Temperature for drive } // CapacityLog for kinetic device capacity information. type CapacityLog struct { CapacityInBytes uint64 // total capacity of hard disk, in bytes PortionFull float32 // remaining capacity of hard disk } // ConfigurationInterface for kinetic device network interfaces information. type ConfigurationInterface struct { Name string // network device name MAC []byte // network device mac address Loading @@ -109,6 +113,7 @@ type ConfigurationInterface struct { Ipv6Addr []byte // network device ipv6 address } // ConfigurationLog for kinetic device configuration information. type ConfigurationLog struct { Vendor string // Vendor name Model string // Device model Loading @@ -125,7 +130,7 @@ type ConfigurationLog struct { TlsPort int32 // TLS service port } // Statistic information for each type of MessageType. // StatisticsLog information for each type of MessageType. // Count is total number of Type message processed. // Bytes is the sum of the data that is in the data portion. // This does not include the command description. Loading @@ -137,6 +142,7 @@ type StatisticsLog struct { Bytes uint64 } // LimitsLog defines max values. type LimitsLog struct { MaxKeySize uint32 // max key size MaxValueSize uint32 // max value size Loading @@ -157,39 +163,40 @@ type DeviceLog struct { Name []byte } // Log is the top level structure that groups all the log information type Log struct { Utilizations []UtilizationLog // List of utilization information of the drive Temperatures []TemperatureLog // List of tempeture inforamtion of the drive Capacity CapacityLog // Capacity information of the drive Configuration ConfigurationLog // Configuration information of the drive Capacity *CapacityLog // Capacity information of the drive Configuration *ConfigurationLog // Configuration information of the drive Statistics []StatisticsLog // List of statistic information from the drive Messages []byte // Kinetic log messages from the drive Limits LimitsLog // Limits information from the drive Device DeviceLog Limits *LimitsLog // Limits information from the drive Device *DeviceLog } func getUtilizationLogFromProto(getlog *kproto.Command_GetLog) []UtilizationLog { func getUtilizationLogFromProto(getlog *kproto.Command_GetLog) (log []UtilizationLog) { log = nil utils := getlog.GetUtilizations() if utils != nil { ulog := make([]UtilizationLog, len(utils)) log = make([]UtilizationLog, len(utils)) for k, v := range utils { ulog[k] = UtilizationLog{ log[k] = UtilizationLog{ Name: v.GetName(), Value: v.GetValue(), } } return ulog } else { return nil } return } func getTemperatureLogFromProto(getlog *kproto.Command_GetLog) []TemperatureLog { func getTemperatureLogFromProto(getlog *kproto.Command_GetLog) (log []TemperatureLog) { log = nil temps := getlog.GetTemperatures() if temps != nil { templog := make([]TemperatureLog, len(temps)) log = make([]TemperatureLog, len(temps)) for k, v := range temps { templog[k] = TemperatureLog{ log[k] = TemperatureLog{ Name: v.GetName(), Current: v.GetCurrent(), Minimum: v.GetMinimum(), Loading @@ -197,28 +204,27 @@ func getTemperatureLogFromProto(getlog *kproto.Command_GetLog) []TemperatureLog Target: v.GetTarget(), } } return templog } else { return nil } return } func getCapacityLogFromProto(getlog *kproto.Command_GetLog) CapacityLog { var log CapacityLog func getCapacityLogFromProto(getlog *kproto.Command_GetLog) (log *CapacityLog) { log = nil capacity := getlog.GetCapacity() if capacity != nil { log = CapacityLog{ log = &CapacityLog{ CapacityInBytes: capacity.GetNominalCapacityInBytes(), PortionFull: capacity.GetPortionFull(), } } return log return } func getConfigurationInterfaceFromProto(conf *kproto.Command_GetLog_Configuration) []ConfigurationInterface { func getConfigurationInterfaceFromProto(conf *kproto.Command_GetLog_Configuration) (inf []ConfigurationInterface) { inf = nil pinf := conf.GetInterface() if pinf != nil { inf := make([]ConfigurationInterface, len(pinf)) inf = make([]ConfigurationInterface, len(pinf)) for k, v := range pinf { inf[k] = ConfigurationInterface{ Name: v.GetName(), Loading @@ -227,17 +233,15 @@ func getConfigurationInterfaceFromProto(conf *kproto.Command_GetLog_Configuratio Ipv6Addr: v.GetIpv6Address(), } } return inf } else { return nil } return } func getConfigurationLogFromProto(getlog *kproto.Command_GetLog) ConfigurationLog { var log ConfigurationLog func getConfigurationLogFromProto(getlog *kproto.Command_GetLog) (log *ConfigurationLog) { log = nil conf := getlog.GetConfiguration() if conf != nil { log = ConfigurationLog{ log = &ConfigurationLog{ Vendor: conf.GetVendor(), Model: conf.GetModel(), SerialNumber: conf.GetSerialNumber(), Loading @@ -253,35 +257,34 @@ func getConfigurationLogFromProto(getlog *kproto.Command_GetLog) ConfigurationLo TlsPort: conf.GetTlsPort(), } } return log return } func getStatisticsLogFromProto(getlog *kproto.Command_GetLog) []StatisticsLog { func getStatisticsLogFromProto(getlog *kproto.Command_GetLog) (log []StatisticsLog) { log = nil statics := getlog.GetStatistics() if statics != nil { slog := make([]StatisticsLog, len(statics)) log := make([]StatisticsLog, len(statics)) for k, v := range statics { slog[k] = StatisticsLog{ log[k] = StatisticsLog{ Type: convertMessageTypeFromProto(v.GetMessageType()), Count: v.GetCount(), Bytes: v.GetBytes(), } } return slog } else { return nil } return } func getLogMessageFromProto(getlog *kproto.Command_GetLog) []byte { return getlog.GetMessages() } func getLimitsLogFromProto(getlog *kproto.Command_GetLog) LimitsLog { var log LimitsLog func getLimitsLogFromProto(getlog *kproto.Command_GetLog) (log *LimitsLog) { log = nil limits := getlog.GetLimits() if limits != nil { log = LimitsLog{ log = &LimitsLog{ MaxKeySize: limits.GetMaxKeySize(), MaxValueSize: limits.GetMaxValueSize(), MaxVersionSize: limits.GetMaxVersionSize(), Loading @@ -297,12 +300,12 @@ func getLimitsLogFromProto(getlog *kproto.Command_GetLog) LimitsLog { MaxBatchCountPerDevice: limits.GetMaxBatchCountPerDevice(), } } return log return } func getDeviceLogFromProto(getlog *kproto.Command_GetLog) DeviceLog { func getDeviceLogFromProto(getlog *kproto.Command_GetLog) *DeviceLog { //TODO: Need more details return DeviceLog{ return &DeviceLog{ Name: getlog.GetDevice().GetName(), } } Loading
handler.go +3 −2 Original line number Diff line number Diff line package kinetic import ( kproto "github.com/yongzhy/kinetic-go/proto" "sync" kproto "github.com/yongzhy/kinetic-go/proto" ) // ResponseHandler is the handler for XXXXX_RESPONSE message from drive. Loading Loading @@ -52,7 +53,7 @@ func (h *ResponseHandler) wait() { h.cond.L.Unlock() } // Helper function to build a ResponseHandler with call as the Callback. // NewResponseHandler is helper function to build a ResponseHandler with call as the Callback. // For each operation, a unique ResponseHandler is requried func NewResponseHandler(call Callback) *ResponseHandler { h := &ResponseHandler{callback: call, done: false, cond: sync.NewCond(&sync.Mutex{})} Loading