Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Communication Mechanism Work Item 6 Release lock api #312

Merged
merged 16 commits into from
Feb 15, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void createLock(final LockModel tempLock, ActionListener<LockModel> list
}
}

private void findLock(final String lockId, ActionListener<LockModel> listener) {
public void findLock(final String lockId, ActionListener<LockModel> listener) {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.SettingsFilter;

import org.opensearch.jobscheduler.rest.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.action.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction;
import org.opensearch.jobscheduler.scheduler.JobScheduler;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
Expand Down Expand Up @@ -230,7 +230,8 @@ public List getRestHandlers(
) {
RestGetJobDetailsAction restGetJobDetailsAction = new RestGetJobDetailsAction(jobDetailsService);
RestGetLockAction restGetLockAction = new RestGetLockAction(lockService);
return ImmutableList.of(restGetJobDetailsAction, restGetLockAction);
RestReleaseLockAction restReleaseLockAction = new RestReleaseLockAction(lockService);
return ImmutableList.of(restGetJobDetailsAction, restGetLockAction, restReleaseLockAction);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest;
package org.opensearch.jobscheduler.rest.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -16,7 +16,7 @@
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.jobscheduler.JobSchedulerPlugin;

import org.opensearch.jobscheduler.transport.GetJobDetailsRequest;
import org.opensearch.jobscheduler.rest.request.GetJobDetailsRequest;

import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -61,39 +61,31 @@ public String getName() {
public List<Route> routes() {
return ImmutableList.of(
// New Job Details Entry Request
new Route(PUT, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_get/_job_details")),
new Route(PUT, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_job_details")),
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
// Update Job Details Entry Request
new Route(
PUT,
String.format(
Locale.ROOT,
"%s/%s/{%s}",
JobSchedulerPlugin.JS_BASE_URI,
"_get/_job_details",
GetJobDetailsRequest.DOCUMENT_ID
)
String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_job_details", GetJobDetailsRequest.DOCUMENT_ID)
)

);
}

@VisibleForTesting
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
XContentParser parser = restRequest.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

GetJobDetailsRequest getJobDetailsRequest = GetJobDetailsRequest.parse(parser);

final String[] jobDetailsResponseHolder = new String[1];
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

String documentId = restRequest.param(GetJobDetailsRequest.DOCUMENT_ID);
String jobIndex = getJobDetailsRequest.getJobIndex();
String jobType = getJobDetailsRequest.getJobType();
String jobParameterAction = getJobDetailsRequest.getJobParameterAction();
String jobRunnerAction = getJobDetailsRequest.getJobRunnerAction();
String extensionUniqueId = getJobDetailsRequest.getExtensionUniqueId();

CompletableFuture<String[]> inProgressFuture = new CompletableFuture<>();
CompletableFuture<String> inProgressFuture = new CompletableFuture<>();

jobDetailsService.processJobDetails(
documentId,
Expand All @@ -106,8 +98,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
@Override
public void onResponse(String indexedDocumentId) {
// Set document Id
jobDetailsResponseHolder[0] = indexedDocumentId;
inProgressFuture.complete(jobDetailsResponseHolder);
inProgressFuture.complete(indexedDocumentId);
}

@Override
Expand All @@ -119,27 +110,36 @@ public void onFailure(Exception e) {
);

try {
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).join();
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.info(" Request timed out with an exception ", e);
logger.error("Get Job Details timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw e;
throw new RuntimeException(e.getCause());
}
} catch (Exception e) {
logger.info(" Could not process job index due to exception ", e);
}

return channel -> {
String jobDetailsResponseHolder = null;
try {
jobDetailsResponseHolder = inProgressFuture.get();
} catch (Exception e) {
logger.error("Exception occured in get job details ", e);
}
XContentBuilder builder = channel.newBuilder();
RestStatus restStatus = RestStatus.OK;
String restResponseString = jobDetailsResponseHolder[0] != null ? "success" : "failed";
String restResponseString = jobDetailsResponseHolder != null ? "success" : "failed";
BytesRestResponse bytesRestResponse;
try {
builder.startObject();
builder.field("response", restResponseString);
if (restResponseString.equals("success")) {
builder.field(GetJobDetailsRequest.DOCUMENT_ID, jobDetailsResponseHolder[0]);
builder.field(GetJobDetailsRequest.DOCUMENT_ID, jobDetailsResponseHolder);
} else {
restStatus = RestStatus.INTERNAL_SERVER_ERROR;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest;
package org.opensearch.jobscheduler.rest.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -39,7 +39,6 @@

import static org.opensearch.jobscheduler.spi.LockModel.GET_LOCK_ACTION;
import static org.opensearch.jobscheduler.spi.LockModel.SEQUENCE_NUMBER;
import static org.opensearch.jobscheduler.spi.LockModel.SEQUENCE_NUMBER;
import static org.opensearch.jobscheduler.spi.LockModel.PRIMARY_TERM;
import static org.opensearch.jobscheduler.spi.LockModel.LOCK_ID;
import static org.opensearch.jobscheduler.spi.LockModel.LOCK_MODEL;
Expand All @@ -48,9 +47,6 @@
* This class consists of the REST handler to GET a lock model for extensions
*/
public class RestGetLockAction extends BaseRestHandler {

public static LockModel lockModelResponseHolder;

private final Logger logger = LogManager.getLogger(RestGetLockAction.class);

public LockService lockService;
Expand All @@ -69,6 +65,7 @@ public List<Route> routes() {
return ImmutableList.of(new Route(GET, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_lock")));
}

@VisibleForTesting
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
XContentParser parser = restRequest.contentParser();
Expand All @@ -82,36 +79,39 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient

// Process acquire lock request
CompletableFuture<LockModel> inProgressFuture = new CompletableFuture<>();
lockService.acquireLockWithId(jobIndexName, lockDurationSeconds, jobIndexName, new ActionListener<>() {
@Override
public void onResponse(LockModel lockModel) {

// set lockModel Response
lockModelResponseHolder = lockModel;
inProgressFuture.complete(lockModelResponseHolder);
}

@Override
public void onFailure(Exception e) {
logger.info("Could not acquire lock with ID : " + jobId, e);
inProgressFuture.completeExceptionally(e);
}
});
lockService.acquireLockWithId(
jobIndexName,
lockDurationSeconds,
jobId,
ActionListener.wrap(lockModel -> { inProgressFuture.complete(lockModel); }, exception -> {
logger.info("Could not acquire lock with ID : " + jobId, exception);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
inProgressFuture.completeExceptionally(exception);
})
);

try {
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).join();
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error(" Request timed out with an exception ", e);
logger.error("Acquiring lock timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw e;
throw new RuntimeException(e.getCause());
}
} catch (Exception e) {
logger.error(" Could not process acquire lock request due to exception ", e);
}

return channel -> {
BytesRestResponse bytesRestResponse;
LockModel lockModelResponseHolder = null;
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
try {
lockModelResponseHolder = inProgressFuture.get();
} catch (Exception e) {
logger.error("Exception occured in acquiring lock ", e);
}
try (XContentBuilder builder = channel.newBuilder()) {
// Prepare response
RestStatus restStatus = RestStatus.OK;
Expand All @@ -137,7 +137,6 @@ public void onFailure(Exception e) {
bytesRestResponse = new BytesRestResponse(restStatus, builder);
channel.sendResponse(bytesRestResponse);
}

};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest.action;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import static org.opensearch.rest.RestRequest.Method.PUT;
import org.opensearch.rest.RestStatus;

public class RestReleaseLockAction extends BaseRestHandler {

public static final String RELEASE_LOCK_ACTION = "release_lock_action";
private final Logger logger = LogManager.getLogger(RestReleaseLockAction.class);

private LockService lockService;

public RestReleaseLockAction(LockService lockService) {
this.lockService = lockService;
}

@Override
public String getName() {
return RELEASE_LOCK_ACTION;
}

@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(PUT, String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", LockModel.LOCK_ID))
);
}

@Override
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException {
String lockId = restRequest.param(LockModel.LOCK_ID);
if (lockId == null || lockId.isEmpty()) {
throw new IOException("lockId cannot be null or empty");
}

CompletableFuture<LockModel> findInProgressFuture = new CompletableFuture<>();
lockService.findLock(lockId, ActionListener.wrap(lock -> { findInProgressFuture.complete(lock); }, exception -> {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Could not find lock model with lockId " + lockId, exception);
findInProgressFuture.completeExceptionally(exception);
}));

LockModel releaseLock;
try {
releaseLock = findInProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).get();
} catch (CompletionException | InterruptedException | ExecutionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error(" Finding lock timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}

CompletableFuture<Boolean> releaseLockInProgressFuture = new CompletableFuture<>();
if (releaseLock != null) {
lockService.release(releaseLock, new ActionListener<>() {
@Override
public void onResponse(Boolean response) {
releaseLockInProgressFuture.complete(response);
}

@Override
public void onFailure(Exception e) {
logger.error("Releasing lock failed with an exception", e);
releaseLockInProgressFuture.completeExceptionally(e);
}
});

try {
releaseLockInProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error("Release lock timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}
}

return channel -> {
boolean releaseResponse = false;
try {
releaseResponse = releaseLockInProgressFuture.get();
} catch (Exception e) {
logger.error("Exception occured in releasing lock ", e);
}
XContentBuilder builder = channel.newBuilder();
RestStatus restStatus = RestStatus.OK;
String restResponseString = releaseResponse ? "success" : "failed";
BytesRestResponse bytesRestResponse;
try {
builder.startObject();
builder.field("release-lock", restResponseString);
if (restResponseString.equals("failed")) {
restStatus = RestStatus.INTERNAL_SERVER_ERROR;
}
builder.endObject();
bytesRestResponse = new BytesRestResponse(restStatus, builder);
} finally {
builder.close();
}

channel.sendResponse(bytesRestResponse);
};
}
}
Loading