Hive cannot read ORC ACID table updated by Trino twice #8268

Closed
Tracked by #3325
findepi opened this issue Jun 11, 2021 · 16 comments · Fixed by #8448
Labels: bug

@findepi (Member) commented Jun 11, 2021:

0: jdbc:hive2://localhost:10000/default> SELECT * FROM test_test_update_subquery_false_NONE_nw4lj1py3ix2;
INFO  : Compiling command(queryId=hive_20210612021658_c6a18d73-d787-4e75-b53e-698310076520): SELECT * FROM test_test_update_subquery_false_NONE_nw4lj1py3ix2
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:test_test_update_subquery_false_none_nw4lj1py3ix2.column1, type:int, comment:null), FieldSchema(name:test_test_update_subquery_false_none_nw4lj1py3ix2.column2, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20210612021658_c6a18d73-d787-4e75-b53e-698310076520); Time taken: 0.139 seconds
INFO  : Executing command(queryId=hive_20210612021658_c6a18d73-d787-4e75-b53e-698310076520): SELECT * FROM test_test_update_subquery_false_NONE_nw4lj1py3ix2
INFO  : Completed executing command(queryId=hive_20210612021658_c6a18d73-d787-4e75-b53e-698310076520); Time taken: 0.001 seconds
INFO  : OK


Error: java.io.IOException: java.io.IOException: Two readers for {originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}: new [key={originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}, nextRecord={2, 3, 536870912, 0, 4, null}, reader=Hive ORC Reader(hdfs://hadoop-master:9000/user/hive/warehouse/test_test_update_subquery_false_none_nw4lj1py3ix2/delete_delta_0000004_0000004_0002/bucket_00000, 9223372036854775807)], old [key={originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}, nextRecord={2, 3, 536870912, 0, 4, null}, reader=Hive ORC Reader(hdfs://hadoop-master:9000/user/hive/warehouse/test_test_update_subquery_false_none_nw4lj1py3ix2/delete_delta_0000004_0000004_0000/bucket_00000, 9223372036854775807)] (state=,code=0)

Repro steps are noted in #8267 as a TODO.
Full repro steps are in a comment below: #8268 (comment)

findepi added the bug label Jun 11, 2021
findepi mentioned this issue Jun 11, 2021
@Praveen2112 (Member) commented:

Since we update the same key multiple times, it throws this exception. If we do a major compaction after each update, things should work as expected. Will test and raise a PR.

@findepi (Member, author) commented Jun 17, 2021:

> If we do a major compaction after each update, things should work as expected.

Trino cannot do a major compaction.

The table should be readable even if a compaction doesn't happen. Even if we were able to do a compaction, it would not be feasible to require one after every update or every other update. And it wouldn't be correct from a concurrency perspective either.

@Praveen2112 (Member) commented:

But for tests, maybe we could trigger the compaction from Hive after the assertion, or apply the update to a different set of columns.

@findepi (Member, author) commented Jun 17, 2021:

I would rather see the bug fixed than worked around in tests, assuming it is indeed a bug.

@djsstarburst do you happen to recognize this?

@Praveen2112 (Member) commented:

But isn't the bug on the Hive side? https://issues.apache.org/jira/browse/HIVE-22318

It looks like the delete deltas are of the same size, so they produce similar record identifiers (as described in the JIRA).

@Praveen2112 (Member) commented:

Another workaround is to ensure that each update is applied to a different set of rows instead of the same set of rows.

@Praveen2112 (Member) commented:

Corresponding JIRA in Hive: https://issues.apache.org/jira/browse/HIVE-22318

@findepi (Member, author) commented Jun 17, 2021:

> But isn't the bug on the Hive side? https://issues.apache.org/jira/browse/HIVE-22318

The JIRA talks about Hive's MERGE statement, so the bug could be in Hive's ORC reader, Hive's ORC writer, or Hive's MERGE statement implementation.

Can we assume at this point that there is no bug on the Trino side?

@Praveen2112 (Member) commented:

> The JIRA talks about Hive's MERGE statement.

This is seen both during the MERGE statement and when selecting from that table. From the exception and its source, it looks like the issue occurs while reading the ORC file (with a bunch of delete deltas). Ref: https://github.com/apache/hive/blob/d0bbe76ad626244802d062b0a93a9f1cd4fc5f20/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java#L1225

> Can we assume at this point that there is no bug on the Trino side?

Yes! We are able to read the data, and it is the updated data, so it's not a bug on the Trino side.

@findepi (Member, author) commented Jun 18, 2021:

Full repro steps:

bin/ptl env up --environment singlenode --config config-hdp3
# client/trino-cli/target/trino-cli-*-executable.jar --debug --server localhost:8080 --catalog hive --schema default
trino:default> CREATE TABLE region AS TABLE tpch.tiny.region;
CREATE TABLE: 5 rows

trino:default> CREATE TABLE t (column1 int, column2 varchar) WITH (transactional = true);
CREATE TABLE
trino:default> INSERT INTO t VALUES (1, 'x');
INSERT: 1 row

trino:default> INSERT INTO t VALUES (2, 'y');
INSERT: 1 row

trino:default> UPDATE t SET column2 = (SELECT max(name) FROM region); -- BTW the problem is reproducible also when using SET column2 = 'MIDDLE EAST' here
UPDATE: 2 rows

trino:default> UPDATE t SET column2 = (SELECT min(name) FROM region); -- BTW the problem is reproducible also when using SET column2 = 'AFRICA' here
UPDATE: 2 rows

trino:default> SELECT * FROM t;
            ->
 column1 | column2
---------+---------
       2 | AFRICA
       1 | AFRICA
(2 rows)

Now in Hive:

$ docker exec -itu hive ptl-hadoop-master bash -l
[hive@hadoop-master /]$ beeline -n hive

0: jdbc:hive2://localhost:10000/default> SELECT * FROM t;

Error: java.io.IOException: java.io.IOException: Two readers for {originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}: new [key={originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}, nextRecord={2, 3, 536870912, 0, 4, null}, reader=Hive ORC Reader(hdfs://hadoop-master:9000/user/hive/warehouse/t/delete_delta_0000004_0000004_0002/bucket_00000, 9223372036854775807)], old [key={originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}, nextRecord={2, 3, 536870912, 0, 4, null}, reader=Hive ORC Reader(hdfs://hadoop-master:9000/user/hive/warehouse/t/delete_delta_0000004_0000004_0001/bucket_00000, 9223372036854775807)] (state=,code=0)

However, if I recreate the table in Trino

DROP TABLE t;
CREATE TABLE t (column1 int, column2 varchar) WITH (transactional = true);

and then run the INSERTs and UPDATEs in Hive, then SELECT * FROM t does not fail in Hive (and it does not fail in Trino either):

INSERT INTO t VALUES (1, 'x');
INSERT INTO t VALUES (2, 'y');
UPDATE t SET column2 = 'MIDDLE EAST'; -- not using a subquery here, because Hive doesn't support that; it should not matter
UPDATE t SET column2 = 'AFRICA'; -- as above
SELECT * FROM t;

+------------+------------+
| t.column1  | t.column2  |
+------------+------------+
| 1          | AFRICA     |
| 2          | AFRICA     |
+------------+------------+

Or, if I recreate and populate the table in Trino

DROP TABLE t;
CREATE TABLE t (column1 int, column2 varchar) WITH (transactional = true);
INSERT INTO t VALUES (1, 'x');
INSERT INTO t VALUES (2, 'y');

and then run the UPDATEs in Hive, then SELECT * FROM t does not fail in Hive (and it does not fail in Trino either):

UPDATE t SET column2 = 'MIDDLE EAST'; -- as above
UPDATE t SET column2 = 'AFRICA'; -- as above
SELECT * FROM t;

+------------+------------+
| t.column1  | t.column2  |
+------------+------------+
| 1          | AFRICA     |
| 2          | AFRICA     |
+------------+------------+

To me the above is quite convincing that the problem is in how Trino's UPDATE creates delta files. It creates them in a way that can be read by Trino but cannot be read by Hive, and that is not an inherent problem with the Hive reader. I am not saying the Hive reader is bug-free, but Hive is the reference implementation, so Trino should produce ORC ACID files readable by Hive if possible. And it clearly is possible in this case.

@Praveen2112 (Member) commented:

> To me the above is quite convincing that the problem is in how Trino's UPDATE creates delta files.

When we run a query like this on a fresh table:

INSERT INTO t VALUES (1, 'x');

Trino inserts the data into the following directory

/user/hive/warehouse/t/delta_0000001_0000001_0000

And when we insert another row into it:

INSERT INTO t VALUES (2, 'y');

Trino inserts the data into the following directory

/user/hive/warehouse/t/delta_0000002_0000002_0000

So now, when we run an update like this:

UPDATE t SET column2 = 'MIDDLE EAST';

Trino creates delta and delete delta directories for each of the existing delta directories (delta_0000001_0000001_0000, delta_0000002_0000002_0000), unlike Hive, which creates one directory per transaction. At this point the deleted rows are still uniquely identified: each delete record has the same rowId but a different transactionId, so Hive can use that to delete the corresponding row in any of the base or delta files (and so can Trino).

Now, when we run another update like this:

UPDATE t SET column2 = 'INDIA';

Trino creates two more delete delta directories (referring to delta_0000001_0000001_0000 and delta_0000002_0000002_0000), but now the delete records have the same rowId and the same transactionId. When Hive reads the delete_delta directories, it finds two files carrying the same delete record and throws that "Two readers" exception. Additionally, Hive doesn't know which file each delete record maps to (while Trino knows the mapping details, so it works properly).

One solution is to introduce a different bucket number for each of the delta directories created, so that otherwise-identical rowIds are mapped to different buckets.

Please correct me if I am wrong.
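
To make the collision concrete, here is a minimal, hypothetical sketch; the key class below is a simplified stand-in for Hive's OrcRawRecordMerger.ReaderKey, not the real class, with the values taken from the error message above. It only illustrates that the two delete records compare equal on the fields Hive keys on:

// Simplified stand-in: equality is based only on
// (originalWriteId, bucket, rowId, currentWriteId), so delete records coming
// from different statement-level delete_delta directories are not distinguishable.
public class DeleteKeyCollision
{
    record DeleteKey(long originalWriteId, int bucket, long rowId, long currentWriteId) {}

    public static void main(String[] args)
    {
        // delete record in delete_delta_0000004_0000004_0000/bucket_00000
        DeleteKey fromStatement0 = new DeleteKey(3, 536870912, 0, 4);
        // delete record in delete_delta_0000004_0000004_0002/bucket_00000
        DeleteKey fromStatement2 = new DeleteKey(3, 536870912, 0, 4);

        // Both keys are equal, which is why Hive reports "Two readers" for one key.
        System.out.println(fromStatement0.equals(fromStatement2)); // prints: true
    }
}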

@djsstarburst (Member) commented:

I'm surprised that Hive can't read files with the same bucket but different statementIds. Confirming what you found, I used orc-tools to decode the data files after the two inserts and two updates in the test case by @findepi. The results are below.

I guess it's obvious -- and I just tested it -- that if the two rows are inserted in a single INSERT transaction, the test passes, because there is only one split in the bucket.

To avoid producing files with different statementIds, I think Trino UPDATE would have to add an ExchangeNode layer to flow all the splits belonging to a single bucket into one node and one file. @electrum, your thoughts?

./delta_0000001_0000001_0000/bucket_00000
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"column1":1,"column2":"x"}}
________________________________________________________________________________________________________________________

./delta_0000002_0000002_0000/bucket_00000
{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"column1":2,"column2":"y"}}
________________________________________________________________________________________________________________________

./delete_delta_0000003_0000003_0000/bucket_00000
{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":null}
________________________________________________________________________________________________________________________

./delete_delta_0000003_0000003_0001/bucket_00000
{"operation":2,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":null}
________________________________________________________________________________________________________________________

./delta_0000003_0000003_0001/bucket_00000
{"operation":0,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":{"column1":2,"column2":"MIDDLE EAST"}}
________________________________________________________________________________________________________________________

./delta_0000003_0000003_0000/bucket_00000
{"operation":0,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":{"column1":1,"column2":"MIDDLE EAST"}}
________________________________________________________________________________________________________________________

./delete_delta_0000004_0000004_0000/bucket_00000
{"operation":2,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":4,"row":null}
________________________________________________________________________________________________________________________

./delta_0000004_0000004_0000/bucket_00000
{"operation":0,"originalTransaction":4,"bucket":536870912,"rowId":0,"currentTransaction":4,"row":{"column1":2,"column2":"AFRICA"}}
________________________________________________________________________________________________________________________

./delete_delta_0000004_0000004_0002/bucket_00000
{"operation":2,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":4,"row":null}
________________________________________________________________________________________________________________________

./delta_0000004_0000004_0002/bucket_00000
{"operation":0,"originalTransaction":4,"bucket":536870912,"rowId":0,"currentTransaction":4,"row":{"column1":1,"column2":"AFRICA"}}
________________________________________________________________________________________________________________________


@electrum (Member) commented:

The equality for ReaderKey is the tuple (originalWriteId, bucket, rowId, currentWriteId). As @Praveen2112 noted, we end up with multiple rows for the same (writeId, bucket, rowId), which is illegal because that tuple is supposed to identify a single row. We can't change the bucket, because that is based on the declared bucketing column(s).

Assuming this is the issue, we need to ensure unique row IDs across the writers. I can think of two ways to do this:

  • Process all files for the same bucket with a single writer. This would be significant work to change split generation and split handling to allow multiple files in one split. We do this in Raptor, but I'd rather not introduce that complexity into Hive, which is already very complex.
  • Allocate unique row IDs across writers by assigning ranges in the splits. For example, we could give each writer a large range of 2^42 row IDs, which would allow both a huge number of rows and a huge number of splits (see the sketch below).
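
A rough sketch of the second option, assuming row IDs are plain 64-bit longs; all names here are hypothetical, not the Hive connector's actual classes:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of option 2: each writer is handed a disjoint row ID range,
// so (writeId, bucket, rowId) stays unique across writers within one transaction.
public class AcidRowIdRanges
{
    // 2^42 row IDs per writer still leaves room for roughly 2^21 writers
    // before a signed 64-bit row ID would overflow.
    private static final long RANGE_SIZE = 1L << 42;

    private final AtomicLong nextRangeStart = new AtomicLong();

    // Called once when a split/writer is created; the returned value becomes
    // the first row ID that writer may use.
    public long allocateRangeStart()
    {
        return nextRangeStart.getAndAdd(RANGE_SIZE);
    }
}

Each writer would then number its rows sequentially from its range start, so no two writers in the same transaction can produce the same (writeId, bucket, rowId).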

@electrum (Member) commented:

It looks like the current row ID generation has a bug where it gets reset for every page (which is not the cause of this issue, but needs to be fixed regardless):

private Block buildAcidRowIdsColumn(int positionCount)
{
    long[] rowIds = new long[positionCount];
    for (int i = 0; i < positionCount; i++) {
        rowIds[i] = i; // bug: restarts at 0 for every page, so row IDs repeat across pages
    }
    return new LongArrayBlock(positionCount, Optional.empty(), rowIds);
}
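
For illustration, a hedged sketch of what a fix could look like, mirroring the snippet above; nextRowId is a hypothetical field that persists across pages, not the actual fix:

private long nextRowId; // hypothetical field carried across pages

private Block buildAcidRowIdsColumn(int positionCount)
{
    long[] rowIds = new long[positionCount];
    for (int i = 0; i < positionCount; i++) {
        rowIds[i] = nextRowId++; // keeps increasing instead of restarting at 0 for each page
    }
    return new LongArrayBlock(positionCount, Optional.empty(), rowIds);
}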

@electrum (Member) commented:

Note that, long term, we could switch to the first strategy of a single writer per bucket, after MERGE lands and we change the implementation of UPDATE/DELETE to use the merge connector APIs, which support redistribution.

homar added a commit to homar/trino that referenced this issue Jul 6, 2021
Fixes trinodb#8268. The problem was caused by multiple rows having the same (writeId, bucket, rowId). To fix this, it is necessary to ensure unique row IDs across writers; this is achieved by giving different writers separate ID ranges within the split assigned to them.
losipiuk pushed a commit that referenced this issue Jul 6, 2021
losipiuk mentioned this issue Jul 6, 2021
losipiuk added this to the 360 milestone Jul 6, 2021
OLMS99 pushed a commit to OLMS99/trino that referenced this issue Jul 9, 2021
sumannewton pushed a commit to sumannewton/trino that referenced this issue Jan 17, 2022