diff --git a/FetchMigration/python/progress_metrics.py b/FetchMigration/python/progress_metrics.py index 367ba9d49..b01d8a8df 100644 --- a/FetchMigration/python/progress_metrics.py +++ b/FetchMigration/python/progress_metrics.py @@ -95,7 +95,8 @@ def update_records_in_flight_count(self, rec_in_flight: Optional[int]): self.__record_value(self._REC_IN_FLIGHT_KEY, rec_in_flight) def update_no_partitions_count(self, no_part_count: Optional[int]): - self.__record_value(self._NO_PART_KEY, no_part_count) + if no_part_count and no_part_count > 0: + self.__record_value(self._NO_PART_KEY, no_part_count) def get_doc_completion_percentage(self) -> int: success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY) diff --git a/deployment/cdk/opensearch-service-migration/dp_pipeline_template.yaml b/deployment/cdk/opensearch-service-migration/dp_pipeline_template.yaml index e96560117..3e8f704d0 100644 --- a/deployment/cdk/opensearch-service-migration/dp_pipeline_template.yaml +++ b/deployment/cdk/opensearch-service-migration/dp_pipeline_template.yaml @@ -34,7 +34,11 @@ historical-data-migration: connection: insecure: true # Additional pipeline options/optimizations + buffer: + bounded_blocking: # Values recommended by an expert + buffer_size: 1000000 + batch_size: 12500 # For maximum throughput, match workers to number of vCPUs (default: 1) - workers: 1 + workers: 2 # delay is how often the worker threads should process data (default: 3000 ms) delay: 0 diff --git a/deployment/cdk/opensearch-service-migration/lib/fetch-migration-stack.ts b/deployment/cdk/opensearch-service-migration/lib/fetch-migration-stack.ts index ad4c94b36..dd712543d 100644 --- a/deployment/cdk/opensearch-service-migration/lib/fetch-migration-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/fetch-migration-stack.ts @@ -50,8 +50,8 @@ export class FetchMigrationStack extends Stack { // ECS Task Definition const fetchMigrationFargateTask = new FargateTaskDefinition(this, "fetchMigrationFargateTask", { family: `migration-${props.stage}-${serviceName}`, - memoryLimitMiB: 4096, - cpu: 1024, + memoryLimitMiB: 8192, + cpu: 2048, taskRole: ecsTaskRole }); diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts index 1b60e79cc..c419609dc 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts @@ -86,6 +86,7 @@ export class MigrationServiceCore extends Stack { props.taskRolePolicies?.forEach(policy => serviceTaskRole.addToPolicy(policy)) const serviceTaskDef = new FargateTaskDefinition(this, "ServiceTaskDef", { + ephemeralStorageGiB: 75, family: `migration-${props.stage}-${props.serviceName}`, memoryLimitMiB: props.taskMemoryLimitMiB ? props.taskMemoryLimitMiB : 1024, cpu: props.taskCpuUnits ? props.taskCpuUnits : 256,