Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Two fixes to work coordination #777

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@
}

public void setup() throws IOException, InterruptedException {
var indexCheckResponse = httpClient.makeJsonRequest(HEAD_METHOD, INDEX_NAME, null, null);
if (indexCheckResponse.getStatusCode() == 200) {
log.info("Not creating " + INDEX_NAME + " because it already exists");
return;
}
log.atInfo().setMessage("Creating " + INDEX_NAME + " because it's HEAD check returned " +
indexCheckResponse.getStatusCode()).log();
var body = "{\n" +
" \"settings\": {\n" +
" \"index\": {" +
Expand Down Expand Up @@ -109,6 +102,13 @@
doUntil("setup-" + INDEX_NAME, 100, MAX_SETUP_RETRIES,
() -> {
try {
var indexCheckResponse = httpClient.makeJsonRequest(HEAD_METHOD, INDEX_NAME, null, null);
if (indexCheckResponse.getStatusCode() == 200) {
log.info("Not creating " + INDEX_NAME + " because it already exists");
return indexCheckResponse;

Check warning on line 108 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L107-L108

Added lines #L107 - L108 were not covered by tests
}
log.atInfo().setMessage("Creating " + INDEX_NAME + " because it's HEAD check returned " +
indexCheckResponse.getStatusCode()).log();
return httpClient.makeJsonRequest(PUT_METHOD, INDEX_NAME, null, body);
} catch (Exception e) {
throw Lombok.sneakyThrow(e);
Expand All @@ -123,7 +123,7 @@
return "[ statusCode: " + r.getStatusCode() + ", payload: " + payloadStr + "]";
}
},
(response, ignored) -> (response.getStatusCode() / 100) != 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious - what was the original logic you were trying to achieve here? I didn't quite follow your explanation in the PR overview.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test determines when we're done needing to retry. When this is 'true', the work is deemed to be done. In other words, we would keep looping until we got something other than a 200. Notice that the beginning of the method was a HEAD that didn't have the same affliction - so in the worst case scenario, we'd burn a process and be resolved the next time that the program ran

(response, ignored) -> (response.getStatusCode() / 100) == 2);
peternied marked this conversation as resolved.
Show resolved Hide resolved
} catch (MaxTriesExceededException e) {
throw new IOException(e);
}
Expand Down Expand Up @@ -398,7 +398,7 @@
.replace(SCRIPT_VERSION_TEMPLATE, "poc")
.replace(WORKER_ID_TEMPLATE, workerId)
.replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(timestampEpochSeconds))
.replace(OLD_EXPIRATION_THRESHOLD_TEMPLATE, Long.toString(timestampEpochSeconds+expirationWindowSeconds))
.replace(OLD_EXPIRATION_THRESHOLD_TEMPLATE, Long.toString(timestampEpochSeconds))
.replace(EXPIRATION_WINDOW_TEMPLATE, Long.toString(expirationWindowSeconds))
.replace(CLOCK_DEVIATION_SECONDS_THRESHOLD_TEMPLATE, Long.toString(tolerableClientServerClockDifferenceSeconds));

Expand Down Expand Up @@ -471,7 +471,7 @@
Object transformedValue;
}

private static class PotentialClockDriftDetectedException extends IllegalStateException {
public static class PotentialClockDriftDetectedException extends IllegalStateException {
public PotentialClockDriftDetectedException(String s) {
super(s);
}
Expand Down
8 changes: 4 additions & 4 deletions RFS/src/test/java/com/rfs/cms/WorkCoordinatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void testCreateOrUpdateOrReturnAsIsRequest() throws Exception {
// log.info("doc4="+doc4);
}

//@Test
@Test
public void testAcquireLeaseForQuery() throws Exception {
var objMapper = new ObjectMapper();
final var NUM_DOCS = 40;
Expand Down Expand Up @@ -151,8 +151,8 @@ public void testAcquireLeaseForQuery() throws Exception {
try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(),
3600, "firstPass_NONE")) {
var nextWorkItem = workCoordinator.acquireNextWorkItem(Duration.ofSeconds(2));
log.error("Next work item picked=" + nextWorkItem);
Assertions.assertNull(nextWorkItem);
log.atInfo().setMessage(()->"Next work item picked=" + nextWorkItem).log();
Assertions.assertInstanceOf(IWorkCoordinator.NoAvailableWorkToBeDone.class, nextWorkItem);
}

Thread.sleep(expiration.multipliedBy(2).toMillis());
Expand Down Expand Up @@ -187,7 +187,7 @@ public String onNoAvailableWorkToBeDone() throws IOException {

@Override
public String onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) throws IOException {
log.error("Next work item picked=" + workItem);
log.info("Next work item picked=" + workItem);
Assertions.assertNotNull(workItem);
Assertions.assertNotNull(workItem.workItemId);
Assertions.assertTrue(workItem.leaseExpirationTime.isAfter(Instant.now()));
Expand Down
Loading