
feat(clp-package): Add support for uploading extracted streams to S3. #662

Merged (28 commits) on Jan 17, 2025

Conversation

Contributor

@haiqi96 haiqi96 commented Jan 13, 2025

Description

This PR adds support for uploading JSON chunks and IR streams to S3.

The proposed flow is as follows (sketched below):

  1. During extraction, the clo and clp-s binaries write streams to disk and print a list of extracted streams to stdout.
  2. After the binary finishes, its stdout is decoded and processed by the extraction worker (in Python).
  3. The extraction worker uploads each stream to S3 and returns success only if all streams are uploaded.
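
A minimal sketch of steps 2 and 3, assuming each stdout line is a JSON object with a "path" key and that s3_put(s3_config, path, key) is the upload helper used by the worker; the function name and exact shape here are illustrative, not the PR's implementation:

import json
from pathlib import Path

def upload_extracted_streams(stdout_text: str, s3_config, s3_put) -> bool:
    """Return True only if every stream listed on stdout was uploaded."""
    upload_error = False
    for line in stdout_text.splitlines():
        if not line.strip():
            continue
        stream_stats = json.loads(line)  # one JSON object per extracted stream
        stream_path_str = stream_stats.get("path")
        if stream_path_str is None:
            upload_error = True
            continue
        stream_path = Path(stream_path_str)
        try:
            # Upload under the stream's file name as the S3 key.
            s3_put(s3_config, stream_path, stream_path.name)
        except Exception:
            upload_error = True
    return not upload_error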

Limitations of the current implementation:

  1. MongoDB is updated directly by the clo and clp-s binaries, so even if the S3 upload fails, the webui will still show the streams as available. The plan is to move the MongoDB update from the binaries into the extraction worker script.
  2. Stream viewing is not yet supported in the webui.

The PR includes the following changes:

  1. Updated the stream_output config to accept either an FsStorage or an S3Storage config.
  2. Updated clp-s and clo to print stream stats.
  3. Updated the extraction worker to upload streams to S3.

Validation performed

Triggered a stream extraction job from the webui and verified that streams are properly written to S3.

Also validated different configuration inputs and confirmed that the storage type and directory value are as expected (a sketch follows the list):
Cases:

  1. Default config:
    • Type=FS, directory = var/data/[streams|archives]
  2. Explicitly specifying FsStorage config, without specifying directory
    • Type=FS, directory = var/data/[streams|archives]
  3. Explicitly specifying FsStorage config and directory
    • Type=FS, directory = [User specified value]
  4. Explicitly specifying S3Storage config, without specifying staging_directory
    • Type=S3, staging_directory = var/data/[staged_streams|staged_archives]
  5. Explicitly specifying S3Storage config and staging_directory
    • Type=S3, staging_directory = [User specified value]
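
To make these cases concrete, here is a condensed sketch using plain dataclasses in place of the package's pydantic models; the field names (directory for FS storage, staging_directory for S3 storage) mirror the PR, while the string type values and exact defaults are illustrative:

from dataclasses import dataclass
from pathlib import Path

@dataclass
class StreamFsStorage:
    type: str = "fs"
    directory: Path = Path("var/data/streams")  # cases 1 and 2

@dataclass
class StreamS3Storage:
    type: str = "s3"
    staging_directory: Path = Path("var/data/staged_streams")  # case 4

def get_directory(storage) -> Path:
    # Mirrors StreamOutput.get_directory(): FS storage exposes directory,
    # S3 storage exposes staging_directory.
    if "fs" == storage.type:
        return storage.directory
    return storage.staging_directory

print(get_directory(StreamFsStorage()))  # var/data/streams
print(get_directory(StreamS3Storage()))  # var/data/staged_streams
print(get_directory(StreamFsStorage(directory=Path("/mnt/streams"))))  # case 3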

Summary by CodeRabbit

Release Notes

  • New Features

    • Added command-line options to print stream statistics during extraction.
    • Enhanced JSON output capabilities for stream processing.
    • Improved S3 storage integration for stream extraction tasks.
    • Introduced new classes for advanced storage configurations.
  • Improvements

    • Refined configuration management for storage directories.
    • Updated command-line argument parsing for more flexible options.
    • Improved error handling and configuration validation.
  • Technical Enhancements

    • Refactored storage configuration classes.
    • Updated method signatures for better encapsulation.
    • Added support for dynamic directory path management.


coderabbitai bot commented Jan 13, 2025

Warning

Rate limit exceeded

@haiqi96 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 5 minutes and 7 seconds before requesting another review.


📥 Commits

Reviewing files that changed from the base of the PR and between 05bf3f6 and 04b94ba.

📒 Files selected for processing (4)
  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py (4 hunks)
  • components/clp-py-utils/clp_py_utils/clp_config.py (11 hunks)
  • components/core/src/clp_s/CommandLineArguments.cpp (2 hunks)
  • components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (7 hunks)

Walkthrough

This pull request introduces a series of changes across multiple components, primarily focusing on the management of stream output configurations and command-line argument parsing. Key modifications include the refactoring of how the stream_output attribute is accessed and managed, the addition of new command-line options for printing stream statistics, and enhancements to S3 storage integration for task execution. The changes span Python utility classes, C++ command-line argument handling, and task orchestration utilities.

Changes

  • clp-package-utils/clp_package_utils/general.py: Updated stream_output handling in configuration generation functions, using the set_directory and get_directory methods.
  • clp-package-utils/clp_package_utils/scripts/start_clp.py: Changed directory access for stream_output from a direct attribute to a method call.
  • clp-py-utils/clp_py_utils/clp_config.py: Added new storage classes, introduced a default data directory path, and refactored the ArchiveOutput and StreamOutput classes.
  • core/src/clp/clo/CommandLineArguments.cpp/.hpp: Added the --print-ir-stats command-line option and a supporting method.
  • core/src/clp_s/CommandLineArguments.cpp/.hpp: Introduced the --print-ordered-chunk-stats command-line option.
  • job-orchestration/job_orchestration/executor/query/extract_stream_task.py: Enhanced S3 storage integration; added stream upload capabilities.
  • job-orchestration/job_orchestration/executor/query/fs_search_task.py: Updated return handling in the search function to return task results as a dictionary.
  • job-orchestration/job_orchestration/executor/query/utils.py: Modified run_query_task to return the task result and stdout.
  • package-template/src/etc/clp-config.yml: Updated the stream_output configuration structure to include storage type and directory attributes.
  • clp-py-utils/clp_py_utils/s3_utils.py: Changed AWS credentials retrieval in the s3_put function to use the get_credentials() method.
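
Regarding the s3_utils.py row above, a hedged sketch of what the updated s3_put might look like with boto3; the bucket and key_prefix attributes on S3Config are assumptions for illustration, not confirmed by this PR:

import boto3

def s3_put(s3_config, stream_path, stream_name) -> None:
    # Credentials now come from the config's accessor rather than from
    # direct attribute access.
    aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    # Hypothetical object layout: optional key prefix plus the stream name.
    key = getattr(s3_config, "key_prefix", "") + stream_name
    s3_client.upload_file(str(stream_path), s3_config.bucket, key)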

Sequence Diagram

sequenceDiagram
    participant CLI as Command Line Interface
    participant Config as CLPConfig
    participant Task as Extraction Task
    participant Storage as Storage Handler

    CLI->>Config: Set stream output configuration
    Config->>Storage: Initialize storage (FS/S3)
    Config->>Storage: Set directory path
    CLI->>Task: Start extraction
    Task->>Storage: Retrieve output directory
    Task->>Task: Execute extraction
    Task->>Storage: Upload results if S3 enabled
    Task-->>CLI: Return extraction results

Suggested reviewers

  • kirkrodrigues



if s3_error:
    task_results.status = QueryTaskStatus.FAILED
    task_results.error_log_path = "S3 Failed"
@haiqi96 haiqi96 Jan 13, 2025

Any suggestions on how we should handle errors if the binaries succeed but the S3 upload fails?

Should we write the error into a local err_log file and update error_log_path accordingly?

Contributor

Is there any way to get the logging destination from the logger object we get from Celery and use it to set error_log_path here? Since we log the errors in this Python code, that seems like the natural thing to do.

Contributor Author

It's a bit tricky because the destination is set through the -f option for the container.

I tried a few tricks I found online, but none of them worked. Maybe we can just pass the log path via an environment variable from start_clp.py and print it out at this line? (Sketched below.)
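
A minimal sketch of that idea, using the names this PR eventually settles on (WORKER_LOG_PATH is exported into the worker container by start_clp.py and read back in the task):

import os

def record_s3_failure(task_results, failed_status) -> None:
    # failed_status would be QueryTaskStatus.FAILED in the real task code.
    task_results.status = failed_status
    # WORKER_LOG_PATH is injected into the container via docker run -e.
    task_results.error_log_path = str(os.getenv("WORKER_LOG_PATH"))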

@@ -20,7 +20,6 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
Contributor Author

Fixing a redundant import from a previous change.


haiqi96 commented Jan 13, 2025

@coderabbitai review


coderabbitai bot commented Jan 13, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
components/job-orchestration/job_orchestration/executor/query/utils.py (1)

50-50: Consider using UTF-8 encoding for better character support.

The current implementation uses ASCII decoding which might fail with non-ASCII characters in the output. Consider using UTF-8 for better character support.

-    return task_result, stdout_data.decode("ascii")
+    return task_result, stdout_data.decode("utf-8")

Also applies to: 64-64, 86-86, 113-113

components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (1)

187-189: Add more detailed error information.

The current error message "S3 Failed" is not descriptive enough for debugging purposes.

-            task_results.error_log_path = "S3 Failed"
+            task_results.error_log_path = f"S3 upload failed: {s3_error}"
components/core/src/clp/clo/clo.cpp (1)

216-222: Consider explicit JSON formatting settings.

The current implementation uses compact output (indent -1) and ignores JSON formatting errors. Consider:

  1. Using explicit indentation (e.g., 2 or 4 spaces) for consistent output
  2. Handling JSON formatting errors to catch potential issues
-                std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
+                std::cout << json_msg.dump(2, ' ', true)
                          << std::endl;
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 57bc265 and f933099.

📒 Files selected for processing (14)
  • components/clp-package-utils/clp_package_utils/general.py (2 hunks)
  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py (2 hunks)
  • components/clp-py-utils/clp_py_utils/clp_config.py (9 hunks)
  • components/core/src/clp/clo/CommandLineArguments.cpp (1 hunks)
  • components/core/src/clp/clo/CommandLineArguments.hpp (2 hunks)
  • components/core/src/clp/clo/clo.cpp (2 hunks)
  • components/core/src/clp_s/CommandLineArguments.cpp (2 hunks)
  • components/core/src/clp_s/CommandLineArguments.hpp (2 hunks)
  • components/core/src/clp_s/JsonConstructor.cpp (2 hunks)
  • components/core/src/clp_s/JsonConstructor.hpp (1 hunks)
  • components/core/src/clp_s/clp-s.cpp (1 hunks)
  • components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (7 hunks)
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (2 hunks)
  • components/job-orchestration/job_orchestration/executor/query/utils.py (5 hunks)
🧰 Additional context used
📓 Path-based instructions (8)

The following pattern applies to all eight files below:

Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.

  • components/core/src/clp_s/JsonConstructor.hpp
  • components/core/src/clp_s/JsonConstructor.cpp
  • components/core/src/clp/clo/CommandLineArguments.cpp
  • components/core/src/clp/clo/CommandLineArguments.hpp
  • components/core/src/clp_s/clp-s.cpp
  • components/core/src/clp/clo/clo.cpp
  • components/core/src/clp_s/CommandLineArguments.hpp
  • components/core/src/clp_s/CommandLineArguments.cpp

⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: ubuntu-focal-static-linked-bins
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: ubuntu-focal-dynamic-linked-bins
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: build-macos (macos-14, false)
  • GitHub Check: build-macos (macos-14, true)
  • GitHub Check: build-macos (macos-13, false)
  • GitHub Check: build-macos (macos-13, true)
🔇 Additional comments (29)
components/clp-package-utils/clp_package_utils/general.py (2)

256-266: LGTM! Good encapsulation of stream output directory access.

The changes improve encapsulation by using getter/setter methods instead of direct attribute access, which is consistent with object-oriented principles.


278-278: LGTM! Proper handling of stream output configuration.

The change correctly copies the entire stream output configuration instead of just the directory, which is more robust and maintainable.

components/clp-py-utils/clp_py_utils/clp_config.py (8)

43-43: LGTM! Well-defined constant for default data directory.

The constant follows naming conventions and provides a centralized definition for the default data directory path.


344-344: LGTM! Improved type safety for directory field.

Using pathlib.Path instead of a generic type improves type safety and IDE support.


363-363: LGTM! Consistent type safety improvement for S3 staging directory.

Using pathlib.Path maintains consistency with FsStorage and improves type safety.


381-395: LGTM! Well-structured storage class hierarchy.

The new storage classes provide clear specialization for different storage types with appropriate default paths. The inheritance hierarchy is clean and logical.


397-417: LGTM! Well-designed helper functions for directory management.

The helper functions properly encapsulate storage type-specific logic with appropriate error handling. The private scope is correctly indicated with the underscore prefix.


420-420: LGTM! Improved type safety and encapsulation in ArchiveOutput.

The changes properly leverage the new specialized storage types and helper functions for better type safety and encapsulation.

Also applies to: 451-454


464-464: LGTM! Consistent implementation in StreamOutput.

The changes maintain consistency with ArchiveOutput, properly implementing the same pattern for directory management.

Also applies to: 473-477


552-552: LGTM! Proper adaptation of CLPConfig methods.

The changes correctly adapt the configuration methods to use the new directory access pattern while maintaining clear error messages.

Also applies to: 582-582

components/clp-package-utils/clp_package_utils/scripts/start_clp.py (2)

707-707: LGTM! Consistent use of directory getter method.

The change properly uses the new getter method for stream output directory, maintaining consistency with the updated storage configuration model.


925-925: LGTM! Proper integration with log viewer settings.

The change correctly uses the new getter method when configuring the log viewer's stream files directory.

components/core/src/clp_s/JsonConstructor.hpp (1)

34-34: LGTM! Clean addition of the stream stats flag.

The new boolean member follows the existing pattern and naming conventions.

components/job-orchestration/job_orchestration/executor/query/utils.py (1)

9-9: LGTM! Clean import addition.

The Tuple import is correctly added to support the new return type.

components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (1)

Line range hint 160-170: LGTM! Clean handling of the new return type.

The changes correctly unpack the tuple and discard the unused stdout data.

Also applies to: 171-171

components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (3)

2-2: LGTM! Clean import organization.

The imports are logically grouped and properly ordered.

Also applies to: 9-11


32-32: LGTM! Consistent command-line flag handling.

The enable_s3_write flag is consistently used across both CLP and CLP-S commands.

Also applies to: 57-58, 78-79


131-139: LGTM! Clean S3 configuration setup.

The S3 configuration is properly initialized based on the storage type.

components/core/src/clp_s/JsonConstructor.cpp (2)

8-8: LGTM!

The include statement is correctly placed with other third-party includes and is necessary for the JSON output functionality.


133-139: ⚠️ Potential issue

Several issues in the JSON output implementation.

  1. The indent parameter -1 in json.dump() selects compact, unindented output rather than pretty-printing; use a positive integer for indented output.
  2. The error handler is set to ignore, which could silently hide JSON encoding errors.
  3. As per previous feedback, the "id" field might be unnecessary.

Apply this diff to fix the issues:

         if (m_option.print_ordered_stream_stats) {
             nlohmann::json json_msg;
             json_msg["stream_path"] = new_file_path.string();
-            json_msg["id"] = m_option.archive_id;
-            std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
+            std::cout << json_msg.dump(2, ' ', true, nlohmann::json::error_handler_t::strict)
                       << std::endl;
         }

Likely invalid or redundant comment.

components/core/src/clp_s/CommandLineArguments.hpp (2)

73-75: LGTM!

The method declaration follows the class's style and correctly uses [[nodiscard]] to prevent accidental value discarding.


198-198: LGTM!

The member variable is correctly initialized and follows the class's naming convention.

components/core/src/clp/clo/CommandLineArguments.hpp (2)

51-52: LGTM!

The method declaration follows the class's style and correctly uses [[nodiscard]] to prevent accidental value discarding.


185-185: LGTM!

The member variable is correctly initialized and follows the class's naming convention.

components/core/src/clp_s/clp-s.cpp (1)

309-309: LGTM!

The option assignment is correctly placed and follows the same style as other option assignments.

components/core/src/clp/clo/clo.cpp (1)

6-6: LGTM! The JSON library include is correctly placed.

The include is appropriately positioned with other external dependencies.

components/core/src/clp/clo/CommandLineArguments.cpp (1)

182-190: LGTM! The command-line option is well-implemented.

The new --print-stream-stats option follows the established patterns:

  • Uses bool_switch for flag-style option
  • Provides clear description
  • Maintains consistent formatting with other options
components/core/src/clp_s/CommandLineArguments.cpp (2)

415-418: LGTM! The command-line option is well-implemented.

The new --print-ordered-stream-stats option follows the established patterns:

  • Uses bool_switch for flag-style option
  • Provides clear description
  • Maintains consistent formatting with other options

499-504: LGTM! The validation logic is thorough and clear.

The validation ensures that --print-ordered-stream-stats is only used with the --ordered argument, providing a clear error message on misuse.

@haiqi96 haiqi96 changed the title from "Draft for now" to "feat(clp-package): Add support for uploading extracted streams to S3." Jan 13, 2025
@haiqi96 haiqi96 marked this pull request as ready for review January 13, 2025 22:34
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
components/core/src/clp/clo/clo.cpp (2)

216-221: Consider enhancing the JSON output implementation.

Several improvements could make the JSON output more robust and useful:

  1. Use a specific indentation value instead of -1 for better clarity
  2. Consider using a stricter error handler for UTF-8 validation
  3. Use relative paths to avoid exposing system information
  4. Include additional useful metadata (e.g., size, timestamp)

Here's a suggested implementation:

-                json_msg["stream_path"] = dest_ir_path;
-                std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
-                          << std::endl;
+                json_msg["stream_path"] = dest_ir_path.filename();
+                json_msg["size"] = std::filesystem::file_size(dest_ir_path);
+                json_msg["timestamp"] = std::filesystem::last_write_time(dest_ir_path);
+                std::cout << json_msg.dump(2, ' ', true, nlohmann::json::error_handler_t::strict)
+                          << std::endl;

215-222: Consider abstracting the stream output handling.

The current implementation tightly couples the JSON output with filesystem operations. To better support the planned S3 integration and potential future storage backends, consider introducing an abstraction layer for stream output handling (a sketch follows the list below).

This would make it easier to:

  • Support different storage backends (filesystem, S3, etc.)
  • Standardize the JSON output format across backends
  • Add new output formats without modifying the core extraction logic
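
For instance, a hypothetical sketch of such an abstraction; none of these classes exist in the PR, and the names are invented for illustration:

from abc import ABC, abstractmethod
from pathlib import Path

class StreamOutputBackend(ABC):
    @abstractmethod
    def publish(self, stream_path: Path) -> None:
        """Make an extracted stream available to readers."""

class FsBackend(StreamOutputBackend):
    def publish(self, stream_path: Path) -> None:
        pass  # The stream is already in its final on-disk location.

class S3Backend(StreamOutputBackend):
    def __init__(self, s3_config, s3_put):
        self._s3_config = s3_config
        self._s3_put = s3_put

    def publish(self, stream_path: Path) -> None:
        # Upload, then drop the local staging copy, as the worker does.
        self._s3_put(self._s3_config, stream_path, stream_path.name)
        stream_path.unlink()
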
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between f933099 and e506044.

📒 Files selected for processing (2)
  • components/core/src/clp/clo/clo.cpp (2 hunks)
  • components/core/src/clp_s/JsonConstructor.cpp (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • components/core/src/clp_s/JsonConstructor.cpp
🧰 Additional context used
📓 Path-based instructions (1)
components/core/src/clp/clo/clo.cpp (1)

Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.

🔇 Additional comments (1)
components/core/src/clp/clo/clo.cpp (1)

6-6: LGTM! Good choice of JSON library.

The nlohmann/json library is a well-tested, header-only JSON library for C++. The single-include version is used, which is good for compilation performance.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)

397-417: Add docstrings to helper functions.

The helper functions _get_directory_from_storage_config and _set_directory_for_storage_config would benefit from docstrings describing their purpose, parameters, return values, and possible exceptions.

 def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
+    """
+    Get the appropriate directory path from a storage configuration.
+    
+    Args:
+        storage_config: The storage configuration object
+        
+    Returns:
+        pathlib.Path: The directory path for the storage
+        
+    Raises:
+        NotImplementedError: If the storage type is not supported
+    """
     storage_type = storage_config.type

 def _set_directory_for_storage_config(
     storage_config: Union[FsStorage, S3Storage], directory
 ) -> None:
+    """
+    Set the appropriate directory path in a storage configuration.
+    
+    Args:
+        storage_config: The storage configuration object
+        directory: The directory path to set
+        
+    Raises:
+        NotImplementedError: If the storage type is not supported
+    """
     storage_type = storage_config.type
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between e506044 and 5aac350.

📒 Files selected for processing (2)
  • components/clp-py-utils/clp_py_utils/clp_config.py (9 hunks)
  • components/package-template/src/etc/clp-config.yml (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (12)
  • GitHub Check: ubuntu-focal-static-linked-bins
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: ubuntu-focal-dynamic-linked-bins
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: build-macos (macos-14, false)
  • GitHub Check: build-macos (macos-14, true)
  • GitHub Check: build-macos (macos-13, false)
  • GitHub Check: lint-check (ubuntu-latest)
  • GitHub Check: lint-check (macos-latest)
  • GitHub Check: build-macos (macos-13, true)
🔇 Additional comments (4)
components/package-template/src/etc/clp-config.yml (1)

89-91: LGTM! The storage configuration structure is well-defined.

The new structure for stream_output.storage aligns with the PR objectives and maintains consistency with the existing archive_output configuration.

components/clp-py-utils/clp_py_utils/clp_config.py (3)

43-43: LGTM! Well-structured base classes with proper type hints and validation.

The addition of CLP_DEFAULT_DATA_DIRECTORY_PATH constant and updates to storage base classes provide a solid foundation for the storage configuration system.

Also applies to: 344-344, 363-363


381-394: LGTM! Well-designed storage class hierarchy.

The specialized storage classes (ArchiveFsStorage, StreamFsStorage, ArchiveS3Storage, StreamS3Storage) provide clear separation of concerns and appropriate default paths.


420-420: LGTM! Configuration classes are well-structured with proper validation.

The updates to ArchiveOutput and StreamOutput classes provide:

  • Correct type hints for specialized storage classes
  • Proper encapsulation of directory operations
  • Consistent validation of paths

Also applies to: 451-454, 458-458, 464-464, 473-477, 481-482, 552-552, 582-582, 668-668, 677-677

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (1)

167-189: Enhance error logging for S3 upload failures.

The current error message "S3 Failed" is not informative enough. Consider including more details about the failure.

-            task_results.error_log_path = "S3 Failed"
+            task_results.error_log_path = f"S3 upload failed: {s3_error}"
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 5aac350 and bb5f9cc.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build-macos (macos-13, true)
  • GitHub Check: build (macos-latest)
🔇 Additional comments (4)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (4)

9-11: LGTM! Import changes and parameter addition are well-structured.

The new imports and parameter addition properly support the S3 upload functionality.

Also applies to: 31-32


57-58: LGTM! Consistent implementation of stream stats printing.

The print_stream_stats flag is handled consistently for both storage engines.

Also applies to: 78-79


131-139: LGTM! Clean S3 configuration setup.

The S3 configuration initialization and enable flag handling is well-structured.

Also applies to: 146-146


193-193: LGTM! Clean task results handling.

The dictionary conversion of task results maintains consistency with the rest of the codebase.

components/clp-py-utils/clp_py_utils/clp_config.py (outdated review thread, resolved)
components/clp-py-utils/clp_py_utils/clp_config.py (outdated review thread, resolved)
);
options_ir_extraction.add_options()(
        "target-size",
        po::value<size_t>(&m_ir_target_size)->value_name("SIZE"),
Contributor

Suggested change
po::value<size_t>(&m_ir_target_size)->value_name("SIZE"),
po::value<size_t>(&m_ir_target_size)
->value_name("SIZE")
->default_value(m_ir_target_size),

Probably want a default value?

Contributor Author

ditto

components/core/src/clp/clo/CommandLineArguments.cpp (outdated review thread, resolved)
components/core/src/clp_s/CommandLineArguments.cpp (outdated review thread, resolved)
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)

Line range hint 420-480: Consider adding validation for S3 storage configuration.

The ArchiveOutput and StreamOutput classes should validate that S3 credentials are properly configured when S3 storage is selected.

Add a validator to check S3 configuration:

 class ArchiveOutput(BaseModel):
     storage: Union[ArchiveFsStorage, ArchiveS3Storage] = ArchiveFsStorage()
+    
+    @validator("storage")
+    def validate_storage(cls, field):
+        if isinstance(field, ArchiveS3Storage):
+            if not field.s3_config.access_key_id or not field.s3_config.secret_access_key:
+                raise ValueError("S3 credentials must be configured for S3 storage")
+        return field

Apply similar changes to StreamOutput class.

🧹 Nitpick comments (7)
components/clp-py-utils/clp_py_utils/clp_config.py (3)

Line range hint 344-417: Consider adding type hints to helper function parameters.

The helper functions _set_directory_for_storage_config and _get_directory_from_storage_config would benefit from more specific type hints.

Apply this diff to improve type safety:

-def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
+def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage, ArchiveFsStorage, ArchiveS3Storage, StreamFsStorage, StreamS3Storage]) -> pathlib.Path:

-def _set_directory_for_storage_config(storage_config: Union[FsStorage, S3Storage], directory) -> None:
+def _set_directory_for_storage_config(storage_config: Union[FsStorage, S3Storage, ArchiveFsStorage, ArchiveS3Storage, StreamFsStorage, StreamS3Storage], directory: pathlib.Path) -> None:

550-550: Consider consolidating directory validation logic.

The validation methods for archive and stream output directories are similar. Consider extracting the common validation logic into a helper function.

+def _validate_output_directory(directory: pathlib.Path, output_type: str) -> None:
+    try:
+        validate_path_could_be_dir(directory)
+    except ValueError as ex:
+        raise ValueError(f"{output_type} directory is invalid: {ex}")
+
 def validate_stream_output_dir(self):
-    try:
-        validate_path_could_be_dir(self.stream_output.get_directory())
-    except ValueError as ex:
-        raise ValueError(f"stream_output.directory is invalid: {ex}")
+    _validate_output_directory(self.stream_output.get_directory(), "stream_output")

Also applies to: 580-582


Line range hint 666-675: Consider adding docstring to explain the purpose of stream-related fields.

The addition of stream-related fields in WorkerConfig would benefit from documentation explaining their purpose and when they are used.

Add a docstring to explain the stream-related fields:

 class WorkerConfig(BaseModel):
+    """Configuration for worker processes.
+    
+    Attributes:
+        stream_output: Configuration for stream storage, used only by query workers.
+        stream_collection_name: Name of the collection storing stream metadata, used only by query workers.
+    """
     package: Package = Package()
components/job-orchestration/job_orchestration/executor/query/utils.py (2)

64-65: Consider using a context manager for the log file.

While the stdout/stderr configuration is correct, consider using a context manager for the log file to ensure proper cleanup:

-    clo_log_file = open(clo_log_path, "w")
+    with open(clo_log_path, "w") as clo_log_file:
         task_proc = subprocess.Popen(
             task_command,
             preexec_fn=os.setpgrp,
             close_fds=True,
             stdout=subprocess.PIPE,
             stderr=clo_log_file,
         )

86-86: Add error handling for UTF-8 decoding.

While the UTF-8 decoding implementation is good, consider adding error handling for potential decode failures:

-    return task_result, stdout_data.decode("utf-8")
+    try:
+        decoded_output = stdout_data.decode("utf-8")
+    except UnicodeDecodeError as e:
+        logger.error(f"Failed to decode stdout as UTF-8: {e}")
+        decoded_output = stdout_data.decode("utf-8", errors="replace")
+    return task_result, decoded_output

Also applies to: 113-113

components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (2)

178-182: Enhance stream path validation

Consider adding more specific validation for the stream path:

  1. Check if path exists
  2. Validate path is within expected directory
  3. Ensure path is a file
             stream_path_str = stream_stats.get("path")
             if stream_path_str is None:
                 logger.error(f"`path` is not a valid key in `{line}`")
                 upload_error = True
                 continue
+
+            stream_path = Path(stream_path_str)
+            if not stream_path.exists():
+                logger.error(f"Stream path {stream_path_str} does not exist")
+                upload_error = True
+                continue
+
+            if not stream_path.is_file():
+                logger.error(f"Stream path {stream_path_str} is not a file")
+                upload_error = True
+                continue
+
+            if not str(stream_path).startswith(str(worker_config.stream_output.get_directory())):
+                logger.error(f"Stream path {stream_path_str} is outside the expected directory")
+                upload_error = True
+                continue

199-201: Enhance error reporting granularity

Consider providing more specific error status and detailed error information in the task results.

         if upload_error:
             task_results.status = QueryTaskStatus.FAILED
-            task_results.error_log_path = str(os.getenv("WORKER_LOG_PATH"))
+            task_results.status = QueryTaskStatus.FAILED_S3_UPLOAD
+            task_results.error_details = "Failed to upload one or more streams to S3"
+            task_results.error_log_path = str(os.getenv("WORKER_LOG_PATH"))
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between bb5f9cc and 856664f.

📒 Files selected for processing (12)
  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py (4 hunks)
  • components/clp-py-utils/clp_py_utils/clp_config.py (9 hunks)
  • components/core/src/clp/clo/CommandLineArguments.cpp (1 hunks)
  • components/core/src/clp/clo/CommandLineArguments.hpp (2 hunks)
  • components/core/src/clp/clo/clo.cpp (2 hunks)
  • components/core/src/clp_s/CommandLineArguments.cpp (2 hunks)
  • components/core/src/clp_s/CommandLineArguments.hpp (2 hunks)
  • components/core/src/clp_s/JsonConstructor.cpp (2 hunks)
  • components/core/src/clp_s/JsonConstructor.hpp (1 hunks)
  • components/core/src/clp_s/clp-s.cpp (1 hunks)
  • components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (7 hunks)
  • components/job-orchestration/job_orchestration/executor/query/utils.py (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • components/core/src/clp_s/JsonConstructor.hpp
  • components/core/src/clp_s/JsonConstructor.cpp
  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py
  • components/core/src/clp/clo/CommandLineArguments.hpp
  • components/core/src/clp_s/clp-s.cpp
  • components/core/src/clp/clo/clo.cpp
  • components/core/src/clp/clo/CommandLineArguments.cpp
🧰 Additional context used
📓 Path-based instructions (2)

The following pattern applies to both files below:

Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.

  • components/core/src/clp_s/CommandLineArguments.cpp
  • components/core/src/clp_s/CommandLineArguments.hpp

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build-macos (macos-14, false)
🔇 Additional comments (12)
components/core/src/clp_s/CommandLineArguments.hpp (2)

73-75: LGTM! Well-structured getter method.

The method is correctly marked with [[nodiscard]] and uses modern C++ auto return type syntax.


198-198: LGTM! Well-defined member variable.

The variable follows the class's naming convention and is correctly initialized.

components/core/src/clp_s/CommandLineArguments.cpp (2)

415-418: LGTM! Well-defined command-line option.

The option follows the naming convention and has a clear description.


499-503: LGTM! Proper validation of command-line option dependencies.

The validation ensures the option is only used with the "ordered" argument and provides a clear error message.

components/clp-py-utils/clp_py_utils/clp_config.py (2)

43-43: LGTM! Good practice using a constant for the default data directory path.

The addition of CLP_DEFAULT_DATA_DIRECTORY_PATH improves maintainability by centralizing the default path configuration.


381-394: Verify the default directory paths are created during initialization.

The specialized storage classes set default paths, but we should ensure these directories are created when needed.

✅ Verification successful

Default directory paths are properly created during initialization

The directories are created with appropriate flags (parents=True, exist_ok=True) through the config object's get_directory() method in start_clp.py during system initialization.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for directory creation logic
rg -A 5 "mkdir|makedirs|exists|is_dir" --type py

Length of output: 37218

components/job-orchestration/job_orchestration/executor/query/utils.py (3)

9-9: LGTM! Import addition is well-placed.

The addition of Tuple to the typing imports is appropriate and necessary for the new function signature.


84-85: Comment improvement was previously suggested.

The added comment about communicate vs wait implements the previously suggested improvement.


50-50: Verify all callers handle the new tuple return type.

The return type change to Tuple[QueryTaskResult, str] is well-structured and aligns with the PR's objective to support S3 uploads. However, we should ensure all callers are updated to handle both return values.

✅ Verification successful

All callers properly handle the new tuple return type

Both callers have been updated to handle the tuple return:

  • extract_stream_task.py uses both values: task_results, task_stdout_as_str
  • fs_search_task.py explicitly ignores the second value: task_results, _
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for direct calls to run_query_task to verify they handle both return values
ast-grep --pattern 'run_query_task($_)' 

Length of output: 41


Script:

#!/bin/bash
# Find function definition
ast-grep --pattern 'def run_query_task($$$) -> Tuple[$_]'

# Find imports and usages
rg "run_query_task" -A 2

# Find function calls with return value handling
ast-grep --pattern '
(assignment_statement
  (pattern_list) @capture
  (call
    (attribute
      (identifier)
      (identifier "run_query_task"))
    (argument_list)))'

Length of output: 1919

components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (3)

31-32: LGTM: Command-line options for stream statistics

The addition of print_stream_stats parameter and corresponding command-line options (--print-ir-stats, --print-ordered-chunk-stats) is well-implemented and consistent across both storage engines.

Also applies to: 57-58, 78-79


131-138: LGTM: Clean S3 configuration setup

The S3 configuration initialization is well-structured and follows the existing patterns for storage configuration.


197-197: Review stream deletion logic

The stream is deleted even if upload_error is true. While streams can be recreated, consider preserving failed streams for immediate debugging.

-            stream_path.unlink()
+            if not upload_error:
+                stream_path.unlink()
+            else:
+                logger.warning(f"Preserving failed stream {stream_path} for debugging")

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)

Line range hint 687-696: Consider adding stream output validation.

The WorkerConfig class should validate that the stream_output configuration is compatible with the package storage engine, similar to how archive_output is validated in CLPConfig.

 class WorkerConfig(BaseModel):
     def dump_to_primitive_dict(self):
         d = self.dict()
         d["archive_output"] = self.archive_output.dump_to_primitive_dict()
+        if (
+            StorageType.S3 == self.stream_output.storage.type
+            and StorageEngine.CLP_S != self.package.storage_engine
+        ):
+            raise ValueError(
+                f"stream_output.storage.type = 's3' is only supported with package.storage_engine"
+                f" = '{StorageEngine.CLP_S}'"
+            )
         d["stream_output"] = self.stream_output.dump_to_primitive_dict()
         return d
🧹 Nitpick comments (5)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (2)

190-195: Consider using more specific exception handling for S3 operations

Instead of catching a generic Exception, consider catching specific S3-related exceptions to provide more detailed error messages and handling.

-                try:
-                    s3_put(s3_config, stream_path, stream_name)
-                    logger.info(f"Finished uploading stream {stream_name} to S3.")
-                except Exception as err:
-                    logger.error(f"Failed to upload stream {stream_name}: {err}")
-                    upload_error = True
+                try:
+                    s3_put(s3_config, stream_path, stream_name)
+                    logger.info(f"Finished uploading stream {stream_name} to S3.")
+                except (S3UploadError, S3ConnectionError) as err:
+                    logger.error(f"S3 error uploading stream {stream_name}: {err}")
+                    upload_error = True
+                except Exception as err:
+                    logger.error(f"Unexpected error uploading stream {stream_name}: {err}")
+                    upload_error = True

199-203: Consider providing more detailed task status updates

The current implementation only updates the task status to FAILED. Consider adding more granular status updates to help with debugging and monitoring.

         if upload_error:
             task_results.status = QueryTaskStatus.FAILED
             task_results.error_log_path = str(os.getenv("WORKER_LOG_PATH"))
+            task_results.error_message = "Failed to upload one or more streams to S3"
         else:
             logger.info(f"Finished uploading streams.")
components/clp-py-utils/clp_py_utils/clp_config.py (3)

313-327: Enhance S3 credentials validation.

While the empty string validation is good, consider adding additional validation for AWS access key format:

  • Access key IDs typically follow the pattern ^[A-Z0-9]{20}$
  • Secret access keys typically follow the pattern ^[A-Za-z0-9/+=]{40}$
     @validator("access_key_id")
     def validate_access_key_id(cls, field):
         if field == "":
             raise ValueError("access_key_id cannot be empty")
+        if not re.match(r"^[A-Z0-9]{20}$", field):
+            raise ValueError("access_key_id format is invalid")
         return field

     @validator("secret_access_key")
     def validate_secret_access_key(cls, field):
         if field == "":
             raise ValueError("secret_access_key cannot be empty")
+        if not re.match(r"^[A-Za-z0-9/+=]{40}$", field):
+            raise ValueError("secret_access_key format is invalid")
         return field

357-361: Consider adding credential rotation support.

The get_credentials method could be enhanced to support credential rotation by adding a method to refresh credentials from external sources (e.g., AWS IAM roles, environment variables).


418-438: Consider adding path validation in directory management helpers.

The helper functions _get_directory_from_storage_config and _set_directory_for_storage_config should validate that the directory paths are absolute when setting them.

 def _set_directory_for_storage_config(
     storage_config: Union[FsStorage, S3Storage], directory
 ) -> None:
+    if not directory.is_absolute():
+        raise ValueError("Directory path must be absolute")
     storage_type = storage_config.type
     if StorageType.FS == storage_type:
         storage_config.directory = directory
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 856664f and c6a1ad2.

📒 Files selected for processing (5)
  • components/clp-py-utils/clp_py_utils/clp_config.py (11 hunks)
  • components/clp-py-utils/clp_py_utils/s3_utils.py (1 hunks)
  • components/core/src/clp_s/JsonConstructor.cpp (2 hunks)
  • components/core/src/clp_s/clp-s.cpp (1 hunks)
  • components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • components/core/src/clp_s/JsonConstructor.cpp
  • components/core/src/clp_s/clp-s.cpp
⏰ Context from checks skipped due to timeout of 90000ms (12)
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: ubuntu-focal-static-linked-bins
  • GitHub Check: ubuntu-focal-dynamic-linked-bins
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: build-macos (macos-14, false)
  • GitHub Check: build-macos (macos-14, true)
  • GitHub Check: lint-check (ubuntu-latest)
  • GitHub Check: build-macos (macos-13, false)
  • GitHub Check: build-macos (macos-13, true)
  • GitHub Check: lint-check (macos-latest)
🔇 Additional comments (7)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (2)

9-11: LGTM: Command modifications for stream statistics printing

The changes to support printing stream statistics are well-integrated into both IR and JSON extraction paths.

Also applies to: 31-32, 57-58, 78-79


131-138: LGTM: S3 configuration initialization

The S3 configuration setup is clean and follows proper validation patterns.

components/clp-py-utils/clp_py_utils/clp_config.py (3)

3-3: LGTM! Clean import addition and constant definition.

The new import of Tuple and the constant CLP_DEFAULT_DATA_DIRECTORY_PATH are well-placed and follow the project's conventions.

Also applies to: 43-43


402-416: LGTM! Well-structured storage class hierarchy.

The specialized storage classes (ArchiveFsStorage, StreamFsStorage, ArchiveS3Storage, StreamS3Storage) provide good separation of concerns and default paths.


571-571: LGTM! Proper path validation in configuration.

The updates to make_config_paths_absolute and validate_stream_output_dir ensure proper path handling and validation.

Also applies to: 601-601

components/clp-py-utils/clp_py_utils/s3_utils.py (2)

164-164: 🛠️ Refactor suggestion

Verify error handling for credential retrieval

The get_credentials() method call should be wrapped in a try-except block to handle potential credential retrieval failures gracefully.

-    aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
+    try:
+        aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
+    except Exception as e:
+        raise ValueError(f"Failed to retrieve AWS credentials: {str(e)}")

Let's verify the error handling in the get_credentials() method:


169-170: Consider using AWS SDK's default credential chain

Instead of explicitly passing credentials, consider leveraging boto3's default credential chain which supports multiple credential sources (environment variables, IAM roles, etc.) and follows AWS security best practices.

Let's check if the codebase already uses the default credential chain elsewhere:

@gibber9809 gibber9809 self-requested a review January 17, 2025 02:21
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)

Consider adding environment variable fallback for AWS credentials

The codebase already uses AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) in other components. Adding environment variable fallback in the S3Config class would align with existing patterns and AWS best practices.

  • components/clp-py-utils/clp_py_utils/clp_config.py: Consider modifying get_credentials() to check environment variables when self.credentials is None.
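
A hedged sketch of that fallback; the attribute names follow this review's discussion of S3Config (credentials may be None), and the environment-variable fallback is the reviewer's suggestion rather than what the PR ships:

import os
from typing import Optional, Tuple

class S3Config:  # abbreviated stand-in for the real pydantic model
    credentials: Optional[object] = None

    def get_credentials(self) -> Tuple[str, str]:
        if self.credentials is not None:
            return (
                self.credentials.access_key_id,
                self.credentials.secret_access_key,
            )
        # Fall back to the standard AWS environment variables.
        return (
            os.environ["AWS_ACCESS_KEY_ID"],
            os.environ["AWS_SECRET_ACCESS_KEY"],
        )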
🔗 Analysis chain

Line range hint 313-361: Well-structured credentials management with proper validation.

The implementation properly encapsulates S3 credentials and includes appropriate validation. Consider adding a check for credential environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) as a fallback.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if the codebase handles AWS environment variables
rg -l "AWS_(?:ACCESS_KEY_ID|SECRET_ACCESS_KEY)"

Length of output: 227


Script:

#!/bin/bash
# Check the implementation details around AWS credentials
echo "=== InputConfig.hpp ==="
rg -A 5 -B 5 "AWS_(?:ACCESS_KEY_ID|SECRET_ACCESS_KEY)" "components/core/src/clp_s/InputConfig.hpp"

echo -e "\n=== CommandLineArguments.cpp ==="
rg -A 5 -B 5 "AWS_(?:ACCESS_KEY_ID|SECRET_ACCESS_KEY)" "components/core/src/clp_s/CommandLineArguments.cpp"

echo -e "\n=== compression_task.py ==="
rg -A 5 -B 5 "AWS_(?:ACCESS_KEY_ID|SECRET_ACCESS_KEY)" "components/job-orchestration/job_orchestration/executor/compress/compression_task.py"

Length of output: 2814

🧹 Nitpick comments (2)
components/clp-py-utils/clp_py_utils/clp_config.py (2)

402-416: Well-organized storage class hierarchy with clear defaults.

The specialized storage classes provide good separation of concerns. Consider adding docstrings to explain the purpose and usage of each storage class.

Add docstrings to explain the purpose of each storage class:

 class ArchiveFsStorage(FsStorage):
+    """Filesystem storage configuration for archives with default path under CLP_DEFAULT_DATA_DIRECTORY_PATH."""
     directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "archives"

418-438: Good abstraction of storage directory operations.

The helper functions effectively reduce code duplication. Consider adding type hints for the directory parameter in _set_directory_for_storage_config.

Add type hints for better code clarity:

-def _set_directory_for_storage_config(
-    storage_config: Union[FsStorage, S3Storage], directory
-) -> None:
+def _set_directory_for_storage_config(
+    storage_config: Union[FsStorage, S3Storage], directory: pathlib.Path
+) -> None:
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between c6a1ad2 and ff14a5b.

📒 Files selected for processing (1)
  • components/clp-py-utils/clp_py_utils/clp_config.py (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (12)
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: ubuntu-focal-static-linked-bins
  • GitHub Check: ubuntu-focal-dynamic-linked-bins
  • GitHub Check: build-macos (macos-14, false)
  • GitHub Check: build-macos (macos-14, true)
  • GitHub Check: lint-check (ubuntu-latest)
  • GitHub Check: build-macos (macos-13, false)
  • GitHub Check: build (macos-latest)
  • GitHub Check: build-macos (macos-13, true)
🔇 Additional comments (3)
components/clp-py-utils/clp_py_utils/clp_config.py (3)

3-3: LGTM! Clean addition of imports and constants.

The new imports and constant definition are well-placed and follow the existing pattern.

Also applies to: 43-43


365-365: Good removal of default values for storage paths.

Requiring explicit directory paths improves configuration clarity and prevents ambiguous defaults.

Also applies to: 384-384


441-441: Clean integration of the new storage architecture.

The storage configuration changes are consistently applied across all relevant classes, maintaining type safety and proper validation.

Also applies to: 472-475, 484-484, 493-497, 571-571, 601-601, 687-687, 696-696

gibber9809 previously approved these changes Jan 17, 2025
Contributor

@gibber9809 gibber9809 left a comment


LGTM, though I think @kirkrodrigues might also be planning to take a look.

@@ -729,6 +730,7 @@ def generic_start_worker(
"-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"WORKER_LOG_PATH={container_worker_log_path}",
Member


  • Prefix with CLP_ to avoid conflicts with other env vars?
  • I think we discussed in a previous PR that we don't really need to have WORKER in the name, but feel free to disagree in this case.

Contributor Author


I feel this is a special case where the worker has two logs: one for the clp executable and one for the worker itself. I would keep WORKER in the name in this case so it's clear.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
components/clp-py-utils/clp_py_utils/clp_config.py (3)

313-328: Enhance validation error messages.

The validation error messages could be more descriptive to help users understand the requirements better.

-            raise ValueError("access_key_id cannot be empty")
+            raise ValueError("S3 access_key_id cannot be empty. Please provide valid AWS credentials.")
-            raise ValueError("secret_access_key cannot be empty")
+            raise ValueError("S3 secret_access_key cannot be empty. Please provide valid AWS credentials.")
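For context, a hedged sketch of where such messages would typically live (pydantic v1-style validators; the class and field names are assumptions for illustration, not the PR's actual code):

from pydantic import BaseModel, validator

class S3Credentials(BaseModel):
    access_key_id: str
    secret_access_key: str

    @validator("access_key_id")
    def validate_access_key_id(cls, value):
        # Reject empty keys early with an actionable message.
        if "" == value:
            raise ValueError("S3 access_key_id cannot be empty. Please provide valid AWS credentials.")
        return value

    @validator("secret_access_key")
    def validate_secret_access_key(cls, value):
        if "" == value:
            raise ValueError("S3 secret_access_key cannot be empty. Please provide valid AWS credentials.")
        return value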

418-438: Add docstrings to helper functions.

These utility functions would benefit from docstrings describing their purpose, parameters, and return values.

 def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
+    """
+    Get the appropriate directory path from a storage configuration.
+    
+    Args:
+        storage_config: Either FsStorage or S3Storage configuration
+    
+    Returns:
+        pathlib.Path: The directory path for the given storage type
+    
+    Raises:
+        NotImplementedError: If the storage type is not supported
+    """

601-603: Enhance error message clarity.

The error message could be more specific about what makes the directory invalid.

-            raise ValueError(f"stream_output.storage's directory is invalid: {ex}")
+            raise ValueError(f"stream_output.storage's directory '{self.stream_output.get_directory()}' is invalid: {ex}")
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (1)

166-200: Enhance error messages for better debugging.

The stream upload implementation is solid, but error messages could be more descriptive.

Consider enhancing error messages with more context:

-                logger.exception(f"`{line}` cannot be decoded as JSON")
+                logger.exception(f"Failed to decode line as JSON. Line content: `{line}`")
-                logger.error(f"`path` is not a valid key in `{line}`")
+                logger.error(f"Missing required 'path' key in stream stats. Content: `{line}`")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b2c842b and 05bf3f6.

📒 Files selected for processing (3)
  • components/clp-py-utils/clp_py_utils/clp_config.py (11 hunks)
  • components/core/src/clp/clo/CommandLineArguments.cpp (1 hunks)
  • components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (7 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
components/core/src/clp/clo/CommandLineArguments.cpp (1)

Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.

⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: build-macos (macos-14, false)
  • GitHub Check: build-macos (macos-13, false)
  • GitHub Check: build-macos (macos-13, true)
  • GitHub Check: lint-check (ubuntu-latest)
  • GitHub Check: build (macos-latest)
  • GitHub Check: lint-check (macos-latest)
🔇 Additional comments (7)
components/core/src/clp/clo/CommandLineArguments.cpp (2)

182-188: LGTM! Well-structured command-line option.

The implementation follows Boost.Program_options conventions, and the description clearly indicates the unit (B) for the target size.


189-191: LGTM! Clear and consistent option implementation.

The boolean switch follows the codebase conventions, and the description clearly indicates the output format (NDJSON).

components/clp-py-utils/clp_py_utils/clp_config.py (2)

43-43: LGTM! Well-defined constant.

The constant follows the established naming convention and maintains consistency with other path constants in the file.


402-416: Consider using hyphens in directory names.

Based on previous feedback, the team prefers using hyphens instead of underscores in directory names.

-    directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged_archives"
+    directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives"
-    staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged_streams"
+    staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams"
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (3)

2-2: LGTM! Clean imports and parameter addition.

The new imports for S3 functionality and the addition of the print_stream_stats parameter are well-organized and properly integrated.

Also applies to: 9-11, 32-32


131-138: LGTM! Clean S3 configuration setup.

The S3 configuration initialization and flag handling are well-structured. Good reuse of the enable_s3_upload flag to control both the S3 upload and the printing of stream statistics.

Also applies to: 145-145
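For illustration, a hedged sketch of that flag reuse (all identifiers here are assumptions inferred from this review, not verbatim from the PR):

from types import SimpleNamespace

def resolve_s3_settings(stream_output_storage):
    # Sketch only: the `type` and `s3_config` attribute names are assumed.
    if "s3" == getattr(stream_output_storage, "type", None):
        return True, stream_output_storage.s3_config
    return False, None

# One flag then gates both behaviours: uploading extracted streams to S3
# and asking the binary to print per-stream statistics.
storage = SimpleNamespace(type="s3", s3_config=SimpleNamespace(bucket="example"))
enable_s3_upload, s3_config = resolve_s3_settings(storage)
print_stream_stats = enable_s3_upload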


202-207: Verify error log path consistency.

The error log path is set from the WORKER_LOG_PATH environment variable, but this might not align with the logging destination set through the container's -f option, as discussed in previous comments.

Run this script to check logging configuration consistency:

✅ Verification successful

Logging configuration is properly aligned

The error log path using WORKER_LOG_PATH is correctly configured and aligns with the container setup. Both the WORKER_LOG_PATH and CLP_LOGS_DIR environment variables are properly set and mounted in the worker containers, and they serve different logging purposes.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check logging configuration consistency
# Look for logging configuration in start_clp.py and container setup
rg -A 5 "WORKER_LOG_PATH|CLP_LOGS_DIR" --type py

Length of output: 7414

@haiqi96 haiqi96 requested a review from kirkrodrigues January 17, 2025 22:54