Run Engine 2.0 (alpha) #1575

Open · wants to merge 467 commits into main

Conversation

@nicktrn (Collaborator) commented Dec 17, 2024

Good luck @coderabbitai

Summary by CodeRabbit

  • New Features

    • Launched an alpha version of the new Run Engine with enhanced run processing and event handling.
    • Introduced updated UI elements, including a new task-cached icon and keyboard shortcuts (e.g., “C” to cancel a run), along with additional display details such as worker type.
    • Added new configuration options for Valkey and Run Engine 2.0 parameters.
    • Introduced a new class for managing developer presence in the environment.
    • Added a new React functional component for the Task Cached Icon.
    • Introduced new models and fields in the database schema to support worker instances and task execution tracking.
    • Added new asynchronous functions to manage concurrency limits for MARQS and the RunQueue.
    • Enhanced the RunEngine class with new event handling capabilities and structured error management for task runs.
    • Added a new API route for creating waitpoints, enhancing the management of task execution.
  • Improvements

    • Enhanced error messaging and validation for run and batch identifiers.
    • Upgraded dependencies and configuration settings to improve overall stability and performance.
    • Improved handling of concurrency limits for environments and queues.
    • Enhanced task triggering functionality with robust error handling and idempotency support.
    • Streamlined the process for managing worker deployments and background workers.
    • Improved Redis container management and connection handling processes.
    • Enhanced the testing framework for the RunEngine by validating its dequeuing capabilities under various conditions.
    • Improved the structure and clarity of error handling in various services and APIs.

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
apps/webapp/app/v3/services/triggerTaskV2.server.ts (4)

35-36: Enhance the deprecation notice.

The deprecation notice should provide more context about migration path and timeline.

-/** @deprecated Use TriggerTaskService in `triggerTask.server.ts` instead. */
+/**
+ * @deprecated This class is deprecated and will be removed in the next major version.
+ * Please migrate to TriggerTaskService in `triggerTask.server.ts` for improved functionality
+ * and better performance. See the migration guide for details.
+ */

37-439: Consider breaking down the call method into smaller, focused methods.

The method is quite long (400+ lines) and handles multiple responsibilities including idempotency, entitlements, queue management, and error handling. This makes it harder to maintain and test.

Consider extracting the following into separate methods:

  1. Idempotency key handling
  2. Entitlement checking
  3. Queue size validation
  4. Run creation logic
  5. Error handling

Example structure:

class TriggerTaskServiceV2 extends WithRunEngine {
  public async call(params: TriggerTaskParams): Promise<TriggerTaskServiceResult | undefined> {
    return await this.traceWithEnv("call()", params.environment, async (span) => {
      this.validateAttempts(params.attempt);
      const existingRun = await this.handleIdempotency(params);
      if (existingRun) return existingRun;
      await this.checkEntitlements(params.environment);
      await this.validateQueueSize(params);
      return await this.createRun(params, span);
    });
  }

  private validateAttempts(attempt: number): void {
    if (attempt > MAX_ATTEMPTS) {
      throw new ServiceValidationError(
        `Failed to trigger after ${MAX_ATTEMPTS} attempts.`
      );
    }
  }

  // ... other extracted methods
}

467-470: Use logger consistently instead of console.log.

The code uses console.log in some places while using the logger utility in others. This inconsistency could make log aggregation and filtering more difficult.

-      console.log("Failed to get queue name: No task found", {
+      logger.debug("Failed to get queue name: No task found", {
         taskId,
         environmentId: environment.id,
       });

-      console.log("Failed to get queue name: Invalid queue config", {
+      logger.debug("Failed to get queue name: Invalid queue config", {
         taskId,
         environmentId: environment.id,
         queueConfig: task.queueConfig,
       });

Also applies to: 478-482


490-533: Consider adding payload size validation.

While the code handles payload offloading based on size, it might be beneficial to add upfront validation to prevent processing of extremely large payloads.

  async #handlePayloadPacket(
    payload: any,
    payloadType: string,
    pathPrefix: string,
    environment: AuthenticatedEnvironment
  ) {
    return await startActiveSpan("handlePayloadPacket()", async (span) => {
+     const maxPayloadSize = env.MAX_PAYLOAD_SIZE || 100 * 1024 * 1024; // 100MB default
+     const payloadSize = Buffer.byteLength(JSON.stringify(payload));
+     if (payloadSize > maxPayloadSize) {
+       throw new ServiceValidationError(
+         `Payload size (${payloadSize} bytes) exceeds maximum allowed size (${maxPayloadSize} bytes)`
+       );
+     }
      const packet = this.#createPayloadPacket(payload, payloadType);
      // ... rest of the method
    });
  }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0cbd59a and d948818.

📒 Files selected for processing (2)
  • apps/webapp/app/v3/services/triggerTask.server.ts (3 hunks)
  • apps/webapp/app/v3/services/triggerTaskV2.server.ts (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
apps/webapp/app/v3/services/triggerTaskV2.server.ts (2)

42-42: Check default attempts and over-retries.

There's an attempt parameter that defaults to 0. Combined with MAX_ATTEMPTS, the code raises an error once attempt > MAX_ATTEMPTS. Carefully verify that an unexpected race condition cannot reset the attempt counter and reintroduce an endless retry loop.
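
One way to make that harder to get wrong, as a sketch (the recursive retry call shown here is an assumption about the call shape, not the actual retry path in triggerTaskV2.server.ts):

// Increment the attempt explicitly at the retry call-site, so the default of 0 can never be
// reintroduced part-way through a retry chain, even if a race condition rebuilds the params.
const nextAttempt = (attempt ?? 0) + 1;
if (nextAttempt > MAX_ATTEMPTS) {
  throw new ServiceValidationError(`Failed to trigger after ${MAX_ATTEMPTS} attempts.`);
}
return await this.call({ ...params, attempt: nextAttempt });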


536-563: LGTM! Well-structured queue size management.

The implementation properly handles environment-specific queue size limits with good fallbacks to environment variables.
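
For readers skimming the review, the fallback order being praised looks roughly like this (the field and variable names are illustrative assumptions, not the actual identifiers in the service):

// Sketch: prefer the per-environment limit, then a global environment variable, then a hard default.
function resolveMaximumQueueSize(environment: { maximumQueueSize?: number | null }): number {
  return environment.maximumQueueSize ?? Number(process.env.MAXIMUM_QUEUE_SIZE ?? 10_000);
}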

apps/webapp/app/v3/services/triggerTask.server.ts (3)

17-19: LGTM! Type definitions updated to support batch processing.

The addition of batchIndex and the renaming of runId to runFriendlyId improve the service's ability to handle batch operations and make the naming clearer.


37-63: Well-structured version routing implementation.

The service elegantly handles multiple engine versions through:

  • Clean integration with the run engine via WithRunEngine extension
  • Clear version determination logic
  • Modular routing to version-specific implementations (a minimal sketch follows below)
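
In outline, that routing pattern looks something like this (TriggerTaskServiceV1/V2, WithRunEngine, and this._engine appear elsewhere in this review; determineEngineVersion, TriggerTaskParams, and the V2 constructor options are assumptions for illustration):

// Structural sketch of engine-version routing, not the repository's actual code.
class TriggerTaskService extends WithRunEngine {
  public async call(params: TriggerTaskParams) {
    const version = await determineEngineVersion(params.environment); // e.g. "V1" | "V2"
    switch (version) {
      case "V1":
        return await new TriggerTaskServiceV1().call(params);
      case "V2":
        return await new TriggerTaskServiceV2({ engine: this._engine }).call(params);
    }
  }
}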

65-91:

✅ Verification successful

Verify version-specific handler implementations.

The handlers are well-separated, but please ensure that:

  1. Both TriggerTaskServiceV1 and TriggerTaskServiceV2 implementations exist
  2. The V2 service correctly utilizes the engine capabilities

🏁 Script executed:

#!/bin/bash
# Verify the existence and implementation of version-specific services
echo "Checking for V1 and V2 service implementations..."
rg -A 5 "class TriggerTaskServiceV1|class TriggerTaskServiceV2" apps/webapp/app/v3/services/

# Verify engine usage in V2
echo "Checking engine usage in V2 service..."
rg -A 10 "this._engine" apps/webapp/app/v3/services/triggerTaskV2.server.ts

Length of output: 3211


Version-specific handler implementations verified; no changes required.

  • Both TriggerTaskServiceV1 and TriggerTaskServiceV2 implementations exist in the repository (located in apps/webapp/app/v3/services/triggerTaskV1.server.ts and triggerTaskV2.server.ts respectively).
  • The V2 service correctly utilizes engine capabilities as evidenced by the usage of this._engine in its implementation.

@coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (14)
internal-packages/run-engine/src/engine/locking.test.ts (3)

1-5: Consider adding test coverage for edge cases.

While the basic locking functionality is tested, consider adding test cases for:

  • Lock timeout scenarios
  • Redis connection failures
  • Concurrent lock attempts from multiple instances (see the sketch below)
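
A rough shape for the concurrent-attempts case, following the style of the existing tests in this file (RunLocker, redisTest, and Redis usage mirror the snippets above; the single-holder assertion and the awaited setTimeout from node:timers/promises are assumptions):

redisTest("Concurrent lock attempts serialize", { timeout: 15_000 }, async ({ redisOptions }) => {
  const redis = new Redis(redisOptions);
  try {
    const lockerA = new RunLocker({ redis });
    const lockerB = new RunLocker({ redis });
    let concurrent = 0;
    let maxConcurrent = 0;

    // Two instances compete for the same resource; the critical sections must not overlap.
    await Promise.all(
      [lockerA, lockerB].map((locker) =>
        locker.lock(["shared-resource"], 5000, async () => {
          concurrent++;
          maxConcurrent = Math.max(maxConcurrent, concurrent);
          await setTimeout(100);
          concurrent--;
        })
      )
    );

    expect(maxConcurrent).toBe(1);
  } finally {
    await redis.quit();
  }
});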

7-23: Enhance test robustness with parameterized timeouts and signal handling.

Consider these improvements:

  1. Extract the lock timeout (5000ms) to a constant or test parameter
  2. Add assertions to verify signal abort functionality

Example improvement:

+const TEST_LOCK_TIMEOUT = 5000;
+
 redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => {
   const redis = new Redis(redisOptions);
   try {
     const runLock = new RunLocker({ redis });
     expect(runLock.isInsideLock()).toBe(false);
-    await runLock.lock(["test-1"], 5000, async (signal) => {
+    await runLock.lock(["test-1"], TEST_LOCK_TIMEOUT, async (signal) => {
       expect(signal).toBeDefined();
+      expect(signal.aborted).toBe(false);
       expect(runLock.isInsideLock()).toBe(true);
     });
     expect(runLock.isInsideLock()).toBe(false);
   } finally {
     await redis.quit();
   }
 });

25-48: Improve nested lock test coverage and documentation.

The test verifies basic nested locking but could be enhanced:

  1. Verify lock release order
  2. Test with different timeouts for inner/outer locks
  3. Improve comment style

Example improvement:

 redisTest("Test double locking works", { timeout: 15_000 }, async ({ redisOptions }) => {
   const redis = new Redis(redisOptions);
   try {
     const runLock = new RunLocker({ redis });
+    const lockStates: boolean[] = [];
     expect(runLock.isInsideLock()).toBe(false);

-    await runLock.lock(["test-1"], 5000, async (signal) => {
+    await runLock.lock(["test-1"], TEST_LOCK_TIMEOUT, async (signal) => {
       expect(signal).toBeDefined();
       expect(runLock.isInsideLock()).toBe(true);
+      lockStates.push(runLock.isInsideLock());

-      //should be able to "lock it again"
+      // Verify nested lock acquisition with different timeout
-      await runLock.lock(["test-1"], 5000, async (signal) => {
+      await runLock.lock(["test-1"], TEST_LOCK_TIMEOUT / 2, async (signal) => {
         expect(signal).toBeDefined();
         expect(runLock.isInsideLock()).toBe(true);
+        lockStates.push(runLock.isInsideLock());
       });
     });

     expect(runLock.isInsideLock()).toBe(false);
+    // Verify lock states were properly tracked
+    expect(lockStates).toEqual([true, true]);
   } finally {
     await redis.quit();
   }
 });
internal-packages/run-engine/src/engine/tests/delays.test.ts (1)

13-89: Consider making the test more robust against timing issues.

The test uses fixed delays which could be flaky in CI environments. Consider implementing the following improvements:

  1. Use longer delays with wider margins for CI environments
  2. Add retry logic with exponential backoff for status checks
  3. Extract machine configuration to a shared test fixture
-  containerTest("Run start delayed", { timeout: 15_000 }, async ({ prisma, redisOptions }) => {
+  containerTest("Run start delayed", { timeout: 30_000 }, async ({ prisma, redisOptions }) => {
     //create environment
     const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+    const DELAY_MS = process.env.CI ? 2000 : 500;
+    const WAIT_MS = process.env.CI ? 4000 : 1000;

     // ... engine setup ...

-    delayUntil: new Date(Date.now() + 500),
+    delayUntil: new Date(Date.now() + DELAY_MS),

     // ... trigger run ...

-    await setTimeout(1_000);
+    await setTimeout(WAIT_MS);

-    const executionData2 = await engine.getRunExecutionData({ runId: run.id });
+    // Retry with exponential backoff
+    let attempts = 0;
+    let executionData2;
+    while (attempts < 3) {
+      executionData2 = await engine.getRunExecutionData({ runId: run.id });
+      if (executionData2?.snapshot.executionStatus === "QUEUED") break;
+      await setTimeout(Math.pow(2, attempts) * 1000);
+      attempts++;
+    }
internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts (1)

366-367: Await the asynchronous engine teardown

engine.quit() is asynchronous, so consider awaiting it to ensure that Redis connections are fully closed (and to avoid resource leaks) before finishing the test:

- engine.quit();
+ await engine.quit();
internal-packages/run-engine/src/run-queue/index.ts (4)

1-1681: Consider splitting this large file into smaller modules

This file is quite long and covers many aspects of RunQueue logic, custom Redis commands, concurrency controls, and advanced queue operations all in one place. Splitting it into focused modules (e.g., concurrency config vs. Lua script commands vs. queue operations) can improve readability and maintainability.


64-64: Add or expand doc comments on the RunQueue class

The RunQueue class is central to handling concurrency and messaging. Detailed doc comments or JSDoc annotations on its constructor and key methods would help future maintainers quickly grasp the contract and usage scenarios.
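
As a starting point, the class-level comment could look something like this (a sketch summarising what this review observes about RunQueue, not authoritative documentation of its API):

/**
 * RunQueue coordinates enqueueing, dequeueing, and concurrency accounting for task runs,
 * backed by Redis and a set of custom Lua commands.
 *
 * Key operations:
 * - enqueueMessage / dequeueMessageFromMasterQueue: move run messages through a master queue
 * - nackMessage: retry a message, or move it to the dead letter queue once attempts are exhausted
 * - currentConcurrencyOfQueue/Environment/Project/Task: report live concurrency counters
 */
export class RunQueue {
  // ...
}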


784-785: Update inconsistent messaging system label

Here, [SEMATTRS_MESSAGING_SYSTEM] is set to "marqs". In other parts of this file, it’s consistently set to "runqueue". For clarity and consistency, replace "marqs" with "runqueue":

- [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
+ [SEMATTRS_MESSAGING_SYSTEM]: "runqueue",

1-1681: Add tests for concurrency edge cases

While this file defines robust Lua scripts for concurrency management, there are intricate paths (e.g., zero or very high concurrency, disabled concurrency at the org level, etc.) that may need dedicated tests. Consider expanding coverage to ensure edge scenarios are fully tested.

internal-packages/run-engine/src/engine/tests/ttl.test.ts (2)

85-85: Beware of possible flaky timing

Relying on a fixed 1.5-second delay after triggering a run with a 1-second TTL may work under normal conditions, but test environments can vary in timing. For a more robust approach, you could poll for the run’s status or implement event-based checks to confirm expiration consistently.
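
One possible polling helper, as a sketch (getRunExecutionData and the snapshot shape are taken from the tests in this PR; the helper itself, its defaults, and the awaited setTimeout from node:timers/promises are assumptions):

// Poll the run's execution status instead of sleeping for a fixed duration.
async function waitForExecutionStatus(
  engine: RunEngine,
  runId: string,
  expected: string,
  { timeoutMs = 10_000, intervalMs = 250 } = {}
) {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const data = await engine.getRunExecutionData({ runId });
    if (data?.snapshot.executionStatus === expected) return data;
    await setTimeout(intervalMs);
  }
  throw new Error(`Run ${runId} did not reach ${expected} within ${timeoutMs}ms`);
}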


103-104: Await the quit call to free resources

engine.quit() is asynchronous. Consider adding await here to ensure all connections are cleanly closed before the test ends, preventing potential resource leaks or lingering connections.

internal-packages/run-engine/src/run-queue/index.test.ts (3)

11-27: Consider extracting retry configuration to constants.

The retry configuration contains magic numbers. Consider extracting these values to named constants for better maintainability and documentation.

+const RETRY_CONFIG = {
+  MAX_ATTEMPTS: 5,
+  FACTOR: 1.1,
+  MIN_TIMEOUT_MS: 100,
+  MAX_TIMEOUT_MS: 1_000,
+} as const;
+
 const testOptions = {
   name: "rq",
   tracer: trace.getTracer("rq"),
   queuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 36 }),
   envQueuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 12 }),
   workers: 1,
   defaultEnvConcurrency: 25,
   enableRebalancing: false,
   logger: new Logger("RunQueue", "warn"),
   retryOptions: {
-    maxAttempts: 5,
-    factor: 1.1,
-    minTimeoutInMs: 100,
-    maxTimeoutInMs: 1_000,
+    maxAttempts: RETRY_CONFIG.MAX_ATTEMPTS,
+    factor: RETRY_CONFIG.FACTOR,
+    minTimeoutInMs: RETRY_CONFIG.MIN_TIMEOUT_MS,
+    maxTimeoutInMs: RETRY_CONFIG.MAX_TIMEOUT_MS,
     randomize: true,
   },
 };

45-67: Consider using a test data factory for message fixtures.

The message fixtures for production and development environments share a lot of common fields. Consider using a factory function to reduce duplication and improve maintainability.

+const createTestMessage = (overrides: Partial<InputPayload> = {}): InputPayload => ({
+  runId: "r1234",
+  taskIdentifier: "task/my-task",
+  orgId: "o1234",
+  projectId: "p1234",
+  environmentId: "e1234",
+  environmentType: "PRODUCTION",
+  queue: "task/my-task",
+  timestamp: Date.now(),
+  attempt: 0,
+  ...overrides,
+});
+
-const messageProd: InputPayload = {
-  runId: "r1234",
-  taskIdentifier: "task/my-task",
-  orgId: "o1234",
-  projectId: "p1234",
-  environmentId: "e1234",
-  environmentType: "PRODUCTION",
-  queue: "task/my-task",
-  timestamp: Date.now(),
-  attempt: 0,
-};
+const messageProd = createTestMessage();
+
+const messageDev = createTestMessage({
+  runId: "r4321",
+  environmentId: "e4321",
+  environmentType: "DEVELOPMENT",
+});
-const messageDev: InputPayload = {
-  runId: "r4321",
-  taskIdentifier: "task/my-task",
-  orgId: "o1234",
-  projectId: "p1234",
-  environmentId: "e4321",
-  environmentType: "DEVELOPMENT",
-  queue: "task/my-task",
-  timestamp: Date.now(),
-  attempt: 0,
-};

69-891: Consider adding edge case tests.

The test suite is comprehensive for happy paths but could benefit from additional edge case tests:

  • Message enqueuing with invalid data
  • Concurrent dequeue operations
  • Redis connection failures

Would you like me to generate test cases for these edge scenarios?
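
As an illustration, the concurrent-dequeue case could be shaped like this (the queue API calls mirror the tests in this file; the single-winner expectation is an assumption about the intended semantics):

redisTest("Concurrent dequeues never hand out the same message twice", { timeout: 10_000 }, async ({ redisContainer }) => {
  const queue = new RunQueue({
    ...testOptions,
    redis: {
      keyPrefix: "runqueue:test:",
      host: redisContainer.getHost(),
      port: redisContainer.getPort(),
    },
  });
  try {
    await queue.enqueueMessage({
      env: authenticatedEnvProd,
      message: messageProd,
      masterQueues: "main",
    });

    // Race two consumers against the same master queue.
    const [a, b] = await Promise.all([
      queue.dequeueMessageFromMasterQueue("consumer-a", "main", 10),
      queue.dequeueMessageFromMasterQueue("consumer-b", "main", 10),
    ]);

    // Exactly one consumer should receive the single enqueued message.
    expect(a.length + b.length).toBe(1);
  } finally {
    await queue.quit();
  }
});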

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d948818 and 68211e1.

📒 Files selected for processing (8)
  • apps/webapp/test/realtimeClient.test.ts (3 hunks)
  • internal-packages/run-engine/src/engine/locking.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/delays.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/ttl.test.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/index.test.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/index.ts (1 hunks)
  • internal-packages/testcontainers/src/index.ts (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal-packages/testcontainers/src/index.ts
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
apps/webapp/test/realtimeClient.test.ts (3)

9-9: LGTM! Parameter update for Redis configuration.

The change from redis to redisOptions is consistent with the updated RealtimeClient configuration pattern.

Also applies to: 13-13


145-145: LGTM! Consistent Redis configuration update.

The Redis configuration changes are applied consistently, maintaining the test's functionality for run tag subscriptions.

Also applies to: 149-149


224-224:

✅ Verification successful

LGTM! Redis configuration updated with comprehensive test coverage.

The Redis configuration changes are consistently applied while maintaining backward compatibility tests. The test suite thoroughly covers:

  1. Concurrency limits
  2. Run tag subscriptions
  3. Older client version compatibility

Run this script to verify test coverage:

Also applies to: 228-228


🏁 Script executed:

#!/bin/bash
# Description: Verify test coverage for RealtimeClient
# Look for any untested Redis-related functionality

rg -A 5 "RealtimeClient.*redis" --type ts

Length of output: 43


I'll run an additional verification script to check for references to the updated Redis configuration in the test file. This will help ensure that the changes (such as the redisOptions parameter) are properly covered in the tests.


🏁 Script executed:

#!/bin/bash
# Searching in the realtimeClient test file for references to redisOptions for more context
rg -A 10 "redisOptions" apps/webapp/test/realtimeClient.test.ts

Length of output: 1335


LGTM! The updated test cases in apps/webapp/test/realtimeClient.test.ts now clearly instantiate RealtimeClient using the new redisOptions parameter. The tests consistently verify that the Redis configuration is applied correctly and maintain backward compatibility (including checks for concurrency limits, run tag subscriptions, and older client version support) as seen in changes at lines 224 and 228.

internal-packages/run-engine/src/engine/tests/delays.test.ts (1)

1-11: LGTM! Well-structured imports.

The imports are well-organized and appropriate for the testing requirements, including containerization, authentication, worker setup, and OpenTelemetry tracing.

internal-packages/run-engine/src/run-queue/index.test.ts (1)

70-110: LGTM! Well-structured test for concurrency limits.

The test case effectively validates the queue concurrency limit functionality with proper cleanup.

Comment on lines +46 to +50
const backgroundWorker = await setupBackgroundWorker(
prisma,
authenticatedEnvironment,
taskIdentifier
);

🛠️ Refactor suggestion

Ensure proper cleanup of background worker.

The test should clean up the background worker in the finally block to prevent resource leaks.

   try {
     const taskIdentifier = "test-task";
     const backgroundWorker = await setupBackgroundWorker(
       prisma,
       authenticatedEnvironment,
       taskIdentifier
     );
     // ... test code ...
   } finally {
     engine.quit();
+    await backgroundWorker.stop();
   }

Comment on lines 91 to 183
containerTest(
"Rescheduling a delayed run",
{ timeout: 15_000 },
async ({ prisma, redisOptions }) => {
//create environment
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const engine = new RunEngine({
prisma,
redis: {
...redisOptions,
},
worker: {
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0001,
},
tracer: trace.getTracer("test", "0.0.0"),
});

try {
const taskIdentifier = "test-task";

//create background worker
const backgroundWorker = await setupBackgroundWorker(
prisma,
authenticatedEnvironment,
taskIdentifier
);

//trigger the run
const run = await engine.trigger(
{
number: 1,
friendlyId: "run_1234",
environment: authenticatedEnvironment,
taskIdentifier,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t12345",
spanId: "s12345",
masterQueue: "main",
queueName: "task/test-task",
isTest: false,
tags: [],
delayUntil: new Date(Date.now() + 200),
},
prisma
);

//should be created but not queued yet
const executionData = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData);
expect(executionData.snapshot.executionStatus).toBe("RUN_CREATED");

const rescheduleTo = new Date(Date.now() + 1_500);
const updatedRun = await engine.rescheduleRun({ runId: run.id, delayUntil: rescheduleTo });
expect(updatedRun.delayUntil?.toISOString()).toBe(rescheduleTo.toISOString());

//wait so the initial delay passes
await setTimeout(1_000);

//should still be created
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData2);
expect(executionData2.snapshot.executionStatus).toBe("RUN_CREATED");

//wait so the updated delay passes
await setTimeout(1_750);

//should now be queued
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData3);
expect(executionData3.snapshot.executionStatus).toBe("QUEUED");
} finally {
engine.quit();
}
}
);

🛠️ Refactor suggestion

Improve test coverage and reliability for rescheduling.

The test has timing issues similar to those in the first test and could benefit from additional verifications:

  1. Verify that the initial delay was cancelled
  2. Add retry logic for status checks
  3. Clean up background worker
   try {
     const taskIdentifier = "test-task";
     const backgroundWorker = await setupBackgroundWorker(
       prisma,
       authenticatedEnvironment,
       taskIdentifier
     );

+    const INITIAL_DELAY_MS = process.env.CI ? 1000 : 200;
+    const RESCHEDULE_DELAY_MS = process.env.CI ? 4000 : 1500;
+    const INITIAL_WAIT_MS = process.env.CI ? 2000 : 1000;
+    const RESCHEDULE_WAIT_MS = process.env.CI ? 5000 : 1750;

     //trigger the run with initial delay
-    delayUntil: new Date(Date.now() + 200),
+    delayUntil: new Date(Date.now() + INITIAL_DELAY_MS),

     // Reschedule
-    const rescheduleTo = new Date(Date.now() + 1_500);
+    const rescheduleTo = new Date(Date.now() + RESCHEDULE_DELAY_MS);
     const updatedRun = await engine.rescheduleRun({ runId: run.id, delayUntil: rescheduleTo });

     //wait and verify still in created state
-    await setTimeout(1_000);
+    await setTimeout(INITIAL_WAIT_MS);

+    // Verify the initial delay was cancelled by checking Redis
+    const cancelledDelay = await engine.getScheduledDelay(run.id);
+    expect(cancelledDelay).toBeNull();

     //wait for updated delay
-    await setTimeout(1_750);
+    await setTimeout(RESCHEDULE_WAIT_MS);

     // Add retry logic for final status check
+    let attempts = 0;
+    let executionData3;
+    while (attempts < 3) {
+      executionData3 = await engine.getRunExecutionData({ runId: run.id });
+      if (executionData3?.snapshot.executionStatus === "QUEUED") break;
+      await setTimeout(Math.pow(2, attempts) * 1000);
+      attempts++;
+    }

   } finally {
     engine.quit();
+    await backgroundWorker.stop();
   }

id: batch.id,
},
});
expect(batchAfter?.status === "COMPLETED");

⚠️ Potential issue

Use toBe("COMPLETED") for an actual assertion

You're using expect(batchAfter?.status === "COMPLETED"); on line 364, which won't fail if the condition is false. To properly assert that the batch status is "COMPLETED", update the check as follows:

- expect(batchAfter?.status === "COMPLETED");
+ expect(batchAfter?.status).toBe("COMPLETED");

Comment on lines +793 to +890
redisTest("Dead Letter Queue", { timeout: 8_000 }, async ({ redisContainer, redisOptions }) => {
const queue = new RunQueue({
...testOptions,
retryOptions: {
maxAttempts: 1,
},
redis: {
keyPrefix: "runqueue:test:",
host: redisContainer.getHost(),
port: redisContainer.getPort(),
},
});

const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });

try {
await queue.enqueueMessage({
env: authenticatedEnvProd,
message: messageProd,
masterQueues: "main",
});

const messages = await queue.dequeueMessageFromMasterQueue("test_12345", "main", 10);
expect(messages.length).toBe(1);

//check the message is there
const key = queue.keys.messageKey(messages[0].message.orgId, messages[0].messageId);
const exists = await redis.exists(key);
expect(exists).toBe(1);

//nack (we only have attempts set to 1)
await queue.nackMessage({
orgId: messages[0].message.orgId,
messageId: messages[0].messageId,
});

//dequeue
const messages2 = await queue.dequeueMessageFromMasterQueue("test_12345", "main", 10);
expect(messages2.length).toBe(0);

//concurrencies
const queueConcurrency2 = await queue.currentConcurrencyOfQueue(
authenticatedEnvProd,
messageProd.queue
);
expect(queueConcurrency2).toBe(0);
const envConcurrency2 = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
expect(envConcurrency2).toBe(0);
const projectConcurrency2 = await queue.currentConcurrencyOfProject(authenticatedEnvProd);
expect(projectConcurrency2).toBe(0);
const taskConcurrency2 = await queue.currentConcurrencyOfTask(
authenticatedEnvProd,
messageProd.taskIdentifier
);
expect(taskConcurrency2).toBe(0);

//check the message is still there
const exists2 = await redis.exists(key);
expect(exists2).toBe(1);

//check it's in the dlq
const dlqKey = "dlq";
const dlqExists = await redis.exists(dlqKey);
expect(dlqExists).toBe(1);
const dlqMembers = await redis.zrange(dlqKey, 0, -1);
expect(dlqMembers).toContain(messageProd.runId);

//redrive
const redisClient = new Redis({
host: redisContainer.getHost(),
port: redisContainer.getPort(),
password: redisContainer.getPassword(),
});

// Publish redrive message
await redisClient.publish(
"rq:redrive",
JSON.stringify({ runId: messageProd.runId, orgId: messageProd.orgId })
);

// Wait for the item to be redrived and processed
await setTimeout(5_000);
await redisClient.quit();

//shouldn't be in the dlq now
const dlqMembersAfter = await redis.zrange(dlqKey, 0, -1);
expect(dlqMembersAfter).not.toContain(messageProd.runId);

//dequeue
const messages3 = await queue.dequeueMessageFromMasterQueue("test_12345", "main", 10);
expect(messages3[0].messageId).toBe(messageProd.runId);
} finally {
try {
await queue.quit();
await redis.quit();
} catch (e) {}
}
});

🛠️ Refactor suggestion

Consider improving the Dead Letter Queue test reliability.

The DLQ test has a potential race condition due to the fixed wait time. Consider using a polling mechanism instead of a fixed timeout.

-      // Wait for the item to be redrived and processed
-      await setTimeout(5_000);
+      // Poll for the item to be removed from DLQ
+      const MAX_RETRIES = 10;
+      const POLL_INTERVAL = 500;
+      let retries = 0;
+      while (retries < MAX_RETRIES) {
+        const dlqMembersAfter = await redis.zrange(dlqKey, 0, -1);
+        if (!dlqMembersAfter.includes(messageProd.runId)) {
+          break;
+        }
+        await setTimeout(POLL_INTERVAL);
+        retries++;
+      }

@coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (30)
apps/webapp/test/fairDequeuingStrategy.test.ts (2)

15-15: Consider adjusting the test timeout.

The 30-second timeout might be too long for unit tests. Consider reducing it unless there's a specific need for such a long duration.

-vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout
+vi.setConfig({ testTimeout: 10_000 }); // 10 seconds timeout

230-231: Remove or use proper logging.

Consider replacing console.log with proper test logging utilities, or removing these statements if they're only used for debugging.

-console.log("First distribution took", distribute1Duration, "ms");
-console.log("Second distribution took", distribute2Duration, "ms");
-console.log("Third distribution took", distribute3Duration, "ms");

Also applies to: 248-249, 262-263

internal-packages/run-engine/src/engine/tests/trigger.test.ts (4)

13-13: Consider parameterizing common test configurations.

All three test cases share identical timeout values and similar engine configurations. Consider extracting these into shared test fixtures or helper functions to improve maintainability and reduce duplication.

Example refactor:

const TEST_TIMEOUT = 15_000;
const DEFAULT_ENGINE_CONFIG = (redisOptions: RedisOptions) => ({
  worker: {
    redis: redisOptions,
    workers: 1,
    tasksPerWorker: 10,
    pollIntervalMs: 100,
  },
  queue: { redis: redisOptions },
  runLock: { redis: redisOptions },
  machines: {
    defaultMachine: "small-1x",
    machines: {
      "small-1x": {
        name: "small-1x" as const,
        cpu: 0.5,
        memory: 0.5,
        centsPerMs: 0.0001,
      },
    },
    baseCostInCents: 0.0001,
  },
  tracer: trace.getTracer("test", "0.0.0"),
});

Also applies to: 207-207, 334-334


46-204: Consider breaking down the success test into smaller sub-tests.

The success test case is quite long and tests multiple aspects. Consider breaking it down into smaller, focused test cases using describe blocks:

  1. Run triggering and database persistence
  2. Queue management and concurrency
  3. Run execution and completion
  4. Event handling
  5. Waitpoint management

This would improve test readability and maintenance.
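
A structural sketch of that split (the test names and groupings are illustrative; the describe/containerTest usage mirrors the existing suites in this PR):

describe("RunEngine trigger (success path)", () => {
  containerTest("persists the triggered run", { timeout: 15_000 }, async ({ prisma, redisOptions }) => {
    // trigger the run and assert the TaskRun row and initial snapshot exist
  });

  containerTest("enqueues the run with the expected concurrency", { timeout: 15_000 }, async ({ prisma, redisOptions }) => {
    // assert queue length and concurrency counters after triggering
  });

  containerTest("completes the attempt and fires events", { timeout: 15_000 }, async ({ prisma, redisOptions }) => {
    // start the attempt, complete it, then assert the emitted events and waitpoint state
  });
});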


285-290: Extract error fixture to reduce duplication.

The error object is duplicated in both the failure and retry tests. Consider extracting it to a shared test fixture.

const TEST_USER_ERROR = {
  type: "BUILT_IN_ERROR" as const,
  name: "UserError",
  message: "This is a user error",
  stackTrace: "Error: This is a user error\n    at <anonymous>:1:1",
};

Also applies to: 415-420


1-491: Add test coverage for edge cases and error scenarios.

While the basic success, failure, and retry paths are well tested, consider adding tests for:

  1. Invalid inputs
  2. Race conditions
  3. Network failures
  4. Redis connection issues
  5. Database transaction failures

This would improve the robustness of the test suite and help catch potential issues early.

internal-packages/run-engine/src/engine/tests/priority.test.ts (3)

63-63: Consider using a more descriptive variable name for priorities array.

The array contains a mix of priorities including undefined and negative values. A more descriptive name like mixedPriorities or priorityLevels would better convey its purpose.

-const priorities = [undefined, 500, -1200, 1000, 4000];
+const priorityLevels = [undefined, 500, -1200, 1000, 4000];

79-79: Add a comment explaining the negative priority behavior.

The comment about expecting 4 items could be more descriptive about why negative priorities are handled differently.

-//dequeue (expect 4 items because of the negative priority)
+//dequeue (expect 4 items initially, as the negative priority run is delayed and dequeued separately)

91-92: Consider making the delay duration configurable.

The 2-second delay for negative priority is hardcoded. Consider making it configurable or proportional to the priority value.

-await setTimeout(2_000);
+const NEGATIVE_PRIORITY_DELAY_MS = 2_000;
+await setTimeout(NEGATIVE_PRIORITY_DELAY_MS);
internal-packages/run-engine/src/engine/tests/cancelling.test.ts (2)

154-154: Convert TODO comment into a test case.

The TODO about bulk cancelling runs should be turned into an actual test case to ensure complete coverage of cancellation scenarios.

Would you like me to help create a test case for bulk run cancellation?
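
As a starting point, the bulk case could be shaped like this (containerTest, getRunExecutionData, and the 200ms settling wait appear elsewhere in these tests; the cancelRun signature and the asserted status are assumptions):

containerTest("Bulk cancelling runs", { timeout: 15_000 }, async ({ prisma, redisOptions }) => {
  // ...environment, engine, and background worker set up as in the single-run cancellation test...

  // Trigger several runs, then cancel them all concurrently.
  const runIds = [run1.id, run2.id, run3.id];
  await Promise.all(runIds.map((runId) => engine.cancelRun({ runId, reason: "bulk cancel test" })));

  // Allow the asynchronous part of cancellation to settle, mirroring the 200ms wait above.
  await setTimeout(200);

  for (const runId of runIds) {
    const data = await engine.getRunExecutionData({ runId });
    // The exact terminal status depends on the engine; assert the run is no longer queued.
    expect(data?.snapshot.executionStatus).not.toBe("QUEUED");
  }
});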


179-180: Consider making the delay duration configurable.

The 200ms delay for async cancellation is hardcoded. Consider making it configurable to avoid potential flakiness in CI.

-await setTimeout(200);
+const ASYNC_CANCELLATION_DELAY_MS = 200;
+await setTimeout(ASYNC_CANCELLATION_DELAY_MS);
internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts (2)

171-171: Consider making the delay duration configurable.

The 500ms delay for waitpoint completion is hardcoded. Consider making it configurable to avoid potential flakiness in CI.

-await setTimeout(500);
+const WAITPOINT_COMPLETION_DELAY_MS = 500;
+await setTimeout(WAITPOINT_COMPLETION_DELAY_MS);

409-409: Consider making the delay duration configurable.

Similar to the previous delay, this 500ms delay should also be configurable.

-await setTimeout(500);
+const WAITPOINT_COMPLETION_DELAY_MS = 500;
+await setTimeout(WAITPOINT_COMPLETION_DELAY_MS);
apps/webapp/app/env.server.ts (2)

376-378: Consider adding validation for worker configuration.

The worker count and tasks per worker should have minimum values to prevent invalid configurations.

-RUN_ENGINE_WORKER_COUNT: z.coerce.number().int().default(4),
-RUN_ENGINE_TASKS_PER_WORKER: z.coerce.number().int().default(10),
+RUN_ENGINE_WORKER_COUNT: z.coerce.number().int().min(1).default(4),
+RUN_ENGINE_TASKS_PER_WORKER: z.coerce.number().int().min(1).default(10),

379-382: Consider adding validation for timeout values.

The timeout values should have minimum values to prevent invalid configurations.

-RUN_ENGINE_TIMEOUT_PENDING_EXECUTING: z.coerce.number().int().default(60_000),
-RUN_ENGINE_TIMEOUT_PENDING_CANCEL: z.coerce.number().int().default(60_000),
-RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000),
-RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000),
+RUN_ENGINE_TIMEOUT_PENDING_EXECUTING: z.coerce.number().int().min(1000).default(60_000),
+RUN_ENGINE_TIMEOUT_PENDING_CANCEL: z.coerce.number().int().min(1000).default(60_000),
+RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().min(1000).default(60_000),
+RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().min(1000).default(60_000),
internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts (6)

1-11: Consider adding type annotations for test environment utilities.

The imports look good, but consider adding type annotations for the test environment setup functions to improve type safety and documentation.

-import {
-  containerTest,
-  setupAuthenticatedEnvironment,
-  setupBackgroundWorker,
-} from "@internal/testcontainers";
+import type {
+  TestContainer,
+  AuthenticatedEnvironment,
+  BackgroundWorker,
+} from "@internal/testcontainers";
+import {
+  containerTest,
+  setupAuthenticatedEnvironment,
+  setupBackgroundWorker,
+} from "@internal/testcontainers";

12-48: Extract test configuration for better maintainability.

Consider the following improvements:

  1. Extract the timeout value to a constant for reusability.
  2. Move the machine configuration to a shared test config file.
+const TEST_TIMEOUT = 15_000;
+
 describe("RunEngine batchTrigger", () => {
   containerTest(
     "Batch trigger shares a batch",
-    { timeout: 15_000 },
+    { timeout: TEST_TIMEOUT },
     async ({ prisma, redisOptions }) => {

Create a new file test-config.ts:

export const testMachineConfig = {
  defaultMachine: "small-1x",
  machines: {
    "small-1x": {
      name: "small-1x" as const,
      cpu: 0.5,
      memory: 0.5,
      centsPerMs: 0.0001,
    },
  },
  baseCostInCents: 0.0005,
};

49-107: Enhance test data creation and coverage.

Consider the following improvements:

  1. Create test data factories for consistent test data creation.
  2. Add test cases for error scenarios and different batch sizes.

Create a test data factory:

interface TriggerRunParams {
  number: number;
  friendlyId: string;
  batchIndex: number;
}

function createTriggerRunData(
  params: TriggerRunParams,
  environment: AuthenticatedEnvironment,
  batchId: string
) {
  return {
    number: params.number,
    friendlyId: params.friendlyId,
    environment,
    taskIdentifier: "test-task",
    payload: "{}",
    payloadType: "application/json",
    context: {},
    traceContext: {},
    traceId: "t12345",
    spanId: "s12345",
    masterQueue: "main",
    queueName: "task/test-task",
    isTest: false,
    tags: [],
    batch: { id: batchId, index: params.batchIndex },
  };
}

Add test cases for error scenarios:

it("should handle batch processing errors", async () => {
  // Test implementation
});

it("should process large batches efficiently", async () => {
  // Test implementation
});

108-120: Add more comprehensive assertions.

Consider adding assertions for:

  1. Run status
  2. Queue priority
  3. Batch metadata
 expect(run2.batchId).toBe(batch.id);
+expect(run2.status).toBe("PENDING");
+expect(run2.priority).toBe(0);
+
+// Verify batch metadata
+const batchMetadata = await prisma.batchTaskRun.findUnique({
+  where: { id: batch.id },
+  select: { totalRuns: true, completedRuns: true },
+});
+expect(batchMetadata?.totalRuns).toBe(2);
+expect(batchMetadata?.completedRuns).toBe(0);

121-159: Improve run completion process.

Consider the following improvements:

  1. Add error handling for failed completions
  2. Make completions concurrent for better performance
-const result1 = await engine.completeRunAttempt({
-  runId: attempt1.run.id,
-  snapshotId: attempt1.snapshot.id,
-  completion: {
-    ok: true,
-    id: attempt1.run.id,
-    output: `{"foo":"bar"}`,
-    outputType: "application/json",
-  },
-});
-const result2 = await engine.completeRunAttempt({
+const [result1, result2] = await Promise.all([
+  engine.completeRunAttempt({
+    runId: attempt1.run.id,
+    snapshotId: attempt1.snapshot.id,
+    completion: {
+      ok: true,
+      id: attempt1.run.id,
+      output: `{"foo":"bar"}`,
+      outputType: "application/json",
+    },
+  }).catch(error => {
+    console.error("Failed to complete run 1:", error);
+    throw error;
+  }),
+  engine.completeRunAttempt({

178-182: Enhance test cleanup.

Consider adding cleanup for the background worker and any remaining data.

 } finally {
   engine.quit();
+  await backgroundWorker?.stop();
+  // Clean up test data
+  await prisma.batchTaskRun.deleteMany({
+    where: { runtimeEnvironmentId: authenticatedEnvironment.id },
+  });
 }
apps/webapp/app/routes/api.v1.dev.presence.ts (1)

43-48: Implement the TODO for optimizing Redis operations.

The current implementation uses setex which requires two Redis operations. As noted in the TODO, this can be optimized.

Apply this diff to optimize the Redis operation:

-        //todo set a string instead, with the expire on the same call
-        //won't need multi
-
-        // Set initial presence with more context
-        await redis.setex(presenceKey, env.DEV_PRESENCE_TTL_MS / 1000, Date.now().toString());
+        // Set presence with expiry in a single operation
+        await redis.set(
+          presenceKey,
+          JSON.stringify({
+            lastSeen: Date.now(),
+            connectionId: id
+          }),
+          'EX',
+          Math.floor(env.DEV_PRESENCE_TTL_MS / 1000)
+        );
internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (4)

10-10: Consider using a more precise timing mechanism.
Relying on a fixed setTimeout in tests can lead to flaky behavior, especially on busy CI machines. Whenever possible, use deterministic test strategies (e.g., mocking time or polling the database until the waitpoint is complete) to enhance reliability.


13-120: Possible flakiness with short wait durations.
In the "waitForDuration" test, using a wait of 1_000 ms and then checking completion by waiting only 1_500 ms (line 105) can be tight. Slight CI delays could cause sporadic failures. Consider adding a more generous buffer or employing a mocked clock for deterministic control.


400-516: Potential for time-based test flakiness.
The "Manual waitpoint timeout" test depends on failAfter: new Date(Date.now() + 200) and then a 750 ms wait (line 496). On slower systems, this might intermittently fail. Consider adding a larger margin or mocking time to ensure reliability.


519-666: Impressive stress test, but watch concurrency overhead.
The "Race condition with multiple waitpoints completing simultaneously" scenario demonstrates robust concurrency handling. Keep an eye on performance overhead for large iteration counts (line 596). If the environment is resource-constrained, a high iteration count might slow CI pipelines significantly.

internal-packages/run-engine/src/engine/tests/notDeployed.test.ts (1)

134-135: Avoid arbitrary delays for worker deployment checks.
Waiting 500 ms (line 134) might cause unpredictable outcomes on certain CI systems. Consider a loop-based approach that polls the run statuses until they reach the expected "QUEUED" state or a time limit elapses.

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1)

1-493: Enhance test coverage and organization.

The tests are well-structured and cover important edge cases. Consider these improvements:

  1. Add test cases for:

    • Concurrent heartbeat timeouts
    • Race conditions between cancellation and timeout
    • Edge cases around retry timing
    • Recovery after temporary Redis failures
  2. Improve test organization:

    • Extract common setup into beforeEach
    • Use shared test fixtures
    • Group related test cases using describe blocks

Here's an example of how to improve the test organization:

 describe("RunEngine heartbeats", () => {
+  let prisma: any;
+  let redisOptions: any;
+  let engine: RunEngine;
+  let authenticatedEnvironment: any;
+
+  beforeEach(async () => {
+    // Common setup code
+    authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+    engine = new RunEngine({
+      prisma,
+      worker: {
+        redis: redisOptions,
+        workers: 1,
+        tasksPerWorker: 10,
+        pollIntervalMs: 100,
+      },
+      // ... other common options
+    });
+  });
+
+  afterEach(async () => {
+    await engine.quit();
+  });
+
+  describe("timeout scenarios", () => {
     containerTest(
       "Attempt timeout then successfully attempted",
       // ... existing test
     );
+
+    containerTest(
+      "Concurrent heartbeat timeouts",
+      { timeout: 15_000 },
+      async () => {
+        // New test for concurrent timeouts
+      }
+    );
+  });
+
+  describe("cancellation scenarios", () => {
+    containerTest(
+      "Race between cancellation and timeout",
+      { timeout: 15_000 },
+      async () => {
+        // New test for race conditions
+      }
+    );
+  });
 });
internal-packages/run-engine/src/run-queue/index.ts (2)

614-648: Optimize Redis connection handling in queueConcurrencyScanStream.

The current implementation creates a new Redis connection for each scan stream. This could be optimized to reuse connections from a pool.

Here's how to improve the implementation:

+  private redisPool: Redis[] = [];
+  private readonly MAX_POOL_SIZE = 5;
+
+  private getRedisFromPool(): Redis {
+    // Reuse an idle connection if one is available, otherwise create a new one.
+    // Connections are only returned to the pool once their stream has finished.
+    return this.redisPool.pop() ?? this.redis.duplicate();
+  }
+
+  private releaseRedisToPool(redis: Redis) {
+    if (this.redisPool.length < this.MAX_POOL_SIZE) {
+      this.redisPool.push(redis);
+    } else {
+      redis.quit();
+    }
+  }
+
   queueConcurrencyScanStream(
     count: number = 100,
     onEndCallback?: () => void,
     onErrorCallback?: (error: Error) => void
   ) {
     const pattern = this.keys.queueCurrentConcurrencyScanPattern();

     this.logger.debug("Starting queue concurrency scan stream", {
       pattern,
       component: "runqueue",
       operation: "queueConcurrencyScanStream",
       service: this.name,
       count,
     });

-    const redis = this.redis.duplicate();
+    const redis = this.getRedisFromPool();

     const stream = redis.scanStream({
       match: pattern,
       type: "set",
       count,
     });

     stream.on("end", () => {
       onEndCallback?.();
-      redis.quit();
+      this.releaseRedisToPool(redis);
     });

     stream.on("error", (error) => {
       onErrorCallback?.(error);
-      redis.quit();
+      this.releaseRedisToPool(redis);
     });

     return { stream, redis };
   }

1-64: Improve code organization with better separation of concerns.

The RunQueue class has grown large and handles multiple responsibilities. Consider splitting it into smaller, focused classes:

  1. Extract Redis Lua scripts into a separate file
  2. Create separate classes for different concerns:
    • Message handling
    • Concurrency management
    • Dead letter queue operations

Here's an example of how to improve the organization:

// redis-scripts.ts
export const REDIS_SCRIPTS = {
  enqueueMessage: `...`,
  dequeueMessage: `...`,
  // ... other scripts
};

// message-handler.ts
export class MessageHandler {
  constructor(private redis: Redis, private logger: Logger) {}
  // ... message handling methods
}

// concurrency-manager.ts
export class ConcurrencyManager {
  constructor(private redis: Redis, private logger: Logger) {}
  // ... concurrency management methods
}

// run-queue.ts
export class RunQueue {
  private messageHandler: MessageHandler;
  private concurrencyManager: ConcurrencyManager;

  constructor(options: RunQueueOptions) {
    this.messageHandler = new MessageHandler(this.redis, this.logger);
    this.concurrencyManager = new ConcurrencyManager(this.redis, this.logger);
  }
  // ... delegate to appropriate handler
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 68211e1 and abbdb4f.

📒 Files selected for processing (19)
  • apps/webapp/app/env.server.ts (2 hunks)
  • apps/webapp/app/routes/api.v1.dev.presence.ts (1 hunks)
  • apps/webapp/app/v3/runEngine.server.ts (1 hunks)
  • apps/webapp/test/authorizationRateLimitMiddleware.test.ts (10 hunks)
  • apps/webapp/test/fairDequeuingStrategy.test.ts (7 hunks)
  • internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/cancelling.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/delays.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/dequeuing.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/notDeployed.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/priority.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/trigger.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/ttl.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/types.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/index.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • internal-packages/run-engine/src/engine/tests/delays.test.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts
  • internal-packages/run-engine/src/engine/tests/ttl.test.ts
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (20)
apps/webapp/test/authorizationRateLimitMiddleware.test.ts (2)

25-27: LGTM! Parameter renaming is consistent.

The renaming of the redis parameter to redisOptions has been consistently applied across all test cases, improving clarity by better reflecting the parameter's purpose.

Also applies to: 56-58, 80-82, 108-110, 138-140, 188-190, 224-226, 268-270, 309-311, 356-359


1-416: Excellent test coverage!

The test suite provides comprehensive coverage of the rate limiting middleware, including:

  • Basic rate limiting functionality
  • Authorization validation
  • Path whitelisting
  • Multiple rate limit algorithms (token bucket, fixed/sliding windows)
  • Edge cases (token refill, near-zero tokens)
  • Caching behavior

This thorough testing approach helps ensure the reliability of the rate limiting implementation.

apps/webapp/test/fairDequeuingStrategy.test.ts (5)

18-19: LGTM! Redis instance creation is consistently updated.

The change from using a direct redis parameter to creating new Redis instances using redisOptions is consistently applied across all test cases.

Also applies to: 48-49, 84-85, 118-119, 175-176, 271-272, 430-431, 535-536, 666-667, 764-765


173-267: Performance testing approach is well-implemented.

The test case effectively validates the snapshot reuse optimization by:

  1. Measuring distribution durations
  2. Verifying that subsequent calls are significantly faster
  3. Confirming that the snapshot expires after the configured reuse count

269-425: Comprehensive fairness testing implementation.

The test case thoroughly validates fair distribution by:

  1. Testing across multiple organizations and environments
  2. Tracking distribution statistics
  3. Verifying standard deviations
  4. Ensuring reasonable fairness bounds

664-760: Well-structured age influence testing.

The test effectively validates the queueAgeRandomization parameter by:

  1. Testing different randomization levels (0.0, 0.5, 1.0)
  2. Verifying expected position distributions
  3. Ensuring proper age-based ordering when required

762-891: Thorough maximum org count validation.

The test comprehensively verifies the maximumOrgCount feature by:

  1. Testing with different org age profiles
  2. Verifying selection frequencies
  3. Validating age-based prioritization
internal-packages/run-engine/src/engine/tests/trigger.test.ts (3)

1-11: LGTM! Well-organized imports and dependencies.

The imports are logically grouped and include all necessary testing utilities and types.


428-431: Consider testing different retry scenarios.

The retry test only covers immediate retry (delay: 0). Consider adding test cases for:

  • Delayed retry
  • Maximum retry attempts
  • Different retry intervals

This would ensure the retry mechanism is thoroughly tested.
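
If the engine's retry API makes it awkward to exercise each case end to end, the timing side can at least be table-driven. The sketch below uses vitest's test.each with a local stand-in for the backoff calculation; the helper's name and options mirror the retry settings discussed in this PR but are illustrative, not the engine's actual implementation.

import { describe, expect, test } from "vitest";

// Illustrative exponential backoff, capped at maxDelayMs.
function exponentialRetryDelay(
  opts: { minDelayMs: number; maxDelayMs: number; factor: number },
  attempt: number
): number {
  return Math.min(opts.minDelayMs * Math.pow(opts.factor, attempt - 1), opts.maxDelayMs);
}

describe("retry timing (sketch)", () => {
  const opts = { minDelayMs: 1_000, maxDelayMs: 30_000, factor: 2 };

  test.each([
    { attempt: 1, expected: 1_000 },
    { attempt: 2, expected: 2_000 },
    { attempt: 3, expected: 4_000 },
    { attempt: 10, expected: 30_000 }, // capped at maxDelayMs
  ])("attempt $attempt waits $expected ms", ({ attempt, expected }) => {
    expect(exponentialRetryDelay(opts, attempt)).toBe(expected);
  });
});

The same table-driven shape can wrap the real engine calls once delayed retries and max-attempt exhaustion are covered.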


202-204: LGTM! Proper cleanup in finally blocks.

The tests properly clean up resources by calling engine.quit() in finally blocks, ensuring resources are released even if tests fail.

Also applies to: 329-331, 485-487

internal-packages/run-engine/src/engine/tests/priority.test.ts (1)

1-146: LGTM! Well-structured test suite for priority handling.

The test suite effectively validates the priority ordering mechanism:

  • Comprehensive test cases with mixed priorities
  • Clear assertions for queue length and dequeuing order
  • Proper cleanup with engine.quit()
internal-packages/run-engine/src/engine/tests/cancelling.test.ts (1)

1-337: LGTM! Comprehensive test suite for run cancellation.

The test suite effectively validates the cancellation mechanism:

  • Tests both executing and non-executing run cancellation
  • Verifies child run cancellation propagation
  • Validates event emissions and concurrency release
  • Includes proper cleanup
internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts (1)

1-456: LGTM! Well-structured test suite for triggerAndWait functionality.

The test suite effectively validates the triggerAndWait mechanism:

  • Comprehensive test cases for basic and shared child scenarios
  • Clear assertions for execution status and waitpoint completion
  • Proper cleanup with engine.quit()
apps/webapp/app/env.server.ts (1)

375-519: LGTM! Well-structured environment configuration for Run Engine 2.0.

The configuration schema effectively manages the Run Engine 2.0 settings:

  • Comprehensive worker configuration
  • Clear timeout definitions
  • Proper Redis configuration with fallbacks
  • Development-specific parameters
internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (2)

122-257: Test coverage looks solid.
The "Waitpoints cleared if attempt fails" scenario comprehensively verifies that failing a run attempt removes all blocking waitpoints. This ensures correct cleanup and error handling. Nicely done.


259-398: Clear and helpful test for manual waitpoint lifecycle.
The "Create, block, and complete a Manual waitpoint" flow is well-structured and confirms that the engine correctly transitions from "EXECUTING_WITH_WAITPOINTS" back to "EXECUTING". Good clarity and coverage here.

internal-packages/run-engine/src/engine/tests/notDeployed.test.ts (1)

12-151: Well-designed "not deployed" test flow.
This test verifies that runs remain in a "waiting for deploy" status until the worker becomes available. The sequence of checks ensures that no unintended dequeues occur prematurely. Great job coordinating environment setup, run triggers, and final confirmations.

internal-packages/run-engine/src/engine/tests/dequeuing.test.ts (3)

15-79: Dequeue logic is validated effectively.
The "Dequeues 5 runs" test reliably checks correct run counts. This helps ensure your queue mechanics work as expected. Everything here appears solid.


80-171: Machine constraint checks align well with resource-limited scenarios.
The "Dequeues runs within machine constraints" test ensures the engine respects CPU and memory limits. Good approach to verifying partial dequeues (line 147) and validating queue lengths after each dequeue. This fosters confident and safe production scaling.


174-212: Helper function is well-structured.
triggerRuns (line 174) neatly abstracts repeating logic, improving test readability and maintainability. Keeping this logic separate reduces duplication across tests.

Comment on lines +160 to +177
//the batch won't complete immediately
const batchAfter1 = await prisma.batchTaskRun.findUnique({
where: {
id: batch.id,
},
});
expect(batchAfter1?.status).toBe("PENDING");

await setTimeout(3_000);

//the batch should complete
const batchAfter2 = await prisma.batchTaskRun.findUnique({
where: {
id: batch.id,
},
});
expect(batchAfter2?.status).toBe("COMPLETED");
} finally {

🛠️ Refactor suggestion

Improve batch status verification.

Consider the following improvements:

  1. Replace setTimeout with a polling mechanism
  2. Add intermediate status checks
async function waitForBatchStatus(
  prisma: PrismaClient,
  batchId: string,
  status: string,
  timeout = 5000
) {
  const startTime = Date.now();
  while (Date.now() - startTime < timeout) {
    const batch = await prisma.batchTaskRun.findUnique({
      where: { id: batchId },
    });
    if (batch?.status === status) {
      return batch;
    }
    // Uses the promise-based setTimeout from "timers/promises" already used in this test file
    await setTimeout(100);
  }
  throw new Error(`Batch status did not transition to ${status} within ${timeout}ms`);
}

Then use it in the test:

-await setTimeout(3_000);
+const completedBatch = await waitForBatchStatus(prisma, batch.id, "COMPLETED");
-//the batch should complete
-const batchAfter2 = await prisma.batchTaskRun.findUnique({
-  where: {
-    id: batch.id,
-  },
-});
-expect(batchAfter2?.status).toBe("COMPLETED");
+expect(completedBatch.status).toBe("COMPLETED");
+expect(completedBatch.completedRuns).toBe(2);

Comment on lines +61 to +65
iterator: async ({ send, date }) => {
await redis.setex(presenceKey, env.DEV_PRESENCE_TTL_MS / 1000, date.toISOString());

send({ event: "time", data: new Date().toISOString() });
},

🛠️ Refactor suggestion

Add error handling and consistent timestamp handling.

The iterator and cleanup functions need improved error handling and consistent timestamp handling.

Apply this diff to enhance error handling and timestamp consistency:

     iterator: async ({ send, date }) => {
+      try {
+        const timestamp = date.toISOString();
-        await redis.setex(presenceKey, env.DEV_PRESENCE_TTL_MS / 1000, date.toISOString());
+        await redis.set(
+          presenceKey,
+          JSON.stringify({
+            lastSeen: timestamp,
+            connectionId: id
+          }),
+          'EX',
+          Math.floor(env.DEV_PRESENCE_TTL_MS / 1000)
+        );

-        send({ event: "time", data: new Date().toISOString() });
+        send({ event: "time", data: timestamp });
+      } catch (error) {
+        logger.error("Error in presence iterator", { error, presenceKey });
+        controller.abort();
+      }
     },
     cleanup: async () => {
+      try {
+        const timestamp = new Date().toISOString();
         await redis.del(presenceKey);

         // Publish disconnect event
         await redis.publish(
           presenceChannel,
           JSON.stringify({
             type: "disconnected",
             environmentId,
-            timestamp: Date.now(),
+            timestamp,
           })
         );
+      } catch (error) {
+        logger.error("Error in presence cleanup", { error, presenceKey });
+      }
     },

Also applies to: 67-78

Comment on lines +9 to +16
const redis = new Redis({
port: env.RUN_ENGINE_DEV_PRESENCE_REDIS_PORT ?? undefined,
host: env.RUN_ENGINE_DEV_PRESENCE_REDIS_HOST ?? undefined,
username: env.RUN_ENGINE_DEV_PRESENCE_REDIS_USERNAME ?? undefined,
password: env.RUN_ENGINE_DEV_PRESENCE_REDIS_PASSWORD ?? undefined,
enableAutoPipelining: true,
...(env.RUN_ENGINE_DEV_PRESENCE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
});

⚠️ Potential issue

Add Redis connection error handling and enforce TLS.

The Redis configuration has several areas for improvement:

  1. Add error handling for Redis connection failures.
  2. Consider implementing connection management (reconnection strategy, health checks).
  3. TLS should be enforced in production environments for security.

Apply this diff to improve the Redis configuration:

+const createRedisClient = () => {
+  const client = new Redis({
     port: env.RUN_ENGINE_DEV_PRESENCE_REDIS_PORT ?? undefined,
     host: env.RUN_ENGINE_DEV_PRESENCE_REDIS_HOST ?? undefined,
     username: env.RUN_ENGINE_DEV_PRESENCE_REDIS_USERNAME ?? undefined,
     password: env.RUN_ENGINE_DEV_PRESENCE_REDIS_PASSWORD ?? undefined,
     enableAutoPipelining: true,
-    ...(env.RUN_ENGINE_DEV_PRESENCE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+    ...(process.env.NODE_ENV === "production" ? { tls: {} } : {}),
+    retryStrategy: (times) => Math.min(times * 50, 2000),
   });

+  client.on("error", (err) => {
+    logger.error("Redis connection error", { error: err });
+  });

+  return client;
+};

-const redis = new Redis({...});
+const redis = createRedisClient();

Comment on lines +18 to +33
export const loader = createSSELoader({
timeout: env.DEV_PRESENCE_TTL_MS,
interval: env.DEV_PRESENCE_POLL_INTERVAL_MS,
debug: true,
handler: async ({ id, controller, debug, request }) => {
const authentication = await authenticateApiRequestWithFailure(request);

if (!authentication.ok) {
throw json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const environmentId = authentication.environment.id;

const presenceKey = DevPresenceStream.getPresenceKey(environmentId);
const presenceChannel = DevPresenceStream.getPresenceChannel(environmentId);


🛠️ Refactor suggestion

Enhance environment variable validation and error handling.

The SSE loader configuration could benefit from improved validation and error handling:

  1. Add validation for timeout and interval values.
  2. Improve authentication error message specificity.

Apply this diff to enhance the configuration:

+const validateConfig = () => {
+  const timeout = Number(env.DEV_PRESENCE_TTL_MS);
+  const interval = Number(env.DEV_PRESENCE_POLL_INTERVAL_MS);
+  
+  if (isNaN(timeout) || timeout <= 0) {
+    throw new Error("Invalid DEV_PRESENCE_TTL_MS value");
+  }
+  if (isNaN(interval) || interval <= 0) {
+    throw new Error("Invalid DEV_PRESENCE_POLL_INTERVAL_MS value");
+  }
+  if (interval >= timeout) {
+    throw new Error("Poll interval must be less than TTL");
+  }
+  return { timeout, interval };
+};

+const { timeout, interval } = validateConfig();

 export const loader = createSSELoader({
-  timeout: env.DEV_PRESENCE_TTL_MS,
-  interval: env.DEV_PRESENCE_POLL_INTERVAL_MS,
+  timeout,
+  interval,
   debug: true,
   handler: async ({ id, controller, debug, request }) => {
     const authentication = await authenticateApiRequestWithFailure(request);

     if (!authentication.ok) {
-      throw json({ error: "Invalid or Missing API key" }, { status: 401 });
+      throw json({ 
+        error: "Unauthorized", 
+        message: authentication.error ?? "Invalid or missing API key",
+        code: "INVALID_API_KEY"
+      }, { status: 401 });
     }

Comment on lines +399 to +516
const maxAttempts = this.retryOptions.maxAttempts ?? defaultRetrySettings.maxAttempts;

const message = await this.#readMessage(orgId, messageId);
if (!message) {
this.logger.log(`[${this.name}].nackMessage() message not found`, {
orgId,
messageId,
maxAttempts,
retryAt,
service: this.name,
});
return;
}

span.setAttributes({
[SemanticAttributes.QUEUE]: message.queue,
[SemanticAttributes.RUN_ID]: messageId,
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
[SemanticAttributes.MASTER_QUEUES]: message.masterQueues.join(","),
});

const messageKey = this.keys.messageKey(orgId, messageId);
const messageQueue = message.queue;
const concurrencyKey = this.keys.currentConcurrencyKeyFromQueue(message.queue);
const envConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
const taskConcurrencyKey = this.keys.taskIdentifierCurrentConcurrencyKeyFromQueue(
message.queue,
message.taskIdentifier
);
const projectConcurrencyKey = this.keys.projectCurrentConcurrencyKeyFromQueue(
message.queue
);
const envQueueKey = this.keys.envQueueKeyFromQueue(message.queue);

if (incrementAttemptCount) {
message.attempt = message.attempt + 1;
if (message.attempt >= maxAttempts) {
await this.redis.moveToDeadLetterQueue(
messageKey,
messageQueue,
concurrencyKey,
envConcurrencyKey,
projectConcurrencyKey,
envQueueKey,
taskConcurrencyKey,
"dlq",
messageId,
JSON.stringify(message.masterQueues),
this.options.redis.keyPrefix ?? ""
);
return false;
}
}

const nextRetryDelay = calculateNextRetryDelay(this.retryOptions, message.attempt);
const messageScore = retryAt ?? (nextRetryDelay ? Date.now() + nextRetryDelay : Date.now());

this.logger.debug("Calling nackMessage", {
messageKey,
messageQueue,
masterQueues: message.masterQueues,
concurrencyKey,
envConcurrencyKey,
projectConcurrencyKey,
envQueueKey,
taskConcurrencyKey,
messageId,
messageScore,
attempt: message.attempt,
service: this.name,
});

await this.redis.nackMessage(
//keys
messageKey,
messageQueue,
concurrencyKey,
envConcurrencyKey,
projectConcurrencyKey,
envQueueKey,
taskConcurrencyKey,
//args
messageId,
JSON.stringify(message),
String(messageScore),
JSON.stringify(message.masterQueues),
this.options.redis.keyPrefix ?? ""
);
return true;
},
{
kind: SpanKind.CONSUMER,
attributes: {
[SEMATTRS_MESSAGING_OPERATION]: "nack",
[SEMATTRS_MESSAGE_ID]: messageId,
[SEMATTRS_MESSAGING_SYSTEM]: "runqueue",
},
}
);
}

🛠️ Refactor suggestion

Enhance error handling in nackMessage.

The nackMessage implementation could be improved to handle edge cases more robustly:

  1. The error handling for missing messages could be more informative
  2. The retry delay calculation could benefit from jitter
  3. The dead letter queue operation could use additional error handling

Here's how to improve the implementation:

 public async nackMessage({
   orgId,
   messageId,
   retryAt,
   incrementAttemptCount = true,
 }: {
   orgId: string;
   messageId: string;
   retryAt?: number;
   incrementAttemptCount?: boolean;
 }) {
   return this.#trace(
     "nackMessage",
     async (span) => {
       const maxAttempts = this.retryOptions.maxAttempts ?? defaultRetrySettings.maxAttempts;

       const message = await this.#readMessage(orgId, messageId);
       if (!message) {
-        this.logger.log(`[${this.name}].nackMessage() message not found`, {
+        this.logger.error(`[${this.name}].nackMessage() message not found`, {
           orgId,
           messageId,
           maxAttempts,
           retryAt,
           service: this.name,
+          stack: new Error().stack,
         });
-        return;
+        return false;
       }

+      try {
         span.setAttributes({
           [SemanticAttributes.QUEUE]: message.queue,
           [SemanticAttributes.RUN_ID]: messageId,
           [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
           [SemanticAttributes.MASTER_QUEUES]: message.masterQueues.join(","),
         });

         // ... existing code ...

         const nextRetryDelay = calculateNextRetryDelay(this.retryOptions, message.attempt);
-        const messageScore = retryAt ?? (nextRetryDelay ? Date.now() + nextRetryDelay : Date.now());
+        // Add jitter to prevent thundering herd
+        const jitter = Math.floor(Math.random() * 1000);
+        const messageScore = retryAt ?? (nextRetryDelay ? Date.now() + nextRetryDelay + jitter : Date.now());

         // ... existing code ...

         await this.redis.nackMessage(
           // ... existing arguments ...
         );
         return true;
+      } catch (error) {
+        this.logger.error(`[${this.name}].nackMessage() failed`, {
+          orgId,
+          messageId,
+          error,
+          service: this.name,
+        });
+        throw error;
+      }
     },
     // ... existing options ...
   );
 }

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal-packages/database/package.json (1)

7-9: Confirm TypeScript Dependency Removal.
The removal of the "typescript": "^4.8.4" dependency in the dependencies block is in line with the project’s broader objective to simplify package-specific dependencies. Please ensure that a centralized TypeScript installation (or equivalent dev dependency in the monorepo) is available so that tooling such as the tsc --noEmit command in the scripts remains operational.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 720f461 and fde9918.

📒 Files selected for processing (4)
  • internal-packages/database/package.json (2 hunks)
  • internal-packages/database/prisma/migrations/20250219140441_waitpoint_added_idempotency_key_expires_at/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (19 hunks)
  • internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal-packages/run-engine/src/engine/tests/waitpoints.test.ts
🔇 Additional comments (13)
internal-packages/database/package.json (1)

21-22: Review Typecheck Script Usage.
Even though the TypeScript dependency has been removed from this package, the "typecheck": "tsc --noEmit" script is still present. Confirm that your development environment or monorepo configuration provides the necessary tsc executable to avoid build or CI issues.

internal-packages/database/prisma/migrations/20250219140441_waitpoint_added_idempotency_key_expires_at/migration.sql (1)

1-3: Database Migration Update: Add idempotencyKeyExpiresAt Column
This migration script correctly adds the new column "idempotencyKeyExpiresAt" with TIMESTAMP(3) precision to the "Waitpoint" table. The field is nullable, which should preserve backward compatibility. Please ensure that any related application logic (or migration rollback procedures) properly handles the case when this field is null.
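
A small sketch of how application code might treat the nullable column is shown below; the field names come from the migration and schema, while the helper itself and its semantics for a null expiry are assumptions.

type WaitpointIdempotencyFields = {
  idempotencyKey: string;
  idempotencyKeyExpiresAt: Date | null; // nullable, per the migration above
};

// Returns true when the idempotency key should still deduplicate new waitpoints.
function idempotencyKeyIsActive(
  waitpoint: WaitpointIdempotencyFields,
  now: Date = new Date()
): boolean {
  // Rows created before this migration (or created without a TTL) have a null
  // expiry and are treated here as never expiring.
  if (waitpoint.idempotencyKeyExpiresAt === null) {
    return true;
  }
  return waitpoint.idempotencyKeyExpiresAt.getTime() > now.getTime();
}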

internal-packages/database/prisma/schema.prisma (11)

151-152: Organization Model Enhancement: Worker Group Associations
New fields workerGroups and workerInstances have been added to the Organization model to establish associations with WorkerInstanceGroup and WorkerInstance, respectively. This enhancement supports better organization-level grouping of worker resources.


454-463: Project Model Update: Run Engine and Worker Associations
In the Project model, new fields have been introduced: version (defaulting to V2) and engine (defaulting to V1), along with associations for workerGroups, workers, and an optional defaultWorkerGroup (with its corresponding defaultWorkerGroupId foreign key). These changes integrate the new run engine architecture into projects and enable grouping of worker instances at the project level. Please review that the default settings and relationships align with your business logic.


1922-1927: New Enum: RunEngineVersion
The addition of the RunEngineVersion enum—with values V1 and V2 (accompanied by descriptive comments)—provides a clear mechanism to distinguish between the original and new engine behaviors. Verify that all models referencing this enum correctly use the desired default values.


1578-1579: BackgroundWorker Model Update: Engine Field and Worker Group Linkage
The BackgroundWorker model now includes an engine field (defaulting to V1) and a new association to the WorkerInstanceGroup via the workerGroup and its corresponding workerGroupId field. These adjustments support the evolving execution engine and resource grouping. Ensure that any logic which instantiates or schedules background workers is updated accordingly.

Also applies to: 1603-1605


1715-1718: TaskRun Model: Queue Information Fields
New fields have been introduced in the TaskRun model to support enhanced queue management:
  • masterQueue with a default value of "main"
  • secondaryMasterQueue as an optional field
  • attemptNumber (now defined as an optional field for engine v2+ runs)
These changes aim to better segment and track queued task runs. Please verify that these fields are correctly wired into the dispatch and execution logic.


1755-1758: TaskRun Model: Priority Field Addition
The addition of the priorityMs field (default set to 0) permits a negative offset—in milliseconds—for the queue timestamp. This allows custom prioritization of runs. Ensure that any scheduling calculations incorporate this offset correctly and that edge cases (e.g., very high negative offsets) are appropriately managed.
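
A toy calculation of the offset is shown below; the real ordering happens inside the RunQueue's Redis scripts, so the subtraction here only illustrates what a "negative offset for the queue timestamp" means in practice.

// priorityMs shifts the effective enqueue time earlier, so higher-priority runs
// sort ahead of runs enqueued at the same wall-clock time.
function queueScore(queuedAt: Date, priorityMs: number): number {
  return queuedAt.getTime() - priorityMs;
}

const now = new Date();
const baseline = queueScore(now, 0);          // baseline ordering
const prioritized = queueScore(now, 60_000);  // sorts as if enqueued a minute earlier
// prioritized < baseline, so the prioritized run is dequeued first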


1770-1775: TaskRun Model: Waitpoint Integration
Two new fields have been added to help manage waitpoints:
  • associatedWaitpoint – links a run to its corresponding waitpoint so that when the run completes, the waitpoint can be marked as complete.
  • blockedByWaitpoints – an array capturing any waitpoints currently blocking the run’s execution.
Confirm that the business logic checks these associations before starting run execution, ensuring a proper gatekeeping mechanism.
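
That gatekeeping check could look roughly like the sketch below. The blockedByWaitpoints relation name comes from the schema; the Prisma accessor casing and the idea of a standalone helper are assumptions, since the engine performs this check inside its own snapshot logic.

// Returns true while any waitpoints are still blocking the run.
async function isBlockedByWaitpoints(prisma: any, runId: string): Promise<boolean> {
  const run = await prisma.taskRun.findUnique({
    where: { id: runId },
    include: { blockedByWaitpoints: true },
  });
  return (run?.blockedByWaitpoints?.length ?? 0) > 0;
}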


1934-1988: TaskRunExecutionSnapshot: New Snapshot Model for Engine v2
This new model records snapshots of task run execution, capturing details such as the engine version (defaulting to V2), execution status, checkpoint linkage, and waitpoint order. Its robust associations with TaskRun, BatchTaskRun, and the environment provide enhanced tracking of execution state. Please ensure that integration tests cover these new fields to verify that snapshots are created, updated, and queried as expected.
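
An integration assertion over the new model might look roughly like this; the accessor casing, the runId and createdAt field names, and the expected engine value are assumptions based on the description above.

// Fetch the most recent execution snapshot for a run and check the recorded state.
async function latestSnapshot(prisma: any, runId: string) {
  return prisma.taskRunExecutionSnapshot.findFirst({
    where: { runId },
    orderBy: { createdAt: "desc" },
  });
}

// In a test:
// const snapshot = await latestSnapshot(prisma, run.id);
// expect(snapshot?.engine).toBe("V2");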


2055-2065: Waitpoint Model: Idempotency Key Expiration Mechanism
A new field, idempotencyKeyExpiresAt, has been added to the Waitpoint model. Together with the existing inactiveIdempotencyKey field, it lets idempotency keys expire or be deactivated, which is useful for debounce logic and for cancelling child runs. Please double-check that application logic which interacts with waitpoints correctly inspects these fields.


2539-2547: BatchTaskRun Model: Engine v2 Enhancements
The BatchTaskRun model now incorporates new fields aimed at supporting engine v2 enhancements:
  • executionSnapshots – links batch-related snapshots
  • runsBlocked – specifies run blockers for the batch
  • waitpoints – tracks waitpoints that block associated runs
These additions provide granular control over batch execution and state. Validate that batch-processing workflows are updated to utilize these fields effectively.


2719-2723: WorkerDeployment Update: Deployment Type Field
A new enum WorkerDeploymentType (with values MANAGED, UNMANAGED, and V1) has been introduced, and the WorkerDeployment model now includes a type field (defaulting to V1) to support multiple deployment strategies. This change should enable flexible handling of deployments across new and legacy workflows. Please verify downstream consumers of deployment information are aware of this new field.

Also applies to: 2738-2739

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fde9918 and b236a73.

📒 Files selected for processing (1)
  • apps/webapp/app/routes/api.v1.waitpoints.ts (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (3)
apps/webapp/app/routes/api.v1.waitpoints.ts (3)

1-7: LGTM! Well-organized imports.

The imports are properly structured and follow best practices, with clear separation between external packages and internal utilities.


8-14: LGTM! Secure route configuration.

The route configuration implements security best practices:

  • Request body validation using schema
  • Content length limit (10KB)
  • HTTP method restriction

29-39: LGTM! Well-structured response.

The response handling follows REST API best practices and is properly typed.

Comment on lines 21 to 28
const waitpoint = await engine.createManualWaitpoint({
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
idempotencyKey: body.idempotencyKey,
idempotencyKeyExpiresAt,
timeout,
});


🛠️ Refactor suggestion

Add error handling for waitpoint creation.

The waitpoint creation needs error handling to gracefully handle potential failures:

  1. Validate authentication context
  2. Handle engine errors

Consider applying this improvement:

+    if (!authentication.environment?.id || !authentication.environment?.projectId) {
+      return json(
+        { error: "Invalid authentication context" },
+        { status: 401 }
+      );
+    }
+
+    let waitpoint;
+    try {
-      const waitpoint = await engine.createManualWaitpoint({
+      waitpoint = await engine.createManualWaitpoint({
         environmentId: authentication.environment.id,
         projectId: authentication.environment.projectId,
         idempotencyKey: body.idempotencyKey,
         idempotencyKeyExpiresAt,
         timeout,
       });
+    } catch (error) {
+      console.error("Failed to create waitpoint:", error);
+      return json(
+        { error: "Failed to create waitpoint" },
+        { status: 500 }
+      );
+    }

Comment on lines 15 to 20
const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
: undefined;

const timeout = await parseDelay(body.timeout);


🛠️ Refactor suggestion

Add error handling for request processing.

The request processing could be more robust with proper error handling:

  1. Handle potential errors from parseDelay
  2. Validate the resolved idempotencyKeyTTL value

Consider applying this improvement:

-    const timeout = await parseDelay(body.timeout);
+    let timeout;
+    try {
+      timeout = await parseDelay(body.timeout);
+    } catch (error) {
+      return json(
+        { error: "Invalid timeout format" },
+        { status: 400 }
+      );
+    }
+
+    if (idempotencyKeyExpiresAt && idempotencyKeyExpiresAt < new Date()) {
+      return json(
+        { error: "Invalid idempotencyKeyTTL: expiration time must be in the future" },
+        { status: 400 }
+      );
+    }
