
[Swift] Spooled client protocol extension #22662

Closed
Tracked by #22271
wendigo opened this issue Jul 13, 2024 · 24 comments · Fixed by #22995
Assignees
Labels
jdbc Relates to Trino JDBC driver performance roadmap Top level issues for major efforts in the project

Comments

@wendigo
Contributor

wendigo commented Jul 13, 2024

Spooled Trino protocol (protocol v1+)

Important

This document is a design proposal and can change in the future. This is part of the ongoing effort to improve performance of the existing client protocol (#22271).

Background

The existing Trino client protocol is over 10 years old and was designed for different purposes than the use cases we want to support today. These new requirements include:

  • Ability to retrieve the result set out of order and in parallel,
  • Support for more space- and CPU-efficient result set formats (row- and column-oriented),
  • Reduced load on the coordinator during result set generation, by moving it to the workers.

To address the aforementioned requirements, we are going to introduce an extension to the existing protocol called the spooled protocol, which extends its semantics in a backward-compatible fashion.

Existing client protocol

The current protocol consists of two endpoints that are used to submit queries and retrieve partial results:

  • submission (POST /v1/statement)
  • partial result set retrieval (GET /v1/statement/executing)

Both endpoints share the same QueryResults definition:

{
  "id": "20160128_214710_00012_rk68b",
  "infoUri": "http://coordinator/query.html?20160128_214710_00012_rk68b",
  "nextUri": null,
  "columns": [
    {
      "name": "_col0",
      "type": "bigint",
      "typeSignature": {
        "rawType": "bigint",
        "arguments": []
      }
    }
  ],
  "data": [[1],[5]]
  ...
}

Note

Some of the query results fields were omitted for brevity.

The important fields related to result set retrieval are:

  • data - contains JSON-encoded data as a list of row values. Individual values are encoded as the result of the Type.getObjectValue call (important: these are low-level, raw representations that are interpreted on the client side; see AbstractTrinoResultSet),
  • columns - contains the list of column and type descriptors. It's worth noting that the data field can't have a non-null value without the column information present,
  • nextUri - used to retrieve the next partial result set.
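
For illustration only, here is a minimal sketch of how a client follows this loop today, assuming a coordinator at localhost:8080, Jackson for JSON parsing, and the standard X-Trino-User header (the host, user, and query are placeholders, not part of this proposal):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class V1ProtocolSketch
{
    public static void main(String[] args)
            throws Exception
    {
        HttpClient client = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper();

        // Submit the query to the coordinator
        HttpRequest submit = HttpRequest.newBuilder(URI.create("http://localhost:8080/v1/statement"))
                .header("X-Trino-User", "example")
                .POST(HttpRequest.BodyPublishers.ofString("SELECT 1"))
                .build();
        JsonNode results = mapper.readTree(client.send(submit, HttpResponse.BodyHandlers.ofString()).body());

        // Follow nextUri until it is gone; data is only present alongside columns
        while (results.hasNonNull("nextUri")) {
            if (results.has("data")) {
                results.get("data").forEach(System.out::println);
            }
            HttpRequest next = HttpRequest.newBuilder(URI.create(results.get("nextUri").asText()))
                    .header("X-Trino-User", "example")
                    .GET()
                    .build();
            results = mapper.readTree(client.send(next, HttpResponse.BodyHandlers.ofString()).body());
        }
    }
}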

Spooled protocol extension

To express partial result sets with a different encoding, the spooled protocol extension is introduced, which contains backward- and forward-compatible changes to the existing protocol.

These changes are:

  • New on-the-wire representation and semantics for the data field,
  • Additional headers (i.e. Trino-Query-Data-Encoding).

Trino-Query-Data-Encoding

The header is used by client libraries to opt into the new spooled protocol extension and contains the list of encodings in order of preference (comma separated). If any of the encodings is supported by the server, the client can expect QueryResults.data to be returned in the new format. If none is supported, the server falls back to the existing behaviour for compatibility.

Note

Once the encoding is negotiated between the client and the server, it stays the same for the duration of the query, which means that the client can expect results in a single format - either the existing one or the extended one.
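
A minimal sketch of the negotiation from the client side, reusing the imports from the earlier sketch and assuming the json+zstd and json encoding identifiers discussed later in this document (host, user, and query are placeholders):

// Hypothetical request: the client lists its preferred encodings, best first.
// If the server supports none of them, QueryResults.data stays in the v1 format.
HttpRequest submit = HttpRequest.newBuilder(URI.create("http://localhost:8080/v1/statement"))
        .header("X-Trino-User", "example")
        .header("Trino-Query-Data-Encoding", "json+zstd,json")
        .POST(HttpRequest.BodyPublishers.ofString("SELECT * FROM tpch.tiny.orders"))
        .build();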

EncodedQueryData

EncodedQueryData is an extension of the QueryResults.data field with the following on-the-wire representation:

{
  "id": "20160128_214710_00012_rk68b",
  "infoUri": "http://coordinator/query.html?20160128_214710_00012_rk68b",
  "nextUri": null,
  "columns": [...],
  "data": {
    "encoding": "json",
    "segments": [
      {
        "type": "inline",
        "data": "c3VwZXI=",
        "metadata": {
          "rowOffset": 0,
          "rowsCount": 100,
          "segmentSize": 5
        }
      },
      {
        "type": "spooled",
        "uri": "http://localhost:8080/v1/spooled/download/8eb0d5eb8a45e4a4ac60b284d3173",
        "ackUri": "http://localhost:8080/v1/spooled/ack/8eb0d5eb8a45e4a4ac60b284d3173"
        "headers": {
            "x-amz-server-side​-encryption​-customer-key": ["key"],
            "x-amz-server-side​-encryption​-customer-algorithm": ["AES256"]
        },
        "metadata": {
          "rowOffset": 200,
          "rowsCount": 100,
          "segmentSize": 1024
        }
      }
    ]
  }
}

The meaning of the fields is as follows:

  • encoding - the string identifier of the encoding format as negotiated between the client and the server, chosen from the encodings the client requested in the Trino-Query-Data-Encoding header. These identifiers are known and shared by both the client and the server and are part of the spooled protocol extension definition,
  • metadata - a Map<String, Object> that represents metadata of the partial result set, e.g. a decryption key used to decrypt encoded data,
  • segments - the list of data segments representing the partial result set. A single QueryResults object can contain an arbitrary number of segments, which are always ordered.
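
As an illustration only, a client could mirror this wire format with a pair of records; the JSON field names come from the example above, while the record names and shape are assumptions:

import java.net.URI;
import java.util.List;
import java.util.Map;

// Hypothetical client-side model; only the field names are defined by this proposal
record EncodedQueryData(
        String encoding,
        Map<String, Object> metadata,
        List<DataSegment> segments) {}

record DataSegment(
        String type,                       // "inline" or "spooled"
        String data,                       // base64 payload, inline segments only
        URI uri,                           // download location, spooled segments only
        URI ackUri,                        // acknowledgement endpoint, spooled segments only
        Map<String, List<String>> headers, // opaque headers needed to fetch spooled data
        Map<String, Object> metadata) {}   // rowOffset, rowsCount, segmentSize, ...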

DataSegment

DataSegment is a representation of the encoded data and has two distinct types, inline and spooled, with the following semantics:

  • an inline segment holds encoded, partial result set data in the base64-encoded data field,
  • a spooled segment points to an encoded partial result set spooled in the configured storage location. The location designated by the uri field, together with the HTTP headers stored in the Map<String, List<String>> headers field, is used to retrieve the spooled byte[] data from the spooling storage. uri and headers are opaque and contain all of the required authentication information, which means that a client implementation can retrieve spooled segment data with an ordinary HTTP GET call, without any processing of the URI or the headers. It's worth noting that the URI can point to an arbitrary location, including endpoints exposed on the coordinator or the storage used for spooling (e.g. presigned URIs on S3); this depends on the actual implementation of the spooling manager and the server configuration. ackUri is used with the HTTP GET method to explicitly acknowledge that the segment was retrieved, which allows the coordinator to eagerly remove it from the storage rather than waiting for the TTL to pass (see the retrieval sketch below).
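
A hedged sketch of that retrieval flow, reusing the illustrative DataSegment record and the java.net.http imports from the sketches above; note that no Trino authentication headers are attached to either request:

// Fetch the spooled bytes using only the opaque uri and headers, then acknowledge
// the segment so the server can delete it before the TTL expires.
static byte[] fetchSpooledSegment(HttpClient client, DataSegment segment)
        throws Exception
{
    HttpRequest.Builder download = HttpRequest.newBuilder(segment.uri()).GET();
    segment.headers().forEach((name, values) -> values.forEach(value -> download.header(name, value)));
    byte[] encoded = client.send(download.build(), HttpResponse.BodyHandlers.ofByteArray()).body();

    // Acknowledgement is a plain GET against ackUri
    client.send(HttpRequest.newBuilder(segment.ackUri()).GET().build(), HttpResponse.BodyHandlers.discarding());
    return encoded;
}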

Note

Data segments are returned in the order specified by the query semantics. Data is spooled when the client library requests the next batch of result set data by following nextUri. It's up to the client library to decide whether segments are read as soon as they are returned, or buffered and read later.

Important

In order to support the spooled protocol, client implementations need to support both inline and spooled representations, as the server can use them interchangeably.

Caution

Client implementations must not send any additional information when retrieving a spooled data segment, particularly the authentication headers used to authenticate to Trino.

DataSegment contains a metadata field of type Map<String, Object> with attributes describing a data segment.

The following metadata attributes are always present:

  • rowOffset - the offset of the data segment in relation to the whole result set (long),
  • rowsCount - the number of rows in the data segment (long),
  • segmentSize - the size of the encoded data segment (long).

Optional metadata attributes can be a part of the encoding definition shared between the client and server implementations (to be discussed).

Implementation considerations

Compatibility

The implementation of this design must take backward and forward compatibility into consideration, which means two things:

  • a new client implementation must be compatible with an old server implementation (without this extension),
  • an old client implementation must be compatible with a new server implementation (with this extension).

As this is an extension of the protocol, it's an opt-in feature that requires a new client and a new server with additional configuration (spooling storage).

Encodings

An encoding describes the serialization format (like JSON) and other information required to both write (encode) and read (decode) a result set as data segments. Examples of encodings are:

  • json+zstd, which reads as JSON serialization with ZSTD compression,
  • parquet+snappy, which reads as Parquet encoding with Snappy compression.

The definition and meaning of each encoding is a contract between the client and the server and will be specified separately in the future for each encoding.

Spooling

Spooling by definition means that the server buffers data in a separate location, to be retrieved later by the client. In order to support the spooled protocol extension, servers (coordinator and workers) are required to be configured to use spooling. In the initial implementation we plan to add a plugin SPI for the SpoolingManager along with an implementation based on the native file-system API. The SpoolingManager is responsible for both storing and retrieving spooled data segments from the external storage.

Segments

It's up to the server implementation whether the partial result set will be inlined, spooled, or both. A client requesting the spooled protocol extension is required to support both types of data segments.

Pluggability

An encoding is a shared definition between the client and the server, therefore it can't be a Plugin SPI interface. In the initial implementation, we do not plan to provide an external SPI for adding new encodings. These will be shipped as part of the implementation of the client and server libraries. The spooling process on the server side is pluggable with the new SpoolingManager SPI.

Backward compatibility

Above all, the new spooled protocol can't break the existing one, as it's an opt-in client/server feature. At any given moment, the client or the server can fall back to the original, unmodified v1 protocol to maintain backward compatibility.

Performance

For small result sets, spooling to storage adds overhead, which can be avoided by inlining the data in the response. For larger ones, spooling allows faster, parallel data retrieval: the result set can be fully spooled so that the query finishes as fast as possible, and then retrieved, decoded, and processed by the client after the query has already completed.

We plan to implement spooling on the worker side, which means that when the result set is partially or fully spooled, the coordinator node is not involved in encoding the data into the requested format or spooling it to storage. According to our benchmarks, JSON encoding in the existing format accounts for a significant amount of the CPU time during output processing. Initially we plan to support the existing JSON format, but the encoding will be moved to worker nodes, which will reduce the load on the coordinator.

Security

The protocol supports server-side encryption of data. We've settled on server-side encryption, rather than client-side, for the following reasons:

  • straightforward client support (passing headers only),
  • native support for server-side encryption in cloud storage (S3, GCS, ABFS) which reduces the CPU usage on both client and Trino side,
  • robustness of the client implementation, which doesn't need to change if the encryption scheme does.

Proposed API/SPI

Server-side

public interface QueryDataEncoder
{
    DataAttributes encodeTo(OutputStream stream, List<Page> pages)
            throws IOException; // returns segment-level attributes

    String encoding(); // encoding name

    DataAttributes attributes(); // query-level attributes (like encryption key)

    interface Factory
    {
        QueryDataEncoder create(Session session, List<OutputColumn> columns);

        String encoding();
    }
}

Used to encode the data in the specified output format.

// Implementation specific identifier
public interface SpooledSegmentHandle
{
}

public interface SpoolingManager
{
    SpooledSegmentHandle create(SpoolingContext spoolingContext);

    OutputStream createOutputStream(SpooledSegmentHandle segmentHandle)
            throws Exception;

    InputStream unspool(SpooledSegmentHandle segmentHandle)
            throws IOException;

    default void acknowledge(SpooledSegmentHandle segmentHandle)
            throws IOException
    {
    }

    default Optional<URI> directLocation(SpooledSegmentHandle segmentHandle)
    {
        return Optional.empty();
    }

    default Slice serialize(SpooledSegmentHandle handle)
    {
        throw new UnsupportedOperationException();
    }

    default SpooledSegmentHandle deserialize(Slice slice)
    {
        throw new UnsupportedOperationException();
    }

    default Map<String, String> headers(SpooledSegmentHandle segmentHandle)
    {
        return Map.of();
    }
}

Used to manage, spool and unspool data segments.
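
To make the intended interplay concrete, here is hypothetical glue code for producing one spooled segment; only the SPI calls come from the interfaces above, while the method itself and the SpoolingContext handling are assumptions:

// Hypothetical engine-side wiring: encode pages straight into spooling storage,
// then ask the SpoolingManager how the client should reach the segment.
static SpooledSegmentHandle spoolOneSegment(
        SpoolingManager spoolingManager,
        QueryDataEncoder encoder,
        SpoolingContext context,
        List<Page> pages)
        throws Exception
{
    SpooledSegmentHandle handle = spoolingManager.create(context);
    DataAttributes segmentAttributes;
    try (OutputStream output = spoolingManager.createOutputStream(handle)) {
        // The encoder writes the negotiated format and reports per-segment attributes
        segmentAttributes = encoder.encodeTo(output, pages);
    }
    // A direct (e.g. pre-signed) location lets the client bypass the coordinator
    Optional<URI> directUri = spoolingManager.directLocation(handle);
    Map<String, String> headers = spoolingManager.headers(handle);
    // segmentAttributes, directUri and headers would be folded into the DataSegment
    // returned to the client; that step is elided in this sketch.
    return handle;
}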

Client-side

public interface QueryDataDecoder
{
    ResultRows decode(InputStream inputStream, DataAttributes segmentAttributes)
            throws IOException;

    String encoding();

    interface Factory
    {
        QueryDataDecoder create(List<Column> columns, DataAttributes queryAttributes);

        String encoding();
    }
}

Used to decode the data segment.
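
For completeness, a sketch of decoding a single inline segment on the client, assuming the illustrative DataSegment record above and that the segment-level DataAttributes have already been built from the segment metadata:

// Decode one inline segment: base64-decode the data field and hand the bytes
// to the negotiated decoder.
static ResultRows decodeInlineSegment(QueryDataDecoder decoder, DataSegment segment, DataAttributes segmentAttributes)
        throws IOException
{
    byte[] encoded = Base64.getDecoder().decode(segment.data());
    try (InputStream input = new ByteArrayInputStream(encoded)) {
        return decoder.decode(input, segmentAttributes);
    }
}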

@wendigo wendigo self-assigned this Jul 13, 2024
@wendigo wendigo added roadmap Top level issues for major efforts in the project jdbc Relates to Trino JDBC driver performance labels Jul 13, 2024
@sajjoseph
Contributor

Really excited about this enhancement.
Does this arrangement support retries for failed requests/corrupted data - especially when we can retrieve data in parallel?

@wendigo
Contributor Author

wendigo commented Jul 13, 2024

Yes. It will be possible to retrieve the same segment a couple of times.

@sajjoseph
Contributor

sajjoseph commented Jul 15, 2024

Thanks for the update. A few more thoughts.

  1. Spooled data transfer looks like a communication between the client (cli/jdbc/odbc/python...) and the Trino cluster. There is no opportunity to introduce independent layers (proxies, smart caching between client and coordinator, etc.). If we keep the URLs used (including dataUri in the spooled segment) in HTTP headers, proxies and other agents can process that data without decoding the entire HTTP response payload.

  2. The v1 protocol transmits metadata in each data chunk being transmitted. Will that continue to happen in the spooled version (EncodedQueryData) as well? Is there any opportunity to improve that area?

  3. Please elaborate on the parallel data transfer aspect and how it works. I understood that we are offloading the data preparation from the coordinator to workers. But we are still sending data in a serial fashion (one at a time) from the coordinator to the client - right? How about the coordinator sharing a list of nextUris with the client, and the client retrieving them in parallel?

  4. Hope we can include msgpack as an encoding option for data transfer (assuming that we get better compression than other mechanisms).

Below are some ideas that I thought could benefit client/coordinator communication.

  1. Coordinator and client could establish/negotiate the targetResultSize value (and periodically reevaluate during transfer) while complex queries are running in workers (a possible solution is to send some dummy data - similar to iperf3 - from coordinator to client to determine the transmission rate).

  2. Move nextUri, partialCancelUri, infoUri to the HTTP header area. At present, only clients can see these values, and only after decoding the data chunk. If we move these to header fields, we can implement performance improvements where the binary data is downloaded as is (probably by a proxy node) and delivered to the actual client at the rate it can consume. The advantage is that the Trino cluster will not have to hold the data in memory and operate at the client's retrieval/consumption rate. Instead, we can offload data to the client proxy node (or to the client itself, if it is capable of storing/caching the binary data somewhere) at a faster rate. We definitely need to consider security and avoid man-in-the-middle attacks, but a smart caching/data offloading mechanism can improve the end-user experience.

  3. Trino cluster identifier mechanism - when there are multiple clusters behind a Trino Gateway-like arrangement, it would be great if we could add some kind of identifier to the nextUri (only the Trino Gateway/query routing layer/coordinator would understand/decode this value). In my private fork, I implemented this feature and it is working well. It is backward compatible with all client libraries. (This doesn't have to be a v1+ feature as it can be implemented in the Trino coordinator.)

Thoughts?

@nickalexander053

Would be interested in looking at client implementations that utilise this, especially for distributed ML frameworks that need to load a lot of data into distributed workers. Of interest would be a Spark connector and a client for Ray.

@wendigo
Contributor Author

wendigo commented Jul 15, 2024

@sajjoseph

  1. This is correct. In order to drive adoption, we've decided to make it easy for client integrations. We've taken into account the need for query result set caching, and this is possible to implement with the proposed changes. We do not want to move the data segment URIs to headers, though.
  2. This will be a separate change that will allow disabling columns and/or stats retransmission.
  3. Data is encoded and spooled on the worker nodes. The coordinator, instead of getting the actual data, gets the list of spooled locations (basically URIs from the client perspective), and the client, instead of getting the actual data, gets these URIs that point to the spooled data. In a single request to nextUri, the client can get multiple segments from multiple workers (it depends only on how fast we can spool data on the worker nodes). The result set target doesn't apply here - we've added a separate "target segment size" to the spooling configuration that sets a target size for a spooled segment (right now it's 16 MB). Usually, you will get multiple segment URIs at once, which makes the total amount of data available for reading much higher than the current 1 MB target result size. Since a spooled segment is just a URI and you can get multiple of them, you can read them in parallel; you can even buffer all of the segment URIs until the query is finished and then read all of the segments in parallel at once - there is no limitation here (see the sketch after this list).
  4. I don't think that we will do msgpack, but I agree that JSON is not the best option as it costs a lot (in our benchmarks, 90% of the CPU used in the spooling operator is the actual encoding of the data). We are still considering options for the columnar encoding of the data that we will introduce next.
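
A minimal sketch of the parallel retrieval pattern described in point 3, reusing the hypothetical fetchSpooledSegment helper and DataSegment record sketched in the proposal above; the thread-pool size and the java.util / java.util.concurrent helper classes are illustrative, not part of the protocol:

// Buffer all spooled segment URIs, then fetch them concurrently; segment order
// is preserved by collecting the futures in their original order.
static List<byte[]> fetchAllSegments(HttpClient client, List<DataSegment> spooledSegments)
        throws Exception
{
    ExecutorService executor = Executors.newFixedThreadPool(8);
    try {
        List<Future<byte[]>> futures = new ArrayList<>();
        for (DataSegment segment : spooledSegments) {
            futures.add(executor.submit(() -> fetchSpooledSegment(client, segment)));
        }
        List<byte[]> results = new ArrayList<>();
        for (Future<byte[]> future : futures) {
            results.add(future.get());
        }
        return results;
    }
    finally {
        executor.shutdown();
    }
}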

Other ideas

  1. Target result size doesn't apply here. There is a "target segment size" that controls how big a spooled segment will be; this is set to 16 MB by default. It only exists to avoid having too many small segments or segments that are too big (as you will read them into memory as a whole). When you fetch the nextUri you will usually get multiple segments from multiple workers, and this is usually limited by how fast workers can spool. Instead of getting a 1 MB targetResultSize you will get a multiple of 16 MB.
  2. This is out of scope for this change. We don't want to mess with the protocol too much as it will make client implementations harder. Since the data is now "externalized" and you only get a "pointer" to the data (the segment URI), the responses to nextUri will usually be much smaller than today (KBs rather than MBs). You can assume that the proxy will be able to read the whole response to extract the required information like nextUri or cancelUri.
  3. Isn't the X-Forwarded-Path what you are looking for?

@wendigo
Contributor Author

wendigo commented Jul 15, 2024

@nickalexander053 this is one of the use-cases we have in mind :)

@sajjoseph
Contributor

Thanks @wendigo for the details and I really appreciate the quick response. This makes sense.

For #3 in the first list (Data is encoded and spooled on the worker nodes...), is there an opportunity to offload spooled data to external places other than worker nodes? The client shouldn't care where it gets the data from, and this could introduce offline execution (clients submit queries and go away, and are later notified of data availability so they can come back and pick up the data).

For #3 from the second list (Trino cluster identifier mechanism), did you mean X-Forwarded-Prefix? One challenge with X-Forwarded-Prefix is that clients need to be modified to take advantage of this feature. My original suggestion is backward compatible and so will work for all clients (ODBC, JDBC, CLI, Python, etc.). I already implemented this in our fork and we are using it for zero-downtime deployments.

@wendigo
Contributor Author

wendigo commented Jul 16, 2024

@sajjoseph the spooling manager is a pluggable SPI. One of the implementations is based on native filesystem APIs, which allows for spooling to S3/GCS/ABFS out of the box.

Please file an issue for the second item. We will discuss it with the trino-gateway group.

@wendigo
Contributor Author

wendigo commented Jul 16, 2024

@sajjoseph "Encoding and spooling happens on worker nodes" means that worker nodes are spooling to the external storage :)

@sajjoseph
Contributor

Thanks @wendigo for the updates. I am super excited about the possibilities and how this feature can improve customer experience. Thank you!

About the cluster identifier feature, I will file an issue later.

@martint
Member

martint commented Jul 23, 2024

Good stuff @wendigo!

A couple of questions:

  • How is the lifecycle of the segments managed? How do coordinators / workers know it's safe to delete a given segment?
  • How does the protocol manage flow control to avoid overrunning the spool (which could be in memory buffers, local file system, etc), in the case of a slow client?

And a comment:

  • Given that the size of the segments is determined statically, and a segment needs to be ready before its URL will be served to the client, this can result in increases in latency for very small queries. I'm not clear what's a good solution for this issue, but it's an inherent tension between streaming results and pre-computing results before serving that needs to be considered.

@wendigo
Contributor Author

wendigo commented Jul 23, 2024

@martint there is an explicit HTTP DELETE that clients can do in order to remove a segment if it's no longer needed. Besides that, we want to implement a configurable TTL for segments that will render them unusable once they expire (the client won't be able to retrieve them anymore), and a GC process that will clean up expired segments from the storage.

The data is spooled only if nextUri is called by the client - the difference is that the coordinator is not getting an actual page of data but a URI pointing to the data. If the client has abandoned calling nextUri, spooling won't progress.

As for the latency part, the spooled extension allows both inline and spooled segments, so it's up to the engine whether the data should be inlined or spooled. The current spooling operator implementation has a SpoolingController that tries to decide when to switch from streaming inlined data to spooling (based on configurable thresholds - the number of initial rows / size of the initial pages), so that the initial couple of hundred rows will be inlined, but once the threshold is reached, data will be spooled and the client will get URIs instead of inline data.

I was thinking that the client implementation could actually hint to the engine whether it prefers data transfer throughput over initial latency.

@wendigo
Contributor Author

wendigo commented Jul 23, 2024

@martint here's how we plugged it into the execution on the high level:

(architecture diagram attached in the original comment)

@martint
Member

martint commented Jul 23, 2024

In the case when the SpoolingOperator is spooling, how does pushback happen? Today, it's based on task output buffers filling up.

@wendigo
Contributor Author

wendigo commented Jul 23, 2024

@martint right now the spooling operator holds only a single spooled segment that wasn't consumed by the coordinator (and thus the client). It won't request/accept more pages until the client retrieves the segment, so it's similar to being blocked on a full buffer, except that the buffer holds the identifier of the spooled segment rather than the actual data in memory.

@martint
Member

martint commented Jul 23, 2024

How does it know it hasn't been consumed by the coordinator? The coordinator interacts with the Task Output Buffer. The Driver will attempt to fetch from the tip of the operator pipeline as long as the buffer is not full. Does this require that the output buffer understand the internals of these special pages produced by the spooling operator?

@wendigo
Contributor Author

wendigo commented Jul 23, 2024

@martint I see what you mean now. I think that we would need to improve that area

@electrum
Member

electrum commented Jul 24, 2024

There is no pushback for spooling. We assume that storage is effectively infinite, or large enough to store the entire result set. This is true for the expected use case of object storage for spooling.

Requiring that clients consume the results in a specific order makes this more difficult to use and can result in deadlocks. This doesn't seem useful given that storage is typically unlimited.

Consider using this with a system like MapReduce or Spark where the URIs are scheduled independently as worker tasks. There are no guarantees as to when those tasks will run, and these systems often need all the URIs up front.

We could implement a best effort limit on the total size (IIRC we have something similar for scan size) if needed. You can think of this as similar to CTAS, which writes the full result up front and doesn't have any limit on size.

@sopel39
Member

sopel39 commented Aug 7, 2024

@wendigo is there any PoC branch with the new approach, or is it still in the design phase?

@wendigo
Contributor Author

wendigo commented Aug 7, 2024

@sopel39 not yet, why are you asking? :)

wendigo added a commit that referenced this issue Aug 9, 2024
wendigo added a commit that referenced this issue Sep 6, 2024
This change introduces the spooled protocol extension as described
in #22662.

Co-authored-by: Jan Waś <[email protected]>
wendigo added a commit that referenced this issue Sep 6, 2024
This change introduces the spooled protocol extension as described
in #22662.

Co-authored-by: Jan Waś <[email protected]>
@wendigo
Contributor Author

wendigo commented Sep 6, 2024

The initial work is done. We will now follow up with better spooling and some experiments around formats.

@samarthjain
Member

@wendigo - is there ever a case when the coordinator needs to do more than just pulling records from workers and returning them to the client? For example, a query plan that requires the coordinator to do some kind of post-aggregation or filtering? Or is that work already done on the worker nodes as part of the DAG execution? Curious to know whether the spooled client protocol works for all query shapes.

@wendigo
Copy link
Contributor Author

wendigo commented Sep 25, 2024

@samarthjain final aggregation/sorting happens on workers, so no - which means that spooling also happens on workers. The question is whether this will be a single worker or many.

@wendigo
Contributor Author

wendigo commented Nov 21, 2024

@nickalexander053 do you have any pointers how to implement support for it in Ray? :)
