ARROW-17407: [Doc][FlightRPC] Flight/gRPC best practices #13873

Merged
merged 15 commits into from
Sep 15, 2022
192 changes: 192 additions & 0 deletions docs/source/cpp/flight.rst
@@ -172,6 +172,198 @@ request/response. On the server, they can inspect incoming headers and
fail the request; hence, they can be used to implement custom
authentication methods.

.. _flight-best-practices:

Best practices
==============

gRPC
----

When using the default gRPC transport, options can be passed to it via
:member:`arrow::flight::FlightClientOptions::generic_options`. For example:

.. tab-set::

   .. tab-item:: C++

      .. code-block:: cpp

         // The GRPC_ARG_* macros come from the gRPC headers (e.g. <grpcpp/grpcpp.h>).
         auto options = FlightClientOptions::Defaults();
         // Set the period after which a keepalive ping is sent on transport.
         options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);

   .. tab-item:: Python

      .. code-block:: python

         import pyarrow.flight

         # Set the period after which a keepalive ping is sent on transport.
         # "grpc.keepalive_time_ms" is the string value of GRPC_ARG_KEEPALIVE_TIME_MS.
         generic_options = [("grpc.keepalive_time_ms", 60000)]
         client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

Also see `best gRPC practices`_ and available `gRPC keys`_.

Re-use clients whenever possible
--------------------------------

Creating and closing clients requires setup and teardown on both the client and
the server side, which takes time away from actually handling RPCs. Reuse clients
whenever possible to avoid this. Note that clients are thread-safe, so a
single client can be shared across multiple threads.
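
One way to share a single client is to create it lazily behind a lock and hand the same instance to every thread. The following is a minimal sketch, not part of Flight: ``SharedClient`` and its ``factory`` argument are hypothetical names, and ``factory`` is assumed to be any callable that builds a client.

```python
import threading


class SharedClient:
    """Lazily create one client and return the same instance to every caller."""

    def __init__(self, factory):
        self._factory = factory  # hypothetical: a callable that builds a client
        self._lock = threading.Lock()
        self._client = None

    def get(self):
        # The lock ensures that concurrent first calls build only one client.
        with self._lock:
            if self._client is None:
                self._client = self._factory()
            return self._client
```

With pyarrow, ``factory`` might be something like ``lambda: pyarrow.flight.FlightClient(server_uri)``; worker threads would then call ``get()`` instead of constructing their own clients.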

Don’t round-robin load balance
------------------------------

`Round robin load balancing`_ means every client can have an open connection to
every server, causing an unexpected number of open connections and depleting
server resources.

Debugging connection issues
---------------------------

When facing unexpected disconnects on long-running connections, use netstat to
monitor the number of open connections. If the number of connections is much
greater than the number of clients, it might cause issues.

For debugging, certain environment variables enable logging in gRPC. For
example, ``env GRPC_VERBOSITY=info GRPC_TRACE=http`` will print the initial
headers (on both sides) so you can see if gRPC established the connection or
not. It will also print when a message is sent, so you can tell if the
connection is open or not.

gRPC may not report connection errors until a call is actually made.
Hence, to detect connection errors when creating a client, some sort
of dummy RPC should be made.
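
The "dummy RPC" idea can be sketched as a small helper that builds a client and immediately issues one cheap call, so that connection errors surface at creation time. This is an illustration only: ``open_checked``, ``make_client`` and ``probe`` are hypothetical names, and both arguments are assumed to be callables supplied by the application.

```python
def open_checked(make_client, probe):
    """Create a client and immediately run one cheap RPC against it.

    make_client and probe are hypothetical callables supplied by the caller.
    """
    client = make_client()
    try:
        probe(client)  # any inexpensive call; expected to raise if unreachable
    except Exception:
        # Release the half-usable client before reporting the failure.
        close = getattr(client, "close", None)
        if callable(close):
            close()
        raise
    return client
```

With pyarrow, ``probe`` could plausibly be ``lambda c: c.wait_for_available(timeout=5)`` or a cheap call such as listing actions; which call is appropriate depends on what the server implements.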

Memory management
-----------------

Flight tries to reuse allocations made by gRPC to avoid redundant
data copies. However, this means that those allocations may not
be tracked by the Arrow memory pool, and that memory usage behavior,
such as whether free memory is returned to the system, is dependent
on the allocator that gRPC uses (usually the system allocator).

A quick way of testing: attach to the process with a debugger and call
``malloc_trim``, or call :func:`ReleaseUnused <arrow::MemoryPool::ReleaseUnused>`
on the system pool. If memory usage drops, then likely, there is memory
allocated by gRPC or by the application that the system allocator was holding
on to. This can be adjusted in platform-specific ways; see an investigation
in ARROW-16697_ for an example of how this works on Linux/glibc. glibc malloc
can be explicitly told to dump caches.
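
If attaching a debugger is inconvenient, the same experiment can be run from inside a Python process. The sketch below calls glibc's ``malloc_trim`` through ``ctypes``; the helper name ``trim_glibc_heap`` is hypothetical, and the function only does something on platforms whose C library actually exports ``malloc_trim``.

```python
import ctypes
import ctypes.util


def trim_glibc_heap():
    """Ask glibc to return free heap memory to the operating system.

    Returns True or False as reported by malloc_trim, or None when the
    C library cannot be loaded or does not provide malloc_trim.
    """
    name = ctypes.util.find_library("c")
    if name is None:
        return None
    try:
        libc = ctypes.CDLL(name)
    except OSError:
        return None
    trim = getattr(libc, "malloc_trim", None)
    if trim is None:
        return None
    trim.argtypes = [ctypes.c_size_t]
    trim.restype = ctypes.c_int
    # A pad of 0 asks glibc to release as much free memory as it can.
    return bool(trim(0))
```

Comparing the process's resident memory before and after the call gives the same signal as the debugger approach: a noticeable drop suggests the system allocator was holding on to freed memory.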

Excessive traffic
-----------------

gRPC will spawn threads for concurrent clients, up to its maximum thread quota. Those
threads are not necessarily cleaned up (a "cached thread pool" in Java parlance).
glibc malloc clears some per-thread state, and the default tuning never clears
caches in some workloads.

gRPC's default behavior allows one server to accept many connections from many
different clients, but if requests do a lot of work (as they may under Flight),
the server may not be able to keep up. Configuring the server to limit the number
of clients and reject requests more proactively, and configuring clients to retry
with backoff (and potentially connect to a different node), would give more
consistent quality of service.

Review comments on this paragraph:

Member: How do you configure the server to do this?

Member: (IIRC, it's not possible without modifying Flight…)

Member Author: I think this is referring to gRPC settings.

Member: I would say either remove this text or have a code sample of how to set the server-side limit

.. tab-set::

   .. tab-item:: C++

      .. code-block:: cpp

         auto options = FlightClientOptions::Defaults();
         // Set the minimum time between subsequent connection attempts.
         options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);

   .. tab-item:: Python

      .. code-block:: python

         import pyarrow.flight

         # Set the minimum time between subsequent connection attempts.
         # "grpc.min_reconnect_backoff_ms" is the string value of
         # GRPC_ARG_MIN_RECONNECT_BACKOFF_MS.
         generic_options = [("grpc.min_reconnect_backoff_ms", 2000)]
         client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)
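
The gRPC option above only governs reconnection attempts. Retrying a rejected request with backoff is left to the application, and a minimal sketch of that could look as follows. The helper ``call_with_backoff`` and all of its parameters are hypothetical, and ``ConnectionError`` stands in for whatever exception the client raises when the server is overloaded.

```python
import random
import time


def call_with_backoff(call, retryable=(ConnectionError,), attempts=5,
                      base_delay=0.1, max_delay=5.0, sleep=time.sleep):
    """Run call(), retrying on retryable errors with exponential backoff."""
    for attempt in range(attempts):
        try:
            return call()
        except retryable:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Double the ceiling on each attempt and sleep for a random
            # fraction of it so that clients do not retry in lockstep.
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, ceiling))
```

``call`` is free to pick a different node on each attempt, which is where the "potentially connect to a different node" part would fit.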


Limiting DoPut Batch Size
--------------------------

Review comment (Member): This is more like a Cookbook kind of thing IMO


You may wish to limit the maximum batch size a client can submit to a server through
DoPut, to prevent a request from taking up too much memory on the server. On
the client side, set :member:`arrow::flight::FlightClientOptions::write_size_limit_bytes`.
On the server side, set the gRPC option ``GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH``.
The client-side option will return an error that can be retried with smaller batches,
while the server-side limit will close the connection. Setting both can be wise, since
the former provides a better user experience but the latter may be necessary to defend
against impolite clients.
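
"Retried with smaller batches" can be sketched as a recursive halving loop. Everything named here is hypothetical: ``BatchTooLarge`` stands in for the client-side size-limit error, ``write`` for the function that sends one batch, and plain lists stand in for record batches.

```python
class BatchTooLarge(Exception):
    """Hypothetical stand-in for the client-side size-limit error."""


def write_in_pieces(rows, write, min_rows=1):
    """Write rows, halving the batch whenever the writer rejects it as too large."""
    try:
        write(rows)
    except BatchTooLarge:
        if len(rows) <= min_rows:
            raise  # the smallest allowed piece still exceeds the limit
        mid = len(rows) // 2
        write_in_pieces(rows[:mid], write, min_rows)
        write_in_pieces(rows[mid:], write, min_rows)
```

Real Arrow record batches would be split with their own slicing method rather than list slicing, and the exception to catch would be whichever error the client raises when the write size limit is exceeded.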

Closing unresponsive connections
--------------------------------

1. A stale call can be closed using
   :member:`arrow::flight::FlightCallOptions::stop_token`. This requires recording the
   stop token at call establishment time.

   .. tab-set::

      .. tab-item:: C++

         .. code-block:: cpp

            StopSource stop_source;
            FlightCallOptions options;
            options.stop_token = stop_source.token();
            // Requesting a stop cancels calls that use this token. It is done
            // up front here for brevity; normally another thread would do it.
            stop_source.RequestStop(Status::Cancelled("StopSource"));
            flight_client->DoAction(options, {});


2. Use call timeouts. (This is a general gRPC best practice.)

   .. tab-set::

      .. tab-item:: C++

         .. code-block:: cpp

            FlightCallOptions options;
            // TimeoutDuration is expressed in (fractional) seconds.
            options.timeout = TimeoutDuration{0.2};
            Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();

      .. tab-item:: Java

         .. code-block:: java

            // CallOptions.timeout takes a whole number, so use milliseconds for 0.2 s.
            Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(200, TimeUnit.MILLISECONDS));

      .. tab-item:: Python

         .. code-block:: python

            # The timeout is given in seconds.
            options = pyarrow.flight.FlightCallOptions(timeout=0.2)
            result = client.do_action(action, options=options)


3. Client timeouts are not great for long-running streaming calls, where it may
   be hard to choose a timeout for the entire operation. Instead, what is often
   desired is a per-read or per-write timeout so that the operation fails if it
   isn't making progress. This can be implemented with a background thread that
   calls Cancel() on a timer, with the main thread resetting the timer every time
   an operation completes successfully. For a fully worked-out example, see the
   Cookbook.

   .. note:: There is a long-standing ticket for a per-write/per-read timeout
      instead of a per-call timeout (ARROW-6062_), but this is not (easily)
      possible to implement with the blocking gRPC API.
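
The timer-based approach in item 3 can be sketched with a small watchdog. This is an illustration under assumptions rather than the Cookbook's code: ``ProgressWatchdog`` is a hypothetical name, and ``cancel`` is assumed to be whatever callable cancels the in-flight call.

```python
import threading


class ProgressWatchdog:
    """Call cancel() if reset() is not called again within timeout seconds."""

    def __init__(self, timeout, cancel):
        self._timeout = timeout
        self._cancel = cancel  # hypothetical: callable that cancels the call
        self._lock = threading.Lock()
        self._timer = None

    def reset(self):
        """Start the countdown, or restart it after a successful read/write."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(self._timeout, self._cancel)
            self._timer.daemon = True
            self._timer.start()

    def stop(self):
        """Disarm the watchdog once the whole operation has finished."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
```

The main thread would call ``reset()`` before the first read or write and again after each one that succeeds, then ``stop()`` when the stream ends. If no progress is made for ``timeout`` seconds, the background timer fires ``cancel``.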

.. _best gRPC practices: https://grpc.io/docs/guides/performance/#general
.. _gRPC keys: https://grpc.github.io/grpc/cpp/group__grpc__arg__keys.html
.. _Round robin load balancing: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md#round_robin
.. _ARROW-15764: https://issues.apache.org/jira/browse/ARROW-15764
.. _ARROW-16697: https://issues.apache.org/jira/browse/ARROW-16697
.. _ARROW-6062: https://issues.apache.org/jira/browse/ARROW-6062


Alternative Transports
======================

4 changes: 4 additions & 0 deletions docs/source/java/flight.rst
@@ -201,6 +201,10 @@ request/response. On the server, they can inspect incoming headers and
fail the request; hence, they can be used to implement custom
authentication methods.

:ref:`Flight best practices <flight-best-practices>`
====================================================


.. _`FlightClient`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/flight/FlightClient.html
.. _`FlightProducer`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/flight/FlightProducer.html
.. _`FlightServer`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/flight/FlightServer.html
3 changes: 3 additions & 0 deletions docs/source/python/flight.rst
@@ -128,3 +128,6 @@ Middleware are fairly limited, but they can add headers to a
request/response. On the server, they can inspect incoming headers and
fail the request; hence, they can be used to implement custom
authentication methods.

:ref:`Flight best practices <flight-best-practices>`
====================================================
78 changes: 78 additions & 0 deletions go/arrow/flight/doc.go
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package flight contains server and client implementations for the Arrow Flight RPC
//
// Here we list best practices and common pitfalls for Arrow Flight usage.
//
// GRPC
//
// When using gRPC for transport, all client methods take an optional list
// of gRPC CallOptions: https://pkg.go.dev/google.golang.org/grpc#CallOption.
// Additional headers can be set or read via
// https://pkg.go.dev/google.golang.org/grpc/metadata with the context.
// Also see available gRPC keys
// (https://grpc.github.io/grpc/cpp/group__grpc__arg__keys.html) and a list of
// best gRPC practices (https://grpc.io/docs/guides/performance/#general).
//
// Re-use clients whenever possible
//
// Closing clients causes gRPC to close and clean up connections which can take
// several seconds per connection. This will stall server and client threads if
// done too frequently. Client reuse will avoid this issue.
//
// Don’t round-robin load balance
//
// Round robin balancing can cause every client to have an open connection to
// every server, causing an unexpected number of open connections and a depletion
// of resources.
//
// Debugging
//
// Use netstat to see the number of open connections.
// For debugging, set the environment variable GODEBUG=http2debug=1, or
// GODEBUG=http2debug=2 for more verbose HTTP/2 logs that include frame dumps.
// This will print the initial headers (on both sides) so you can see if gRPC
// established the connection or not. It will also print when a message is
// sent, so you can tell if the connection is open or not.
//
// Note: "connect" isn't really a connect, and we’ve observed that gRPC does not
// give you the actual error until you first try to make a call. This can cause
// errors to be reported at unexpected times.
//
// Excessive traffic
//
// There are basically two ways to handle excessive traffic:
// * unbounded goroutines -> everyone gets serviced, but it might take forever.
// This is the default behaviour.
// * bounded thread pool -> reject connections / requests when under load, and have
// clients retry with backoff. This also gives an opportunity to retry with a
// different node. Not everyone gets serviced, but quality of service stays consistent.
// This can be set with https://pkg.go.dev/google.golang.org/grpc#NumStreamWorkers
//
// Closing unresponsive connections
//
// * A call timeout (https://pkg.go.dev/context#WithTimeout) or cancellation
// (https://pkg.go.dev/context#WithCancel) can be set via context.Context.
// * There is a long-standing ticket for a per-write/per-read timeout instead of a
// per-call timeout (https://issues.apache.org/jira/browse/ARROW-6062), but this is not
// (easily) possible to implement with the blocking gRPC API. For now, one can also
// set up a background thread that calls cancel() on a timer and have the main thread
// reset the timer every time a write operation completes successfully (in Python,
// that means one needs to use to_batches() + write_batch and not write_table).


package flight
1 change: 1 addition & 0 deletions go/go.sum
@@ -149,6 +149,7 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
2 changes: 1 addition & 1 deletion go/parquet/doc.go
@@ -17,7 +17,7 @@
// Package parquet provides an implementation of Apache Parquet for Go.
//
// Apache Parquet is an open-source columnar data storage format using the record
// shredding and assembly algorithm to accomodate complex data structures which
// shredding and assembly algorithm to accommodate complex data structures which
// can then be used to efficiently store the data.
//
// This implementation is a native go implementation for reading and writing the
2 changes: 2 additions & 0 deletions r/vignettes/flight.Rmd
@@ -85,3 +85,5 @@ client %>%
Because `flight_get()` returns an Arrow data structure, you can directly pipe
its result into a [dplyr](https://dplyr.tidyverse.org/) workflow.
See `vignette("dataset", package = "arrow")` for more information on working with Arrow objects via a dplyr interface.

# [Flight best practices](../cpp/flight.html#best-practices)