Skip to content
This repository has been archived by the owner on Sep 24, 2024. It is now read-only.

Commit

Permalink
Make DMA agent thread a separate one
Browse files Browse the repository at this point in the history
  • Loading branch information
qizzz committed Dec 11, 2023
1 parent 243d667 commit 9e1066b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 69 deletions.
3 changes: 1 addition & 2 deletions DPDPU/Common/Include/DPU/FileService.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ void
StartFileService(
int Argc,
char **Argv,
FileService *FS,
pthread_t *AppPthread
FileService *FS
);

struct WorkerThreadExitCtx {
Expand Down
6 changes: 2 additions & 4 deletions DPDPU/Common/Source/DPU/FileService.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,15 @@ void
StartFileService(
int Argc,
char **Argv,
FileService *FS,
pthread_t *AppPthread
FileService *FS
) {
G_INITIALIZATION_DONE = false;
struct StartFileServiceCtx *StartCtx = malloc(sizeof(*StartCtx));
StartCtx->Argc = Argc;
StartCtx->Argv = Argv;
StartCtx->FS = FS;

SPDK_NOTICELOG("Calling pthread_create()...\n");
pthread_create(AppPthread, NULL, StartFileServiceWrapper, StartCtx);
StartFileServiceWrapper(StartCtx);
}

//
Expand Down
143 changes: 80 additions & 63 deletions DPDPU/StorageEngine/DDSBackEndDPUService/Source/FileBackEnd.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ static inline void DebugPrint(const char* Fmt, ...) { }
volatile int ForceQuitFileBackEnd = 0;
bool G_INITIALIZATION_DONE = false;

//
// the pthread on which spdk_app_start() is called, might wanna join?
//
//
pthread_t AppPthread;

//
// Set a CM channel to be non-blocking
//
Expand Down Expand Up @@ -2689,57 +2683,19 @@ CheckAndProcessControlPlaneCompletions(
return ret;
}

//
// The entry point for the back end
// This is on the main thread not app thread, i.e. app start not called when entering this func
//
//
int RunFileBackEnd(
const char* ServerIpStr,
const int ServerPort,
const uint32_t MaxClients,
const uint32_t MaxBuffs,
int Argc,
char **Argv
void* DMAAgentThread(
void* Arg
) {
BackEndConfig config;
struct rdma_cm_event *event;
int ret = 0;
int dataPlaneCounter = 0;

//
// Initialize the back end configuration
//
//
config.ServerIp = inet_addr(ServerIpStr);
config.ServerPort = htons(ServerPort);;
config.MaxClients = MaxClients;
config.MaxBuffs = MaxBuffs;
config.CtrlConns = NULL;
config.BuffConns = NULL;
config.DMAConf.CmChannel = NULL;
config.DMAConf.CmId = NULL;
config.FS = NULL;

//
// Allocate the file service object
//
//
config.FS = AllocateFileService();
if (!config.FS) {
fprintf(stderr, "AllocateFileService failed\n");
return ret;
}

SPDK_NOTICELOG("Starting file service... StartFileService()\n");
StartFileService(Argc, Argv, config.FS, &AppPthread);

BackEndConfig* config = (BackEndConfig*)Arg;

//
// Initialize DMA
//
//
ret = InitDMA(&config.DMAConf, config.ServerIp, config.ServerPort);
ret = InitDMA(&config->DMAConf, config->ServerIp, config->ServerPort);
if (ret) {
fprintf(stderr, "InitDMA failed with %d\n", ret);
return ret;
Expand All @@ -2749,18 +2705,18 @@ int RunFileBackEnd(
// Allocate connections
//
//
ret = AllocConns(&config);
ret = AllocConns(config);
if (ret) {
fprintf(stderr, "AllocConns failed with %d\n", ret);
TermDMA(&config.DMAConf);
TermDMA(&config->DMAConf);
return ret;
}

//
// Listen for incoming connections
//
//
ret = rdma_listen(config.DMAConf.CmId, LISTEN_BACKLOG);
ret = rdma_listen(config->DMAConf.CmId, LISTEN_BACKLOG);
if (ret) {
ret = errno;
fprintf(stderr, "rdma_listen error %d\n", ret);
Expand All @@ -2776,7 +2732,7 @@ int RunFileBackEnd(
// Process connection events
//
//
ret = rdma_get_cm_event(config.DMAConf.CmChannel, &event);
ret = rdma_get_cm_event(config->DMAConf.CmChannel, &event);
if (ret && errno != EAGAIN) {
ret = errno;
fprintf(stderr, "rdma_get_cm_event error %d\n", ret);
Expand All @@ -2786,10 +2742,10 @@ int RunFileBackEnd(
#ifdef DDS_STORAGE_FILE_BACKEND_VERBOSE
fprintf(stdout, "cma_event type %s cma_id %p (%s)\n",
rdma_event_str(event->event), event->id,
(event->id == config.DMAConf.CmId) ? "parent" : "child");
(event->id == config->DMAConf.CmId) ? "parent" : "child");
#endif

ret = ProcessCmEvents(&config, event);
ret = ProcessCmEvents(config, event);
if (ret) {
fprintf(stderr, "ProcessCmEvents error %d\n", ret);
SignalHandler(SIGTERM);
Expand All @@ -2801,7 +2757,7 @@ int RunFileBackEnd(
// Process RDMA events for control connections
//
//
ret = ProcessCtrlCqEvents(&config);
ret = ProcessCtrlCqEvents(config);
if (ret) {
fprintf(stderr, "ProcessCtrlCqEvents error %d\n", ret);
SignalHandler(SIGTERM);
Expand All @@ -2812,7 +2768,7 @@ int RunFileBackEnd(
// Check and process control plane completions
//
//
ret = CheckAndProcessControlPlaneCompletions(&config);
ret = CheckAndProcessControlPlaneCompletions(config);
if (ret) {
fprintf(stderr, "CheckAndProcessControlPlaneCompletions error %d\n", ret);
SignalHandler(SIGTERM);
Expand All @@ -2825,7 +2781,7 @@ int RunFileBackEnd(
//
//
// SPDK_NOTICELOG("buffer connections started\n");
ret = ProcessBuffCqEvents(&config);
ret = ProcessBuffCqEvents(config);
if (ret) {
fprintf(stderr, "ProcessBuffCqEvents error %d\n", ret);
SignalHandler(SIGTERM);
Expand All @@ -2836,7 +2792,7 @@ int RunFileBackEnd(
// Check and process I/O completions
//
//
ret = CheckAndProcessIOCompletions(&config);
ret = CheckAndProcessIOCompletions(config);
if (ret) {
fprintf(stderr, "CheckAndProcessIOCompletions error %d\n", ret);
SignalHandler(SIGTERM);
Expand All @@ -2853,11 +2809,72 @@ int RunFileBackEnd(
// Clean up
//
//
DeallocConns(&config);
TermDMA(&config.DMAConf);
StopFileService(config.FS);
DeallocateFileService(config.FS);
sleep(1); // return only after all spdk threads have finished exiting, so wait a bit here
DeallocConns(config);
TermDMA(&config->DMAConf);
StopFileService(config->FS);
DeallocateFileService(config->FS);
}

//
// The entry point for the back end
// This is on the main thread not app thread, i.e. app start not called when entering this func
//
//
int RunFileBackEnd(
const char* ServerIpStr,
const int ServerPort,
const uint32_t MaxClients,
const uint32_t MaxBuffs,
int Argc,
char **Argv
) {
BackEndConfig config;
pthread_t dmaAgent;
int ret;

//
// Initialize the back end configuration
//
//
config.ServerIp = inet_addr(ServerIpStr);
config.ServerPort = htons(ServerPort);;
config.MaxClients = MaxClients;
config.MaxBuffs = MaxBuffs;
config.CtrlConns = NULL;
config.BuffConns = NULL;
config.DMAConf.CmChannel = NULL;
config.DMAConf.CmId = NULL;
config.FS = NULL;

//
// Allocate the file service object
//
//
config.FS = AllocateFileService();
if (!config.FS) {
fprintf(stderr, "AllocateFileService failed\n");
return -1;
}

//
// Run DMA agent in a new thread
//
//
ret = pthread_create(&dmaAgent, NULL, DMAAgentThread, (void*)&config);
if (ret) {
fprintf(stderr, "Failed to start DMA agent thread\n");
return ret;
}

//
// Run file service on the current thread
//
//
SPDK_NOTICELOG("Starting file service... StartFileService()\n");
StartFileService(Argc, Argv, config.FS);

pthread_join(dmaAgent, NULL);

return ret;
}

Expand Down

0 comments on commit 9e1066b

Please sign in to comment.