Commit 91a0e39e authored by Greg Williams's avatar Greg Williams
Browse files

Updated client API for new KineticSession encapsulating an opaque pointer to...

Updated client API for new KineticSession encapsulating an opaque pointer to the private connection instance established for a given session. Still need to complete usage updates to get compilation working again.
parent eb701b5a
Loading
Loading
Loading
Loading
+24 −26
Original line number Diff line number Diff line
v0.5.0
------
* Added blocking PUT operation
v0.8.1 (kinetic-protocol 3.0.5)
-------------------------------
* Added GETLOG operation.
* Added thread-safety to allow multiple client threads within the same session.
* Added I/O examples to demonstrate client write operations for blocking/non-blocking (asynchrounous) and single/multi-threaded.

v0.8.0 (kinetic-protocol 3.0.5)
-------------------------------
* **Updated to use Kinetic Protocol v3.0 (3.0.5)**
    * **NOT backwards compatible with earlier versions of Kinetic Protocol**
    * **Kinetic device firmware must be updated to a release supporting v3.0 protocol!**
* Added asynchronous/non-blocking execution option to all operations (GET/PUT/DELETE/GETKEYRANGE).
    * A given operation is carried out in asynchronous mode if a closure callback with optional data is supplied.
* ByteArray and ByteBuffer types are now supplied directly in the byte_array.h public interface file.
* Fixed a concurrency issue that intermittently produces a crash in kinetic_allocator in a threaded context

v0.7.0 (kinetic-protocol 2.0.6)
-------------------------------
* Added blocking GETKEYRANGE operation.

v0.6.0
------
v0.6.0 (kinetic-protocol 2.0.6)
-------------------------------
* Added blocking GET and DELETE operations.
* Added Makefile build implementing standard (make; sudo make install) interface.
* Added creation/installation of static (.a) and dynamic (.so) libraries
* Added ByteArray type for buffer management.
* Added Kinetic_KeyValue type for key/value pair handling.

v0.7.0
------
* Added blocking GETKEYRANGE operation.

v0.8.0
------
* Updated to use Kinetic Protocol v3.0 (3.0.5)
    * NOT backwards compatible with earlier versions of Kinetic Protocol
    * Kinetic device firmware must be updated to a release supporting v3.0 protocol!
* Added asynchronous/non-blocking execution option to all operations (GET/PUT/DELETE/GETKEYRANGE).
    * A given operation is carried out in asynchronous mode if a closure callback with optional data is supplied.
* ByteArray and ByteBuffer types are now supplied directly in the byte_array.h public interface file.

v0.8.1
------
* Added GETLOG operation.
* Added thread-safety to allow multiple client threads within the same session.
* Added I/O examples to demonstrate client write operations for blocking/non-blocking (asynchrounous) and single/multi-threaded.
* Still using Kinetic Protocol v3.0 (3.0.5)
    * NOT backwards compatible with earlier versions of Kinetic Protocol (< v3.0)
    * Kinetic device firmware must be updated to a release supporting v3.0 protocol!
v0.5.0 (kinetic-protocol 2.0.4)
-------------------------------
* Added blocking PUT operation
+15 −16
Original line number Diff line number Diff line
@@ -40,13 +40,12 @@ void KineticClient_Shutdown(void);
 * @brief Initializes the Kinetic API, configures logging destination, establishes a
 * connection to the specified Kinetic Device, and establishes a session.
 *
 * @param config    Session configuration
 * @param session   Configured KineticSession to connect
 *  .host             Host name or IP address to connect to
 *  .port             Port to establish socket connection on
 *  .clusterVersion   Cluster version to use for the session
 *  .identity         Identity to use for the session
 *  .hmacKey          Key to use for HMAC calculations (NULL-terminated string)
 * @param handle    Pointer to KineticSessionHandle (populated upon successful connection)
 *
 * @return          Returns the resulting KineticStatus
 */
@@ -55,7 +54,7 @@ KineticStatus KineticClient_CreateConnection(KineticSession * const session);
/**
 * @brief Closes the connection to a host.
 *
 * @param handle    KineticSessionHandle for a connected session.
 * @param session   The KineticSession to destroy.
 *
 * @return          Returns the resulting KineticStatus
 */
@@ -64,7 +63,7 @@ KineticStatus KineticClient_DestroyConnection(KineticSession * const session);
/**
 * @brief Executes a NOOP command to test whether the Kinetic Device is operational.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param session       The connected KineticSession to use for the operation.
 *
 * @return              Returns the resulting KineticStatus
 */
@@ -73,7 +72,7 @@ KineticStatus KineticClient_NoOp(KineticSession const * const session);
/**
 * @brief Executes a PUT command to store/update an entry on the Kinetic Device.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param session       The connected KineticSession to use for the operation.
 * @param entry         Key/value entry for object to store. 'value' must
 *                      specify the data to be stored.
 * @param closure       Optional closure. If specified, operation will be
@@ -83,14 +82,14 @@ KineticStatus KineticClient_NoOp(KineticSession const * const session);
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Put(KineticSessionHandle handle,
KineticStatus KineticClient_Put(KineticSession const * const session,
                                KineticEntry* const entry,
                                KineticCompletionClosure* closure);

/**
 * @brief Executes a GET command to retrieve and entry from the Kinetic Device.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param session       The connected KineticSession to use for the operation.
 * @param entry         Key/value entry for object to retrieve. 'value' will
 *                      be populated unless 'metadataOnly' is set to 'true'.
 * @param closure       Optional closure. If specified, operation will be
@@ -99,14 +98,14 @@ KineticStatus KineticClient_Put(KineticSessionHandle handle,
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Get(KineticSessionHandle handle,
KineticStatus KineticClient_Get(KineticSession const * const session,
                                KineticEntry* const entry,
                                KineticCompletionClosure* closure);

/**
 * @brief Executes a DELETE command to delete an entry from the Kinetic Device
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param session       The connected KineticSession to use for the operation.
 * @param entry         Key/value entry for object to delete. 'value' is
 *                      not used for this operation.
 * @param closure       Optional closure. If specified, operation will be
@@ -115,7 +114,7 @@ KineticStatus KineticClient_Get(KineticSessionHandle handle,
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Delete(KineticSessionHandle handle,
KineticStatus KineticClient_Delete(KineticSession const * const session,
                                   KineticEntry* const entry,
                                   KineticCompletionClosure* closure);

@@ -123,7 +122,7 @@ KineticStatus KineticClient_Delete(KineticSessionHandle handle,
 * @brief Executes a GETKEYRANGE command to retrive a set of keys in the range
 * specified range from the Kinetic Device
 *
 * @param handle        KineticSessionHandle for a connected session
 * @param session       The connected KineticSession to use for the operation
 * @param range         KineticKeyRange specifying keys to return
 * @param keys          ByteBufferArray to store the retrieved keys
 * @param closure       Optional closure. If specified, operation will be
@@ -134,7 +133,7 @@ KineticStatus KineticClient_Delete(KineticSessionHandle handle,
 * @return              Returns 0 upon succes, -1 or the Kinetic status code
 *                      upon failure
 */
KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
KineticStatus KineticClient_GetKeyRange(KineticSession const * const session,
                                        KineticKeyRange* range, ByteBufferArray* keys,
                                        KineticCompletionClosure* closure);

@@ -142,7 +141,7 @@ KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
 * @brief Executes a GETLOG command to retrive a set of keys in the range
 * specified range from the Kinetic Device
 *
 * @param handle        KineticSessionHandle for a connected session
 * @param session       The connected KineticSession to use for the operation
 * @param type          KineticLogDataType specifying data type to retrieve.
 * @param info          KineticDeviceInfo pointer, which will be assigned to
 *                      a dynamically allocated structure populated with
@@ -154,7 +153,7 @@ KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
 * @return              Returns 0 upon succes, -1 or the Kinetic status code
 *                      upon failure
 */
KineticStatus KineticClient_GetLog(KineticSessionHandle handle,
KineticStatus KineticClient_GetLog(KineticSession const * const session,
                                   KineticDeviceInfo_Type type,
                                   KineticDeviceInfo** info,
                                   KineticCompletionClosure* closure);
@@ -162,10 +161,10 @@ KineticStatus KineticClient_GetLog(KineticSessionHandle handle,
/**
 * @brief Executes an InstantSecureErase command to erase all data from the Kinetic device.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param session       The connected KineticSession to use for the operation.
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_InstantSecureErase(KineticSessionHandle handle);
KineticStatus KineticClient_InstantSecureErase(KineticSession const * const session);

#endif // _KINETIC_CLIENT_H
+16 −12
Original line number Diff line number Diff line
@@ -88,24 +88,20 @@ typedef enum _KineticSynchronization {


/**
 * @brief Handle for a session instance
 * @brief Kinetic Connection Instance
 */
typedef int KineticSessionHandle;

struct _KineticConnection;

/**
 * @brief Structure used to specify the configuration of a session.
 * @brief Structure used to specify the configuration for a session.
 */
typedef struct _KineticSession {
typedef struct _KineticSessionConfig {
    // Host name/IP address of Kinetic Device
    char    host[HOST_NAME_MAX];

    // Port for Kinetic Device session
    int     port;

    // Set to true to enable non-blocking/asynchronous I/O
    bool    nonBlocking;

    // The version number of this cluster definition. If this is not equal to
    // the value on the Kinetic Device, the request is rejected and will return
    // `KINETIC_STATUS_VERSION_FAILURE`
@@ -120,22 +116,30 @@ typedef struct _KineticSession {
    // client and the device, used to sign requests.
    uint8_t keyData[KINETIC_MAX_KEY_LEN];
    ByteArray hmacKey;
} KineticSessionConfig;

/**
 * @brief An instance of a session with a Kinetic device.
 */
typedef struct _KineticSession {
    // Session configuration structure which must be configured 
    KineticSessionConfig config;

    // Connection instance which is dynamically allocated upon call to KineticClient_CreateConnection.
    // Client must call KineticClient_DestroyConnection when finished with a session to shutdown
    // a session cleanly and free the `connection`.
    void* connection;
    struct _KineticConnection* connection;
} KineticSession;

#define KINETIC_SESSION_INIT(_session, _host, _clusterVersion, _identity, _hmacKey) { \
    *(_session) = (KineticSession) { \
    _session.config = (KineticSessionConfig) { \
        .port = KINETIC_PORT, \
        .clusterVersion = (_clusterVersion), \
        .identity = (_identity), \
        .hmacKey = {.data = (_session)->keyData, .len = (_hmacKey).len}, \
    }; \
    strcpy((_session)->host, (_host)); \
    memcpy((_session)->hmacKey.data, (_hmacKey).data, (_hmacKey).len); \
    strcpy(*(_session).config->host, (_host)); \
    memcpy(*(_session).config->hmacKey.data, (_hmacKey).data, (_hmacKey).len); \
}

// Kinetic Status Codes
+7 −7
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@

typedef struct {
    char ip[16];
    KineticSessionHandle sessionHandle;
    KineticSession* session;
    char keyPrefix[KINETIC_DEFAULT_KEY_LEN];
    uint8_t key[KINETIC_DEFAULT_KEY_LEN];
    uint8_t version[KINETIC_DEFAULT_KEY_LEN];
@@ -65,7 +65,7 @@ void store_data(write_args* args)
        );

        // Store the object
        KineticStatus status = KineticClient_Put(args->sessionHandle, entry, NULL);
        KineticStatus status = KineticClient_Put(args->session, entry, NULL);
        if (status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "Kinetic PUT of object %d to host %s failed w/ status: %s\n",
                objIndex, args->ip, Kinetic_GetStatusDescription(status));
@@ -99,7 +99,7 @@ int main(int argc, char** argv)
    // Establish connection
    KineticStatus status;
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession sessionConfig = {
    const KineticSession session = {
        .host = "localhost",
        .port = KINETIC_PORT,
        .clusterVersion = 0,
@@ -108,10 +108,10 @@ int main(int argc, char** argv)
    };
    write_args* writeArgs = calloc(1, sizeof(write_args));
    KineticClient_Init("stdout", 0);
    status = KineticClient_CreateConnection(&sessionConfig, &writeArgs->sessionHandle);
    status = KineticClient_CreateConnection(&session);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Connection to host '%s' failed w/ status: %s",
            sessionConfig.host, Kinetic_GetStatusDescription(status));
            session.host, Kinetic_GetStatusDescription(status));
    }

    // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs
@@ -133,14 +133,14 @@ int main(int argc, char** argv)
        .value = ByteBuffer_Create(writeArgs->value, sizeof(writeArgs->value), 0),
        .synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK,
    };
    strcpy(writeArgs->ip, sessionConfig.host);
    strcpy(writeArgs->ip, session.host);

    // Store the data
    printf("\nWriting data file to the Kinetic device...\n");
    store_data(writeArgs);

    // Shutdown client connection and cleanup
    KineticClient_DestroyConnection(&writeArgs->sessionHandle);
    KineticClient_DestroyConnection(&writeArgs->session);
    KineticClient_Shutdown();
    free(writeArgs);
    free(buf);
+11 −13
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@
typedef struct {
    pthread_t threadID;
    char ip[16];
    KineticSessionHandle sessionHandle;
    KineticSession* session;
    char keyPrefix[KINETIC_DEFAULT_KEY_LEN];
    uint8_t key[KINETIC_DEFAULT_KEY_LEN];
    uint8_t version[KINETIC_DEFAULT_KEY_LEN];
@@ -66,7 +66,7 @@ void* store_data(void* args)
        ByteBuffer_AppendArray(&entry->value, ByteBuffer_Consume(&thread_args->data, KINETIC_OBJ_SIZE));

        // Store the data slice
        KineticStatus status = KineticClient_Put(thread_args->sessionHandle, entry, NULL);
        KineticStatus status = KineticClient_Put(thread_args->session, entry, NULL);
        if (status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "Failed writing entry %d to disk w/status: %s",
                objIndex+1, Kinetic_GetStatusDescription(status));
@@ -96,34 +96,32 @@ int main(int argc, char** argv)
        exit(-1);
    }

    // Allocate session/thread data
    write_args* writeArgs = calloc(NUM_FILES, sizeof(write_args));
    if (writeArgs == NULL) {
        fprintf(stderr, "Failed allocating overlapped thread arguments!\n");
    }

    // Initialize kinetic-c and configure sessions
    KineticClient_Init("stdout", 0);
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession sessionConfig = {
    const KineticSession session = {
        .host = "localhost",
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    };
    KineticClient_Init("stdout", 0);
    write_args* writeArgs = calloc(NUM_FILES, sizeof(write_args));
    if (writeArgs == NULL) {
        fprintf(stderr, "Failed allocating overlapped thread arguments!\n");
    }

    // Kick off a thread for each file to store
    for (int i = 0; i < NUM_FILES; i++) {

        // Establish connection
        status = KineticClient_CreateConnection(&sessionConfig, &writeArgs[i].sessionHandle);
        status = KineticClient_CreateConnection(&writeArgs[i].session);
        if (status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
                Kinetic_GetStatusDescription(status));
            return -1;
        }
        strcpy(writeArgs[i].ip, sessionConfig.host);
        strcpy(writeArgs[i].ip, writeArgs[i].session.host);

        // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs
        writeArgs[i].data = ByteBuffer_Create(buf, dataLen, 0);
@@ -160,7 +158,7 @@ int main(int argc, char** argv)
        if (joinStatus != 0) {
            fprintf(stderr, "pthread join failed!\n");
        }
        KineticClient_DestroyConnection(&writeArgs[i].sessionHandle);
        KineticClient_DestroyConnection(&writeArgs[i].session);
    }

    // Shutdown client connection and cleanup
Loading