From bf6b0fd9f9950695b5ed6cc522f9a8d9999d50ad Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:33:58 +0530 Subject: [PATCH 1/4] Speed up builds in few places (#17591) --- .github/workflows/static-checks.yml | 2 +- it.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/static-checks.yml b/.github/workflows/static-checks.yml index 778a79db7618..6a149a46a6d3 100644 --- a/.github/workflows/static-checks.yml +++ b/.github/workflows/static-checks.yml @@ -71,7 +71,7 @@ jobs: if: ${{ matrix.java == '17' }} # errorprone requires JDK 11+ # Strict compilation requires more than 2 GB - run: ${MVN} clean -DstrictCompile compile test-compile --fail-at-end ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} + run: ${MVN} clean -DstrictCompile compile test-compile --fail-at-end ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} -T1C - name: maven install if: ${{ matrix.java == '17' }} diff --git a/it.sh b/it.sh index 12519c24b54d..cf9e02e2b2cc 100755 --- a/it.sh +++ b/it.sh @@ -229,7 +229,7 @@ case $CMD in usage ;; "ci" ) - mvn -q clean install dependency:go-offline -P dist $MAVEN_IGNORE + mvn -q clean install dependency:go-offline -P dist $MAVEN_IGNORE -T1C ;; "build" ) mvn -B clean install -P dist $MAVEN_IGNORE -T1.0C $* From 4648a41cab55b2ac4d28e1f471c92da2402d68cb Mon Sep 17 00:00:00 2001 From: Charles Smith Date: Thu, 19 Dec 2024 12:49:56 -0800 Subject: [PATCH 2/4] [Docs] Adds tutorial and stepwise instructions for EXTERN (#17501) * draft tutorial on extern: * updated draft * updates * add sidebar, fix reference wording * update reference * final updates to reference * update * add cloud info to tutorial * fix conflict * fix link * Update docs/multi-stage-query/reference.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> * Update docs/multi-stage-query/reference.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> * fixes * make hyperlink to console * Update docs/multi-stage-query/reference.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> * fix typos --------- Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> --- docs/multi-stage-query/reference.md | 103 +++++++------- docs/multi-stage-query/security.md | 3 + docs/tutorials/tutorial-extern.md | 206 ++++++++++++++++++++++++++++ website/sidebars.json | 3 +- 4 files changed, 265 insertions(+), 50 deletions(-) create mode 100644 docs/tutorials/tutorial-extern.md diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 6facbaedcb3d..d34ba1bdd4e3 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -72,9 +72,8 @@ FROM TABLE( `name` and a `type`. The type can be `string`, `long`, `double`, or `float`. This row signature is used to map the external data into the SQL layer. -Variation 2, with the input schema expressed in SQL using an `EXTEND` clause. (See the next -section for more detail on `EXTEND`). This format also uses named arguments to make the -SQL a bit easier to read: +Variation 2, with the input schema expressed in SQL using an `EXTEND` clause. 
See the next +section for more detail on `EXTEND`. This format also uses named arguments to make the SQL easier to read: ```sql SELECT @@ -95,12 +94,12 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte #### `EXTERN` to export to a destination -`EXTERN` can be used to specify a destination where you want to export data to. -This variation of EXTERN requires one argument, the details of the destination as specified below. -This variation additionally requires an `AS` clause to specify the format of the exported rows. +You can use `EXTERN` to specify a destination to export data. +This variation of `EXTERN` accepts the details of the destination as the only argument and requires an `AS` clause to specify the format of the exported rows. + +When you export data, Druid creates metadata files in a subdirectory named `_symlink_format_manifest`. +Within the `_symlink_format_manifest/manifest` directory, the `manifest` file lists absolute paths to exported files using the symlink manifest format. For example: -While exporting data, some metadata files will also be created at the destination in addition to the data. These files will be created in a directory `_symlink_format_manifest`. -- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created. ```text s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker2-partition2.csv s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker1-partition1.csv @@ -112,8 +111,8 @@ s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-par Keep the following in mind when using EXTERN to export rows: - Only INSERT statements are supported. - Only `CSV` format is supported as an export format. -- Partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) aren't supported with export statements. -- You can export to Amazon S3 or local storage. +- Partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) aren't supported with EXTERN statements. +- You can export to Amazon S3, Google GCS, or local storage. - The destination provided should contain no other files or directories. When you export data, use the `rowsPerPage` context parameter to restrict the size of exported files. @@ -128,10 +127,14 @@ SELECT FROM ``` -##### S3 +##### S3 - Amazon S3 + +To export results to S3, pass the `s3()` function as an argument to the `EXTERN` function. +Export to S3 requires the `druid-s3-extensions` extension. +For a list of S3 permissions the MSQ task engine requires to perform export, see [Permissions for durable storage](./security.md#s3). -Export results to S3 by passing the function `s3()` as an argument to the `EXTERN` function. Note that this requires the `druid-s3-extensions`. -The `s3()` function is a Druid function that configures the connection. Arguments for `s3()` should be passed as named parameters with the value in single quotes like the following example: +The `s3()` function configures the connection to AWS. +Pass all arguments for `s3()` as named parameters with their values enclosed in single quotes. For example: ```sql INSERT INTO @@ -146,25 +149,26 @@ FROM
Supported arguments for the function: -| Parameter | Required | Description | Default | -|-------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| -| `bucket` | Yes | The S3 bucket to which the files are exported to. The bucket and prefix combination should be whitelisted in `druid.export.storage.s3.allowedExportPaths`. | n/a | -| `prefix` | Yes | Path where the exported files would be created. The export query expects the destination to be empty. If the location includes other files, then the query will fail. The bucket and prefix combination should be whitelisted in `druid.export.storage.s3.allowedExportPaths`. | n/a | +| Parameter | Required | Description | Default | +|---|---|---|---| +| `bucket` | Yes | S3 bucket destination for exported files. You must add the bucket and prefix combination to the `druid.export.storage.s3.allowedExportPaths` allow list. | n/a | +| `prefix` | Yes | Destination path in the bucket to create exported files. The export query expects the destination path to be empty. If the location includes other files, the query will fail. You must add the bucket and prefix combination to the `druid.export.storage.s3.allowedExportPaths` allow list. | n/a | -The following runtime parameters must be configured to export into an S3 destination: +Configure the following runtime parameters to export to an S3 destination: -| Runtime Parameter | Required | Description | Default | -|----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----| -| `druid.export.storage.s3.allowedExportPaths` | Yes | An array of S3 prefixes that are whitelisted as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a | -| `druid.export.storage.s3.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a | -| `druid.export.storage.s3.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | -| `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB | +| Runtime parameter | Required | Description | Default | +|---|---|---|---| +| `druid.export.storage.s3.allowedExportPaths` | Yes | Array of S3 prefixes allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. For example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a | +| `druid.export.storage.s3.tempLocalDir` | No | Directory for local storage where the worker stores temporary files before uploading the data to S3. | n/a | +| `druid.export.storage.s3.maxRetry` | No | Maximum number of attempts for S3 API calls to avoid failures due to transient errors. 
| 10 | +| `druid.export.storage.s3.chunkSize` | No | Individual chunk size to store temporarily in `tempDir`. Large chunk sizes reduce the number of API calls to S3, but require more disk space to store temporary chunks. | 100MiB | +##### GOOGLE - Google Cloud Storage -##### GS +To export query results to Google Cloud Storage (GCS), pass the `google()` function as an argument to the `EXTERN` function. +Export to GCS requires the `druid-google-extensions` extension. -Export results to GCS by passing the function `google()` as an argument to the `EXTERN` function. Note that this requires the `druid-google-extensions`. -The `google()` function is a Druid function that configures the connection. Arguments for `google()` should be passed as named parameters with the value in single quotes like the following example: +The `google()` function configures the connection to GCS. Pass the arguments for `google()` as named parameters with their values enclosed in single quotes. For example: ```sql INSERT INTO @@ -179,29 +183,30 @@ FROM
Supported arguments for the function: -| Parameter | Required | Description | Default | -|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| -| `bucket` | Yes | The GS bucket to which the files are exported to. The bucket and prefix combination should be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a | -| `prefix` | Yes | Path where the exported files would be created. The export query expects the destination to be empty. If the location includes other files, then the query will fail. The bucket and prefix combination should be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a | +| Parameter | Required | Description | Default | +|---|---|---|---| +| `bucket` | Yes | GCS bucket destination for exported files. You must add the bucket and prefix combination to the `druid.export.storage.google.allowedExportPaths` allow list. | n/a | +| `prefix` | Yes | Destination path in the bucket to create exported files. The export query expects the destination path to be empty. If the location includes other files, the query will fail. You must add the bucket and prefix combination to the `druid.export.storage.google.allowedExportPaths` allow list. | n/a | -The following runtime parameters must be configured to export into a GCS destination: +Configure the following runtime parameters to export query results to a GCS destination: -| Runtime Parameter | Required | Description | Default | -|--------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| -| `druid.export.storage.google.allowedExportPaths` | Yes | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a | -| `druid.export.storage.google.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a | -| `druid.export.storage.google.maxRetry` | No | Defines the max number times to attempt GS API calls to avoid failures due to transient errors. | 10 | -| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB | +| Runtime parameter | Required | Description | Default | +|---|---|---|---| +| `druid.export.storage.google.allowedExportPaths` | Yes | Array of GCS prefixes allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. For example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a | +| `druid.export.storage.google.tempLocalDir` | No | Directory for local storage where the worker stores temporary files before uploading the data to GCS. 
| n/a | +| `druid.export.storage.google.maxRetry` | No | Maximum number of attempts for GCS API calls to avoid failures due to transient errors. | 10 | +| `druid.export.storage.google.chunkSize` | No | Individual chunk size to store temporarily in `tempDir`. Large chunk sizes reduce the number of API calls to GS, but require more disk space to store temporary chunks. | 4 MiB | -##### LOCAL -You can export to the local storage, which exports the results to the filesystem of the MSQ worker. +##### LOCAL - local file storage + +You can export queries to local storage. This process writes the results to the filesystem on the MSQ worker. This is useful in a single node setup or for testing but is not suitable for production use cases. -Export results to local storage by passing the function `LOCAL()` as an argument for the `EXTERN FUNCTION`. -To use local storage as an export destination, the runtime property `druid.export.storage.baseDir` must be configured on the Indexer/Middle Manager. -This value must be set to an absolute path on the local machine. Exporting data will be allowed to paths which match the prefix set by this value. -Arguments to `LOCAL()` should be passed as named parameters with the value in single quotes in the following example: +To export results to local storage, pass the `LOCAL()` function as an argument to the EXTERN function. +You must configure the runtime property `druid.export.storage.baseDir` as an absolute path on the Indexer or Middle Manager to use local storage as an export destination. +You can export data to paths that match this value as a prefix. +Pass all arguments to `LOCAL()` as named parameters with values enclosed in single quotes. For example: ```sql INSERT INTO @@ -214,13 +219,13 @@ SELECT FROM
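
-- Note: exportPath must point to an empty subdirectory of druid.export.storage.baseDir,
-- which is configured on the Indexer or Middle Manager.
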
``` -Supported arguments to the function: +Supported arguments for the function: -| Parameter | Required | Description | Default | -|-------------|--------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| --| -| `exportPath` | Yes | Absolute path to a subdirectory of `druid.export.storage.baseDir` used as the destination to export the results to. The export query expects the destination to be empty. If the location includes other files or directories, then the query will fail. | n/a | +| Parameter | Required | Description | Default | +|---|---|---|---| +| `exportPath` | Yes | Absolute path to a subdirectory of `druid.export.storage.baseDir` where Druid exports the query results. The destination must be empty. If the location includes other files or directories, the query will fail. | n/a | -For more information, see [Read external data with EXTERN](concepts.md#write-to-an-external-destination-with-extern). +For more information, see [Export external data with EXTERN](concepts.md#write-to-an-external-destination-with-extern). ### `INSERT` diff --git a/docs/multi-stage-query/security.md b/docs/multi-stage-query/security.md index 2aed00ab851f..d98695ebedce 100644 --- a/docs/multi-stage-query/security.md +++ b/docs/multi-stage-query/security.md @@ -81,3 +81,6 @@ The MSQ task engine needs the following permissions for pushing, fetching, and r - `Microsoft.Storage/storageAccounts/blobServices/containers/blobs/write` to write files in durable storage. - `Microsoft.Storage/storageAccounts/blobServices/containers/blobs/add/action` to create files in durable storage. - `Microsoft.Storage/storageAccounts/blobServices/containers/blobs/delete` to delete files when they're no longer needed. + + + diff --git a/docs/tutorials/tutorial-extern.md b/docs/tutorials/tutorial-extern.md new file mode 100644 index 000000000000..d44dd19a1542 --- /dev/null +++ b/docs/tutorials/tutorial-extern.md @@ -0,0 +1,206 @@ +--- +id: tutorial-extern +title: Export query results +sidebar_label: Export results +description: How to use EXTERN to export query results. +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +This tutorial demonstrates how to use the Apache Druid® SQL [EXTERN](../multi-stage-query/reference.md#extern-function) function to export data. + +## Prerequisites + +Before you follow the steps in this tutorial, download Druid as described in the [Local quickstart](index.md). +Don't start Druid, you'll do that as part of the tutorial. + +You should be familiar with ingesting and querying data in Druid. +If you haven't already, go through the [Query data](../tutorials/tutorial-query.md) tutorial first. + +## Export query results to the local file system + +This example demonstrates how to configure Druid to export data to the local file system. +While you can use this approach to learn about EXTERN syntax for exporting data, it's not suitable for production scenarios. + +### Configure Druid local export directory + +The following commands set the base path for the Druid exports to `/tmp/druid/`. +If the account running Druid doesn't have access to `/tmp/druid/`, change the path. +For example: `/Users/Example/druid`. +If you change the path in this step, use the updated path in all subsequent steps. 
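
The `sed` command that follows appends this property to the common runtime properties file. If you prefer to edit the configuration by hand instead, add the equivalent line (shown here assuming the default `/tmp/druid` path) to `conf/druid/auto/_common/common.runtime.properties`:

```
druid.export.storage.baseDir=/tmp/druid
```
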
+ +From the root of the Druid distribution, run the following: + +```bash +export export_path="/tmp/druid" +sed -i -e $'$a\\\n\\\n\\\n#\\\n###Local export\\\n#\\\ndruid.export.storage.baseDir='$export_path' conf/druid/auto/_common/common.runtime.properties +``` + +This adds the following section to the Druid `common.runtime.properties` configuration file located in `conf/druid/auto/_common`: + +``` +# +###Local export +# +druid.export.storage.baseDir=/tmp/druid/ +``` + +### Start Druid and load sample data + +1. From the root of the Druid distribution, launch Druid as follows: + + ```bash + ./bin/start-druid + ``` +1. After Druid starts, open [http://localhost:8888/](http://localhost:8888/) in your browser to access the Web Console. +1. From the [Query view](http://localhost:8888/unified-console.html#workbench), run the following command to load the Wikipedia example data set: + ```sql + REPLACE INTO "wikipedia" OVERWRITE ALL + WITH "ext" AS ( + SELECT * + FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}', + '{"type":"json"}' + ) + ) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR) + ) + SELECT + TIME_PARSE("timestamp") AS "__time", + "isRobot", + "channel", + "flags", + "isUnpatrolled", + "page", + "diffUrl", + "added", + "comment", + "commentLength", + "isNew", + "isMinor", + "delta", + "isAnonymous", + "user", + "deltaBucket", + "deleted", + "namespace", + "cityName", + "countryName", + "regionIsoCode", + "metroCode", + "countryIsoCode", + "regionName" + FROM "ext" + PARTITIONED BY DAY + ``` + +### Query to export data + +Open a new tab and run the following query to export query results to the path: +`/tmp/druid/wiki_example`. +The path must be a subdirectory of the `druid.export.storage.baseDir`. + + +```sql +INSERT INTO + EXTERN( + local(exportPath => '/tmp/druid/wiki_example') + ) +AS CSV +SELECT "channel", + SUM("delta") AS "changes" +FROM "wikipedia" +GROUP BY 1 +LIMIT 10 +``` + +Druid exports the results of the query to the `/tmp/druid/wiki_example` directory. +Run the following command to list the contents of + +```bash +ls /tmp/druid/wiki_example +``` + +The results are a CSV file export of the data and a directory. + +## Export query results to cloud storage + +The steps to export to cloud storage are similar to exporting to the local file system. +Druid supports Amazon S3 or Google Cloud Storage (GCS) as cloud storage destinations. + +1. Enable the extension for your cloud storage destination. See [Loading core extensions](../configuration/extensions.md#loading-core-extensions). + - **Amazon S3**: `druid-s3-extensions` + - **GCS**: `google-extensions` + See [Loading core extensions](../configuration/extensions.md#loading-core-extensions) for more information. +1. Configure the additional properties for your cloud storage destination. Replace `{CLOUD}` with `s3` or `google` accordingly: + - `druid.export.storage.{CLOUD}.tempLocalDir`: Local temporary directory where the query engine stages files to export. 
+ - `druid.export.storage.{CLOUD}.allowedExportPaths`: S3 or GS prefixes allowed as Druid export locations. For example `[\"s3://bucket1/export/\",\"s3://bucket2/export/\"]` or `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]`. + - `druid.export.storage.{CLOUD}.maxRetry`: Maximum number of times to attempt cloud API calls to avoid failures from transient errors. + - `druid.export.storage.s3.chunkSize`: Maximum size of individual data chunks to store in the temporary directory. +1. Verify the instance role has the correct permissions to the bucket and folders: read, write, create, and delete. See [Permissions for durable storage](../multi-stage-query/security.md#permissions-for-durable-storage). +1. Use the query syntax for your cloud storage type. For example: + + + + + + ```sql + INSERT INTO + EXTERN( + s3(bucket => 'your_bucket', prefix => 'prefix/to/files')) + AS CSV + SELECT "channel", + SUM("delta") AS "changes" + FROM "wikipedia" + GROUP BY 1 + LIMIT 10 + ``` + + + + + + ```sql + INSERT INTO + EXTERN + google(bucket => 'your_bucket', prefix => 'prefix/to/files') + AS CSV + SELECT "channel", + SUM("delta") AS "changes" + FROM "wikipedia" + GROUP BY 1 + LIMIT 10 + ``` + + + + + +1. When querying, use the `rowsPerPage` query context parameter to restrict the output file size. While it's possible to add a very large LIMIT at the end of your query to force Druid to create a single file, we don't recommend this technique. + +## Learn more + +See the following topics for more information: + +* [Export to a destination](../multi-stage-query/reference.md#extern-to-export-to-a-destination) for a reference of the EXTERN. +* [SQL-based ingestion security](../multi-stage-query/security.md/#permissions-for-durable-storage) for cloud permission requirements for MSQ. diff --git a/website/sidebars.json b/website/sidebars.json index e53040063188..b7cf66750388 100644 --- a/website/sidebars.json +++ b/website/sidebars.json @@ -38,7 +38,8 @@ "tutorials/tutorial-sql-query-view", "tutorials/tutorial-unnest-arrays", "tutorials/tutorial-query-deep-storage", - "tutorials/tutorial-latest-by"] + "tutorials/tutorial-latest-by", + "tutorials/tutorial-extern"] }, "tutorials/tutorial-sketches-theta", "tutorials/tutorial-jdbc", From b79382fa4d3adf5994990b89331348a7b53e3258 Mon Sep 17 00:00:00 2001 From: Charles Smith Date: Fri, 20 Dec 2024 10:22:50 -0800 Subject: [PATCH 3/4] [Docs] Updates the years in the compaction example (#17366) * Updates the years in the compaction example * update --- docs/data-management/compaction.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/data-management/compaction.md b/docs/data-management/compaction.md index 91ebf0d521ef..51bf7ee86427 100644 --- a/docs/data-management/compaction.md +++ b/docs/data-management/compaction.md @@ -78,7 +78,7 @@ Unless you modify the segment granularity in [`granularitySpec`](manual-compacti If segments have different segment granularities before compaction but there is some overlap in interval, Druid attempts find start and end of the overlapping interval and uses the closest segment granularity level for the compacted segment. -For example consider two overlapping segments: segment "A" for the interval 01/01/2021-01/02/2021 with day granularity and segment "B" for the interval 01/01/2021-02/01/2021. Druid attempts to combine and compact the overlapped segments. In this example, the earliest start time for the two segments is 01/01/2020 and the latest end time of the two segments is 02/01/2020. 
Druid compacts the segments together even though they have different segment granularity. Druid uses month segment granularity for the newly compacted segment even though segment A's original segment granularity was DAY. +For example consider two overlapping segments: segment "A" for the interval 01/01/2020-01/02/2020 with day granularity and segment "B" for the interval 01/01/2020-02/01/2020. Druid attempts to combine and compact the overlapped segments. In this example, the earliest start time for the two segments is 01/01/2020 and the latest end time of the two segments is 02/01/2020. Druid compacts the segments together even though they have different segment granularity. Druid uses month segment granularity for the newly compacted segment even though segment A's original segment granularity was day granularity. ### Query granularity handling From 09840ad6626649d33ae525335be221b3b9ee4ad7 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 6 Jan 2025 10:52:26 +0530 Subject: [PATCH 4/4] Docs: Add details of cost balancer strategy (#17595) Fixes #17530 --- docs/configuration/index.md | 2 +- docs/design/coordinator.md | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 75ffab25a605..26601c6db40f 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -896,7 +896,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false| |`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`P30D`| |`druid.coordinator.kill.maxSegments`|The number of unused segments to kill per kill task. This number must be greater than 0. This only applies when `druid.coordinator.kill.on=true`.|100| -|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy for the Coordinator to use to distribute segments among the Historical services. `diskNormalized` weights the costs according to the servers' disk usage ratios - there are known issues with this strategy distributing segments unevenly across the cluster. `random` distributes segments among services randomly.|`cost`| +|`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`| |`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute| |`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. 
Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service.|1| |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false| diff --git a/docs/design/coordinator.md b/docs/design/coordinator.md index 9cb2279d4976..bc4c5ebc1cba 100644 --- a/docs/design/coordinator.md +++ b/docs/design/coordinator.md @@ -79,11 +79,19 @@ On each run, the Coordinator determines and cleans up unneeded eternity tombston ## Segment availability -If a Historical service restarts or becomes unavailable for any reason, the Coordinator will notice a service has gone missing and treat all segments served by that service as being dropped. Given a sufficient period of time, the segments may be reassigned to other Historical services in the cluster. However, each segment that is dropped is not immediately forgotten. Instead, there is a transitional data structure that stores all dropped segments with an associated lifetime. The lifetime represents a period of time in which the Coordinator will not reassign a dropped segment. Hence, if a Historical service becomes unavailable and available again within a short period of time, the Historical service will start up and serve segments from its cache without any those segments being reassigned across the cluster. +If a Historical service restarts or becomes unavailable for any reason, the Coordinator notices that a service has gone missing and treats all segments served by that service as being dropped. The segments are then reassigned to other Historical services in the cluster. However, each segment that is dropped is not immediately forgotten. Instead, there is a transitional data structure that stores all dropped segments with an associated lifetime. The lifetime represents a period of time in which the Coordinator will not reassign a dropped segment. Hence, if a Historical service becomes unavailable and available again within a short period of time, the Historical service will start up and serve segments from its cache without any of those segments being reassigned across the cluster. -## Balancing segment load +## Balancing segments in a tier -To ensure an even distribution of segments across Historical services in the cluster, the Coordinator service will find the total size of all segments being served by every Historical service each time the Coordinator runs. For every Historical service tier in the cluster, the Coordinator service will determine the Historical service with the highest utilization and the Historical service with the lowest utilization. The percent difference in utilization between the two services is computed, and if the result exceeds a certain threshold, a number of segments will be moved from the highest utilized service to the lowest utilized service. There is a configurable limit on the number of segments that can be moved from one service to another each time the Coordinator runs. Segments to be moved are selected at random and only moved if the resulting utilization calculation indicates the percentage difference between the highest and lowest servers has decreased. 

Druid queries perform optimally when segments are distributed evenly across Historical services. An ideal distribution would ensure that all Historicals participate equally in the query load, thus avoiding hot spots in the system. To some extent, this can be achieved by keeping multiple replicas of a segment in a cluster.
But in a tier with several Historicals (or a low replication factor), segment replication is not sufficient to attain balance.
Thus, the Coordinator constantly monitors the set of segments present on each Historical in a tier and employs one of the following strategies to identify segments that may be moved from one Historical to another to retain balance.

- `cost` (default): For a given segment in a tier, this strategy picks the server with the minimum "cost" of placing that segment. The cost is a function of the data interval of the segment and the data intervals of all the segments already present on the candidate server. In essence, this strategy tries to avoid placing segments with adjacent or overlapping data intervals on the same server. This is based on the premise that adjacent-interval segments are more likely to be used together in a query, and placing them on the same server may lead to skewed CPU usage across Historicals.
- `diskNormalized`: A derivative of the `cost` strategy that weights the cost of placing a segment on a server with the disk usage ratio of the server. There are known issues with this strategy, and it is not recommended for a production cluster.
- `random`: Distributes segments randomly across servers. This is an experimental strategy and is not recommended for a production cluster.

All of the above strategies prioritize moving segments from the Historical with the least available disk space.

## Automatic compaction