diff --git a/.github/workflows/sql-test-and-build-workflow.yml b/.github/workflows/sql-test-and-build-workflow.yml index 3d063a2bfc..25e0387cf3 100644 --- a/.github/workflows/sql-test-and-build-workflow.yml +++ b/.github/workflows/sql-test-and-build-workflow.yml @@ -25,10 +25,10 @@ jobs: matrix: entry: - { os: ubuntu-latest, java: 11 } - - { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc} + - { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows } - { os: macos-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc } - { os: ubuntu-latest, java: 17 } - - { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc } + - { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows } - { os: macos-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc } runs-on: ${{ matrix.entry.os }} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java new file mode 100644 index 0000000000..cd7d7dae5a --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import lombok.Data; +import org.opensearch.sql.storage.split.Split; + +/** + * A batch of streaming execution. 
+ */ +@Data +public class Batch { + private final Split split; +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java new file mode 100644 index 0000000000..00f040e437 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import lombok.Data; + +/** + * Offset. + */ +@Data +public class Offset { + + private final Long offset; +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java b/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java new file mode 100644 index 0000000000..ebd3fa714b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import java.util.Optional; + +/** + * Streaming source. + */ +public interface StreamingSource { + /** + * Get current {@link Offset} of stream data. + * + * @return the latest offset, or empty if the stream has no data yet. + */ + Optional<Offset> getLatestOffset(); + + /** + * Get a {@link Batch} from source between (start, end]. + * + * @param start start offset. + * @param end end offset. + * @return {@link Batch}.
+ */ + Batch getBatch(Optional<Offset> start, Offset end); +} diff --git a/core/src/main/java/org/opensearch/sql/storage/split/Split.java b/core/src/main/java/org/opensearch/sql/storage/split/Split.java new file mode 100644 index 0000000000..e9e0c6fcc1 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/split/Split.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.storage.split; + +import org.opensearch.sql.storage.StorageEngine; + +/** + * Split is a section of a data set. Each {@link StorageEngine} should have specific + * implementation of Split. + */ +public interface Split { + + /** + * Get the split id. + * @return split id. + */ + String getSplitId(); +} diff --git a/doctest/build.gradle b/doctest/build.gradle index 8378d5ec00..c1d069e50b 100644 --- a/doctest/build.gradle +++ b/doctest/build.gradle @@ -89,7 +89,7 @@ doctest.dependsOn startOpenSearch startOpenSearch.dependsOn startPrometheus doctest.finalizedBy stopOpenSearch stopOpenSearch.finalizedBy stopPrometheus -build.dependsOn doctest +check.dependsOn doctest clean.dependsOn(cleanBootstrap) // 2.0.0-alpha1-SNAPSHOT -> 2.0.0.0-alpha1-SNAPSHOT diff --git a/filesystem/build.gradle b/filesystem/build.gradle new file mode 100644 index 0000000000..0571088132 --- /dev/null +++ b/filesystem/build.gradle @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java-library' + id "io.freefair.lombok" + id 'jacoco' +} + +ext { + hadoop = "3.3.4" + aws = "1.12.330" +} + +configurations.all { + resolutionStrategy.force "commons-io:commons-io:2.8.0" +} + +dependencies { + implementation project(':core') + // required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html.
+ implementation("org.apache.hadoop:hadoop-common:${hadoop}") { + exclude group: 'org.apache.zookeeper' + exclude group: 'org.eclipse.jetty' + exclude group: 'com.sun.jersey' + exclude group: 'javax.servlet.jsp' + exclude group: 'javax.servlet' + exclude group: 'org.apache.kerby' + exclude group: 'org.apache.curator' + exclude group: 'com.google.protobuf', module: 'protobuf-java' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt' + // excluded here and re-added below with an enforced version. + exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core' + exclude group: 'commons-io', module: 'commons-io' + exclude group: 'ch.qos.reload4j', module: 'reload4j' + exclude group: 'org.apache.httpcomponents', module: 'httpcore' + } + implementation('com.fasterxml.woodstox:woodstox-core') + constraints { + implementation('com.fasterxml.woodstox:woodstox-core:6.4.0') { + because 'https://www.mend.io/vulnerability-database/CVE-2022-40156' + } + } + implementation('commons-io:commons-io') + constraints { + implementation('commons-io:commons-io:2.8.0') { + because 'between versions 2.8.0 and 2.5' + } + } + implementation('ch.qos.reload4j:reload4j') + constraints { + implementation('ch.qos.reload4j:reload4j:1.2.22') { + because 'between versions 1.2.22 and 1.2.19' + } + } + implementation('org.apache.httpcomponents:httpcore') + constraints { + implementation('org.apache.httpcomponents:httpcore:4.4.15') { + because 'between versions 4.4.15 and 4.4.13' + } + } + + testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') + testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' +} + +test { + useJUnitPlatform() + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + } + + // hadoop-fs depends on a native library which is missing on
windows. + // https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library + if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) { + excludes = [ + '**/FileSystemStreamSourceTest.class' + ] + } +} + +jacocoTestReport { + reports { + html.enabled true + xml.enabled true + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +test.finalizedBy(project.tasks.jacocoTestReport) + +jacocoTestCoverageVerification { + violationRules { + rule { + // hadoop-fs depends on a native library which is missing on windows. + // https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library + if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) { + excludes = [ + 'org.opensearch.sql.filesystem.streaming.FileSystemStreamSource' + ] + } + element = 'CLASS' + limit { + counter = 'LINE' + minimum = 1.0 + } + limit { + counter = 'BRANCH' + minimum = 1.0 + } + } + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +check.dependsOn jacocoTestCoverageVerification +jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java new file mode 100644 index 0000000000..7fefb11a85 --- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.storage.split; + +import java.util.Set; +import java.util.UUID; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.apache.hadoop.fs.Path; +import
org.opensearch.sql.storage.split.Split; + +@Data +public class FileSystemSplit implements Split { + + @Getter + @EqualsAndHashCode.Exclude + private final String splitId = UUID.randomUUID().toString(); + + private final Set<Path> paths; +} diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java new file mode 100644 index 0000000000..6a8c90ee80 --- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import java.util.Set; +import lombok.Data; +import org.apache.hadoop.fs.Path; + +/** + * File metadata. A batch id associated with a set of {@link Path}. + */ +@Data +public class FileMetaData { + + private final Long batchId; + + private final Set<Path> paths; +} diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java new file mode 100644 index 0000000000..6a9639bdcb --- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.SneakyThrows; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger;
+import org.opensearch.sql.executor.streaming.Batch; +import org.opensearch.sql.executor.streaming.DefaultMetadataLog; +import org.opensearch.sql.executor.streaming.MetadataLog; +import org.opensearch.sql.executor.streaming.Offset; +import org.opensearch.sql.executor.streaming.StreamingSource; +import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; + +/** + * FileSystem Streaming Source uses Hadoop FileSystem. + */ +public class FileSystemStreamSource implements StreamingSource { + + private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class); + + private final MetadataLog<FileMetaData> fileMetaDataLog; + + private Set<Path> seenFiles; + + private final FileSystem fs; + + private final Path basePath; + + /** + * Constructor of FileSystemStreamSource. + */ + public FileSystemStreamSource(FileSystem fs, Path basePath) { + this.fs = fs; + this.basePath = basePath; + // todo, need to add state recovery + this.fileMetaDataLog = new DefaultMetadataLog<>(); + // todo, need to add state recovery + this.seenFiles = new HashSet<>(); + } + + @SneakyThrows(value = IOException.class) + @Override + public Optional<Offset> getLatestOffset() { + // list all files. todo, improve list performance. + Set<Path> allFiles = + Arrays.stream(fs.listStatus(basePath)) + .filter(status -> !status.isDirectory()) + .map(FileStatus::getPath) + .collect(Collectors.toSet()); + + // find unread files. + log.debug("all files {}", allFiles); + Set<Path> unread = Sets.difference(allFiles, seenFiles); + + // update seenFiles.
+ seenFiles = allFiles; + log.debug("seen files {}", seenFiles); + + Optional<Long> latestBatchIdOptional = fileMetaDataLog.getLatest().map(Pair::getKey); + if (!unread.isEmpty()) { + long latestBatchId = latestBatchIdOptional.map(id -> id + 1).orElse(0L); + fileMetaDataLog.add(latestBatchId, new FileMetaData(latestBatchId, unread)); + log.debug("latestBatchId {}", latestBatchId); + return Optional.of(new Offset(latestBatchId)); + } else { + log.debug("no unread data"); + Optional<Offset> offset = + latestBatchIdOptional.isEmpty() + ? Optional.empty() + : Optional.of(new Offset(latestBatchIdOptional.get())); + log.debug("return latest offset {}", offset); + return offset; + } + } + + @Override + public Batch getBatch(Optional<Offset> start, Offset end) { + Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1).orElse(0L); + Long endBatchId = end.getOffset(); + + Set<Path> paths = + fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream() + .map(FileMetaData::getPaths) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + + log.debug("fetch files {} with id from: {} to: {}.", paths, start, end); + return new Batch(new FileSystemSplit(paths)); + } +} diff --git a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java new file mode 100644 index 0000000000..fba038f6a3 --- /dev/null +++ b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java @@ -0,0 +1,166 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.executor.streaming.Offset; +import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; +import org.opensearch.sql.storage.split.Split; + +@ExtendWith(MockitoExtension.class) +class FileSystemStreamSourceTest { + + @TempDir Path perTestTempDir; + + FileSystemStreamSource streamSource; + + /** + * Use hadoop default filesystem. It only works on unix-like systems. For running on windows, it + * requires native library. Reference:
+ * https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html + */ + @BeforeEach + void setup() throws IOException { + streamSource = + new FileSystemStreamSource( + FileSystem.get(new Configuration()), + new org.apache.hadoop.fs.Path(perTestTempDir.toUri())); + } + + @Test + void addOneFileToSource() throws IOException { + emptySource().addFile("log1").latestOffsetShouldBe(0L).batchFromStart("log1"); + } + + @Test + void addMultipleFileInSequence() throws IOException { + emptySource() + .addFile("log1") + .latestOffsetShouldBe(0L) + .batchFromStart("log1") + .addFile("log2") + .latestOffsetShouldBe(1L) + .batchFromStart("log1", "log2") + .batchInBetween(0L, 1L, "log2"); + } + + @Test + void latestOffsetShouldSameIfNoNewFileAdded() throws IOException { + emptySource() + .addFile("log1") + .latestOffsetShouldBe(0L) + .batchFromStart("log1") + .latestOffsetShouldBe(0L) + .batchFromStart("log1"); + } + + @Test + void latestOffsetIsEmptyIfNoFilesInSource() { + emptySource().noOffset(); + } + + @Test + void dirIsFiltered() throws IOException { + emptySource() + .addFile("log1") + .latestOffsetShouldBe(0L) + .addDir("dir1") + .latestOffsetShouldBe(0L) + .batchFromStart("log1"); + } + + @Test + void sneakThrowException() throws IOException { + FileSystem fs = Mockito.mock(FileSystem.class); + doThrow(IOException.class).when(fs).listStatus(any(org.apache.hadoop.fs.Path.class)); + + streamSource = + new FileSystemStreamSource(fs, + new org.apache.hadoop.fs.Path(perTestTempDir.toUri())); + assertThrows(IOException.class, () -> streamSource.getLatestOffset()); + } + + StreamSource emptySource() { + return new StreamSource(); + } + + private class StreamSource { + + StreamSource addFile(String filename) throws IOException { + Path file = Files.createFile(perTestTempDir.resolve(filename)); + assertTrue(file.toFile().exists()); + + return this; + } + + StreamSource addDir(String dirname) throws IOException { + Path dir =
Files.createDirectory(perTestTempDir.resolve(dirname)); + assertTrue(dir.toFile().isDirectory()); + + return this; + } + + StreamSource noOffset() { + assertFalse(streamSource.getLatestOffset().isPresent()); + + return this; + } + + StreamSource latestOffsetShouldBe(Long offset) { + Optional<Offset> latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(offset), latestOffset.get()); + + return this; + } + + StreamSource batchFromStart(String... uris) { + assertTrue(streamSource.getLatestOffset().isPresent()); + internalBatchInBetween(Optional.empty(), streamSource.getLatestOffset().get(), uris); + + return this; + } + + StreamSource batchInBetween(Long start, Long end, String... uris) { + internalBatchInBetween(Optional.of(new Offset(start)), new Offset(end), uris); + + return this; + } + + private StreamSource internalBatchInBetween( + Optional<Offset> start, Offset end, String... uris) { + Split split = streamSource.getBatch(start, end).getSplit(); + assertThat( + ((FileSystemSplit) split).getPaths(), + containsInAnyOrder( + Arrays.stream(uris) + .map(name -> new org.apache.hadoop.fs.Path(perTestTempDir.resolve(name).toUri())) + .toArray())); + return this; + } + } +} diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 11ba5542fd..7a2e5cd406 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -68,6 +68,10 @@ configurations.all { resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.6.0" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.6.0" resolutionStrategy.force "com.squareup.okhttp3:okhttp:4.9.3" + resolutionStrategy.force "org.apache.commons:commons-math3:3.6.1" + resolutionStrategy.force "org.apache.commons:commons-lang3:3.12.0" + resolutionStrategy.force "joda-time:joda-time:2.10.12" + resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" } dependencies { diff --git a/plugin/build.gradle b/plugin/build.gradle index 6a0900c3cc..4754292216 100644 ---
a/plugin/build.gradle +++ b/plugin/build.gradle @@ -93,6 +93,10 @@ configurations.all { resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.6.0" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.6.0" resolutionStrategy.force "com.squareup.okhttp3:okhttp:4.9.3" + resolutionStrategy.force "org.apache.commons:commons-math3:3.6.1" + resolutionStrategy.force "org.apache.commons:commons-lang3:3.12.0" + resolutionStrategy.force "joda-time:joda-time:2.10.12" + resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" } compileJava { options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor']) @@ -108,11 +112,11 @@ dependencies { api "com.fasterxml.jackson.core:jackson-databind:${jackson_databind_version}" api "com.fasterxml.jackson.core:jackson-annotations:${jackson_version}" - api project(":ppl") api project(':legacy') api project(':opensearch') api project(':prometheus') + api project(':filesystem') } test { diff --git a/settings.gradle b/settings.gradle index 2f850f422b..7650959451 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,4 +18,5 @@ include 'doctest' include 'legacy' include 'sql' include 'prometheus' +include 'filesystem'