diff --git a/src/conn.c b/src/conn.c index 415f00e9..c7dbeac3 100644 --- a/src/conn.c +++ b/src/conn.c @@ -204,6 +204,7 @@ _freeConn(natsConnection *nc) natsStrHash_Destroy(nc->respMap); natsCondition_Destroy(nc->reconnectCond); natsMutex_Destroy(nc->subsMu); + natsMutex_Destroy(nc->servicesMu); natsMutex_Destroy(nc->mu); NATS_FREE(nc->services); @@ -3239,6 +3240,8 @@ natsConn_create(natsConnection **newConn, natsOptions *options) s = natsMutex_Create(&(nc->mu)); if (s == NATS_OK) s = natsMutex_Create(&(nc->subsMu)); + if (s == NATS_OK) + s = natsMutex_Create(&(nc->servicesMu)); if (s == NATS_OK) s = _setupServerPool(nc); if (s == NATS_OK) @@ -4467,69 +4470,3 @@ natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus } fflush(stderr); } - -int natsConn_getServices(microService ***services, natsConnection *nc) -{ - int numServices = 0; - natsConn_Lock(nc); - *services = nc->services; - numServices = nc->numServices; - natsConn_Unlock(nc); - return numServices; -} - -bool natsConn_removeService(natsConnection *nc, microService *service) -{ - bool removed = false; - if (nc == NULL || service == NULL) - return false; - - natsConn_Lock(nc); - for (int i = 0; i < nc->numServices; i++) - { - if (nc->services[i] == service) - { - for (int j = i; j < nc->numServices - 1; j++) - { - nc->services[j] = nc->services[j + 1]; - } - nc->numServices--; - removed = true; - break; - } - } - natsConn_Unlock(nc); - return removed; -} - -natsStatus natsConn_addService(natsConnection *nc, microService *service) -{ - natsStatus s = NATS_OK; - if (nc == NULL || service == NULL) - return nats_setDefaultError(NATS_INVALID_ARG); - - natsConn_Lock(nc); - if (nc->services == NULL) - { - nc->services = NATS_CALLOC(1, sizeof(microService *)); - if (nc->services == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); - } - else - { - microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); - if (tmp == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); - else - nc->services = tmp; - } - - if (s == NATS_OK) - { - nc->services[nc->numServices] = service; - nc->numServices++; - } - natsConn_Unlock(nc); - - return s; -} diff --git a/src/conn.h b/src/conn.h index 7d4e1249..41e93f40 100644 --- a/src/conn.h +++ b/src/conn.h @@ -160,13 +160,4 @@ natsConn_close(natsConnection *nc); void natsConn_destroy(natsConnection *nc, bool fromPublicDestroy); -int -natsConn_getServices(microService ***services, natsConnection *nc); - -bool -natsConn_removeService(natsConnection *nc, microService *service); - -natsStatus -natsConn_addService(natsConnection *nc, microService *service); - #endif /* CONN_H_ */ diff --git a/src/micro.c b/src/micro.c index d72812be..17c1b286 100644 --- a/src/micro.c +++ b/src/micro.c @@ -29,6 +29,9 @@ static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, v static void _free_cloned_service_config(microServiceConfig *cfg); static void _free_service(microService *m); +static microError * +_attach_service_to_connection(natsConnection *nc, microService *service); + microError * micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *cfg) { @@ -53,7 +56,7 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c MICRO_CALL(err, _clone_service_config(&m->cfg, cfg)); // Add the service to the connection. - MICRO_CALL(err, micro_ErrorFromStatus(natsConn_addService(m->nc, m))); + MICRO_CALL(err, _attach_service_to_connection(m->nc, m)); MICRO_CALL(err, (m->refs++, NULL)); // Wrap the connection callbacks before we subscribe to anything. @@ -244,7 +247,62 @@ microGroup_AddEndpoint(microGroup *g, microEndpointConfig *cfg) } static microError * -_stop_service(microService *m, bool unsubscribe, bool finalRelease) +_attach_service_to_connection(natsConnection *nc, microService *service) +{ + natsStatus s = NATS_OK; + if (nc == NULL || service == NULL) + return micro_ErrorInvalidArg; + + natsMutex_Lock(nc->servicesMu); + if (nc->services == NULL) + { + nc->services = NATS_CALLOC(1, sizeof(microService *)); + if (nc->services == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + } + else + { + microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); + if (tmp == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + nc->services = tmp; + } + + if (s == NATS_OK) + { + nc->services[nc->numServices] = service; + nc->numServices++; + } + natsMutex_Unlock(nc->servicesMu); + + return micro_ErrorFromStatus(s); +} + +static void +_detach_service_from_connection(natsConnection *nc, microService *m) +{ + if (nc == NULL || m == NULL) + return; + + natsMutex_Lock(nc->servicesMu); + for (int i = 0; i < nc->numServices; i++) + { + if (nc->services[i] != m) + continue; + + for (int j = i; j < nc->numServices - 1; j++) + nc->services[j] = nc->services[j + 1]; + nc->numServices--; + break; + } + natsMutex_Unlock(nc->servicesMu); + + return; +} + +static microError * +_stop_service(microService *m, bool unsubscribe, bool release) { microError *err = NULL; microEndpoint *ep = NULL; @@ -271,17 +329,13 @@ _stop_service(microService *m, bool unsubscribe, bool finalRelease) } } } - - if (natsConn_removeService(m->nc, m)) - m->refs--; } - if ((m->refs > 0) && finalRelease) + if (release) m->refs--; refs = m->refs; numEndpoints = m->numEndpoints; - _unlock_service(m); if ((refs == 0) && (numEndpoints == 0)) @@ -293,12 +347,12 @@ _stop_service(microService *m, bool unsubscribe, bool finalRelease) microError * microService_Stop(microService *m) { - // Public API: stop the service, unsubscribe, but don't do the final release. return _stop_service(m, true, false); } +// service lock must be held by the caller. static void -_remove_endpoint(microService *m, microEndpoint *toRemove) +_detach_endpoint_from_service(microService *m, microEndpoint *toRemove) { microEndpoint *ep = NULL; microEndpoint *prev_ep = NULL; @@ -306,18 +360,19 @@ _remove_endpoint(microService *m, microEndpoint *toRemove) if ((m == NULL) || (toRemove == NULL)) return; - for (ep = m->first_ep; ep != NULL; ep = ep->next) + for (ep = m->first_ep; (ep != NULL) && (ep != toRemove); prev_ep = ep, ep = ep->next) + ; + if (ep == NULL) + return; + + m->numEndpoints--; + if (prev_ep == NULL) + m->first_ep = ep->next; + else { - if (ep == toRemove) - { - m->numEndpoints--; - if (prev_ep == NULL) - m->first_ep = ep->next; - else - prev_ep->next = ep->next; - return; - } - prev_ep = ep; + micro_lock_endpoint(prev_ep); + prev_ep->next = ep->next; + micro_unlock_endpoint(prev_ep); } } @@ -328,7 +383,6 @@ void micro_release_endpoint_when_unsubscribed(void *closure) { microEndpoint *ep = (microEndpoint *)closure; microService *m = NULL; - natsSubscription *sub = NULL; microDoneHandler doneHandler = NULL; int refs = 0; @@ -339,27 +393,22 @@ void micro_release_endpoint_when_unsubscribed(void *closure) if ((m == NULL) || (m->service_mu == NULL)) return; + _lock_service(m); + micro_lock_endpoint(ep); - sub = ep->sub; - ep->sub = NULL; // Force the subscription to be destroyed now, so NULL out the pointer to avoid a double free. + _detach_endpoint_from_service(m, ep); refs = --(ep->refs); micro_unlock_endpoint(ep); - natsSubscription_Destroy(sub); - - // If this is the last endpoint, we need to notify the service's done - // callback. - _lock_service(m); - - _remove_endpoint(m, ep); - if (refs == 0) micro_free_endpoint(ep); - if (m->numEndpoints == 0) + { + // Mark the service as stopped before calling the done handler. + m->stopped = true; doneHandler = m->cfg->DoneHandler; + } - refs = m->refs; _unlock_service(m); // Special processing for the last endpoint. @@ -367,8 +416,8 @@ void micro_release_endpoint_when_unsubscribed(void *closure) { doneHandler(m); - if (refs == 0) - _free_service(m); + _detach_service_from_connection(m->nc, m); + _stop_service(m, false, true); // just release } } @@ -534,66 +583,80 @@ _free_cloned_service_config(microServiceConfig *cfg) static void _on_connection_closed(natsConnection *nc, void *ignored) { - microService *m = NULL; - microService **all = NULL; + natsMutex_Lock(nc->servicesMu); - int n = natsConn_getServices(&all, nc); - for (int i = 0; i < n; i++) - { - m = all[i]; - _stop_service(m, false, false); // subs will be terminated by the connection close. - } + // Stop all services. They will get detached from the connection when their + // subs are complete. + for (int i = 0; i < nc->numServices; i++) + _stop_service(nc->services[i], false, false); + + natsMutex_Unlock(nc->servicesMu); } -static void +static bool _on_service_error(microService *m, const char *subject, natsStatus s) { - microEndpoint *ep = NULL; - microError *err = NULL; + microEndpoint *found = NULL; + microError *err = NULL; if (m == NULL) - return; + return false; _lock_service(m); - for (ep = m->first_ep; - (ep != NULL) && !micro_match_endpoint_subject(ep->subject, subject); - ep = ep->next) - ; - micro_retain_endpoint(ep); // for the callback - _unlock_service(m); - if (ep != NULL) + for (microEndpoint *ep = m->first_ep; ep != NULL; ep = ep->next) { - if (m->cfg->ErrHandler != NULL) - (*m->cfg->ErrHandler)(m, ep, s); - - err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", ep->subject); - micro_update_last_error(ep, err); - microError_Destroy(err); + if (!micro_match_endpoint_subject(ep->subject, subject)) + continue; + found = ep; + break; } - micro_release_endpoint(ep); // after the callback + + if (found != NULL) + micro_retain_endpoint(found); + + _unlock_service(m); + + if (found == NULL) + return false; + + err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", subject); + micro_update_last_error(found, err); + microError_Destroy(err); + + if (m->cfg->ErrHandler != NULL) + (*m->cfg->ErrHandler)(m, found, s); + + micro_release_endpoint(found); + return true; } static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used) { - microService *m = NULL; - microService **all = NULL; const char *subject = NULL; if (sub == NULL) - { return; - } + subject = natsSubscription_GetSubject(sub); - int n = natsConn_getServices(&all, nc); - for (int i = 0; i < n; i++) + // TODO: this would be a lot easier if sub had a ref to ep. + natsMutex_Lock(nc->servicesMu); + for (int i = 0; i < nc->numServices; i++) { - m = all[i]; - _on_service_error(m, subject, s); + microService *m = nc->services[i]; + + // See if the service owns the affected subscription, based on matching + // the subjects; notify it of the error. + if (!_on_service_error(m, subject, s)) + continue; + + // Stop the service in error. It will get detached from the connection + // and released when all of its subs are complete. _stop_service(m, true, false); } + natsMutex_Unlock(nc->servicesMu); } static inline microError * diff --git a/src/natsp.h b/src/natsp.h index e2e4a018..80164238 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -701,6 +701,7 @@ struct __natsConnection microService **services; int numServices; + natsMutex *servicesMu; natsConnStatus status; bool initc; // true if the connection is performing the initial connect diff --git a/test/test.c b/test/test.c index ac684374..2ea2a299 100644 --- a/test/test.c +++ b/test/test.c @@ -33561,7 +33561,7 @@ _startMicroservice(microService** new_m, natsConnection *nc, microServiceConfig static void _startMicroserviceOK(microService** new_m, natsConnection *nc, microServiceConfig *cfg, microEndpointConfig **eps, int num_eps, struct threadArg *arg) { - char buf[64]; + char buf[256]; snprintf(buf, sizeof(buf), "Start microservice %s: ", cfg->Name); test(buf); @@ -33576,6 +33576,15 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer for (i = 0; i < n; i++) { + char buf[64]; + cfg->Version = "1.0.0"; + cfg->Description = "returns 42"; + + if (nats_IsStringEmpty(cfg->Name)) + { + snprintf(buf, sizeof(buf), "CoolService-%d", i); + cfg->Name = buf; + } _startMicroserviceOK(&(svcs[i]), nc, cfg, eps, num_eps, arg); } @@ -33584,7 +33593,6 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer #define _waitForMicroservicesAllDone(_arg) \ { \ - nats_Sleep(50); \ natsMutex_Lock((_arg)->m); \ testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ natsStatus waitStatus = NATS_OK; \ @@ -34098,13 +34106,11 @@ void test_MicroBasics(void) &ep2_cfg, }; microServiceConfig cfg = { - .Version = "1.0.0", - .Name = "CoolService", - .Description = "returns 42", .Metadata = (natsMetadata){ .List = (const char *[]){"skey1", "svalue1", "skey2", "svalue2"}, .Count = 2, }, + .Name = "ManyServicesSameName", .Endpoint = NULL, .State = NULL, }; @@ -34162,7 +34168,7 @@ void test_MicroBasics(void) test(buf); err = microService_GetInfo(&info, svcs[i]); testCond((err == NULL) && - (strcmp(info->Name, "CoolService") == 0) && + (strcmp(info->Name, "ManyServicesSameName") == 0) && (strlen(info->Id) > 0) && (strcmp(info->Description, "returns 42") == 0) && (strcmp(info->Version, "1.0.0") == 0) && @@ -34173,7 +34179,7 @@ void test_MicroBasics(void) // Make sure we can request valid info with $SRV.INFO request. test("Create INFO inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_INFO_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_INFO_VERB, "ManyServicesSameName", NULL); test("Subscribe to INFO inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish INFO request: "); @@ -34199,7 +34205,7 @@ void test_MicroBasics(void) snprintf(buf, sizeof(buf), "Validate INFO response strings#%d: ", i); test(buf); testCond( - (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && (strcmp(str, "CoolService") == 0) + (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && (strcmp(str, "ManyServicesSameName") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "description", &str)) && (strcmp(str, "returns 42") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "version", &str)) && (strcmp(str, "1.0.0") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "id", &str)) && (strlen(str) > 0) @@ -34251,7 +34257,7 @@ void test_MicroBasics(void) // Make sure we can request SRV.PING. test("Create PING inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_PING_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_PING_VERB, "ManyServicesSameName", NULL); test("Subscribe to PING inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish PING request: "); @@ -34273,7 +34279,7 @@ void test_MicroBasics(void) js = NULL; testCond((NATS_OK == nats_JSONParse(&js, reply->data, reply->dataLen)) && (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && - (strcmp(str, "CoolService") == 0)); + (strcmp(str, "ManyServicesSameName") == 0)); nats_JSONDestroy(js); natsMsg_Destroy(reply); } @@ -34284,7 +34290,7 @@ void test_MicroBasics(void) // Get and validate $SRV.STATS from all service instances. test("Create STATS inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_STATS_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_STATS_VERB, "ManyServicesSameName", NULL); test("Subscribe to STATS inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish STATS request: "); @@ -34368,9 +34374,6 @@ void test_MicroStartStop(void) .Handler = _microHandleRequest42, }; microServiceConfig cfg = { - .Version = "1.0.0", - .Name = "CoolService", - .Description = "returns 42", .Endpoint = &ep_cfg, }; natsMsg *reply = NULL;