-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ECR-4133] Add service resume #1372
Changes from 3 commits
1dda6cd
6cf11d2
4c0f684
54b738e
ef2c526
6a54eb3
bb36ec4
8c08540
367b32e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -182,7 +182,7 @@ public boolean isArtifactDeployed(ServiceArtifactId id) { | |
* @throws ExecutionException if such exception occurred in the service constructor; | ||
* must be translated into an error of kind {@link ErrorKind#SERVICE} | ||
* @throws UnexpectedExecutionException if any other exception occurred in | ||
* the the service constructor; it is included as cause. The cause must be translated | ||
* the service constructor; it is included as cause. The cause must be translated | ||
* into an error of kind {@link ErrorKind#UNEXPECTED} | ||
* @throws RuntimeException if the runtime failed to instantiate the service for other reason | ||
*/ | ||
|
@@ -206,6 +206,39 @@ public void initiateAddingService(Fork fork, ServiceInstanceSpec instanceSpec, | |
} | ||
} | ||
|
||
/** | ||
* Initiates resuming of previously stopped service instance. Service instance artifact could | ||
* be upgraded in advance to bring some new functionality. | ||
* | ||
* @param fork a database view to apply configuration | ||
* @param instanceSpec a service instance specification; must reference a deployed artifact | ||
* @param configuration a service instance configuration parameters as a serialized protobuf | ||
* message | ||
* @throws IllegalArgumentException if the given service instance is active; or its artifact | ||
* is not deployed | ||
* @throws ExecutionException if such exception occurred in the service method; | ||
* must be translated into an error of kind {@link ErrorKind#SERVICE} | ||
* @throws UnexpectedExecutionException if any other exception occurred in | ||
* the service method; it is included as cause. The cause must be translated | ||
* into an error of kind {@link ErrorKind#UNEXPECTED} | ||
* @throws RuntimeException if the runtime failed to resume the service for other reason | ||
*/ | ||
public void initializeResumingService(Fork fork, ServiceInstanceSpec instanceSpec, | ||
byte[] configuration) { | ||
try { | ||
synchronized (lock) { | ||
checkStoppedService(instanceSpec.getId()); | ||
ServiceWrapper service = createServiceInstance(instanceSpec); | ||
service.resume(fork, new ServiceConfiguration(configuration)); | ||
} | ||
logger.info("Resumed service: {}", instanceSpec); | ||
} catch (Exception e) { | ||
logger.error("Failed to resume a service {} instance with parameters {}", | ||
instanceSpec, configuration, e); | ||
throw e; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently this class is responsible to log errors, though there is ECR-3813 to shift that. |
||
} | ||
|
||
/** | ||
* Modifies the state of the given service instance at the runtime either by activation it or | ||
* stopping. The service instance should be successfully initialized | ||
|
@@ -486,16 +519,24 @@ public void close() throws InterruptedException { | |
} | ||
|
||
private ServiceWrapper getServiceById(Integer serviceId) { | ||
checkService(serviceId); | ||
checkActiveService(serviceId); | ||
return servicesById.get(serviceId); | ||
} | ||
|
||
/** Checks that the service with the given id is started in this runtime. */ | ||
private void checkService(Integer serviceId) { | ||
private void checkActiveService(Integer serviceId) { | ||
checkArgument(servicesById.containsKey(serviceId), | ||
"No service with id=%s in the Java runtime", serviceId); | ||
} | ||
|
||
/** Checks that the service with the given id is not active in this runtime. */ | ||
private void checkStoppedService(Integer serviceId) { | ||
ServiceWrapper activeService = servicesById.get(serviceId); | ||
checkArgument(activeService == null, | ||
"Service with id=%s should be stopped, but actually active. " | ||
+ "Found active service instance: %s", serviceId, activeService); | ||
} | ||
|
||
@VisibleForTesting | ||
Optional<ServiceWrapper> findService(String name) { | ||
return Optional.ofNullable(services.get(name)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,26 @@ default void initialize(Fork fork, Configuration configuration) { | |
// No configuration | ||
} | ||
|
||
/** | ||
* Resumes the previously stopped service instance. This method is called when | ||
* a stopped service instance is restarted. | ||
* | ||
* <p>This method may perform any changes to the database. For example, update some service | ||
* parameters, deprecate old entries etc. | ||
* | ||
* <p>Also, note that performing any bulk operations or data migration | ||
* <em>is not recommended</em> here. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd add short motivation, as Ostrovski clarified in the discussion. |
||
* <!--TODO: Add a link to the migration procedure --> | ||
* | ||
* @param fork a database fork to apply changes to. Not valid after this method returns | ||
* @param configuration the service configuration parameters | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As we've discussed, those are hardly configuration parameters, just generic 'arguments'. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there are 2 questions here:
Also, talking about Configuration, what is the reason for having interface + implementation(ServiceConfiguration), not just VO class? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It could be made one, it is made in this way probably to give more flexibility in terms of implementation (e.g., if we went with also properties). |
||
* @throws ExecutionException if the configuration parameters are not valid (e.g., | ||
* malformed, or do not meet the preconditions) | ||
*/ | ||
default void resume(Fork fork, Configuration configuration) { | ||
// No configuration | ||
} | ||
|
||
/** | ||
* Creates handlers that make up the public HTTP API of this service. | ||
* The handlers are added to the given router, which is then mounted at the following path: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -311,8 +311,6 @@ void activateServiceDuplicate() { | |
|
||
@Test | ||
void stopNonActiveService() { | ||
serviceRuntime.initialize(mock(Node.class)); | ||
|
||
ServiceArtifactId artifactId = ServiceArtifactId.newJavaId("com.acme/foo-service", "1.0.0"); | ||
ServiceInstanceSpec instanceSpec = ServiceInstanceSpec.newInstance(TEST_NAME, | ||
TEST_ID, artifactId); | ||
|
@@ -369,6 +367,68 @@ void updateServiceStatusBadStatus(Status badStatus) { | |
assertThat(exception).hasMessageContaining(instanceSpec.getName()); | ||
} | ||
|
||
@Test | ||
void initializeResumingService() { | ||
serviceRuntime.initialize(mock(Node.class)); | ||
|
||
ServiceArtifactId artifactId = ServiceArtifactId.parseFrom("1:com.acme/foo-service:1.0.0"); | ||
LoadedServiceDefinition serviceDefinition = LoadedServiceDefinition | ||
.newInstance(artifactId, TestServiceModule::new); | ||
ServiceInstanceSpec instanceSpec = ServiceInstanceSpec.newInstance(TEST_NAME, | ||
TEST_ID, artifactId); | ||
when(serviceLoader.findService(artifactId)) | ||
.thenReturn(Optional.of(serviceDefinition)); | ||
|
||
ServiceWrapper serviceWrapper = mock(ServiceWrapper.class); | ||
when(servicesFactory.createService(eq(serviceDefinition), eq(instanceSpec), | ||
any(MultiplexingNodeDecorator.class))) | ||
.thenReturn(serviceWrapper); | ||
|
||
// Create the service from the artifact | ||
Fork fork = mock(Fork.class); | ||
byte[] configuration = anyConfiguration(); | ||
serviceRuntime.initializeResumingService(fork, instanceSpec, configuration); | ||
|
||
// Check it was instantiated as expected | ||
verify(servicesFactory).createService(eq(serviceDefinition), eq(instanceSpec), | ||
any(MultiplexingNodeDecorator.class)); | ||
|
||
// and the service was configured | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. resumed |
||
Configuration expectedConfig = new ServiceConfiguration(configuration); | ||
verify(serviceWrapper).resume(fork, expectedConfig); | ||
|
||
// but not registered in the runtime yet: | ||
assertThat(serviceRuntime.findService(TEST_NAME)).isEmpty(); | ||
} | ||
|
||
@Test | ||
void initializeResumingActiveService() { | ||
serviceRuntime.initialize(mock(Node.class)); | ||
|
||
ServiceArtifactId artifactId = ServiceArtifactId.newJavaId("com.acme/foo-service", "1.0.0"); | ||
LoadedServiceDefinition serviceDefinition = LoadedServiceDefinition | ||
.newInstance(artifactId, TestServiceModule::new); | ||
ServiceInstanceSpec instanceSpec = ServiceInstanceSpec.newInstance(TEST_NAME, | ||
TEST_ID, artifactId); | ||
when(serviceLoader.findService(artifactId)) | ||
.thenReturn(Optional.of(serviceDefinition)); | ||
|
||
ServiceWrapper serviceWrapper = mock(ServiceWrapper.class); | ||
when(serviceWrapper.getId()).thenReturn(TEST_ID); | ||
when(serviceWrapper.getName()).thenReturn(TEST_NAME); | ||
when(servicesFactory.createService(eq(serviceDefinition), eq(instanceSpec), | ||
any(MultiplexingNodeDecorator.class))) | ||
.thenReturn(serviceWrapper); | ||
|
||
// Activate the service from the artifact | ||
serviceRuntime.updateInstanceStatus(instanceSpec, Status.ACTIVE); | ||
|
||
byte[] configuration = anyConfiguration(); | ||
Fork fork = mock(Fork.class); | ||
assertThrows(IllegalArgumentException.class, | ||
() -> serviceRuntime.initializeResumingService(fork, instanceSpec, configuration)); | ||
} | ||
|
||
@Test | ||
void shutdown() throws InterruptedException { | ||
serviceRuntime.shutdown(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No data changes, according to the recent discussions, is no longer correct.
I'd add to the list below 'synchronous data migration'.