Skip to content

Commit

Permalink
Add integration tests, refactor to use Timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: currantw <[email protected]>
  • Loading branch information
currantw committed Jan 9, 2025
1 parent c78c0a5 commit 68ec760
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,32 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite
assertSameRows(Seq(Row(3)), frame)
}

test("test RELATIVE_TIMESTAMP") {
var frame = sql(s"""
| source = $testTable
| | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now"))
| | fields seconds_diff
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(0)), frame)

frame = sql(s"""
| source = $testTable
| | eval hours_diff = timestampdiff(HOUR, now(), relative_timestamp("+1h"))
| | fields hours_diff
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(1)), frame)

frame = sql(s"""
| source = $testTable
| | eval day = day_of_week(relative_timestamp("@w0"))
| | fields day
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(1)), frame)
}

test("test CURRENT_TIME is not supported") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,19 @@
import org.apache.spark.sql.types.DataTypes;
import scala.Function1;
import scala.Function2;
import scala.Function3;
import scala.Option;
import scala.Serializable;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
import scala.runtime.AbstractFunction3;
import scala.collection.JavaConverters;
import scala.collection.mutable.WrappedArray;

import java.lang.Boolean;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Instant;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -50,10 +46,6 @@ abstract class SerializableAbstractFunction2<T1, T2, R> extends AbstractFunction
implements Serializable {
}

abstract class SerializableAbstractFunction3<T1, T2, T3, R> extends AbstractFunction3<T1, T2, T3, R>
implements Serializable {
}

/**
* Remove specified keys from a JSON string.
*
Expand Down Expand Up @@ -212,17 +204,15 @@ public BigInteger apply(String ipAddress) {
}

/**
* Returns the {@link Instant} corresponding to the given relative string, current instant, and time zone identifier.
* Returns the {@link Timestamp} corresponding to the given relative string, current timestamp, and time zone identifier.
* Throws {@link RuntimeException} if the relative timestamp string is not supported.
*/
Function3<String, Instant, String, Instant> relativeTimestampFunction = new SerializableAbstractFunction3<String, Instant, String, Instant>() {
Function2<String, Timestamp, Timestamp> relativeTimestampFunction = new SerializableAbstractFunction2<String, Timestamp, Timestamp>() {
@Override
public Instant apply(String relativeDateTimeString, Instant currentInstant, String zoneIdString) {
ZoneId zoneId = ZoneId.of(zoneIdString);
LocalDateTime currentLocalDateTime = LocalDateTime.ofInstant(currentInstant, zoneId);
public Timestamp apply(String relativeDateTimeString, Timestamp currentTimestamp) {
LocalDateTime currentLocalDateTime = currentTimestamp.toLocalDateTime();
LocalDateTime relativeLocalDateTime = TimeUtils.getRelativeLocalDateTime(relativeDateTimeString, currentLocalDateTime);

return relativeLocalDateTime.atZone(zoneId).toInstant();
return Timestamp.valueOf(relativeLocalDateTime);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ public interface BuiltinFunctionTransformer {
})
.put(
RELATIVE_TIMESTAMP,
args -> {
return SerializableUdf.visit("relative_timestamp", List.of(args.get(0), CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()));
})
args -> SerializableUdf.visit("relative_timestamp", List.of(args.get(0), CurrentTimestamp$.MODULE$.apply())))
.build();

static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List<Expression> args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@

import org.junit.Test;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.sql.Timestamp;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.opensearch.sql.expression.function.SerializableUdf.relativeTimestampFunction;

public class SerializableTimeUdfTest {

// Monday, Jan 03, 2000 @ 01:01:01.100 UTC
private final LocalDateTime MOCK_LOCAL_DATE_TIME = LocalDateTime.parse("2000-01-03T01:01:01.100");
private final ZoneOffset MOCK_ZONE_OFFSET = ZoneOffset.UTC;
private final Instant MOCK_INSTANT = MOCK_LOCAL_DATE_TIME.toInstant(MOCK_ZONE_OFFSET);
// Monday, Jan 03, 2000 @ 01:01:01.100
private final Timestamp MOCK_TIMESTAMP = Timestamp.valueOf("2000-01-03 01:01:01.100");

@Test
public void relativeTimestampTest() {
Expand All @@ -29,12 +25,12 @@ public void relativeTimestampTest() {
For more comprehensive tests, see {@link TimeUtilsTest}.
*/

testValid("now", "2000-01-03T01:01:01.100Z");
testValid("-60m", "2000-01-03T00:01:01.100Z");
testValid("-h", "2000-01-03T00:01:01.100Z");
testValid("+2wk", "2000-01-17T01:01:01.100Z");
testValid("-1h@h", "2000-01-03T00:00:00Z");
testValid("@d", "2000-01-03T00:00:00Z");
testValid("now", "2000-01-03 01:01:01.100");
testValid("-60m", "2000-01-03 00:01:01.100");
testValid("-h", "2000-01-03 00:01:01.100");
testValid("+2wk", "2000-01-17 01:01:01.100");
testValid("-1h@h", "2000-01-03 00:00:00");
testValid("@d", "2000-01-03 00:00:00");

testInvalid("invalid", "The relative date time 'invalid' is not supported.");
testInvalid("INVALID", "The relative date time 'INVALID' is not supported.");
Expand All @@ -49,14 +45,14 @@ public void relativeTimestampTest() {

private void testValid(String relativeString, String expectedTimestampString) {
String testMessage = String.format("\"%s\"", relativeString);
String actualTimestampString = relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_OFFSET.toString()).toString();
assertEquals(testMessage, expectedTimestampString, actualTimestampString);
Timestamp expectedTimestamp = Timestamp.valueOf(expectedTimestampString);
Timestamp actualTimestamp = relativeTimestampFunction.apply(relativeString, MOCK_TIMESTAMP);
assertEquals(testMessage, expectedTimestamp, actualTimestamp);
}

private void testInvalid(String relativeDateTimeString, String expectedExceptionMessage) {
String testMessage = String.format("\"%s\"", relativeDateTimeString);
String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class,
() -> relativeTimestampFunction.apply(relativeDateTimeString, MOCK_INSTANT, MOCK_ZONE_OFFSET.toString())).getMessage();
private void testInvalid(String relativeString, String expectedExceptionMessage) {
String testMessage = String.format("\"%s\"", relativeString);
String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, () -> relativeTimestampFunction.apply(relativeString, MOCK_TIMESTAMP)).getMessage();
assertEquals(expectedExceptionMessage, actualExceptionMessage);
}
}

0 comments on commit 68ec760

Please sign in to comment.