diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java new file mode 100644 index 0000000000..7c27ab4622 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import java.util.List; +import lombok.Data; +import org.opensearch.sql.storage.split.Split; + +/** + * A batch of streaming execution. + */ +@Data +public class Batch { + private final List splits; +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java new file mode 100644 index 0000000000..00f040e437 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import lombok.Data; + +/** + * Offset. + */ +@Data +public class Offset { + + private final Long offset; +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java b/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java new file mode 100644 index 0000000000..ebd3fa714b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import java.util.Optional; + +/** + * Streaming source. + */ +public interface StreamingSource { + /** + * Get current {@link Offset} of stream data. + * + * @return empty if the stream does not has new data. + */ + Optional getLatestOffset(); + + /** + * Get a {@link Batch} from source between (start, end]. + * + * @param start start offset. + * @param end end offset. + * @return @link Batch}. + */ + Batch getBatch(Optional start, Offset end); +} diff --git a/core/src/main/java/org/opensearch/sql/storage/split/Split.java b/core/src/main/java/org/opensearch/sql/storage/split/Split.java new file mode 100644 index 0000000000..e9e0c6fcc1 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/split/Split.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.storage.split; + +import org.opensearch.sql.storage.StorageEngine; + +/** + * Split is a sections of a data set. Each {@link StorageEngine} should have specific + * implementation of Split. + */ +public interface Split { + + /** + * Get the split id. + * @return split id. + */ + String getSplitId(); +} diff --git a/filesystem/build.gradle b/filesystem/build.gradle new file mode 100644 index 0000000000..64659d85d3 --- /dev/null +++ b/filesystem/build.gradle @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java-library' + id "io.freefair.lombok" + id 'jacoco' +} + +ext { + hadoop = "3.3.4" + aws = "1.12.330" +} + + +dependencies { + implementation project(':core') + + testImplementation "org.junit.jupiter:junit-jupiter:${junit_jupiter}" + testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' +} + +test { + useJUnitPlatform() + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + } +} + +jacocoTestReport { + reports { + html.enabled true + xml.enabled true + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +test.finalizedBy(project.tasks.jacocoTestReport) + +jacocoTestCoverageVerification { + violationRules { + rule { + element = 'CLASS' + limit { + counter = 'LINE' + minimum = 1.0 + } + limit { + counter = 'BRANCH' + minimum = 1.0 + } + } + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +check.dependsOn jacocoTestCoverageVerification +jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java new file mode 100644 index 0000000000..695af94fe4 --- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.storage.split; + +import java.nio.file.Path; +import java.util.Set; +import java.util.UUID; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.opensearch.sql.storage.split.Split; + +@Data +public class FileSystemSplit implements Split { + + @Getter + @EqualsAndHashCode.Exclude + private final String splitId = UUID.randomUUID().toString(); + + private final Set paths; +} diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java new file mode 100644 index 0000000000..24d2a822cd --- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import java.nio.file.Path; +import java.util.Set; +import lombok.Data; + +/** + * File metadata. Batch id associate with the set of {@link Path}. + */ +@Data +public class FileMetaData { + + private final Long batchId; + + private final Set paths; +} diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java new file mode 100644 index 0000000000..9207583c5b --- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import com.google.common.collect.Sets; +import java.io.File; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.executor.streaming.Batch; +import org.opensearch.sql.executor.streaming.DefaultMetadataLog; +import org.opensearch.sql.executor.streaming.MetadataLog; +import org.opensearch.sql.executor.streaming.Offset; +import org.opensearch.sql.executor.streaming.StreamingSource; +import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; + +/** + * FileSystem Streaming Source use Hadoop FileSystem. + */ +public class FileSystemStreamSource implements StreamingSource { + + private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class); + + private final MetadataLog fileMetaDataLog; + + private Set seenFiles; + + private final FileSystem fs; + + private final String basePath; + + /** + * Constructor of FileSystemStreamSource. + */ + public FileSystemStreamSource(FileSystem fs, String basePath) { + this.fs = fs; + this.basePath = basePath; + // todo, need to add state recovery + this.fileMetaDataLog = new DefaultMetadataLog<>(); + // todo, need to add state recovery + this.seenFiles = new HashSet<>(); + } + + @Override + public Optional getLatestOffset() { + // list all files. todo. improvement list performance. + Set allFiles = + Arrays.stream(fs.getPath(basePath).toFile().listFiles()) + .filter(file -> !file.isDirectory()) + .map(File::toPath) + .collect(Collectors.toSet()); + + // find unread files. + log.debug("all files {}", allFiles); + Set unread = Sets.difference(allFiles, seenFiles); + + // update seenFiles. + seenFiles = allFiles; + log.debug("seen files {}", seenFiles); + + Optional latestBatchIdOptional = fileMetaDataLog.getLatest().map(Pair::getKey); + if (!unread.isEmpty()) { + long latestBatchId = latestBatchIdOptional.map(id -> id + 1).orElse(0L); + fileMetaDataLog.add(latestBatchId, new FileMetaData(latestBatchId, unread)); + log.debug("latestBatchId {}", latestBatchId); + return Optional.of(new Offset(latestBatchId)); + } else { + log.debug("no unread data"); + Optional offset = + latestBatchIdOptional.isEmpty() + ? Optional.empty() + : Optional.of(new Offset(latestBatchIdOptional.get())); + log.debug("return empty offset {}", offset); + return offset; + } + } + + @Override + public Batch getBatch(Optional start, Offset end) { + Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1).orElse(0L); + Long endBatchId = end.getOffset(); + + Set paths = + fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream() + .map(FileMetaData::getPaths) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + + log.debug("fetch files {} with id from: {} to: {}.", paths, start, end); + return new Batch(Collections.singletonList(new FileSystemSplit(paths))); + } +} diff --git a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java new file mode 100644 index 0000000000..537fd10c9f --- /dev/null +++ b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java @@ -0,0 +1,155 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.executor.streaming.Batch; +import org.opensearch.sql.executor.streaming.Offset; +import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; + +@ExtendWith(MockitoExtension.class) +class FileSystemStreamSourceTest { + + @TempDir + Path perTestTempDir; + + FileSystemStreamSource streamSource; + + @BeforeEach + void setup() { + streamSource = + new FileSystemStreamSource( + FileSystems.getDefault(), + perTestTempDir.toString()); + } + + @Test + void getBatchFromFolder() throws IOException { + Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file.toFile().exists()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + // fetch batch (empty, latestOffset] + assertEquals( + Collections.singletonList( + new FileSystemSplit(ImmutableSet.of(file))), + streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + } + + @Test + void latestOffsetShouldIncreaseIfNoNewFileAdded() throws IOException { + Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file1.toFile().exists()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + Path file2 = Files.createFile(perTestTempDir.resolve("log.2022.01.02")); + assertTrue(file2.toFile().exists()); + + latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(1L), latestOffset.get()); + + // fetch batch (empty, 1L] + assertBatchEquals( + ImmutableList.of(file1, file2), + streamSource.getBatch(Optional.empty(), latestOffset.get())); + + // fetch batch (empty, 0L] + assertBatchEquals( + ImmutableList.of(file1), streamSource.getBatch(Optional.empty(), new Offset(0L))); + + // fetch batch (0L, 1L] + assertBatchEquals( + ImmutableList.of(file2), + streamSource.getBatch(Optional.of(new Offset(0L)), new Offset(1L))); + } + + @Test + void latestOffsetShouldSameIfNoNewFileAdded() throws IOException { + Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file1.toFile().exists()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + // no new files. + latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + } + + @Test + void latestOffsetIsEmptyIfNoFilesInSource() { + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isEmpty()); + } + + @Test + void getBatchOutOfRange() throws IOException { + Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file.toFile().exists()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + assertEquals( + Collections.singletonList( + new FileSystemSplit(ImmutableSet.of(file))), + streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + } + + @Test + void dirIsFiltered() throws IOException { + Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file.toFile().exists()); + + Path dir = Files.createDirectory(perTestTempDir.resolve("logDir")); + assertTrue(dir.toFile().isDirectory()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + // fetch batch (empty, latestOffset] + assertEquals( + Collections.singletonList( + new FileSystemSplit(ImmutableSet.of(file))), + streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + } + + void assertBatchEquals(List expectedFiles, Batch batch) { + assertEquals(1, batch.getSplits().size()); + assertThat( + ((FileSystemSplit) batch.getSplits().get(0)).getPaths(), + containsInAnyOrder(expectedFiles.toArray())); + } +} diff --git a/settings.gradle b/settings.gradle index 2f850f422b..7650959451 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,4 +18,5 @@ include 'doctest' include 'legacy' include 'sql' include 'prometheus' +include 'filesystem'