Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make MAX_WORKERS Env Var. #4687

Merged
merged 9 commits into from
Jul 19, 2021
Merged
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ LOCAL_DOCKER_MOUNT=/tmp/airbyte_local
# Issue: https://github.com/airbytehq/airbyte/issues/577
HACK_LOCAL_ROOT_PARENT=/tmp

SUBMITTER_NUM_THREADS=10

# Miscellaneous
TRACKING_STRATEGY=segment
WEBAPP_URL=http://localhost:8000/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public interface Configs {

String getKubeNamespace();

String getSubmitterNumThreads();

// Resources
String getCpuRequest();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class EnvConfigs implements Configs {
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String KUBE_NAMESPACE = "KUBE_NAMESPACE";
private static final String SUBMITTER_NUM_THREADS = "SUBMITTER_NUM_THREADS";
private static final String RESOURCE_CPU_REQUEST = "RESOURCE_CPU_REQUEST";
private static final String RESOURCE_CPU_LIMIT = "RESOURCE_CPU_LIMIT";
private static final String RESOURCE_MEMORY_REQUEST = "RESOURCE_MEMORY_REQUEST";
Expand Down Expand Up @@ -190,6 +191,11 @@ public String getKubeNamespace() {
return getEnvOrDefault(KUBE_NAMESPACE, DEFAULT_KUBE_NAMESPACE);
}

@Override
public String getSubmitterNumThreads() {
return getEnvOrDefault(SUBMITTER_NUM_THREADS, "5");
}

@Override
public String getCpuRequest() {
return getEnvOrDefault(RESOURCE_CPU_REQUEST, DEFAULT_RESOURCE_REQUIREMENT_CPU);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,20 @@

/**
* The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch
* them. The current implementation uses a thread pool on the scheduler's machine to launch the
* jobs. One thread is reserved for the job submitter, which is responsible for finding and
* launching new jobs.
* them. The current implementation uses two thread pools to do so. One pool is responsible for all
* job launching operations. The other pool is responsible for clean up operations.
*
* Operations can have thread pools under the hood. An important thread pool to note is that the job
* submitter thread pool. This pool does the work of submitting jobs to temporal - the size of this
* pool determines the number of concurrent jobs that can be run. This is controlled via the
* {@link #SUBMITTER_NUM_THREADS} variable.
*/
public class SchedulerApp {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class);

private static final long GRACEFUL_SHUTDOWN_SECONDS = 30;
private static final int MAX_WORKERS = 4;
private static final int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads());
private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5);
private static final Duration CLEANING_DELAY = Duration.ofHours(2);
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build();
Expand Down Expand Up @@ -121,7 +125,7 @@ public void start() throws IOException {
final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, processFactory);
temporalPool.run();

final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier);
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ services:
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- GCP_STORAGE_BUCKET=${GCP_STORAGE_BUCKET}
- LOG_LEVEL=${LOG_LEVEL}
- SUBMITTER_NUM_THREADS=${SUBMITTER_NUM_THREADS}
- RESOURCE_CPU_REQUEST=${RESOURCE_CPU_REQUEST}
- RESOURCE_CPU_LIMIT=${RESOURCE_CPU_LIMIT}
- RESOURCE_MEMORY_REQUEST=${RESOURCE_MEMORY_REQUEST}
Expand Down
2 changes: 2 additions & 0 deletions kube/overlays/dev/.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace

LOCAL_ROOT=/tmp/airbyte_local

SUBMITTER_NUM_THREADS=10

# Miscellaneous
TRACKING_STRATEGY=logging
WEBAPP_URL=airbyte-webapp-svc:80
Expand Down
2 changes: 2 additions & 0 deletions kube/overlays/stable-with-resource-limits/.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace

LOCAL_ROOT=/tmp/airbyte_local

SUBMITTER_NUM_THREADS=10

# Miscellaneous
TRACKING_STRATEGY=segment
WEBAPP_URL=airbyte-webapp-svc:80
Expand Down
2 changes: 2 additions & 0 deletions kube/overlays/stable/.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace

LOCAL_ROOT=/tmp/airbyte_local

SUBMITTER_NUM_THREADS=10

# Miscellaneous
TRACKING_STRATEGY=segment
WEBAPP_URL=airbyte-webapp-svc:80
Expand Down
5 changes: 5 additions & 0 deletions kube/resources/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: SUBMITTER_NUM_THREADS
valueFrom:
configMapKeyRef:
name: airbyte-env
key: SUBMITTER_NUM_THREADS
- name: RESOURCE_CPU_REQUEST
valueFrom:
configMapKeyRef:
Expand Down