diff --git a/.env b/.env index c4f9035183a6e..2880bce5270ec 100644 --- a/.env +++ b/.env @@ -32,6 +32,8 @@ LOCAL_DOCKER_MOUNT=/tmp/airbyte_local # Issue: https://github.com/airbytehq/airbyte/issues/577 HACK_LOCAL_ROOT_PARENT=/tmp +SUBMITTER_NUM_THREADS=10 + # Miscellaneous TRACKING_STRATEGY=segment WEBAPP_URL=http://localhost:8000/ diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 64d11d9efeb1d..96729307f6d8a 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -67,6 +67,8 @@ public interface Configs { String getKubeNamespace(); + String getSubmitterNumThreads(); + // Resources String getCpuRequest(); diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index cc50f119db592..b8700c7c39876 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -59,6 +59,7 @@ public class EnvConfigs implements Configs { private static final String TEMPORAL_HOST = "TEMPORAL_HOST"; private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS"; private static final String KUBE_NAMESPACE = "KUBE_NAMESPACE"; + private static final String SUBMITTER_NUM_THREADS = "SUBMITTER_NUM_THREADS"; private static final String RESOURCE_CPU_REQUEST = "RESOURCE_CPU_REQUEST"; private static final String RESOURCE_CPU_LIMIT = "RESOURCE_CPU_LIMIT"; private static final String RESOURCE_MEMORY_REQUEST = "RESOURCE_MEMORY_REQUEST"; @@ -190,6 +191,11 @@ public String getKubeNamespace() { return getEnvOrDefault(KUBE_NAMESPACE, DEFAULT_KUBE_NAMESPACE); } + @Override + public String getSubmitterNumThreads() { + return getEnvOrDefault(SUBMITTER_NUM_THREADS, "5"); + } + @Override public String 
getCpuRequest() { return getEnvOrDefault(RESOURCE_CPU_REQUEST, DEFAULT_RESOURCE_REQUIREMENT_CPU); diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index e9b9fdceea8bb..47bc19cf9d13d 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -75,16 +75,20 @@ /** * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch - * them. The current implementation uses a thread pool on the scheduler's machine to launch the - * jobs. One thread is reserved for the job submitter, which is responsible for finding and - * launching new jobs. + * them. The current implementation uses two thread pools to do so. One pool is responsible for all + * job launching operations. The other pool is responsible for clean up operations. + * + * Operations can have thread pools under the hood. An important thread pool to note is the job + * submitter thread pool. This pool does the work of submitting jobs to temporal - the size of this + * pool determines the number of concurrent jobs that can be run. This is controlled via the + * {@link #SUBMITTER_NUM_THREADS} variable. 
*/ public class SchedulerApp { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class); private static final long GRACEFUL_SHUTDOWN_SECONDS = 30; - private static final int MAX_WORKERS = 4; + private static final int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads()); private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5); private static final Duration CLEANING_DELAY = Duration.ofHours(2); private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build(); @@ -121,7 +125,7 @@ public void start() throws IOException { final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, processFactory); temporalPool.run(); - final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY); + final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY); final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor(); final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot); final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier); diff --git a/docker-compose.yaml b/docker-compose.yaml index fbd8f7a3b809f..cab388fad2f5e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -60,6 +60,7 @@ services: - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} - GCP_STORAGE_BUCKET=${GCP_STORAGE_BUCKET} - LOG_LEVEL=${LOG_LEVEL} + - SUBMITTER_NUM_THREADS=${SUBMITTER_NUM_THREADS} - RESOURCE_CPU_REQUEST=${RESOURCE_CPU_REQUEST} - RESOURCE_CPU_LIMIT=${RESOURCE_CPU_LIMIT} - RESOURCE_MEMORY_REQUEST=${RESOURCE_MEMORY_REQUEST} diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index a6d1de033b944..6b77c81e531b8 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace 
LOCAL_ROOT=/tmp/airbyte_local +SUBMITTER_NUM_THREADS=10 + # Miscellaneous TRACKING_STRATEGY=logging WEBAPP_URL=airbyte-webapp-svc:80 diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index b410fa8fbdcea..fea24ed7be8e8 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace LOCAL_ROOT=/tmp/airbyte_local +SUBMITTER_NUM_THREADS=10 + # Miscellaneous TRACKING_STRATEGY=segment WEBAPP_URL=airbyte-webapp-svc:80 diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index b410fa8fbdcea..fea24ed7be8e8 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace LOCAL_ROOT=/tmp/airbyte_local +SUBMITTER_NUM_THREADS=10 + # Miscellaneous TRACKING_STRATEGY=segment WEBAPP_URL=airbyte-webapp-svc:80 diff --git a/kube/resources/scheduler.yaml b/kube/resources/scheduler.yaml index 1d1e752574c7c..5c80f1f6a3fa6 100644 --- a/kube/resources/scheduler.yaml +++ b/kube/resources/scheduler.yaml @@ -99,6 +99,11 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: SUBMITTER_NUM_THREADS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: SUBMITTER_NUM_THREADS - name: RESOURCE_CPU_REQUEST valueFrom: configMapKeyRef: