summaryrefslogtreecommitdiff
path: root/source/utilities/BulkHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'source/utilities/BulkHandler.cpp')
-rw-r--r--source/utilities/BulkHandler.cpp286
1 files changed, 286 insertions, 0 deletions
diff --git a/source/utilities/BulkHandler.cpp b/source/utilities/BulkHandler.cpp
new file mode 100644
index 0000000..9dc9169
--- /dev/null
+++ b/source/utilities/BulkHandler.cpp
@@ -0,0 +1,286 @@
+/******************************************************************************
+*
+* Copyright (C) ST-Ericsson SA 2011
+* License terms: 3-clause BSD license
+*
+******************************************************************************/
+
+#include "BulkHandler.h"
+#include "LCDriverMethods.h"
+#include "Buffers.h"
+#include "LcmInterface.h"
+#include "Logger.h"
+#include "Error.h"
+#include "lcdriver_error_codes.h"
+#include <string>
+#include <cstdio>
+using namespace std;
+
+/// <summary>
+/// BulkHandler constructor
+/// </summary>
+BulkHandler::BulkHandler(CLCDriverMethods *methods, Buffers *buffers, LcmInterface *lcmInterface, Logger *logger):
+ m_Methods(methods),
+ m_ReceiveQueue(32),
+ m_pBuffers(buffers),
+ m_pLcmInterface(lcmInterface),
+ m_pLogger(logger),
+ m_State(BULK_INACTIVE),
+ m_pFileWriteThread(0),
+ m_pBulkVector(0)
+{
+}
+
+/// <summary>
+/// BulkHandler destructor
+/// </summary>
+BulkHandler::~BulkHandler()
+{
+ TL_BulkVectorList_t *bulkVector = 0;
+
+ while (true) {
+ RemoveResult result = m_ReceiveQueue.RemoveHead(reinterpret_cast<void **>(&bulkVector), 0);
+
+ if (REMOVE_TIMEOUT == result) {
+ break;
+ } else if (REMOVE_CANCEL == result) {
+ continue;
+ }
+
+ m_pLcmInterface->BulkDestroyVector(bulkVector, true);
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Vector destroyed BulkVector = 0x%x", m_pBulkVector);
+#endif
+ }
+}
+
+int BulkHandler::Send(const string &sourceFile)
+{
+ if (BULK_INACTIVE != m_State) {
+ return BULK_ALREADY_IN_PROGRESS;
+ }
+
+ m_sFilePath = sourceFile;
+ m_State = BULK_TX;
+ return 0;
+}
+
+int BulkHandler::Receive(const string &destinationFile)
+{
+ if (BULK_INACTIVE != m_State) {
+ return BULK_ALREADY_IN_PROGRESS;
+ }
+
+ m_sFilePath = destinationFile;
+
+ // truncate the file for receiving
+ FILE *file = fopen(m_sFilePath.c_str(), "wb");
+
+ if (!file) {
+ return FILE_OPENING_ERROR;
+ }
+
+ fclose(file);
+
+ m_State = BULK_RX;
+
+ // start bulk file write thread
+ m_pFileWriteThread = new CThreadWrapper(FileWriteThreadFunction, this);
+ m_pFileWriteThread->ResumeThread();
+
+ return 0;
+}
+
+void BulkHandler::Finish()
+{
+ if (BULK_RX == m_State) {
+ m_State = BULK_INACTIVE;
+ m_ReceiveQueue.SignalEvent();
+ m_pFileWriteThread->WaitToDie(INFINITE);
+ } else {
+ m_State = BULK_INACTIVE;
+ }
+}
+
+void BulkHandler::HandleCommandRequest(uint16 session, uint32 chunkSize, uint64 offset, uint32 length, bool acknowledge)
+{
+ switch (SELECT_COMMAND(m_State, acknowledge)) {
+ case BULK_WRITE_REQUEST:
+ HandleWriteRequest(session, chunkSize, offset, length);
+ break;
+ case BULK_READ_REQUEST:
+ HandleReadRequest(session, chunkSize, offset, length);
+ break;
+ case BULK_RX_SESSION_END:
+ HandleRxSessionEnd(session, chunkSize, offset, length);
+ break;
+ case BULK_TX_SESSION_END:
+ HandleTxSessionEnd(session, chunkSize, offset, length);
+ break;
+ default:
+ break;
+ }
+}
+
+void BulkHandler::HandleWriteRequest(uint16 session, uint32 chunkSize, uint64 offset, uint32 length)
+{
+ int ReturnValue = E_SUCCESS;
+
+ uint32 VectorID = m_pLcmInterface->BulkOpenSession(session, BULK_RECEIVE, length);
+ VERIFY(BULK_ERROR != VectorID, BULK_VECTOR_ID_ERROR);
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Session opened with VectorID = %u", VectorID);
+#endif
+ m_pBulkVector = m_pLcmInterface->BulkCreateVector(VectorID, length, chunkSize, NULL);
+ VERIFY_SUCCESS(m_pLcmInterface->BulkStartSession(m_pBulkVector, offset));
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Session %u started with BulkVector = 0x%x", session, m_pBulkVector);
+#endif
+
+ErrorExit:
+
+ if (E_SUCCESS != ReturnValue) {
+ m_Methods->SignalError(ReturnValue);
+ }
+}
+
+void BulkHandler::HandleReadRequest(uint16 session, uint32 chunkSize, uint64 offset, uint32 length)
+{
+ int ReturnValue = E_SUCCESS;
+
+ uint32 VectorID = m_pLcmInterface->BulkOpenSession(session, BULK_SEND, length);
+ VERIFY(BULK_ERROR != VectorID, BULK_VECTOR_ID_ERROR);
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Session opened with VectorID = %u", VectorID);
+#endif
+ m_pBulkVector = m_pLcmInterface->BulkCreateVector(VectorID, length, chunkSize, NULL);
+
+ VERIFY_SUCCESS(m_pBuffers->AllocateBulkVector(m_pBulkVector, chunkSize, offset, length));
+ VERIFY_SUCCESS(m_pLcmInterface->BulkStartSession(m_pBulkVector, offset));
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Session %u started with BulkVector = 0x%x", session, m_pBulkVector);
+#endif
+
+ErrorExit:
+
+ if (E_SUCCESS != ReturnValue) {
+ m_Methods->SignalError(ReturnValue);
+ }
+}
+
+void BulkHandler::HandleRxSessionEnd(uint16 session, uint32 chunkSize, uint64 offset, uint32 length)
+{
+ int ReturnValue = E_SUCCESS;
+
+ m_ReceiveQueue.AddTail(m_pBulkVector);
+
+ VERIFY_SUCCESS(m_pLcmInterface->BulkCloseSession(m_pBulkVector));
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Session closed Session = %u", session);
+#endif
+
+ m_Methods->UpdateBulkProgress();
+
+ErrorExit:
+
+ if (E_SUCCESS != ReturnValue) {
+ m_Methods->SignalError(ReturnValue);
+ }
+}
+
+void BulkHandler::HandleTxSessionEnd(uint16 session, uint32 chunkSize, uint64 offset, uint32 length)
+{
+ int ReturnValue = E_SUCCESS;
+
+ m_pBuffers->ReleaseBulkVector(m_pBulkVector);
+
+ VERIFY_SUCCESS(m_pLcmInterface->BulkDestroyVector(m_pBulkVector, false));
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Vector destroyed BulkVector = 0x%x", m_pBulkVector);
+#endif
+
+ m_Methods->UpdateBulkProgress();
+
+ErrorExit:
+
+ if (E_SUCCESS != ReturnValue) {
+ m_Methods->SignalError(ReturnValue);
+ }
+}
+
+void BulkHandler::FlushReceiveQueue()
+{
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Flush receive queue");
+#endif
+ TL_BulkVectorList_t *bulkVector = 0;
+
+ while (true) {
+ RemoveResult result = m_ReceiveQueue.RemoveHead(reinterpret_cast<void **>(&bulkVector), 0);
+
+ if (REMOVE_TIMEOUT == result) {
+ break;
+ } else if (REMOVE_CANCEL == result) {
+ continue;
+ }
+
+ WriteBulkVector(bulkVector);
+
+ m_pLcmInterface->BulkDestroyVector(bulkVector, true);
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Vector destroyed BulkVector = 0x%x", bulkVector);
+#endif
+ }
+}
+
+void BulkHandler::WriteBulkVector(TL_BulkVectorList_t *bulkVector)
+{
+ FILE *file = fopen(m_sFilePath.c_str(), "ab");
+
+ if (file) {
+ for (size_t i = 0; i != bulkVector->Buffers; ++i) {
+ uint32 currentChunkSize = bulkVector->ChunkSize;
+
+ if (i + 1 == bulkVector->Buffers) { //lastchunk
+ currentChunkSize = bulkVector->Length - (i * bulkVector->ChunkSize);
+ }
+
+ fwrite(bulkVector->Entries[i].Payload_p, currentChunkSize, 1, file);
+ }
+
+ fclose(file);
+#ifdef _BULKDEBUG
+ m_pLogger->log("BULK: Vector written BulkVector = 0x%x", m_pBulkVector);
+#endif
+ }
+}
+
+#ifdef _WIN32
+unsigned int WINAPI BulkHandler::FileWriteThreadFunction(void *arg)
+#else
+void *BulkHandler::FileWriteThreadFunction(void *arg)
+#endif
+{
+ BulkHandler *pThis = static_cast<BulkHandler *>(arg);
+
+ TL_BulkVectorList_t *bulkVector = 0;
+
+ while (true) {
+ RemoveResult result = pThis->m_ReceiveQueue.RemoveHead(reinterpret_cast<void **>(&bulkVector), INFINITE);
+
+ if (REMOVE_SUCCESS == result) {
+ pThis->WriteBulkVector(bulkVector);
+
+ pThis->m_pLcmInterface->BulkDestroyVector(bulkVector, true);
+#ifdef _BULKDEBUG
+ pThis->m_pLogger->log("BULK: Vector destroyed BulkVector = 0x%x", bulkVector);
+#endif
+ } else {
+ break;
+ }
+ }
+
+ pThis->FlushReceiveQueue();
+
+ return 0;
+}