From 6a90fe63699723c406cbca9e3d34eddcf92bcc67 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 18 Jan 2023 08:56:42 -0800 Subject: [PATCH] Remove in development filesystem support (#1284) Signed-off-by: Peng Huo --- DEVELOPER_GUIDE.rst | 2 - filesystem/build.gradle | 148 ------------- .../storage/split/FileSystemSplit.java | 24 --- .../filesystem/streaming/FileMetaData.java | 21 -- .../streaming/FileSystemStreamSource.java | 105 ---------- .../streaming/FileSystemStreamSourceTest.java | 166 --------------- .../storage/FSDataSourceFactory.java | 33 --- .../filesystem/storage/FSStorageEngine.java | 48 ----- .../sql/filesystem/storage/FSTable.java | 127 ------------ integ-test/build.gradle | 4 - .../opensearch/sql/ppl/StreamingQueryIT.java | 194 ------------------ plugin/build.gradle | 3 - settings.gradle | 2 - 13 files changed, 877 deletions(-) delete mode 100644 filesystem/build.gradle delete mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java delete mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java delete mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java delete mode 100644 filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java delete mode 100644 filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSDataSourceFactory.java delete mode 100644 filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java delete mode 100644 filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSTable.java delete mode 100644 integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java diff --git a/DEVELOPER_GUIDE.rst b/DEVELOPER_GUIDE.rst index 83e2c398c5..516cf23556 100644 --- a/DEVELOPER_GUIDE.rst +++ b/DEVELOPER_GUIDE.rst @@ -141,7 +141,6 @@ The plugin codebase is in standard layout of Gradle project:: ├── core ├── doctest ├── opensearch - ├── filesystem ├── prometheus ├── integ-test ├── legacy @@ -162,7 +161,6 @@ Here are sub-folders (Gradle modules) for plugin source code: - ``core``: core query engine. - ``opensearch``: OpenSearch storage engine. - ``prometheus``: Prometheus storage engine. -- ``filesystem``: File System storage engine (in development). - ``protocol``: request/response protocol formatter. - ``common``: common util code. - ``integ-test``: integration and comparison test. diff --git a/filesystem/build.gradle b/filesystem/build.gradle deleted file mode 100644 index b76c0d517c..0000000000 --- a/filesystem/build.gradle +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -plugins { - id 'java-library' - id "io.freefair.lombok" - id 'jacoco' - id 'java-test-fixtures' -} - -ext { - hadoop = "3.3.4" - aws = "1.12.330" -} - -configurations.all { - resolutionStrategy.force "commons-io:commons-io:2.8.0" -} - -dependencies { - implementation project(':core') - // required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html. - implementation("org.apache.hadoop:hadoop-common:${hadoop}") { - exclude group: 'org.apache.zookeeper' - exclude group: 'org.eclipse.jetty' - exclude group: 'com.sun.jersey' - exclude group: 'javax.servlet.jsp' - exclude group: 'javax.servlet' - exclude group: 'org.apache.kerby' - exclude group: 'org.apache.curator' - exclude group: 'com.google.protobuf', module: 'protobuf-java' - exclude group: 'org.apache.avro', module: 'avro' - exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt' - // enforce version. - exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core' - exclude group: 'commons-io', module: 'commons-io' - exclude group: 'ch.qos.reload4j', module: 'reload4j' - exclude group: 'org.apache.httpcomponents', module: 'httpcore' - } - implementation('com.fasterxml.woodstox:woodstox-core') - constraints { - implementation('com.fasterxml.woodstox:woodstox-core:6.4.0') { - because 'https://www.mend.io/vulnerability-database/CVE-2022-40156' - } - } - implementation('commons-io:commons-io') - constraints { - implementation('commons-io:commons-io:2.8.0') { - because 'between versions 2.8.0 and 2.5' - } - } - implementation('ch.qos.reload4j:reload4j') - constraints { - implementation('ch.qos.reload4j:reload4j:1.2.22') { - because 'between versions 1.2.22 and 1.2.19' - } - } - implementation('org.apache.httpcomponents:httpcore') - constraints { - implementation('org.apache.httpcomponents:httpcore:4.4.15') { - because 'between versions 4.4.15 and 4.4.13' - } - } - - testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') - testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' - testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' - testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' - testFixturesImplementation(project(":core")) - testFixturesImplementation("org.apache.hadoop:hadoop-common:${hadoop}") { - exclude group: 'org.apache.zookeeper' - exclude group: 'org.eclipse.jetty' - exclude group: 'com.sun.jersey' - exclude group: 'javax.servlet.jsp' - exclude group: 'javax.servlet' - exclude group: 'org.apache.kerby' - exclude group: 'org.apache.curator' - exclude group: 'com.google.protobuf', module: 'protobuf-java' - exclude group: 'org.apache.avro', module: 'avro' - exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt' - // enforce version. - exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core' - exclude group: 'commons-io', module: 'commons-io' - exclude group: 'ch.qos.reload4j', module: 'reload4j' - exclude group: 'org.apache.httpcomponents', module: 'httpcore' - } -} - -test { - useJUnitPlatform() - testLogging { - events "passed", "skipped", "failed" - exceptionFormat "full" - } - - // hadoop-fs depend on native library which is missing on windows. - // https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library - if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) { - excludes = [ - '**/FileSystemStreamSourceTest.class' - ] - } -} - -jacocoTestReport { - reports { - html.enabled true - xml.enabled true - } - afterEvaluate { - classDirectories.setFrom(files(classDirectories.files.collect { - fileTree(dir: it) - })) - } -} -test.finalizedBy(project.tasks.jacocoTestReport) - -jacocoTestCoverageVerification { - violationRules { - rule { - // hadoop-fs depend on native library which is missing on windows. - // https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library - if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) { - excludes = [ - 'org.opensearch.sql.filesystem.streaming.FileSystemStreamSource' - ] - } - element = 'CLASS' - limit { - counter = 'LINE' - minimum = 1.0 - } - limit { - counter = 'BRANCH' - minimum = 1.0 - } - } - } - afterEvaluate { - classDirectories.setFrom(files(classDirectories.files.collect { - fileTree(dir: it) - })) - } -} -check.dependsOn jacocoTestCoverageVerification -jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java deleted file mode 100644 index 7fefb11a85..0000000000 --- a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.filesystem.storage.split; - -import java.util.Set; -import java.util.UUID; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import org.apache.hadoop.fs.Path; -import org.opensearch.sql.storage.split.Split; - -@Data -public class FileSystemSplit implements Split { - - @Getter - @EqualsAndHashCode.Exclude - private final String splitId = UUID.randomUUID().toString(); - - private final Set paths; -} diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java deleted file mode 100644 index 6a8c90ee80..0000000000 --- a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.filesystem.streaming; - -import java.util.Set; -import lombok.Data; -import org.apache.hadoop.fs.Path; - -/** - * File metadata. Batch id associate with the set of {@link Path}. - */ -@Data -public class FileMetaData { - - private final Long batchId; - - private final Set paths; -} diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java deleted file mode 100644 index 6a9639bdcb..0000000000 --- a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.filesystem.streaming; - -import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import lombok.SneakyThrows; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.sql.executor.streaming.Batch; -import org.opensearch.sql.executor.streaming.DefaultMetadataLog; -import org.opensearch.sql.executor.streaming.MetadataLog; -import org.opensearch.sql.executor.streaming.Offset; -import org.opensearch.sql.executor.streaming.StreamingSource; -import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; - -/** - * FileSystem Streaming Source use Hadoop FileSystem. - */ -public class FileSystemStreamSource implements StreamingSource { - - private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class); - - private final MetadataLog fileMetaDataLog; - - private Set seenFiles; - - private final FileSystem fs; - - private final Path basePath; - - /** - * Constructor of FileSystemStreamSource. - */ - public FileSystemStreamSource(FileSystem fs, Path basePath) { - this.fs = fs; - this.basePath = basePath; - // todo, need to add state recovery - this.fileMetaDataLog = new DefaultMetadataLog<>(); - // todo, need to add state recovery - this.seenFiles = new HashSet<>(); - } - - @SneakyThrows(value = IOException.class) - @Override - public Optional getLatestOffset() { - // list all files. todo. improvement list performance. - Set allFiles = - Arrays.stream(fs.listStatus(basePath)) - .filter(status -> !status.isDirectory()) - .map(FileStatus::getPath) - .collect(Collectors.toSet()); - - // find unread files. - log.debug("all files {}", allFiles); - Set unread = Sets.difference(allFiles, seenFiles); - - // update seenFiles. - seenFiles = allFiles; - log.debug("seen files {}", seenFiles); - - Optional latestBatchIdOptional = fileMetaDataLog.getLatest().map(Pair::getKey); - if (!unread.isEmpty()) { - long latestBatchId = latestBatchIdOptional.map(id -> id + 1).orElse(0L); - fileMetaDataLog.add(latestBatchId, new FileMetaData(latestBatchId, unread)); - log.debug("latestBatchId {}", latestBatchId); - return Optional.of(new Offset(latestBatchId)); - } else { - log.debug("no unread data"); - Optional offset = - latestBatchIdOptional.isEmpty() - ? Optional.empty() - : Optional.of(new Offset(latestBatchIdOptional.get())); - log.debug("return empty offset {}", offset); - return offset; - } - } - - @Override - public Batch getBatch(Optional start, Offset end) { - Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1).orElse(0L); - Long endBatchId = end.getOffset(); - - Set paths = - fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream() - .map(FileMetaData::getPaths) - .flatMap(Set::stream) - .collect(Collectors.toSet()); - - log.debug("fetch files {} with id from: {} to: {}.", paths, start, end); - return new Batch(new FileSystemSplit(paths)); - } -} diff --git a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java deleted file mode 100644 index fba038f6a3..0000000000 --- a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.filesystem.streaming; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doThrow; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.executor.streaming.Offset; -import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; -import org.opensearch.sql.storage.split.Split; - -@ExtendWith(MockitoExtension.class) -class FileSystemStreamSourceTest { - - @TempDir Path perTestTempDir; - - FileSystemStreamSource streamSource; - - /** - * use hadoop default filesystem. it only works on unix-like system. for running on windows, it - * require native library. Reference. - * https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html - */ - @BeforeEach - void setup() throws IOException { - streamSource = - new FileSystemStreamSource( - FileSystem.get(new Configuration()), - new org.apache.hadoop.fs.Path(perTestTempDir.toUri())); - } - - @Test - void addOneFileToSource() throws IOException { - emptySource().addFile("log1").latestOffsetShouldBe(0L).batchFromStart("log1"); - } - - @Test - void addMultipleFileInSequence() throws IOException { - emptySource() - .addFile("log1") - .latestOffsetShouldBe(0L) - .batchFromStart("log1") - .addFile("log2") - .latestOffsetShouldBe(1L) - .batchFromStart("log1", "log2") - .batchInBetween(0L, 1L, "log2"); - } - - @Test - void latestOffsetShouldSameIfNoNewFileAdded() throws IOException { - emptySource() - .addFile("log1") - .latestOffsetShouldBe(0L) - .batchFromStart("log1") - .latestOffsetShouldBe(0L) - .batchFromStart("log1"); - } - - @Test - void latestOffsetIsEmptyIfNoFilesInSource() { - emptySource().noOffset(); - } - - @Test - void dirIsFiltered() throws IOException { - emptySource() - .addFile("log1") - .latestOffsetShouldBe(0L) - .addDir("dir1") - .latestOffsetShouldBe(0L) - .batchFromStart("log1"); - } - - @Test - void sneakThrowException() throws IOException { - FileSystem fs = Mockito.mock(FileSystem.class); - doThrow(IOException.class).when(fs).listStatus(any(org.apache.hadoop.fs.Path.class)); - - streamSource = - new FileSystemStreamSource(fs, - new org.apache.hadoop.fs.Path(perTestTempDir.toUri())); - assertThrows(IOException.class, () -> streamSource.getLatestOffset()); - } - - StreamSource emptySource() { - return new StreamSource(); - } - - private class StreamSource { - - StreamSource addFile(String filename) throws IOException { - Path file = Files.createFile(perTestTempDir.resolve(filename)); - assertTrue(file.toFile().exists()); - - return this; - } - - StreamSource addDir(String dirname) throws IOException { - Path dir = Files.createDirectory(perTestTempDir.resolve(dirname)); - assertTrue(dir.toFile().isDirectory()); - - return this; - } - - StreamSource noOffset() { - assertFalse(streamSource.getLatestOffset().isPresent()); - - return this; - } - - StreamSource latestOffsetShouldBe(Long offset) { - Optional latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isPresent()); - assertEquals(new Offset(offset), latestOffset.get()); - - return this; - } - - StreamSource batchFromStart(String... uris) { - assertTrue(streamSource.getLatestOffset().isPresent()); - internalBatchInBetween(Optional.empty(), streamSource.getLatestOffset().get(), uris); - - return this; - } - - StreamSource batchInBetween(Long start, Long end, String... uris) { - internalBatchInBetween(Optional.of(new Offset(start)), new Offset(end), uris); - - return this; - } - - private StreamSource internalBatchInBetween( - Optional start, Offset end, String... uris) { - Split split = streamSource.getBatch(start, end).getSplit(); - assertThat( - ((FileSystemSplit) split).getPaths(), - containsInAnyOrder( - Arrays.stream(uris) - .map(name -> new org.apache.hadoop.fs.Path(perTestTempDir.resolve(name).toUri())) - .toArray())); - return this; - } - } -} diff --git a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSDataSourceFactory.java b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSDataSourceFactory.java deleted file mode 100644 index 6b700aa6d4..0000000000 --- a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSDataSourceFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.filesystem.storage; - -import java.net.URI; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.RequiredArgsConstructor; -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.storage.DataSourceFactory; - -@RequiredArgsConstructor -public class FSDataSourceFactory implements DataSourceFactory { - - private final URI basePath; - - private final AtomicInteger result; - - @Override - public DataSourceType getDataSourceType() { - return DataSourceType.FILESYSTEM; - } - - @Override - public DataSource createDataSource(DataSourceMetadata metadata) { - return new DataSource( - metadata.getName(), DataSourceType.FILESYSTEM, new FSStorageEngine(basePath, result)); - } -} diff --git a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java deleted file mode 100644 index 0826b4c12f..0000000000 --- a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.filesystem.storage; - -import java.net.URI; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.opensearch.sql.DataSourceSchemaName; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.Table; - -/** FileSystem StorageEngine. Used for testing purpose. */ -@RequiredArgsConstructor -public class FSStorageEngine implements StorageEngine { - - private final FileSystem fs; - - private final Path basePath; - - private final AtomicInteger result; - - /** - * constructor. - */ - @SneakyThrows - public FSStorageEngine(URI basePath, AtomicInteger result) { - this.fs = FileSystem.get(new Configuration()); - this.basePath = new Path(basePath); - this.result = result; - } - - @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { - return new FSTable(fs, basePath, result); - } - - @SneakyThrows - public void close() { - fs.close(); - } -} diff --git a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSTable.java b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSTable.java deleted file mode 100644 index 45f8fc1104..0000000000 --- a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSTable.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.filesystem.storage; - -import com.google.common.collect.ImmutableMap; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.data.model.ExprValueUtils; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.data.type.ExprType; -import org.opensearch.sql.executor.streaming.StreamingSource; -import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; -import org.opensearch.sql.filesystem.streaming.FileSystemStreamSource; -import org.opensearch.sql.planner.logical.LogicalPlan; -import org.opensearch.sql.planner.physical.PhysicalPlan; -import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; -import org.opensearch.sql.storage.Table; -import org.opensearch.sql.storage.TableScanOperator; -import org.opensearch.sql.storage.split.Split; - -/** - * FileSystem Table. Used for testing purpose. - */ -@RequiredArgsConstructor -public class FSTable implements Table { - private final FileSystem fs; - - private final Path basePath; - - private final AtomicInteger result; - - @Override - public Map getFieldTypes() { - return ImmutableMap.of("value", ExprCoreType.INTEGER); - } - - @Override - public PhysicalPlan implement(LogicalPlan plan) { - return new Output(new FSScan(fs)); - } - - @Override - public StreamingSource asStreamingSource() { - return new FileSystemStreamSource(fs, basePath); - } - - @RequiredArgsConstructor - class Output extends PhysicalPlan { - private final PhysicalPlan input; - - @Override - public void open() { - while (input.hasNext()) { - result.addAndGet(input.next().integerValue()); - } - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public ExprValue next() { - throw new UnsupportedOperationException(); - } - - // todo, need to refactor physical plan interface. - @Override - public R accept(PhysicalPlanNodeVisitor visitor, C context) { - throw new UnsupportedOperationException(); - } - - @Override - public List getChild() { - return Collections.singletonList(input); - } - } - - @RequiredArgsConstructor - class FSScan extends TableScanOperator { - - private final FileSystem fs; - - private Iterator paths; - - @Override - public String explain() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean hasNext() { - return paths.hasNext(); - } - - @SneakyThrows(IOException.class) - @Override - public ExprValue next() { - BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(paths.next()))); - // every file only contain one line. - ExprValue result = ExprValueUtils.integerValue(Integer.valueOf(reader.readLine())); - - reader.close(); - return result; - } - - @Override - public void add(Split split) { - paths = ((FileSystemSplit) split).getPaths().iterator(); - } - } -} diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 9f177efde5..c948dc77cf 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -72,8 +72,6 @@ configurations.all { resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.6.0" resolutionStrategy.force "com.squareup.okhttp3:okhttp:4.9.3" resolutionStrategy.force "org.apache.httpcomponents:httpcore:4.4.13" - resolutionStrategy.force "org.apache.commons:commons-math3:3.6.1" - resolutionStrategy.force "org.apache.commons:commons-lang3:3.12.0" resolutionStrategy.force "joda-time:joda-time:2.10.12" resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" } @@ -93,8 +91,6 @@ dependencies { testImplementation group: 'com.h2database', name: 'h2', version: '2.1.214' testImplementation group: 'org.xerial', name: 'sqlite-jdbc', version: '3.32.3.3' testImplementation group: 'com.google.code.gson', name: 'gson', version: '2.8.9' - testImplementation(testFixtures(project(":core"))) - testImplementation(testFixtures(project(":filesystem"))) } dependencyLicenses.enabled = false diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java deleted file mode 100644 index 50929782c8..0000000000 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.ppl; - -import com.carrotsearch.randomizedtesting.ThreadFilter; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.Comparator; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.Getter; -import org.junit.After; -import org.junit.Test; -import org.opensearch.sql.analysis.Analyzer; -import org.opensearch.sql.analysis.ExpressionAnalyzer; -import org.opensearch.sql.ast.dsl.AstDSL; -import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.DataSourceServiceImpl; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.executor.DefaultExecutionEngine; -import org.opensearch.sql.executor.DefaultQueryManager; -import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.executor.QueryId; -import org.opensearch.sql.executor.QueryService; -import org.opensearch.sql.executor.execution.StreamingQueryPlan; -import org.opensearch.sql.expression.function.BuiltinFunctionRepository; -import org.opensearch.sql.filesystem.storage.FSDataSourceFactory; -import org.opensearch.sql.filesystem.storage.FSStorageEngine; -import org.opensearch.sql.planner.Planner; -import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; -import org.opensearch.sql.storage.DataSourceFactory; - -@ThreadLeakFilters(filters = {StreamingQueryIT.HadoopFSThreadsFilter.class}) -public class StreamingQueryIT extends PPLIntegTestCase { - - private static final int INTERVAL_IN_SECONDS = 1; - - private static final String DATASOURCE_NAME = "fs"; - - private static final String TABLE_NAME = "mock"; - - private final AtomicInteger result = new AtomicInteger(0); - - private Source source; - - private StreamingQuery query; - - @Test - public void testStreamingQuery() throws IOException, InterruptedException { - source = new Source(); - query = fromFile(source.tempDir); - query.run(); - - source.add(0); - source.add(1); - query.sumShouldBe(1); - - // no source update. - query.sumShouldBe(1); - - source.add(1); - query.sumShouldBe(2); - } - - @After - public void clean() throws InterruptedException, IOException { - query.close(); - source.close(); - } - - StreamingQuery fromFile(java.nio.file.Path path) { - return new StreamingQuery(path); - } - - class Source { - @Getter private final java.nio.file.Path tempDir; - - public Source() throws IOException { - tempDir = Files.createTempDirectory("tempDir"); - } - - Source add(int v) throws IOException { - java.nio.file.Path path = Files.createFile(tempDir.resolve(UUID.randomUUID().toString())); - String buf = String.valueOf(v); - FileOutputStream outputStream = new FileOutputStream(path.toFile()); - outputStream.write(buf.getBytes(StandardCharsets.UTF_8)); - outputStream.close(); - return this; - } - - void close() throws IOException { - Files.walk(tempDir) - .sorted(Comparator.reverseOrder()) - .map(java.nio.file.Path::toFile) - .forEach(File::delete); - assertFalse("temp dir still exist", Files.exists(tempDir)); - } - } - - class StreamingQuery { - final DefaultQueryManager queryManager; - - final QueryService queryService; - - final QueryId queryId = QueryId.queryId(); - - final FSStorageEngine storageEngine; - - public StreamingQuery(java.nio.file.Path tempDir) { - result.set(0); - - DataSourceService dataSourceService = - new DataSourceServiceImpl( - new ImmutableSet.Builder() - .add(new FSDataSourceFactory(tempDir.toUri(), result)) - .build()); - dataSourceService.addDataSource(fsDataSourceMetadata()); - storageEngine = - (FSStorageEngine) dataSourceService.getDataSource(DATASOURCE_NAME).getStorageEngine(); - final BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance(); - Analyzer analyzer = - new Analyzer( - new ExpressionAnalyzer(functionRepository), dataSourceService, functionRepository); - Planner planner = new Planner(LogicalPlanOptimizer.create()); - - queryManager = DefaultQueryManager.defaultQueryManager(); - queryService = new QueryService(analyzer, new DefaultExecutionEngine(), planner); - } - - public StreamingQuery run() { - queryManager.submit( - new StreamingQueryPlan( - queryId, - AstDSL.relation(AstDSL.qualifiedName(DATASOURCE_NAME, "default", TABLE_NAME)), - queryService, - new ResponseListener<>() { - @Override - public void onResponse(ExecutionEngine.QueryResponse response) { - fail(); - } - - @Override - public void onFailure(Exception e) { - fail(); - } - }, - new StreamingQueryPlan.IntervalTriggerExecution(1))); - return this; - } - - void sumShouldBe(int expected) throws InterruptedException { - TimeUnit.SECONDS.sleep(INTERVAL_IN_SECONDS); - assertEquals(expected, result.get()); - } - - void close() throws InterruptedException, IOException { - assertTrue(queryManager.cancel(queryId)); - - storageEngine.close(); - queryManager.awaitTermination(5, TimeUnit.SECONDS); - } - } - - /** - * org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner could not close. - * https://www.mail-archive.com/common-issues@hadoop.apache.org/msg232722.html - */ - public static class HadoopFSThreadsFilter implements ThreadFilter { - @Override - public boolean reject(Thread t) { - return t.getName().contains("StatisticsDataReferenceCleaner"); - } - } - - private DataSourceMetadata fsDataSourceMetadata() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName(DATASOURCE_NAME); - dataSourceMetadata.setConnector(DataSourceType.FILESYSTEM); - dataSourceMetadata.setProperties(ImmutableMap.of()); - return dataSourceMetadata; - } -} diff --git a/plugin/build.gradle b/plugin/build.gradle index 04f57b0ca6..7af2c932d8 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -96,8 +96,6 @@ configurations.all { resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.6.0" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.6.0" resolutionStrategy.force "com.squareup.okhttp3:okhttp:4.9.3" - resolutionStrategy.force "org.apache.commons:commons-math3:3.6.1" - resolutionStrategy.force "org.apache.commons:commons-lang3:3.12.0" resolutionStrategy.force "joda-time:joda-time:2.10.12" resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" } @@ -118,7 +116,6 @@ dependencies { api project(':legacy') api project(':opensearch') api project(':prometheus') - api project(':filesystem') } test { diff --git a/settings.gradle b/settings.gradle index 7650959451..89d7380801 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,5 +18,3 @@ include 'doctest' include 'legacy' include 'sql' include 'prometheus' -include 'filesystem' -