Skip to content

Commit

Permalink
Wrap interactions with .opendistro-job-scheduler-lock in ThreadContex…
Browse files Browse the repository at this point in the history
…t.stashContext to ensure JS can read and write to the index (#347)

* Make .opendistro-job-scheduler-lock a System Index

Signed-off-by: Craig Perkins <[email protected]>

* Switch back to private

Signed-off-by: Craig Perkins <[email protected]>

---------

Signed-off-by: Craig Perkins <[email protected]>
(cherry picked from commit eb506e2)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Jul 2, 2024
1 parent 7baa9da commit 1b03dbe
Showing 1 changed file with 65 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.jobscheduler.spi.utils;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
Expand Down Expand Up @@ -77,20 +78,25 @@ public boolean lockIndexExist() {

@VisibleForTesting
void createLockIndex(ActionListener<Boolean> listener) {
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping());
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping());
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
}
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

Expand Down Expand Up @@ -180,7 +186,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) {
}

private void updateLock(final LockModel updateLock, ActionListener<LockModel> listener) {
try {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME)
.id(updateLock.getLockId())
.setIfSeqNo(updateLock.getSeqNo())
Expand Down Expand Up @@ -212,11 +218,14 @@ private void updateLock(final LockModel updateLock, ActionListener<LockModel> li
} catch (IOException e) {
logger.error("IOException occurred updating lock.", e);
listener.onResponse(null);
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

private void createLock(final LockModel tempLock, ActionListener<LockModel> listener) {
try {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId())
.source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
Expand All @@ -243,25 +252,30 @@ private void createLock(final LockModel tempLock, ActionListener<LockModel> list
}

public void findLock(final String lockId, ActionListener<LockModel> listener) {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onResponse(null);
} else {
try {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString());
parser.nextToken();
listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm()));
} catch (IOException e) {
logger.error("IOException occurred finding lock", e);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onResponse(null);
} else {
try {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString());
parser.nextToken();
listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm()));
} catch (IOException e) {
logger.error("IOException occurred finding lock", e);
listener.onResponse(null);
}
}
}
}, exception -> {
logger.error("Exception occurred finding lock", exception);
listener.onFailure(exception);
}));
}, exception -> {
logger.error("Exception occurred finding lock", exception);
listener.onFailure(exception);
}));
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

/**
Expand Down Expand Up @@ -293,19 +307,24 @@ public void release(final LockModel lock, ActionListener<Boolean> listener) {
* or not the delete was successful
*/
public void deleteLock(final String lockId, ActionListener<Boolean> listener) {
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId);
client.delete(deleteRequest, ActionListener.wrap(response -> {
listener.onResponse(
response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND
);
}, exception -> {
if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) {
logger.debug("Index is not found to delete lock. {}", exception.getMessage());
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId);
client.delete(deleteRequest, ActionListener.wrap(response -> {
listener.onResponse(
response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND
);
}, exception -> {
if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) {
logger.debug("Index is not found to delete lock. {}", exception.getMessage());
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

/**
Expand Down

0 comments on commit 1b03dbe

Please sign in to comment.