-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Timestamp type in xlang JDBC Read and Write #22561
Conversation
3dc1a57
to
006885e
Compare
Run Python 3.8 PostCommit |
661be51
to
77a42c6
Compare
Run Python 3.8 PostCommit |
Run Python 3.8 PostCommit |
Codecov Report
@@ Coverage Diff @@
## master #22561 +/- ##
==========================================
+ Coverage 73.69% 73.73% +0.04%
==========================================
Files 713 713
Lines 94821 94974 +153
==========================================
+ Hits 69874 70030 +156
+ Misses 23648 23645 -3
Partials 1299 1299
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
dd7cb95
to
aec0fd8
Compare
Run Python 3.8 PostCommit |
Run Python 3.8 PostCommit |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
c8647eb
to
8d4575a
Compare
Rebased in order to update CHANGES.md. ready for review now. |
Run Python 3.8 PostCommit |
@@ -647,9 +648,36 @@ def _from_typing(cls, typ): | |||
('micros', np.int64)]) | |||
|
|||
|
|||
@LogicalType.register_logical_type | |||
class DateTimeLogicalType(NoArgumentLogicalType[Timestamp, np.int64]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this? I thought the solution here was to use the MicrosInstant logical type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. removed this class the test still pass and confirmed with database content.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah messed up local virtualenv. Remove @LogicalType.register_logical_type
or the entire class here causes the following error:
Traceback (most recent call last):
File "...virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/typehints/schemas.py", line 408, in typing_from_runner_api
field_py_type = self.typing_from_runner_api(field.type)
File "...virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/typehints/schemas.py", line 362, in typing_from_runner_api
base = self.typing_from_runner_api(base_type)
File "...virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/typehints/schemas.py", line 447, in typing_from_runner_api
return LogicalType.from_runner_api(
File "...virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/typehints/schemas.py", line 621, in from_runner_api
raise ValueError(
ValueError: No logical type registered for URN 'beam:logical_type:datetime:v1'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
...
File "...virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/typehints/schemas.py", line 179, in typing_from_runner_api
return SchemaTranslation(
File "...virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/typehints/schemas.py", line 412, in typing_from_runner_api
raise ValueError(
ValueError: Failed to decode schema due to an issue with Field proto:
name: "f_timestamp"
type {
nullable: true
logical_type {
urn: "beam:logical_type:datetime:v1"
representation {
atomic_type: INT64
}
}
}
id: 1
encoding_position: 1
Still need this to register the URN.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused though, how did we get a field with URN beam:logical_type:datetime:v1
? Based on the change in JdbcUtil I thought the intention here was to map JDBC timestamp columns to the MicrosInstant logical type, with URN beam:logical_type:micros_instant:v1
, in order to avoid the proiblematic InstantCoder in DATETIME.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change in JdbcUtil and Python are for different purposes. Basically there are two things missing for java<->portable timestamp: (1) Java JDBCIO support of portable timestamp; (2) Python support for java timestamp.
The JdbcUtil change is for write to jdbc. It adds support in Java's JDBCIO of converting portable timestamp type (beam:logical_type:micros_instant:v1) to sql timestamp;
The Python coder and schema change is for read from jdbc. It adds supports in Python's RowCoder of identifying (by schema change) and decoding (by coder change) the java timestamp type (datetime) to language timestamp. The field 'beam:logical_type:datetime:v1' is passed from java here when the row contains datetime field type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to change coder for Instant in java side but it did not quite work (see comment here) and ended up adding support for InstantCoder coded timestamp in Python side. I understand this is less elegant but it does not touch java Instant coders which is used everywhere not only in schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make portable timestamp and java timestamp the same type? I think it would be preferable to produce micros_instant for the timestamp type in the JDBC read. I'd really like to avoid having Java's DATETIME primitive leak into Python.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this is preferable in terms of consistency. To do this we will need to make distinction of timestamp in schema based IOs (including JDBC) and windowing timestamp, making the former produce micros_instant instead of joda.Instant, which is similar to the approach here #11456. However given the scope of breaking changes this may leave as next major version change...
If I understood correctly it's not leaking Java's DATETIME primitive (Schema.FieldType DATETIME), the schema passed to python is the logical type beam:logical_type:datetime:v1
that not yet implemented in python. However the coder that used to decode this type is actually available in python which is TimestampCoder. This is still in line with Portable Beam Schema design doc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could there be a new logical type then for use in JDBC read and write that uses millisecond precision, and maps to joda.Instant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now renamed this usage to millis_instant logical type
// MicrosInstant uses native java.time.Instant instead of joda.Instant. | ||
java.time.Instant value = | ||
element.getLogicalTypeValue(fieldWithIndex.getIndex(), java.time.Instant.class); | ||
ps.setTimestamp(i + 1, value == null ? null : new Timestamp(value.toEpochMilli())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish we had a more general solution for mapping these to java time vs. joda time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a mapping from java.time.Instant to java.sql.Timestamp, joda.time is not involved. The comment was a clarification because I see joda.time.Instant is usually used in the codebase (that's why I did not declare import Instant but write down the whole class name here)
Do you think we need a general Row.getJavaInstant() as Row.getDateTime() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, my mistake. Regardless there's no action needed here, I was just lamenting that this date/time type situation is messy.
# Special case for datetime logical type. | ||
# DATETIME_URN explicitly uses TimestampCoder which deals with fix length | ||
# 8-bytes big-endian-long instead of varint coder. | ||
return TimestampCoder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do implement beam:logical_type:datetime
in Python we should:
- Add the logical type in schema.proto, and reference the URN from that file:
beam/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto
Line 117 in 4799828
message LogicalTypes { - Consider if we can keep the logic contained in the logical type implementation, e.g. can we do the byte flipping in a to_language_type and to_representation_type implementation?
- Add some test cases to standard_coders.yaml, like this one:
beam/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
Lines 449 to 454 in 4799828
coder: urn: "beam:coder:row:v1" # f_timestamp: logical(micros_instant), f_string: string, f_int: int64 payload: "\n\x7f\n\x0bf_timestamp\x1ap:n\n#beam:logical_type:micros_instant:v1\x1aG2E\nC\n\r\n\x07seconds\x1a\x02\x10\x04\n\x0c\n\x06micros\x1a\x02\x10\x04\x12$4d3f6e8f-7412-4ad7-bfd9-b424a1664aef\n\x0e\n\x08f_string\x1a\x02\x10\x07\n\x0b\n\x05f_int\x1a\x02\x10\x04\x12$33dafd37-397c-4083-a84e-42177d122221" examples: "\x03\x00\x02\x00\xb6\x95\xd5\xf9\x05\xc0\xc4\x07\x1b2020-08-13T14:14:14.123456Z\xc0\xf7\x85\xda\xae\x98\xeb\x02": {f_timestamp: {seconds: 1597328054, micros: 123456}, f_string: "2020-08-13T14:14:14.123456Z", f_int: 1597328054123456} var filteredCases = []struct{ filter, reason string }{
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the instructions! Will complete the proto change. beam:logical_type:datetime:v1
is the millisecond precision timestamp backed by a fixed length INT64 that encoded with BigEndianLong. Will complete the proto change.
can we do the byte flipping in a to_language_type and to_representation_type implementation?
Considered this before. Under the current framework the value is already decoded with VarInt coder before sent to to_language_type or to_representation_type which is incorrect. All I need is a 8-byte fixed long integer primitive which does not exist in portable primitives, and there is not yet a consensus on reference coder (context). Therefore I ended up this solution to make things work while not introducing new primitives or other fundamental changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah... test case in standard_coders.yaml won't work either because Both micros_instant and millis_instant has Timestamp language type and the former will take over the priority of millis_instant (by design). When a timestamp is decoded to an integer, the test will call micros_instant's to_language_type.
i.e. MillisInstant->Timestamp conversion is one direction and Timestamp <-> MicrosInstant is bidirectional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I'd really like to have this tested at the standard_coders.yaml
level as a unit test, it's not ideal if the only thing verifying compatibility is an integration test in Python.
If you have time tomorrow maybe we could do a quick video call to live debug what's going wrong here, and see if we can work around it. If it will be painful we can leave it as future work, but I'd like to understand the level of effort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Managed to get a test case using the following code snippet
def generate_millis():
# Logical type that deals with millis_instant urn (MillisInstant)
MillisLogicalType = LogicalType._known_logical_types.get_logical_type_by_urn('beam:logical_type:millis_instant:v1')
# Original Logical type used to represent Timestamp (MicrosInstant)
TimestampLogicalType = LogicalType._known_logical_types.get_logical_type_by_language_type(Timestamp)
LogicalType._known_logical_types.by_language_type[Timestamp] = MillisLogicalType
schema = beam.typehints.schemas.named_tuple_to_schema(TestTuple)
coder = beam.coders.row_coder.RowCoder(schema)
print("payload = %s" % schema.SerializeToString())
examples = (TestTuple(
f_timestamp=Timestamp.from_rfc3339("2020-08-13T14:14:14.123Z"),
f_string="2020-08-13T14:14:14.123Z",
f_int=1597328054123),)
for example in examples:
print("example = %s" % coder.encode(example))
# recover original registration
LogicalType._known_logical_types.by_language_type[Timestamp] = TimestampLogicalType
The workaround is temporarily change the mapping of Timestamp -> MillisInstant logical type. Without it Timestamp always maps to MicrosInstant logical type in Python.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahhh ok, so it looks like the issue was using Python to generate the test values. Another strategy might be to create the schema proto directly. But this way works too and is probably less code, thanks!
d0498fa
to
df31d95
Compare
# Special case for datetime logical type. | ||
# DATETIME_URN explicitly uses TimestampCoder which deals with fix length | ||
# 8-bytes big-endian-long instead of varint coder. | ||
return TimestampCoder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I'd really like to have this tested at the standard_coders.yaml
level as a unit test, it's not ideal if the only thing verifying compatibility is an integration test in Python.
If you have time tomorrow maybe we could do a quick video call to live debug what's going wrong here, and see if we can work around it. If it will be painful we can leave it as future work, but I'd like to understand the level of effort.
dbf0c2f
to
5ac3b4a
Compare
Run Python_PVR_Flink PreCommit |
Run Java PreCommit |
Run Python 3.8 PostCommit |
Run Java PreCommit |
postcommit test Apparently the branch itself works but now failing when merge onto master. Investigating. Update: related to the change of #22679. After it tests fail. |
* Support beam:logical_type:datetime:v1 in python sdk * Support micro_instant logical type in java jdbc * Fix test timestamp milli precision support in mysql
Co-authored-by: Brian Hulette <[email protected]>
a6f94e8
to
b0484e7
Compare
Rebased onto latest master; the last commit is a fix of test |
Run Python 3.8 PostCommit |
@@ -587,6 +588,7 @@ def to_language_type(value): | |||
def register_logical_type(cls, logical_type_cls): | |||
"""Register an implementation of LogicalType.""" | |||
cls._known_logical_types.add(logical_type_cls.urn(), logical_type_cls) | |||
return logical_type_cls |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These places were some minor bugs. If not return logical_type_cls, class decorated with @register_logical_type
cannot be imported (will be None) and docstring is also missing from pydoc; signatures of to_representation_type
and to_language_type
should have a value parameter as its child classes. Fixed here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
Run Java PreCommit |
Run Python PreCommit |
1 similar comment
Run Python PreCommit |
Part of #19817
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.