Commit 6493c75c authored by Zhu Yong's avatar Zhu Yong
Browse files

Merge branch 'develop'

parents b329be24 363a6f5b
Loading
Loading
Loading
Loading
+12 −1
Original line number Diff line number Diff line
# Introduction

Kinetic client library in Golang. 

## Kinetic Protocol Version

This client is using version `3.0.6` from [Kinetic-Protocol](https://github.com/Kinetic/kinetic-protocol)

## Installation


## Usage

Refer to file `kinetic_test.go` and `connection_test.go` for some examples.
+25 −16
Original line number Diff line number Diff line
@@ -30,35 +30,35 @@ func TestMain(m *testing.M) {
func TestBlockNoOp(t *testing.T) {
	status, err := blockConn.NoOp()
	if err != nil || status.Code != OK {
		t.Fatal("Blocking NoOp Failure")
		t.Fatal("Blocking NoOp Failure", err, status.String())
	}
}

func TestBlockGet(t *testing.T) {
	_, status, err := blockConn.Get([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Blocking Get Failure")
		t.Fatal("Blocking Get Failure", err, status.String())
	}
}

func TestBlockGetNext(t *testing.T) {
	_, status, err := blockConn.GetNext([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetNext Failure")
		t.Fatal("Blocking GetNext Failure", err, status.String())
	}
}

func TestBlockGetPrevious(t *testing.T) {
	_, status, err := blockConn.GetPrevious([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetPrevious Failure")
		t.Fatal("Blocking GetPrevious Failure", err, status.String())
	}
}

func TestBlockGetVersion(t *testing.T) {
	version, status, err := blockConn.GetVersion([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetVersion Failure")
		t.Fatal("Blocking GetVersion Failure", err, status.String())
	}
	t.Logf("Object version = %x", version)
}
@@ -66,7 +66,7 @@ func TestBlockGetVersion(t *testing.T) {
func TestBlockFlush(t *testing.T) {
	status, err := blockConn.Flush()
	if err != nil || status.Code != OK {
		t.Fatal("Blocking Flush Failure")
		t.Fatal("Blocking Flush Failure", err, status.String())
	}
}

@@ -81,7 +81,7 @@ func TestBlockPut(t *testing.T) {
	}
	status, err := blockConn.Put(&entry)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking Put Failure")
		t.Fatal("Blocking Put Failure", err, status.String())
	}
}

@@ -94,7 +94,7 @@ func TestBlockDelete(t *testing.T) {
	}
	status, err := blockConn.Delete(&entry)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking Delete Failure")
		t.Fatal("Blocking Delete Failure", err, status.String())
	}
}

@@ -121,12 +121,12 @@ func TestBlockGetLogCapacity(t *testing.T) {
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetLog Failure")
		t.Fatal("Blocking GetLog Failure", err, status.String())
	}
	if !(klogs.Capacity.CapacityInBytes > 0 &&
		klogs.Capacity.PortionFull > 0) {
		t.Logf("%#v", klogs.Capacity)
		t.Fatal("Blocking GetLog for Capacity Failure")
		t.Fatal("Blocking GetLog for Capacity Failure", err, status.String())
	}
}

@@ -136,12 +136,12 @@ func TestBlockGetLogLimit(t *testing.T) {
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetLog Failure")
		t.Fatal("Blocking GetLog Failure", err, status.String())
	}
	if klogs.Limits.MaxKeySize != 4096 ||
		klogs.Limits.MaxValueSize != 1024*1024 {
		t.Logf("%#v", klogs.Limits)
		t.Fatal("Blocking GetLog for Limits Failure")
		t.Fatal("Blocking GetLog for Limits Failure", err, status.String())
	}
}

@@ -157,9 +157,18 @@ func TestBlockGetLogAll(t *testing.T) {
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetLog Failure")
		t.Fatal("Blocking GetLog Failure", err, status.String())
	}
	if klogs.Limits.MaxKeySize != 4096 ||
		klogs.Limits.MaxValueSize != 1024*1024 {
		t.Logf("%#v", klogs.Limits)
		t.Fatal("Blocking GetLog, Limits Failure", err, status.String())
	}
	if !(klogs.Capacity.CapacityInBytes > 0 &&
		klogs.Capacity.PortionFull > 0) {
		t.Logf("%#v", klogs.Capacity)
		t.Fatal("Blocking GetLog, Capacity Failure", err, status.String())
	}
	t.Logf("GetLog %+v", klogs)
}

func TestBlockMediaScan(t *testing.T) {
@@ -171,7 +180,7 @@ func TestBlockMediaScan(t *testing.T) {
	}
	status, err := blockConn.MediaScan(&op, PRIORITY_NORMAL)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking MediaScan Failure: ", status.Error())
		t.Fatal("Blocking MediaScan Failure: ", err, status.String())
	}
}

@@ -184,6 +193,6 @@ func TestBlockMediaOptimize(t *testing.T) {
	}
	status, err := blockConn.MediaOptimize(&op, PRIORITY_NORMAL)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking MediaOptimize Failure: ", status.Error())
		t.Fatal("Blocking MediaOptimize Failure: ", err, status.String())
	}
}

kinetic_test.go

0 → 100644
+218 −0
Original line number Diff line number Diff line
package kinetic

import (
	"bytes"
	"fmt"
)

func ExampleBlockPutGetDelete() {
	// Set the log leverl to debug
	SetLogLevel(LogLevelDebug)

	// Client options
	var option = ClientOptions{
		Host: "10.29.24.55",
		Port: 8123,
		User: 1,
		Hmac: []byte("asdfasdf")}

	conn, err := NewBlockConnection(option)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// PUT
	pentry := Record{
		Key:   []byte("Test Object"),
		Value: []byte("Test Object Data"),
		Sync:  SYNC_WRITETHROUGH,
		Algo:  ALGO_SHA1,
		Tag:   []byte(""),
		Force: true,
	}
	status, err := conn.Put(&pentry)
	if err != nil || status.Code != OK {
		fmt.Println("Blocking Put Failure")
	}

	// GET back the object
	gentry, status, err := conn.Get(pentry.Key)
	if err != nil || status.Code != OK {
		fmt.Println("Blocking Get Failure")
	}

	// Verify the object Key and Value
	if !bytes.Equal(pentry.Key, gentry.Key) {
		fmt.Printf("Key Mismatch: [%s] vs [%s]\n", pentry.Key, gentry.Key)
	}
	if !bytes.Equal(pentry.Value, gentry.Value) {
		fmt.Printf("Value Mismatch: [%s] vs [%s]\n", pentry.Value, gentry.Value)
	}

	// DELETE the object
	dentry := Record{
		Key:   pentry.Key,
		Sync:  pentry.Sync,
		Force: true,
	}
	status, err = conn.Delete(&dentry)
	if err != nil || status.Code != OK {
		fmt.Println("Blocking Delete Failure")
	}
}

func ExampleNonBlockPutGetDelete() {
	// Set the log leverl to debug
	SetLogLevel(LogLevelDebug)

	// Client options
	var option = ClientOptions{
		Host: "10.29.24.55",
		Port: 8123,
		User: 1,
		Hmac: []byte("asdfasdf")}

	conn, err := NewNonBlockConnection(option)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// PUT
	pentry := Record{
		Key:   []byte("Test Object"),
		Value: []byte("Test Object Data"),
		Sync:  SYNC_WRITETHROUGH,
		Algo:  ALGO_SHA1,
		Tag:   []byte(""),
		Force: true,
	}
	// Each Nonblock operation require specific Callback and ResponseHandler
	// For operation doesn't require data from Kinetic drive, GenericCallback will enough.
	pcallback := &GenericCallback{}
	ph := NewResponseHandler(pcallback)
	err = conn.Put(&pentry, ph)
	if err != nil {
		fmt.Println("NonBlocking Put Failure")
	}
	conn.Listen(ph)

	// GET back the object, GET operation need to process data from drive, so use GetCallBack
	gcallback := &GetCallback{}
	gh := NewResponseHandler(gcallback)
	err = conn.Get(pentry.Key, gh)
	if err != nil {
		fmt.Println("NonBlocking Get Failure")
	}
	conn.Listen(gh)
	gentry := gcallback.Entry

	// Verify the object Key and Value
	if !bytes.Equal(pentry.Key, gentry.Key) {
		fmt.Printf("Key Mismatch: [%s] vs [%s]\n", pentry.Key, gentry.Key)
	}
	if !bytes.Equal(pentry.Value, gentry.Value) {
		fmt.Printf("Value Mismatch: [%s] vs [%s]\n", pentry.Value, gentry.Value)
	}

	// DELETE the object, DELETE doesn't require data from drive, use GenericCallback
	dcallback := &GenericCallback{}
	dh := NewResponseHandler(dcallback)
	dentry := Record{
		Key:   pentry.Key,
		Sync:  pentry.Sync,
		Force: true,
	}
	err = conn.Delete(&dentry, dh)
	if err != nil {
		fmt.Println("NonBlocking Delete Failure")
	}
	conn.Listen(dh)
}

func ExampleNonBlockMuliplePut() {
	// Set the log leverl to debug
	SetLogLevel(LogLevelDebug)

	// Client options
	var option = ClientOptions{
		Host: "10.29.24.55",
		Port: 8123,
		User: 1,
		Hmac: []byte("asdfasdf")}

	conn, err := NewNonBlockConnection(option)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	done := make(chan bool)

	prefix := []byte("TestObject")

	// PUT
	// 1st round: main routin PUT object, start new go routine to wait for operation done
	for id := 1; id <= 100; id++ {
		key := []byte(fmt.Sprintf("%s-%05d", prefix, id))
		v := bytes.Repeat(key, id)
		if len(v) > 1024*1024 {
			v = v[:1024*1024]
		}
		pentry := Record{
			Key:   key,
			Value: v,
			Sync:  SYNC_WRITETHROUGH,
			Algo:  ALGO_SHA1,
			Tag:   []byte(""),
			Force: true,
		}
		pcallback := &GenericCallback{}
		ph := NewResponseHandler(pcallback)
		err = conn.Put(&pentry, ph)
		if err != nil {
			fmt.Println("NonBlocking Put Failure")
		}

		go func() {
			conn.Listen(ph)
			done <- true
		}()
	}

	// PUT
	// 2nd round, start new go routin for each PUT object and wait for operation done
	for id := 101; id <= 200; id++ {
		go func(id int, done chan bool) {
			key := []byte(fmt.Sprintf("%s-%05d", prefix, id))
			v := bytes.Repeat(key, id)
			if len(v) > 1024*1024 {
				v = v[:1024*1024]
			}
			pentry := Record{
				Key:   key,
				Value: v,
				Sync:  SYNC_WRITETHROUGH,
				Algo:  ALGO_SHA1,
				Tag:   []byte(""),
				Force: true,
			}
			pcallback := &GenericCallback{}
			ph := NewResponseHandler(pcallback)
			err = conn.Put(&pentry, ph)
			if err != nil {
				fmt.Println("NonBlocking Put Failure")
			}

			conn.Listen(ph)
			done <- true
		}(id, done)
	}

	// Total 200 go routine started, wait for all to finish
	for id := 1; id <= 200; id++ {
		<-done
	}

}