Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Communication Mechanism Work Item 6 Release lock api #312

Merged
merged 16 commits into from
Feb 15, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void createLock(final LockModel tempLock, ActionListener<LockModel> list
}
}

private void findLock(final String lockId, ActionListener<LockModel> listener) {
public void findLock(final String lockId, ActionListener<LockModel> listener) {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.SettingsFilter;

import org.opensearch.jobscheduler.rest.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.action.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction;
import org.opensearch.jobscheduler.scheduler.JobScheduler;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
Expand Down Expand Up @@ -230,7 +230,8 @@ public List getRestHandlers(
) {
RestGetJobDetailsAction restGetJobDetailsAction = new RestGetJobDetailsAction(jobDetailsService);
RestGetLockAction restGetLockAction = new RestGetLockAction(lockService);
return ImmutableList.of(restGetJobDetailsAction, restGetLockAction);
RestReleaseLockAction restReleaseLockAction = new RestReleaseLockAction(lockService);
return ImmutableList.of(restGetJobDetailsAction, restGetLockAction, restReleaseLockAction);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest;
package org.opensearch.jobscheduler.rest.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -16,7 +16,7 @@
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.jobscheduler.JobSchedulerPlugin;

import org.opensearch.jobscheduler.transport.GetJobDetailsRequest;
import org.opensearch.jobscheduler.rest.request.GetJobDetailsRequest;

import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -61,39 +61,30 @@ public String getName() {
public List<Route> routes() {
return ImmutableList.of(
// New Job Details Entry Request
new Route(PUT, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_get/_job_details")),
new Route(PUT, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_job_details")),
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
// Update Job Details Entry Request
new Route(
PUT,
String.format(
Locale.ROOT,
"%s/%s/{%s}",
JobSchedulerPlugin.JS_BASE_URI,
"_get/_job_details",
GetJobDetailsRequest.DOCUMENT_ID
)
String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_job_details", GetJobDetailsRequest.DOCUMENT_ID)
)

);
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
XContentParser parser = restRequest.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

GetJobDetailsRequest getJobDetailsRequest = GetJobDetailsRequest.parse(parser);

final String[] jobDetailsResponseHolder = new String[1];
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

String documentId = restRequest.param(GetJobDetailsRequest.DOCUMENT_ID);
String jobIndex = getJobDetailsRequest.getJobIndex();
String jobType = getJobDetailsRequest.getJobType();
String jobParameterAction = getJobDetailsRequest.getJobParameterAction();
String jobRunnerAction = getJobDetailsRequest.getJobRunnerAction();
String extensionUniqueId = getJobDetailsRequest.getExtensionUniqueId();

CompletableFuture<String[]> inProgressFuture = new CompletableFuture<>();
CompletableFuture<String> inProgressFuture = new CompletableFuture<>();

jobDetailsService.processJobDetails(
documentId,
Expand All @@ -106,8 +97,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
@Override
public void onResponse(String indexedDocumentId) {
// Set document Id
jobDetailsResponseHolder[0] = indexedDocumentId;
inProgressFuture.complete(jobDetailsResponseHolder);
inProgressFuture.complete(indexedDocumentId);
}

@Override
Expand All @@ -119,7 +109,7 @@ public void onFailure(Exception e) {
);

try {
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).join();
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.info(" Request timed out with an exception ", e);
Expand All @@ -131,15 +121,21 @@ public void onFailure(Exception e) {
}

return channel -> {
String jobDetailsResponseHolder = null;
try {
jobDetailsResponseHolder = inProgressFuture.get();
} catch (Exception e) {
logger.error("Exception occured in get job details ", e);
}
XContentBuilder builder = channel.newBuilder();
RestStatus restStatus = RestStatus.OK;
String restResponseString = jobDetailsResponseHolder[0] != null ? "success" : "failed";
String restResponseString = jobDetailsResponseHolder != null ? "success" : "failed";
BytesRestResponse bytesRestResponse;
try {
builder.startObject();
builder.field("response", restResponseString);
if (restResponseString.equals("success")) {
builder.field(GetJobDetailsRequest.DOCUMENT_ID, jobDetailsResponseHolder[0]);
builder.field(GetJobDetailsRequest.DOCUMENT_ID, jobDetailsResponseHolder);
} else {
restStatus = RestStatus.INTERNAL_SERVER_ERROR;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest;
package org.opensearch.jobscheduler.rest.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -39,7 +39,6 @@

import static org.opensearch.jobscheduler.spi.LockModel.GET_LOCK_ACTION;
import static org.opensearch.jobscheduler.spi.LockModel.SEQUENCE_NUMBER;
import static org.opensearch.jobscheduler.spi.LockModel.SEQUENCE_NUMBER;
import static org.opensearch.jobscheduler.spi.LockModel.PRIMARY_TERM;
import static org.opensearch.jobscheduler.spi.LockModel.LOCK_ID;
import static org.opensearch.jobscheduler.spi.LockModel.LOCK_MODEL;
Expand All @@ -48,9 +47,6 @@
* This class consists of the REST handler to GET a lock model for extensions
*/
public class RestGetLockAction extends BaseRestHandler {

public static LockModel lockModelResponseHolder;

private final Logger logger = LogManager.getLogger(RestGetLockAction.class);

public LockService lockService;
Expand All @@ -70,7 +66,7 @@ public List<Route> routes() {
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, because if it is not public then I cannot call it from Test class

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could make it package private and available by tests

XContentParser parser = restRequest.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand All @@ -82,24 +78,18 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient

// Process acquire lock request
CompletableFuture<LockModel> inProgressFuture = new CompletableFuture<>();
lockService.acquireLockWithId(jobIndexName, lockDurationSeconds, jobIndexName, new ActionListener<>() {
@Override
public void onResponse(LockModel lockModel) {

// set lockModel Response
lockModelResponseHolder = lockModel;
inProgressFuture.complete(lockModelResponseHolder);
}

@Override
public void onFailure(Exception e) {
logger.info("Could not acquire lock with ID : " + jobId, e);
inProgressFuture.completeExceptionally(e);
}
});
lockService.acquireLockWithId(
jobIndexName,
lockDurationSeconds,
jobId,
ActionListener.wrap(lockModel -> { inProgressFuture.complete(lockModel); }, exception -> {
logger.info("Could not acquire lock with ID : " + jobId, exception);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
inProgressFuture.completeExceptionally(exception);
})
);

try {
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).join();
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error(" Request timed out with an exception ", e);
Expand All @@ -112,6 +102,12 @@ public void onFailure(Exception e) {

return channel -> {
BytesRestResponse bytesRestResponse;
LockModel lockModelResponseHolder = null;
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
try {
lockModelResponseHolder = inProgressFuture.get();
} catch (Exception e) {
logger.error("Exception occured in acquiring lock ", e);
}
try (XContentBuilder builder = channel.newBuilder()) {
// Prepare response
RestStatus restStatus = RestStatus.OK;
Expand All @@ -137,7 +133,6 @@ public void onFailure(Exception e) {
bytesRestResponse = new BytesRestResponse(restStatus, builder);
channel.sendResponse(bytesRestResponse);
}

};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest.action;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CompletionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import static org.opensearch.rest.RestRequest.Method.PUT;
import org.opensearch.rest.RestStatus;

public class RestReleaseLockAction extends BaseRestHandler {

public static final String RELEASE_LOCK_ACTION = "release_lock_action";
public static final String LOCK_ID = "lock_id";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets import this from org.opensearch.jobscheduler.spi.LockModel.LOCK_ID.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK


private final Logger logger = LogManager.getLogger(RestReleaseLockAction.class);

private LockService lockService;

public RestReleaseLockAction(LockService lockService) {
this.lockService = lockService;
}

@Override
public String getName() {
return RELEASE_LOCK_ACTION;
}

@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(PUT, String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", LOCK_ID))
);
}

@Override
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException {
String lockId = restRequest.param(LOCK_ID);
if (lockId == null || lockId.isEmpty()) {
throw new IOException("lockId cannot be null or empty");
}

CompletableFuture<LockModel> findInProgressFuture = new CompletableFuture<>();
lockService.findLock(lockId, ActionListener.wrap(lock -> { findInProgressFuture.complete(lock); }, exception -> {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Could not find lock model with lockId " + lockId, exception);
findInProgressFuture.completeExceptionally(exception);
}));

LockModel releaseLock = null;
try {
releaseLock = findInProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).get();
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error(" Request timed out with an exception ", e);
} else {
throw e;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. do we need the else ? And throw the exception?
The catch all at 87 will anyway take care of it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbwiddis suggested to catch time out exception seperately

Copy link
Member

@saratvemulapalli saratvemulapalli Feb 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok if so we need need handle this cleanly, would it drop the rest request ? (I see we are throwing the error, I wonder how this is handled)
Is there a test which tests this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No as of now there is no test for this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I think I may have been confused before and reviewing a PR on opensearch cleared it up. We do need to clean up this exception handling. Take a look at ExternsionsManager and try to follow this pattern:

} catch (CompletionException e) {
    if (e.getCause() instanceof TimeoutException) {
        logger.info("No response from extension to request.");
    }
    if (e.getCause() instanceof RuntimeException) {
        throw (RuntimeException) e.getCause();
    } else if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
    } else {
        throw new RuntimeException(e.getCause());
    }
}

}
} catch (Exception e) {
logger.error(" Could not find lock model with lockId due to exception ", e);
}

CompletableFuture<Boolean> releaseLockInProgressFuture = new CompletableFuture<>();
if (releaseLock != null) {
lockService.release(releaseLock, new ActionListener<>() {
@Override
public void onResponse(Boolean response) {
releaseLockInProgressFuture.complete(response);
}

@Override
public void onFailure(Exception e) {
logger.error("Releasing lock failed with an exception", e);
releaseLockInProgressFuture.completeExceptionally(e);
}
});

try {
releaseLockInProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error(" Request timed out with an exception ", e);
} else {
throw e;
}
} catch (Exception e) {
logger.error(" Could not release lock with " + releaseLock.getLockId() + " due to exception ", e);
}
}

return channel -> {
boolean releaseResponse = false;
try {
releaseResponse = releaseLockInProgressFuture.get();
} catch (Exception e) {
logger.error("Exception occured in releasing lock ", e);
}
XContentBuilder builder = channel.newBuilder();
RestStatus restStatus = RestStatus.OK;
String restResponseString = releaseResponse ? "success" : "failed";
BytesRestResponse bytesRestResponse;
try {
builder.startObject();
builder.field("response", restResponseString);
Copy link
Member

@saratvemulapalli saratvemulapalli Feb 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would re-name the field to release-lock to be more explicit when the client reads it. response is generic and it brings ambiguity.

Suggested change
builder.field("response", restResponseString);
builder.field("release-lock", restResponseString);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK

if (restResponseString.equals("failed")) {
restStatus = RestStatus.INTERNAL_SERVER_ERROR;
}
builder.endObject();
bytesRestResponse = new BytesRestResponse(restStatus, builder);
} finally {
builder.close();
}

channel.sendResponse(bytesRestResponse);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.transport;
package org.opensearch.jobscheduler.rest.request;

import java.util.Objects;
import org.opensearch.action.ActionRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.transport;
package org.opensearch.jobscheduler.transport.request;

import java.io.IOException;
import org.opensearch.common.io.stream.Writeable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.transport;
package org.opensearch.jobscheduler.transport.request;

import java.io.IOException;
import org.opensearch.common.bytes.BytesReference;
Expand Down
Loading