From b62778eb9179bfc02617ad384495d95936047990 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Wed, 13 Nov 2024 11:55:47 -0700 Subject: [PATCH] Create RFS successor work items (#1104) Signed-off-by: Mikayla Thompson Co-authored-by: Andre Kurait --- .../tracing/IWorkCoordinationContexts.java | 13 + .../tracing/RootWorkCoordinationContext.java | 13 +- .../tracing/WorkCoordinationContexts.java | 60 ++++ .../workcoordination/IWorkCoordinator.java | 19 +- .../OpenSearchWorkCoordinator.java | 277 ++++++++++++++++-- .../tracing/DocumentMigrationContexts.java | 10 + .../tracing/IDocumentMigrationContexts.java | 5 + .../workcoordination/WorkCoordinatorTest.java | 235 ++++++++++++++- 8 files changed, 607 insertions(+), 25 deletions(-) diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java index 3781e60c9..4dbcca1ca 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java @@ -13,6 +13,7 @@ class ActivityNames { public static final String ACQUIRE_SPECIFIC_WORK = "acquireSpecificWorkItem"; public static final String COMPLETE_WORK = "completeWork"; public static final String ACQUIRE_NEXT_WORK = "acquireNextWorkItem"; + public static final String CREATE_SUCCESSOR_WORK_ITEMS = "createSuccessorWorkItems"; private ActivityNames() {} } @@ -54,6 +55,8 @@ interface IBaseAcquireWorkContext extends IRetryableActivityContext {} interface IAcquireSpecificWorkContext extends IBaseAcquireWorkContext { String ACTIVITY_NAME = ActivityNames.ACQUIRE_SPECIFIC_WORK; + + ICreateSuccessorWorkItemsContext getCreateSuccessorWorkItemsContext(); } interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext { @@ -68,6 +71,9 @@ interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext { void 
recordRecoverableClockError(); void recordFailure(OpenSearchWorkCoordinator.PotentialClockDriftDetectedException e); + + ICreateSuccessorWorkItemsContext getCreateSuccessorWorkItemsContext(); + } interface ICompleteWorkItemContext extends IRetryableActivityContext { @@ -76,6 +82,13 @@ interface ICompleteWorkItemContext extends IRetryableActivityContext { IRefreshContext getRefreshContext(); } + interface ICreateSuccessorWorkItemsContext extends IRetryableActivityContext { + String ACTIVITY_NAME = ActivityNames.CREATE_SUCCESSOR_WORK_ITEMS; + IRefreshContext getRefreshContext(); + ICompleteWorkItemContext getCompleteWorkItemContext(); + ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext(); + } + interface IScopedWorkContext extends IScopedInstrumentationAttributes { C createOpeningContext(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RootWorkCoordinationContext.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RootWorkCoordinationContext.java index ee36f5ed5..82d3f6ee2 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RootWorkCoordinationContext.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RootWorkCoordinationContext.java @@ -18,6 +18,7 @@ public class RootWorkCoordinationContext extends RootOtelContext { public final WorkCoordinationContexts.AcquireSpecificWorkContext.MetricInstruments acquireSpecificWorkMetrics; public final WorkCoordinationContexts.CompleteWorkItemContext.MetricInstruments completeWorkMetrics; public final WorkCoordinationContexts.AcquireNextWorkItemContext.MetricInstruments acquireNextWorkMetrics; + public final WorkCoordinationContexts.CreateSuccessorWorkItemsContext.MetricInstruments createSuccessorWorkItemsMetrics; public RootWorkCoordinationContext(OpenTelemetry sdk, IContextTracker contextTracker) { this(sdk, contextTracker, null); @@ -38,6 +39,7 @@ public RootWorkCoordinationContext(OpenTelemetry sdk, 
acquireSpecificWorkMetrics = WorkCoordinationContexts.AcquireSpecificWorkContext.makeMetrics(meter); completeWorkMetrics = WorkCoordinationContexts.CompleteWorkItemContext.makeMetrics(meter); acquireNextWorkMetrics = WorkCoordinationContexts.AcquireNextWorkItemContext.makeMetrics(meter); + createSuccessorWorkItemsMetrics = WorkCoordinationContexts.CreateSuccessorWorkItemsContext.makeMetrics(meter); } public IWorkCoordinationContexts.IInitializeCoordinatorStateContext createCoordinationInitializationStateContext() { @@ -87,4 +89,13 @@ public IWorkCoordinationContexts.ICompleteWorkItemContext createCompleteWorkCont ) { return new WorkCoordinationContexts.CompleteWorkItemContext(this, enclosingScope); } -} + + public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() { + return createSuccessorWorkItemsContext(null); + } + + public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext( + IScopedInstrumentationAttributes enclosingScope + ) { + return new WorkCoordinationContexts.CreateSuccessorWorkItemsContext(this, enclosingScope); + }} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java index ff0a26362..64b871325 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java @@ -248,6 +248,10 @@ private MetricInstruments(Meter meter, String activityName) { public MetricInstruments getRetryMetrics() { return getRootInstrumentationScope().acquireSpecificWorkMetrics; } + + public ICreateSuccessorWorkItemsContext getCreateSuccessorWorkItemsContext() { + return new CreateSuccessorWorkItemsContext(this.rootInstrumentationScope, this); + } } @Getter @@ -276,6 +280,10 @@ public IRefreshContext getRefreshContext() { 
return new Refresh(this.rootInstrumentationScope, this); } + public ICreateSuccessorWorkItemsContext getCreateSuccessorWorkItemsContext() { + return new CreateSuccessorWorkItemsContext(this.rootInstrumentationScope, this); + } + public static class MetricInstruments extends RetryMetricInstruments { public final LongCounter assignedCounter; public final LongCounter nothingAvailableCounter; @@ -363,4 +371,56 @@ public MetricInstruments getRetryMetrics() { return getRootInstrumentationScope().completeWorkMetrics; } } + + @Getter + class CreateSuccessorWorkItemsContext extends BaseSpanContext + implements + ICreateSuccessorWorkItemsContext, + RetryableActivityContextMetricMixin { + final IScopedInstrumentationAttributes enclosingScope; + + CreateSuccessorWorkItemsContext( + RootWorkCoordinationContext rootScope, + IScopedInstrumentationAttributes enclosingScope + ) { + super(rootScope); + this.enclosingScope = enclosingScope; + initializeSpan(rootScope); + } + + @Override + public String getActivityName() { + return ACTIVITY_NAME; + } + + @Override + public IRefreshContext getRefreshContext() { + return new Refresh(this.rootInstrumentationScope, this); + } + + @Override + public ICompleteWorkItemContext getCompleteWorkItemContext() { + return new CompleteWorkItemContext(this.rootInstrumentationScope, this); + } + + @Override + public ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext() { + return null; + } + + public static class MetricInstruments extends RetryMetricInstruments { + private MetricInstruments(Meter meter, String activityName) { + super(meter, autoLabels(activityName)); + } + } + + public static @NonNull MetricInstruments makeMetrics(Meter meter) { + return new MetricInstruments(meter, ACTIVITY_NAME); + } + + @Override + public MetricInstruments getRetryMetrics() { + return getRootInstrumentationScope().createSuccessorWorkItemsMetrics; + } + } } diff --git 
a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java index 595fdae20..5a5d1ba70 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java @@ -4,6 +4,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.function.Supplier; import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts; @@ -67,8 +68,8 @@ WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( String workItemId, Duration leaseDuration, Supplier contextSupplier - ) throws IOException; - + ) throws IOException, InterruptedException; + /** * Scan the created work items that have not yet had leases acquired and have not yet finished. * One of those work items will be returned along with a lease for how long this process may continue @@ -98,6 +99,20 @@ void completeWorkItem( Supplier contextSupplier ) throws IOException, InterruptedException; + /** + * Add the list of successor items to the work item, create new work items for each of the successors, and mark the + * original work item as completed. + * @param workItemId the work item that is being completed + * @param successorWorkItemIds the list of successor work items that will be created + * @throws IOException + * @throws InterruptedException + */ + void createSuccessorWorkItemsAndMarkComplete( + String workItemId, + ArrayList successorWorkItemIds, + Supplier contextSupplier + ) throws IOException, InterruptedException; + /** * @return the number of items that are not yet complete. This will include items with and without claimed leases. 
* @throws IOException diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java index 6c8106b86..f8ea25f4c 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java @@ -5,20 +5,28 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.Spliterators; import java.util.function.BiPredicate; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Lombok; import lombok.NonNull; import lombok.SneakyThrows; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -31,13 +39,19 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator { static final int MAX_DRIFT_RETRIES = 13; // last delay before failure: 40 seconds static final int MAX_MALFORMED_ASSIGNED_WORK_DOC_RETRIES = 17; // last delay before failure: 655.36 seconds static final int MAX_ASSIGNED_DOCUMENT_NOT_FOUND_RETRY_INTERVAL = 60 * 1000; + static final int MAX_CREATE_SUCCESSOR_WORK_ITEMS_RETRIES = 10; + static final int CREATE_SUCCESSOR_WORK_ITEMS_RETRY_BASE_MS = 10; // last delay before failure: 10 seconds + static final int MAX_CREATE_UNASSIGNED_SUCCESSOR_WORK_ITEM_RETRIES = 7; // last delay before failure: 1.2 seconds + static final int 
MAX_MARK_AS_COMPLETED_RETRIES = 7; // last delay before failure: 1.2 seconds + public static final String SCRIPT_VERSION_TEMPLATE = "{SCRIPT_VERSION}"; public static final String WORKER_ID_TEMPLATE = "{WORKER_ID}"; public static final String CLIENT_TIMESTAMP_TEMPLATE = "{CLIENT_TIMESTAMP}"; public static final String EXPIRATION_WINDOW_TEMPLATE = "{EXPIRATION_WINDOW}"; public static final String CLOCK_DEVIATION_SECONDS_THRESHOLD_TEMPLATE = "{CLOCK_DEVIATION_SECONDS_THRESHOLD}"; - public static final String OLD_EXPIRATION_THRESHOLD_TEMPLATE = "OLD_EXPIRATION_THRESHOLD"; + public static final String OLD_EXPIRATION_THRESHOLD_TEMPLATE = "{OLD_EXPIRATION_THRESHOLD}"; + public static final String SUCCESSOR_WORK_ITEM_IDS_TEMPLATE = "{SUCCESSOR_WORK_ITEM_IDS}"; public static final String RESULT_OPENSSEARCH_FIELD_NAME = "result"; public static final String EXPIRATION_FIELD_NAME = "expiration"; @@ -46,6 +60,11 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator { public static final String VERSION_CONFLICTS_FIELD_NAME = "version_conflicts"; public static final String COMPLETED_AT_FIELD_NAME = "completedAt"; public static final String SOURCE_FIELD_NAME = "_source"; + public static final String SUCCESSOR_ITEMS_FIELD_NAME = "successor_items"; + public static final String SUCCESSOR_ITEM_DELIMITER = ","; + + public static final int CREATED_RESPONSE_CODE = 201; + public static final int CONFLICT_RESPONSE_CODE = 409; public static final String QUERY_INCOMPLETE_EXPIRED_ITEMS_STR = " \"query\": {\n" + " \"bool\": {" @@ -86,6 +105,18 @@ private boolean waitExtendsPastLease(Duration nextRetryAtDuration) { } } + /** + * This is a WorkAcquisitionOutcome for a WorkItem that may or may not already have successor work items. 
+ */ + @Getter + @AllArgsConstructor + @ToString + static class WorkItemWithPotentialSuccessors { + final String workItemId; + final Instant leaseExpirationTime; + final ArrayList successorWorkItemIds; + } + private final long tolerableClientServerClockDifferenceSeconds; private final AbstractedHttpClient httpClient; private final String workerId; @@ -114,6 +145,37 @@ public OpenSearchWorkCoordinator( this.objectMapper = new ObjectMapper(); } + @FunctionalInterface + public interface RetryableAction { + void execute() throws IOException, NonRetryableException, InterruptedException; + } + + private static void retryWithExponentialBackoff( + RetryableAction action, int maxRetries, long baseRetryTimeMs, Consumer exceptionConsumer) throws InterruptedException, IllegalStateException { + int attempt = 0; + while (true) { + try { + action.execute(); + break; // Exit if action succeeds + } catch (NonRetryableException e) { + Exception underlyingException = (Exception) e.getCause(); + exceptionConsumer.accept(underlyingException); + throw new IllegalStateException(underlyingException); + } catch (Exception e) { + attempt++; + if (attempt > maxRetries) { + exceptionConsumer.accept(e); + throw new RetriesExceededException(e, attempt); + } + Duration sleepDuration = Duration.ofMillis((long) (Math.pow(2.0, attempt - 1) * baseRetryTimeMs)); + log.atWarn().setCause(e) + .setMessage("Couldn't complete action due to exception. Backing off {} and trying again.") + .addArgument(sleepDuration).log(); + Thread.sleep(sleepDuration.toMillis()); + } + } + } + public void setup(Supplier contextSupplier) throws IOException, InterruptedException { var body = "{\n" @@ -138,6 +200,10 @@ public void setup(Supplier 0 && " - + // don't obtain a lease lock - " ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {" - + // already done - " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " + + " if (params.expirationWindow > 0 && ctx._source." 
+ COMPLETED_AT_FIELD_NAME + " == null) {" + + // work item is not completed, but may be assigned to this or a different worker (or unassigned) + " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " + " ctx._source." + EXPIRATION_FIELD_NAME + " > serverTimeSeconds) {" - + // count as an update to force the caller to lookup the expiration time, but no need to modify it - " ctx.op = \\\"update\\\";" + + // count as an update to force the caller to lookup the expiration time, but no need to modify it + " ctx.op = \\\"update\\\";" + " } else if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " + // is expired " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check " ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" @@ -312,13 +376,20 @@ public boolean createUnassignedWorkItem( } } + private ArrayList getSuccessorItemsIfPresent(JsonNode responseDoc) { + if (responseDoc.has(SUCCESSOR_ITEMS_FIELD_NAME)) { + return new ArrayList<>(Arrays.asList(responseDoc.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(SUCCESSOR_ITEM_DELIMITER))); + } + return null; + } + @Override @NonNull public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( String workItemId, Duration leaseDuration, Supplier contextSupplier - ) throws IOException { + ) throws IOException, InterruptedException { try (var ctx = contextSupplier.get()) { var startTime = Instant.now(); var updateResponse = createOrUpdateLeaseForDocument(workItemId, leaseDuration.toSeconds()); @@ -335,10 +406,8 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( ); final var responseDoc = objectMapper.readTree(httpResponse.getPayloadBytes()).path(SOURCE_FIELD_NAME); if (resultFromUpdate == DocumentModificationResult.UPDATED) { - return new WorkItemAndDuration( - workItemId, - Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue()) - ); + var leaseExpirationTime = Instant.ofEpochMilli(1000 * 
responseDoc.path(EXPIRATION_FIELD_NAME).longValue()); + return new WorkItemAndDuration(workItemId, leaseExpirationTime); } else if (!responseDoc.path(COMPLETED_AT_FIELD_NAME).isMissingNode()) { return new AlreadyCompleted(); } else if (resultFromUpdate == DocumentModificationResult.IGNORED) { @@ -552,7 +621,7 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException } } - private WorkItemAndDuration getAssignedWorkItemUnsafe() + private WorkItemWithPotentialSuccessors getAssignedWorkItemUnsafe() throws IOException, AssignedWorkDocumentNotFoundException, MalformedAssignedWorkDocumentException { final var queryWorkersAssignedItemsTemplate = "{\n" + " \"query\": {\n" @@ -600,12 +669,15 @@ private WorkItemAndDuration getAssignedWorkItemUnsafe() .addArgument(response::toDiagnosticString).log(); throw new MalformedAssignedWorkDocumentException(response); } - var rval = new WorkItemAndDuration(resultHitInner.get("_id").asText(), Instant.ofEpochMilli(1000 * expiration)); + + var responseDoc = resultHitInner.get(SOURCE_FIELD_NAME); + var successorItems = getSuccessorItemsIfPresent(responseDoc); + var rval = new WorkItemWithPotentialSuccessors(resultHitInner.get("_id").asText(), Instant.ofEpochMilli(1000 * expiration), successorItems); log.atInfo().setMessage("Returning work item and lease: {}").addArgument(rval).log(); return rval; } - private WorkItemAndDuration getAssignedWorkItem(LeaseChecker leaseChecker, + private WorkItemWithPotentialSuccessors getAssignedWorkItem(LeaseChecker leaseChecker, IWorkCoordinationContexts.IAcquireNextWorkItemContext ctx) throws RetriesExceededException, InterruptedException { @@ -645,6 +717,164 @@ private WorkItemAndDuration getAssignedWorkItem(LeaseChecker leaseChecker, } } + private void updateWorkItemWithSuccessors(String workItemId, ArrayList successorWorkItemIds) throws IOException, NonRetryableException { + final var updateSuccessorWorkItemsTemplate = "{\n" + + " \"script\": {\n" + + " \"lang\": 
\"painless\",\n" + + " \"params\": { \n" + + " \"clientTimestamp\": " + CLIENT_TIMESTAMP_TEMPLATE + ",\n" + + " \"workerId\": \"" + WORKER_ID_TEMPLATE + "\",\n" + + " \"successorWorkItems\": \"" + SUCCESSOR_WORK_ITEM_IDS_TEMPLATE + "\"\n" + + " },\n" + + " \"source\": \"" + + " if (ctx._source.scriptVersion != \\\"" + SCRIPT_VERSION_TEMPLATE + "\\\") {" + + " throw new IllegalArgumentException(\\\"scriptVersion mismatch. Not all participants are using the same script: sourceVersion=\\\" + ctx._source.scriptVersion);" + + " }" + + " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " != params.workerId) {" + + " throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source." + + LEASE_HOLDER_ID_FIELD_NAME + " + \\\" not \\\" + params.workerId);" + + " }" + + " if (ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != null && ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != params.successorWorkItems) {" + + " throw new IllegalArgumentException(\\\"The " + SUCCESSOR_ITEMS_FIELD_NAME + " field cannot be updated with a different value.\\\");" + + " }" + + " ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " = params.successorWorkItems;" + + "\"\n" + + " }\n" + + "}"; + + var body = updateSuccessorWorkItemsTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc") + .replace(WORKER_ID_TEMPLATE, workerId) + .replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(clock.instant().toEpochMilli() / 1000)) + .replace(SUCCESSOR_WORK_ITEM_IDS_TEMPLATE, String.join(SUCCESSOR_ITEM_DELIMITER, successorWorkItemIds)); + + var response = httpClient.makeJsonRequest( + AbstractedHttpClient.POST_METHOD, + INDEX_NAME + "/_update/" + workItemId, + null, + body + ); + try { + DocumentModificationResult modificationResult = getResult(response); + if (DocumentModificationResult.UPDATED != modificationResult) { + throw new IllegalStateException( + "Unexpected response for workItemId: " + + workItemId + + ". 
Response: " + + response.toDiagnosticString() + ); + } + } catch (IllegalArgumentException e) { + var resultTree = objectMapper.readTree(response.getPayloadBytes()); + if (resultTree.has("error") && + resultTree.get("error").has("type") && + resultTree.get("error").get("type").asText().equals("illegal_argument_exception")) { + throw new NonRetryableException(new IllegalArgumentException(resultTree.get("error").path("caused_by").toString())); + } + throw new IllegalStateException( + "Unexpected response for workItemId: " + + workItemId + + ". Response: " + + response.toDiagnosticString() + ); + } + } + + // This is an idempotent function to create multiple unassigned work items. It uses the `create` function in the bulk + // API which creates a document only if the specified ID doesn't yet exist. It is distinct from createUnassignedWorkItem + // because it is an expected outcome of this function that sometimes the work item is already created. That function + // uses `createOrUpdateLease`, whereas this function deliberately never modifies an already-existing work item. 
+ private void createUnassignedWorkItemsIfNonexistent(ArrayList workItemIds) throws IOException, IllegalStateException { + String workItemBodyTemplate = "{\"numAttempts\":0, \"scriptVersion\":\"" + SCRIPT_VERSION_TEMPLATE + "\", " + + "\"creatorId\":\"" + WORKER_ID_TEMPLATE + "\", \"" + EXPIRATION_FIELD_NAME + "\":0 }"; + String workItemBody = workItemBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc").replace(WORKER_ID_TEMPLATE, workerId); + + StringBuilder body = new StringBuilder(); + for (var workItemId : workItemIds) { + body.append("{\"create\":{\"_id\":\"").append(workItemId).append("\"}}\n"); + body.append(workItemBody).append("\n"); + } + var response = httpClient.makeJsonRequest( + AbstractedHttpClient.POST_METHOD, + INDEX_NAME + "/_bulk", + null, + body.toString() + ); + var statusCode = response.getStatusCode(); + if (statusCode != 200) { + throw new IllegalStateException( + "A bulk request to create successor work item(s), " + + String.join(", ", workItemIds) + + " returned an unexpected status code " + + statusCode + + " instead of 200" + ); + } + // parse the response body and if any of the writes failed with anything EXCEPT a version conflict, throw an exception + var resultTree = objectMapper.readTree(response.getPayloadBytes()); + var errors = resultTree.path("errors").asBoolean(); + if (!errors) { + return; + } + // Sometimes these work items have already been created. This is because of the non-transactional nature of OpenSearch + // as a work coordinator. If a worker crashed/failed after updating the parent task's `successor_items` field, but before + // completing creation of all the successor items, some of them may already exist. The `create` action in a bulk + // request will not modify those items, but it will return a 409 CONFLICT response code for them. 
+ var acceptableStatusCodes = List.of(CREATED_RESPONSE_CODE, CONFLICT_RESPONSE_CODE); + + var resultsIncludeUnacceptableStatusCodes = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(resultTree.path("items").elements(), 0), false + ).anyMatch(item -> !acceptableStatusCodes.contains(item.path("create").path("status").asInt())); + + if (resultsIncludeUnacceptableStatusCodes) { + throw new IllegalStateException( + "One or more of the successor work item(s) could not be created: " + + String.join(", ", workItemIds) + + ". Response: " + + response.toDiagnosticString() + ); + } + + } + + @Override + public void createSuccessorWorkItemsAndMarkComplete( + String workItemId, + ArrayList successorWorkItemIds, + Supplier contextSupplier + ) throws IOException, InterruptedException, IllegalStateException { + if (successorWorkItemIds.contains(workItemId)) { + throw new IllegalArgumentException(String.format("successorWorkItemIds %s can not contain the parent workItemId: %s", successorWorkItemIds, workItemId)); + } + if (successorWorkItemIds.stream().anyMatch(itemId -> itemId.contains(SUCCESSOR_ITEM_DELIMITER))) { + throw new IllegalArgumentException("successorWorkItemIds can not contain the delimiter: " + SUCCESSOR_ITEM_DELIMITER); + } + try (var ctx = contextSupplier.get()) { + // It is extremely valuable to try hard to get the work item updated with successor item ids. If it fails without + // completing this step, the next worker to pick up this lease will rerun all of the work. If it fails after this + // step, the next worker to pick it up will see this update and resume the work of creating the successor work items, + // without redriving the work. 
+ retryWithExponentialBackoff( + () -> updateWorkItemWithSuccessors(workItemId, successorWorkItemIds), + MAX_CREATE_SUCCESSOR_WORK_ITEMS_RETRIES, + CREATE_SUCCESSOR_WORK_ITEMS_RETRY_BASE_MS, + e -> ctx.addTraceException(e, true) + ); + retryWithExponentialBackoff( + () -> createUnassignedWorkItemsIfNonexistent(successorWorkItemIds), + MAX_CREATE_UNASSIGNED_SUCCESSOR_WORK_ITEM_RETRIES, + CREATE_SUCCESSOR_WORK_ITEMS_RETRY_BASE_MS, + e -> ctx.addTraceException(e, true) + ); + retryWithExponentialBackoff( + () -> completeWorkItem(workItemId, ctx::getCompleteWorkItemContext), + MAX_MARK_AS_COMPLETED_RETRIES, + CREATE_SUCCESSOR_WORK_ITEMS_RETRY_BASE_MS, + e -> ctx.addTraceException(e, true) + ); + refresh(ctx::getRefreshContext); + } + } + @AllArgsConstructor private static class MaxTriesExceededException extends Exception { final transient Object suppliedValue; @@ -693,6 +923,12 @@ public RetriesExceededException(Throwable cause, int retries) { } } + public static class NonRetryableException extends Exception { + public NonRetryableException(Exception cause) { + super(cause); + } + } + static U doUntil( String labelThatShouldBeAContext, long initialRetryDelayMs, @@ -776,7 +1012,14 @@ private void refresh(Supplier context switch (obtainResult) { case SUCCESSFUL_ACQUISITION: ctx.recordAssigned(); - return getAssignedWorkItem(leaseChecker, ctx); + var workItem = getAssignedWorkItem(leaseChecker, ctx); + if (workItem.successorWorkItemIds != null) { + // continue the previous work of creating the successors and marking this item as completed. + createSuccessorWorkItemsAndMarkComplete(workItem.workItemId, workItem.successorWorkItemIds, ctx::getCreateSuccessorWorkItemsContext); + // this item is not acquirable, so repeat the loop to find a new item. 
+ continue; + } + return new WorkItemAndDuration(workItem.workItemId, workItem.leaseExpirationTime); case NOTHING_TO_ACQUIRE: ctx.recordNothingAvailable(); return new NoAvailableWorkToBeDone(); diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java index 2c17bcb42..697834060 100644 --- a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java @@ -72,6 +72,11 @@ public IWorkCoordinationContexts.ICompleteWorkItemContext createWorkCompletionCo public IAddShardWorkItemContext createShardWorkItemContext() { return new AddShardWorkItemContext(rootInstrumentationScope, this); } + + @Override + public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() { + return getWorkCoordinationRootContext().createSuccessorWorkItemsContext(getEnclosingScope()); + } } class AddShardWorkItemContext extends BaseNestedSpanContext< @@ -175,5 +180,10 @@ public IWorkCoordinationContexts.IAcquireNextWorkItemContext createOpeningContex public IWorkCoordinationContexts.ICompleteWorkItemContext createCloseContet() { return getWorkCoordinationRootContext().createCompleteWorkContext(); } + + @Override + public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() { + return getWorkCoordinationRootContext().createSuccessorWorkItemsContext(); + } } } diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java index 9a3cb580d..6d7d257ce 100644 --- a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java +++ 
b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java @@ -26,6 +26,8 @@ interface IShardSetupAttemptContext extends IScopedInstrumentationAttributes { IWorkCoordinationContexts.ICompleteWorkItemContext createWorkCompletionContext(); IAddShardWorkItemContext createShardWorkItemContext(); + + IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext(); } interface IAddShardWorkItemContext extends IScopedInstrumentationAttributes { @@ -42,5 +44,8 @@ interface IDocumentReindexContext IRfsContexts.IRequestContext createBulkRequest(); IRfsContexts.IRequestContext createRefreshContext(); + + IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext(); + } } diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java index 39edd9a62..608062cbf 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java @@ -3,11 +3,14 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; @@ -156,7 +159,7 @@ public void testAcquireLeaseForQuery() throws Exception { Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size()); try ( - var workCoordinator = new 
OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "firstPass_NONE") + var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "NONE") ) { var nextWorkItem = workCoordinator.acquireNextWorkItem( Duration.ofSeconds(2), @@ -180,12 +183,233 @@ public void testAcquireLeaseForQuery() throws Exception { InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "acquireNextWorkItemRetries")); } + @Test + public void testAddSuccessorWorkItems() throws Exception { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + final var NUM_DOCS = 20; + final var NUM_SUCCESSOR_ITEMS = 3; + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + for (var i = 0; i < NUM_DOCS; ++i) { + final var docId = "R" + i; + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + } + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + } + + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "claimItemWorker")) { + for (var i = 0; i < NUM_DOCS; ++i) { + String workItemId = getWorkItemAndVerify(testContext, "claimItemWorker", new ConcurrentHashMap<>(), Duration.ofSeconds(10), false, false); + var currentNumPendingItems = workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext); + var successorWorkItems = (ArrayList) IntStream.range(0, NUM_SUCCESSOR_ITEMS).mapToObj(j -> workItemId + "_successor_" + j).collect(Collectors.toList()); + + workCoordinator.createSuccessorWorkItemsAndMarkComplete( + workItemId, successorWorkItems, + testContext::createSuccessorWorkItemsContext + ); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + // One item marked as completed, and 
NUM_SUCCESSOR_ITEMS created. + Assertions.assertEquals(currentNumPendingItems - 1 + NUM_SUCCESSOR_ITEMS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + } + Assertions.assertEquals(NUM_SUCCESSOR_ITEMS * NUM_DOCS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + } + // Now go claim NUM_DOCS * NUM_SUCCESSOR_ITEMS items to verify all were created and are claimable. + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "claimItemWorker")) { + for (var i = 0; i < NUM_DOCS * NUM_SUCCESSOR_ITEMS; ++i) { + getWorkItemAndVerify(testContext, "claimWorker_" + i, new ConcurrentHashMap<>(), Duration.ofSeconds(10), false, true); + } + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + } + } + + @Test + public void testAddSuccessorWorkItemsSimultaneous() throws Exception { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + final var NUM_DOCS = 20; + final var NUM_SUCCESSOR_ITEMS = 3; + var executorService = Executors.newFixedThreadPool(NUM_DOCS); + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + for (var i = 0; i < NUM_DOCS; ++i) { + final var docId = "R" + i; + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + } + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + } + + final var seenWorkerItems = new ConcurrentHashMap(); + var allFutures = new ArrayList>(); + final var expiration = Duration.ofSeconds(5); + for (int i = 0; i < NUM_DOCS; ++i) { + int finalI = i; + allFutures.add( + CompletableFuture.supplyAsync( + () -> getWorkItemAndCompleteWithSuccessors(testContext, "successor_test_" + finalI, seenWorkerItems, 
expiration, true, NUM_SUCCESSOR_ITEMS), + executorService + ) + ); + } + CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).join(); + Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size()); + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "checkResults")) { + Assertions.assertEquals(NUM_SUCCESSOR_ITEMS * NUM_DOCS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + } + } + + @Test + @Tag("isolatedTest") + public void testAddSuccessorWorkItemsPartiallyCompleted() throws Exception { + // A partially completed successor item will have a `successor_items` field and _some_ of the successor work items will be created + // but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specified successor items. + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + var docId = "R0"; + var N_SUCCESSOR_ITEMS = 3; + var successorItems = IntStream.range(0, N_SUCCESSOR_ITEMS).mapToObj(i -> docId + "_successor_" + i).collect(Collectors.toCollection(ArrayList::new)); + + var originalWorkItemExpiration = Duration.ofSeconds(5); + final var seenWorkerItems = new ConcurrentHashMap(); + + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "successorTest")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + // Claim the work item + getWorkItemAndVerify(testContext, "successorTest", seenWorkerItems, originalWorkItemExpiration, false, false); + var client = httpClientSupplier.get(); + // Add the list of successors to the work item + var body = "{\"doc\": {\"successor_items\": \"" + String.join(",", successorItems) + "\"}}"; + var
response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + docId, null, body); + Assertions.assertEquals(200, response.getStatusCode()); + // Create a successor item and then claim it with a long lease. + workCoordinator.createUnassignedWorkItem(successorItems.get(0), testContext::createUnassignedWorkContext); + // Now, we should be able to claim the first successor item with a different worker id + // We should NOT be able to claim the other successor items yet (since they haven't been created yet) or the original item + String workItemId = getWorkItemAndVerify(testContext, "claimSuccessorItem", seenWorkerItems, Duration.ofSeconds(600), false, true); + Assertions.assertEquals(successorItems.get(0), workItemId); // We need to ensure that the item we just claimed is the expected one. + + // Sleep for the remainder of the original work item's lease so that it becomes available. + Thread.sleep(originalWorkItemExpiration.toMillis() + 1000); + + // Okay, we're now in a state where the only document available is the original, incomplete one. + // We need to make sure that if we try to acquire this work item, it will jump into `createSuccessorWorkItemsAndMarkComplete`, + // which we can verify because it should be completed successfully and have created the two missing items. + // After cleaning up the original, acquireNextWorkItem will re-run to claim a valid work item (one of the newly created successor items).
+ var nextSuccessorWorkItem = getWorkItemAndVerify(testContext, "cleanupOriginalAndClaimNextSuccessor", seenWorkerItems, originalWorkItemExpiration, false, true); + Assertions.assertTrue(successorItems.contains(nextSuccessorWorkItem)); + // Now: the original work item is completed, the first successor item is completed (a few lines above) and the second successor is completed (immediately above) + Assertions.assertEquals(N_SUCCESSOR_ITEMS - 2, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + + // Now, we should be able to claim the remaining successor items but the _next_ call should fail because there are no available items + for (int i = 0; i < (N_SUCCESSOR_ITEMS - 2); i++) { + workItemId = getWorkItemAndVerify(testContext, "claimItem_" + i, seenWorkerItems, originalWorkItemExpiration, false, true); + Assertions.assertTrue(successorItems.contains(workItemId)); + } + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + Assertions.assertThrows(NoWorkToBeDoneException.class, () -> { + getWorkItemAndVerify(testContext, "finalClaimItem", seenWorkerItems, originalWorkItemExpiration, false, false); + }); + Assertions.assertEquals(N_SUCCESSOR_ITEMS + 1, seenWorkerItems.size()); + } + } + + + @Test + public void testAddSuccessorItemsFailsIfAlreadyDifferentSuccessorItems() throws Exception { + // A work item whose `successor_items` field is already set records the successor list that an earlier attempt wrote to it. + // This tests that the coordinator throws an IllegalStateException when asked to create a different list of successor items for that work item.
+ var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + var docId = "R0"; + var N_SUCCESSOR_ITEMS = 3; + var successorItems = IntStream.range(0, N_SUCCESSOR_ITEMS).mapToObj(i -> docId + "_successor_" + i).collect(Collectors.toCollection(ArrayList::new)); + + var originalWorkItemExpiration = Duration.ofSeconds(5); + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "successorTest")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + // Claim the work item + getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false); + var client = httpClientSupplier.get(); + // Add an INCORRECT list of successors to the work item + var incorrectSuccessors = "successor_99,successor_98,successor_97"; + var body = "{\"doc\": {\"successor_items\": \"" + incorrectSuccessors + "\"}}"; + var response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + docId, null, body); + var responseBody = (new ObjectMapper()).readTree(response.getPayloadBytes()); + Assertions.assertEquals(200, response.getStatusCode()); + + // Now attempt to go through with the correct successor item list + Assertions.assertThrows(IllegalStateException.class, + () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(docId, successorItems, testContext::createSuccessorWorkItemsContext)); + } + } + + // Create a test where a work item tries to create itself as a successor -- it should fail and NOT be marked as complete. Another worker should pick it up and double the lease time.
+ @Test + public void testCreatingSelfAsSuccessorWorkItemFails() throws Exception { + // A work item must not list its own id among its successor items. + // This tests that the coordinator throws an IllegalArgumentException when the successor list contains the id of the work item being completed. + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + var docId = "R0"; + var successorItems = new ArrayList<>(List.of("R0", "R1", "R2")); + + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "successorTest")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + // Claim the work item + getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), Duration.ofSeconds(5), false, false); + + // Now attempt to create successors from a list that includes the work item's own id + Assertions.assertThrows(IllegalArgumentException.class, + () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(docId, successorItems, testContext::createSuccessorWorkItemsContext)); + } + } + + @SneakyThrows + private String getWorkItemAndCompleteWithSuccessors( + WorkCoordinationTestContext testContext, + String workerName, + ConcurrentHashMap seenWorkerItems, + Duration expirationWindow, + boolean placeFinishedDoc, + int numSuccessorItems + ) { + var workItemId = getWorkItemAndVerify( + testContext, + workerName, + seenWorkerItems, + expirationWindow, + placeFinishedDoc, + false + ); + ArrayList successorWorkItems = new ArrayList<>(); + for (int j = 0; j < numSuccessorItems; j++) { + successorWorkItems.add(workItemId + "_successor_" + j); + } + try (var workCoordinator = new
OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, workerName)) { + workCoordinator.createSuccessorWorkItemsAndMarkComplete( + workItemId, successorWorkItems, + testContext::createSuccessorWorkItemsContext + ); + } catch (Exception e) { + throw new RuntimeException(e); + } + return workItemId; + } + + + class NoWorkToBeDoneException extends Exception { + public NoWorkToBeDoneException() { + super(); + } + } + static AtomicInteger nonce = new AtomicInteger(); @SneakyThrows private String getWorkItemAndVerify( WorkCoordinationTestContext testContext, - String workerSuffix, + String workerName, ConcurrentHashMap seenWorkerItems, Duration expirationWindow, boolean placeFinishedDoc, @@ -194,8 +418,7 @@ private String getWorkItemAndVerify( try ( var workCoordinator = new OpenSearchWorkCoordinator( httpClientSupplier.get(), - 3600, - "firstPass_" + workerSuffix + 3600, workerName ) ) { var doneId = DUMMY_FINISHED_DOC_ID + "_" + nonce.incrementAndGet(); @@ -212,9 +435,10 @@ public String onAlreadyCompleted() throws IOException { throw new IllegalStateException(); } + @SneakyThrows @Override public String onNoAvailableWorkToBeDone() throws IOException { - throw new IllegalStateException(); + throw new NoWorkToBeDoneException(); } @Override @@ -245,4 +469,5 @@ public String onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) thro throw e; } } + }