Skip to content

Commit

Permalink
Addressing Sarat's Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Varun Jain <[email protected]>
  • Loading branch information
vibrantvarun committed Feb 14, 2023
1 parent 4fc06ee commit 5fdfcd9
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ public List<Route> routes() {
);
}

@VisibleForTesting
@Override
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
XContentParser parser = restRequest.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand Down Expand Up @@ -112,12 +113,15 @@ public void onFailure(Exception e) {
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.info(" Request timed out with an exception ", e);
logger.error("Get Job Details timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw e;
throw new RuntimeException(e.getCause());
}
} catch (Exception e) {
logger.info(" Could not process job index due to exception ", e);
}

return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ public List<Route> routes() {
return ImmutableList.of(new Route(GET, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_lock")));
}

@VisibleForTesting
@Override
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
XContentParser parser = restRequest.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand All @@ -92,12 +93,15 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error(" Request timed out with an exception ", e);
logger.error("Acquiring lock timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw e;
throw new RuntimeException(e.getCause());
}
} catch (Exception e) {
logger.error(" Could not process acquire lock request due to exception ", e);
}

return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
Expand All @@ -34,8 +35,6 @@
public class RestReleaseLockAction extends BaseRestHandler {

public static final String RELEASE_LOCK_ACTION = "release_lock_action";
public static final String LOCK_ID = "lock_id";

private final Logger logger = LogManager.getLogger(RestReleaseLockAction.class);

private LockService lockService;
Expand All @@ -52,13 +51,13 @@ public String getName() {
@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(PUT, String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", LOCK_ID))
new Route(PUT, String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", LockModel.LOCK_ID))
);
}

@Override
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException {
String lockId = restRequest.param(LOCK_ID);
String lockId = restRequest.param(LockModel.LOCK_ID);
if (lockId == null || lockId.isEmpty()) {
throw new IOException("lockId cannot be null or empty");
}
Expand All @@ -69,17 +68,20 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient no
findInProgressFuture.completeExceptionally(exception);
}));

LockModel releaseLock = null;
LockModel releaseLock;
try {
releaseLock = findInProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).get();
} catch (CompletionException e) {
} catch (CompletionException | InterruptedException | ExecutionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error(" Request timed out with an exception ", e);
logger.error(" Finding lock timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw e;
throw new RuntimeException(e.getCause());
}
} catch (Exception e) {
logger.error(" Could not find lock model with lockId due to exception ", e);
}

CompletableFuture<Boolean> releaseLockInProgressFuture = new CompletableFuture<>();
Expand All @@ -101,12 +103,15 @@ public void onFailure(Exception e) {
releaseLockInProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error(" Request timed out with an exception ", e);
logger.error("Release lock timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw e;
throw new RuntimeException(e.getCause());
}
} catch (Exception e) {
logger.error(" Could not release lock with " + releaseLock.getLockId() + " due to exception ", e);
}
}

Expand All @@ -123,7 +128,7 @@ public void onFailure(Exception e) {
BytesRestResponse bytesRestResponse;
try {
builder.startObject();
builder.field("response", restResponseString);
builder.field("release-lock", restResponseString);
if (restResponseString.equals("failed")) {
restStatus = RestStatus.INTERNAL_SERVER_ERROR;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest.action;

public @interface VisibleForTesting {
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest;
package org.opensearch.jobscheduler.rest.action;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import java.io.IOException;
Expand All @@ -21,7 +21,6 @@
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.request.GetJobDetailsRequest;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.rest.RestHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest;
package org.opensearch.jobscheduler.rest.action;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import java.io.IOException;
Expand All @@ -28,7 +28,6 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.TestHelpers;
import org.opensearch.jobscheduler.rest.action.RestGetLockAction;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.jobscheduler.transport.AcquireLockRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest;
package org.opensearch.jobscheduler.rest.action;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import java.io.IOException;
Expand All @@ -20,7 +20,7 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
Expand Down Expand Up @@ -48,13 +48,7 @@ public void setUp() throws Exception {
this.client = Mockito.mock(Client.class);
this.lockService = new LockService(client, clusterService);
restReleaseLockAction = new RestReleaseLockAction(this.lockService);
this.releaseLockPath = String.format(
Locale.ROOT,
"%s/%s/{%s}",
JobSchedulerPlugin.JS_BASE_URI,
"_release_lock",
restReleaseLockAction.LOCK_ID
);
this.releaseLockPath = String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", LockModel.LOCK_ID);

}

Expand All @@ -70,7 +64,7 @@ public void testGetRoutes() {

public void testPrepareReleaseLockRequest() throws IOException {
Map<String, String> params = new HashMap<>();
params.put(restReleaseLockAction.LOCK_ID, "lock_id");
params.put(LockModel.LOCK_ID, "lock_id");
FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT)
.withPath(releaseLockPath)
.withParams(params)
Expand Down

0 comments on commit 5fdfcd9

Please sign in to comment.