Add Streaming source impl #994

Merged
4 changes: 2 additions & 2 deletions .github/workflows/sql-test-and-build-workflow.yml
@@ -25,10 +25,10 @@ jobs:
matrix:
entry:
- { os: ubuntu-latest, java: 11 }
- { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc}
- { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows }
- { os: macos-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
- { os: ubuntu-latest, java: 17 }
- { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
- { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows }
- { os: macos-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
runs-on: ${{ matrix.entry.os }}

17 changes: 17 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import lombok.Data;
import org.opensearch.sql.storage.split.Split;

/**
* A batch of streaming execution.
*/
@Data
public class Batch {
private final Split split;
}
17 changes: 17 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import lombok.Data;

/**
* Offset.
*/
@Data
public class Offset {

private final Long offset;
}
29 changes: 29 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import java.util.Optional;

/**
* Streaming source.
*/
public interface StreamingSource {
/**
* Get current {@link Offset} of stream data.
*
* @return empty if the stream does not have new data.
*/
Optional<Offset> getLatestOffset();

/**
* Get a {@link Batch} from the source covering the range (start, end].
*
* @param start start offset (exclusive); empty means read from the beginning.
* @param end end offset (inclusive).
* @return {@link Batch} of data in (start, end].
*/
Batch getBatch(Optional<Offset> start, Offset end);
}
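For orientation, here is a minimal sketch of how a micro-batch driver might consume this interface. It is illustrative only and not part of this change; the process callback and the committed field are hypothetical:

// Hypothetical driver step: poll the source and, when a newer offset appears,
// fetch the batch covering the exclusive-start range (committed, latest].
private Optional<Offset> committed = Optional.empty();

void runOnce(StreamingSource source) {
  Optional<Offset> latest = source.getLatestOffset();
  boolean hasNewData =
      latest.isPresent()
          && (committed.isEmpty()
              || latest.get().getOffset() > committed.get().getOffset());
  if (hasNewData) {
    Batch batch = source.getBatch(committed, latest.get());
    process(batch.getSplit()); // hand the split to the execution engine (hypothetical).
    committed = latest;        // advance only after the batch is handled.
  }
}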
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/split/Split.java
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.storage.split;

import org.opensearch.sql.storage.StorageEngine;

/**
* Split is a section of a data set. Each {@link StorageEngine} should have a specific
* implementation of Split.
*/
public interface Split {

/**
* Get the split id.
* @return split id.
*/
String getSplitId();
}
2 changes: 1 addition & 1 deletion doctest/build.gradle
@@ -89,7 +89,7 @@ doctest.dependsOn startOpenSearch
startOpenSearch.dependsOn startPrometheus
doctest.finalizedBy stopOpenSearch
stopOpenSearch.finalizedBy stopPrometheus
build.dependsOn doctest
check.dependsOn doctest
clean.dependsOn(cleanBootstrap)

// 2.0.0-alpha1-SNAPSHOT -> 2.0.0.0-alpha1-SNAPSHOT
129 changes: 129 additions & 0 deletions filesystem/build.gradle
@@ -0,0 +1,129 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
}

ext {
hadoop = "3.3.4"
aws = "1.12.330"
}

configurations.all {
resolutionStrategy.force "commons-io:commons-io:2.8.0"
}

dependencies {
implementation project(':core')
// required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html.
implementation("org.apache.hadoop:hadoop-common:${hadoop}") {
exclude group: 'org.apache.zookeeper'
exclude group: 'org.eclipse.jetty'
exclude group: 'com.sun.jersey'
exclude group: 'javax.servlet.jsp'
exclude group: 'javax.servlet'
exclude group: 'org.apache.kerby'
exclude group: 'org.apache.curator'
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt'
// enforce version.
exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core'
exclude group: 'commons-io', module: 'commons-io'
exclude group: 'ch.qos.reload4j', module: 'reload4j'
exclude group: 'org.apache.httpcomponents', module: 'httpcore'
}
implementation('com.fasterxml.woodstox:woodstox-core')
constraints {
implementation('com.fasterxml.woodstox:woodstox-core:6.4.0') {
because 'https://www.mend.io/vulnerability-database/CVE-2022-40156'
}
}
implementation('commons-io:commons-io')
constraints {
implementation('commons-io:commons-io:2.8.0') {
because 'between versions 2.8.0 and 2.5'
}
}
implementation('ch.qos.reload4j:reload4j')
constraints {
implementation('ch.qos.reload4j:reload4j:1.2.22') {
because 'between versions 1.2.22 and 1.2.19'
}
}
implementation('org.apache.httpcomponents:httpcore')
constraints {
implementation('org.apache.httpcomponents:httpcore:4.4.15') {
because 'between versions 4.4.15 and 4.4.13'
}
}

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}

// hadoop-fs depends on a native library which is missing on Windows.
// https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library
if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) {
excludes = [
'**/FileSystemStreamSourceTest.class'
]
}
}

jacocoTestReport {
reports {
html.enabled true
xml.enabled true
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
violationRules {
rule {
// hadoop-fs depends on a native library which is missing on Windows.
// https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library
if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) {
excludes = [
'org.opensearch.sql.filesystem.streaming.FileSystemStreamSource'
]
}
element = 'CLASS'
limit {
counter = 'LINE'
minimum = 1.0
}
limit {
counter = 'BRANCH'
minimum = 1.0
}
}
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
24 changes: 24 additions & 0 deletions filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.storage.split;

import java.util.Set;
import java.util.UUID;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.hadoop.fs.Path;
import org.opensearch.sql.storage.split.Split;

@Data
public class FileSystemSplit implements Split {

@Getter
@EqualsAndHashCode.Exclude
private final String splitId = UUID.randomUUID().toString();

private final Set<Path> paths;
}
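One consequence of the annotations above, shown as a sketch (not part of the PR): splitId is excluded from equals/hashCode, so two splits over the same paths compare equal even though each instance receives a fresh random id.

Set<Path> paths = Set.of(new Path("file:///data/a.csv")); // placeholder path.
FileSystemSplit s1 = new FileSystemSplit(paths);
FileSystemSplit s2 = new FileSystemSplit(paths);
assert s1.equals(s2);                            // equality is driven by paths only.
assert !s1.getSplitId().equals(s2.getSplitId()); // ids are random per instance.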
21 changes: 21 additions & 0 deletions filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.streaming;

import java.util.Set;
import lombok.Data;
import org.apache.hadoop.fs.Path;

/**
* File metadata. A batch id associated with the set of {@link Path}.
*/
@Data
public class FileMetaData {

private final Long batchId;

private final Set<Path> paths;
}
105 changes: 105 additions & 0 deletions filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.streaming;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
import org.opensearch.sql.executor.streaming.MetadataLog;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.filesystem.storage.split.FileSystemSplit;

/**
* A streaming source backed by the Hadoop FileSystem.
*/
public class FileSystemStreamSource implements StreamingSource {

private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class);

private final MetadataLog<FileMetaData> fileMetaDataLog;

private Set<Path> seenFiles;

private final FileSystem fs;

private final Path basePath;

/**
* Constructor of FileSystemStreamSource.
*/
public FileSystemStreamSource(FileSystem fs, Path basePath) {
this.fs = fs;
this.basePath = basePath;
// todo, need to add state recovery
this.fileMetaDataLog = new DefaultMetadataLog<>();
// todo, need to add state recovery
this.seenFiles = new HashSet<>();
}

@SneakyThrows(value = IOException.class)
@Override
public Optional<Offset> getLatestOffset() {
// list all files. todo: improve listing performance.
Set<Path> allFiles =
Arrays.stream(fs.listStatus(basePath))
.filter(status -> !status.isDirectory())
.map(FileStatus::getPath)
.collect(Collectors.toSet());

// find unread files.
log.debug("all files {}", allFiles);
Set<Path> unread = Sets.difference(allFiles, seenFiles);

// update seenFiles.
seenFiles = allFiles;
log.debug("seen files {}", seenFiles);

Optional<Long> latestBatchIdOptional = fileMetaDataLog.getLatest().map(Pair::getKey);
if (!unread.isEmpty()) {
long latestBatchId = latestBatchIdOptional.map(id -> id + 1).orElse(0L);
fileMetaDataLog.add(latestBatchId, new FileMetaData(latestBatchId, unread));
log.debug("latestBatchId {}", latestBatchId);
return Optional.of(new Offset(latestBatchId));
} else {
log.debug("no unread data");
Optional<Offset> offset =
latestBatchIdOptional.isEmpty()
? Optional.empty()
: Optional.of(new Offset(latestBatchIdOptional.get()));
log.debug("return empty offset {}", offset);
return offset;
}
}

@Override
public Batch getBatch(Optional<Offset> start, Offset end) {
Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1).orElse(0L);
Long endBatchId = end.getOffset();

Set<Path> paths =
fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream()
.map(FileMetaData::getPaths)
.flatMap(Set::stream)
.collect(Collectors.toSet());

log.debug("fetch files {} with id from: {} to: {}.", paths, start, end);
return new Batch(new FileSystemSplit(paths));
}
}
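A minimal usage sketch of the source above, assuming a local Hadoop FileSystem and a placeholder directory /tmp/stream-input (illustrative, not part of the PR; IOException handling omitted):

// Poll a local directory once and drain everything up to the latest offset.
FileSystem fs = FileSystem.getLocal(new Configuration()); // org.apache.hadoop.conf.Configuration
FileSystemStreamSource source =
    new FileSystemStreamSource(fs, new Path("/tmp/stream-input")); // placeholder directory.

Optional<Offset> latest = source.getLatestOffset(); // empty until files appear.
if (latest.isPresent()) {
  Batch batch = source.getBatch(Optional.empty(), latest.get());
  FileSystemSplit split = (FileSystemSplit) batch.getSplit();
  split.getPaths().forEach(path -> System.out.println("new file: " + path));
}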