Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subquery cache & friends #21888

Closed
wants to merge 13 commits into from
Closed

Conversation

sopel39
Copy link
Member

@sopel39 sopel39 commented May 9, 2024

Implement subquery cache for Hive/Iceberg/Delta

Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cache
results are never stalled since data is cached per split. Dedicated
"cache splits ids" include create time and change set
(in case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then common subplan is extracted.
2. Query plan is rewritten using caching plan alternatives
   (fallback to original subquery, cache data, load from cache)
3. SPI PlanSignatures are computed for each cached subquery
4. Splits are scheduled deterministically on nodes based on (PlanSignature, SplitId) pair
5. On the worker cache plugin (currently only memory based) will determine
   if cached data is available for a given split

@cla-bot cla-bot bot added the cla-signed label May 9, 2024
@github-actions github-actions bot added iceberg Iceberg connector delta-lake Delta Lake connector hive Hive connector labels May 9, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 2 times, most recently from 9f4aa11 to ad12339 Compare May 9, 2024 16:10
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 5 times, most recently from c270355 to a72b5df Compare May 15, 2024 12:16
@sopel39 sopel39 force-pushed the ks/subquery_cache branch from a72b5df to 4f474e2 Compare May 21, 2024 14:55
@github-actions github-actions bot added the ui Web UI label May 21, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 4 times, most recently from f771ae0 to a0ab7fc Compare May 22, 2024 08:25
@sopel39 sopel39 marked this pull request as ready for review May 22, 2024 08:25
@sopel39 sopel39 changed the title WIP: Subquery cache & friends Subquery cache & friends May 22, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 5 times, most recently from 865c615 to 74302ec Compare May 23, 2024 12:38
@deigote
Copy link
Member

deigote commented May 27, 2024

Hi 👋🏽 maybe a dumb question, but from subquery cache for Hive/Iceberg/Delta I'm not clear if this is about a subquery cache for the Hive/Iceberg/Delta connectors, or rather a cache for any connector that uses Hive/Iceberg/Delta under the hood. I'm hoping the latter but it'd be great if you could clarify 🙏🏽 !

@sopel39
Copy link
Member Author

sopel39 commented May 27, 2024

or rather a cache for any connector that uses Hive/Iceberg/Delta under the hood.

@deigote I'm not sure what you mean by any connector that uses Hive/Iceberg/Delta under the hood. However, this PR makes subquery cache a 1st class citizen, where source of data can be from any connector as long as connector implements getCacheTableId, getCacheColumnId, getCacheSplitId

@sopel39
Copy link
Member Author

sopel39 commented May 29, 2024

Removed dynamic row filtering from PR as it will be handled separately (#22175 (comment))

@kekwan
Copy link
Contributor

kekwan commented Jun 6, 2024

Looking forward to this. Would this work also solve CTE #10?

@deigote
Copy link
Member

deigote commented Jun 6, 2024

@kekwan the way I understood it, it wouldn't "solve" it but it'd contribute to making it a much less severe issue. The CTEs would still execute twice, but their results would be cached on quite a low level. Hopefully the cache hit ratio would be very high but I'm guessing it'd depend on how busy the workers are (I'm assuming the busier they are the more cache evictions).

@sopel39 sopel39 force-pushed the ks/subquery_cache branch from bc4d54b to 9e3e422 Compare June 11, 2024 13:03
lukasz-stec and others added 13 commits June 11, 2024 15:03
ChooseAlternativeNode defines alternative sub-plans that can be used
to execute given part of the query.
The actual sub-plan is then chosen per split during task execution.
Alternative sub-plans cannot span multiple stages and are only supported
for source stages.

Co-authored-by: Assaf Bern <[email protected]>
These methods are required by subquery cache to describe
split data for cache key purpose.

ConnectorPageSourceProvider#getUnenforcedPredicate
is used to describe what unenforced predicate will be
applied on split data.

ConnectorPageSourceProvider#prunePredicate is used
to simplify filter predicates on per split bases
(e.g. removing paritioning predicates that fully
contain split data)

Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Co-authored-by: radek <[email protected]>
CacheManager is a set of SPI classes for implementing
split level cache storage.

MemoryCacheManager is a high-performance implementation of
CacheManager that keeps cached data in revocable memory.
Cache table id together with split id and column id represent
rows produced by ConnectorPageSource for a given split.

Cache ids can also be used to canonicalise query plans
for the purpouse of comparison or cache key generation.

This commit implements cache ids for Hive, Iceberg, Delta and TPCH
connectors.

Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Co-authored-by: radek <[email protected]>
Co-authored-by: lukasz-stec <[email protected]>
Cache hit rate depend on deterministic split generation.
Hive connector has a concept of "initial splits" which
are smaller and there is a limited of them.
Therefore, if deterministic splits are
required, then initial splits must be disabled because
Hive split generation doesn't have guaranteed ordering.
Dynamic filter id might be registered by both local join
and as coming from coordinator.
CanonicalSubplanExtractor creates a canonical
representation of a subplan using cache ids
provided by the connector. Canonical subplans
are used to compare plans against each other
and enable extracting of common subplans.

Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cache
results are never stalled since data is cached per split. Dedicated
"cache splits ids" include create time and change set
(in case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then common subplan is extracted.
2. Query plan is rewritten using caching plan alternatives
   (fallback to original subquery, cache data, load from cache)
3. SPI PlanSignatures are computed for each cached subquery
4. Splits are scheduled deterministically on nodes based on (PlanSignature, SplitId) pair
5. On the worker cache plugin (currently only memory based) will determine
   if cached data is available for a given split

Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Co-authored-by: radek <[email protected]>
Co-authored-by: lukasz-stec <[email protected]>
Co-authored-by: Raunaq Morarka <[email protected]>
@sopel39
Copy link
Member Author

sopel39 commented Jun 11, 2024

rebased after #22190

@sopel39 sopel39 force-pushed the ks/subquery_cache branch from 9e3e422 to a2aa506 Compare June 11, 2024 13:13
Copy link
Member

@martint martint left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments. Still reviewing.

@@ -28,7 +29,7 @@ public interface DriverFactory

OptionalInt getDriverInstances();

Driver createDriver(DriverContext driverContext);
Driver createDriver(DriverContext driverContext, Optional<ScheduledSplit> split);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation and purpose for this new argument?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added to support alternative plans for the source stage. The alternative in this context is a concrete list of operators (a Driver instance) chosen based on the split.

Copy link
Member Author

@sopel39 sopel39 Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martint it's also required so that we can make alternative cache decision based on a split. The decisions are:

  • read from cache
  • cache data
  • fallback to original plan

Without split one cannot make that decision.

{
private final List<PlanNode> alternatives;

private final FilteredTableScan originalTableScan;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this for? What does "original" mean in this context (e.g. what if the plan is formulated with a set of alternatives from the get go?)

Also, this seems overly specific. What if the original plan had operations other than a table scan and filter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was named originalTableScan because ChooseAlternativeNode is ATM created for some existing original sub-plan. The reason we have it though is different. Alternatives work only on a source stage level and are chosen based on a split. This means we need a single source of splits for the ChooseAlternativeNode and originalTableScan is exactly that. The filter part is needed to support dynamic filters at the split source level.

Copy link
Member Author

@sopel39 sopel39 Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would slightly rephase what @lukasz-stec said that this is the TableHandle that is used to enumerate splits that are later used to choose alternative (either connector or caching alternative).

Without that how would you know which TableHandle use to create split source? It also makes sense to use the same TableHandle for split enumeration that was used to enumerate subplan alternatives.

Technically alternatives don't even need to have TableScan inside. You could be choosing between static alternatives (ValueNodes) for a given split.

ConnectorSession session,
ConnectorSplit split,
ConnectorTableHandle table,
TupleDomain<ColumnHandle> dynamicFilter)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the argument named "dynamicFilter"? Rename it to "constraint"

Copy link
Member Author

@sopel39 sopel39 Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I though about it, but I think it's less confusing when the arg is actually named dynamic filter. We could pass DynamicFilter itself, but I think it's overkill.

Some connectors won't simplify dynamic filters because they will use them for index lookups.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the point of view of this method, it doesn't care whether that tuple domain comes from a dynamic filter, does it? That's the caller's choice.

@@ -13,6 +13,8 @@
*/
package io.trino.spi.connector;

import io.trino.spi.predicate.TupleDomain;

import java.util.List;

public interface ConnectorPageSourceProvider
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new methods seem misplaced in this class. Why are they associated with the PageSourceProvider (and not RecordSetProvider? The should really live outside of either of those, as they have nothing to do with a data stream itself. They are more related to split management.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and not RecordSetProvider

RecordSetProviders usually don't have granular splits compared to lakehouse connectors and they don't do opportunistic filtering, hence the methods here are less useful for records. ConnectorRecordSetProvider also doesn't accept dynamic filter as argument. For RecordSetProviders these methods could probably be NOOPs

as they have nothing to do with a data stream itself. They are more related to split management.

Actually it's quite related. Lakehouse connectors use getUnenforcedPredicate (which in turn uses prunePredicate) when creating page source. They've always done that albeit it was not formalized. Now it's formalized and exposed to engine as additional per-split metainfo. It's important for correctness too since cache needs to know what opportunistic predicate was used to filter stream data.

I was thinking about something like ConnectorSplitInfoProvider, but it would have to be hooked to PageSourceProviders internally anyway (and probably rooted at ConnectorPageSourceProviderFactory). Hence, I think the current location of these methods is probably optimal.

Comment on lines +48 to +50
* Prunes columns from predicate that are not effective in filtering split data.
* If split is completely filtered out by given predicate, then this
* method must return {@link TupleDomain#none}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a weird mix of column pruning and constraint pruning (i..e, a column-wise intersection and a all-or-none intersection of the constraint). It would be more general and easier to reason about if it were a pure intersection of the given tupledomain with the constraint guaranteed by the split.

I can't tell yet what's the right abstraction as there are no uses of it up to this commit -- I will revisit once I review the rest of the PR.

Copy link
Member Author

@sopel39 sopel39 Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be more general and easier to reason about if it were a pure intersection of the given tupledomain with the constraint guaranteed by the split.

I mostly agree. However, in case of bucketing you cannot generate TupleDomain for bucketing columns, yet you are still able to perform filtering.
The same applies to Iceberg transformation partitioning.

Generally, determining "if contains" relationship is simpler computationally than actually getting all intersecting values. In this case we actually don't need to know actual intersection, hence this method definition.

Copy link
Member

@martint martint left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more comments and questions as I continue perusing the code.

ConnectorSession session,
ConnectorTableHandle tableHandle,
DynamicFilter dynamicFilter,
boolean preferDeterministicSplits,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We talked about this offline a few weeks ago. Instead of adding this, we should revisit whether the adaptive split logic is still useful and remove it if not. It's almost certainly not useful for data formats such as ORC and Parquet that cannot be split across row group boundaries.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created #22787.

initial splits could still be useful for ORC/Parquet if row groups are sufficiently small, but TBH I haven't seen any issues caused by not having initial splits.

cc @assaf2

Comment on lines +49 to +50
* applied on output of `cachedSplitA`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this enforced? Who is responsible for the normalization? What happens if it's not normalized?

Copy link
Member

@assaf2 assaf2 Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this enforced?

It's not enforced.

Who is responsible for the normalization?

The concrete CacheManager if it would like to serialize these TupleDomains and use the result as part of its cache key (for instance, MemoryCacheManager doesn't do that)

What happens if it's not normalized?

Equal TupleDomains might not be serialized in the same way. A CacheManager that serializes these predicates without normalization into its cache key, might experience unnecessary cache misses.

* subset of `cachedSplitA.predicate`. To do so, `cachedSplitB.predicate` must be
* applied on output of `cachedSplitA`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
* @param unenforcedPredicate Unenforced (best-effort) predicate that should be applied on cached rows.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear what this predicate is for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

predicate must be applied by CacheManager while unenforcedPredicate can be applied by CacheManager (not mandatory, as the engine will apply it as well on the CacheManager's result.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's explained in next sentence:

Output of `cachedSplitA` can be used to derive output of matching `cachedSplitB` as long as `cachedSplitB.unenforcedPredicate` is a subset of `cachedSplitA.unenforcedPredicate`

So if pages were cached with unenforcedPredicate=all, but later engine asks for pages with unenforcedPredicate=1, then CacheManager is free to return unenforcedPredicate=all pages.
CacheManager might also apply unenforcedPredicate=1 filter on cached pages if it knows how to do that efficiently.

unenforcedPredicate also gives us flexibility if we don't know which columns are indexed by CacheManager. Let's say you have col1=10 and col2=20 and only col1 is indexed. Then by keeping col1=10 and col2=20 filter above LoadFromCache operator, we can ask CacheManager for pages with unenforced predicate. This prevents us from having multiple alternatives where only subset or entire filter is enforced (we would need to have alternatives for col1=10, col2=20 and true)

import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

public class CacheSplitId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a record

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It similar to other classes like:

PlanNodeId
TransactionId
...

etc etc.

These classes don't really expose id as separate getter. They just expose toString method. So keeping them as classes most likely makes sense and is less confusing than record. Think about it as "named type" kind of classes.

import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

public class SignatureKey
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a record

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same arg as for the CacheXXID classes. These are more like "named types".

* {@link ColumnHandle}). {@link CacheColumnId} for complex projections will use canonicalized and formatted
* version of projection expression.
*/
public class CanonicalSubplan
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the relationship between CanonicalSubplan and PlanSignature?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We first extract CanonicalSubplans, then we try to identify common subqueries, so several CanonicalSubplans might be replaced with a single common subquery + an adaptation for each. PlanSignature is calculated based on the common subquery.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CanonicalSubplan can be used to derive PlanSignature, but CanonicalSubplan can also be used to:

  • reconstruct original subplan
  • find common subplan between multiple CanonicalSubplans

* {@link ColumnHandle}). {@link CacheColumnId} for complex projections will use canonicalized and formatted
* version of projection expression.
*/
public class CanonicalSubplan
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a dedicated class for this? Why isn't a regular PlanNode tree sufficient to represent a canonical structure (after canonicalization, of course)?

Copy link
Member

@assaf2 assaf2 Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This object contains more information than just the plan. For example, it contains keyChain that is later on used to determine if 2 (or more) canonicalSubplans might be represented by the same subquery (+ adaptation for each)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't a regular PlanNode tree sufficient to represent a canonical structure

PlanNode itself will never be fully canonical (even after canonicalisation of symbols).
Scan, ScanFilter, ScanProject, ScanFilterProject are represented by single CanonicalSubplan node, so that common subplan can be extracted.

The same goes for Filter, FilterProject and Project. They can be represented by single CanonicalSubplan node.

When joins are supported, join tree will most likely be flattened to single CanonicalSubplan.

I would also mention that extracting common subplans directly from PlanNodes in single step would be extremely difficult, so splitting this into two stages makes it much more manageable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PlanNode itself will never be fully canonical (even after canonicalisation of symbols).

Why not? Can you give me an example? At the end of the day, canonicalization is about establishing a set of conventions.

Copy link
Member Author

@sopel39 sopel39 Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The examples are in the comment above. Essentially one needs to canonicalize group of nodes rather than a single PlanNode itself:

  • Scan Filter Project are order invariant mostly with regards to Filter and Project (even though we sometimes don't push filter through project due to perf reasons)
  • Combinations of Scan, ScanProject, ScanFilter, ScanFilterProject, ... can be canonicalized to single CanonicalSubplan node, so that we can find common subplan and adaptations between similar, canonical input subplans.
  • Same as above, but for Filter, FilterProject, ProjectFilter, ...
  • multi-joins can be canonicalized into single multi-join CanonicalSubplan node since join order is irrelevant.

At the end of the day, canonicalization is about establishing a set of conventions.

The conventions go beyond single PlanNode, but rather address group of PlanNodes

Comment on lines +24 to +32
/**
* Returns a table identifier for the purpose of caching with {@link CacheManager}.
* {@link CacheTableId} together with {@link CacheSplitId} and {@link CacheColumnId}s represents
* rows produced by {@link ConnectorPageSource} for a given split. Local table properties
* (e.g. rows order) must be part of {@link CacheTableId} if they are present. List of selected
* columns should not be part of {@link CacheTableId}. {@link CacheTableId} should not contain
* elements that can be derived from {@link CacheSplitId} such as predicate on partition column
* which can filter splits entirely.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too complicated. Why are all those conditions required?

What does it mean for "List of selected columns should not be part of ...", especially in the case of a ConnectorTableHandle representing a pushed-down subplan?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too complicated. Why are all those conditions required?

Local table properties (e.g. rows order) must be part of {@link CacheTableId} - Otherwise, we'll get a correctness error

{@link CacheTableId} should not contain elements that can be derived from {@link CacheSplitId} such as predicate on partition column which can filter splits entirely - This is to maximize cache hit rate

What does it mean for "List of selected columns should not be part of ...", especially in the case of a ConnectorTableHandle representing a pushed-down subplan?

If 2 ConnectorTableHandles are "same" but contain different sets of selected columns, we can create a common subquery with all those columns, so we want them to have the same CacheTableId.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rephased this Javadoc. Essentially it boils down to:

  • the cache is column based, hence we need ColumnId
  • the cache is split based, hence we need SplitId
  • we obviously also need TableId.

TableId should contain elements (that describe data) that cannot be directly or indirectly derived from ColumnId or SplitId.

* are eligible for caching with {@link CacheManager}. Connector should convert provided
* {@link ConnectorTableHandle} into canonical one by pruning of every non-canonical field.
*/
ConnectorTableHandle getCanonicalTableHandle(ConnectorTableHandle handle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand what this is. ConnectorTableHandle represents an opaque reference to a table that can be carried in query plans, transmitted to workers, etc. What is a "property of a ConnectorTableHandle"? What does it mean for it to "affect final query results when underlying table is queried"?

Also, what's the purpose of this, given that there's getCacheTableId above?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a "property of a ConnectorTableHandle"? What does it mean for it to "affect final query results when underlying table is queried"?

For example, for HiveTableHandle, compactEffectivePredicate is a "property of a ConnectorTableHandle" and is replaced with TupleDomain.all(), because such a change doesn't affect the query result, as the engine also applies this predicate. This way we maximize the cache hit rate.

Also, what's the purpose of this, given that there's getCacheTableId above?

The canonical table handle is passed to getCacheTableId as an argument.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand what this is. ConnectorTableHandle represents an opaque reference to a table that can be carried in query plans, transmitted to workers, etc. What is a "property of a ConnectorTableHandle"? What does it mean for it to "affect final query results when underlying table is queried"?

Connector will rembember "unenforced predicate" in table handle, if you have two subqueries:

scan(tab1) => filter(x=1)
scan(tab1) => filter(x=2)

then you should be able to extract "common subquery".

However, because "unenforced predicate" in table handle affects produced data, then cache table ids would be different.

Hence, getCanonicalTableHandle erases "unenforced predicate" in table handle, which makes cache table ids matchig again.

"Unenforced predicate" is pushed again after common subplan is constructed

*/
private final List<Type> columnsTypes;

private volatile int hashCode;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is volatile necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's neither good or bad. In this case I prefer to keep it since computation of hashCode might be expensive for signature, hence I would rather do really once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unnecessary. You can get approximately the same semantics (minus the impact of a volatile read) by writing to a non-volatile int field, since there's no tearing for that type and the hashcode is deterministic (e.g., see how Java's String class does it).

@@ -273,6 +286,20 @@ public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics)
}
}

if (cacheEnabled) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add a condition like stage.ordinal() >= OPTIMIZED.ordinal() here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would mean validation of plan when only optimizaion was requires, right? I think it's correct atm

@sopel39
Copy link
Member Author

sopel39 commented Jul 26, 2024

I've applied comments and answered questions. Since I don't have access to starburstdata repo, I've opened a new PR here: #22827

@sopel39 sopel39 closed this Jul 26, 2024
@sopel39 sopel39 reopened this Jul 26, 2024
@sopel39 sopel39 closed this Jul 26, 2024
@assaf2
Copy link
Member

assaf2 commented Jul 28, 2024

I've applied comments and answered questions. Since I don't have access to starburstdata repo, I've opened a new PR here: #21888

This link is recursive, the correct one is probably #22827

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed delta-lake Delta Lake connector hive Hive connector iceberg Iceberg connector performance ui Web UI
Development

Successfully merging this pull request may close these issues.

8 participants