Commit 551b4896 authored by Zhu Yong's avatar Zhu Yong
Browse files

Add utility function to upload file to kinetic device with user defined keys and chunk size

parent bc765e58
Loading
Loading
Loading
Loading
+80 −0
Original line number Diff line number Diff line
@@ -20,6 +20,8 @@ package kinetic
import (
	//"fmt"
	//"io"
	"fmt"
	"io"
	"io/ioutil"
	"os"
)
@@ -48,3 +50,81 @@ func UpdateFirmware(conn *BlockConnection, file string) error {

	return err
}

// UploadFile is the utility function to upload file to drive.
// conn is BlockConnection to drive, file is the full path to the file.
// The file may be stored into multiple object files depends on its size and input chunkSize.
// Input number of keys should equal to total number of object files on drive.
// If any chunk PUT fail, upload will stop and return status.
func UploadFile(conn *BlockConnection, file string, keys [][]byte, chunkSize int32) ([]Status, error) {
	info, err := os.Stat(file)
	if err != nil {
		if os.IsNotExist(err) {
			klog.Errorf("Upload fail, file %s not exist", file)
		}
		return nil, err
	}

	fileSize := info.Size()
	if fileSize <= 0 {
		return nil, fmt.Errorf("File content empty, can't upload")
	}

	if chunkSize <= 0 || chunkSize > 1024*1024 {
		return nil, fmt.Errorf("Chunk size should with range (1 -- %d)", 1024*1024)
	}

	chunks := fileSize / int64(chunkSize)
	if fileSize%int64(chunkSize) != 0 {
		chunks++
	}

	if len(keys) != int(chunks) {
		return nil, fmt.Errorf("Expect %d keys, actual %d keys", chunks, len(keys))
	}

	f, err := os.Open(file)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	buf := make([]byte, chunkSize)
	var n int
	var offset, cnt int = 0, 0

	status := make([]Status, 0)

	for {
		n, err = f.Read(buf)
		if err != nil {
			if err == io.EOF {
				break
			} else {
				// TODO: Should delete already PUT objects???
				return nil, err
			}
		}

		entry := Record{
			Key:   keys[cnt],
			Value: buf[:n],
			Tag:   []byte(""),
			Sync:  SyncWriteThrough,
			Algo:  AlgorithmSHA1,
			Force: true,
		}
		sts, err := conn.Put(&entry)
		status = append(status, sts)
		if err != nil || sts.Code != OK {
			klog.Errorf("Upload fail for chunk[%02d], key[%s] : %s\n", cnt, keys[cnt], sts.Error())
			// TODO: Should delete already PUT objects???
			return status, err
		}

		offset += n
		cnt++
	}

	return status, nil
}