From 48d1cba19ef3a2e1fdf7d8da3ab6d9d2ae52364e Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 23 Mar 2023 15:35:02 -0400 Subject: [PATCH 1/2] Make .opendistro-job-scheduler-lock a System Index Signed-off-by: Craig Perkins --- .github/workflows/ci.yml | 1 + .../jobscheduler/spi/utils/LockService.java | 116 +++++++++++------- 2 files changed, 70 insertions(+), 47 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 306f8a47..34a0b467 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,7 @@ on: jobs: build: strategy: + fail-fast: false matrix: os: [ubuntu-latest, windows-latest] java: [11, 17] diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index 96a0b87b..bb45ce3d 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -8,6 +8,7 @@ */ package org.opensearch.jobscheduler.spi.utils; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; @@ -44,7 +45,7 @@ public final class LockService { private static final Logger logger = LogManager.getLogger(LockService.class); - private static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock"; + public static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock"; private final Client client; private final ClusterService clusterService; @@ -77,20 +78,25 @@ public boolean lockIndexExist() { @VisibleForTesting void createLockIndex(ActionListener listener) { - if (lockIndexExist()) { - listener.onResponse(true); - } else { - final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()); - client.admin() - .indices() - .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { - if (exception instanceof ResourceAlreadyExistsException - || exception.getCause() instanceof ResourceAlreadyExistsException) { - listener.onResponse(true); - } else { - listener.onFailure(exception); - } - })); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + if (lockIndexExist()) { + listener.onResponse(true); + } else { + final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()); + client.admin() + .indices() + .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { + if (exception instanceof ResourceAlreadyExistsException + || exception.getCause() instanceof ResourceAlreadyExistsException) { + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); } } @@ -180,7 +186,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) { } private void updateLock(final LockModel updateLock, ActionListener listener) { - try { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME) .id(updateLock.getLockId()) .setIfSeqNo(updateLock.getSeqNo()) @@ -212,11 +218,14 @@ private void updateLock(final LockModel updateLock, ActionListener li } catch (IOException e) { logger.error("IOException occurred updating lock.", e); listener.onResponse(null); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); } } private void createLock(final LockModel tempLock, ActionListener listener) { - try { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId()) .source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) @@ -240,29 +249,37 @@ private void createLock(final LockModel tempLock, ActionListener list } catch (IOException e) { logger.error("IOException occurred creating lock", e); listener.onResponse(null); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); } } public void findLock(final String lockId, ActionListener listener) { - GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); - client.get(getRequest, ActionListener.wrap(response -> { - if (!response.isExists()) { - listener.onResponse(null); - } else { - try { - XContentParser parser = XContentType.JSON.xContent() - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); - parser.nextToken(); - listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); - } catch (IOException e) { - logger.error("IOException occurred finding lock", e); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); + client.get(getRequest, ActionListener.wrap(response -> { + if (!response.isExists()) { listener.onResponse(null); + } else { + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); + parser.nextToken(); + listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); + } catch (IOException e) { + logger.error("IOException occurred finding lock", e); + listener.onResponse(null); + } } - } - }, exception -> { - logger.error("Exception occurred finding lock", exception); - listener.onFailure(exception); - })); + }, exception -> { + logger.error("Exception occurred finding lock", exception); + listener.onFailure(exception); + })); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } } /** @@ -294,19 +311,24 @@ public void release(final LockModel lock, ActionListener listener) { * or not the delete was successful */ public void deleteLock(final String lockId, ActionListener listener) { - DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId); - client.delete(deleteRequest, ActionListener.wrap(response -> { - listener.onResponse( - response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND - ); - }, exception -> { - if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { - logger.debug("Index is not found to delete lock. {}", exception.getMessage()); - listener.onResponse(true); - } else { - listener.onFailure(exception); - } - })); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId); + client.delete(deleteRequest, ActionListener.wrap(response -> { + listener.onResponse( + response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND + ); + }, exception -> { + if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { + logger.debug("Index is not found to delete lock. {}", exception.getMessage()); + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } } /** From 21f3bb8ef2040ef2b8c7b949add25f0d4d95b7ff Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 23 Mar 2023 15:35:59 -0400 Subject: [PATCH 2/2] Switch back to private Signed-off-by: Craig Perkins --- .../java/org/opensearch/jobscheduler/spi/utils/LockService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index bb45ce3d..89a73376 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -45,7 +45,7 @@ public final class LockService { private static final Logger logger = LogManager.getLogger(LockService.class); - public static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock"; + private static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock"; private final Client client; private final ClusterService clusterService;