From feee58887ac00296244883da7bde0a77788f5989 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 26 Oct 2022 14:25:10 -0700 Subject: [PATCH 1/3] Add metadatalog interface and default in memory implementation Signed-off-by: Peng Huo --- .../streaming/DefaultMetadataLog.java | 87 ++++++++++++ .../sql/executor/streaming/MetadataLog.java | 68 ++++++++++ .../streaming/DefaultMetadataLogTest.java | 126 ++++++++++++++++++ 3 files changed, 281 insertions(+) create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java create mode 100644 core/src/test/java/org/opensearch/sql/executor/streaming/DefaultMetadataLogTest.java diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java b/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java new file mode 100644 index 0000000000..2b5d0cb046 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.streaming; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.commons.lang3.tuple.Pair; + +/** + * In memory implementation of {@link MetadataLog}. + * Todo. Current implementation does not guarantee thread safe. We will re-evaluate it when adding + * pipeline execution. + * + * @param type of metadata type. + */ +public class DefaultMetadataLog implements MetadataLog { + + private static final long MIN_ACCEPTABLE_ID = 0L; + + private SortedMap metadataMap = new TreeMap<>(); + + @Override + public boolean add(Long batchId, T metadata) { + Preconditions.checkArgument(batchId >= MIN_ACCEPTABLE_ID, "batch id must large or equal 0"); + + if (metadataMap.containsKey(batchId)) { + return false; + } + metadataMap.put(batchId, metadata); + return true; + } + + @Override + public Optional get(Long batchId) { + if (!metadataMap.containsKey(batchId)) { + return Optional.empty(); + } else { + return Optional.of(metadataMap.get(batchId)); + } + } + + @Override + public List get(Optional startBatchId, Optional endBatchId) { + if (startBatchId.isEmpty() && endBatchId.isEmpty()) { + return new ArrayList<>(metadataMap.values()); + } else { + Long s = startBatchId.orElse(MIN_ACCEPTABLE_ID); + Long e = endBatchId.map(i -> i + 1).orElse(Long.MAX_VALUE); + return new ArrayList<>(metadataMap.subMap(s, e).values()); + } + } + + @Override + public Optional> getLatest() { + if (metadataMap.isEmpty()) { + return Optional.empty(); + } else { + Long latestId = metadataMap.lastKey(); + return Optional.of(Pair.of(latestId, metadataMap.get(latestId))); + } + } + + @Override + public void purge(Long batchId) { + for (long i = MIN_ACCEPTABLE_ID; i < batchId; i++) { + metadataMap.remove(i); + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java new file mode 100644 index 0000000000..a0e0a96969 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.streaming; + +import java.util.List; +import java.util.Optional; +import org.apache.commons.lang3.tuple.Pair; + +/** + * Write-ahead Log (WAL). Which allow client write metadata associate with id. + * + * @param type of metadata type. + */ +public interface MetadataLog { + + /** + * add metadata to WAL. + * + * @param id metadata index in WAL. + * @param metadata metadata. + * @return true if add success, otherwise return false. + */ + boolean add(Long id, T metadata); + + /** + * get metadata from WAL. + * + * @param id metadata index in WAL. + * @return metadata. + */ + Optional get(Long id); + + /** + * Return metadata for id between [startId, endId]. + * + * @param startId If startId is empty, return all metadata before endId (inclusive). + * @param endId If end is empty, return all batches after endId (inclusive). + * @return a list of metadata sorted by id (nature order). + */ + List get(Optional startId, Optional endId); + + /** + * Get latest batchId and metadata. + * + * @return pair of id and metadata if not empty. + */ + Optional> getLatest(); + + /** + * Remove all the metadata less then id (exclusive). + * @param id smallest batchId should keep. + */ + void purge(Long id); +} diff --git a/core/src/test/java/org/opensearch/sql/executor/streaming/DefaultMetadataLogTest.java b/core/src/test/java/org/opensearch/sql/executor/streaming/DefaultMetadataLogTest.java new file mode 100644 index 0000000000..4d8c4f3e93 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/streaming/DefaultMetadataLogTest.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.streaming; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DefaultMetadataLogTest { + + private DefaultMetadataLog metadataLog; + + @BeforeEach + void setup() { + metadataLog = new DefaultMetadataLog<>(); + } + + @Test + void addMetadataShouldSuccess() { + assertTrue(metadataLog.add(0L, 0L)); + assertTrue(metadataLog.add(1L, 1L)); + } + + @Test + void addMetadataWithSameBatchIdShouldFail() { + assertTrue(metadataLog.add(0L, 0L)); + assertFalse(metadataLog.add(0L, 1L)); + } + + @Test + void addMetadataWithInvalidIdShouldThrowException() { + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> metadataLog.add(-1L, 0L)); + assertEquals("batch id must large or equal 0", exception.getMessage()); + } + + @Test + void getWithIdReturnMetadata() { + metadataLog.add(0L, 0L); + + assertTrue(metadataLog.get(0L).isPresent()); + assertEquals(0L, metadataLog.get(0L).get()); + } + + @Test + void getWithNotExistShouldReturnEmtpy() { + metadataLog.add(0L, 0L); + + assertTrue(metadataLog.get(1L).isEmpty()); + assertTrue(metadataLog.get(-1L).isEmpty()); + } + + @Test + void getWithIdInRangeShouldReturnMetadataList() { + metadataLog.add(0L, 0L); + metadataLog.add(1L, 1L); + metadataLog.add(2L, 2L); + + assertEquals(Arrays.asList(0L, 1L, 2L), metadataLog.get(Optional.of(0L), Optional.of(2L))); + assertEquals(Arrays.asList(0L, 1L, 2L), metadataLog.get(Optional.of(0L), Optional.of(4L))); + assertEquals(Arrays.asList(0L, 1L, 2L), metadataLog.get(Optional.of(-1L), Optional.of(4L))); + assertEquals(Arrays.asList(0L, 1L), metadataLog.get(Optional.of(0L), Optional.of(1L))); + assertEquals(Arrays.asList(1L, 2L), metadataLog.get(Optional.of(1L), Optional.empty())); + assertEquals(Arrays.asList(0L, 1L), metadataLog.get(Optional.empty(), Optional.of(1L))); + assertEquals(Arrays.asList(0L, 1L, 2L), metadataLog.get(Optional.empty(), Optional.empty())); + } + + @Test + void getWithIdOutOfRangeShouldReturnEmpty() { + metadataLog.add(0L, 0L); + metadataLog.add(1L, 1L); + metadataLog.add(2L, 2L); + + assertTrue(metadataLog.get(Optional.of(3L), Optional.of(5L)).isEmpty()); + } + + @Test + void getLatestShouldReturnMetadata() { + metadataLog.add(0L, 10L); + metadataLog.add(1L, 11L); + + Optional> latest = metadataLog.getLatest(); + assertTrue(latest.isPresent()); + assertEquals(1L, latest.get().getLeft()); + assertEquals(11L, latest.get().getRight()); + } + + @Test + void getLatestFromEmptyWALShouldReturnEmpty() { + Optional> latest = metadataLog.getLatest(); + assertTrue(latest.isEmpty()); + } + + @Test + void purgeLatestShouldOnlyKeepLatest() { + metadataLog.add(0L, 10L); + metadataLog.add(1L, 11L); + metadataLog.add(2L, 12L); + + Optional> latest = metadataLog.getLatest(); + assertTrue(latest.isPresent()); + metadataLog.purge(latest.get().getLeft()); + + latest = metadataLog.getLatest(); + assertTrue(latest.isPresent()); + assertEquals(2L, latest.get().getLeft()); + assertEquals(12L, latest.get().getRight()); + } +} From a73a856ca8789467ccabf2cfd1a9f73d778a625e Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 26 Oct 2022 14:57:06 -0700 Subject: [PATCH 2/3] format update Signed-off-by: Peng Huo --- .../sql/executor/streaming/DefaultMetadataLog.java | 13 ++----------- .../sql/executor/streaming/MetadataLog.java | 9 +-------- 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java b/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java index 2b5d0cb046..77cd990187 100644 --- a/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java @@ -6,14 +6,6 @@ * compatible open source license. */ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - package org.opensearch.sql.executor.streaming; import com.google.common.base.Preconditions; @@ -25,9 +17,8 @@ import org.apache.commons.lang3.tuple.Pair; /** - * In memory implementation of {@link MetadataLog}. - * Todo. Current implementation does not guarantee thread safe. We will re-evaluate it when adding - * pipeline execution. + * In memory implementation of {@link MetadataLog}. Todo. Current implementation does not guarantee + * thread safe. We will re-evaluate it when adding pipeline execution. * * @param type of metadata type. */ diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java index a0e0a96969..d6bb9bacd6 100644 --- a/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MetadataLog.java @@ -6,14 +6,6 @@ * compatible open source license. */ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - package org.opensearch.sql.executor.streaming; import java.util.List; @@ -62,6 +54,7 @@ public interface MetadataLog { /** * Remove all the metadata less then id (exclusive). + * * @param id smallest batchId should keep. */ void purge(Long id); From 7c399f2436889bcfa9ddf09da305ee10af788564 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 28 Oct 2022 08:33:30 -0700 Subject: [PATCH 3/3] address comments Signed-off-by: Peng Huo --- .../opensearch/sql/executor/streaming/DefaultMetadataLog.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java b/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java index 77cd990187..e439d93f6c 100644 --- a/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/DefaultMetadataLog.java @@ -71,8 +71,6 @@ public Optional> getLatest() { @Override public void purge(Long batchId) { - for (long i = MIN_ACCEPTABLE_ID; i < batchId; i++) { - metadataMap.remove(i); - } + metadataMap.headMap(batchId).clear(); } }