Skip to content

Commit

Permalink
Add API to get scheduled job info by job type
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Nov 8, 2024
1 parent c5ce0c8 commit 7b58964
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
*/
package org.opensearch.jobscheduler.spi;

import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.core.xcontent.ToXContentObject;

import java.io.IOException;
import java.time.Instant;

/**
* Job parameters that being used by the JobScheduler.
*/
public interface ScheduledJobParameter extends ToXContentObject {
public interface ScheduledJobParameter extends ToXContentObject, Writeable {
/**
* @return job name.
*/
Expand Down Expand Up @@ -67,4 +70,15 @@ default Long getLockDurationSeconds() {
default Double getJitter() {
return null;
}

@Override
default void writeTo(StreamOutput out) throws IOException {
out.writeString(getName());
out.writeInstant(getLastUpdateTime());
out.writeInstant(getEnabledTime());
out.writeOptionalWriteable(getSchedule());
out.writeBoolean(isEnabled());
out.writeLong(getLockDurationSeconds());
out.writeDouble(getJitter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.jobscheduler;

import org.opensearch.action.ActionRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.opensearch.jobscheduler.transport.schedule.TransportGetScheduleAction;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
Expand All @@ -69,7 +71,7 @@

import com.google.common.collect.ImmutableList;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin {
public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, ClusterPlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";
Expand Down Expand Up @@ -134,8 +136,6 @@ public Collection<Object> createComponents(
ScheduledJobProvider provider = this.indexToJobProviders.get(jobIndex);
this.scheduler.getScheduledJobInfo().putJobTypeToIndex(provider.getJobType(), jobIndex);
}
System.out.println("indexToJobProviders: " + indexToJobProviders);
System.out.println("jobTypeToIndex: " + this.scheduler.getScheduledJobInfo().getJobTypeToIndexMap());
clusterService.addListener(this.sweeper);
clusterService.addLifecycleListener(this.sweeper);

Expand Down Expand Up @@ -268,4 +268,9 @@ public List getRestHandlers(
return Arrays.asList(new ActionHandler<>(GetScheduleAction.INSTANCE, TransportGetScheduleAction.class));
}

@Override
public void onNodeStarted(DiscoveryNode localNode) {
sweeper.afterStart();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
*/
package org.opensearch.jobscheduler.scheduler;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.threadpool.Scheduler;

import java.io.IOException;
import java.time.Instant;

class JobSchedulingInfo {
public class JobSchedulingInfo implements Writeable, ToXContentFragment {

private String indexName;
private String jobId;
Expand All @@ -30,6 +36,12 @@ class JobSchedulingInfo {
this.jobParameter = jobParameter;
}

public JobSchedulingInfo(StreamInput in) throws IOException {
this.indexName = in.readString();
this.jobId = in.readString();
this.descheduled = in.readBoolean();
}

public String getIndexName() {
return indexName;
}
Expand Down Expand Up @@ -82,4 +94,27 @@ public void setScheduledCancellable(Scheduler.ScheduledCancellable scheduledCanc
this.scheduledCancellable = scheduledCancellable;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.indexName);
out.writeString(this.jobId);
out.writeOptionalWriteable(this.jobParameter);
out.writeBoolean(this.descheduled);
out.writeInstant(this.actualPreviousExecutionTime);
out.writeInstant(this.expectedPreviousExecutionTime);
out.writeInstant(this.expectedExecutionTime);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("index_name", this.indexName);
builder.field("job_id", this.jobId);
builder.field("job_parameter", this.jobParameter);
builder.field("descheduled", this.descheduled);
builder.field("actual_previous_execution_time", this.actualPreviousExecutionTime);
builder.field("expected_previous_execution_time", this.expectedPreviousExecutionTime);
builder.field("expected_execution_time", this.expectedExecutionTime);
return builder.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,17 @@ void initBackgroundSweep() {
}

Runnable scheduledSweep = () -> {
log.info("Running full sweep");
TimeValue elapsedTime = getFullSweepElapsedTime();
long delta = this.sweepPeriod.millis() - elapsedTime.millis();
if (delta < 20L) {
this.fullSweepExecutor.submit(this::sweepAllJobIndices);
}
};
Runnable sweepOnStartup = () -> { this.fullSweepExecutor.submit(this::sweepAllJobIndices); };
// Sweep on startup
if (this.threadPool.generic() != null) {
this.threadPool.generic().submit(sweepOnStartup);
}
this.scheduledFullSweep = this.threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, ThreadPool.Names.SAME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.scheduler.JobSchedulingInfo;

import java.io.IOException;
import java.util.Map;
Expand All @@ -22,27 +23,25 @@
*/
public class GetScheduleResponse extends ActionResponse implements ToXContentObject {

private Map<String, String> jobTypeToIndexMap;
private Map<String, JobSchedulingInfo> jobIdToInfoMap;

public GetScheduleResponse(Map<String, String> jobTypeToIndexMap) {
this.jobTypeToIndexMap = jobTypeToIndexMap;
public GetScheduleResponse(Map<String, JobSchedulingInfo> jobIdToInfoMap) {
this.jobIdToInfoMap = jobIdToInfoMap;
}

public GetScheduleResponse(StreamInput in) throws IOException {
super(in);
jobTypeToIndexMap = in.readMap(StreamInput::readString, StreamInput::readString);
jobIdToInfoMap = in.readMap(StreamInput::readString, JobSchedulingInfo::new);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("job_type_to_index_map", jobTypeToIndexMap);
builder.endObject();
builder.map(jobIdToInfoMap);
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(jobTypeToIndexMap, StreamOutput::writeString, StreamOutput::writeString);
out.writeMap(jobIdToInfoMap, StreamOutput::writeString, (o, s) -> s.writeTo(o));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.jobscheduler.scheduler.JobSchedulingInfo;
import org.opensearch.jobscheduler.scheduler.ScheduledJobInfo;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.util.Map;

public class TransportGetScheduleAction extends HandledTransportAction<GetScheduleRequest, GetScheduleResponse> {

private final Client client;
Expand All @@ -40,8 +43,8 @@ public TransportGetScheduleAction(

@Override
protected void doExecute(Task task, GetScheduleRequest request, ActionListener<GetScheduleResponse> actionListener) {
System.out.println("ScheduledJobInfo: " + scheduledJobInfo);
System.out.println("jobTypeToIndex: " + scheduledJobInfo.getJobTypeToIndexMap());
actionListener.onResponse(new GetScheduleResponse(scheduledJobInfo.getJobTypeToIndexMap()));
String jobIndex = scheduledJobInfo.getJobTypeToIndexMap().get(request.getJobType());
Map<String, JobSchedulingInfo> jobIdToInfo = scheduledJobInfo.getJobsByIndex(jobIndex);
actionListener.onResponse(new GetScheduleResponse(jobIdToInfo));
}
}

0 comments on commit 7b58964

Please sign in to comment.