You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
return the latest Offset which map to S3Metadata. Notice, the StreamSource does NOT guarantee the Offset is mapping to unread files.
return empty if there is no data in data stream source.
// read stream from file data source
Set<Files> allFiles = fileDataSource.listAllObjects();
// get unread files
Set<Files> unreadFileds = Sets.*difference*(allFiles, seenObjects);
// update seenObjects
seenFiles = allFiles
Long latestBatchId = fileMetadataLog.getLatest()
if (!unreadFileds.isEmpty()) {
// has unread files
// update batchId, keep it monotonically increasing
latestBatchId += 1;
// update s3MetadataLog
fileMetadataLog.add(latestBatchId, new S3Metadata(unreadFileds, latestBatchId));
return Optional.of(new Offset(latestBatchId));
} else {
return latestBatchId == -1 ? Optional.empty() : Optional.of(new Offset(latestBatchId));
}
Batch getBatch(Optional start, Offset end)
return the Batch from stream source between (start, end].
Stream source state maintain
FileMetadataLog maintain the mapping between Offset and FileMetadata. The user of FileMetadataLog MUST maintain the monotonically increasing of Offset.
Optional<Pair<Long, FileMetadata>> getLatest(). return the latest Offset and FileMetaData.
List<FileMetadata> get(Optional<Long> start, Optional<Long> end). return the list of FileMetaData between Offset range in [start, end]
boolean add(Long offset, T metadata). add Offset and FileMetaData.
SeenFiles, maintain the seen files from stream source so far.
The text was updated successfully, but these errors were encountered:
Stream source interface
Stream source state maintain
Optional<Pair<Long, FileMetadata>> getLatest()
. return the latest Offset and FileMetaData.List<FileMetadata> get(Optional<Long> start, Optional<Long> end)
. return the list of FileMetaData between Offset range in [start, end]boolean add(Long offset, T metadata)
. add Offset and FileMetaData.The text was updated successfully, but these errors were encountered: