[FLINK-35241][table] Support SQL FLOOR and CEIL functions with SECOND and MINUTE for TIMESTAMP_LTZ #25759

Open
wants to merge 4 commits into
base: master

Conversation

hanyuzheng7
Contributor

@hanyuzheng7 hanyuzheng7 commented Dec 6, 2024

What is the purpose of the change

FLOOR and CEIL on TIMESTAMP_LTZ values fail for the SECOND and MINUTE units; both need to be fixed.

The following query doesn't work:

SELECT
  FLOOR(
      CAST(TIMESTAMP '2024-04-25 17:19:42.654' AS TIMESTAMP_LTZ(3))
  TO MINUTE) 

These two queries work:

SELECT
  FLOOR(
      CAST(TIMESTAMP '2024-04-25 17:19:42.654' AS TIMESTAMP_LTZ(3))
  TO HOUR) 
SELECT
  FLOOR(
      TIMESTAMP '2024-04-25 17:19:42.654'
  TO MINUTE) 
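
For reference, the result the failing query is expected to produce can be sketched with plain java.time, outside of Flink. This is only an illustration of the intended semantics, not the code path this PR changes; the class name FloorToMinuteSketch and the choice of UTC as the session time zone are assumptions.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

final class FloorToMinuteSketch {

    // Floors the local timestamp to the minute, then interprets it in the given
    // session zone, similar in spirit to FLOOR(CAST(ts AS TIMESTAMP_LTZ(3)) TO MINUTE).
    static Instant floorToMinute(LocalDateTime local, ZoneId sessionZone) {
        return local.truncatedTo(ChronoUnit.MINUTES).atZone(sessionZone).toInstant();
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2024, 4, 25, 17, 19, 42, 654_000_000);
        System.out.println(floorToMinute(ts, ZoneOffset.UTC));
    }
}
```

With UTC as the session zone, the seconds and milliseconds are dropped and the minute 17:19 is kept.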

Stack trace for the failing query above:

Caused by: io.confluent.flink.table.utils.CleanedException: org.codehaus.commons.compiler.CompileException: Line 41, Column 69: No applicable constructor/method found for actual parameters "org.apache.flink.table.data.TimestampData, org.apache.flink.table.data.TimestampData"; candidates are: "public static long org.apache.flink.table.runtime.functions.SqlFunctionUtils.floor(long, long)", "public static float org.apache.flink.table.runtime.functions.SqlFunctionUtils.floor(float)", "public static org.apache.flink.table.data.DecimalData org.apache.flink.table.runtime.functions.SqlFunctionUtils.floor(org.apache.flink.table.data.DecimalData)", "public static int org.apache.flink.table.runtime.functions.SqlFunctionUtils.floor(int, int)", "public static double org.apache.flink.table.runtime.functions.SqlFunctionUtils.floor(double)"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13080)
at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9646)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9506)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9422)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5263)
... 

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

  • Added unit tests in TimeFunctionsITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Dec 6, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@hanyuzheng7
Contributor Author

@flinkbot run azure

@@ -432,15 +435,17 @@ private Stream<TestSetSpec> temporalOverlapsTestCases() {
}

private Stream<TestSetSpec> ceilTestCases() {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.FLOOR)
.onFieldsWithData(
// https://issues.apache.org/jira/browse/FLINK-17224
Contributor

@hanyuzheng7 can you read this ticket and see if we should update this comment?

Contributor Author

@jnh5y We now test TIMESTAMP_LTZ, so the results depend on the time zone in which the tests run. We should therefore fix the time zone so that everyone can pass these tests regardless of their local time zone. If I don't fix the time zone, the tests pass on my laptop but fail on the Flink CI server, because my time zone differs from the CI server's.

Contributor

We cannot make any assumptions about the Flink CI time zone.
Nobody guarantees that it stays the same with the move to GitHub.

Contributor Author

@snuyanzin I think I can extract these tests into a new stream of test cases and fix the time zone only there.

Contributor

That is what I initially tried to suggest.
However, we need to make sure that the time zone is reset to its initial value after the test stream finishes.
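
One way to pin the default time zone only for a limited scope and restore it afterwards could look like the sketch below. The helper name withDefaultTimeZone and the class DefaultTimeZoneSketch are hypothetical and not part of this PR; only TimeZone.getDefault and TimeZone.setDefault are real JDK calls.

```java
import java.util.TimeZone;
import java.util.function.Supplier;

final class DefaultTimeZoneSketch {

    // Runs the body with the JVM default time zone pinned to the given zone, then
    // restores whatever default was in effect before, even if the body throws.
    static <T> T withDefaultTimeZone(TimeZone pinned, Supplier<T> body) {
        TimeZone previous = TimeZone.getDefault();
        TimeZone.setDefault(pinned);
        try {
            return body.get();
        } finally {
            TimeZone.setDefault(previous);
        }
    }

    public static void main(String[] args) {
        String idInside = withDefaultTimeZone(
                TimeZone.getTimeZone("UTC"), () -> TimeZone.getDefault().getID());
        System.out.println(idInside);
    }
}
```

If the test specs are evaluated lazily, the pinned scope would have to cover the point where they actually run, not just the point where the stream is built.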

@jnh5y
Contributor

jnh5y commented Dec 9, 2024

Is there any documentation for this function? If it mentions details, we may need to update them.

}

private Stream<TestSetSpec> floorTestCases() {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
Contributor

It's weird that we set it for all test cases...
Can we set it only for certain test cases and then reset it afterwards?

Contributor Author

@hanyuzheng7 hanyuzheng7 Dec 9, 2024

Can we do that in the floor and ceil tests?
I think I can extract these tests into a new stream of test cases.

@hanyuzheng7
Contributor Author

@flinkbot run azure

@hanyuzheng7 hanyuzheng7 force-pushed the flink35241foorandceil branch from 104f37c to 63ce219 Compare December 11, 2024 20:51
Contributor

@snuyanzin snuyanzin left a comment

Tested locally

If I floor/ceil to MILLISECOND or NANOSECOND, I still get the same issue as in the Jira description.

@hanyuzheng7
Contributor Author

@snuyanzin Yes, because this ticket only handles the SECOND and MINUTE cases.

@snuyanzin
Contributor

The root cause is the same; if you compare the output for the different inputs, it will be the same.
The Jira issue was raised for the case the reporter faced, but that does not mean it is limited to that input.

@hanyuzheng7
Contributor Author

hanyuzheng7 commented Dec 12, 2024

@snuyanzin
The reason we do not support MILLISECOND/NANOSECOND is that floor and ceil do not handle these two units well. You can see it here:

 /**
     * Keep the algorithm consistent with Calcite DateTimeUtils.julianDateFloor, but here we take
     * time zone into account.
     */
    public static long timestampCeil(TimeUnitRange range, long ts, TimeZone tz) {
        // assume that we are at UTC timezone, just for algorithm performance
        long offset = tz.getOffset(ts);
        long utcTs = ts + offset;

        switch (range) {
            case NANOSECOND:
                return ceil(utcTs, ?) - offset;
            case MILLISECOND:
                return ceil(utcTs, 1L) - offset;
            case SECOND:
                return ceil(utcTs, MILLIS_PER_SECOND) - offset;
            case MINUTE:
                return ceil(utcTs, MILLIS_PER_MINUTE) - offset;
            case HOUR:
                return ceil(utcTs, MILLIS_PER_HOUR) - offset;
            case DAY:
                return ceil(utcTs, MILLIS_PER_DAY) - offset;
            case MILLENNIUM:
            case CENTURY:
            case DECADE:
            case MONTH:
            case YEAR:
            case QUARTER:
            case WEEK:
                int days = (int) (utcTs / MILLIS_PER_DAY + EPOCH_JULIAN);
                return julianDateFloor(range, days, false) * MILLIS_PER_DAY - offset;
            default:
                // for MINUTE and SECONDS etc...,
                // it is more effective to use arithmetic Method
                throw new AssertionError(range);
        }
    }

The existing DAY and HOUR cases are based on these constants:

    /** The number of milliseconds in a second. */
    private static final long MILLIS_PER_SECOND = 1000L;

    /** The number of milliseconds in a minute. */
    private static final long MILLIS_PER_MINUTE = 60000L;

    /** The number of milliseconds in an hour. */
    private static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000

    /**
     * The number of milliseconds in a day.
     *
     * <p>This is the modulo 'mask' used when converting TIMESTAMP values to DATE and TIME values.
     */
    public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000

We can use 1L to stand for MILLIS_PER_MILLISECOND, but how can we represent NANOSECOND, which is smaller than a millisecond?

To support it, we would need to change the time base from milliseconds to nanoseconds, which would change a lot of the behavior of ceil and floor, so I suggest creating a new ticket to support MILLISECOND/NANOSECOND.
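
To make the arithmetic concrete, here is a minimal sketch of unit-based floor and ceil on epoch milliseconds. The floor and ceil helpers below are stand-ins written for this illustration (the thread does not show Flink's own helpers), and the class name MillisFloorCeilSketch is hypothetical. The sketch follows the same shift, round, shift-back shape as the quoted timestampCeil.

```java
import java.time.Instant;
import java.util.TimeZone;

final class MillisFloorCeilSketch {

    static final long MILLIS_PER_SECOND = 1000L;
    static final long MILLIS_PER_MINUTE = 60000L;

    // Largest multiple of unit that is <= value; floorMod keeps negatives correct.
    static long floor(long value, long unit) {
        return value - Math.floorMod(value, unit);
    }

    // Smallest multiple of unit that is >= value.
    static long ceil(long value, long unit) {
        long rem = Math.floorMod(value, unit);
        return rem == 0 ? value : value + (unit - rem);
    }

    // Shift into local time, round down, then shift back to epoch milliseconds.
    static long floorInZone(long epochMillis, long unit, TimeZone tz) {
        long offset = tz.getOffset(epochMillis);
        return floor(epochMillis + offset, unit) - offset;
    }

    // Shift into local time, round up, then shift back to epoch milliseconds.
    static long ceilInZone(long epochMillis, long unit, TimeZone tz) {
        long offset = tz.getOffset(epochMillis);
        return ceil(epochMillis + offset, unit) - offset;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2024-04-25T17:19:42.654Z").toEpochMilli();
        TimeZone utc = TimeZone.getTimeZone("UTC");
        System.out.println(Instant.ofEpochMilli(floorInZone(ts, MILLIS_PER_MINUTE, utc)));
        System.out.println(Instant.ofEpochMilli(ceilInZone(ts, MILLIS_PER_MINUTE, utc)));
        // With a millisecond base, a unit of 1L leaves the value unchanged.
        System.out.println(floorInZone(ts, 1L, utc) == ts);
    }
}
```

Because the value is a whole number of milliseconds, a unit of 1L is already the finest granularity this scheme can express; anything finer would need a different time base.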

@snuyanzin
Contributor

snuyanzin commented Dec 12, 2024

At the very least, we have to support all time units for which we already have tests.
For instance, if there is a test for MILLISECOND like this one:

.testResult(
$("f1").ceil(TimeIntervalUnit.MILLISECOND),
"CEIL(f1 TO MILLISECOND)",
LocalDate.of(1990, 10, 14),
DATE().nullable())
.testResult(
$("f2").ceil(TimeIntervalUnit.MILLISECOND),
"CEIL(f2 TO MILLISECOND)",
LocalDateTime.of(2020, 2, 29, 1, 56, 59, 988_000_000),
TIMESTAMP().nullable())

then the same should be supported for the case with a cast as well.

If there is no such test for NANOSECOND, we can skip it.

@hanyuzheng7
Contributor Author

MILLISECOND is supported now, @snuyanzin.

@snuyanzin
Contributor

I will have a look at this tomorrow.
Meanwhile, can you create a separate Jira issue for the rest?

Comment on lines +806 to +807
DateTimeFormatter.ofPattern(
"yyyy-MM-dd' 'HH:mm:ss.SSS")),
Contributor

By the way, this could be extracted into a constant, since it is used multiple times.
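
A sketch of what such an extraction could look like, assuming a hypothetical constant name TIMESTAMP_MILLIS_FORMATTER and holder class TimestampFormatSketch (neither is taken from the PR); the pattern string is the one quoted in the diff:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class TimestampFormatSketch {

    // Hypothetical constant name; DateTimeFormatter is immutable and thread-safe,
    // so a single shared instance can replace the repeated ofPattern calls.
    static final DateTimeFormatter TIMESTAMP_MILLIS_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd' 'HH:mm:ss.SSS");

    // Formats a timestamp with millisecond precision using the shared constant.
    static String format(LocalDateTime ts) {
        return ts.format(TIMESTAMP_MILLIS_FORMATTER);
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.of(2024, 4, 25, 17, 19, 42, 654_000_000)));
    }
}
```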

Contributor Author

ok

@hanyuzheng7 hanyuzheng7 reopened this Dec 12, 2024
@hanyuzheng7
Contributor Author

I will have a look at this tomorrow meanwhile can you create a separate jira issue for the rest?

I will open a Jira ticket for it.

@hanyuzheng7 hanyuzheng7 reopened this Dec 12, 2024

4 participants