Skip to content

Commit

Permalink
Add service resume [ECR-4133] (#1372)
Browse files Browse the repository at this point in the history
  • Loading branch information
bullet-tooth authored and dmitry-timofeev committed Jan 23, 2020
1 parent 6aabf29 commit eb2b5f6
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 8 deletions.
4 changes: 3 additions & 1 deletion exonum-java-binding/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- `supervisor-mode` CLI parameter added for `generate-template` command. It
allows to configure the mode of the Supervisor service. Possible values are
"simple" and "decentralized". (#1361)
- Service instances can be stopped now. (#1358)
- Support of service instances lifecycle: they can be activated, stopped and resumed now.
Also, service instance artifacts can be upgraded before resuming which allows services
API update, add new service transactions, synchronous data migration etc. (#1358, #1372)

[blockchain-proofs]: https://exonum.com/doc/api/java-binding/0.10.0-SNAPSHOT/com/exonum/binding/core/blockchain/Blockchain.html#proofs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.exonum.binding.core.runtime;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.exonum.binding.common.crypto.PublicKey;
Expand All @@ -34,7 +35,7 @@ class MultiplexingNodeDecorator implements Node {
private boolean closed;

MultiplexingNodeDecorator(Node node) {
this.node = node;
this.node = checkNotNull(node);
this.closed = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public boolean isArtifactDeployed(ServiceArtifactId id) {
* @throws ExecutionException if such exception occurred in the service constructor;
* must be translated into an error of kind {@link ErrorKind#SERVICE}
* @throws UnexpectedExecutionException if any other exception occurred in
* the the service constructor; it is included as cause. The cause must be translated
* the service constructor; it is included as cause. The cause must be translated
* into an error of kind {@link ErrorKind#UNEXPECTED}
* @throws RuntimeException if the runtime failed to instantiate the service for other reason
*/
Expand All @@ -206,6 +206,38 @@ public void initiateAddingService(Fork fork, ServiceInstanceSpec instanceSpec,
}
}

/**
* Initiates resuming of previously stopped service instance. Service instance artifact could
* be upgraded in advance to bring some new functionality.
*
* @param fork a database view to apply changes to
* @param instanceSpec a service instance specification; must reference a deployed artifact
* @param arguments a service arguments as a serialized protobuf message
* @throws IllegalArgumentException if the given service instance is active; or its artifact
* is not deployed
* @throws ExecutionException if such exception occurred in the service method;
* must be translated into an error of kind {@link ErrorKind#SERVICE}
* @throws UnexpectedExecutionException if any other exception occurred in
* the service method; it is included as cause. The cause must be translated
* into an error of kind {@link ErrorKind#UNEXPECTED}
* @throws RuntimeException if the runtime failed to resume the service for other reason
*/
public void initializeResumingService(Fork fork, ServiceInstanceSpec instanceSpec,
byte[] arguments) {
try {
synchronized (lock) {
checkStoppedService(instanceSpec.getId());
ServiceWrapper service = createServiceInstance(instanceSpec);
service.resume(fork, arguments);
}
logger.info("Resumed service: {}", instanceSpec);
} catch (Exception e) {
logger.error("Failed to resume a service {} instance with parameters {}",
instanceSpec, arguments, e);
throw e;
}
}

/**
* Modifies the state of the given service instance at the runtime either by activation it or
* stopping. The service instance should be successfully initialized
Expand Down Expand Up @@ -486,16 +518,24 @@ public void close() throws InterruptedException {
}

private ServiceWrapper getServiceById(Integer serviceId) {
checkService(serviceId);
checkActiveService(serviceId);
return servicesById.get(serviceId);
}

/** Checks that the service with the given id is started in this runtime. */
private void checkService(Integer serviceId) {
private void checkActiveService(Integer serviceId) {
checkArgument(servicesById.containsKey(serviceId),
"No service with id=%s in the Java runtime", serviceId);
}

/** Checks that the service with the given id is not active in this runtime. */
private void checkStoppedService(Integer serviceId) {
ServiceWrapper activeService = servicesById.get(serviceId);
checkArgument(activeService == null,
"Service with id=%s should be stopped, but actually active. "
+ "Found active service instance: %s", serviceId, activeService);
}

@VisibleForTesting
Optional<ServiceWrapper> findService(String name) {
return Optional.ofNullable(services.get(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,26 @@ void initiateAddingService(long forkHandle, byte[] instanceSpec, byte[] configur
}
}

/**
* Starts resuming a service with the given specification.
*
* @param forkHandle a handle to a native fork object
* @param instanceSpec the service instance specification as a serialized {@link InstanceSpec}
* protobuf message
* @param arguments the service arguments as a serialized protobuf message
* @see ServiceRuntime#initializeResumingService(Fork, ServiceInstanceSpec, byte[])
*/
void initializeResumingService(long forkHandle, byte[] instanceSpec, byte[] arguments)
throws CloseFailuresException {
try (Cleaner cleaner = new Cleaner()) {
Fork fork = viewFactory.createFork(forkHandle, cleaner);
ServiceInstanceSpec javaInstanceSpec = parseInstanceSpec(instanceSpec);
serviceRuntime.initializeResumingService(fork, javaInstanceSpec, arguments);
} catch (CloseFailuresException e) {
handleCloseFailure(e);
}
}

/**
* Updates the status of the service instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ void initialize(Fork view, Configuration configuration) {
callServiceMethod(() -> service.initialize(view, configuration));
}

void resume(Fork view, byte[] arguments) {
callServiceMethod(() -> service.resume(view, arguments));
}

void executeTransaction(String interfaceName, int txId, byte[] arguments, int callerServiceId,
TransactionContext context) {
switch (interfaceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ default void initialize(Fork fork, Configuration configuration) {
// No configuration
}

/**
* Resumes the previously stopped service instance. This method is called when
* a stopped service instance is restarted.
*
* <p>This method may perform any changes to the database. For example, update some service
* parameters, deprecate old entries etc.
*
* <p>Also, note that performing any bulk operations or data migration
* <em>is not recommended</em> here, because this method is invoked synchronously
* when the block is committed.
* <!--TODO: Add a link to the migration procedure -->
*
* @param fork a database fork to apply changes to. Not valid after this method returns
* @param arguments the service arguments
* @throws ExecutionException if the arguments are not valid (e.g.,
* malformed, or do not meet the preconditions)
*/
default void resume(Fork fork, byte[] arguments) {
// No actions by default
}

/**
* Creates handlers that make up the public HTTP API of this service.
* The handlers are added to the given router, which is then mounted at the following path:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,33 @@ void addService() throws CloseFailuresException {
verify(serviceRuntime).initiateAddingService(fork, expected, configuration);
}

@Test
void initializeResumingService() throws CloseFailuresException {
long forkHandle = 0x110b;
Cleaner cleaner = new Cleaner();
Fork fork = Fork.newInstance(forkHandle, false, cleaner);
when(viewFactory.createFork(eq(forkHandle), any(Cleaner.class)))
.thenReturn(fork);

int serviceId = 1;
String serviceName = "s1";
ArtifactId artifact = ARTIFACT_ID;
byte[] instanceSpec = InstanceSpec.newBuilder()
.setId(serviceId)
.setName(serviceName)
.setArtifact(artifact)
.build()
.toByteArray();
byte[] arguments = bytes(1, 2);

serviceRuntimeAdapter.initializeResumingService(forkHandle, instanceSpec, arguments);

// Check the runtime was invoked with correct config
ServiceInstanceSpec expected = ServiceInstanceSpec.newInstance(serviceName, serviceId,
ServiceArtifactId.fromProto(artifact));
verify(serviceRuntime).initializeResumingService(fork, expected, arguments);
}

@Test
void afterTransactions() throws CloseFailuresException {
int serviceId = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ void activateServiceDuplicate() {

@Test
void stopNonActiveService() {
serviceRuntime.initialize(mock(Node.class));

ServiceArtifactId artifactId = ServiceArtifactId.newJavaId("com.acme/foo-service", "1.0.0");
ServiceInstanceSpec instanceSpec = ServiceInstanceSpec.newInstance(TEST_NAME,
TEST_ID, artifactId);
Expand Down Expand Up @@ -369,6 +367,67 @@ void updateServiceStatusBadStatus(Status badStatus) {
assertThat(exception).hasMessageContaining(instanceSpec.getName());
}

@Test
void initializeResumingService() {
serviceRuntime.initialize(mock(Node.class));

ServiceArtifactId artifactId = ServiceArtifactId.parseFrom("1:com.acme/foo-service:1.0.0");
LoadedServiceDefinition serviceDefinition = LoadedServiceDefinition
.newInstance(artifactId, TestServiceModule::new);
ServiceInstanceSpec instanceSpec = ServiceInstanceSpec.newInstance(TEST_NAME,
TEST_ID, artifactId);
when(serviceLoader.findService(artifactId))
.thenReturn(Optional.of(serviceDefinition));

ServiceWrapper serviceWrapper = mock(ServiceWrapper.class);
when(servicesFactory.createService(eq(serviceDefinition), eq(instanceSpec),
any(MultiplexingNodeDecorator.class)))
.thenReturn(serviceWrapper);

// Create the service from the artifact
Fork fork = mock(Fork.class);
byte[] arguments = anyConfiguration();
serviceRuntime.initializeResumingService(fork, instanceSpec, arguments);

// Check it was instantiated as expected
verify(servicesFactory).createService(eq(serviceDefinition), eq(instanceSpec),
any(MultiplexingNodeDecorator.class));

// and the service was resumed
verify(serviceWrapper).resume(fork, arguments);

// but not registered in the runtime yet:
assertThat(serviceRuntime.findService(TEST_NAME)).isEmpty();
}

@Test
void initializeResumingActiveService() {
serviceRuntime.initialize(mock(Node.class));

ServiceArtifactId artifactId = ServiceArtifactId.newJavaId("com.acme/foo-service", "1.0.0");
LoadedServiceDefinition serviceDefinition = LoadedServiceDefinition
.newInstance(artifactId, TestServiceModule::new);
ServiceInstanceSpec instanceSpec = ServiceInstanceSpec.newInstance(TEST_NAME,
TEST_ID, artifactId);
when(serviceLoader.findService(artifactId))
.thenReturn(Optional.of(serviceDefinition));

ServiceWrapper serviceWrapper = mock(ServiceWrapper.class);
when(serviceWrapper.getId()).thenReturn(TEST_ID);
when(serviceWrapper.getName()).thenReturn(TEST_NAME);
when(servicesFactory.createService(eq(serviceDefinition), eq(instanceSpec),
any(MultiplexingNodeDecorator.class)))
.thenReturn(serviceWrapper);

// Activate the service from the artifact
serviceRuntime.updateInstanceStatus(instanceSpec, Status.ACTIVE);

byte[] arguments = anyConfiguration();
Fork fork = mock(Fork.class);
assertThrows(IllegalArgumentException.class,
() -> serviceRuntime.initializeResumingService(fork, instanceSpec, arguments));
}

@Test
void shutdown() throws InterruptedException {
serviceRuntime.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,38 @@ void initializeWrapsRuntimeExceptions() {
assertThat(actual).hasCause(e);
}

@Test
void resume() {
Fork fork = mock(Fork.class);
byte[] arguments = new byte[0];
serviceWrapper.resume(fork, arguments);
verify(service).resume(fork, arguments);
}

@Test
void resumePropagatesExecutionException() {
ExecutionException e = new ExecutionException((byte) 1);
Fork fork = mock(Fork.class);
byte[] arguments = new byte[0];
doThrow(e).when(service).resume(fork, arguments);

ExecutionException actual = assertThrows(ExecutionException.class,
() -> serviceWrapper.resume(fork, arguments));
assertThat(actual).isSameAs(e);
}

@Test
void resumeWrapsRuntimeExceptions() {
RuntimeException e = new RuntimeException("unexpected");
Fork fork = mock(Fork.class);
byte[] arguments = new byte[0];
doThrow(e).when(service).resume(fork, arguments);

Exception actual = assertThrows(UnexpectedExecutionException.class,
() -> serviceWrapper.resume(fork, arguments));
assertThat(actual).hasCause(e);
}

@Test
void executeTransactionDefaultInterface() {
int txId = 2;
Expand Down Expand Up @@ -330,5 +362,6 @@ private static TransactionContext.Builder anyContext() {
.fork(mock(Fork.class));
}

private interface ConfigurableService extends Service, Configurable {}
private interface ConfigurableService extends Service, Configurable {
}
}

0 comments on commit eb2b5f6

Please sign in to comment.