Hive cannot read ORC ACID table updated by Trino twice #8448
Conversation
Add "Fixes: issue_id" to PR desc. |
onTrino().executeQuery("INSERT INTO test_double_update VALUES(1, 'x');"); | ||
onTrino().executeQuery("INSERT INTO test_double_update VALUES(2, 'y');"); | ||
onTrino().executeQuery("UPDATE test_double_update SET column2 = 'xy1';"); | ||
onTrino().executeQuery("UPDATE test_double_update SET column2 = 'xy2';"); | ||
assertThat(onHive().executeQuery("SELECT * FROM test_double_update;")).containsOnly(row(1, "xy2"), row(2, "xy2")); | ||
onHive().executeQuery("DROP TABLE IF EXISTS test_double_update"); |
drop ; from query strings
ok
Can we check if it reads correctly in both Trino and Hive?
sure, will do
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java (outdated conversation, resolved)
@@ -75,7 +77,8 @@ public HiveSplit(
             @JsonProperty("bucketConversion") Optional<BucketConversion> bucketConversion,
             @JsonProperty("bucketValidation") Optional<BucketValidation> bucketValidation,
             @JsonProperty("s3SelectPushdownEnabled") boolean s3SelectPushdownEnabled,
-            @JsonProperty("acidInfo") Optional<AcidInfo> acidInfo)
+            @JsonProperty("acidInfo") Optional<AcidInfo> acidInfo,
+            @JsonProperty("startingRowId") OptionalLong initialRowId)
keep JSON key and parameter name consistent
ok
@@ -362,6 +364,7 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
             splitBytes = internalSplit.getEnd() - internalSplit.getStart();
         }

+        OptionalLong initialRowId = OptionalLong.of((Long.MAX_VALUE / getBufferedInternalSplitCount()) * currentSplitIndex);
Using getBufferedInternalSplitCount() for slicing the ID space between splits is not correct. It is not how many splits there will be in total for the table, but how many are currently buffered for execution. Splits are dynamically loaded in the background by BackgroundHiveSplitLoader, and the value returned by getBufferedInternalSplitCount() will change over time. It also decreases as splits are taken away from the queue for execution.
I do not think we can do any better than what @electrum suggested in #8268 (comment):

"Allocate unique row IDs across writers by assigning ranges in the splits. For example, we could give each writer a large range of 2^42, which would allow both a huge number of rows and splits."

We need to statically decide how many splits we allow, and how many rows we allow for each split. E.g., if we give each split 2^42 possible values we can have:
- 4,194,304 splits
- 4,398,046,511,104 rows per split

That should be enough. Still, we should check at runtime that we did not exceed either the number of splits or the number of rows generated for a given split, and fail the query nicely if that happens.
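For reference, a minimal Java sketch (not code from the PR) that checks the arithmetic behind these limits:

    // Illustrative only: verifies the 2^42-per-split partitioning described above.
    public class RowIdSpaceArithmetic
    {
        public static void main(String[] args)
        {
            long rowsPerSplit = 1L << 42;     // 4,398,046,511,104 row IDs reserved per split
            long maxSplits = 1L << (64 - 42); // 2^64 / 2^42 = 4,194,304 splits
            // Split k owns the half-open ID range [k * rowsPerSplit, (k + 1) * rowsPerSplit)
            long splitNumber = 5;
            long firstRowId = splitNumber * rowsPerSplit;
            System.out.printf("rowsPerSplit=%d, maxSplits=%d, split %d starts at %d%n",
                    rowsPerSplit, maxSplits, splitNumber, firstRowId);
        }
    }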
Ok, thanks for the in-depth analysis, I will rework this slightly.
Force-pushed from d0d60ee to 8bc3ee8
    private static final long MAX_NUMBER_OF_SPLITS = 4194304; // we want to have 2^42 different ids per split so this is the maximum possible number of splits (2^64/2^42)
    private static final long MAX_NUMBER_OF_ROWS_PER_SPLIT = 4398046511104L;
Suggested change:
-    private static final long MAX_NUMBER_OF_SPLITS = 4194304; // we want to have 2^42 different ids per split so this is the maximum possible number of splits (2^64/2^42)
-    private static final long MAX_NUMBER_OF_ROWS_PER_SPLIT = 4398046511104L;
+    // We partition the rowId space between splits, assigning each split 2^42 ids.
+    // As we need to encode the split number in the id, this allows at most 2^22 splits per query.
+    private static final long MAX_NUMBER_OF_ROWS_PER_SPLIT = 1L << 42;
+    private static final long MAX_NUMBER_OF_SPLITS = 1L << 22;
ok
@@ -362,6 +367,9 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
             splitBytes = internalSplit.getEnd() - internalSplit.getStart();
         }

+        long currentSplitNumber = numberOfProcessedSplits.getAndIncrement();
+        checkState(currentSplitNumber < MAX_NUMBER_OF_SPLITS, "Number of splits is higher than maximum possible number of splits");
+        OptionalLong initialRowId = OptionalLong.of(currentSplitNumber * MAX_NUMBER_OF_ROWS_PER_SPLIT);
I would prefer currentSplitNumber << 42. Also please extract 42 as PER_SPLIT_ROW_ID_BITS and 64 - PER_SPLIT_ROW_ID_BITS as SPLIT_ID_BITS.
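A sketch of what the requested extraction could look like (the standalone class and main method are hypothetical; in the PR these constants sit alongside the split-loading code):

    // Sketch only, not the actual Trino code.
    public class SplitRowIds
    {
        static final int PER_SPLIT_ROW_ID_BITS = 42;
        static final int SPLIT_ID_BITS = 64 - PER_SPLIT_ROW_ID_BITS;
        static final long MAX_NUMBER_OF_ROWS_PER_SPLIT = 1L << PER_SPLIT_ROW_ID_BITS;
        static final long MAX_NUMBER_OF_SPLITS = 1L << SPLIT_ID_BITS;

        static long initialRowId(long currentSplitNumber)
        {
            // Equivalent to currentSplitNumber * MAX_NUMBER_OF_ROWS_PER_SPLIT
            return currentSplitNumber << PER_SPLIT_ROW_ID_BITS;
        }

        public static void main(String[] args)
        {
            System.out.println(initialRowId(1)); // prints 4398046511104
        }
    }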
ok
            OptionalLong initialRowId,
            OptionalLong maxNumberOfRowsPerSplit)
why optional
good point, thanks
        }
        checkState(maxNumberOfRowsPerSplit > insertRowCounter, "Trying to insert too many rows in a single split");
Add maxNumberOfRowsPerSplit to the error message. Also make it throw TrinoException with the GENERIC_INSUFFICIENT_RESOURCES error code.
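The requested check might look roughly like this (a sketch; TrinoException and GENERIC_INSUFFICIENT_RESOURCES are the Trino SPI names used elsewhere in this PR, while the wrapper method is hypothetical):

    import io.trino.spi.TrinoException;

    import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
    import static java.lang.String.format;

    class RowLimitCheck
    {
        // Hypothetical wrapper around the check the reviewer asks for
        static void checkRowLimit(long insertRowCounter, long maxNumberOfRowsPerSplit)
        {
            if (insertRowCounter >= maxNumberOfRowsPerSplit) {
                throw new TrinoException(GENERIC_INSUFFICIENT_RESOURCES,
                        format("Trying to insert too many rows in a single split, the limit is %s", maxNumberOfRowsPerSplit));
            }
        }
    }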
ok
@@ -362,6 +367,9 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
             splitBytes = internalSplit.getEnd() - internalSplit.getStart();
         }

+        long currentSplitNumber = numberOfProcessedSplits.getAndIncrement();
+        checkState(currentSplitNumber < MAX_NUMBER_OF_SPLITS, "Number of splits is higher than maximum possible number of splits");
Throw TrinoException with the GENERIC_INSUFFICIENT_RESOURCES error code.
ok
Force-pushed from 883d1b3 to e0b84f1
@losipiuk comments addressed
    // We partition the rowId space between splits, assigning each split 2^42 ids.
    // As we need to encode the split number in the id, this allows at most 2^22 splits per query.
nit: move the comment to the top of the section that defines the 4 constants
sure
Force-pushed from e0b84f1 to 7b83bf6
        if (currentSplitNumber >= MAX_NUMBER_OF_SPLITS) {
            throw new TrinoException(GENERIC_INSUFFICIENT_RESOURCES, format("Number of splits is higher than maximum possible number of splits %d", MAX_NUMBER_OF_SPLITS));
        }
Actually, I have second thoughts about that. I did not think about this before, but I do not like the fact that the current code limits the number of splits that can be processed by a query, even when we are not doing UPDATE and HiveUpdatablePageSource is not being constructed.

What I think would be better is to just record splitNumber in HiveSplit here (instead of initialRowId and maxNumberOfRowsPerSplit). Then we can do the validation and compute initialRowId and maxNumberOfRowsPerSplit in HivePageSourceProvider, where HiveUpdatablePageSource is created. Then we can make the splitNumber in HiveSplit non-optional. And we should move the constants definitions to HivePageSourceProvider.

WDYT @homar?
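A rough sketch of the proposed shape (simplified stand-in types, not the actual Trino classes; IllegalStateException stands in for TrinoException to keep the sketch self-contained):

    import static java.lang.String.format;

    // Simplified: the split number travels with every split, unconditionally...
    class SplitSketch
    {
        private final long splitNumber; // non-optional, recorded for every split

        SplitSketch(long splitNumber)
        {
            this.splitNumber = splitNumber;
        }

        long getSplitNumber()
        {
            return splitNumber;
        }
    }

    // ...while validation and row-ID computation happen only where the
    // updatable page source is built, so plain reads are never limited.
    class PageSourceProviderSketch
    {
        static final long MAX_NUMBER_OF_ROWS_PER_SPLIT = 1L << 42;
        static final long MAX_NUMBER_OF_SPLITS = 1L << 22;

        static long initialRowId(SplitSketch split)
        {
            long splitNumber = split.getSplitNumber();
            if (splitNumber >= MAX_NUMBER_OF_SPLITS) {
                throw new IllegalStateException(format(
                        "Number of splits is higher than maximum possible number of splits %s", MAX_NUMBER_OF_SPLITS));
            }
            return splitNumber * MAX_NUMBER_OF_ROWS_PER_SPLIT;
        }
    }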
sounds really good, thanks
Force-pushed from 736ddc6 to 776ed3b
@losipiuk I addressed all the comments
Fixes trinodb#8268. The problem was caused by multiple rows having the same (writeId, bucket, rowId). In order to fix this it is necessary to ensure unique row IDs across writers. To achieve that, different writers get separate ID ranges through the splits assigned to them.
Force-pushed from 776ed3b to 10768f1
Thx
CI: #8478
        long currentSplitNumber = hiveSplit.getSplitNumber();
        if (currentSplitNumber >= MAX_NUMBER_OF_SPLITS) {
            throw new TrinoException(GENERIC_INSUFFICIENT_RESOURCES, format("Number of splits is higher than maximum possible number of splits %d", MAX_NUMBER_OF_SPLITS));
        }
Provide the actual value of currentSplitNumber in the exception message as well.
Not sure if super beneficial. Most of the time it will be == MAX_NUMBER_OF_SPLITS, as we are generating splits from a sequence and we throw as soon as we cross the boundary.
Fixes #8268

The problem was caused by multiple rows having the same (writeId, bucket, rowId). In order to fix this it is necessary to ensure unique row IDs across writers. To achieve that, different writers get separate ID ranges through the splits assigned to them.