diff --git a/DPDPU/Common/Include/DPU/FileService.h b/DPDPU/Common/Include/DPU/FileService.h index 64c6f19..c36b995 100644 --- a/DPDPU/Common/Include/DPU/FileService.h +++ b/DPDPU/Common/Include/DPU/FileService.h @@ -55,8 +55,7 @@ void StartFileService( int Argc, char **Argv, - FileService *FS, - pthread_t *AppPthread + FileService *FS ); struct WorkerThreadExitCtx { diff --git a/DPDPU/Common/Source/DPU/FileService.c b/DPDPU/Common/Source/DPU/FileService.c index 39766b5..16a1074 100644 --- a/DPDPU/Common/Source/DPU/FileService.c +++ b/DPDPU/Common/Source/DPU/FileService.c @@ -252,8 +252,7 @@ void StartFileService( int Argc, char **Argv, - FileService *FS, - pthread_t *AppPthread + FileService *FS ) { G_INITIALIZATION_DONE = false; struct StartFileServiceCtx *StartCtx = malloc(sizeof(*StartCtx)); @@ -261,8 +260,7 @@ StartFileService( StartCtx->Argv = Argv; StartCtx->FS = FS; - SPDK_NOTICELOG("Calling pthread_create()...\n"); - pthread_create(AppPthread, NULL, StartFileServiceWrapper, StartCtx); + StartFileServiceWrapper(StartCtx); } // diff --git a/DPDPU/StorageEngine/DDSBackEndDPUService/Source/FileBackEnd.c b/DPDPU/StorageEngine/DDSBackEndDPUService/Source/FileBackEnd.c index 61eadc8..e4dc55b 100644 --- a/DPDPU/StorageEngine/DDSBackEndDPUService/Source/FileBackEnd.c +++ b/DPDPU/StorageEngine/DDSBackEndDPUService/Source/FileBackEnd.c @@ -28,12 +28,6 @@ static inline void DebugPrint(const char* Fmt, ...) { } volatile int ForceQuitFileBackEnd = 0; bool G_INITIALIZATION_DONE = false; -// -// the pthread on which spdk_app_start() is called, might wanna join? -// -// -pthread_t AppPthread; - // // Set a CM channel to be non-blocking // @@ -2689,57 +2683,19 @@ CheckAndProcessControlPlaneCompletions( return ret; } -// -// The entry point for the back end -// This is on the main thread not app thread, i.e. app start not called when entering this func -// -// -int RunFileBackEnd( - const char* ServerIpStr, - const int ServerPort, - const uint32_t MaxClients, - const uint32_t MaxBuffs, - int Argc, - char **Argv +void* DMAAgentThread( + void* Arg ) { - BackEndConfig config; struct rdma_cm_event *event; int ret = 0; int dataPlaneCounter = 0; - - // - // Initialize the back end configuration - // - // - config.ServerIp = inet_addr(ServerIpStr); - config.ServerPort = htons(ServerPort);; - config.MaxClients = MaxClients; - config.MaxBuffs = MaxBuffs; - config.CtrlConns = NULL; - config.BuffConns = NULL; - config.DMAConf.CmChannel = NULL; - config.DMAConf.CmId = NULL; - config.FS = NULL; - - // - // Allocate the file service object - // - // - config.FS = AllocateFileService(); - if (!config.FS) { - fprintf(stderr, "AllocateFileService failed\n"); - return ret; - } - - SPDK_NOTICELOG("Starting file service... StartFileService()\n"); - StartFileService(Argc, Argv, config.FS, &AppPthread); - + BackEndConfig* config = (BackEndConfig*)Arg; // // Initialize DMA // // - ret = InitDMA(&config.DMAConf, config.ServerIp, config.ServerPort); + ret = InitDMA(&config->DMAConf, config->ServerIp, config->ServerPort); if (ret) { fprintf(stderr, "InitDMA failed with %d\n", ret); return ret; @@ -2749,10 +2705,10 @@ int RunFileBackEnd( // Allocate connections // // - ret = AllocConns(&config); + ret = AllocConns(config); if (ret) { fprintf(stderr, "AllocConns failed with %d\n", ret); - TermDMA(&config.DMAConf); + TermDMA(&config->DMAConf); return ret; } @@ -2760,7 +2716,7 @@ int RunFileBackEnd( // Listen for incoming connections // // - ret = rdma_listen(config.DMAConf.CmId, LISTEN_BACKLOG); + ret = rdma_listen(config->DMAConf.CmId, LISTEN_BACKLOG); if (ret) { ret = errno; fprintf(stderr, "rdma_listen error %d\n", ret); @@ -2776,7 +2732,7 @@ int RunFileBackEnd( // Process connection events // // - ret = rdma_get_cm_event(config.DMAConf.CmChannel, &event); + ret = rdma_get_cm_event(config->DMAConf.CmChannel, &event); if (ret && errno != EAGAIN) { ret = errno; fprintf(stderr, "rdma_get_cm_event error %d\n", ret); @@ -2786,10 +2742,10 @@ int RunFileBackEnd( #ifdef DDS_STORAGE_FILE_BACKEND_VERBOSE fprintf(stdout, "cma_event type %s cma_id %p (%s)\n", rdma_event_str(event->event), event->id, - (event->id == config.DMAConf.CmId) ? "parent" : "child"); + (event->id == config->DMAConf.CmId) ? "parent" : "child"); #endif - ret = ProcessCmEvents(&config, event); + ret = ProcessCmEvents(config, event); if (ret) { fprintf(stderr, "ProcessCmEvents error %d\n", ret); SignalHandler(SIGTERM); @@ -2801,7 +2757,7 @@ int RunFileBackEnd( // Process RDMA events for control connections // // - ret = ProcessCtrlCqEvents(&config); + ret = ProcessCtrlCqEvents(config); if (ret) { fprintf(stderr, "ProcessCtrlCqEvents error %d\n", ret); SignalHandler(SIGTERM); @@ -2812,7 +2768,7 @@ int RunFileBackEnd( // Check and process control plane completions // // - ret = CheckAndProcessControlPlaneCompletions(&config); + ret = CheckAndProcessControlPlaneCompletions(config); if (ret) { fprintf(stderr, "CheckAndProcessControlPlaneCompletions error %d\n", ret); SignalHandler(SIGTERM); @@ -2825,7 +2781,7 @@ int RunFileBackEnd( // // // SPDK_NOTICELOG("buffer connections started\n"); - ret = ProcessBuffCqEvents(&config); + ret = ProcessBuffCqEvents(config); if (ret) { fprintf(stderr, "ProcessBuffCqEvents error %d\n", ret); SignalHandler(SIGTERM); @@ -2836,7 +2792,7 @@ int RunFileBackEnd( // Check and process I/O completions // // - ret = CheckAndProcessIOCompletions(&config); + ret = CheckAndProcessIOCompletions(config); if (ret) { fprintf(stderr, "CheckAndProcessIOCompletions error %d\n", ret); SignalHandler(SIGTERM); @@ -2853,11 +2809,72 @@ int RunFileBackEnd( // Clean up // // - DeallocConns(&config); - TermDMA(&config.DMAConf); - StopFileService(config.FS); - DeallocateFileService(config.FS); - sleep(1); // return only after all spdk threads have finished exiting, so wait a bit here + DeallocConns(config); + TermDMA(&config->DMAConf); + StopFileService(config->FS); + DeallocateFileService(config->FS); +} + +// +// The entry point for the back end +// This is on the main thread not app thread, i.e. app start not called when entering this func +// +// +int RunFileBackEnd( + const char* ServerIpStr, + const int ServerPort, + const uint32_t MaxClients, + const uint32_t MaxBuffs, + int Argc, + char **Argv +) { + BackEndConfig config; + pthread_t dmaAgent; + int ret; + + // + // Initialize the back end configuration + // + // + config.ServerIp = inet_addr(ServerIpStr); + config.ServerPort = htons(ServerPort);; + config.MaxClients = MaxClients; + config.MaxBuffs = MaxBuffs; + config.CtrlConns = NULL; + config.BuffConns = NULL; + config.DMAConf.CmChannel = NULL; + config.DMAConf.CmId = NULL; + config.FS = NULL; + + // + // Allocate the file service object + // + // + config.FS = AllocateFileService(); + if (!config.FS) { + fprintf(stderr, "AllocateFileService failed\n"); + return -1; + } + + // + // Run DMA agent in a new thread + // + // + ret = pthread_create(&dmaAgent, NULL, DMAAgentThread, (void*)&config); + if (ret) { + fprintf(stderr, "Failed to start DMA agent thread\n"); + return ret; + } + + // + // Run file service on the current thread + // + // + SPDK_NOTICELOG("Starting file service... StartFileService()\n"); + StartFileService(Argc, Argv, config.FS); + + pthread_join(dmaAgent, NULL); + return ret; }