release-22.2: Changefeed performance improvements #91002

Merged: 13 commits, Nov 4, 2022

Conversation

miretskiy
Contributor

See commits for details.

"Mega merge" of changefeed performance related changes around
backfill/export performance.

  • Ability to run parallel encoders -- controlled via the changefeed.event_consumer_workers setting
  • Support for asynchronous flush to storage changefeed.cloudstorage.async_flush.enabled
  • Switch gzip to use pgzip library; Support for lz4 compression codec
  • Uniformly distribute work when performing initial scan (changefeed.balance_range_distribution.enable)
  • Improve JSON encoder performance.

Release note (performance fixes): Increase changefeed export throughput by up to 10x.
Release justification: Significant performance improvements when exporting large scale data sets using changefeeds.

Yevgeniy Miretskiy and others added 13 commits October 31, 2022 13:10
Add a micro benchmark for `tree.AsJSON` method.

Release note: None
Release justification: test only change
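As a rough illustration (not the benchmark added by this commit), a datum-to-JSON microbenchmark follows the standard Go `testing.B` pattern; `datumToJSON` and its inputs below are hypothetical stand-ins for the conversion under test:

```
package jsonbench

import "testing"

// datumToJSON is a hypothetical stand-in for the conversion under test
// (e.g. tree.AsJSON); it is not the real API.
func datumToJSON(v interface{}) ([]byte, error) {
	return []byte(`"placeholder"`), nil
}

func BenchmarkAsJSON(b *testing.B) {
	inputs := []interface{}{
		"2010-09-28 12:00:00.1-04", // e.g. a timestamp datum
		"BOX(1 2,3 4)",             // e.g. a box2d datum
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := datumToJSON(inputs[i%len(inputs)]); err != nil {
			b.Fatal(err)
		}
	}
}
```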
Improve performance of `tree.AsJSON` method.

These improvements matter for any query that produces a
large number of JSON objects, as well as for changefeeds, which
rely on this function when producing JSON-encoded feeds.

Most of the changes revolved around modifying underlying types
(such as date/timestamp types, box2d, etc.) to favor functions
that append to a bytes buffer, instead of relying on slower
functions such as `fmt.Sprintf`.  Conversion performance
improved by around 5-10% for most types, and by as much as
50% for time types:

```
Benchmark            old t/op      new t/op    delta
AsJSON/box2d-10    578ns ± 3%    414ns ± 2%   -28.49%  (p=0.000 n=10+9)
AsJSON/box2d[]-10  1.64µs ± 3%   1.19µs ± 4%  -27.14%  (p=0.000 n=10+10)
AsJSON/time-10     232ns ± 2%    103ns ± 1%   -55.61%  (p=0.000 n=10+10)
AsJSON/time[]-10   687ns ± 4%    342ns ± 4%   -50.17%  (p=0.000 n=10+10)
```

Note: Some types in the local benchmark show a slight slowdown.
No changes were made to those types, and in general, the encoding speed
of these types might be too fast to reliably detect changes:
```
Benchmark            old t/op      new t/op       delta
AsJSON/bool[]-10    65.9ns ± 1%   67.7ns ± 2%    +2.79%  (p=0.001 n=8+9)
```

Emphasis was also placed on reducing allocations.
By relying more heavily on a pooled FmtCtx, which contains
a bytes buffer, some conversions (e.g. time types) now have
their allocations amortized away entirely:
```
Benchmark               old B/op      new B/op    delta
AsJSON/timestamp-10    42.1B ± 3%      0.0B      -100.00%  (p=0.000 n=10+10)
AsJSON/timestamp[]-10  174B ± 4%      60B ± 1%   -65.75%  (p=0.000 n=10+10)
```
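The underlying pattern is general: formatting into a reused byte slice, and pooling that slice, avoids both the `fmt.Sprintf` machinery and per-call allocations. A minimal, self-contained sketch of the idea for time values (not the actual FmtCtx code):

```
package main

import (
	"fmt"
	"sync"
	"time"
)

// bufPool amortizes buffer allocations across conversions, similar in
// spirit to the pooled FmtCtx mentioned above.
var bufPool = sync.Pool{New: func() interface{} { b := make([]byte, 0, 64); return &b }}

// timeToJSONSlow formats via fmt.Sprintf, allocating on every call.
func timeToJSONSlow(t time.Time) string {
	return fmt.Sprintf("%q", t.Format(time.RFC3339Nano))
}

// timeToJSONFast appends into a pooled buffer using AppendFormat,
// avoiding the intermediate string and the Sprintf machinery.
func timeToJSONFast(t time.Time) string {
	bp := bufPool.Get().(*[]byte)
	buf := (*bp)[:0]
	buf = append(buf, '"')
	buf = t.AppendFormat(buf, time.RFC3339Nano)
	buf = append(buf, '"')
	s := string(buf)
	*bp = buf
	bufPool.Put(bp)
	return s
}

func main() {
	now := time.Now()
	fmt.Println(timeToJSONSlow(now) == timeToJSONFast(now)) // true
}
```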

Release note: None
Release justification: performance improvement
Add JSON encoder benchmark.

Release note: None
Release justification: test only change
Rewrite JSON encoder to improve its performance.

Prior to this change, the JSON encoder was very inefficient.
This inefficiency had multiple underlying causes:
  * New Go map objects were constructed for each event.
  * Underlying JSON conversion functions had inefficiencies
    (tracked in cockroachdb#87968).
  * Conversion of Go maps to JSON incurs the cost
    of sorting the keys -- for each row. Sorting,
    particularly when rows are wide, has significant cost.
  * Each conversion to JSON allocated a new array builder
    (to encode keys) and a new object builder; that too has a cost.
  * The underlying code structure, while attempting to reuse
    code when constructing different "envelope" formats,
    caused the code to be more inefficient.

This PR addresses all of the above.  In particular, since
a schema version for the table is guaranteed to have
the same set of primary key and value columns, we can construct
JSON builders once.  The expensive sort operation can be performed
once per version; builders can be memoized and cached.
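A minimal sketch of the memoization idea (hypothetical types, not the actual encoder code): the column layout is sorted once per table schema version and then reused for every row carrying that version.

```
package main

import "sort"

// versionedBuilder caches per-schema-version work: the column layout is
// sorted once and reused for every row with that version.
type versionedBuilder struct {
	cols []string
}

type builderCache map[int]*versionedBuilder // keyed by table schema version

func (c builderCache) get(version int, cols []string) *versionedBuilder {
	if b, ok := c[version]; ok {
		return b
	}
	sorted := append([]string(nil), cols...)
	sort.Strings(sorted) // the expensive sort happens once per version, not once per row
	b := &versionedBuilder{cols: sorted}
	c[version] = b
	return b
}

func main() {
	cache := builderCache{}
	_ = cache.get(1, []string{"b", "a", "c"}) // sorts and caches
	_ = cache.get(1, []string{"b", "a", "c"}) // cache hit: no sort, no allocation
}
```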

The performance impact is significant:
  * Key encoding speed up is 5-30%, depending on the number of primary
    keys.
  * Value encoding is 30%-60% faster (slowest being the "wrapped" envelope
    with diff -- which effectively encodes 2x the values).
  * Byte allocations per row are reduced by over 70%, with the number
    of allocations reduced similarly.

Release note (enterprise change): Changefeed JSON encoder
performance improved by 50%.
Release justification: performance improvement
Make `span.Frontier` thread safe by default.

Release note: None
Previously, KV events were consumed and processed by the changefeed
aggregator using a single, synchronous goroutine. This PR makes
it possible to run up to `changefeed.event_consumer_workers`
consumers to consume events concurrently. The cluster setting
`changefeed.event_consumer_worker_queue_size` is added to help
control the number of concurrent events we can have in flight.

Specifying `changefeed.event_consumer_workers=0` retains the existing
single-threaded implementation.

Release note (enterprise change): This change adds the cluster setting
`changefeed.event_consumer_workers` which allows changefeeds to
process events concurrently.
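Conceptually this is a bounded worker pool (a simplified sketch, not the actual changefeed code): a buffered channel plays the role of `changefeed.event_consumer_worker_queue_size`, and the goroutine count plays the role of `changefeed.event_consumer_workers`.

```
package main

import "sync"

type event struct{ key, value []byte }

// startConsumers launches `workers` goroutines that drain up to `queueSize`
// buffered events concurrently.
func startConsumers(workers, queueSize int, consume func(event)) (chan<- event, *sync.WaitGroup) {
	ch := make(chan event, queueSize)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ev := range ch {
				consume(ev) // e.g. encode the row and hand it to the sink
			}
		}()
	}
	return ch, &wg
}

func main() {
	ch, wg := startConsumers(4, 16, func(ev event) { _ = ev })
	ch <- event{key: []byte("k"), value: []byte("v")}
	close(ch)
	wg.Wait()
}
```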
Prior to this change, the cloud storage sink triggered a
file-size-based flush whenever a new row would
push the file size beyond the configured threshold.

This had the effect of significantly reducing throughput
whenever such an event occurred -- no additional events could
be added to the cloud storage sink while the previous flush was
active.

This is not necessary.  The cloud storage sink can trigger
file-based flushes asynchronously.  The only requirement
is that if a real, non-file-based flush arrives, or if we
need to emit resolved timestamps, then we must wait for
all of the active flush requests to complete.

In addition, because every event added to the cloud sink has an
associated allocation, which is released when the file is written
out, performing flushes asynchronously is safe with respect
to memory usage and accounting.

Release note (enterprise change): Changefeeds using the cloud
storage sink now have better throughput.
Release justification: performance fix
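A simplified sketch of the scheme (not the sink's actual implementation): size-triggered file writes run in the background, while a "real" flush, such as the one needed before emitting resolved timestamps, waits for all in-flight writes to complete.

```
package main

import "sync"

type asyncSink struct {
	wg   sync.WaitGroup
	mu   sync.Mutex
	errs []error
}

// flushFileAsync writes a completed file in the background so new events
// can keep accumulating while the upload is in progress.
func (s *asyncSink) flushFileAsync(write func() error) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		if err := write(); err != nil {
			s.mu.Lock()
			s.errs = append(s.errs, err)
			s.mu.Unlock()
		}
	}()
}

// Flush is the "real" flush: before resolved timestamps can be emitted,
// every outstanding background write must have completed successfully.
func (s *asyncSink) Flush() error {
	s.wg.Wait()
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.errs) > 0 {
		return s.errs[0]
	}
	return nil
}

func main() {
	var s asyncSink
	s.flushFileAsync(func() error { return nil }) // e.g. upload a file to cloud storage
	_ = s.Flush()
}
```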
Expand the set of supported compression algorithms in changefeed.

A faster implementation of the gzip algorithm is available and is used
by default.  The gzip implementation can be reverted
to the Go standard library implementation via the
`changefeed.fast_gzip.enabled` setting.

In addition, add support for compressing files with zstd.

Release note (enterprise change): Changefeeds can emit files compressed
with the zstd algorithm, which provides good compression and is much
faster than gzip.  In addition, a new, faster implementation of
gzip is used by default.
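The codec selection below is only an illustrative sketch, not the sink's actual code: pgzip is the parallel gzip library named in the PR summary, the zstd implementation shown (klauspost/compress) is an assumption since the commit does not name one, and the `fastGzip` flag stands in for `changefeed.fast_gzip.enabled`.

```
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
	"github.com/klauspost/pgzip"
)

// newCompressedWriter returns a write-closer for the requested codec.
// fastGzip selects the parallel pgzip implementation; otherwise the
// standard library gzip is used.
func newCompressedWriter(w io.Writer, codec string, fastGzip bool) (io.WriteCloser, error) {
	switch codec {
	case "zstd":
		return zstd.NewWriter(w)
	case "gzip":
		if fastGzip {
			return pgzip.NewWriter(w), nil
		}
		return gzip.NewWriter(w), nil
	default:
		return nil, fmt.Errorf("unsupported codec %q", codec)
	}
}

func main() {
	var buf bytes.Buffer
	zw, err := newCompressedWriter(&buf, "zstd", true)
	if err != nil {
		panic(err)
	}
	_, _ = zw.Write([]byte("hello"))
	_ = zw.Close()
}
```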
By default, changefeeds distribute work to nodes based
on which nodes are the leaseholders for the ranges.
This makes sense since running a rangefeed against the local node
is more efficient.

In a cluster where ranges are almost uniformly assigned
to each node, running a changefeed export is efficient:
all nodes are busy until they are done.

The KV server is responsible for making sure that ranges
are more or less uniformly distributed across the cluster;
however, this determination is based on the set of all ranges
in the cluster, not on a particular table.

As a result, it is possible to have a table whose ranges are not
uniformly distributed across all the nodes.
When this happens, a changefeed export can take a long
time due to the long tail: as each node completes its
set of assigned ranges, it idles until the changefeed completes.

This PR introduces a change (controlled via the
`changefeed.balance_range_distribution.enable` setting)
where the changefeed tries to produce a more balanced
assignment, with each node responsible for roughly
1/Nth of the work in a cluster of N nodes.

Release note (enterprise change): Changefeed exports are
up to 25% faster due to uniform work assignment.
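A toy sketch of the balanced assignment (hypothetical types; the real planner works on spans and node descriptors): ranges are dealt out round-robin so that each of the N nodes receives roughly 1/Nth of the work, regardless of leaseholder placement.

```
package main

import "fmt"

// balanceRanges assigns ranges to nodes round-robin, so each of the N nodes
// ends up with roughly 1/Nth of the work.
func balanceRanges(ranges []string, nodes []int) map[int][]string {
	assignment := make(map[int][]string, len(nodes))
	for i, r := range ranges {
		node := nodes[i%len(nodes)]
		assignment[node] = append(assignment[node], r)
	}
	return assignment
}

func main() {
	ranges := []string{"r1", "r2", "r3", "r4", "r5", "r6", "r7"}
	fmt.Println(balanceRanges(ranges, []int{1, 2, 3})) // roughly 1/3 of the ranges per node
}
```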
Fix a latent Avro array encoding bug where a previously allocated
memo array might contain a 'nil' element, while the code assumed
that the element must always be a map.

Release note: none.
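Schematically, the fix amounts to not trusting the memoized slot to hold a map (a self-contained illustration, not the actual Avro encoder code):

```
package main

import "fmt"

func main() {
	// A previously allocated memo slice may contain nil slots; the old code
	// assumed every element was already a map and type-asserted directly.
	memo := make([]interface{}, 2)
	memo[0] = map[string]interface{}{"cached": true}

	for i := range memo {
		m, ok := memo[i].(map[string]interface{})
		if !ok { // nil (or wrong type): allocate a fresh map instead of panicking
			m = map[string]interface{}{}
			memo[i] = m
		}
		fmt.Println(m)
	}
}
```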
Previously, a default value of 8 was used for the kvevent parallel consumer.
The reason for this was that we observed performance improvements in a 15-node,
32-vCPU cluster when we increased this parameter to 8. Beyond 8, the
improvements were much smaller.

The issue with a default of 8 is that on smaller machines, 8
workers can be too much overhead, especially since the work is CPU intensive.

This change updates the default to runtime.NumCPU() >> 2 workers, which
aligns with using 8 workers on 32-vCPU machines.

Fixes cockroachdb#89589
Epic: none

Release note: None
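The new default is therefore a function of the machine size; a minimal sketch of the formula:

```
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// A quarter of the CPUs: 8 workers on a 32-vCPU machine, 1 on 4 vCPUs.
	// On very small machines this evaluates to 0, which (per the setting's
	// semantics described in the earlier commit) presumably keeps the
	// single-threaded path.
	defaultWorkers := runtime.NumCPU() >> 2
	fmt.Println("default changefeed.event_consumer_workers:", defaultWorkers)
}
```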
To account for overhead during the encoding stage,
we allocate more memory resources than needed
(`changefeed.event_memory_multiplier`).

Those resources are released when the events are emitted
by the sink.  Some sinks, such as file-based sinks, batch
many such events (to generate files of the target size).
This batching, when combined with compression, could result
in a situation where the maximum allowed resources,
`changefeed.memory.per_changefeed_limit`, are all tied up in batched events,
making it impossible to ingest additional events without
forcing a sink flush.

The only way to avoid a premature forced flush is to increase
the memory limit for the changefeed -- but doing so is not without
costs (i.e. more events batched, more pressure on the Go GC, etc.).

This PR adjusts event resources to match the final size of the
data that will be emitted to the sink -- that is,
we release the overhead back into the pool once the event
processing is done.

Release note: none
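Conceptually (with a hypothetical `alloc` type standing in for the real memory-accounting handle), the adjustment shrinks each event's budget from the padded estimate down to the bytes actually emitted:

```
package main

import "fmt"

// alloc is a hypothetical stand-in for a memory-accounting handle.
type alloc struct{ bytes int64 }

// adjustToEmittedSize releases the encoding-stage overhead back to the pool:
// the event keeps only as much budget as the bytes actually handed to the sink.
func adjustToEmittedSize(a *alloc, emittedBytes int64, release func(int64)) {
	if over := a.bytes - emittedBytes; over > 0 {
		release(over) // returned to the changefeed memory budget
		a.bytes = emittedBytes
	}
}

func main() {
	pool := int64(0)
	a := alloc{bytes: 4096} // padded by changefeed.event_memory_multiplier
	adjustToEmittedSize(&a, 1500, func(n int64) { pool += n })
	fmt.Println(a.bytes, pool) // 1500 2596
}
```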
When async flushing is enabled, the following sequence of events is
possible (even if very unlikely):

* k@t1 is emitted, causing async flush to write file f1
* k@t2 is emitted, causing async flush to write file f2
* f2 is written out before f1.

In this unlikely scenario -- and the reason why it's unlikely is that we
have to generate megabytes of data to cause a flush, unless the file_size
parameter was pathologically small -- if a client were to read the
contents of the directory right after step 2, and then read the directory
after step 3, the client would first observe k@t2 and then observe k@t1
-- which is a violation of ordering guarantees.

This PR fixes this issue by adopting a queue so that if there
is a pending flush in flight, the next flush is queued behind it.

It is possible that this simple approach may result in a throughput
decrease.  This situation will occur if the underlying
storage (S3 or GCP) cannot keep up with writing out data before
the next file comes in.  However, at this point, such a
situation is unlikely for two reasons: one, the rest of the
changefeed machinery must be able to generate one or more files'
worth of data before the previous flush completes -- and this
task in and of itself is not trivial; and two, the changefeed must
have enough memory allocated to it so that the pushback mechanism
does not trigger.  The above assumption was validated in a reasonably
sized test -- i.e. a non-zero queue depth was never observed.
Nonetheless, this PR also adds a log message which may be helpful
for detecting situations where the sink cannot keep up with the
incoming data rate.

A more complex solution -- for example, allowing unlimited in-flight
requests during backfill -- may be revisited later, if the above
assumption proves incorrect.

Fixes cockroachdb#89683

Release note: None.
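A sketch of the queueing fix (simplified; the real sink also tracks queue depth for the log message mentioned above): flush requests are handed to a single background goroutine through a channel, so files can never be written out of order.

```
package main

import "sync"

type flushReq struct{ write func() }

// flushQueue serializes async flushes: a pending flush never overtakes an
// earlier one, preserving the ordering of emitted files.
type flushQueue struct {
	ch chan flushReq
	wg sync.WaitGroup
}

func newFlushQueue() *flushQueue {
	q := &flushQueue{ch: make(chan flushReq, 16)}
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for req := range q.ch {
			req.write() // files are written strictly in enqueue order
		}
	}()
	return q
}

func (q *flushQueue) enqueue(write func()) { q.ch <- flushReq{write: write} }

func (q *flushQueue) close() {
	close(q.ch)
	q.wg.Wait()
}

func main() {
	q := newFlushQueue()
	q.enqueue(func() { /* write f1 (k@t1) */ })
	q.enqueue(func() { /* write f2 (k@t2) -- cannot land before f1 */ })
	q.close()
}
```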
@miretskiy miretskiy requested a review from a team October 31, 2022 17:18
@miretskiy miretskiy requested review from a team as code owners October 31, 2022 17:18
@miretskiy miretskiy requested review from HonoreDB and removed request for a team October 31, 2022 17:18
@blathers-crl

blathers-crl bot commented Oct 31, 2022

Thanks for opening a backport.

Please check the backport criteria before merging:

  • Patches should only be created for serious issues or test-only changes.
  • Patches should not break backwards-compatibility.
  • Patches should change as little code as possible.
  • Patches should not change on-disk formats or node communication protocols.
  • Patches should not add new functionality.
  • Patches must not add, edit, or otherwise modify cluster versions; or add version gates.
If some of the basic criteria cannot be satisfied, ensure that the exceptional criteria are satisfied within.
  • There is a high priority need for the functionality that cannot wait until the next release and is difficult to address in another way.
  • The new functionality is additive-only and only runs for clusters which have specifically “opted in” to it (e.g. by a cluster setting).
  • New code is protected by a conditional check that is trivial to verify and ensures that it only runs for opt-in clusters.
  • The PM and TL on the team that owns the changed code have signed off that the change obeys the above rules.

Add a brief release justification to the body of your PR to justify this backport.

Some other things to consider:

  • What did we do to ensure that a user that doesn’t know & care about this backport, has no idea that it happened?
  • Will this work in a cluster of mixed patch versions? Did we test that?
  • If a user upgrades a patch version, uses this feature, and then downgrades, what happens?

@cockroach-teamcity
Member

This change is Reviewable
