Commit 2cdf06b2 authored by Ignacio Corderi's avatar Ignacio Corderi
Browse files

Added support for channeled Puts

parent b7011d95
Loading
Loading
Loading
Loading

.gitignore

0 → 100644
+1 −0
Original line number Diff line number Diff line
bin/
+46 −1
Original line number Diff line number Diff line
@@ -64,6 +64,7 @@ func (e RemoteError) Error() string {

type Client interface {
	Put (key []byte, value []byte) ((<-chan error), error)
	PutFrom (key []byte, length int, queue <-chan []byte) ((<-chan error), error)
	
	Close()
}
@@ -179,7 +180,51 @@ func(self *NetworkClient) Put(key []byte, value []byte) ((<-chan error), error)
			CommandBytes: cmd_bytes,
		}
	
	network.Send(self.conn, msg, value)	
	err = network.Send(self.conn, msg, value)	
	if err != nil { return nil, err }
			
	rx := make(chan error, 1)		
	pending := PendingOperation { sequence: self.sequence, receiver: rx }		
			
	self.notifier <- pending			
			
	self.sequence += 1
	
	return rx, nil
}

// TOOD: needs refactor
func(self *NetworkClient) PutFrom(key []byte, length int, queue <-chan []byte) ((<-chan error), error) {	
	cmd := &kproto.Command {
			Header: &kproto.Command_Header {
				ConnectionID: proto.Int64(self.connectionId),
				Sequence: proto.Int64(self.sequence),
				MessageType: kproto.Command_PUT.Enum(),
			},
			Body: &kproto.Command_Body {
				KeyValue: &kproto.Command_KeyValue {
					Key: key,
					Algorithm: kproto.Command_SHA1.Enum(),
					Tag: make([]byte, 0),
					Synchronization: kproto.Command_WRITEBACK.Enum(),
				},
			},
		}
		
	cmd_bytes, err := proto.Marshal(cmd)		
	if err != nil { return nil, err }
	
	msg := &kproto.Message {
			AuthType: kproto.Message_HMACAUTH.Enum(),
			HmacAuth: &kproto.Message_HMACauth {
				Identity: proto.Int64(self.userId),
				Hmac: calculate_hmac(self.secret, cmd_bytes),
			},
			CommandBytes: cmd_bytes,
		}
	
	err = network.SendFrom(self.conn, msg, length, queue)	
	if err != nil { return nil, err }
			
	rx := make(chan error, 1)		
	pending := PendingOperation { sequence: self.sequence, receiver: rx }		
+34 −0
Original line number Diff line number Diff line
@@ -29,10 +29,44 @@ import (
	"encoding/binary"
	"net"
	"io"
	"errors"
	
	kproto "github.com/seagate/kinetic-go/kinetic/proto"
)

// Refactor
func SendFrom(conn net.Conn, msg *kproto.Message, length int, queue <-chan []byte) error {
	msg_bytes, err := proto.Marshal(msg)
	if err != nil { return err }
	
	header := make([]byte, 9)
	header[0] = 70 // magic
	binary.BigEndian.PutUint32(header[1:5], uint32(len(msg_bytes)))
	binary.BigEndian.PutUint32(header[5:9], uint32(length))
	
	conn.Write(header)
	conn.Write(msg_bytes)
	sent := 0
	for {
		chunk := <-queue
		ln := len(chunk)
		if ln + sent > length {
			// TODO: should shut down socket to cancel operation.
			return errors.New("Tried to send more bytes than promised.")
		}
		if chunk == nil { break }
		conn.Write(chunk)
		sent += ln
	}
	
	if sent < length {
		// TODO: should shut down socket to cancel operation.
		return errors.New("Received less bytes than promised.")
	}
	
	return nil
}

func Send(conn net.Conn, msg *kproto.Message, value []byte) error {
	msg_bytes, err := proto.Marshal(msg)
	if err != nil { return err }
+44 −1
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@ import "github.com/seagate/kinetic-go/kinetic"
import "strconv"
import "fmt"

func main() {
func regular() {
	client, _ := kinetic.Connect("localhost:8123")
	defer client.Close()
	
@@ -48,3 +48,46 @@ func main() {
		}
	}
}

func repeat(ch chan []byte, count int, data[] byte) {	
	for i := 0; i < count; i++ {
		ch <- data
	}
	ch <- nil
}

func channeled() {
	client, err := kinetic.Connect("localhost:8123")
	if err != nil {
		fmt.Println(err)
		return
	}	
	defer client.Close()
	
	count := 10
	
	data := make([]byte, 64*1024)
	
	rxs := make([]<-chan error, count)
	
	for i := 0; i < count; i++ {
		ch := make(chan []byte)
		go repeat(ch, 16, data)
		rxs[i], err = client.PutFrom([]byte("go-big-" + strconv.Itoa(i)), 1024*1024, ch)
		if err != nil {
			fmt.Println(err)
		}
	}
	
	// wait for all
	for i := 0; i < count; i++ {
		err := <-rxs[i]
		if err != nil {
			fmt.Println(err)
		}
	}
}

func main() {
	channeled()
}
 No newline at end of file