Commit 8c5388b3 authored by Greg Williams's avatar Greg Williams
Browse files

Updated metadata copy from responses to update all elements, and added support...

Updated metadata copy from responses to update all elements, and added support to retain newVersion and dbVersion buffers if both are specified.
Fixed client utility tests to perform PUT, GET, DELETE all within the same run of the utility so that entry fields are used properly.
parent b5f8c324
Loading
Loading
Loading
Loading
+0 −3
Original line number Diff line number Diff line
@@ -265,9 +265,6 @@ run: $(UTIL_EXEC) start_simulator
	@echo --------------------------------------------------------------------------------
	@echo
	$(UTIL_EXEC) noop
	exec $(UTIL_EXEC) put
	exec $(UTIL_EXEC) get
	exec $(UTIL_EXEC) delete
	exec $(UTIL_EXEC) put get delete
	@echo
	@echo Test Utility integration tests w/ kinetic-c lib passed!
+20 −20
Original line number Diff line number Diff line
@@ -72,7 +72,7 @@ static void* KineticAllocator_NewItem(KineticList* const list, size_t size)
    list->last = newItem;
    KineticAllocator_UnlockList(list);

    LOGF2("Allocated new list item @ 0x%0llX w/data @ 0x%0llX",
    LOGF3("Allocated new list item @ 0x%0llX w/data @ 0x%0llX",
         (long long)newItem, (long long)newItem->data);

    return newItem->data;
@@ -92,38 +92,38 @@ static void KineticAllocator_FreeItem(KineticList* const list, void* item)
            cur = cur->next;
        }
    }
    LOG2("  Done searching for item list item");
    LOG3("  Done searching for item list item");

    if ((cur != NULL) && (cur->data == item)) {
        LOG2("  item found! freeing it.");
        LOG3("  item found! freeing it.");

        // Handle PDU list emptied
        if (cur->previous == NULL) {
            LOG2("  At start of list.");
            LOG3("  At start of list.");
            if (cur->next == NULL) {
                LOG2("  Making it empty, since all deallocated!");
                LOG3("  Making it empty, since all deallocated!");
                list->start = NULL;
                list->last = NULL;
            }
            else {
                LOG2("  Moving current item to head, since head deallocated!");
                LOG3("  Moving current item to head, since head deallocated!");
                list->start = cur->next;
                list->start->previous = NULL;
            }
        }
        else {
            // Relink from previous to next, if avaliable
            LOG2("  Not at list start, so relinking list to free item.");
            LOG3("  Not at list start, so relinking list to free item.");
            if (cur->previous->next != NULL) {
                LOG2("  Relinking previous to next");
                LOG3("  Relinking previous to next");
                if (cur->next != NULL) {
                    LOG2("    next being reset!");
                    LOG3("    next being reset!");
                    cur->previous->next = cur->next;
                }
                else {
                    list->last = cur->previous;
                    list->last->next = NULL;
                    LOGF2("    next is NULL. End of list now @ 0x%0llX",
                    LOGF3("    next is NULL. End of list now @ 0x%0llX",
                         (long long)list->last);
                }
            }
@@ -133,7 +133,7 @@ static void KineticAllocator_FreeItem(KineticList* const list, void* item)
            }
        }

        LOGF2("  Freeing item @ 0x%0llX, item @ 0x%0llX",
        LOGF3("  Freeing item @ 0x%0llX, item @ 0x%0llX",
             (long long)cur, (long long)&cur->data);
        free(cur->data);
        cur->data = NULL;
@@ -146,32 +146,32 @@ static void KineticAllocator_FreeItem(KineticList* const list, void* item)
static void KineticAllocator_FreeList(KineticList* const list)
{
    if (list != NULL) {
        LOG2("Freeing list of all items");
        LOG3("Freeing list of all items");
        KineticAllocator_LockList(list);
        KineticListItem* current = list->start;

        while (current->next != NULL) {
            LOG2("Advancing to next list item...");
            LOG3("Advancing to next list item...");
            current = current->next;
        }

        while (current != NULL) {
            LOG2("  Current item not freed!");
            LOGF2("  DEALLOCATING item: 0x%0llX, data: 0x%llX, prev: 0x%0llX",
            LOGF3("  DEALLOCATING item: 0x%0llX, data: 0x%llX, prev: 0x%0llX",
                (long long)current,
                (long long)&current->data,
                (long long)current->previous);
            KineticListItem* curItem = current;
            KineticListItem* prevItem = current->previous;
            if (curItem != NULL) {
                LOG2("  Freeing list item");
                LOG3("  Freeing list item");
                if (curItem->data != NULL) {
                    free(curItem->data);
                }
                free(curItem);
            }
            current = prevItem;
            LOGF2("  on to prev=0x%llX", (long long)current);
            LOGF3("  on to prev=0x%llX", (long long)current);
        }

        *list = (KineticList) {
@@ -197,7 +197,7 @@ KineticPDU* KineticAllocator_NewPDU(KineticList* const list, KineticConnection*
    }
    assert(newPDU->proto == NULL);
    KINETIC_PDU_INIT(newPDU, connection);
    LOGF2("Allocated new PDU @ 0x%0llX", (long long)newPDU);
    LOGF3("Allocated new PDU @ 0x%0llX", (long long)newPDU);
    return newPDU;
}

@@ -205,7 +205,7 @@ void KineticAllocator_FreePDU(KineticList* const list, KineticPDU* pdu)
{
    KineticAllocator_LockList(list);
    if ((pdu->proto != NULL) && pdu->protobufDynamicallyExtracted) {
        LOG2("Freeing dynamically allocated protobuf");
        LOG3("Freeing dynamically allocated protobuf");
        KineticProto_Message__free_unpacked(pdu->proto, NULL);
    };
    KineticAllocator_UnlockList(list);
@@ -246,7 +246,7 @@ KineticPDU* KineticAllocator_GetNextPDU(KineticList* const list, KineticPDU* pdu
void KineticAllocator_FreeAllPDUs(KineticList* const list)
{
    if (list->start != NULL) {
        LOG2("Freeing all PDUs...");
        LOG3("Freeing all PDUs...");
        KineticAllocator_LockList(list);
        KineticListItem* current = list->start;
        while (current != NULL) {
@@ -268,7 +268,7 @@ void KineticAllocator_FreeAllPDUs(KineticList* const list)
bool KineticAllocator_ValidateAllMemoryFreed(KineticList* const list)
{
    bool empty = (list->start == NULL);
    LOGF2("  PDUList: 0x%0llX, empty=%s",
    LOGF3("  PDUList: 0x%0llX, empty=%s",
        (long long)list->start, empty ? "true" : "false");
    return empty;
}
+14 −2
Original line number Diff line number Diff line
@@ -208,9 +208,21 @@ KineticStatus KineticClient_Put(KineticSessionHandle handle,
    if (status == KINETIC_STATUS_SUCCESS) {
        // Propagate newVersion to dbVersion in metadata, if newVersion specified
        if (entry->newVersion.array.data != NULL && entry->newVersion.array.len > 0) {

            // If both buffers supplied, copy newVersion into dbVersion, and clear newVersion
            if (entry->dbVersion.array.data != NULL && entry->dbVersion.array.len > 0) {
                ByteBuffer_Reset(&entry->dbVersion);
                ByteBuffer_Append(&entry->dbVersion, entry->dbVersion.array.data, entry->dbVersion.bytesUsed);
                ByteBuffer_Reset(&entry->newVersion);
            }

            // If only newVersion buffer supplied, move newVersion buffer into dbVersion, and set newVersion to NULL buffer
            else {
                entry->dbVersion = entry->newVersion;
                entry->newVersion = BYTE_BUFFER_NONE;
            }

        }
    }

    KineticOperation_Free(&operation);
+10 −5
Original line number Diff line number Diff line
@@ -119,6 +119,7 @@ KineticStatus KineticPDU_Send(KineticPDU* request)
        }
    }

    LOG2("PDU sent successfully!");
    return KINETIC_STATUS_SUCCESS;
}

@@ -142,7 +143,7 @@ KineticStatus KineticPDU_Receive(KineticPDU* const response)
        return status;
    }
    else {
        LOG2("PDU header received successfully");
        LOG3("PDU header received successfully");
        KineticPDUHeader* headerNBO = &response->headerNBO;
        response->header = (KineticPDUHeader) {
            .versionPrefix = headerNBO->versionPrefix,
@@ -159,7 +160,7 @@ KineticStatus KineticPDU_Receive(KineticPDU* const response)
        return status;
    }
    else {
        LOG2("Received PDU protobuf");
        LOG3("Received PDU protobuf");
        KineticLogger_LogProtobuf(2, response->proto);
    }

@@ -174,7 +175,7 @@ KineticStatus KineticPDU_Receive(KineticPDU* const response)
            return KINETIC_STATUS_DATA_ERROR;
        }
        else {
            LOG2("Received protobuf HMAC validation succeeded");
            LOG3("Received protobuf HMAC validation succeeded");
        }
    }
    else if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_PINAUTH) {
@@ -182,7 +183,7 @@ KineticStatus KineticPDU_Receive(KineticPDU* const response)
        return KINETIC_STATUS_DATA_ERROR;
    }
    else if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_UNSOLICITEDSTATUS) {
        LOG1("Unsolicited status message is not authenticated");
        LOG3("Unsolicited status message is not authenticated");
    }

    // Extract embedded command, if supplied
@@ -221,7 +222,11 @@ KineticStatus KineticPDU_Receive(KineticPDU* const response)
        response->connection->connectionID = cmd->header->connectionID;
    }

    return KineticPDU_GetStatus(response);
    status = KineticPDU_GetStatus(response);
    if (status == KINETIC_STATUS_SUCCESS) {
        LOG2("PDU received successfully!");
    }
    return status;
}

KineticStatus KineticPDU_GetStatus(KineticPDU* pdu)
+8 −10
Original line number Diff line number Diff line
@@ -154,10 +154,8 @@ void KineticSocket_Close(int socket)

KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len)
{
#ifdef KINETIC_LOG_SOCKET_OPERATIONS
    LOGF1("Reading %zd bytes into buffer @ 0x%zX from fd=%d",
    LOGF2("Reading %zd bytes into buffer @ 0x%zX from fd=%d",
         len, (size_t)dest->array.data, socket);
#endif

    KineticStatus status = KINETIC_STATUS_INVALID;

@@ -210,7 +208,7 @@ KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len)
            }
            else {
                dest->bytesUsed += opStatus;
                LOGF1("Received %d bytes (%zd of %zd)",
                LOGF3("Received %d bytes (%zd of %zd)",
                     opStatus, dest->bytesUsed, len);
            }
        }
@@ -275,7 +273,7 @@ KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len)
                }
                else {
                    dest->bytesUsed += opStatus;
                    LOGF2("Flushed %d bytes from socket read pipe (%zd of %zd)",
                    LOGF3("Flushed %d bytes from socket read pipe (%zd of %zd)",
                         opStatus, dest->bytesUsed, len);
                }
            }
@@ -299,14 +297,14 @@ KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len)
             len, dest->array.len);
        return KINETIC_STATUS_BUFFER_OVERRUN;
    }
    LOGF1("Received %zd of %zd bytes requested", dest->bytesUsed, len);
    LOGF3("Received %zd of %zd bytes requested", dest->bytesUsed, len);
    return KINETIC_STATUS_SUCCESS;
}

KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu)
{
    size_t bytesToRead = pdu->header.protobufLength;
    LOGF1("Reading %zd bytes of protobuf", bytesToRead);
    LOGF2("Reading %zd bytes of protobuf", bytesToRead);

    uint8_t* packed = (uint8_t*)malloc(bytesToRead);
    if (packed == NULL) {
@@ -336,14 +334,14 @@ KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu)
    }
    else {
        pdu->protobufDynamicallyExtracted = true;
        LOG2("Protobuf unpacked successfully!");
        LOG3("Protobuf unpacked successfully!");
        return KINETIC_STATUS_SUCCESS;
    }
}

KineticStatus KineticSocket_Write(int socket, ByteBuffer* src)
{
    LOGF1("Writing %zu bytes to socket...", src->bytesUsed);
    LOGF3("Writing %zu bytes to socket...", src->bytesUsed);
    for (unsigned int bytesSent = 0; bytesSent < src->bytesUsed;) {
        int bytesRemaining = src->bytesUsed - bytesSent;
        int status = write(socket, &src->array.data[bytesSent], bytesRemaining);
@@ -361,7 +359,7 @@ KineticStatus KineticSocket_Write(int socket, ByteBuffer* src)
            LOGF2("Wrote %d bytes (%d of %zu sent)", status, bytesSent, src->bytesUsed);
        }
    }
    LOG2("Socket write completed successfully");
    LOG3("Socket write completed successfully");
    return KINETIC_STATUS_SUCCESS;
}

Loading