From 3d6b00fa253caca249f9fb3bef145cbbffc8d47d Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Thu, 16 Nov 2023 14:51:39 +0100 Subject: [PATCH] Update delta protocol written spec Update openapi spec wip wip wip Add nix stuff wippp --- .../docs/protocols/delta-sharing-protocol.md | 278 ++++++++++++++++- protocol/delta-sharing-protocol-api.yml | 279 ++++++++++++++++-- sbin/buildNativeContainer.sh | 5 +- sbin/startDocusaurus.sh | 5 + server/app/build.gradle.kts | 1 + .../api/deltasharing/DeltaMappers.java | 85 +++--- .../server/DeltaSharesApiImpl.java | 69 +++-- .../java/io/whitefox/api/server/ApiUtils.java | 6 - .../io/whitefox/api/server/DeltaHeaders.java | 5 +- .../whitefox/api/server/WhitefoxMappers.java | 4 + server/core/build.gradle.kts | 1 + .../main/java/io/whitefox/core/Metadata.java | 147 --------- .../main/java/io/whitefox/core/Protocol.java | 38 --- .../io/whitefox/core/ReadTableResult.java | 55 ---- .../core/ReadTableResultToBeSigned.java | 56 ---- .../main/java/io/whitefox/core/TableFile.java | 1 - .../io/whitefox/core/TableFileToBeSigned.java | 2 - .../java/io/whitefox/core/delta/Metadata.java | 274 +++++++++++++++++ .../java/io/whitefox/core/delta/Protocol.java | 90 ++++++ .../java/io/whitefox/core/delta/Stats.java | 25 ++ .../whitefox/core/delta/signed/DeltaFile.java | 59 ++++ .../core/delta/signed/DeltaFileAction.java | 4 + .../core/delta/signed/FileAction.java | 4 + .../core/delta/signed/ParquetAddFile.java | 74 +++++ .../core/delta/signed/ParquetCDFFile.java | 61 ++++ .../core/delta/signed/ParquetFile.java | 79 +++++ .../core/delta/signed/ParquetFileAction.java | 7 + .../core/delta/signed/ParquetRemoveFile.java | 67 +++++ .../delta/unsigned/DeltaFileToBeSigned.java | 48 +++ .../delta/unsigned/FileActionToBeSigned.java | 6 + .../unsigned/ParquetAddFileToBeSigned.java | 49 +++ .../unsigned/ParquetCDFFileToBeSigned.java | 44 +++ .../unsigned/ParquetFileActionToBeSigned.java | 5 + .../delta/unsigned/ParquetFileToBeSigned.java | 53 ++++ 
.../unsigned/ParquetRemoveFileToBeSigned.java | 45 +++ .../core/partitions/PartitionValue.java | 5 + .../core/results/DeltaReadTableResult.java | 17 ++ .../DeltaReadTableResultToBeSigned.java | 18 ++ .../core/results/ParquetReadTableResult.java | 17 ++ .../ParquetReadTableResultToBeSigned.java | 18 ++ .../core/results/ReadTableResult.java | 13 + .../results/ReadTableResultToBeSigned.java | 15 + .../core/services/DeltaSharedTable.java | 44 ++- .../core/services/DeltaSharesService.java | 48 ++- .../core/services/DeltaSharesServiceImpl.java | 8 +- .../services/DeltaSharingCapabilities.java | 185 ++++++++++++ .../io/whitefox/core/services/FileSigner.java | 6 +- .../io/whitefox/core/services/NoOpSigner.java | 67 ++++- .../whitefox/core/services/S3FileSigner.java | 84 +++++- .../core/services/DeltaSharedTableTest.java | 2 +- .../DeltaSharingCapabilitiesTest.java | 111 +++++++ shell.nix | 6 + 52 files changed, 2242 insertions(+), 453 deletions(-) mode change 100644 => 100755 sbin/buildNativeContainer.sh create mode 100755 sbin/startDocusaurus.sh delete mode 100644 server/core/src/main/java/io/whitefox/core/Metadata.java delete mode 100644 server/core/src/main/java/io/whitefox/core/Protocol.java delete mode 100644 server/core/src/main/java/io/whitefox/core/ReadTableResult.java delete mode 100644 server/core/src/main/java/io/whitefox/core/ReadTableResultToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/Metadata.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/Protocol.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/Stats.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/signed/DeltaFile.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/signed/DeltaFileAction.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/signed/FileAction.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/signed/ParquetAddFile.java 
create mode 100644 server/core/src/main/java/io/whitefox/core/delta/signed/ParquetCDFFile.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/signed/ParquetFile.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/signed/ParquetFileAction.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/signed/ParquetRemoveFile.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/unsigned/DeltaFileToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/unsigned/FileActionToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetAddFileToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetCDFFileToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetFileActionToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetFileToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetRemoveFileToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/partitions/PartitionValue.java create mode 100644 server/core/src/main/java/io/whitefox/core/results/DeltaReadTableResult.java create mode 100644 server/core/src/main/java/io/whitefox/core/results/DeltaReadTableResultToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/results/ParquetReadTableResult.java create mode 100644 server/core/src/main/java/io/whitefox/core/results/ParquetReadTableResultToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/results/ReadTableResult.java create mode 100644 server/core/src/main/java/io/whitefox/core/results/ReadTableResultToBeSigned.java create mode 100644 server/core/src/main/java/io/whitefox/core/services/DeltaSharingCapabilities.java create mode 100644 
server/core/src/test/java/io/whitefox/core/services/DeltaSharingCapabilitiesTest.java create mode 100644 shell.nix diff --git a/docsite/docs/protocols/delta-sharing-protocol.md b/docsite/docs/protocols/delta-sharing-protocol.md index 35a6fb5d4..e73ec48f1 100644 --- a/docsite/docs/protocols/delta-sharing-protocol.md +++ b/docsite/docs/protocols/delta-sharing-protocol.md @@ -1376,12 +1376,39 @@ delta-table-version: 123 This is the API for clients to query the table schema and other metadata. -HTTP Request | Value --|- -Method | `GET` -Header | `Authorization: Bearer [token]` -URL | `{prefix}/shares/{share}/schemas/{schema}/tables/{table}/metadata` -URL Parameters | **\{share\}**: The share name to query. It's case-insensitive.
**\{schema\}**: The schema name to query. It's case-insensitive.
**\{table\}**: The table name to query. It's case-insensitive. + + + + + + + + + + + + + + + + + + + + + +
HTTP RequestValue
Method`GET`
Headers +`Authorization: Bearer {token}` + +Optional: `delta-sharing-capabilities: responseformat=delta;readerfeatures=deletionvectors`, see +[Delta Sharing Capabilities Header](#delta-sharing-capabilities-header) for details. +
URL +`{prefix}/shares/{share}/schemas/{schema}/tables/{table}/metadata` +
URL Parameters +**\{share\}**: The share name to query. It's case-insensitive.
+**\{schema\}**: The schema name to query. It's case-insensitive.
+**\{table\}**: The table name to query. It's case-insensitive. +
200: The table metadata was successfully returned. @@ -1409,10 +1436,17 @@ URL Parameters | **\{share\}**: The share name to query. It's case-insensitive.< A sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format](#api-response-format). +When `responseformat=parquet`, each line is a JSON object defined in [API Response Format in Parquet](#api-response-format-in-parquet). + The response contains two lines: - The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line) containing the table [Protocol](#protocol) object. - The second line is [a JSON wrapper object](#json-wrapper-object-in-each-line) containing the table [Metadata](#metadata) object. +When `responseformat=delta`, each line is a JSON object defined in [API Response Format in Delta](#api-response-format-in-delta). +The response contains two lines: +- The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Protocol](#protocol-in-delta-format) object. +- The second line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Metadata](#metadata-in-delta-format) object. + @@ -1572,7 +1606,7 @@ The response contains two lines:
-Example (See [API Response Format](#api-response-format) for more details about the format): +Example (See [API Response Format in Parquet](#api-response-format-in-parquet) for more details about the format): `GET {prefix}/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/metadata` @@ -1627,6 +1661,10 @@ This is the API for clients to read data from a table. Optional: `Content-Type: application/json; charset=utf-8` +Optional: `delta-sharing-capabilities: responseformat=delta;readerfeatures=deletionvectors`, see +[Delta Sharing Capabilities Header](#delta-sharing-capabilities-header) for details. + + @@ -1697,7 +1735,8 @@ returned in the response. Body -A sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format](#api-response-format). +When `responseformat=parquet`, a sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format in Parquet](#api-response-format-in-parquet). + The response contains multiple lines: - The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line) containing the table [Protocol](#protocol) object. @@ -1707,6 +1746,18 @@ The response contains multiple lines: - The lines are [files](#file) in the table (otherwise). - The ordering of the lines doesn't matter. +When `responseformat=delta`, a sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format in Delta](#api-response-format-in-delta). + +The response contains multiple lines: +- The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Protocol](#protocol-in-delta-format) object. +- The second line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Metadata](#metadata-in-delta-format) object. 
+- The rest of the lines are [JSON wrapper objects](#json-wrapper-object-in-each-line-in-delta) for [Metadata](#metadata-in-delta-format), or [files](#file-in-delta-format). + - The lines are [files](#file-in-delta-format) which wraps the delta single action in the table (otherwise), with possible historical [Metadata](#metadata-in-delta-format) (when startingVersion is set). + - The ordering of the lines doesn't matter. + +The delta actions are wrapped because they will be used to construct a local delta log on the recipient +side and then leverage the delta library to read data. + @@ -1899,7 +1950,7 @@ The request body should be a JSON string containing the following optional field When `predicateHints` and `limitHint` are both present, the server should apply `predicateHints` first then `limitHint`. As these two parameters are hints rather than enforcement, the client must always apply `predicateHints` and `limitHint` on the response returned by the server if it wishes to filter and limit the returned data. An empty JSON object (`{}`) should be provided when these two parameters are missing. -Example (See [API Response Format](#api-response-format) for more details about the format): +Example (See [API Response Format in Parquet](#api-response-format-in-parquet) for more details about the format): `POST {prefix}/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/query` @@ -1993,6 +2044,8 @@ The change data feed represents row-level changes between versions of a Delta ta `Authorization: Bearer [token]` +Optional: `delta-sharing-capabilities: responseformat=delta;readerfeatures=deletionvectors`, see [Delta Sharing Capabilities Header](#delta-sharing-capabilities-header) for details. + @@ -2048,7 +2101,7 @@ The change data feed represents row-level changes between versions of a Delta ta Body -A sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format](#api-response-format). 
+When `responseformat=parquet`, a sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format in Parquet](#api-response-format-in-parquet). The response contains multiple lines: - The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line) containing the table [Protocol](#protocol) object. @@ -2057,6 +2110,13 @@ The response contains multiple lines: - Historical [Metadata](#metadata) will be returned if includeHistoricalMetadata is set to true. - The ordering of the lines doesn't matter. +When `responseformat=delta`, a sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format in Delta](#api-response-format-in-delta). +- The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Protocol](#protocol-in-delta-format) object. +- The second line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Metadata](#metadata-in-delta-format) object. +- The rest of the lines are [JSON wrapper objects](#json-wrapper-object-in-each-line-in-delta) for [Files](#file-in-delta-format) of the change data feed. + - Historical [Metadata](#metadata-in-delta-format) will be returned if includeHistoricalMetadata is set to true. + - The ordering of the lines doesn't matter. + @@ -2216,7 +2276,7 @@ The response contains multiple lines: -Example (See [API Response Format](#api-response-format) for more details about the format): +Example (See [API Response Format in Parquet](#api-response-format-in-parquet) for more details about the format): `GET {prefix}/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/changes?startingVersion=0&endingVersion=2` @@ -2289,7 +2349,61 @@ content-type: application/x-ndjson; charset=utf-8 ### Timestamp Format Accepted timestamp format by a delta sharing server: in the ISO8601 format, in the UTC timezone, such as `2022-01-01T00:00:00Z`. 
-## API Response Format +## Delta Sharing Capabilities Header + +This section explains the details of delta sharing capabilities header, which was introduced to help +delta sharing catch up with features in [delta protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md). + +The key of the header is **delta-sharing-capabilities**, the value is semicolon separated capabilities. +Each capability is in the format of "key=value1,value2", values are separated by commas. +Example: "responseformat=delta;readerfeatures=deletionvectors,columnmapping". All keys and values should +be case-insensitive when processed by the server. + +This header can be used in the request for [Query Table Metadata](#query-table-metadata), +[Query Table](#read-data-from-a-table), and [Query Table Changes](#read-change-data-feed-from-a-table). + +**Compatibility** + + + + + + + + + + + + + + + + + +
Client/ServerServer that doesn't recognize the headerServer that recognizes the header
Client that doesn't specify the headerResponse is in parquet formatResponse must be in parquet format.
Client that specifies the headerThe header is ignored at the server, and the format of the response must always be parquet.The header is processed properly by the server.
If only one responseFormat is specified, the server must respect it and return the response in the requested format.
If a list of responseFormat values is specified, such as `responseFormat=delta,parquet`, the server may choose to respond in parquet format if the table does not have any advanced features. The server must respond in delta format if the table has advanced features which are not compatible with the parquet format.
+ +- If the client requests `delta` format and the response is in `parquet` format, the delta sharing +client will NOT throw an error. Ideally, the caller of the client's method should handle such +responses to be compatible with legacy servers. +- If the client doesn't specify any header, or requests `parquet` format and the response is in +`delta` format, the delta sharing client must throw an error. + +### responseFormat +Indicates the format to expect in the [API Response Format in Parquet](#api-response-format-in-parquet), two values are supported. + +- parquet: Represents the format of the delta sharing protocol that has been used in `delta-sharing-spark` 1.0 +and less, also the default format if `responseFormat` is missing from the header. All the existing delta +sharing connectors are able to process data in this format. +- **delta**: format can be used to read a shared delta table with minReaderVersion > 1, which contains +readerFeatures such as Deletion Vector or Column Mapping. `delta-sharing-spark` libraries +that are able to process `responseformat=delta` will be released soon. + +### readerFeatures +readerfeatures is only useful when `responseformat=delta`, it includes values from [delta reader +features](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#table-features). It's set by the +caller of `DeltaSharingClient` to indicate its ability to process delta readerFeatures. + +## API Response Format in Parquet This section discusses the API Response Format returned by the server. @@ -2299,7 +2413,7 @@ The JSON object in each line is a wrapper object that may contain the following Field Name | Data Type | Description | Optional/Required -|-|-|- -protocol | The [Protocol](#protocol) JSON object. | Defines the versioning information about the API Response Format. | Optional +protocol | The [Protocol](#protocol) JSON object. | Defines the versioning information about the API Response Format in Parquet. 
| Optional metaData | The [Metadata](#metadata) JSON object. | The table metadata including schema, partitionColumns, etc. | Optional file | The [File](#file) JSON object. | An individual data file in the table. | Optional @@ -2335,7 +2449,7 @@ description | String | User-provided description for this table | Optional format | [Format](#format) Object | Specification of the encoding for the files stored in the table. | Required schemaString | String | Schema of the table. This is a serialized JSON string which can be deserialized to a [Schema](#schema-object) Object. | Required partitionColumns | Array[String] | An array containing the names of columns by which the data should be partitioned. When a table doesn’t have partition columns, this will be an **empty** array. | Required -configuration | Map[String, String] | A map containing configuration options for the table +configuration | Map[String, String] | A map containing configuration options for the table | Optional version | Long | The table version the metadata corresponds to, returned when querying table data with a version or timestamp parameter, or cdf query with includeHistoricalMetadata set to true. | Optional size | Long | The size of the table in bytes, will be returned if available in the delta log. | Optional numFiles | Long | The number of files in the table, will be returned if available in the delta log. | Optional @@ -2762,6 +2876,119 @@ nullCount | The number of `null` values for this column minValues | A value smaller than all values present in the file for this column maxValues | A value larger than all values present in the file for this column +## API Response Format in Delta + +This section discusses the API Response Format in Delta returned by the server. When a table is shared +as delta format, the actions in the response could be put in a delta log in the local storage on the +recipient side for the delta library to read data out of it directly. 
This way of sharing makes the +delta sharing protocol more transparent and robust in supporting advanced delta feature, and minimizes code duplication. + +### JSON Wrapper Object In Each Line in Delta + +The JSON object in each line is a wrapper object that may contain the following fields. For each +field, it is a wrapper of a [delta action](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#actions)(which keeps the action in its delta format and with original +values), and with some additional fields for delta sharing functionalities. + +Field Name | Data Type | Description | Optional/Required +-|-|-|- +protocol | The [Protocol in Delta Format](#protocol-in-delta-format) JSON object. | A wrapper of delta protocol. | Optional +metaData | The [Metadata in Delta Format](#metadata-in-delta-format) JSON object. | A wrapper of delta metadata, including some delta sharing specific fields. | Optional +file | The [File in Delta Format](#file-in-delta-format) JSON object. | A wrapper of a delta single action in the table. | Optional + +It must contain only **ONE** of the above fields. + +### Protocol in Delta Format + +A wrapper of a [delta protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#protocol-evolution). + +Field Name | Data Type | Description | Optional/Required +-|-|-|- +deltaProtocol | Delta Protocol | Need to be parsed by a delta library as a delta protocol. | Required + +Example (for illustration purposes; each JSON object must be a single line in the response): + +```json +{ + "protocol": { + "deltaProtocol": { + "minReaderVersion": 3, + "minWriterVersion": 7 + } + } +} +``` + +### Metadata in Delta Format + +A wrapper of a [delta Metadata](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata). 
+ +Field Name | Data Type | Description | Optional/Required +-|-|-|- +deltaMetadata | Delta Metadata | Need to be parsed by a delta library as delta metadata | Required +version | Long | The table version the metadata corresponds to, returned when querying table data with a version or timestamp parameter, or cdf query with includeHistoricalMetadata set to true. | Optional +size | Long | The size of the table in bytes, will be returned if available in the delta log. | Optional +numFiles | Long | The number of files in the table, will be returned if available in the delta log. | Optional + +Example (for illustration purposes; each JSON object must be a single line in the response): + +```json +{ + "metaData": { + "version": 20, + "size": 123456, + "numFiles": 5, + "deltaMetadata": { + "partitionColumns": [ + "date" + ], + "format": { + "provider": "parquet" + }, + "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}", + "id": "f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2", + "configuration": { + "enableChangeDataFeed": "true" + } + } + } +} +``` + +### File in Delta Format + +A wrapper of a delta file action, which can be [Add File and Remove File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file), +or [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file) + +Field Name | Data Type | Description | Optional/Required +-|-|-|- +id | String | A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. | Required +deletionVectorFileId | String | A unique string for the deletion vector file in a table. The same deletion vector file is guaranteed to have the same id across multiple requests. 
A client may cache the file content and use this id as a key to decide whether to use the cached file content. | Optional +version | Long | The table version of the file, returned when querying a table data with a version or timestamp parameter. | Optional +timestamp | Long | The unix timestamp corresponding to the table version of the file, in milliseconds, returned when querying a table data with a version or timestamp parameter. | Optional +expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional +deltaSingleAction | Delta SingleAction | Need to be parsed by a delta library as a delta single action, the path field is replaced by pre-signed url. | Required + +Example (for illustration purposes; each JSON object must be a single line in the response): + +```json +{ + "file": { + "id": "591723a8-6a27-4240-a90e-57426f4736d2", + "size": 573, + "expirationTimestamp": 1652140800000, + "deltaSingleAction": { + "add": { + "path": "https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table2/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?...", + "partitionValues": { + "date": "2021-04-28" + }, + "stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}" + } + } + } +} +``` + ## SQL Expressions for Filtering The client may send a sequence of predicates to the server as a hint to request fewer files when it only wishes to query a subset of the data (e.g., data where the `country` field is `US`). The server may try its best to filter files based on the predicates. This is **BEST EFFORT**, so the server may return files that don’t satisfy the predicates. For example, if the server fails to parse a SQL expression, the server can skip it. 
Hence, the client should always apply predicates to filter the data returned by the server. @@ -2824,7 +3051,9 @@ ValueType | Description "long" | Represents a Long type. "string" | Represents a String type. "date" | Represents a Date type in "yyyy-mm-dd" format. - +"float" | Represents a Float type. +"double" | Represents a Double type. +"timestamp" | Represents a timestamp in [Timestamp Format](#timestamp-format). Examples @@ -2880,6 +3109,25 @@ Examples } ``` +## Delta Sharing Streaming Specs +Delta Sharing Streaming is supported starting from delta-sharing-spark 0.6.0. As it's implemented +based on spark structured streaming, it leverages a pull model to consume the new data of the shared +table from the delta sharing server. In addition to most options supported in delta streaming, +there are two options/spark configs for delta sharing streaming. + +- spark config **spark.delta.sharing.streaming.queryTableVersionIntervalSeconds**: DeltaSharingSource +leverages [getTableVersion](#query-table-version) rpc to check whether there is new data available +to consume. In order to reduce the traffic burden to the delta sharing server, there's a minimum 30 +seconds interval between two getTableVersion rpcs to the delta sharing server. Though, if you are ok +with less freshness on the data and want to reduce the traffic to the server, you can set this +config to a larger number, for example: 60s or 120s. An error will be thrown if it's set less than 30 seconds. +- option **maxVersionsPerRpc**: DeltaSharingSource leverages [QueryTable](#read-data-from-a-table) +rpc to continuously read new data from the delta sharing server. There might be too much +new data to be returned from the server if the streaming has paused for a while on the recipient +side. Its default value is 100, a smaller number is recommended such as `.option("maxVersionsPerRpc", 10)` +to reduce the traffic load for each rpc. 
This shouldn't affect the freshness of the data significantly +assuming the process time of the delta sharing server grows linearly with `maxVersionsPerRpc`. + # Profile File Format diff --git a/protocol/delta-sharing-protocol-api.yml b/protocol/delta-sharing-protocol-api.yml index fbc435827..bb2ff4841 100644 --- a/protocol/delta-sharing-protocol-api.yml +++ b/protocol/delta-sharing-protocol-api.yml @@ -527,6 +527,12 @@ paths: description: 'If set to true, return the historical metadata if seen in the delta log. This is for the streaming client to check if the table schema is still read compatible.' schema: type: boolean + - in: header + name: delta-sharing-capabilities + required: false + description: 'Delta Sharing Capabilities' + schema: + type: string responses: '400': $ref: "#/components/responses/400" @@ -581,11 +587,14 @@ components: not Unary Represents a logical not check. This op should have once child. The supported value types: ValueType Description - "bool" Represents an Boolean type. - "int" Represents an Integer type. - "long" Represents a Long type. - "string" Represents a String type. - "date" Represents a Date type in "yyyy-mm-dd" format. + "bool" Represents an Boolean type. + "int" Represents an Integer type. + "long" Represents a Long type. + "string" Represents a String type. + "date" Represents a Date type in "yyyy-mm-dd" format. + "float" Represents a Float type. + "double" Represents a Double type. + "timestamp" Represents a timestamp in ISO8601 format, in the UTC timezone. 
ListShareResponse: type: object @@ -771,27 +780,31 @@ components: properties: protocol: # it refers to ./delta-sharing-protocol.md#protocol - $ref: '#/components/schemas/ProtocolObject' + $ref: '#/components/schemas/ParquetProtocolObject' metadata: # it refers to ./delta-sharing-protocol.md#metadata - $ref: '#/components/schemas/MetadataObject' + $ref: '#/components/schemas/ParquetMetadataObject' # This is not used for the spec but comes handy for autogeneration TableQueryResponseObject: + oneOf: + - $ref: '#/components/schemas/ParquetTableQueryResponseObject' + - $ref: '#/components/schemas/DeltaTableQueryResponseObject' + ParquetTableQueryResponseObject: type: object properties: protocol: # it refers to ./delta-sharing-protocol.md#protocol - $ref: '#/components/schemas/ProtocolObject' + $ref: '#/components/schemas/ParquetProtocolObject' metadata: # it refers to ./delta-sharing-protocol.md#metadata - $ref: '#/components/schemas/MetadataObject' + $ref: '#/components/schemas/ParquetMetadataObject' files: type: array items: # it refers to ./delta-sharing-protocol.md#file - $ref: '#/components/schemas/FileObject' - FileObject: + $ref: '#/components/schemas/ParquetFileObject' + ParquetFileObject: type: object properties: file: @@ -825,7 +838,7 @@ components: - id - partitionValues - size - ProtocolObject: + ParquetProtocolObject: type: object properties: protocol: @@ -834,15 +847,14 @@ components: minReaderVersion: type: integer format: int32 - FormatObject: + ParquetFormatObject: type: object properties: provider: type: string required: - provider - - MetadataObject: + ParquetMetadataObject: type: object properties: metaData: @@ -855,7 +867,7 @@ components: description: type: string format: - $ref: '#/components/schemas/FormatObject' + $ref: '#/components/schemas/ParquetFormatObject' schemaString: type: string partitionColumns: @@ -881,7 +893,240 @@ components: - format - schemaString - partitionColumns - + DeltaTableQueryResponseObject: + type: object + properties: + 
protocol: + # it refers to ./delta-sharing-protocol.md#protocol + $ref: '#/components/schemas/DeltaProtocolObject' + metadata: + # it refers to ./delta-sharing-protocol.md#metadata + $ref: '#/components/schemas/DeltaMetadataObject' + files: + type: array + items: + # it refers to ./delta-sharing-protocol.md#file + $ref: '#/components/schemas/DeltaFileObject' + DeltaProtocolObject: + type: object + properties: + protocol: + type: object + properties: + deltaProtocol: + type: object + properties: + minReaderVersion: + type: integer + format: int32 + minWriterVersion: + type: integer + format: int32 + DeltaFormatObject: + type: object + properties: + provider: + type: string + options: + type: object + additionalProperties: + type: string + required: + - provider + DeltaMetadata: + type: object + description: see https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata + required: + - id + - format + - schemaString + - partitionColumns + - configuration + properties: + id: + type: string + name: + type: string + description: + type: string + format: + $ref: '#/components/schemas/DeltaFormatObject' + schemaString: + type: string + partitionColumns: + type: array + items: + type: string + createdTime: + type: integer + format: int64 + configuration: + type: object + additionalProperties: + type: string + DeltaMetadataObject: + type: object + properties: + metaData: + type: object + properties: + version: + type: integer + format: int64 + size: + type: integer + format: int64 + numFiles: + type: integer + format: int64 + deltaMetadata: + $ref: '#/components/schemas/DeltaMetadata' + required: [ deltaMetadata ] + DeltaFileObject: + required: + - id + - deltaSingleAction + properties: + id: + type: string + deletionVectorFileId: + type: string + version: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + deltaSingleAction: + $ref: '#/components/schemas/DeltaSingleAction' + 
DeltaSingleAction: + type: object + description: Container of delta actions such as file, add, cdf or remove; only one field can be non-null. See https://github.com/delta-io/delta/tree/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions + properties: + add: + $ref: '#/components/schemas/DeltaAddFileForCDFAction' + cdf: + $ref: '#/components/schemas/DeltaAddCDCFileAction' + file: + $ref: '#/components/schemas/DeltaAddFileAction' + remove: + $ref: '#/components/schemas/DeltaRemoveFileAction' + DeltaAddFileForCDFAction: + type: object + description: see io.delta.sharing.server.model.AddFileForCDF + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + stats: + type: string + DeltaAddCDCFileAction: + type: object + description: see io.delta.sharing.server.model.AddCDCFile + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + DeltaAddFileAction: + type: object + description: see io.delta.sharing.server.model.AddFile + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + stats: + type: string + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + DeltaRemoveFileAction: + type: object + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer +
format: int64 + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + DeltaEndStreamAction: + description: An action that is returned as the last line of the streaming response. It allows the server to include additional data that might be dynamically generated while the streaming message is sent + type: object + properties: + refreshToken: + type: string + description: a token used to refresh pre-signed urls for a long running query + nextPageToken: + type: string + description: a token used to retrieve the subsequent page of a query + minUrlExpirationTimestamp: + description: the minimum url expiration timestamp of the urls returned in current response + type: integer + format: int64 responses: "400": description: The request is malformed diff --git a/sbin/buildNativeContainer.sh b/sbin/buildNativeContainer.sh old mode 100644 new mode 100755 index f18e23a7c..b7478f6d1 --- a/sbin/buildNativeContainer.sh +++ b/sbin/buildNativeContainer.sh @@ -1,2 +1,5 @@ #!/bin/bash -./gradlew server:imageBuild -Dquarkus.package.type=native -Dquarkus.native.container-build=true -Dquarkus.container-image.name=server-native \ No newline at end of file +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +pushd $SCRIPT_DIR/.. +nix-shell --run './gradlew server:app:imageBuild -Dquarkus.package.type=native -Dquarkus.native.container-build=true -Dquarkus.container-image.name=server-native' +popd diff --git a/sbin/startDocusaurus.sh b/sbin/startDocusaurus.sh new file mode 100755 index 000000000..d3cc42fad --- /dev/null +++ b/sbin/startDocusaurus.sh @@ -0,0 +1,5 @@ +#!/bin/bash +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +pushd $SCRIPT_DIR/.. 
+nix-shell --run './gradlew docsite:npm_run_start' +popd \ No newline at end of file diff --git a/server/app/build.gradle.kts b/server/app/build.gradle.kts index fce88e2d9..5104765cd 100644 --- a/server/app/build.gradle.kts +++ b/server/app/build.gradle.kts @@ -49,6 +49,7 @@ val serverGeneratorProperties = mapOf( "dateLibrary" to "java8", "disallowAdditionalPropertiesIfNotPresent" to "false", "generateBuilders" to "false", + "legacyDiscriminatorBehavior" to "false", "generatePom" to "false", "interfaceOnly" to "true", "library" to "quarkus", diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java index 6948065c7..44826675a 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java @@ -5,6 +5,10 @@ import io.whitefox.core.*; import io.whitefox.core.Schema; import io.whitefox.core.Share; +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.Protocol; +import io.whitefox.core.results.ReadTableResult; +import io.whitefox.core.services.DeltaSharingCapabilities; import java.util.*; import java.util.stream.Collectors; @@ -58,37 +62,46 @@ public static TableQueryResponseObject readTableResult2api(ReadTableResult readT .collect(Collectors.toList())); } - private static MetadataObject metadata2Api(Metadata metadata) { - return new MetadataObject() - .metaData(new MetadataObjectMetaData() + private static ParquetMetadataObject metadata2Api(Metadata metadata) { + return new ParquetMetadataObject() + .metaData(new ParquetMetadataObjectMetaData() + .numFiles(metadata.numFiles().orElse(null)) + .version(metadata.version()) + .size(metadata.size().orElse(null)) .id(metadata.id()) .name(metadata.name().orElse(null)) .description(metadata.description().orElse(null)) - .format(new FormatObject().provider(metadata.format().provider())) + .format(new 
ParquetFormatObject().provider(metadata.format().provider())) .schemaString(metadata.tableSchema().structType().toJson()) .partitionColumns(metadata.partitionColumns()) - ._configuration(metadata.configuration()) - .version(metadata.version().orElse(null)) - .numFiles(metadata.numFiles().orElse(null))); + ._configuration(metadata.configuration())); } - private static ProtocolObject protocol2Api(Protocol protocol) { - return new ProtocolObject() - .protocol(new ProtocolObjectProtocol() - .minReaderVersion(protocol.minReaderVersion().orElse(1))); + private static DeltaProtocolObject protocol2Api(Protocol protocol) { + return new DeltaProtocolObject() + .protocol(new DeltaProtocolObjectProtocol() + .deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol() + .minReaderVersion(protocol.minReaderVersion().orElse(1)) + .minWriterVersion(protocol.minWriterVersion().orElse(1)))); } - private static FileObject file2Api(TableFile f) { - return new FileObject() - ._file(new FileObjectFile() - .id(f.id()) - .url(f.url()) - .partitionValues(f.partitionValues()) - .size(f.size()) - .stats(f.stats().orElse(null)) - .version(f.version().orElse(null)) - .timestamp(f.timestamp().orElse(null)) - .expirationTimestamp(f.expirationTimestamp())); + private static DeltaFileObject file2Api(TableFile f) { + return new DeltaFileObject() + .id(f.id()) + .version(f.version().orElse(null)) + .deletionVectorFileId(null) // TODO + .timestamp(f.timestamp().orElse(null)) + .expirationTimestamp(f.expirationTimestamp()) + .deltaSingleAction(new DeltaSingleAction() + ._file(new DeltaAddFileAction() + .id(f.id()) + .url(f.url()) + .partitionValues(f.partitionValues()) + .size(f.size()) + .stats(f.stats().orElse(null)) + .version(f.version().orElse(null)) + .timestamp(f.timestamp().orElse(null)) + .expirationTimestamp(f.expirationTimestamp()))); } public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest( @@ -104,28 +117,16 @@ public static 
io.whitefox.api.deltasharing.model.v1.generated.Table table2api( .schema(sharedTable.schema()); } - /** - * NOTE: this is an undocumented feature of the reference impl of delta-sharing, it's not part of the - * protocol - * ---- - * Return the [[io.whitefox.api.server.DeltaHeaders.DELTA_SHARE_CAPABILITIES_HEADER]] header - * that will be set in the response w/r/t the one received in the request. - * If the request did not contain any, we will return an empty one. - */ - public static Map toHeaderCapabilitiesMap(String headerCapabilities) { - if (headerCapabilities == null) { - return Map.of(); - } - return Arrays.stream(headerCapabilities.toLowerCase().split(";")) - .map(h -> h.split("=")) - .filter(h -> h.length == 2) - .map(splits -> Map.entry(splits[0], splits[1])) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - public static TableMetadataResponseObject toTableResponseMetadata(Metadata m) { return new TableMetadataResponseObject() - .protocol(new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1))) + .protocol(new ParquetProtocolObject() + .protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1))) .metadata(metadata2Api(m)); } + + public static String toCapabilitiesHeader(DeltaSharingCapabilities deltaSharingCapabilities) { + return deltaSharingCapabilities.values().entrySet().stream() + .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue())) + .collect(Collectors.joining(";")); + } } diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java index 8bb0b18ca..4775df8e5 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java @@ -14,6 +14,7 @@ import io.whitefox.api.server.ApiUtils; import io.whitefox.core.services.ContentAndToken; 
import io.whitefox.core.services.DeltaSharesService; +import io.whitefox.core.services.DeltaSharingCapabilities; import io.whitefox.core.services.ShareService; import jakarta.inject.Inject; import jakarta.ws.rs.core.MediaType; @@ -60,8 +61,9 @@ public Response getTableChanges( Integer startingVersion, Integer endingVersion, String endingTimestamp, - Boolean includeHistoricalMetadata) { - return Response.ok().build(); + Boolean includeHistoricalMetadata, + String deltaSharingCapabilities) { + return Response.status(Response.Status.NOT_IMPLEMENTED).build(); } @Override @@ -72,20 +74,22 @@ public Response getTableMetadata( String startingTimestamp, String deltaSharingCapabilities) { return wrapExceptions( - () -> optionalToNotFound( - deltaSharesService.getTableMetadata(share, schema, table, startingTimestamp), - m -> optionalToNotFound( - deltaSharesService.getTableVersion(share, schema, table, startingTimestamp), - v -> Response.ok( - tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)), - ndjsonMediaType) - .status(Response.Status.OK.getStatusCode()) - .header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v)) - .header( - DELTA_SHARE_CAPABILITIES_HEADER, - getResponseFormatHeader( - DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities))) - .build())), + () -> { + DeltaSharingCapabilities requestCapabilities = + new DeltaSharingCapabilities(deltaSharingCapabilities); + return optionalToNotFound( + deltaSharesService.getTableMetadata( + share, schema, table, startingTimestamp, requestCapabilities), + m -> Response.ok( + tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)), + ndjsonMediaType) + .status(Response.Status.OK.getStatusCode()) + .header(DELTA_TABLE_VERSION_HEADER, String.valueOf(m.version())) + .header( + DeltaSharingCapabilities.DELTA_SHARE_CAPABILITIES_HEADER, + DeltaMappers.toCapabilitiesHeader(m.tableCapabilities())) + .build()); + }, exceptionToResponse); } @@ -193,20 +197,25 @@ public Response 
queryTable( return wrapExceptions( () -> optionalToNotFound( deltaSharesService.getTableVersion(share, schema, table, startingTimestamp), - version -> Response.ok( - tableQueryResponseSerializer.serialize( - DeltaMappers.readTableResult2api(deltaSharesService.queryTable( - share, - schema, - table, - DeltaMappers.api2ReadTableRequest(queryRequest)))), - ndjsonMediaType) - .header(DELTA_TABLE_VERSION_HEADER, version) - .header( - DELTA_SHARE_CAPABILITIES_HEADER, - getResponseFormatHeader( - DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities))) - .build()), + version -> { + var capabilities = new DeltaSharingCapabilities(deltaSharingCapabilities); + var readTableResult = deltaSharesService.queryTable( + share, + schema, + table, + DeltaMappers.api2ReadTableRequest(queryRequest), + capabilities); + return Response.ok( + tableQueryResponseSerializer.serialize( + DeltaMappers.readTableResult2api(readTableResult)), + ndjsonMediaType) + .header(DELTA_TABLE_VERSION_HEADER, version) + .header( + DeltaSharingCapabilities.DELTA_SHARE_CAPABILITIES_HEADER, + DeltaMappers.toCapabilitiesHeader( + readTableResult.metadata().tableCapabilities())) + .build(); + }), exceptionToResponse); } diff --git a/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java b/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java index b5ece4313..1cb765de5 100644 --- a/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java +++ b/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java @@ -60,12 +60,6 @@ default Response optionalToNotFound(Optional opt, Function f return opt.map(fn).orElse(notFoundResponse()); } - default String getResponseFormatHeader(Map deltaSharingCapabilities) { - return String.format( - "%s=%s", - DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, getResponseFormat(deltaSharingCapabilities)); - } - default String getResponseFormat(Map deltaSharingCapabilities) { return deltaSharingCapabilities.getOrDefault( DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, 
diff --git a/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java b/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java index 6c261d1a0..725f186fe 100644 --- a/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java +++ b/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java @@ -1,7 +1,8 @@ package io.whitefox.api.server; +import io.whitefox.core.services.DeltaSharingCapabilities; + public interface DeltaHeaders { - String DELTA_SHARING_RESPONSE_FORMAT = "responseformat"; + String DELTA_SHARING_RESPONSE_FORMAT = DeltaSharingCapabilities.DELTA_SHARING_RESPONSE_FORMAT; String DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version"; - String DELTA_SHARE_CAPABILITIES_HEADER = "delta-sharing-capabilities"; } diff --git a/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java b/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java index ac5dd4d36..9a401f5ad 100644 --- a/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java +++ b/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java @@ -14,6 +14,9 @@ import io.whitefox.core.actions.*; import io.whitefox.core.actions.CreateMetastore; import io.whitefox.core.actions.CreateStorage; +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.Protocol; + import java.util.ArrayList; import java.util.Optional; import java.util.function.Function; @@ -173,6 +176,7 @@ public static MetastoreType api2MetastoreType( } } + // TODO we need to resolve the json schema anyOf/oneOf problem private static MetadataObject metadata2Api(Metadata metadata) { return new MetadataObject() .metaData(new MetadataObjectMetaData() diff --git a/server/core/build.gradle.kts b/server/core/build.gradle.kts index 368b32d9e..46de6b69a 100644 --- a/server/core/build.gradle.kts +++ b/server/core/build.gradle.kts @@ -2,6 +2,7 @@ plugins { `java-library` `java-test-fixtures` id("whitefox.java-conventions") + id("io.freefair.lombok") version "8.4" } 
val quarkusPlatformGroupId: String by project diff --git a/server/core/src/main/java/io/whitefox/core/Metadata.java b/server/core/src/main/java/io/whitefox/core/Metadata.java deleted file mode 100644 index 79c6eaaf7..000000000 --- a/server/core/src/main/java/io/whitefox/core/Metadata.java +++ /dev/null @@ -1,147 +0,0 @@ -package io.whitefox.core; - -import io.whitefox.annotations.SkipCoverageGenerated; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; - -public class Metadata { - private final String id; - private final Optional name; - private final Optional description; - private final Format format; - private final TableSchema tableSchema; - private final List partitionColumns; - private final Map configuration; - private final Optional version; - private final Optional size; - private final Optional numFiles; - - public enum Format { - PARQUET("parquet"); - - private final String provider; - - private Format(final String provider) { - this.provider = provider; - } - - public String provider() { - return this.provider; - } - } - - public Metadata( - String id, - Optional name, - Optional description, - Format format, - TableSchema tableSchema, - List partitionColumns, - Map configuration, - Optional version, - Optional size, - Optional numFiles) { - this.id = id; - this.name = name; - this.description = description; - this.format = format; - this.tableSchema = tableSchema; - this.partitionColumns = partitionColumns; - this.configuration = configuration; - this.version = version; - this.size = size; - this.numFiles = numFiles; - } - - public String id() { - return id; - } - - public Optional name() { - return name; - } - - public Optional description() { - return description; - } - - public Format format() { - return format; - } - - public TableSchema tableSchema() { - return tableSchema; - } - - public List partitionColumns() { - return partitionColumns; - } - - public Map configuration() { - return 
configuration; - } - - public Optional version() { - return version; - } - - public Optional size() { - return size; - } - - public Optional numFiles() { - return numFiles; - } - - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Metadata metadata = (Metadata) o; - return Objects.equals(id, metadata.id) - && Objects.equals(name, metadata.name) - && Objects.equals(description, metadata.description) - && format == metadata.format - && Objects.equals(tableSchema, metadata.tableSchema) - && Objects.equals(partitionColumns, metadata.partitionColumns) - && Objects.equals(configuration, metadata.configuration) - && Objects.equals(version, metadata.version) - && Objects.equals(size, metadata.size) - && Objects.equals(numFiles, metadata.numFiles); - } - - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash( - id, - name, - description, - format, - tableSchema, - partitionColumns, - configuration, - version, - size, - numFiles); - } - - @Override - @SkipCoverageGenerated - public String toString() { - return "Metadata{" + "id='" - + id + '\'' + ", name=" - + name + ", description=" - + description + ", format=" - + format + ", tableSchema=" - + tableSchema + ", partitionColumns=" - + partitionColumns + ", configuration=" - + configuration + ", version=" - + version + ", size=" - + size + ", numFiles=" - + numFiles + '}'; - } -} diff --git a/server/core/src/main/java/io/whitefox/core/Protocol.java b/server/core/src/main/java/io/whitefox/core/Protocol.java deleted file mode 100644 index 63aed6431..000000000 --- a/server/core/src/main/java/io/whitefox/core/Protocol.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.whitefox.core; - -import io.whitefox.annotations.SkipCoverageGenerated; -import java.util.Objects; -import java.util.Optional; - -public class Protocol { - private final Optional minReaderVersion; - - public Protocol(Optional 
minReaderVersion) { - this.minReaderVersion = minReaderVersion; - } - - public Optional minReaderVersion() { - return minReaderVersion; - } - - @Override - @SkipCoverageGenerated - public String toString() { - return "Protocol{" + "minReaderVersion=" + minReaderVersion + '}'; - } - - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Protocol protocol = (Protocol) o; - return Objects.equals(minReaderVersion, protocol.minReaderVersion); - } - - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash(minReaderVersion); - } -} diff --git a/server/core/src/main/java/io/whitefox/core/ReadTableResult.java b/server/core/src/main/java/io/whitefox/core/ReadTableResult.java deleted file mode 100644 index 23e11791f..000000000 --- a/server/core/src/main/java/io/whitefox/core/ReadTableResult.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.whitefox.core; - -import io.whitefox.annotations.SkipCoverageGenerated; -import java.util.List; -import java.util.Objects; - -public class ReadTableResult { - private final Protocol protocol; - private final Metadata metadata; - private final List files; - - public ReadTableResult(Protocol protocol, Metadata metadata, List files) { - this.protocol = protocol; - this.metadata = metadata; - this.files = files; - } - - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ReadTableResult that = (ReadTableResult) o; - return Objects.equals(protocol, that.protocol) - && Objects.equals(metadata, that.metadata) - && Objects.equals(files, that.files); - } - - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash(protocol, metadata, files); - } - - @Override - @SkipCoverageGenerated - public String toString() { - return "QueryTableResult{" + "protocol=" - + protocol + ", 
metadata=" - + metadata + ", files=" - + files + '}'; - } - - public Protocol protocol() { - return protocol; - } - - public Metadata metadata() { - return metadata; - } - - public List files() { - return files; - } -} diff --git a/server/core/src/main/java/io/whitefox/core/ReadTableResultToBeSigned.java b/server/core/src/main/java/io/whitefox/core/ReadTableResultToBeSigned.java deleted file mode 100644 index 584c319ee..000000000 --- a/server/core/src/main/java/io/whitefox/core/ReadTableResultToBeSigned.java +++ /dev/null @@ -1,56 +0,0 @@ -package io.whitefox.core; - -import io.whitefox.annotations.SkipCoverageGenerated; -import java.util.List; -import java.util.Objects; - -public class ReadTableResultToBeSigned { - private final Protocol protocol; - private final Metadata metadata; - private final List other; - - public ReadTableResultToBeSigned( - Protocol protocol, Metadata metadata, List other) { - this.protocol = protocol; - this.metadata = metadata; - this.other = other; - } - - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ReadTableResultToBeSigned that = (ReadTableResultToBeSigned) o; - return Objects.equals(protocol, that.protocol) - && Objects.equals(metadata, that.metadata) - && Objects.equals(other, that.other); - } - - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash(protocol, metadata, other); - } - - @Override - @SkipCoverageGenerated - public String toString() { - return "QueryTableResult{" + "protocol=" - + protocol + ", metadata=" - + metadata + ", other=" - + other + '}'; - } - - public Protocol protocol() { - return protocol; - } - - public Metadata metadata() { - return metadata; - } - - public List other() { - return other; - } -} diff --git a/server/core/src/main/java/io/whitefox/core/TableFile.java b/server/core/src/main/java/io/whitefox/core/TableFile.java index 42c897abd..37b0a1f0e 100644 
--- a/server/core/src/main/java/io/whitefox/core/TableFile.java +++ b/server/core/src/main/java/io/whitefox/core/TableFile.java @@ -6,7 +6,6 @@ import java.util.Optional; public class TableFile { - private final String url; private final String id; private final long size; diff --git a/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java b/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java index b7cda3d8f..34a73bee6 100644 --- a/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java +++ b/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java @@ -9,9 +9,7 @@ public class TableFileToBeSigned { private final String url; private final long size; - private final long version; - private final Optional timestamp; private final String stats; diff --git a/server/core/src/main/java/io/whitefox/core/delta/Metadata.java b/server/core/src/main/java/io/whitefox/core/delta/Metadata.java new file mode 100644 index 000000000..dc319a9a0 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/Metadata.java @@ -0,0 +1,274 @@ +package io.whitefox.core.delta; + +import io.whitefox.annotations.SkipCoverageGenerated; +import io.whitefox.core.types.StructType; + +import java.util.*; + +public abstract class Metadata { + Metadata() { + } + + ParquetMetadata parquet(String id, Optional name, Optional description, ParquetFormat format, StructType schema, List partitionColumns, Map configuration, Optional version, Optional size, Optional numFiles) { + return new ParquetMetadata(id, name, description, format, schema, partitionColumns, configuration, version, size, numFiles); + } + + DeltaMetadata delta(Optional version, Optional size, Optional numFiles, String id, Optional name, Optional description, DeltaFormat format, StructType schema, List partitionColumns, Optional createdTime, Map configuration) { + return new DeltaMetadata(version, size, numFiles, id, name, description, format, schema, partitionColumns, createdTime, 
configuration); + } + + public static class DeltaFormat { + private final String provider; + private final Map options; + + public DeltaFormat(String provider, Map options) { + this.provider = provider; + this.options = Collections.unmodifiableMap(options); + } + + public String provider() { + return provider; + } + + public Map options() { + return options; + } + + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeltaFormat that = (DeltaFormat) o; + return Objects.equals(provider, that.provider) && Objects.equals(options, that.options); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(provider, options); + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "DeltaFormat{" + + "provider='" + provider + '\'' + + ", options=" + options + + '}'; + } + } + + public static class DeltaMetadata extends Metadata { + private final Optional version; + private final Optional size; + private final Optional numFiles; + private final String id; + private final Optional name; + private final Optional description; + private final DeltaFormat format; + private final StructType schema; + private final List partitionColumns; + private final Optional createdTime; + private final Map configuration; + + DeltaMetadata(Optional version, Optional size, Optional numFiles, String id, Optional name, Optional description, DeltaFormat format, StructType schema, List partitionColumns, Optional createdTime, Map configuration) { + this.version = version; + this.size = size; + this.numFiles = numFiles; + this.id = id; + this.name = name; + this.description = description; + this.format = format; + this.schema = schema; + this.partitionColumns = partitionColumns; + this.createdTime = createdTime; + this.configuration = configuration; + } + + public Optional version() { + return version; + } + + public Optional size() { 
+ return size; + } + + public Optional numFiles() { + return numFiles; + } + + public String id() { + return id; + } + + public Optional name() { + return name; + } + + public Optional description() { + return description; + } + + public DeltaFormat format() { + return format; + } + + public StructType schema() { + return schema; + } + + public List partitionColumns() { + return partitionColumns; + } + + public Optional createdTime() { + return createdTime; + } + + public Map configuration() { + return configuration; + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "DeltaMetadata{" + + "version=" + version + + ", size=" + size + + ", numFiles=" + numFiles + + ", id='" + id + '\'' + + ", name=" + name + + ", description=" + description + + ", format=" + format + + ", schema=" + schema + + ", partitionColumns=" + partitionColumns + + ", createdTime=" + createdTime + + ", configuration=" + configuration + + '}'; + } + + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeltaMetadata that = (DeltaMetadata) o; + return Objects.equals(version, that.version) && Objects.equals(size, that.size) && Objects.equals(numFiles, that.numFiles) && Objects.equals(id, that.id) && Objects.equals(name, that.name) && Objects.equals(description, that.description) && Objects.equals(format, that.format) && Objects.equals(schema, that.schema) && Objects.equals(partitionColumns, that.partitionColumns) && Objects.equals(createdTime, that.createdTime) && Objects.equals(configuration, that.configuration); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(version, size, numFiles, id, name, description, format, schema, partitionColumns, createdTime, configuration); + } + } + + public static class ParquetMetadata extends Metadata { + private final String id; + private final Optional name; + private final Optional 
description; + private final ParquetFormat format; + private final StructType schema; + private final List partitionColumns; + private final Map configuration; + private final Optional version; + private final Optional size; + private final Optional numFiles; + + ParquetMetadata(String id, Optional name, Optional description, ParquetFormat format, StructType schema, List partitionColumns, Map configuration, Optional version, Optional size, Optional numFiles) { + this.id = id; + this.name = name; + this.description = description; + this.format = format; + this.schema = schema; + this.partitionColumns = partitionColumns; + this.configuration = configuration; + this.version = version; + this.size = size; + this.numFiles = numFiles; + HashSet partitionColumnSet = new HashSet<>(schema.fieldNames()); + for (String partitionColumn : partitionColumns) { + if (!partitionColumnSet.contains(partitionColumn)){ + throw new IllegalArgumentException( + String.format("Partition column %s is not part of schema %s", partitionColumn, schema) + ); + } + } + } + + public String id() { + return id; + } + + public Optional name() { + return name; + } + + public Optional description() { + return description; + } + + public ParquetFormat format() { + return format; + } + + public StructType schema() { + return schema; + } + + public List partitionColumns() { + return partitionColumns; + } + + public Map configuration() { + return configuration; + } + + public Optional version() { + return version; + } + + public Optional size() { + return size; + } + + public Optional numFiles() { + return numFiles; + } + + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ParquetMetadata that = (ParquetMetadata) o; + return Objects.equals(id, that.id) && Objects.equals(name, that.name) && Objects.equals(description, that.description) && format == that.format && Objects.equals(schema, 
that.schema) && Objects.equals(partitionColumns, that.partitionColumns) && Objects.equals(configuration, that.configuration) && Objects.equals(version, that.version) && Objects.equals(size, that.size) && Objects.equals(numFiles, that.numFiles); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(id, name, description, format, schema, partitionColumns, configuration, version, size, numFiles); + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "ParquetMetadata{" + + "id='" + id + '\'' + + ", name=" + name + + ", description=" + description + + ", format=" + format + + ", schema=" + schema + + ", partitionColumns=" + partitionColumns + + ", configuration=" + configuration + + ", version=" + version + + ", size=" + size + + ", numFiles=" + numFiles + + '}'; + } + } + + public enum ParquetFormat {PARQUET_FORMAT} +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/Protocol.java b/server/core/src/main/java/io/whitefox/core/delta/Protocol.java new file mode 100644 index 000000000..e4e2f7041 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/Protocol.java @@ -0,0 +1,90 @@ +package io.whitefox.core.delta; + +import io.whitefox.annotations.SkipCoverageGenerated; +import io.whitefox.core.services.DeltaSharingCapabilities; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; +/** Base class of the protocol variants declared below; its constructor is package-private. */ +public abstract class Protocol { + Protocol() { + } + // Factory for DeltaProtocol; static so callers do not need an existing Protocol instance. + static Protocol delta(int minReaderVersion, int minWriterVersion, Set readerFeatures, Set writerFeatures) { + return new DeltaProtocol(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures); + } + // Returns the shared ParquetProtocol.INSTANCE; static for the same reason as delta(). + static ParquetProtocol parquet() { + return ParquetProtocol.INSTANCE; + } + + // Protocol variant carrying the minimum reader/writer versions and the reader/writer feature sets. + public static class DeltaProtocol extends Protocol { + private final int minReaderVersion; + private final int minWriterVersion; + + private final Set readerFeatures; + + private final Set writerFeatures; + + DeltaProtocol(int minReaderVersion, + int 
minWriterVersion, + Set readerFeatures, + Set writerFeatures) { + this.minReaderVersion = minReaderVersion; + this.minWriterVersion = minWriterVersion; + this.readerFeatures = readerFeatures; + this.writerFeatures = writerFeatures; + } + + public int minReaderVersion() { + return minReaderVersion; + } + + public int minWriterVersion() { + return minWriterVersion; + } + + public Set readerFeatures() { + return readerFeatures; + } + + public Set writerFeatures() { + return writerFeatures; + } + + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeltaProtocol that = (DeltaProtocol) o; + return minReaderVersion == that.minReaderVersion && minWriterVersion == that.minWriterVersion && Objects.equals(readerFeatures, that.readerFeatures) && Objects.equals(writerFeatures, that.writerFeatures); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures); + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "DeltaProtocolObject{" + + "minReaderVersion=" + minReaderVersion + + ", minWriterVersion=" + minWriterVersion + + ", readerFeatures=" + readerFeatures + + ", writerFeatures=" + writerFeatures + + '}'; + } + } + // Field-less variant reachable only through the INSTANCE singleton (its constructor is private). + public static class ParquetProtocol extends Protocol { + private ParquetProtocol(){} + + public static final ParquetProtocol INSTANCE = new ParquetProtocol(); + } + +} \ No newline at end of file diff --git a/server/core/src/main/java/io/whitefox/core/delta/Stats.java b/server/core/src/main/java/io/whitefox/core/delta/Stats.java new file mode 100644 index 000000000..4189c3ed4 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/Stats.java @@ -0,0 +1,25 @@ +package io.whitefox.core.delta; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + 
+import java.util.List; +import java.util.Set; +// NOTE(review): Lombok appears to skip initialized final fields in both generated constructors, so @AllArgsConstructor and @RequiredArgsConstructor presumably both yield Stats(long) - confirm this compiles. +@Data +@Builder(toBuilder = true) +@AllArgsConstructor +@RequiredArgsConstructor +public class Stats { + private final long numRecords; + private final Set> minValues = Set.of(); + private final Set> maxValues = Set.of(); + private final Set> nullCount = Set.of(); + @Data + public static class ColumnStat { + private final List columnPath; + private final T value; + } +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/signed/DeltaFile.java b/server/core/src/main/java/io/whitefox/core/delta/signed/DeltaFile.java new file mode 100644 index 000000000..ef77605e3 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/signed/DeltaFile.java @@ -0,0 +1,59 @@ +package io.whitefox.core.delta.signed; + +import io.whitefox.core.delta.unsigned.DeltaFileToBeSigned; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import java.util.Optional; +/** Signed counterpart of DeltaFileToBeSigned; see the static signed(...) factory at the bottom of the class. */ +@Data +@Builder(toBuilder = true) +@AllArgsConstructor +public class DeltaFile implements DeltaFileAction { + + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + + /** + * A unique string for the deletion vector file in a table. The same deletion vector file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final Optional deletionVectorFileId; + + /** + * The table version of the file, returned when querying a table data with a version or timestamp parameter. 
+ */ + private final Optional version; + + /** + * The unix timestamp corresponding to the table version of the file, in milliseconds, returned when querying a table data with a version or timestamp parameter. + */ + private final Optional timestamp; + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. + */ + private final Optional expirationTimestamp; + + /** + * Needs to be parsed by a delta library as a delta single action; the path field is replaced by a pre-signed url. + */ + private final ParquetFileAction deltaSingleAction; + // Builds the signed file: id, expiration timestamp and action come from the arguments; the other fields are copied from tbs. + public static DeltaFile signed(DeltaFileToBeSigned tbs, + ParquetFileAction signed, + Optional newExpirationTimestamp, + String newId) { + return new DeltaFile( + newId, + tbs.getDeletionVectorFileId(), + tbs.getVersion(), + tbs.getTimestamp(), + newExpirationTimestamp, + signed + ); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/signed/DeltaFileAction.java b/server/core/src/main/java/io/whitefox/core/delta/signed/DeltaFileAction.java new file mode 100644 index 000000000..a31eb546c --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/signed/DeltaFileAction.java @@ -0,0 +1,4 @@ +package io.whitefox.core.delta.signed; + +public interface DeltaFileAction extends FileAction { +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/signed/FileAction.java b/server/core/src/main/java/io/whitefox/core/delta/signed/FileAction.java new file mode 100644 index 000000000..89d7f5186 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/signed/FileAction.java @@ -0,0 +1,4 @@ +package io.whitefox.core.delta.signed; + +public interface FileAction { +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetAddFile.java b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetAddFile.java new file mode 100644 index 000000000..6714fde96 --- /dev/null +++ 
b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetAddFile.java @@ -0,0 +1,74 @@ +package io.whitefox.core.delta.signed; + +import io.whitefox.core.delta.Stats; +import io.whitefox.core.delta.unsigned.ParquetAddFileToBeSigned; +import io.whitefox.core.delta.unsigned.ParquetCDFFileToBeSigned; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import java.net.URI; +import java.util.Map; +import java.util.Optional; + +@Data +@AllArgsConstructor +@RequiredArgsConstructor +@Builder(toBuilder = true) +public class ParquetAddFile implements ParquetFileAction { + /** + * An https url that a client can use to read the file directly. The same file in different responses may have different urls + */ + private final String url; + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + /** + * A map from partition column to value for this file. When the table doesn’t have partition columns, this will be an empty map. + */ + private final Map partitionValues; + /** + * The size of this file in bytes. + */ + private final long size; + /** + * The timestamp of the file in milliseconds from epoch. + */ + private final long timestamp; + /** + * The table version of this file + */ + private final int version; + /** + * Contains statistics (e.g., count, min/max values for columns) about the data in this file. This field may be missing. A file may or may not have stats. A client can decide whether to use stats or drop it. + */ + private final Optional stats; + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. 
+ */ + private final Optional expirationTimestamp; + + @Override + public Optional getExpTs() { + return expirationTimestamp; + } + + public static ParquetAddFile signed( + ParquetAddFileToBeSigned f, + URI signedUrl, + Optional expirationTimestamp, + String newId + ) { + return new ParquetAddFile( + signedUrl.toASCIIString(), + newId, + f.getPartitionValues(), + f.getSize(), + f.getTimestamp(), + f.getVersion(), + f.getStats(), + expirationTimestamp); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetCDFFile.java b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetCDFFile.java new file mode 100644 index 000000000..fb9fb66ab --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetCDFFile.java @@ -0,0 +1,61 @@ +package io.whitefox.core.delta.signed; + +import io.whitefox.core.delta.unsigned.ParquetCDFFileToBeSigned; +import io.whitefox.core.delta.unsigned.ParquetFileToBeSigned; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import java.net.URI; +import java.util.Map; +import java.util.Optional; + +@Data +@AllArgsConstructor +@RequiredArgsConstructor +@Builder(toBuilder = true) +public class ParquetCDFFile implements ParquetFileAction { + /** + * An https url that a client can use to read the file directly. The same file in different responses may have different urls + */ + private final String url; + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + /** + * A map from partition column to value for this file. When the table doesn’t have partition columns, this will be an empty map. + */ + private final Map partitionValues; + /** + * The size of this file in bytes. 
+ */ + private final long size; + /** + * The timestamp of the file in milliseconds from epoch. + */ + private final long timestamp; + /** + * The table version of this file + */ + private final int version; + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. + */ + private final Optional expirationTimestamp; + + @Override + public Optional getExpTs() { + return expirationTimestamp; + } + + public static ParquetCDFFile signed( + ParquetCDFFileToBeSigned f, + URI signedUrl, + Optional expirationTimestamp, + String newId + ) { + return new ParquetCDFFile(signedUrl.toASCIIString(), newId, f.getPartitionValues(), f.getSize(), f.getTimestamp(), f.getVersion(), expirationTimestamp); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetFile.java b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetFile.java new file mode 100644 index 000000000..e8f35c8b0 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetFile.java @@ -0,0 +1,79 @@ +package io.whitefox.core.delta.signed; + +import io.whitefox.core.delta.Stats; +import io.whitefox.core.delta.unsigned.DeltaFileToBeSigned; +import io.whitefox.core.delta.unsigned.ParquetFileToBeSigned; +import io.whitefox.core.delta.unsigned.ParquetRemoveFileToBeSigned; +import lombok.Builder; +import lombok.Data; + +import java.net.URI; +import java.util.Map; +import java.util.Optional; + +@Data +@Builder(toBuilder = true) +public class ParquetFile implements ParquetFileAction { + + /** + * A https url that a client can use to read the file directly. The same file in different responses may have different urls. + */ + private final String url; + + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. 
+ */ + private final String id; + + /** + * A map from partition column to value for this file. + */ + private final Map partitionValues; + + /** + * The size of this file in bytes + */ + private final long size; + + /** + * Contains statistics (e.g., count, min/max values for columns) about the data in this file. This field may be missing. + */ + private final Optional stats; + + /** + * The table version of the file, returned when querying a table data with a version or timestamp parameter. + */ + private final Optional version; + + /** + * The unix timestamp corresponding to the table version of the file, in milliseconds, returned when querying a table data with a version or timestamp parameter. + */ + private final Optional timestamp; + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. + */ + private final Optional expirationTimestamp; + + @Override + public Optional getExpTs() { + return expirationTimestamp; + } + + public static ParquetFile signed(ParquetFileToBeSigned f, + URI signedUrl, + Optional expirationTimestamp, + String newId + ) { + return new ParquetFile( + signedUrl.toASCIIString(), + newId, + f.getPartitionValues(), + f.getSize(), + f.getStats(), + f.getVersion(), + f.getTimestamp(), + expirationTimestamp + ); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetFileAction.java b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetFileAction.java new file mode 100644 index 000000000..8488191f5 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetFileAction.java @@ -0,0 +1,7 @@ +package io.whitefox.core.delta.signed; + +import java.util.Optional; + +public interface ParquetFileAction extends FileAction { + Optional getExpTs(); +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetRemoveFile.java 
b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetRemoveFile.java new file mode 100644 index 000000000..882450380 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/signed/ParquetRemoveFile.java @@ -0,0 +1,67 @@ +package io.whitefox.core.delta.signed; + +import io.whitefox.core.delta.unsigned.ParquetRemoveFileToBeSigned; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import java.net.URI; +import java.util.Map; +import java.util.Optional; + + +@Data +@AllArgsConstructor +@RequiredArgsConstructor +@Builder(toBuilder = true) +public class ParquetRemoveFile implements ParquetFileAction { + /** + * An https url that a client can use to read the file directly. The same file in different responses may have different urls + */ + private final String url; + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + /** + * A map from partition column to value for this file. When the table doesn’t have partition columns, this will be an empty map. + */ + private final Map partitionValues; + /** + * The size of this file in bytes. + */ + private final long size; + /** + * The timestamp of the file in milliseconds from epoch. + */ + private final long timestamp; + /** + * The table version of this file + */ + private final int version; + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. 
+ */ + private final Optional expirationTimestamp; + + public static ParquetRemoveFile signed(ParquetRemoveFileToBeSigned f, + URI signedUrl, + Optional expirationTimestamp, + String newId) { + return new ParquetRemoveFile( + signedUrl.toASCIIString(), + newId, + f.getPartitionValues(), + f.getSize(), + f.getTimestamp(), + f.getVersion(), + expirationTimestamp + ); + } + + @Override + public Optional getExpTs() { + return expirationTimestamp; + } +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/unsigned/DeltaFileToBeSigned.java b/server/core/src/main/java/io/whitefox/core/delta/unsigned/DeltaFileToBeSigned.java new file mode 100644 index 000000000..1beefee63 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/unsigned/DeltaFileToBeSigned.java @@ -0,0 +1,48 @@ +package io.whitefox.core.delta.unsigned; + +import io.whitefox.core.delta.signed.DeltaFile; +import io.whitefox.core.delta.signed.FileAction; +import io.whitefox.core.delta.signed.ParquetFile; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import java.util.Optional; +// NOTE(review): the four Optional fields are final and initialized, so Lombok presumably omits them from both generated constructors (same signature twice) and from the builder - confirm. +@Data +@Builder(toBuilder = true) +@RequiredArgsConstructor +@AllArgsConstructor +public class DeltaFileToBeSigned implements FileActionToBeSigned { + + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + + /** + * A unique string for the deletion vector file in a table. The same deletion vector file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. 
+ */ + private final Optional deletionVectorFileId = Optional.empty(); + + /** + * The table version of the file, returned when querying a table data with a version or timestamp parameter. + */ + private final Optional version = Optional.empty(); + + /** + * The unix timestamp corresponding to the table version of the file, in milliseconds, returned when querying a table data with a version or timestamp parameter. + */ + private final Optional timestamp = Optional.empty(); + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. + */ + private final Optional expirationTimestamp = Optional.empty(); + + /** + * Needs to be parsed by a delta library as a delta single action; the path field is replaced by a pre-signed url. + */ + private final ParquetFileActionToBeSigned deltaSingleAction; +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/unsigned/FileActionToBeSigned.java b/server/core/src/main/java/io/whitefox/core/delta/unsigned/FileActionToBeSigned.java new file mode 100644 index 000000000..7949cb1e5 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/unsigned/FileActionToBeSigned.java @@ -0,0 +1,6 @@ +package io.whitefox.core.delta.unsigned; + +import io.whitefox.core.delta.signed.FileAction; + +public interface FileActionToBeSigned { +} \ No newline at end of file diff --git a/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetAddFileToBeSigned.java b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetAddFileToBeSigned.java new file mode 100644 index 000000000..863365596 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetAddFileToBeSigned.java @@ -0,0 +1,49 @@ +package io.whitefox.core.delta.unsigned; + +import io.whitefox.core.delta.Stats; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import java.util.Map; +import 
java.util.Optional; + +@Data +@AllArgsConstructor +@RequiredArgsConstructor +@Builder(toBuilder = true) +public class ParquetAddFileToBeSigned implements ParquetFileActionToBeSigned { + /** + * An https url that a client can use to read the file directly. The same file in different responses may have different urls + */ + private final String url; + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + /** + * A map from partition column to value for this file. When the table doesn’t have partition columns, this will be an empty map. + */ + private final Map partitionValues; + /** + * The size of this file in bytes. + */ + private final long size; + /** + * The timestamp of the file in milliseconds from epoch. + */ + private final long timestamp; + /** + * The table version of this file + */ + private final int version; + /** + * Contains statistics (e.g., count, min/max values for columns) about the data in this file. This field may be missing. A file may or may not have stats. A client can decide whether to use stats or drop it. + */ + private final Optional stats = Optional.empty(); + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. 
+ */ + private final Optional expirationTimestamp = Optional.empty(); +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetCDFFileToBeSigned.java b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetCDFFileToBeSigned.java new file mode 100644 index 000000000..c94c94e21 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetCDFFileToBeSigned.java @@ -0,0 +1,44 @@ +package io.whitefox.core.delta.unsigned; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import java.util.Map; +import java.util.Optional; + +@Data +@AllArgsConstructor +@RequiredArgsConstructor +@Builder(toBuilder = true) +public class ParquetCDFFileToBeSigned implements ParquetFileActionToBeSigned { + /** + * An https url that a client can use to read the file directly. The same file in different responses may have different urls + */ + private final String url; + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + /** + * A map from partition column to value for this file. When the table doesn’t have partition columns, this will be an empty map. + */ + private final Map partitionValues; + /** + * The size of this file in bytes. + */ + private final long size; + /** + * The timestamp of the file in milliseconds from epoch. + */ + private final long timestamp; + /** + * The table version of this file + */ + private final int version; + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. 
+ */ + private final Optional expirationTimestamp = Optional.empty(); +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetFileActionToBeSigned.java b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetFileActionToBeSigned.java new file mode 100644 index 000000000..6b06c57ce --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetFileActionToBeSigned.java @@ -0,0 +1,5 @@ +package io.whitefox.core.delta.unsigned; + +public interface ParquetFileActionToBeSigned extends FileActionToBeSigned { + String getUrl(); +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetFileToBeSigned.java b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetFileToBeSigned.java new file mode 100644 index 000000000..29f65d1be --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetFileToBeSigned.java @@ -0,0 +1,53 @@ +package io.whitefox.core.delta.unsigned; + +import io.whitefox.core.delta.Stats; +import lombok.Builder; +import lombok.Data; + +import java.util.Map; +import java.util.Optional; + +@Data +@Builder(toBuilder = true) +public class ParquetFileToBeSigned implements ParquetFileActionToBeSigned { + + /** + * A https url that a client can use to read the file directly. The same file in different responses may have different urls. + */ + private final String url; + + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + + /** + * A map from partition column to value for this file. + */ + private final Map partitionValues; + + /** + * The size of this file in bytes + */ + private final long size; + + /** + * Contains statistics (e.g., count, min/max values for columns) about the data in this file. This field may be missing. 
+ */ + private final Optional stats; + + /** + * The table version of the file, returned when querying a table data with a version or timestamp parameter. + */ + private final Optional version; + + /** + * The unix timestamp corresponding to the table version of the file, in milliseconds, returned when querying a table data with a version or timestamp parameter. + */ + private final Optional timestamp; + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. + */ + private final Optional expirationTimestamp; +} diff --git a/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetRemoveFileToBeSigned.java b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetRemoveFileToBeSigned.java new file mode 100644 index 000000000..cdac84692 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/delta/unsigned/ParquetRemoveFileToBeSigned.java @@ -0,0 +1,45 @@ +package io.whitefox.core.delta.unsigned; + +import io.whitefox.core.delta.signed.ParquetRemoveFile; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import java.util.Map; +import java.util.Optional; + + +@Data +@AllArgsConstructor +@Builder(toBuilder = true) +public class ParquetRemoveFileToBeSigned implements ParquetFileActionToBeSigned { + /** + * An https url that a client can use to read the file directly. The same file in different responses may have different urls + */ + private final String url; + /** + * A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + private final String id; + /** + * A map from partition column to value for this file. When the table doesn’t have partition columns, this will be an empty map. 
+ */ + private final Map partitionValues; + /** + * The size of this file in bytes. + */ + private final long size; + /** + * The timestamp of the file in milliseconds from epoch. + */ + private final long timestamp; + /** + * The table version of this file + */ + private final int version; + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. + */ + private final Optional expirationTimestamp; +} diff --git a/server/core/src/main/java/io/whitefox/core/partitions/PartitionValue.java b/server/core/src/main/java/io/whitefox/core/partitions/PartitionValue.java new file mode 100644 index 000000000..453700af0 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/partitions/PartitionValue.java @@ -0,0 +1,5 @@ +package io.whitefox.core.partitions; + +public class PartitionValue { + public static class StringPartitionValue +} diff --git a/server/core/src/main/java/io/whitefox/core/results/DeltaReadTableResult.java b/server/core/src/main/java/io/whitefox/core/results/DeltaReadTableResult.java new file mode 100644 index 000000000..0a4d77bcd --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/results/DeltaReadTableResult.java @@ -0,0 +1,17 @@ +package io.whitefox.core.results; + +import io.whitefox.core.delta.signed.DeltaFileAction; +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.Protocol; +import lombok.Builder; +import lombok.Data; + +import java.util.List; + +@Data +@Builder(toBuilder = true) +public class DeltaReadTableResult implements ReadTableResult { + private final Metadata.ParquetMetadata metadata; + private final Protocol.ParquetProtocol protocol; + private final List files; +} diff --git a/server/core/src/main/java/io/whitefox/core/results/DeltaReadTableResultToBeSigned.java b/server/core/src/main/java/io/whitefox/core/results/DeltaReadTableResultToBeSigned.java new file mode 100644 index 000000000..bdfea5f46 --- /dev/null +++ 
b/server/core/src/main/java/io/whitefox/core/results/DeltaReadTableResultToBeSigned.java @@ -0,0 +1,18 @@ +package io.whitefox.core.results; + +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.Protocol; +import io.whitefox.core.delta.signed.DeltaFileAction; +import io.whitefox.core.delta.unsigned.DeltaFileActionToBeSigned; +import lombok.Builder; +import lombok.Data; + +import java.util.List; +/** Not-yet-signed read result; implements the to-be-signed contract (same package) rather than the signed ReadTableResult. */ +@Data +@Builder(toBuilder = true) +public class DeltaReadTableResultToBeSigned implements ReadTableResultToBeSigned { + private final Metadata.ParquetMetadata metadata; + private final Protocol.ParquetProtocol protocol; + private final List files; +} diff --git a/server/core/src/main/java/io/whitefox/core/results/ParquetReadTableResult.java b/server/core/src/main/java/io/whitefox/core/results/ParquetReadTableResult.java new file mode 100644 index 000000000..8956399df --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/results/ParquetReadTableResult.java @@ -0,0 +1,17 @@ +package io.whitefox.core.results; + +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.signed.ParquetFileAction; +import io.whitefox.core.delta.Protocol; +import lombok.Builder; +import lombok.Data; + +import java.util.List; + +@Data +@Builder(toBuilder = true) +public class ParquetReadTableResult implements ReadTableResult { + private final Metadata.ParquetMetadata metadata; + private final Protocol.ParquetProtocol protocol; + private final List files; +} diff --git a/server/core/src/main/java/io/whitefox/core/results/ParquetReadTableResultToBeSigned.java b/server/core/src/main/java/io/whitefox/core/results/ParquetReadTableResultToBeSigned.java new file mode 100644 index 000000000..4c8e5e94e --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/results/ParquetReadTableResultToBeSigned.java @@ -0,0 +1,18 @@ +package io.whitefox.core.results; + +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.Protocol; +import 
io.whitefox.core.delta.signed.ParquetFileAction; +import io.whitefox.core.delta.unsigned.ParquetFileActionToBeSigned; +import lombok.Builder; +import lombok.Data; + +import java.util.List; +/** Not-yet-signed read result; implements the to-be-signed contract (same package) rather than the signed ReadTableResult. */ +@Data +@Builder(toBuilder = true) +public class ParquetReadTableResultToBeSigned implements ReadTableResultToBeSigned { + private final Metadata.ParquetMetadata metadata; + private final Protocol.ParquetProtocol protocol; + private final List files; +} diff --git a/server/core/src/main/java/io/whitefox/core/results/ReadTableResult.java b/server/core/src/main/java/io/whitefox/core/results/ReadTableResult.java new file mode 100644 index 000000000..66bda3013 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/results/ReadTableResult.java @@ -0,0 +1,13 @@ +package io.whitefox.core.results; + +import io.whitefox.core.delta.signed.FileAction; +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.Protocol; + +import java.util.List; + +public interface ReadTableResult { + Metadata getMetadata(); + Protocol getProtocol(); + List getFiles(); +} diff --git a/server/core/src/main/java/io/whitefox/core/results/ReadTableResultToBeSigned.java b/server/core/src/main/java/io/whitefox/core/results/ReadTableResultToBeSigned.java new file mode 100644 index 000000000..b7b9f864e --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/results/ReadTableResultToBeSigned.java @@ -0,0 +1,15 @@ +package io.whitefox.core.results; + +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.Protocol; +import io.whitefox.core.delta.unsigned.FileActionToBeSigned; + +import java.util.List; + +public interface ReadTableResultToBeSigned { + Metadata getMetadata(); + + Protocol getProtocol(); + + List getFiles(); +} diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java index 0cf6a26ea..f6dfe61ee 100644 --- 
a/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java @@ -3,6 +3,10 @@ import io.delta.standalone.DeltaLog; import io.delta.standalone.Snapshot; import io.whitefox.core.*; +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.delta.Protocol; +import io.whitefox.core.results.ReadTableResultToBeSigned; + import java.sql.Timestamp; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; @@ -39,6 +43,7 @@ public static DeltaSharedTable of( var dt = DeltaLog.forTable( hadoopConfigBuilder.buildConfig(sharedTable.internalTable().provider().storage()), dataPath); + if (!dt.tableExists()) { throw new IllegalArgumentException( String.format("Cannot find a delta table at %s", dataPath)); @@ -54,31 +59,52 @@ public static DeltaSharedTable of(SharedTable sharedTable) { return of(sharedTable, TableSchemaConverter.INSTANCE, new HadoopConfigBuilder()); } - public Optional getMetadata(Optional startingTimestamp) { - return getSnapshot(startingTimestamp).map(this::metadataFromSnapshot); + public Optional getMetadata( + Optional startingTimestamp, DeltaSharingCapabilities deltaSharingCapabilities) { + return getSnapshot(startingTimestamp) + .map(snapshot -> metadataFromSnapshot(snapshot, deltaSharingCapabilities)); } - private Metadata metadataFromSnapshot(Snapshot snapshot) { + private Metadata metadataFromSnapshot( + Snapshot snapshot, DeltaSharingCapabilities requestCapabilities) { + return new Metadata( snapshot.getMetadata().getId(), Optional.of(tableDetails.name()), Optional.ofNullable(snapshot.getMetadata().getDescription()), - Metadata.Format.PARQUET, + Metadata.Format.PARQUET, // TODO this depends on the table new TableSchema(tableSchemaConverter.convertDeltaSchemaToWhitefox( snapshot.getMetadata().getSchema())), snapshot.getMetadata().getPartitionColumns(), snapshot.getMetadata().getConfiguration(), - Optional.of(snapshot.getVersion()), + 
snapshot.getVersion(), Optional.empty(), // size is fine to be empty - Optional.empty() // numFiles is ok to be empty here too + Optional.empty(), // numFiles is ok to be empty here too + compatibleCapabilities( + DeltaSharingCapabilities.defaultValue(), + requestCapabilities) // TODO read it from the table ); } + private DeltaSharingCapabilities compatibleCapabilities( + DeltaSharingCapabilities tableCapabilities, DeltaSharingCapabilities requestCapabilities) { + if (requestCapabilities + .getResponseFormat() + .contains(DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA)) { + return tableCapabilities.withResponseFormat( + DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA); + } else { + return tableCapabilities.withResponseFormat( + DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET); + } + } + public Optional getTableVersion(Optional startingTimestamp) { return getSnapshot(startingTimestamp).map(Snapshot::getVersion); } - public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { + public ReadTableResultToBeSigned queryTable( + ReadTableRequest readTableRequest, DeltaSharingCapabilities requestCapabilities) { Snapshot snapshot; if (readTableRequest instanceof ReadTableRequest.ReadTableCurrentVersion) { snapshot = deltaLog.snapshot(); @@ -92,8 +118,8 @@ public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { throw new IllegalArgumentException("Unknown ReadTableRequest type: " + readTableRequest); } return new ReadTableResultToBeSigned( - new Protocol(Optional.of(1)), - metadataFromSnapshot(snapshot), + new Protocol(Optional.of(1), Optional.of(1)), // TODO + metadataFromSnapshot(snapshot, requestCapabilities), snapshot.getAllFiles().stream() .map(f -> new TableFileToBeSigned( location() + "/" + f.getPath(), diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java index 
82c2baba7..2267a5a6f 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java @@ -1,36 +1,78 @@ package io.whitefox.core.services; import io.whitefox.core.*; -import io.whitefox.core.Metadata; +import io.whitefox.core.delta.Metadata; import io.whitefox.core.Schema; import io.whitefox.core.Share; import io.whitefox.core.SharedTable; +import io.whitefox.core.results.ReadTableResult; + import java.util.List; import java.util.Optional; public interface DeltaSharesService { + /** + * @return the table version if it exists, otherwise {@link Optional#empty} + */ Optional getTableVersion( String share, String schema, String table, String startingTimestamp); + /** + * @return a list (up to maxResults size) of {@link Share} and a token to retrieve the next page. + * The listing will start from the token passed as nextPageToken (if any, otherwise from the first). + */ ContentAndToken> listShares( Optional nextPageToken, Optional maxResults); + /** + * @return the table metadata if exists, otherwise {@link Optional#empty}. + * This method will also evaluate the input deltaSharingCapabilities and match + * them with the actual table capabilities. + */ Optional getTableMetadata( - String share, String schema, String table, String startingTimestamp); + String share, + String schema, + String table, + String startingTimestamp, + DeltaSharingCapabilities deltaSharingCapabilities); + /** + * @return a list (up to maxResults size) of {@link Schema} that are part of input share + * and a token to retrieve the next page. The listing will start from the token passed as nextPageToken + * (if any, otherwise from the first). 
If the share does not exist, it will return {@link Optional#empty()} + */ Optional>> listSchemas( String share, Optional nextPageToken, Optional maxResults); + /** + * @return a list (up to maxResults size) of {@link SharedTable} that are part of input share and schema + * and a token to retrieve the next page. The listing will start from the token passed as nextPageToken + * (if any, otherwise from the first). If the share or the schema does not exist, + * it will return {@link Optional#empty()} + */ Optional>> listTables( String share, String schema, Optional nextPageToken, Optional maxResults); + /** + * @return a list (up to maxResults size) of {@link SharedTable} that are part of input share + * and a token to retrieve the next page. The listing will start from the token passed as nextPageToken + * (if any, otherwise from the first). If the share does not exist, it will return {@link Optional#empty()} + */ Optional>> listTablesOfShare( String share, Optional token, Optional maxResults); + /** + * @return metadata needed to read the table from a delta-sharing client. + * This method will also evaluate the input deltaSharingCapabilities and match them with the actual table capabilities. 
+ */ ReadTableResult queryTable( - String share, String schema, String table, ReadTableRequest queryRequest); + String share, + String schema, + String table, + ReadTableRequest queryRequest, + DeltaSharingCapabilities capabilities); } diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java index a2a64338a..66d834081 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java @@ -1,6 +1,8 @@ package io.whitefox.core.services; import io.whitefox.core.*; +import io.whitefox.core.delta.Metadata; +import io.whitefox.core.results.ReadTableResult; import io.whitefox.core.services.exceptions.TableNotFound; import io.whitefox.persistence.StorageManager; import jakarta.enterprise.context.ApplicationScoped; @@ -123,7 +125,11 @@ public Optional>> listTablesOfShare( @Override public ReadTableResult queryTable( - String share, String schema, String tableName, ReadTableRequest queryRequest) { + String share, + String schema, + String tableName, + ReadTableRequest queryRequest, + DeltaSharingCapabilities requestCapabilities) { SharedTable sharedTable = storageManager .getSharedTable(share, schema, tableName) .orElseThrow(() -> new TableNotFound(String.format( diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharingCapabilities.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharingCapabilities.java new file mode 100644 index 000000000..461cc5ed5 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharingCapabilities.java @@ -0,0 +1,185 @@ +package io.whitefox.core.services; + +import io.whitefox.annotations.SkipCoverageGenerated; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeltaSharingCapabilities { + private static final Logger log = LoggerFactory.getLogger(DeltaSharingCapabilities.class); + private final Map> values; + + public Map> values() { + return values; + } + + public static final String DELTA_SHARE_CAPABILITIES_HEADER = "delta-sharing-capabilities"; + + public DeltaSharingCapabilities(Map> values) { + this.values = Collections.unmodifiableMap(values); + } + + public static DeltaSharingCapabilities defaultValue() { + return new DeltaSharingCapabilities((String) null); + } + + public DeltaSharingCapabilities(String s) { + this(parseDeltaSharingCapabilities(s)); + } + + public DeltaSharingCapabilities withResponseFormat( + DeltaSharingCapabilities.DeltaSharingResponseFormat format) { + return new DeltaSharingCapabilities(Stream.concat( + values.entrySet().stream(), + Stream.of(Map.entry(DELTA_SHARING_RESPONSE_FORMAT, Set.of(format.toString())))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + + private static Map> parseDeltaSharingCapabilities(String s) { + if (s == null) { + return Map.of(); + } else { + return Arrays.stream(s.split(";", -1)) + .flatMap(entry -> { + if (StringUtils.isBlank(entry)) { + return Stream.empty(); + } + var keyAndValues = entry.split("=", -1); + if (keyAndValues.length != 2) { + throw new IllegalArgumentException(String.format( + "Each %s must be in the format key=value", DELTA_SHARE_CAPABILITIES_HEADER)); + } + var key = keyAndValues[0]; + var values = Arrays.stream(keyAndValues[1].split(",", -1)) + .collect(Collectors.toUnmodifiableSet()); + return Stream.of(Map.entry(key, values)); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + } + + public Set getResponseFormat() { + var value = values.get(DELTA_SHARING_RESPONSE_FORMAT); + if (value == null || value.isEmpty()) { + return Set.of(DeltaSharingResponseFormat.PARQUET); + } else { + return value.stream() + .flatMap(s -> { + 
try { + return Stream.of(DeltaSharingResponseFormat.valueOf(s.toUpperCase())); + } catch (IllegalArgumentException e) { + log.warn("Ignoring unknown {}: {}", DELTA_SHARING_RESPONSE_FORMAT, s); + return Stream.empty(); + } + }) + .collect(Collectors.toUnmodifiableSet()); + } + } + + public Set getReaderFeatures() { + var value = values.get(DELTA_SHARING_READER_FEATURES); + if (value == null || value.isEmpty()) { + return Set.of(); + } else { + return value.stream() + .flatMap(s -> { + try { + return Stream.of(DeltaSharingFeatures.fromString(s)); + } catch (IllegalArgumentException e) { + log.warn("Ignoring unknown {}: {}", DELTA_SHARING_READER_FEATURES, s); + return Stream.empty(); + } + }) + .collect(Collectors.toSet()); + } + } + + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeltaSharingCapabilities that = (DeltaSharingCapabilities) o; + return Objects.equals(values, that.values); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(values); + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "DeltaSharingCapabilities{" + "values=" + values + '}'; + } + + public static final String DELTA_SHARING_RESPONSE_FORMAT = "responseformat"; + + public enum DeltaSharingResponseFormat { + DELTA, + PARQUET + } + + public static final String DELTA_SHARING_READER_FEATURES = "readerfeatures"; + static final String DELTA_SHARING_READER_FEATURE_DELETION_VECTOR = "deletionvectors"; + static final String DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING = "columnmapping"; + static final String DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ = "timestampntz"; + static final String DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA = "domainmetadata"; + static final String DELTA_SHARING_READER_FEATURE_V2CHECKPOINT = "v2checkpoint"; + static final String DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS = 
"checkconstraints"; + static final String DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS = "generatedcolumns"; + static final String DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS = "allowcolumndefaults"; + static final String DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS = "identitycolumns"; + + public enum DeltaSharingFeatures { + DELETION_VECTORS(DELTA_SHARING_READER_FEATURE_DELETION_VECTOR), + COLUMN_MAPPING(DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING), + TIMESTAMP_NTZ(DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ), + DOMAIN_METADATA(DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA), + V2CHECKPOINT(DELTA_SHARING_READER_FEATURE_V2CHECKPOINT), + CHECK_CONSTRAINTS(DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS), + GENERATED_COLUMNS(DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS), + ALLOW_COLUMN_DEFAULTS(DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS), + IDENTITY_COLUMNS(DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS); + + DeltaSharingFeatures(String stringRepresentation) { + this.stringRepresentation = stringRepresentation; + } + + private final String stringRepresentation; + + public String stringRepresentation() { + return stringRepresentation; + } + + public static DeltaSharingFeatures fromString(String s) { + switch (s.toLowerCase()) { + case DELTA_SHARING_READER_FEATURE_DELETION_VECTOR: + return DeltaSharingFeatures.DELETION_VECTORS; + case DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING: + return DeltaSharingFeatures.COLUMN_MAPPING; + case DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ: + return DeltaSharingFeatures.TIMESTAMP_NTZ; + case DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA: + return DeltaSharingFeatures.DOMAIN_METADATA; + case DELTA_SHARING_READER_FEATURE_V2CHECKPOINT: + return DeltaSharingFeatures.V2CHECKPOINT; + case DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS: + return DeltaSharingFeatures.CHECK_CONSTRAINTS; + case DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS: + return DeltaSharingFeatures.GENERATED_COLUMNS; + case 
DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS: + return DeltaSharingFeatures.ALLOW_COLUMN_DEFAULTS; + case DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS: + return DeltaSharingFeatures.IDENTITY_COLUMNS; + default: + throw new IllegalArgumentException("Unknown reader feature: " + s); + } + } + } +} diff --git a/server/core/src/main/java/io/whitefox/core/services/FileSigner.java b/server/core/src/main/java/io/whitefox/core/services/FileSigner.java index 5497d5a4e..aa0a182cd 100644 --- a/server/core/src/main/java/io/whitefox/core/services/FileSigner.java +++ b/server/core/src/main/java/io/whitefox/core/services/FileSigner.java @@ -1,8 +1,8 @@ package io.whitefox.core.services; -import io.whitefox.core.TableFile; -import io.whitefox.core.TableFileToBeSigned; +import io.whitefox.core.delta.signed.FileAction; +import io.whitefox.core.delta.unsigned.FileActionToBeSigned; public interface FileSigner { - TableFile sign(TableFileToBeSigned s); + FileAction sign(FileActionToBeSigned s); } diff --git a/server/core/src/main/java/io/whitefox/core/services/NoOpSigner.java b/server/core/src/main/java/io/whitefox/core/services/NoOpSigner.java index f585deb53..aaa4c5385 100644 --- a/server/core/src/main/java/io/whitefox/core/services/NoOpSigner.java +++ b/server/core/src/main/java/io/whitefox/core/services/NoOpSigner.java @@ -2,19 +2,66 @@ import io.whitefox.core.TableFile; import io.whitefox.core.TableFileToBeSigned; +import io.whitefox.core.delta.signed.*; +import io.whitefox.core.delta.unsigned.*; + import java.util.Optional; public class NoOpSigner implements FileSigner { @Override - public TableFile sign(TableFileToBeSigned s) { - return new TableFile( - s.url(), - s.url(), // maybe we can hash this - s.size(), - Optional.of(s.version()), - s.timestamp(), - s.partitionValues(), - Long.MAX_VALUE, - Optional.of(s.stats())); + public FileAction sign(FileActionToBeSigned s) { + if (s instanceof DeltaFileToBeSigned){ + var delta = (DeltaFileToBeSigned) s; + var pfa = 
sign(delta.getDeltaSingleAction()); + return DeltaFile.signed( + delta, + pfa, + pfa.getExpTs(), + delta.getId() + ); + } else if (s instanceof ParquetFileActionToBeSigned) { + return sign((ParquetFileActionToBeSigned) s); + } else { + throw new IllegalArgumentException(String.format("Unknown FileAction to be signed %s", s)); + } + } + + private ParquetFileAction sign(ParquetFileActionToBeSigned tbs) { + if (tbs instanceof ParquetFileToBeSigned) { + var f = (ParquetFileToBeSigned) tbs; + return ParquetFile.signed( + f, + f.getId(), + f.getExpirationTimestamp(), + sign(f.getUrl()) + ); + } else if (tbs instanceof ParquetAddFileToBeSigned) { + var f = (ParquetAddFileToBeSigned) tbs; + return ParquetAddFile.signed( + f, + f.getExpirationTimestamp(), + sign(f.getUrl()) + ); + } else if (tbs instanceof ParquetCDFFileToBeSigned) { + var f = (ParquetCDFFileToBeSigned) tbs; + return ParquetCDFFile.signed( + f, + f.getExpirationTimestamp(), + sign(f.getUrl()) + ); + } else if (tbs instanceof ParquetRemoveFileToBeSigned) { + var f = (ParquetRemoveFileToBeSigned) tbs; + return ParquetRemoveFile.signed( + f, + f.getExpirationTimestamp(), + sign(f.getUrl()) + ); + } else { + throw new IllegalArgumentException(String.format("Unknown FileAction to be signed %s", tbs)); + } + } + + private String sign(String url) { + return url; } } diff --git a/server/core/src/main/java/io/whitefox/core/services/S3FileSigner.java b/server/core/src/main/java/io/whitefox/core/services/S3FileSigner.java index c7d4554ff..b1fb67aab 100644 --- a/server/core/src/main/java/io/whitefox/core/services/S3FileSigner.java +++ b/server/core/src/main/java/io/whitefox/core/services/S3FileSigner.java @@ -8,9 +8,13 @@ import io.whitefox.core.TableFile; import io.whitefox.core.TableFileIdHashFunction; import io.whitefox.core.TableFileToBeSigned; +import io.whitefox.core.delta.signed.*; +import io.whitefox.core.delta.unsigned.*; + import java.net.URI; import java.net.URL; import java.util.Date; +import 
java.util.Objects; import java.util.Optional; public class S3FileSigner implements FileSigner { @@ -24,30 +28,82 @@ public S3FileSigner(AmazonS3 s3Client, TableFileIdHashFunction tableFileIdHashFu } @Override - public TableFile sign(TableFileToBeSigned s) { - URI absPath = URI.create(s.url()); + public FileAction sign(FileActionToBeSigned s) { + if (s instanceof DeltaFileToBeSigned) { + var delta = (DeltaFileToBeSigned) s; + var pfa = sign(delta.getDeltaSingleAction()); + return DeltaFile.signed( + delta, + pfa.getExpTs(), + pfa + ); + } else if (s instanceof ParquetFileActionToBeSigned) { + return sign((ParquetFileActionToBeSigned) s); + } else { + throw new IllegalArgumentException(String.format("Unknown FileAction to be signed %s", s)); + } + } + + private ParquetFileAction sign(ParquetFileActionToBeSigned tbs) { + if (tbs instanceof ParquetFileToBeSigned) { + var f = (ParquetFileToBeSigned) tbs; + return ParquetFile.signed( + f, + f.getExpirationTimestamp(), + sign(f.getUrl()) + ); + } else if (tbs instanceof ParquetAddFileToBeSigned) { + var f = (ParquetAddFileToBeSigned) tbs; + return ParquetAddFile.signed( + f, + f.getExpirationTimestamp(), + sign(f.getUrl()) + ); + } else if (tbs instanceof ParquetCDFFileToBeSigned) { + var f = (ParquetCDFFileToBeSigned) tbs; + return ParquetCDFFile.signed( + f, + f.getExpirationTimestamp(), + sign(f.getUrl()) + ); + } else if (tbs instanceof ParquetRemoveFileToBeSigned) { + var f = (ParquetRemoveFileToBeSigned) tbs; + return ParquetRemoveFile.signed( + f, + f.getExpirationTimestamp(), + sign(f.getUrl()) + ); + } else { + throw new IllegalArgumentException(String.format("Unknown FileAction to be signed %s", tbs)); + } + } + + private String sign(String url) { + URI absPath = URI.create(url); String bucketName = absPath.getHost(); String keyName = stripPrefix(absPath.getPath(), "/"); Date expirationDate = new Date(System.currentTimeMillis() + SECONDS.toMillis(3600)); - URL presignedUrl = s3Client.generatePresignedUrl( - 
buildPresignedUrlRequest(bucketName, keyName, expirationDate)); - return new TableFile( - presignedUrl.toString(), - tableFileIdHashFunction.hash(s.url()), - s.size(), - Optional.of(s.version()), - s.timestamp(), - s.partitionValues(), - expirationDate.getTime(), - Optional.ofNullable(s.stats())); + return Objects.toString(s3Client.generatePresignedUrl(buildPresignedUrlRequest(bucketName, keyName, expirationDate))); } +// @Override +// public TableFile sign(TableFileToBeSigned s) { + // return new TableFile( +// presignedUrl.toString(), +// tableFileIdHashFunction.hash(s.url()), +// s.size(), +// Optional.of(s.version()), +// s.timestamp(), +// s.partitionValues(), +// expirationDate.getTime(), +// Optional.ofNullable(s.stats())); +// } +// private String stripPrefix(String string, String prefix) { if (string.startsWith(prefix)) { return string.substring(prefix.length()); } else return string; } - private GeneratePresignedUrlRequest buildPresignedUrlRequest( String bucketName, String keyName, Date expirationDate) { return new GeneratePresignedUrlRequest(bucketName, keyName) diff --git a/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java b/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java index 2a7a3c644..6b219553c 100644 --- a/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java @@ -28,7 +28,7 @@ void getTableVersion() throws ExecutionException, InterruptedException { void getTableMetadata() { var PTable = new SharedTable("delta-table", "default", "share1", deltaTable("delta-table")); var DTable = DeltaSharedTable.of(PTable); - var metadata = DTable.getMetadata(Optional.empty()); + var metadata = DTable.getMetadata(Optional.empty(), DeltaSharingCapabilities.defaultValue()); assertTrue(metadata.isPresent()); assertEquals("56d48189-cdbc-44f2-9b0e-2bded4c79ed7", metadata.get().id()); } diff --git 
a/server/core/src/test/java/io/whitefox/core/services/DeltaSharingCapabilitiesTest.java b/server/core/src/test/java/io/whitefox/core/services/DeltaSharingCapabilitiesTest.java new file mode 100644 index 000000000..f7866bb20 --- /dev/null +++ b/server/core/src/test/java/io/whitefox/core/services/DeltaSharingCapabilitiesTest.java @@ -0,0 +1,111 @@ +package io.whitefox.core.services; + +import java.util.Set; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DeltaSharingCapabilitiesTest { + + @Test + void parseSimpleResponseFormatDelta() { + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA), + new DeltaSharingCapabilities("responseformat=delta").getResponseFormat()); + } + + @Test + void parseSimpleResponseFormatParquet() { + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + new DeltaSharingCapabilities("responseformat=PaRquEt").getResponseFormat()); + } + + @Test + void failToParseUnknownResponseFormatAndReturnNothing() { + Assertions.assertEquals( + Set.of(), new DeltaSharingCapabilities("responseformat=iceberg").getResponseFormat()); + } + + @Test + void failToParseUnknownResponseFormatAndReturnOthers() { + Assertions.assertEquals( + Set.of( + DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA, + DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + new DeltaSharingCapabilities("responseformat=iceberg,parquet,delta").getResponseFormat()); + } + + @Test + void noCapabilitiesEqualsDefault() { + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + new DeltaSharingCapabilities((String) null).getResponseFormat()); + Assertions.assertEquals( + Set.of(), new DeltaSharingCapabilities((String) null).getReaderFeatures()); + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + new DeltaSharingCapabilities("").getResponseFormat()); + 
Assertions.assertEquals(Set.of(), new DeltaSharingCapabilities("").getReaderFeatures()); + } + + @Test + void parseSimpleReaderFeature() { + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingFeatures.DELETION_VECTORS), + new DeltaSharingCapabilities(String.format( + "%s=%s", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURES, + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_DELETION_VECTOR)) + .getReaderFeatures()); + } + + @Test + void failToParseUnknownReaderFeatureAndReturnNothing() { + Assertions.assertEquals( + Set.of(), + new DeltaSharingCapabilities(String.format( + "%s=%s", DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURES, "unknown")) + .getReaderFeatures()); + } + + @Test + void failToParseUnknownReaderFeatureAndReturnOthers() { + Assertions.assertEquals( + Set.of( + DeltaSharingCapabilities.DeltaSharingFeatures.COLUMN_MAPPING, + DeltaSharingCapabilities.DeltaSharingFeatures.DOMAIN_METADATA), + new DeltaSharingCapabilities(String.format( + "%s=%s,%s,%s", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURES, + "unknown", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING, + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA)) + .getReaderFeatures()); + } + + @Test + void kitchenSink() { + var readerFeatures = String.format( + "%s=%s,%s,%s", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURES, + "unknown", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING, + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA); + var responseFormat = "responseformat=iceberg,parquet,delta"; + var randomStuff = "thisIsAKey=these,some,values,lol"; + var capabilities = new DeltaSharingCapabilities( + String.format("%s;%s;%s", readerFeatures, responseFormat, randomStuff)); + Assertions.assertEquals( + Set.of( + DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA, + DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + 
capabilities.getResponseFormat()); + Assertions.assertEquals( + Set.of( + DeltaSharingCapabilities.DeltaSharingFeatures.COLUMN_MAPPING, + DeltaSharingCapabilities.DeltaSharingFeatures.DOMAIN_METADATA), + capabilities.getReaderFeatures()); + Assertions.assertEquals( + Set.of("these", "some", "values", "lol"), capabilities.values().get("thisIsAKey")); + } +} diff --git a/shell.nix b/shell.nix new file mode 100644 index 000000000..44a894860 --- /dev/null +++ b/shell.nix @@ -0,0 +1,6 @@ +with (import {}); +mkShell { + buildInputs = [ + jdk11 + ]; +} \ No newline at end of file