
Calculate Iceberg NDV with a Theta sketch #14290

Merged (11 commits into trinodb:master, Sep 29, 2022)

Conversation


@findepi findepi commented Sep 26, 2022

The Iceberg specification defines Apache DataSketches' Theta as the
common data sketch for keeping track of distinct values in a table.

This change replaces the use of HLL within Iceberg's ANALYZE with Theta
sketch. The follow-up work is to store the serialized compact form of
the sketch inside the Iceberg Puffin statistics file, but this requires
Iceberg API changes, which are still in progress (e.g. apache/iceberg#4741, apache/iceberg#5794).


findepi commented Sep 26, 2022

CI #14239 (Pinot failure considered unrelated)


martint commented Sep 26, 2022

Is there any performance difference from using that implementation? I realize we don't have a choice, even if the performance is worse, but I'm just curious.


findepi commented Sep 27, 2022

Is there any performance difference from using that implementation? I realize we don't have a choice, even if the performance is worse, but I'm just curious.

I don't know. Since we have no real choice, I didn't run any benchmarks.

From a user perspective, this is still an experimental, opt-in feature, so performance regressions are acceptable.


findepi commented Sep 27, 2022

CI #13946 but -- hold your breath -- the other fault-tolerant failure is actually related.

@findepi findepi force-pushed the findepi/iceberg-theta branch from c481861 to b64bd26 Compare September 27, 2022 08:14

findepi commented Sep 27, 2022

The fault-tolerant tests expected ANALYZE results to be stable, and with the new implementation they are not.
While stability is nice to have, it shouldn't be a hard requirement.
Added the prep commit "Allow approximations when testing ANALYZE with fault-tolerance" to deal with this.

@findepi findepi force-pushed the findepi/iceberg-theta branch 3 times, most recently from 916783a to 148b0ee Compare September 27, 2022 13:53

findepi commented Sep 28, 2022

CI -- TestRoles failure fixed in #14323


findepi commented Sep 28, 2022

I discussed the lack-of-stability problem with David. It seems fine to roll with the current implementation and improve stability later (e.g. by calculating an HLL as well), if needed.

Reduce code noise by moving the Symbol -> SymbolReference conversion to the
construction method.

A hypothetical new format value should automatically get the same
coverage (same number of assertions) as all the existing ones.
Before the change, some assertions ran conditionally, only if the format
was one of the known ones, which wasn't future-proof.

The test analyzes _a table_ to calculate statistics.

ANALYZE can compute data size and number of distinct values (NDV) using
approximations, and the result is not guaranteed to be stable. When
testing ANALYZE with fault tolerance, allow the results to differ
between the test run and the control run by some small percentage.
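The tolerance-based comparison described in the last commit message can be sketched as follows; this is a hypothetical Python helper illustrating the idea, not the actual Trino test code (which is Java):

```python
def assert_close(test_value, control_value, tolerance_pct=5.0):
    """Compare an approximate statistic from the test run against the
    control run, allowing a small relative deviation instead of
    requiring exact equality (hypothetical helper, for illustration)."""
    if control_value == 0:
        # No meaningful relative error against zero; require exact match.
        assert test_value == 0, "expected exactly 0"
        return
    deviation = abs(test_value - control_value) * 100.0 / abs(control_value)
    assert deviation <= tolerance_pct, (
        f"deviation {deviation:.1f}% exceeds allowed {tolerance_pct}%")
```

With a non-deterministic sketch, two ANALYZE runs over the same data may legitimately produce slightly different NDV estimates, so the test harness compares within a percentage band rather than for equality.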
@findepi findepi force-pushed the findepi/iceberg-theta branch from b989476 to 81cb1bd Compare September 28, 2022 15:39

findepi commented Sep 28, 2022

(rebased to resolve conflict; build was green https://github.com/trinodb/trino/actions/runs/3142623927/jobs/5106437131)


public static CompactSketch deserialize(Block block, int index)
{
checkArgument(!block.isNull(index), "Value is null");
Member:
This looks asymmetrical with the serialize method: that one writes a null Block if there is no sketch set, but here you throw.

Member Author:
The non-static deserialize(Block block, int index, DataSketchState state) is the counterpart to the non-static serialize(DataSketchState state, BlockBuilder out).

This method is a helper only used:

  • in deserialize(Block block, int index, DataSketchState state), conditionally, after checking for non-null
  • in IcebergMetadata to pull final results (where the value is guaranteed to be non-null, by means of the @CombineFunction)

@TypeParameter("T")
public static void input(@TypeParameter("T") Type type, @AggregationState DataSketchState state, @BlockPosition @SqlType("T") Block block, @BlockIndex int index)
{
verify(!block.isNull(index), "Input function is not expected to be called on a NULL input");
Member:
This is ensured by the engine?

Member:

I think the SQL spec says that NULL values do not contribute to aggregations, and the engine enforces that, but the whole truth may be more complex :)

Member Author:

Yes, it's enforced by the engine.
I thought it's still nice to check for null before reading some dummy value, so that the code is self-descriptive.


findepi commented Sep 29, 2022

For reference, I ran a small experiment comparing approx_distinct and the Theta-based implementation.

For that, I've changed the output function of the $iceberg_theta_stat aggregation to produce just a number

-    @OutputFunction(StandardTypes.VARBINARY)
+    @OutputFunction(StandardTypes.DOUBLE)
     public static void output(@AggregationState DataSketchState state, BlockBuilder out)
     {
         if (state.getUpdateSketch() == null && state.getCompactSketch() == null) {
             getOrCreateUpdateSketch(state);
         }
-        DataSketchStateSerializer.serializeToVarbinary(state, out);
+        // DataSketchStateSerializer.serializeToVarbinary(state, out);
+
+        checkArgument(state.getUpdateSketch() == null || state.getCompactSketch() == null, "A state must not have both transient accumulator and combined form set");
+        CompactSketch compactSketch = Optional.ofNullable(state.getCompactSketch())
+                .orElseGet(() -> state.getUpdateSketch().compact());
+        DoubleType.DOUBLE.writeDouble(out, compactSketch.getEstimate());
     }

I was running on a 3-node cluster, in case the distribution matters:

SELECT * FROM system.runtime.nodes;
               node_id                |        http_uri        | node_version | coordinator | state
--------------------------------------+------------------------+--------------+-------------+--------
 4e014c23-cf1f-46eb-afba-a35edc4753af | http://127.0.0.1:8080  | testversion  | true        | active
 7fc307c0-ff55-4fec-b88f-1ef5c76fa328 | http://127.0.0.1:65317 | testversion  | false       | active
 2a356cfe-3573-4622-80b5-9cafbac1871e | http://127.0.0.1:65312 | testversion  | false       | active

Test query

SELECT 
    format(
        'SELECT
            column_name, exact_count,
            approx_count, round(abs(approx_count-exact_count)*1e2/exact_count, 1) approx_err,
            theta_count, round(abs(theta_count-exact_count)*1e2/exact_count, 1) theta_err
        FROM ( SELECT %s FROM orders)
        CROSS JOIN UNNEST(ARRAY[%s]) AS _(column_name, exact_count, approx_count, theta_count)
        ;',
        LISTAGG(
            format('count(DISTINCT %1$s) AS "%1$s_exact", approx_distinct(%1$s) AS "%1$s_approx", "$iceberg_theta_stat"(%1$s) AS "%1$s_theta"', column_name),
            U&',\000a') WITHIN GROUP (ORDER BY ordinal_position),
        LISTAGG(
            format('(''%1$s'', "%1$s_exact", "%1$s_approx", "%1$s_theta")', column_name),
            U&',\000a') WITHIN GROUP (ORDER BY ordinal_position)
        )
FROM information_schema.columns
WHERE table_schema = CURRENT_SCHEMA
AND table_name = 'orders';

Results

  column_name  | exact_count | approx_count | approx_err |    theta_count     | theta_err
---------------+-------------+--------------+------------+--------------------+-----------
 orderkey      |       15000 |        15432 |        2.9 | 14836.891287217333 |       1.1
 custkey       |        1000 |          990 |        1.0 |             1000.0 |       0.0
 orderstatus   |           3 |            3 |        0.0 |                3.0 |       0.0
 totalprice    |       14996 |        15167 |        1.1 | 14864.332636540983 |       0.9
 orderdate     |        2401 |         2443 |        1.7 |             2401.0 |       0.0
 orderpriority |           5 |            5 |        0.0 |                5.0 |       0.0
 clerk         |        1000 |          995 |        0.5 |             1000.0 |       0.0
 shippriority  |           1 |            1 |        0.0 |                1.0 |       0.0
 comment       |       14995 |        15011 |        0.1 | 14762.894505331353 |       1.5

Same experiment, this time for sf100

  column_name  | exact_count | approx_count | approx_err |     theta_count      | theta_err
---------------+-------------+--------------+------------+----------------------+-----------
 orderkey      |    15000000 |     15053605 |        0.4 | 1.4902343040861849E7 |       0.7
 custkey       |      999982 |      1014186 |        1.4 |   1020138.1597738483 |       2.0
 orderstatus   |           3 |            3 |        0.0 |                  3.0 |       0.0
 totalprice    |    11944103 |     12476914 |        4.5 | 1.1932049511631161E7 |       0.1
 orderdate     |        2406 |         2449 |        1.8 |               2406.0 |       0.0
 orderpriority |           5 |            5 |        0.0 |                  5.0 |       0.0
 clerk         |       10000 |         9806 |        1.9 |    10042.94228645038 |       0.4
 shippriority  |           1 |            1 |        0.0 |                  1.0 |       0.0
 comment       |    14097230 |     13839831 |        1.8 | 1.3932758752727821E7 |       1.2

Observations:

  • for a low number of distinct values, the Theta sketch produces an exact result (not surprisingly: it tracks a certain number of hashed values, so it's exact for NDVs falling under its internal threshold)
  • otherwise the error is comparable to that of approx_distinct, sometimes smaller and sometimes bigger
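The approx_err and theta_err columns above are just relative error in percent, as computed by the round(abs(...)*1e2/exact, 1) expressions in the test query. The same arithmetic in Python, checked against two cells of the sf1 table:

```python
def relative_error_pct(approx, exact):
    # Relative error in percent, rounded to one decimal place,
    # mirroring the approx_err/theta_err expressions in the test query.
    return round(abs(approx - exact) * 100 / exact, 1)

# orderkey row of the sf1 results:
# approx_distinct: |15432 - 15000| * 100 / 15000 -> 2.9
# theta:           |14836.891... - 15000| * 100 / 15000 -> 1.1
print(relative_error_pct(15432, 15000))               # 2.9
print(relative_error_pct(14836.891287217333, 15000))  # 1.1
```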

- use applicable static imports
- cast to primitive directly (same behavior, but looks nicer)

Move `ExpressionConverter.getIcebergLiteralValue` to `IcebergTypes` for
re-use. Name it `convertTrinoValueToIceberg` to be consistent with the
existing `convertIcebergValueToTrino` method in that class.

- suppress intentional "redundant casts"
- exact division
- cast to primitive directly (looks nicer, same meaning)
- remove redundant `instanceof`
- put decimals together with other numbers
The Iceberg specification defines Apache DataSketches' Theta as the
common data sketch for keeping track of distinct values in a table.

This change replaces the use of HLL within Iceberg's ANALYZE with Theta
sketch. The follow-up work is to store the serialized compact form of
the sketch inside the Iceberg Puffin statistics file, but this requires
Iceberg API changes, which are still in progress.

A side effect of this change is that complex types (array, map, row) can
no longer be analyzed: Trino can calculate a HyperLogLog for these
types, but Iceberg does not specify a binary representation for them,
which would be required to feed the data into a Theta sketch. However,
NDV for complex types is not as useful as it is for scalar types, so
this shouldn't matter in practice.
@findepi findepi force-pushed the findepi/iceberg-theta branch from 81cb1bd to 90fb612 Compare September 29, 2022 12:09

findepi commented Sep 29, 2022

Added some code reuse in response to #14290 (comment), no other changes.

@findepi findepi merged commit da1de5e into trinodb:master Sep 29, 2022
@findepi findepi deleted the findepi/iceberg-theta branch September 29, 2022 20:29
@findepi findepi added the no-release-notes This pull request does not require release notes entry label Sep 29, 2022
@github-actions github-actions bot added this to the 399 milestone Sep 29, 2022