diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java b/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java new file mode 100644 index 0000000000..e439d93f6c --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.streaming; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.commons.lang3.tuple.Pair; + +/** + * In memory implementation of {@link MetadataLog}. Todo. Current implementation does not guarantee + * thread safe. We will re-evaluate it when adding pipeline execution. + * + * @param type of metadata type. + */ +public class DefaultMetadataLog implements MetadataLog { + + private static final long MIN_ACCEPTABLE_ID = 0L; + + private SortedMap metadataMap = new TreeMap<>(); + + @Override + public boolean add(Long batchId, T metadata) { + Preconditions.checkArgument(batchId >= MIN_ACCEPTABLE_ID, "batch id must large or equal 0"); + + if (metadataMap.containsKey(batchId)) { + return false; + } + metadataMap.put(batchId, metadata); + return true; + } + + @Override + public Optional get(Long batchId) { + if (!metadataMap.containsKey(batchId)) { + return Optional.empty(); + } else { + return Optional.of(metadataMap.get(batchId)); + } + } + + @Override + public List get(Optional startBatchId, Optional endBatchId) { + if (startBatchId.isEmpty() && endBatchId.isEmpty()) { + return new ArrayList<>(metadataMap.values()); + } else { + Long s = startBatchId.orElse(MIN_ACCEPTABLE_ID); + Long e = endBatchId.map(i -> i + 1).orElse(Long.MAX_VALUE); + return new ArrayList<>(metadataMap.subMap(s, e).values()); + } + } + + @Override + public Optional> getLatest() { + if (metadataMap.isEmpty()) { + return Optional.empty(); + } else { + Long latestId = metadataMap.lastKey(); + return Optional.of(Pair.of(latestId, metadataMap.get(latestId))); + } + } + + @Override + public void purge(Long batchId) { + metadataMap.headMap(batchId).clear(); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java new file mode 100644 index 0000000000..d6bb9bacd6 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.streaming; + +import java.util.List; +import java.util.Optional; +import org.apache.commons.lang3.tuple.Pair; + +/** + * Write-ahead Log (WAL). Which allow client write metadata associate with id. + * + * @param type of metadata type. + */ +public interface MetadataLog { + + /** + * add metadata to WAL. + * + * @param id metadata index in WAL. + * @param metadata metadata. + * @return true if add success, otherwise return false. + */ + boolean add(Long id, T metadata); + + /** + * get metadata from WAL. + * + * @param id metadata index in WAL. + * @return metadata. + */ + Optional get(Long id); + + /** + * Return metadata for id between [startId, endId]. + * + * @param startId If startId is empty, return all metadata before endId (inclusive). + * @param endId If end is empty, return all batches after endId (inclusive). + * @return a list of metadata sorted by id (nature order). + */ + List get(Optional startId, Optional endId); + + /** + * Get latest batchId and metadata. + * + * @return pair of id and metadata if not empty. + */ + Optional> getLatest(); + + /** + * Remove all the metadata less then id (exclusive). + * + * @param id smallest batchId should keep. + */ + void purge(Long id); +} diff --git a/core/src/test/java/org/opensearch/sql/executor/streaming/DefaultMetadataLogTest.java b/core/src/test/java/org/opensearch/sql/executor/streaming/DefaultMetadataLogTest.java new file mode 100644 index 0000000000..4d8c4f3e93 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/streaming/DefaultMetadataLogTest.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.streaming; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DefaultMetadataLogTest { + + private DefaultMetadataLog metadataLog; + + @BeforeEach + void setup() { + metadataLog = new DefaultMetadataLog<>(); + } + + @Test + void addMetadataShouldSuccess() { + assertTrue(metadataLog.add(0L, 0L)); + assertTrue(metadataLog.add(1L, 1L)); + } + + @Test + void addMetadataWithSameBatchIdShouldFail() { + assertTrue(metadataLog.add(0L, 0L)); + assertFalse(metadataLog.add(0L, 1L)); + } + + @Test + void addMetadataWithInvalidIdShouldThrowException() { + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> metadataLog.add(-1L, 0L)); + assertEquals("batch id must large or equal 0", exception.getMessage()); + } + + @Test + void getWithIdReturnMetadata() { + metadataLog.add(0L, 0L); + + assertTrue(metadataLog.get(0L).isPresent()); + assertEquals(0L, metadataLog.get(0L).get()); + } + + @Test + void getWithNotExistShouldReturnEmtpy() { + metadataLog.add(0L, 0L); + + assertTrue(metadataLog.get(1L).isEmpty()); + assertTrue(metadataLog.get(-1L).isEmpty()); + } + + @Test + void getWithIdInRangeShouldReturnMetadataList() { + metadataLog.add(0L, 0L); + metadataLog.add(1L, 1L); + metadataLog.add(2L, 2L); + + assertEquals(Arrays.asList(0L, 1L, 2L), metadataLog.get(Optional.of(0L), Optional.of(2L))); + assertEquals(Arrays.asList(0L, 1L, 2L), metadataLog.get(Optional.of(0L), Optional.of(4L))); + assertEquals(Arrays.asList(0L, 1L, 2L), metadataLog.get(Optional.of(-1L), Optional.of(4L))); + assertEquals(Arrays.asList(0L, 1L), metadataLog.get(Optional.of(0L), Optional.of(1L))); + assertEquals(Arrays.asList(1L, 2L), metadataLog.get(Optional.of(1L), Optional.empty())); + assertEquals(Arrays.asList(0L, 1L), metadataLog.get(Optional.empty(), Optional.of(1L))); + assertEquals(Arrays.asList(0L, 1L, 2L), metadataLog.get(Optional.empty(), Optional.empty())); + } + + @Test + void getWithIdOutOfRangeShouldReturnEmpty() { + metadataLog.add(0L, 0L); + metadataLog.add(1L, 1L); + metadataLog.add(2L, 2L); + + assertTrue(metadataLog.get(Optional.of(3L), Optional.of(5L)).isEmpty()); + } + + @Test + void getLatestShouldReturnMetadata() { + metadataLog.add(0L, 10L); + metadataLog.add(1L, 11L); + + Optional> latest = metadataLog.getLatest(); + assertTrue(latest.isPresent()); + assertEquals(1L, latest.get().getLeft()); + assertEquals(11L, latest.get().getRight()); + } + + @Test + void getLatestFromEmptyWALShouldReturnEmpty() { + Optional> latest = metadataLog.getLatest(); + assertTrue(latest.isEmpty()); + } + + @Test + void purgeLatestShouldOnlyKeepLatest() { + metadataLog.add(0L, 10L); + metadataLog.add(1L, 11L); + metadataLog.add(2L, 12L); + + Optional> latest = metadataLog.getLatest(); + assertTrue(latest.isPresent()); + metadataLog.purge(latest.get().getLeft()); + + latest = metadataLog.getLatest(); + assertTrue(latest.isPresent()); + assertEquals(2L, latest.get().getLeft()); + assertEquals(12L, latest.get().getRight()); + } +}