Add Streaming Source Impl
Signed-off-by: Peng Huo <[email protected]>
penghuo committed Nov 1, 2022
1 parent 30fcd79 commit 7b6a1e2
Showing 10 changed files with 458 additions and 0 deletions.
18 changes: 18 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import java.util.List;
import lombok.Data;
import org.opensearch.sql.storage.split.Split;

/**
* A batch of streaming execution.
*/
@Data
public class Batch {
private final List<Split> splits;
}
17 changes: 17 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import lombok.Data;

/**
* Offset.
*/
@Data
public class Offset {

private final Long offset;
}
29 changes: 29 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import java.util.Optional;

/**
* Streaming source.
*/
public interface StreamingSource {
/**
* Get current {@link Offset} of stream data.
*
* @return empty if the stream does not have new data.
*/
Optional<Offset> getLatestOffset();

/**
* Get a {@link Batch} from the source between (start, end].
*
* @param start start offset.
* @param end end offset.
* @return {@link Batch} of data in range (start, end].
*/
Batch getBatch(Optional<Offset> start, Offset end);
}
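
To make the interface contract concrete, here is a minimal caller-side sketch. It is illustrative only: MicroBatchRunner, the committed-offset bookkeeping, and the println processing are assumptions for this sketch, not part of this commit.

import java.util.Optional;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.executor.streaming.StreamingSource;

// Hypothetical micro-batch driver illustrating how a caller uses StreamingSource.
public class MicroBatchRunner {

  // Offset of the last batch that was fully processed; empty before the first run.
  private Optional<Offset> committed = Optional.empty();

  /** Poll the source once and process any new data. */
  public void pollOnce(StreamingSource source) {
    Optional<Offset> latest = source.getLatestOffset();
    if (latest.isPresent() && isAhead(latest.get())) {
      // getBatch covers (committed, latest] per the interface contract.
      Batch batch = source.getBatch(committed, latest.get());
      batch.getSplits().forEach(split -> System.out.println("process " + split.getSplitId()));
      committed = latest;
    }
  }

  // A latest offset counts as new data only if it is past the committed offset.
  private boolean isAhead(Offset latest) {
    return committed.map(c -> latest.getOffset() > c.getOffset()).orElse(true);
  }
}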
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/split/Split.java
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.storage.split;

import org.opensearch.sql.storage.StorageEngine;

/**
* Split is a section of a data set. Each {@link StorageEngine} should have its own
* implementation of Split.
*/
public interface Split {

/**
* Get the split id.
* @return split id.
*/
String getSplitId();
}
69 changes: 69 additions & 0 deletions filesystem/build.gradle
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
}

ext {
hadoop = "3.3.4"
aws = "1.12.330"
}


dependencies {
implementation project(':core')

testImplementation "org.junit.jupiter:junit-jupiter:${junit_jupiter}"
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
reports {
html.enabled true
xml.enabled true
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
violationRules {
rule {
element = 'CLASS'
limit {
counter = 'LINE'
minimum = 1.0
}
limit {
counter = 'BRANCH'
minimum = 1.0
}
}
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
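
Because check depends on jacocoTestCoverageVerification, which in turn depends on jacocoTestReport, a plain ./gradlew :filesystem:check (assuming the module is registered as :filesystem in settings.gradle) runs the tests, generates the coverage report, and fails the build if line or branch coverage of any class falls below the 100% minimum configured above.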
24 changes: 24 additions & 0 deletions filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.storage.split;

import java.nio.file.Path;
import java.util.Set;
import java.util.UUID;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.opensearch.sql.storage.split.Split;

@Data
public class FileSystemSplit implements Split {

@Getter
@EqualsAndHashCode.Exclude
private final String splitId = UUID.randomUUID().toString();

private final Set<Path> paths;
}
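
A note on the design: splitId is a random UUID generated per instance and excluded from the Lombok-generated equals/hashCode via @EqualsAndHashCode.Exclude, so equality compares only the paths set; two FileSystemSplit objects over the same files are equal even though their ids differ.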
21 changes: 21 additions & 0 deletions filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.streaming;

import java.nio.file.Path;
import java.util.Set;
import lombok.Data;

/**
* File metadata. Associates a batch id with a set of {@link Path}s.
*/
@Data
public class FileMetaData {

private final Long batchId;

private final Set<Path> paths;
}
103 changes: 103 additions & 0 deletions filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
@@ -0,0 +1,103 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.streaming;

import com.google.common.collect.Sets;
import java.io.File;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
import org.opensearch.sql.executor.streaming.MetadataLog;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.filesystem.storage.split.FileSystemSplit;

/**
* File system streaming source built on {@code java.nio.file.FileSystem}.
*/
public class FileSystemStreamSource implements StreamingSource {

private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class);

private final MetadataLog<FileMetaData> fileMetaDataLog;

private Set<Path> seenFiles;

private final FileSystem fs;

private final String basePath;

/**
* Constructor of FileSystemStreamSource.
*/
public FileSystemStreamSource(FileSystem fs, String basePath) {
this.fs = fs;
this.basePath = basePath;
// todo, need to add state recovery
this.fileMetaDataLog = new DefaultMetadataLog<>();
// todo, need to add state recovery
this.seenFiles = new HashSet<>();
}

@Override
public Optional<Offset> getLatestOffset() {
// List all files. TODO: improve listing performance.
Set<Path> allFiles =
Arrays.stream(fs.getPath(basePath).toFile().listFiles())
.filter(file -> !file.isDirectory())
.map(File::toPath)
.collect(Collectors.toSet());

// find unread files.
log.debug("all files {}", allFiles);
Set<Path> unread = Sets.difference(allFiles, seenFiles);

// update seenFiles.
seenFiles = allFiles;
log.debug("seen files {}", seenFiles);

Optional<Long> latestBatchIdOptional = fileMetaDataLog.getLatest().map(Pair::getKey);
if (!unread.isEmpty()) {
long latestBatchId = latestBatchIdOptional.map(id -> id + 1).orElse(0L);
fileMetaDataLog.add(latestBatchId, new FileMetaData(latestBatchId, unread));
log.debug("latestBatchId {}", latestBatchId);
return Optional.of(new Offset(latestBatchId));
} else {
log.debug("no unread data");
Optional<Offset> offset =
latestBatchIdOptional.isEmpty()
? Optional.empty()
: Optional.of(new Offset(latestBatchIdOptional.get()));
log.debug("return empty offset {}", offset);
return offset;
}
}

@Override
public Batch getBatch(Optional<Offset> start, Offset end) {
Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1).orElse(0L);
Long endBatchId = end.getOffset();

Set<Path> paths =
fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream()
.map(FileMetaData::getPaths)
.flatMap(Set::stream)
.collect(Collectors.toSet());

log.debug("fetch files {} with id from: {} to: {}.", paths, start, end);
return new Batch(Collections.singletonList(new FileSystemSplit(paths)));
}
}
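
A minimal usage sketch under stated assumptions: it uses the default java.nio.file.FileSystem, and the /tmp/stream-input directory is hypothetical and must already exist and contain regular files.

import java.nio.file.FileSystems;
import java.util.Optional;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.filesystem.streaming.FileSystemStreamSource;

public class FileSystemStreamSourceExample {
  public static void main(String[] args) {
    // Assumption: /tmp/stream-input exists and already contains a few files.
    FileSystemStreamSource source =
        new FileSystemStreamSource(FileSystems.getDefault(), "/tmp/stream-input");

    // First poll: every existing file is unread, so batch id 0 is assigned.
    Optional<Offset> latest = source.getLatestOffset();

    // An empty start offset means "from the beginning", so this fetches batch 0.
    latest.ifPresent(end -> {
      Batch batch = source.getBatch(Optional.empty(), end);
      batch.getSplits().forEach(split -> System.out.println(split.getSplitId()));
    });
  }
}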