diff --git a/CHANGELOG.md b/CHANGELOG.md index f0ded8bd0fe36..714b569456e93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) +- Avoid negative memory result in IndicesQueryCache stats calculation ([#6917](https://github.com/opensearch-project/OpenSearch/pull/6917)) ### Security @@ -88,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464)) - Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331)) - Add new cluster settings to ignore weighted round-robin routing and fall back to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834)) +- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) @@ -103,6 +105,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `net.minidev:json-smart` from 2.4.7 to 2.4.10 - Bump `org.apache.maven:maven-model` from 3.6.2 to 3.9.1 - Bump `org.codehaus.jettison:jettison` from 1.5.3 to 1.5.4 ([#6878](https://github.com/opensearch-project/OpenSearch/pull/6878)) +- Add `com.github.luben:zstd-jni:1.5.4-1` ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577)) +- Bump Netty from 4.1.90.Final to 4.1.91.Final, ASM from 9.4 to 9.5, and ByteBuddy from 1.14.2 to 1.14.3 ([#6981](https://github.com/opensearch-project/OpenSearch/pull/6981)) ### Changed - Require MediaType in Strings.toString API ([#6009](https://github.com/opensearch-project/OpenSearch/pull/6009)) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index a7a1f8717abc3..1562a5aecfd35 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -16,7 +16,7 @@ supercsv = 2.4.0 # Update to 2.17.2+ is breaking OpenSearchJsonLayout (see https://issues.apache.org/jira/browse/LOG4J2-3562) log4j = 2.17.1 slf4j = 1.7.36 -asm = 9.4 +asm = 9.5 jettison = 1.5.4 woodstox = 6.4.0 kotlin = 1.7.10 @@ -26,7 +26,7 @@ guava = 31.1-jre # when updating the JNA version, also update the version in buildSrc/build.gradle jna = 5.5.0 -netty = 4.1.90.Final +netty = 4.1.91.Final joda = 2.12.2 # client dependencies @@ -51,7 +51,7 @@ junit = 4.13.2 hamcrest = 2.1 mockito = 5.2.0 objenesis = 3.2 -bytebuddy = 1.14.2 +bytebuddy = 1.14.3 # benchmark dependencies jmh = 1.35 diff --git a/modules/lang-expression/licenses/asm-9.4.jar.sha1 b/modules/lang-expression/licenses/asm-9.4.jar.sha1 deleted file mode 100644 index 75f2b0fe9a112..0000000000000 --- a/modules/lang-expression/licenses/asm-9.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -b4e0e2d2e023aa317b7cfcfc916377ea348e07d1 \ No newline at end
of file diff --git a/modules/lang-expression/licenses/asm-9.5.jar.sha1 b/modules/lang-expression/licenses/asm-9.5.jar.sha1 new file mode 100644 index 0000000000000..ea4aa3581dc87 --- /dev/null +++ b/modules/lang-expression/licenses/asm-9.5.jar.sha1 @@ -0,0 +1 @@ +dc6ea1875f4d64fbc85e1691c95b96a3d8569c90 \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-commons-9.4.jar.sha1 b/modules/lang-expression/licenses/asm-commons-9.4.jar.sha1 deleted file mode 100644 index e0e2a2f4e63e9..0000000000000 --- a/modules/lang-expression/licenses/asm-commons-9.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -8fc2810ddbcbbec0a8bbccb3f8eda58321839912 \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-commons-9.5.jar.sha1 b/modules/lang-expression/licenses/asm-commons-9.5.jar.sha1 new file mode 100644 index 0000000000000..5be792660c19f --- /dev/null +++ b/modules/lang-expression/licenses/asm-commons-9.5.jar.sha1 @@ -0,0 +1 @@ +19ab5b5800a3910d30d3a3e64fdb00fd0cb42de0 \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-tree-9.4.jar.sha1 b/modules/lang-expression/licenses/asm-tree-9.4.jar.sha1 deleted file mode 100644 index 50ce6d740aab7..0000000000000 --- a/modules/lang-expression/licenses/asm-tree-9.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -a99175a17d7fdc18cbcbd0e8ea6a5d276844190a \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-tree-9.5.jar.sha1 b/modules/lang-expression/licenses/asm-tree-9.5.jar.sha1 new file mode 100644 index 0000000000000..fb42db6a9d15c --- /dev/null +++ b/modules/lang-expression/licenses/asm-tree-9.5.jar.sha1 @@ -0,0 +1 @@ +fd33c8b6373abaa675be407082fdfda35021254a \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-9.4.jar.sha1 deleted file mode 100644 index 75f2b0fe9a112..0000000000000 --- a/modules/lang-painless/licenses/asm-9.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -b4e0e2d2e023aa317b7cfcfc916377ea348e07d1 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-9.5.jar.sha1 b/modules/lang-painless/licenses/asm-9.5.jar.sha1 new file mode 100644 index 0000000000000..ea4aa3581dc87 --- /dev/null +++ b/modules/lang-painless/licenses/asm-9.5.jar.sha1 @@ -0,0 +1 @@ +dc6ea1875f4d64fbc85e1691c95b96a3d8569c90 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-analysis-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-analysis-9.4.jar.sha1 deleted file mode 100644 index 850a070775e4d..0000000000000 --- a/modules/lang-painless/licenses/asm-analysis-9.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -0a5fec9dfc039448d4fd098fbaffcaf55373b223 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-analysis-9.5.jar.sha1 b/modules/lang-painless/licenses/asm-analysis-9.5.jar.sha1 new file mode 100644 index 0000000000000..9e87d3ce7d719 --- /dev/null +++ b/modules/lang-painless/licenses/asm-analysis-9.5.jar.sha1 @@ -0,0 +1 @@ +490bacc77de7cbc0be1a30bb3471072d705be4a4 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-commons-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-commons-9.4.jar.sha1 deleted file mode 100644 index e0e2a2f4e63e9..0000000000000 --- a/modules/lang-painless/licenses/asm-commons-9.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -8fc2810ddbcbbec0a8bbccb3f8eda58321839912 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-commons-9.5.jar.sha1 b/modules/lang-painless/licenses/asm-commons-9.5.jar.sha1 new file mode 100644 
index 0000000000000..5be792660c19f --- /dev/null +++ b/modules/lang-painless/licenses/asm-commons-9.5.jar.sha1 @@ -0,0 +1 @@ +19ab5b5800a3910d30d3a3e64fdb00fd0cb42de0 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-tree-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-tree-9.4.jar.sha1 deleted file mode 100644 index 50ce6d740aab7..0000000000000 --- a/modules/lang-painless/licenses/asm-tree-9.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -a99175a17d7fdc18cbcbd0e8ea6a5d276844190a \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-tree-9.5.jar.sha1 b/modules/lang-painless/licenses/asm-tree-9.5.jar.sha1 new file mode 100644 index 0000000000000..fb42db6a9d15c --- /dev/null +++ b/modules/lang-painless/licenses/asm-tree-9.5.jar.sha1 @@ -0,0 +1 @@ +fd33c8b6373abaa675be407082fdfda35021254a \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-util-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-util-9.4.jar.sha1 deleted file mode 100644 index 8c5854f41bcda..0000000000000 --- a/modules/lang-painless/licenses/asm-util-9.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ab1e0a84b72561dbaf1ee260321e72148ebf4b19 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-util-9.5.jar.sha1 b/modules/lang-painless/licenses/asm-util-9.5.jar.sha1 new file mode 100644 index 0000000000000..5fffbfe655deb --- /dev/null +++ b/modules/lang-painless/licenses/asm-util-9.5.jar.sha1 @@ -0,0 +1 @@ +64b5a1fc8c1b15ed2efd6a063e976bc8d3dc5ffe \ No newline at end of file diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index 124f0a4fef3a8..a68d70d124094 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -253,6 +253,11 @@ thirdPartyAudit { 'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField', 'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess', 'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess', - 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator' + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5' ) } diff --git a/modules/transport-netty4/licenses/netty-buffer-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-buffer-4.1.90.Final.jar.sha1 deleted file mode 100644 index 67604d11c1eca..0000000000000 --- a/modules/transport-netty4/licenses/netty-buffer-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -937eb60c19c5f5c1326b96123c9ec3d33238d4d5 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-buffer-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-buffer-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..158024bc892d5 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-buffer-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +d8f180291c3501e931968ca7e40ae0323c4eacee \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-4.1.90.Final.jar.sha1 deleted file mode 100644 index c8fb04a021807..0000000000000 --- a/modules/transport-netty4/licenses/netty-codec-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ 
-9992a22c82e18b8fd4f34989535f3e504e55aa37 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..764a03d3d73d1 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +3044b8e325e33f72c96ac1ea51dda85bef090cc0 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-http-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-http-4.1.90.Final.jar.sha1 deleted file mode 100644 index 861599ce1d1d2..0000000000000 --- a/modules/transport-netty4/licenses/netty-codec-http-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -19bbcd46f8ee0d118486f98eff22fe665b9689e5 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-http-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-http-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..ca956129d98c1 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-http-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +4519d2ff470941f0086214b19c9acf992868112f \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-http2-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-http2-4.1.90.Final.jar.sha1 deleted file mode 100644 index 64caa309f2c05..0000000000000 --- a/modules/transport-netty4/licenses/netty-codec-http2-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ea3be877ea976b3d71e1a872958d32854b24db66 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-http2-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-http2-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..d57336af7f414 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-http2-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +4ee7027e1653c6ee3f843191e0d932f29e8e14e1 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-common-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-common-4.1.90.Final.jar.sha1 deleted file mode 100644 index afb531805329e..0000000000000 --- a/modules/transport-netty4/licenses/netty-common-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -43597a09382c6ae2bef469a9b3a41e8a17850638 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-common-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-common-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..deaad405402f2 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-common-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +93e5056462a242718e7689d81180d125c79d7723 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-handler-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-handler-4.1.90.Final.jar.sha1 deleted file mode 100644 index c98bfb52393d6..0000000000000 --- a/modules/transport-netty4/licenses/netty-handler-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -64f6946ce4d9189cec5341d3f5f86ac5653099b5 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-handler-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-handler-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..3e121e5de16b8 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-handler-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +444cf41e4fe28c47ffebba5e77b9458a12f938a1 \ 
No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-resolver-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-resolver-4.1.90.Final.jar.sha1 deleted file mode 100644 index b92177828aa56..0000000000000 --- a/modules/transport-netty4/licenses/netty-resolver-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -47c415d8c83f08b820ba00e6497a6cf19dd0155f \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-resolver-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-resolver-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..bc57e2d01a2bf --- /dev/null +++ b/modules/transport-netty4/licenses/netty-resolver-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +04725d117d4b71ef0e743aa79062489b45472b26 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-transport-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-transport-4.1.90.Final.jar.sha1 deleted file mode 100644 index c7a77dbf6aaa8..0000000000000 --- a/modules/transport-netty4/licenses/netty-transport-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -82d68da212f62b076c763f5efa9b072d2abc018f \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-transport-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-transport-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..2562ece34790b --- /dev/null +++ b/modules/transport-netty4/licenses/netty-transport-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +c2f6bd7143194ca842b535546a405c06aa993934 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-transport-native-unix-common-4.1.90.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-transport-native-unix-common-4.1.90.Final.jar.sha1 deleted file mode 100644 index 5f954b2595927..0000000000000 --- a/modules/transport-netty4/licenses/netty-transport-native-unix-common-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e42282002cf22105e7e993651aead231246d0220 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-transport-native-unix-common-4.1.91.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-transport-native-unix-common-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..6f45d642c8c0d --- /dev/null +++ b/modules/transport-netty4/licenses/netty-transport-native-unix-common-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +80990b5885b8b67be096d7090cba18f05c67120e \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-codec-dns-4.1.90.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-codec-dns-4.1.90.Final.jar.sha1 deleted file mode 100644 index 3ef0c5df26b85..0000000000000 --- a/plugins/repository-azure/licenses/netty-codec-dns-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ebf5da8e6edf783d069d9aca346ff46c55772de6 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-codec-dns-4.1.91.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-codec-dns-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..de151d86d4595 --- /dev/null +++ b/plugins/repository-azure/licenses/netty-codec-dns-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +2c0242c69eee44ee559d02c564dbceee8bf0a5c7 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-codec-http2-4.1.90.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-codec-http2-4.1.90.Final.jar.sha1 deleted file mode 100644 index 64caa309f2c05..0000000000000 --- 
a/plugins/repository-azure/licenses/netty-codec-http2-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ea3be877ea976b3d71e1a872958d32854b24db66 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-codec-http2-4.1.91.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-codec-http2-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..d57336af7f414 --- /dev/null +++ b/plugins/repository-azure/licenses/netty-codec-http2-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +4ee7027e1653c6ee3f843191e0d932f29e8e14e1 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-codec-socks-4.1.90.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-codec-socks-4.1.90.Final.jar.sha1 deleted file mode 100644 index 2738db8a6710a..0000000000000 --- a/plugins/repository-azure/licenses/netty-codec-socks-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7397535a4e03d2f74c71aa2282eb7a2760ffc37b \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-codec-socks-4.1.91.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-codec-socks-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..10d7478ce02ca --- /dev/null +++ b/plugins/repository-azure/licenses/netty-codec-socks-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +8f0a52677da411a8ab762c426d723c7f54471504 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-handler-proxy-4.1.90.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-handler-proxy-4.1.90.Final.jar.sha1 deleted file mode 100644 index 60bde875d0faf..0000000000000 --- a/plugins/repository-azure/licenses/netty-handler-proxy-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -6ab526a43a14f7796434fa6a705c34201603235f \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-handler-proxy-4.1.91.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-handler-proxy-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..116ed58f33a4d --- /dev/null +++ b/plugins/repository-azure/licenses/netty-handler-proxy-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +2e3e57eae1a61e4e5f558e39619186fec6c424d3 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-resolver-dns-4.1.90.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-resolver-dns-4.1.90.Final.jar.sha1 deleted file mode 100644 index 6124f27a050e0..0000000000000 --- a/plugins/repository-azure/licenses/netty-resolver-dns-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -c9e6762805fe1bc854352dbc8020226f38674bce \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-resolver-dns-4.1.91.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-resolver-dns-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..5f96d34bab52c --- /dev/null +++ b/plugins/repository-azure/licenses/netty-resolver-dns-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +e1567967f5a85a469b10b7394e3e2b90ea5c0b12 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-transport-native-unix-common-4.1.90.Final.jar.sha1 b/plugins/repository-azure/licenses/netty-transport-native-unix-common-4.1.90.Final.jar.sha1 deleted file mode 100644 index 5f954b2595927..0000000000000 --- a/plugins/repository-azure/licenses/netty-transport-native-unix-common-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e42282002cf22105e7e993651aead231246d0220 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/netty-transport-native-unix-common-4.1.91.Final.jar.sha1 
b/plugins/repository-azure/licenses/netty-transport-native-unix-common-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..6f45d642c8c0d --- /dev/null +++ b/plugins/repository-azure/licenses/netty-transport-native-unix-common-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +80990b5885b8b67be096d7090cba18f05c67120e \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/netty-all-4.1.90.Final.jar.sha1 b/plugins/repository-hdfs/licenses/netty-all-4.1.90.Final.jar.sha1 deleted file mode 100644 index 829204d91b994..0000000000000 --- a/plugins/repository-hdfs/licenses/netty-all-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -0fb2bac7d106f8db84b111202bfb1c68a1aa89b8 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/netty-all-4.1.91.Final.jar.sha1 b/plugins/repository-hdfs/licenses/netty-all-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..fd9d37b2f0c8d --- /dev/null +++ b/plugins/repository-hdfs/licenses/netty-all-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +d96d417b6c6b4a786d54418e09593c4b2292f437 \ No newline at end of file diff --git a/plugins/transport-nio/build.gradle b/plugins/transport-nio/build.gradle index 8a6a6a334e1a9..7f7c75b8e2142 100644 --- a/plugins/transport-nio/build.gradle +++ b/plugins/transport-nio/build.gradle @@ -193,6 +193,11 @@ thirdPartyAudit { 'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess', 'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess', - 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator' + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5' ) } diff --git a/plugins/transport-nio/licenses/netty-buffer-4.1.90.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-buffer-4.1.90.Final.jar.sha1 deleted file mode 100644 index 67604d11c1eca..0000000000000 --- a/plugins/transport-nio/licenses/netty-buffer-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -937eb60c19c5f5c1326b96123c9ec3d33238d4d5 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-buffer-4.1.91.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-buffer-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..158024bc892d5 --- /dev/null +++ b/plugins/transport-nio/licenses/netty-buffer-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +d8f180291c3501e931968ca7e40ae0323c4eacee \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-codec-4.1.90.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-codec-4.1.90.Final.jar.sha1 deleted file mode 100644 index c8fb04a021807..0000000000000 --- a/plugins/transport-nio/licenses/netty-codec-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9992a22c82e18b8fd4f34989535f3e504e55aa37 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-codec-4.1.91.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-codec-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..764a03d3d73d1 --- /dev/null +++ b/plugins/transport-nio/licenses/netty-codec-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +3044b8e325e33f72c96ac1ea51dda85bef090cc0 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-codec-http-4.1.90.Final.jar.sha1 
b/plugins/transport-nio/licenses/netty-codec-http-4.1.90.Final.jar.sha1 deleted file mode 100644 index 861599ce1d1d2..0000000000000 --- a/plugins/transport-nio/licenses/netty-codec-http-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -19bbcd46f8ee0d118486f98eff22fe665b9689e5 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-codec-http-4.1.91.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-codec-http-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..ca956129d98c1 --- /dev/null +++ b/plugins/transport-nio/licenses/netty-codec-http-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +4519d2ff470941f0086214b19c9acf992868112f \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-common-4.1.90.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-common-4.1.90.Final.jar.sha1 deleted file mode 100644 index afb531805329e..0000000000000 --- a/plugins/transport-nio/licenses/netty-common-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -43597a09382c6ae2bef469a9b3a41e8a17850638 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-common-4.1.91.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-common-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..deaad405402f2 --- /dev/null +++ b/plugins/transport-nio/licenses/netty-common-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +93e5056462a242718e7689d81180d125c79d7723 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-handler-4.1.90.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-handler-4.1.90.Final.jar.sha1 deleted file mode 100644 index c98bfb52393d6..0000000000000 --- a/plugins/transport-nio/licenses/netty-handler-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -64f6946ce4d9189cec5341d3f5f86ac5653099b5 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-handler-4.1.91.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-handler-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..3e121e5de16b8 --- /dev/null +++ b/plugins/transport-nio/licenses/netty-handler-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +444cf41e4fe28c47ffebba5e77b9458a12f938a1 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-resolver-4.1.90.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-resolver-4.1.90.Final.jar.sha1 deleted file mode 100644 index b92177828aa56..0000000000000 --- a/plugins/transport-nio/licenses/netty-resolver-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -47c415d8c83f08b820ba00e6497a6cf19dd0155f \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-resolver-4.1.91.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-resolver-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..bc57e2d01a2bf --- /dev/null +++ b/plugins/transport-nio/licenses/netty-resolver-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +04725d117d4b71ef0e743aa79062489b45472b26 \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-transport-4.1.90.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-transport-4.1.90.Final.jar.sha1 deleted file mode 100644 index c7a77dbf6aaa8..0000000000000 --- a/plugins/transport-nio/licenses/netty-transport-4.1.90.Final.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -82d68da212f62b076c763f5efa9b072d2abc018f \ No newline at end of file diff --git a/plugins/transport-nio/licenses/netty-transport-4.1.91.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-transport-4.1.91.Final.jar.sha1 new file mode 100644 index 0000000000000..2562ece34790b 
--- /dev/null +++ b/plugins/transport-nio/licenses/netty-transport-4.1.91.Final.jar.sha1 @@ -0,0 +1 @@ +c2f6bd7143194ca842b535546a405c06aa993934 \ No newline at end of file diff --git a/sandbox/modules/custom-codecs/build.gradle b/sandbox/modules/custom-codecs/build.gradle new file mode 100644 index 0000000000000..bf1bc719b0ae6 --- /dev/null +++ b/sandbox/modules/custom-codecs/build.gradle @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +apply plugin: 'opensearch.opensearchplugin' +apply plugin: 'opensearch.yaml-rest-test' + +opensearchplugin { + name 'custom-codecs' + description 'A plugin that implements custom compression codecs.' + classname 'org.opensearch.index.codec.customcodecs.CustomCodecPlugin' + licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt') + noticeFile rootProject.file('NOTICE.txt') +} + +dependencies { + api "com.github.luben:zstd-jni:1.5.4-1" +} + +yamlRestTest.enabled = false; +testingConventions.enabled = false; diff --git a/sandbox/modules/custom-codecs/licenses/zstd-jni-1.5.4-1.jar.sha1 b/sandbox/modules/custom-codecs/licenses/zstd-jni-1.5.4-1.jar.sha1 new file mode 100644 index 0000000000000..e95377f702a6c --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/zstd-jni-1.5.4-1.jar.sha1 @@ -0,0 +1 @@ +291ccaacc039e41932de877303edb6af98a91c24 diff --git a/sandbox/modules/custom-codecs/licenses/zstd-jni-LICENSE.txt b/sandbox/modules/custom-codecs/licenses/zstd-jni-LICENSE.txt new file mode 100644 index 0000000000000..c4dd507c1c72f --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/zstd-jni-LICENSE.txt @@ -0,0 +1,29 @@ +----------------------------------------------------------------------------- +** Beginning of "BSD License" text. ** + +Zstd-jni: JNI bindings to Zstd Library + +Copyright (c) 2015-present, Luben Karavelov/ All rights reserved. + +BSD License + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
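As a hedged usage sketch (not part of the patch): the build script above wires this sandbox module up as an OpenSearch plugin backed by zstd-jni, and the META-INF/services file added later in this diff registers the codecs with Lucene's SPI. Assuming the plugin jar and zstd-jni are on the classpath, the codecs should then be resolvable by name; "ZSTD" and "ZSTDNODICT" are the names derived from Lucene95CustomCodec.Mode below, and CodecLookupDemo is a hypothetical class name.

    import org.apache.lucene.codecs.Codec;

    // Resolve the module's codecs through Lucene's SPI lookup by codec name.
    public final class CodecLookupDemo {
        public static void main(String[] args) {
            Codec zstd = Codec.forName("ZSTD");             // backed by ZstdCodec
            Codec zstdNoDict = Codec.forName("ZSTDNODICT"); // backed by ZstdNoDictCodec
            System.out.println(zstd.getName() + " / " + zstdNoDict.getName());
        }
    }
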
diff --git a/sandbox/modules/custom-codecs/licenses/zstd-jni-NOTICE.txt b/sandbox/modules/custom-codecs/licenses/zstd-jni-NOTICE.txt new file mode 100644 index 0000000000000..389c97cbc892d --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/zstd-jni-NOTICE.txt @@ -0,0 +1 @@ +The code for the JNI bindings to Zstd library was originally authored by Luben Karavelov diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java new file mode 100644 index 0000000000000..1e0245f3c8c6b --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.EnginePlugin; +import org.opensearch.index.codec.CodecServiceFactory; +import org.opensearch.index.IndexSettings; + +import java.util.Optional; + +/** + * A plugin that implements custom codecs. Supports these codecs: + * <ul> + *     <li>ZSTD + *     <li>ZSTDNODICT + * </ul> + * + * @opensearch.internal + */ +public final class CustomCodecPlugin extends Plugin implements EnginePlugin { + + /** Creates a new instance */ + public CustomCodecPlugin() {} + + /** + * @param indexSettings the index settings + * @return the custom codec service factory + */ + @Override + public Optional<CodecServiceFactory> getCustomCodecServiceFactory(final IndexSettings indexSettings) { + return Optional.of(new CustomCodecServiceFactory()); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java new file mode 100644 index 0000000000000..4dd25caa86d94 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; +import org.opensearch.common.collect.MapBuilder; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.mapper.MapperService; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; + +/** + * CustomCodecService provides ZSTD and ZSTDNODICT compression codecs. + */ +public class CustomCodecService extends CodecService { + private final Map<String, Codec> codecs; + + /** + * Creates a new CustomCodecService. + * + * @param mapperService A mapper service. + * @param logger A logger.
+ */ + public CustomCodecService(MapperService mapperService, Logger logger) { + super(mapperService, logger); + final MapBuilder<String, Codec> codecs = MapBuilder.newMapBuilder(); + if (mapperService == null) { + codecs.put(Lucene95CustomCodec.Mode.ZSTD.name(), new ZstdCodec()); + codecs.put(Lucene95CustomCodec.Mode.ZSTDNODICT.name(), new ZstdNoDictCodec()); + } else { + codecs.put( + Lucene95CustomCodec.Mode.ZSTD.name(), + new PerFieldMappingPostingFormatCodec(Lucene95CustomCodec.Mode.ZSTD, mapperService) + ); + codecs.put( + Lucene95CustomCodec.Mode.ZSTDNODICT.name(), + new PerFieldMappingPostingFormatCodec(Lucene95CustomCodec.Mode.ZSTDNODICT, mapperService) + ); + } + this.codecs = codecs.immutableMap(); + } + + @Override + public Codec codec(String name) { + Codec codec = codecs.get(name); + if (codec == null) { + return super.codec(name); + } + return codec; + } + + @Override + public String[] availableCodecs() { + ArrayList<String> ac = new ArrayList<>(Arrays.asList(super.availableCodecs())); + ac.addAll(codecs.keySet()); + return ac.toArray(new String[0]); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java new file mode 100644 index 0000000000000..9a1872abfcbd7 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceConfig; +import org.opensearch.index.codec.CodecServiceFactory; + +/** + * A factory for creating new {@link CodecService} instances + */ +public class CustomCodecServiceFactory implements CodecServiceFactory { + + /** Creates a new instance. */ + public CustomCodecServiceFactory() {} + + @Override + public CodecService createCodecService(CodecServiceConfig config) { + return new CustomCodecService(config.getMapperService(), config.getLogger()); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java new file mode 100644 index 0000000000000..652306e59559b --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.StoredFieldsFormat; +import org.apache.lucene.codecs.FilterCodec; +import org.apache.lucene.codecs.lucene95.Lucene95Codec; + +abstract class Lucene95CustomCodec extends FilterCodec { + public static final int DEFAULT_COMPRESSION_LEVEL = 6; + + /** Each mode represents a compression algorithm.
 */ + public enum Mode { + ZSTD, + ZSTDNODICT + } + + private final StoredFieldsFormat storedFieldsFormat; + + /** Creates a new codec for a given compression algorithm, using the default compression level. */ + public Lucene95CustomCodec(Mode mode) { + this(mode, DEFAULT_COMPRESSION_LEVEL); + } + + public Lucene95CustomCodec(Mode mode, int compressionLevel) { + super(mode.name(), new Lucene95Codec()); + this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel); + } + + @Override + public StoredFieldsFormat storedFieldsFormat() { + return storedFieldsFormat; + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java new file mode 100644 index 0000000000000..e0253516b6d0a --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import java.io.IOException; +import java.util.Objects; +import org.apache.lucene.codecs.StoredFieldsFormat; +import org.apache.lucene.codecs.StoredFieldsReader; +import org.apache.lucene.codecs.StoredFieldsWriter; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +/** Stored field format used by pluggable codec */ +public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat { + + /** Attribute key used to record the codec's compression mode in the segment info */ + public static final String MODE_KEY = Lucene95CustomStoredFieldsFormat.class.getSimpleName() + ".mode"; + + private static final int ZSTD_BLOCK_LENGTH = 10 * 48 * 1024; + private static final int ZSTD_MAX_DOCS_PER_BLOCK = 4096; + private static final int ZSTD_BLOCK_SHIFT = 10; + + private final CompressionMode zstdCompressionMode; + private final CompressionMode zstdNoDictCompressionMode; + + private final Lucene95CustomCodec.Mode mode; + + /** default constructor */ + public Lucene95CustomStoredFieldsFormat() { + this(Lucene95CustomCodec.Mode.ZSTD, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new instance. + * + * @param mode The mode, either ZSTD or ZSTDNODICT + */ + public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) { + this(mode, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new instance with the specified mode and compression level. + * + * @param mode The mode, either ZSTD or ZSTDNODICT + * @param compressionLevel The compression level for the mode.
+ */ + public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compressionLevel) { + this.mode = Objects.requireNonNull(mode); + zstdCompressionMode = new ZstdCompressionMode(compressionLevel); + zstdNoDictCompressionMode = new ZstdNoDictCompressionMode(compressionLevel); + } + + @Override + public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException { + String value = si.getAttribute(MODE_KEY); + if (value == null) { + throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name); + } + Lucene95CustomCodec.Mode mode = Lucene95CustomCodec.Mode.valueOf(value); + return impl(mode).fieldsReader(directory, si, fn, context); + } + + @Override + public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException { + String previous = si.putAttribute(MODE_KEY, mode.name()); + if (previous != null && previous.equals(mode.name()) == false) { + throw new IllegalStateException( + "found existing value for " + MODE_KEY + " for segment: " + si.name + " old = " + previous + ", new = " + mode.name() + ); + } + return impl(mode).fieldsWriter(directory, si, context); + } + + private StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { + switch (mode) { + case ZSTD: + return new Lucene90CompressingStoredFieldsFormat( + "CustomStoredFieldsZstd", + zstdCompressionMode, + ZSTD_BLOCK_LENGTH, + ZSTD_MAX_DOCS_PER_BLOCK, + ZSTD_BLOCK_SHIFT + ); + case ZSTDNODICT: + return new Lucene90CompressingStoredFieldsFormat( + "CustomStoredFieldsZstdNoDict", + zstdNoDictCompressionMode, + ZSTD_BLOCK_LENGTH, + ZSTD_MAX_DOCS_PER_BLOCK, + ZSTD_BLOCK_SHIFT + ); + default: + throw new AssertionError(); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java new file mode 100644 index 0000000000000..f1c64853bca40 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.index.mapper.MapperService; + +/** PerFieldMappingPostingFormatCodec. {@link org.opensearch.index.codec.PerFieldMappingPostingFormatCodec} */ +public class PerFieldMappingPostingFormatCodec extends Lucene95CustomCodec { + + /** + * Creates a new instance. + * + * @param compressionMode The compression mode (ZSTD or ZSTDNODICT). + * @param mapperService The mapper service. 
+ */ + public PerFieldMappingPostingFormatCodec(Lucene95CustomCodec.Mode compressionMode, MapperService mapperService) { + super(compressionMode); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java new file mode 100644 index 0000000000000..086e2461b1f6a --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +/** + * ZstdCodec provides a ZSTD compressor using the zstd-jni library. + */ +public class ZstdCodec extends Lucene95CustomCodec { + + /** + * Creates a new ZstdCodec instance with the default compression level. + */ + public ZstdCodec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new ZstdCodec instance. + * + * @param compressionLevel The compression level. + */ + public ZstdCodec(int compressionLevel) { + super(Mode.ZSTD, compressionLevel); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java new file mode 100644 index 0000000000000..795ddf3ab2d17 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdCompressCtx; +import com.github.luben.zstd.ZstdDecompressCtx; +import com.github.luben.zstd.ZstdDictCompress; +import com.github.luben.zstd.ZstdDictDecompress; +import java.io.IOException; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +/** Zstandard Compression Mode */ +public class ZstdCompressionMode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + private static final int DICT_SIZE_FACTOR = 6; + private static final int DEFAULT_COMPRESSION_LEVEL = 6; + + private final int compressionLevel; + + /** default constructor */ + protected ZstdCompressionMode() { + this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + } + + /** + * Creates a new instance. + * + * @param compressionLevel The compression level to use.
+ */ + protected ZstdCompressionMode(int compressionLevel) { + this.compressionLevel = compressionLevel; + } + + @Override + public Compressor newCompressor() { + return new ZstdCompressor(compressionLevel); + } + + @Override + public Decompressor newDecompressor() { + return new ZstdDecompressor(); + } + + /** zstandard compressor */ + private static final class ZstdCompressor extends Compressor { + + private final int compressionLevel; + private byte[] compressedBuffer; + + /** compressor with a given compression level */ + public ZstdCompressor(int compressionLevel) { + this.compressionLevel = compressionLevel; + compressedBuffer = BytesRef.EMPTY_BYTES; + } + + /* reusable compress function */ + private void doCompress(byte[] bytes, int offset, int length, ZstdCompressCtx cctx, DataOutput out) throws IOException { + if (length == 0) { + out.writeVInt(0); + return; + } + final int maxCompressedLength = (int) Zstd.compressBound(length); + compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength); + + int compressedSize = cctx.compressByteArray(compressedBuffer, 0, compressedBuffer.length, bytes, offset, length); + + out.writeVInt(compressedSize); + out.writeBytes(compressedBuffer, compressedSize); + } + + private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { + assert offset >= 0 : "offset must be non-negative"; + + final int dictLength = length / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR); + final int blockLength = (length - dictLength + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; + out.writeVInt(dictLength); + out.writeVInt(blockLength); + + final int end = offset + length; + assert end >= 0 : "buffer end must be non-negative"; + + try (ZstdCompressCtx cctx = new ZstdCompressCtx()) { + cctx.setLevel(compressionLevel); + + // dictionary compression first + doCompress(bytes, offset, dictLength, cctx, out); + cctx.loadDict(new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)); + + for (int start = offset + dictLength; start < end; start += blockLength) { + int l = Math.min(blockLength, end - start); + doCompress(bytes, start, l, cctx, out); + } + } + } + + @Override + public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { + final int length = (int) buffersInput.size(); + byte[] bytes = new byte[length]; + buffersInput.readBytes(bytes, 0, length); + compress(bytes, 0, length, out); + } + + @Override + public void close() throws IOException {} + } + + /** zstandard decompressor */ + private static final class ZstdDecompressor extends Decompressor { + + private byte[] compressedBuffer; + + /** default decompressor */ + public ZstdDecompressor() { + compressedBuffer = BytesRef.EMPTY_BYTES; + } + + /* reusable decompress function */ + private void doDecompress(DataInput in, ZstdDecompressCtx dctx, BytesRef bytes, int decompressedLen) throws IOException { + final int compressedLength = in.readVInt(); + if (compressedLength == 0) { + return; + } + + compressedBuffer = ArrayUtil.grow(compressedBuffer, compressedLength); + in.readBytes(compressedBuffer, 0, compressedLength); + + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + decompressedLen); + int uncompressed = dctx.decompressByteArray(bytes.bytes, bytes.length, decompressedLen, compressedBuffer, 0, compressedLength); + + if (decompressedLen != uncompressed) { + throw new IllegalStateException("expected " + decompressedLen + " decompressed bytes, got " + uncompressed); + } + bytes.length += uncompressed; + } + + @Override + public void decompress(DataInput in, int
originalLength, int offset, int length, BytesRef bytes) throws IOException { + assert offset + length <= originalLength : "buffer read size must be within limit"; + + if (length == 0) { + bytes.length = 0; + return; + } + final int dictLength = in.readVInt(); + final int blockLength = in.readVInt(); + bytes.bytes = ArrayUtil.grow(bytes.bytes, dictLength); + bytes.offset = bytes.length = 0; + + try (ZstdDecompressCtx dctx = new ZstdDecompressCtx()) { + + // decompress dictionary first + doDecompress(in, dctx, bytes, dictLength); + + dctx.loadDict(new ZstdDictDecompress(bytes.bytes, 0, dictLength)); + + int offsetInBlock = dictLength; + int offsetInBytesRef = offset; + + // Skip unneeded blocks + while (offsetInBlock + blockLength < offset) { + final int compressedLength = in.readVInt(); + in.skipBytes(compressedLength); + offsetInBlock += blockLength; + offsetInBytesRef -= blockLength; + } + + // Read blocks that intersect with the interval we need + while (offsetInBlock < offset + length) { + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength); + int l = Math.min(blockLength, originalLength - offsetInBlock); + doDecompress(in, dctx, bytes, l); + offsetInBlock += blockLength; + } + + bytes.offset = offsetInBytesRef; + bytes.length = length; + + assert bytes.isValid() : "decompression output is corrupted"; + } + } + + @Override + public Decompressor clone() { + return new ZstdDecompressor(); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java new file mode 100644 index 0000000000000..c33ca1f4ff6e7 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +/** + * ZstdNoDictCodec provides a ZSTD compressor without dictionary support. + */ +public class ZstdNoDictCodec extends Lucene95CustomCodec { + + /** + * Creates a new ZstdNoDictCodec instance with the default compression level. + */ + public ZstdNoDictCodec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new ZstdNoDictCodec instance. + * + * @param compressionLevel The compression level. + */ + public ZstdNoDictCodec(int compressionLevel) { + super(Mode.ZSTDNODICT, compressionLevel); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java new file mode 100644 index 0000000000000..61808191556f0 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java @@ -0,0 +1,178 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.index.codec.customcodecs; + +import com.github.luben.zstd.Zstd; +import java.io.IOException; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +/** ZSTD Compression Mode (without dictionary support). */ +public class ZstdNoDictCompressionMode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + private static final int DEFAULT_COMPRESSION_LEVEL = 6; + + private final int compressionLevel; + + /** default constructor */ + protected ZstdNoDictCompressionMode() { + this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + } + + /** + * Creates a new instance with the given compression level. + * + * @param compressionLevel The compression level. + */ + protected ZstdNoDictCompressionMode(int compressionLevel) { + this.compressionLevel = compressionLevel; + } + + @Override + public Compressor newCompressor() { + return new ZstdCompressor(compressionLevel); + } + + @Override + public Decompressor newDecompressor() { + return new ZstdDecompressor(); + } + + /** zstandard compressor */ + private static final class ZstdCompressor extends Compressor { + + private final int compressionLevel; + private byte[] compressedBuffer; + + /** compressor with a given compression level */ + public ZstdCompressor(int compressionLevel) { + this.compressionLevel = compressionLevel; + compressedBuffer = BytesRef.EMPTY_BYTES; + } + + private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { + assert offset >= 0 : "offset must be non-negative"; + + int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; + out.writeVInt(blockLength); + + final int end = offset + length; + assert end >= 0 : "buffer end must be non-negative"; + + for (int start = offset; start < end; start += blockLength) { + int l = Math.min(blockLength, end - start); + + if (l == 0) { + out.writeVInt(0); + return; + } + + final int maxCompressedLength = (int) Zstd.compressBound(l); + compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength); + + int compressedSize = (int) Zstd.compressByteArray( + compressedBuffer, + 0, + compressedBuffer.length, + bytes, + start, + l, + compressionLevel + ); + + out.writeVInt(compressedSize); + out.writeBytes(compressedBuffer, compressedSize); + } + } + + @Override + public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { + final int length = (int) buffersInput.size(); + byte[] bytes = new byte[length]; + buffersInput.readBytes(bytes, 0, length); + compress(bytes, 0, length, out); + } + + @Override + public void close() throws IOException {} + } + + /** zstandard decompressor */ + private static final class ZstdDecompressor extends Decompressor { + + private byte[] compressed; + + /** default decompressor */ + public ZstdDecompressor() { + compressed = BytesRef.EMPTY_BYTES; + } + + @Override + public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { + assert offset + length <= originalLength : "buffer read size must be within limit"; + + if (length == 0) { + bytes.length = 0; + return; + } + + final int blockLength = in.readVInt(); +
bytes.offset = bytes.length = 0; + int offsetInBlock = 0; + int offsetInBytesRef = offset; + + // Skip unneeded blocks + while (offsetInBlock + blockLength < offset) { + final int compressedLength = in.readVInt(); + in.skipBytes(compressedLength); + offsetInBlock += blockLength; + offsetInBytesRef -= blockLength; + } + + // Read blocks that intersect with the interval we need + while (offsetInBlock < offset + length) { + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength); + final int compressedLength = in.readVInt(); + if (compressedLength == 0) { + return; + } + compressed = ArrayUtil.grow(compressed, compressedLength); + in.readBytes(compressed, 0, compressedLength); + + int l = Math.min(blockLength, originalLength - offsetInBlock); + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l); + + byte[] output = new byte[l]; + + final int uncompressed = (int) Zstd.decompressByteArray(output, 0, l, compressed, 0, compressedLength); + System.arraycopy(output, 0, bytes.bytes, bytes.length, uncompressed); + + bytes.length += uncompressed; + offsetInBlock += blockLength; + } + + bytes.offset = offsetInBytesRef; + bytes.length = length; + + assert bytes.isValid() : "decompression output is corrupted."; + } + + @Override + public Decompressor clone() { + return new ZstdDecompressor(); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java new file mode 100644 index 0000000000000..e996873963b1b --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * A plugin that implements compression codecs with native implementation. + */ +package org.opensearch.index.codec.customcodecs; diff --git a/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy b/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..8161010cfa897 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+grant codeBase "${codebase.zstd-jni}" {
+  permission java.lang.RuntimePermission "loadLibrary.*";
+};
diff --git a/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec
new file mode 100644
index 0000000000000..8b37d91cd8bc4
--- /dev/null
+++ b/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec
@@ -0,0 +1,2 @@
+org.opensearch.index.codec.customcodecs.ZstdCodec
+org.opensearch.index.codec.customcodecs.ZstdNoDictCodec
diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java
new file mode 100644
index 0000000000000..fcfb06ca6b050
--- /dev/null
+++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java
@@ -0,0 +1,219 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.codec.customcodecs;
+
+import org.apache.lucene.tests.util.LineFileDocs;
+import org.apache.lucene.tests.util.TestUtil;
+import org.opensearch.test.OpenSearchTestCase;
+import org.apache.lucene.codecs.compressing.Compressor;
+import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.ByteBuffersDataInput;
+import org.apache.lucene.store.ByteBuffersDataOutput;
+import org.apache.lucene.util.BytesRef;
+
+import java.util.List;
+import java.nio.ByteBuffer;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+/**
+ * Test cases for compressors (based on {@link org.opensearch.common.compress.DeflateCompressTests}).
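+ * Concrete subclasses supply the compressor/decompressor pair under test, e.g.
+ * {@code new ZstdNoDictCompressionMode().newCompressor()}; each case round-trips a
+ * payload through that pair and asserts the decompressed bytes equal the original.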
+ */ +public abstract class AbstractCompressorTests extends OpenSearchTestCase { + + abstract Compressor compressor(); + + abstract Decompressor decompressor(); + + public void testEmpty() throws IOException { + final byte[] bytes = "".getBytes(StandardCharsets.UTF_8); + doTest(bytes); + } + + public void testShortLiterals() throws IOException { + final byte[] bytes = "1234567345673456745608910123".getBytes(StandardCharsets.UTF_8); + doTest(bytes); + } + + public void testRandom() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + final byte[] bytes = new byte[TestUtil.nextInt(r, 1, 100000)]; + r.nextBytes(bytes); + doTest(bytes); + } + } + + public void testLineDocs() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 10; i++) { + int numDocs = TestUtil.nextInt(r, 1, 200); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + for (int j = 0; j < numDocs; j++) { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + doTest(bos.toByteArray()); + } + lineFileDocs.close(); + } + + public void testRepetitionsL() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numLongs = TestUtil.nextInt(r, 1, 10000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + long theValue = r.nextLong(); + for (int j = 0; j < numLongs; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsI() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numInts = TestUtil.nextInt(r, 1, 20000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int theValue = r.nextInt(); + for (int j = 0; j < numInts; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsS() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numShorts = TestUtil.nextInt(r, 1, 40000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + short theValue = (short) r.nextInt(65535); + for (int j = 0; j < numShorts; j++) { + if (r.nextInt(10) == 0) { + theValue = (short) r.nextInt(65535); + } + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testMixed() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 2; ++i) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int prevInt = r.nextInt(); + long prevLong = r.nextLong(); + while (bos.size() < 400000) { + switch (r.nextInt(4)) { + case 0: + addInt(r, prevInt, bos); + break; + case 1: + addLong(r, prevLong, bos); + break; + case 2: + addString(lineFileDocs, bos); + break; + case 3: + addBytes(r, bos); + break; + default: + throw new IllegalStateException("Random is broken"); + } + } + doTest(bos.toByteArray()); + } + } + + private void addLong(Random r, long prev, ByteArrayOutputStream bos) { + 
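// Reuse 'prev' about one time in ten, otherwise pick a fresh random long;
+        // either way, write its 8 bytes most-significant first (big-endian).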
long theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addInt(Random r, int prev, ByteArrayOutputStream bos) { + int theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addString(LineFileDocs lineFileDocs, ByteArrayOutputStream bos) throws IOException { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + + private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { + byte bytes[] = new byte[TestUtil.nextInt(r, 1, 10000)]; + r.nextBytes(bytes); + bos.write(bytes); + } + + private void doTest(byte[] bytes) throws IOException { + final int length = bytes.length; + + ByteBuffersDataInput in = new ByteBuffersDataInput(List.of(ByteBuffer.wrap(bytes))); + ByteBuffersDataOutput out = new ByteBuffersDataOutput(); + + // let's compress + Compressor compressor = compressor(); + compressor.compress(in, out); + byte[] compressed = out.toArrayCopy(); + + // let's decompress + BytesRef outbytes = new BytesRef(); + Decompressor decompressor = decompressor(); + decompressor.decompress(new ByteArrayDataInput(compressed), length, 0, length, outbytes); + + // get the uncompressed array out of outbytes + byte[] restored = new byte[outbytes.length]; + System.arraycopy(outbytes.bytes, 0, restored, 0, outbytes.length); + + assertArrayEquals(bytes, restored); + } + +} diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java new file mode 100644 index 0000000000000..78cf62c08f889 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +/** + * Test ZSTD compression (with dictionary enabled) + */ +public class ZstdCompressorTests extends AbstractCompressorTests { + + private final Compressor compressor = new ZstdCompressionMode().newCompressor(); + private final Decompressor decompressor = new ZstdCompressionMode().newDecompressor(); + + @Override + Compressor compressor() { + return compressor; + } + + @Override + Decompressor decompressor() { + return decompressor; + } +} diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java new file mode 100644 index 0000000000000..2eda81a6af2ab --- /dev/null +++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +/** + * Test ZSTD compression (with no dictionary). + */ +public class ZstdNoDictCompressorTests extends AbstractCompressorTests { + + private final Compressor compressor = new ZstdNoDictCompressionMode().newCompressor(); + private final Decompressor decompressor = new ZstdNoDictCompressionMode().newDecompressor(); + + @Override + Compressor compressor() { + return compressor; + } + + @Override + Decompressor decompressor() { + return decompressor; + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 1b24c63a5fed9..91d0b0ae06387 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -737,6 +737,7 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { final SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(replicaShard.store().directory()); replicaShard.finalizeReplication(segmentInfos); + ensureYellow(INDEX_NAME); final int docCount = scaledRandomIntBetween(10, 200); for (int i = 0; i < docCount; i++) { @@ -744,7 +745,8 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { refresh(INDEX_NAME); } // Refresh, this should trigger round of segment replication - assertBusy(() -> { assertDocCounts(docCount, replicaNode); }); + waitForSearchableDocs(docCount, primaryNode, replicaNode); + verifyStoreContent(); final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME); assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 86e4e50a08a38..1644d1c3e63ba 100644 --- 
a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -282,22 +282,18 @@ public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exce testPeerRecovery(false, randomIntBetween(2, 5), false); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception { testPeerRecovery(true, 1, true); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception { testPeerRecovery(true, randomIntBetween(2, 5), true); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception { testPeerRecovery(true, 1, false); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception { testPeerRecovery(true, randomIntBetween(2, 5), false); } diff --git a/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java b/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java index 9a1a951b0796e..ff0af3954a3a3 100644 --- a/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java +++ b/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java @@ -11,7 +11,6 @@ import java.io.IOException; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.store.BufferedChecksumIndexInput; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; @@ -47,10 +46,9 @@ public VersionedCodecStreamWrapper(IndexIOStreamHandler indexIOStreamHandler, * @return stream content parsed into {@link T} */ public T readStream(IndexInput indexInput) throws IOException { - ChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(indexInput); - int readStreamVersion = checkHeader(checksumIndexInput); - T content = getHandlerForVersion(readStreamVersion).readContent(checksumIndexInput); - checkFooter(checksumIndexInput); + CodecUtil.checksumEntireFile(indexInput); + int readStreamVersion = checkHeader(indexInput); + T content = getHandlerForVersion(readStreamVersion).readContent(indexInput); return content; } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index a4ba510d013bd..bdb043b7b9aa1 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -503,7 +503,7 @@ public IndexService newIndexService( NamedWriteableRegistry namedWriteableRegistry, BooleanSupplier idFieldDataEnabled, ValuesSourceRegistry valuesSourceRegistry, - IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, BiFunction translogFactorySupplier ) throws IOException { final IndexEventListener eventListener = freeze(); diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index eacb07f2e5c9c..a77ea53d7560c 100644 --- 
a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -138,7 +138,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final NodeEnvironment nodeEnv; private final ShardStoreDeleter shardStoreDeleter; private final IndexStorePlugin.DirectoryFactory directoryFactory; - private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; + private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory; private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory; private final CheckedFunction readerWrapper; private final IndexCache indexCache; @@ -194,7 +194,7 @@ public IndexService( Client client, QueryCache queryCache, IndexStorePlugin.DirectoryFactory directoryFactory, - IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, IndexEventListener eventListener, Function> wrapperFactory, MapperRegistry mapperRegistry, @@ -470,11 +470,7 @@ public synchronized IndexShard createShard( Store remoteStore = null; if (this.indexSettings.isRemoteStoreEnabled()) { - Directory remoteDirectory = remoteDirectoryFactory.newDirectory( - this.indexSettings.getRemoteStoreRepository(), - this.indexSettings, - path - ); + Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 2629b2c954015..ab83d9b0f4bf1 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -494,6 +494,13 @@ public boolean isSystem() { return indexSettings.getIndexMetadata().isSystem(); } + /** + * Returns the name of the default codec in codecService + */ + public String getDefaultCodecName() { + return codecService.codec(CodecService.DEFAULT_CODEC).getName(); + } + /** * USE THIS METHOD WITH CARE! * Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about @@ -1484,7 +1491,7 @@ public Tuple, ReplicationCheckpoint> getLatestSegme return null; } if (getEngineOrNull() == null) { - return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)); + return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())); } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); @@ -1501,13 +1508,14 @@ public Tuple, ReplicationCheckpoint> getLatestSegme // getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues. shardRouting.primary() ? 
store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum() - : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes() + : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(), + getEngine().config().getCodec().getName() ) ); } catch (IOException e) { throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e); } - }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()))); } /** @@ -1577,6 +1585,12 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp ); return false; } + if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) { + logger.trace( + () -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec()) + ); + return false; + } return true; } @@ -3078,9 +3092,11 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S * calculations of the global checkpoint. However, we can not assert that we are in the translog stage of recovery here as * while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move * to recovery finalization, or even finished recovery before the update arrives here. + * When remote translog is enabled for an index, replication operation is limited to primary term validation and does not + * update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint. */ - assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED - : "supposedly in-sync shard copy received a global checkpoint [" + assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED) + || indexSettings.isRemoteTranslogStoreEnabled() : "supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint + "] " + "that is higher than its local checkpoint [" @@ -4402,12 +4418,25 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re ((RemoteSegmentStoreDirectory) remoteDirectory).init(); Map uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) .getSegmentsUploadedToRemoteStore(); - final Directory storeDirectory = store.directory(); store.incRef(); remoteStore.incRef(); List downloadedSegments = new ArrayList<>(); List skippedSegments = new ArrayList<>(); try { + final Directory storeDirectory; + if (recoveryState.getStage() == RecoveryState.Stage.INDEX) { + storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex()); + for (String file : uploadedSegments.keySet()) { + long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); + if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { + recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false); + } else { + recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true); + } + } + } else { + storeDirectory = store.directory(); + } String segmentInfosSnapshotFilename = null; Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); for (String file : uploadedSegments.keySet()) { diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java 
b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 972a76bc17eb5..31a863129cc8c 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -268,7 +268,9 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) in.copyFrom(new FilterDirectory(from) { @Override public IndexInput openInput(String name, IOContext context) throws IOException { - index.addFileDetail(dest, l, false); + if (index.getFileDetails(dest) == null) { + index.addFileDetail(dest, l, false); + } copies.set(true); final IndexInput input = in.openInput(name, context); return new IndexInput("StatsDirectoryWrapper(" + input.toString() + ")") { @@ -311,7 +313,7 @@ public void readBytes(byte[] b, int offset, int len) throws IOException { }; } }, src, dest, context); - if (copies.get() == false) { + if (copies.get() == false && index.getFileDetails(dest) == null) { index.addFileDetail(dest, l, true); // hardlinked - we treat it as reused since the file was already somewhat there } else { assert index.getFileDetails(dest) != null : "File [" + dest + "] has no file details"; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index a0bd32403bd39..c385303813844 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -17,6 +17,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.common.UUIDs; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; @@ -135,7 +136,9 @@ private Map readLatestMetadataFile() throws IOE private Map readMetadataFile(String metadataFilename) throws IOException { try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) { - RemoteSegmentMetadata metadata = metadataStreamWrapper.readStream(indexInput); + byte[] metadataBytes = new byte[(int) indexInput.length()]; + indexInput.readBytes(metadataBytes, 0, (int) indexInput.length()); + RemoteSegmentMetadata metadata = metadataStreamWrapper.readStream(new ByteArrayIndexInput(metadataFilename, metadataBytes)); return metadata.getMetadata(); } } @@ -150,25 +153,31 @@ public static class UploadedSegmentMetadata { private final String originalFilename; private final String uploadedFilename; private final String checksum; + private final long length; - UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum) { + UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum, long length) { this.originalFilename = originalFilename; this.uploadedFilename = uploadedFilename; this.checksum = checksum; + this.length = length; } @Override public String toString() { - return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum); + return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum, String.valueOf(length)); } public String getChecksum() { return this.checksum; } + public long getLength() { + return this.length; + } + public static UploadedSegmentMetadata 
fromString(String uploadedFilename) { String[] values = uploadedFilename.split(SEPARATOR); - return new UploadedSegmentMetadata(values[0], values[1], values[2]); + return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3])); } } @@ -270,6 +279,9 @@ public void deleteFile(String name) throws IOException { */ @Override public long fileLength(String name) throws IOException { + if (segmentsUploadedToRemoteStore.containsKey(name)) { + return segmentsUploadedToRemoteStore.get(name).getLength(); + } String remoteFilename = getExistingRemoteFilename(name); if (remoteFilename != null) { return remoteDataDirectory.fileLength(remoteFilename); @@ -314,7 +326,7 @@ public void copyFrom(Directory from, String src, String dest, IOContext context, } remoteDataDirectory.copyFrom(from, src, remoteFilename, context); String checksum = getChecksumOfLocalFile(from, src); - UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum); + UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index e77eb52bd3891..cb5548167a577 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -27,7 +27,7 @@ * * @opensearch.internal */ -public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { +public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.DirectoryFactory { private final Supplier repositoriesService; @@ -36,7 +36,8 @@ public RemoteSegmentStoreDirectoryFactory(Supplier reposito } @Override - public Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath path) throws IOException { + public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException { + String repositoryName = indexSettings.getRemoteStoreRepository(); try (Repository repository = repositoriesService.get().repository(repositoryName)) { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath(); diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java index 75b28baafe57f..f36055e5d7327 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java @@ -17,6 +17,8 @@ import org.opensearch.index.store.remote.utils.cache.stats.StatsCounter; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; @@ -45,7 +47,7 @@ class LRUCache implements RefCountedCache { private final HashMap> data; /** the LRU list */ - private final LinkedDeque> lru; + private final LinkedHashMap> lru; private final RemovalListener listener; @@ -53,7 +55,7 @@ class LRUCache implements RefCountedCache { private final StatsCounter statsCounter; - 
private volatile ReentrantLock lock; + private final ReentrantLock lock; /** * this tracks cache usage on the system (as long as cache entry is in the cache) @@ -65,51 +67,25 @@ class LRUCache implements RefCountedCache { */ private long activeUsage; - static class Node implements Linked> { + static class Node { final K key; V value; long weight; - Node prev; - - Node next; - int refCount; Node(K key, V value, long weight) { this.key = key; this.value = value; this.weight = weight; - this.prev = null; - this.next = null; this.refCount = 0; } - public Node getPrevious() { - return prev; - } - - public void setPrevious(Node prev) { - this.prev = prev; - } - - public Node getNext() { - return next; - } - - public void setNext(Node next) { - this.next = next; - } - public boolean evictable() { return (refCount == 0); } - - V getValue() { - return value; - } } public LRUCache(long capacity, RemovalListener listener, Weigher weigher) { @@ -117,7 +93,7 @@ public LRUCache(long capacity, RemovalListener listener, Weigher weighe this.listener = listener; this.weigher = weigher; this.data = new HashMap<>(); - this.lru = new LinkedDeque<>(); + this.lru = new LinkedHashMap<>(); this.lock = new ReentrantLock(); this.statsCounter = new DefaultStatsCounter<>(); @@ -126,7 +102,6 @@ public LRUCache(long capacity, RemovalListener listener, Weigher weighe @Override public V get(K key) { Objects.requireNonNull(key); - final ReentrantLock lock = this.lock; lock.lock(); try { Node node = data.get(key); @@ -149,7 +124,6 @@ public V put(K key, V value) { Objects.requireNonNull(key); Objects.requireNonNull(value); - final ReentrantLock lock = this.lock; lock.lock(); try { Node node = data.get(key); @@ -170,7 +144,6 @@ public V put(K key, V value) { public V compute(K key, BiFunction remappingFunction) { Objects.requireNonNull(key); Objects.requireNonNull(remappingFunction); - final ReentrantLock lock = this.lock; lock.lock(); try { final Node node = data.get(key); @@ -203,7 +176,6 @@ public V compute(K key, BiFunction remappingF @Override public void remove(K key) { Objects.requireNonNull(key); - final ReentrantLock lock = this.lock; lock.lock(); try { removeNode(key); @@ -214,7 +186,6 @@ public void remove(K key) { @Override public void clear() { - final ReentrantLock lock = this.lock; lock.lock(); try { usage = 0L; @@ -238,7 +209,6 @@ public long size() { @Override public void incRef(K key) { Objects.requireNonNull(key); - final ReentrantLock lock = this.lock; lock.lock(); try { Node node = data.get(key); @@ -250,7 +220,7 @@ public void incRef(K key) { if (node.evictable()) { // since it become active, we should remove it from eviction list - lru.remove(node); + lru.remove(node.key); } node.refCount++; @@ -264,7 +234,6 @@ public void incRef(K key) { @Override public void decRef(K key) { Objects.requireNonNull(key); - final ReentrantLock lock = this.lock; lock.lock(); try { Node node = data.get(key); @@ -273,7 +242,7 @@ public void decRef(K key) { if (node.evictable()) { // if it becomes evictable, we should add it to eviction list - lru.add(node); + lru.put(node.key, node); } if (node.refCount == 0) { @@ -289,22 +258,17 @@ public void decRef(K key) { @Override public long prune() { long sum = 0L; - final ReentrantLock lock = this.lock; lock.lock(); try { - Node node = lru.peek(); - // If weighted values are used, then the pending operations will adjust - // the size to reflect the correct weight - while (node != null) { + final Iterator> iterator = lru.values().iterator(); + while (iterator.hasNext()) { + final 
Node node = iterator.next(); + iterator.remove(); data.remove(node.key, node); sum += node.weight; statsCounter.recordRemoval(node.weight); listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); - Node tmp = node; - node = node.getNext(); - lru.remove(tmp); } - usage -= sum; } finally { lock.unlock(); @@ -314,7 +278,6 @@ public long prune() { @Override public CacheUsage usage() { - final ReentrantLock lock = this.lock; lock.lock(); try { return new CacheUsage(usage, activeUsage); @@ -325,7 +288,6 @@ public CacheUsage usage() { @Override public CacheStats stats() { - final ReentrantLock lock = this.lock; lock.lock(); try { return statsCounter.snapshot(); @@ -372,7 +334,7 @@ private void removeNode(K key) { } usage -= node.weight; if (node.evictable()) { - lru.remove(node); + lru.remove(node.key); } statsCounter.recordRemoval(node.weight); listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); @@ -386,13 +348,10 @@ private boolean hasOverflowed() { private void evict() { // Attempts to evict entries from the cache if it exceeds the maximum // capacity. - while (hasOverflowed()) { - final Node node = lru.poll(); - - if (node == null) { - return; - } - + final Iterator> iterator = lru.values().iterator(); + while (hasOverflowed() && iterator.hasNext()) { + final Node node = iterator.next(); + iterator.remove(); // Notify the listener only if the entry was evicted data.remove(node.key, node); usage -= node.weight; diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/Linked.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/Linked.java deleted file mode 100644 index 0982909aca874..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/Linked.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store.remote.utils.cache; - -import java.util.Deque; - -/** - * An element that is linked on the {@link Deque}. - * - * @opensearch.internal - */ -public interface Linked> { - - /** - * Retrieves the previous element or null if either the element is - * unlinked or the first element on the deque. - */ - T getPrevious(); - - /** - * Sets the previous element or null if there is no link. - */ - void setPrevious(T prev); - - /** - * Retrieves the next element or null if either the element is - * unlinked or the last element on the deque. - */ - T getNext(); - - /** - * Sets the next element or null if there is no link. - */ - void setNext(T next); -} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LinkedDeque.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LinkedDeque.java deleted file mode 100644 index 4ca42a7fcf24d..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LinkedDeque.java +++ /dev/null @@ -1,432 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */
-
-package org.opensearch.index.store.remote.utils.cache;
-
-import java.util.AbstractCollection;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * Linked list implementation of the {@link Deque} interface where the link
- * pointers are tightly integrated with the element. Linked deques have no
- * capacity restrictions; they grow as necessary to support usage. They are not
- * thread-safe; in the absence of external synchronization, they do not support
- * concurrent access by multiple threads. Null elements are prohibited.
- * <p>
- * Most LinkedDeque operations run in constant time by assuming that
- * the {@link Linked} parameter is associated with the deque instance. Any usage
- * that violates this assumption will result in non-deterministic behavior.
- * <p>
- * The iterators returned by this class are not fail-fast: If - * the deque is modified at any time after the iterator is created, the iterator - * will be in an unknown state. Thus, in the face of concurrent modification, - * the iterator risks arbitrary, non-deterministic behavior at an undetermined - * time in the future. - * - * @param the type of elements held in this collection - * - * @opensearch.internal - */ -public final class LinkedDeque> extends AbstractCollection implements Deque { - // This class provides a doubly-linked list that is optimized for the virtual - // machine. The first and last elements are manipulated instead of a slightly - // more convenient sentinel element to avoid the insertion of null checks with - // NullPointerException throws in the byte code. The links to a removed - // element are cleared to help a generational garbage collector if the - // discarded elements inhabit more than one generation. - - /** - * Pointer to first node. - * Invariant: (first == null && last == null) || - * (first.prev == null) - */ - E first; - - /** - * Pointer to last node. - * Invariant: (first == null && last == null) || - * (last.next == null) - */ - E last; - - /** - * Links the element to the front of the deque so that it becomes the first - * element. - * - * @param e the unlinked element - */ - void linkFirst(final E e) { - final E f = first; - first = e; - - if (f == null) { - last = e; - } else { - f.setPrevious(e); - e.setNext(f); - } - } - - /** - * Links the element to the back of the deque so that it becomes the last - * element. - * - * @param e the unlinked element - */ - void linkLast(final E e) { - final E l = last; - last = e; - - if (l == null) { - first = e; - } else { - l.setNext(e); - e.setPrevious(l); - } - } - - /** - * Unlinks the non-null first element. - */ - E unlinkFirst() { - final E f = first; - final E next = f.getNext(); - f.setNext(null); - - first = next; - if (next == null) { - last = null; - } else { - next.setPrevious(null); - } - return f; - } - - /** - * Unlinks the non-null last element. - */ - E unlinkLast() { - final E l = last; - final E prev = l.getPrevious(); - l.setPrevious(null); - last = prev; - if (prev == null) { - first = null; - } else { - prev.setNext(null); - } - return l; - } - - /** - * Unlinks the non-null element. - */ - void unlink(E e) { - final E prev = e.getPrevious(); - final E next = e.getNext(); - - if (prev == null) { - first = next; - } else { - prev.setNext(next); - e.setPrevious(null); - } - - if (next == null) { - last = prev; - } else { - next.setPrevious(prev); - e.setNext(null); - } - } - - @Override - public boolean isEmpty() { - return (first == null); - } - - void checkNotEmpty() { - if (isEmpty()) { - throw new NoSuchElementException(); - } - } - - /** - * {@inheritDoc} - *
- * <p>
- * Beware that, unlike in most collections, this method is NOT a - * constant-time operation. - */ - @Override - public int size() { - int size = 0; - for (E e = first; e != null; e = e.getNext()) { - size++; - } - return size; - } - - @Override - public void clear() { - for (E e = first; e != null;) { - E next = e.getNext(); - e.setPrevious(null); - e.setNext(null); - e = next; - } - first = last = null; - } - - @Override - public boolean contains(Object o) { - return (o instanceof Linked) && contains((Linked) o); - } - - // A fast-path containment check - boolean contains(Linked e) { - return (e.getPrevious() != null) || (e.getNext() != null) || (e == first); - } - - /** - * Moves the element to the front of the deque so that it becomes the first - * element. - * - * @param e the linked element - */ - public void moveToFront(E e) { - if (e != first) { - unlink(e); - linkFirst(e); - } - } - - /** - * Moves the element to the back of the deque so that it becomes the last - * element. - * - * @param e the linked element - */ - public void moveToBack(E e) { - if (e != last) { - unlink(e); - linkLast(e); - } - } - - @Override - public E peek() { - return peekFirst(); - } - - @Override - public E peekFirst() { - return first; - } - - @Override - public E peekLast() { - return last; - } - - @Override - public E getFirst() { - checkNotEmpty(); - return peekFirst(); - } - - @Override - public E getLast() { - checkNotEmpty(); - return peekLast(); - } - - @Override - public E element() { - return getFirst(); - } - - @Override - public boolean offer(E e) { - return offerLast(e); - } - - @Override - public boolean offerFirst(E e) { - if (contains(e)) { - return false; - } - linkFirst(e); - return true; - } - - @Override - public boolean offerLast(E e) { - if (contains(e)) { - return false; - } - linkLast(e); - return true; - } - - @Override - public boolean add(E e) { - return offerLast(e); - } - - @Override - public void addFirst(E e) { - if (!offerFirst(e)) { - throw new IllegalArgumentException(); - } - } - - @Override - public void addLast(E e) { - if (!offerLast(e)) { - throw new IllegalArgumentException(); - } - } - - @Override - public E poll() { - return pollFirst(); - } - - @Override - public E pollFirst() { - return isEmpty() ? null : unlinkFirst(); - } - - @Override - public E pollLast() { - return isEmpty() ? 
null : unlinkLast(); - } - - @Override - public E remove() { - return removeFirst(); - } - - @Override - @SuppressWarnings("unchecked") - public boolean remove(Object o) { - return (o instanceof Linked) && remove((E) o); - } - - // A fast-path removal - boolean remove(E e) { - if (contains(e)) { - unlink(e); - return true; - } - return false; - } - - @Override - public E removeFirst() { - checkNotEmpty(); - return pollFirst(); - } - - @Override - public boolean removeFirstOccurrence(Object o) { - return remove(o); - } - - @Override - public E removeLast() { - checkNotEmpty(); - return pollLast(); - } - - @Override - public boolean removeLastOccurrence(Object o) { - return remove(o); - } - - @Override - public boolean removeAll(Collection c) { - boolean modified = false; - for (Object o : c) { - modified |= remove(o); - } - return modified; - } - - @Override - public void push(E e) { - addFirst(e); - } - - @Override - public E pop() { - return removeFirst(); - } - - @Override - public Iterator iterator() { - return new AbstractLinkedIterator(first) { - @Override - E computeNext() { - return cursor.getNext(); - } - }; - } - - @Override - public Iterator descendingIterator() { - return new AbstractLinkedIterator(last) { - @Override - E computeNext() { - return cursor.getPrevious(); - } - }; - } - - abstract class AbstractLinkedIterator implements Iterator { - E cursor; - - /** - * Creates an iterator that can can traverse the deque. - * - * @param start the initial element to begin traversal from - */ - AbstractLinkedIterator(E start) { - cursor = start; - } - - @Override - public boolean hasNext() { - return (cursor != null); - } - - @Override - public E next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - E e = cursor; - cursor = computeNext(); - return e; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Retrieves the next element to traverse to or null if there are - * no more elements. - */ - abstract E computeNext(); - } -} diff --git a/server/src/main/java/org/opensearch/indices/IndicesQueryCache.java b/server/src/main/java/org/opensearch/indices/IndicesQueryCache.java index 78970267f67c2..2669da3f417c3 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesQueryCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesQueryCache.java @@ -128,13 +128,17 @@ public QueryCacheStats getStats(ShardId shard) { // We also have some shared ram usage that we try to distribute to // proportionally to their number of cache entries of each shard - long totalSize = 0; - for (QueryCacheStats s : stats.values()) { - totalSize += s.getCacheSize(); + if (stats.isEmpty()) { + shardStats.add(new QueryCacheStats(sharedRamBytesUsed, 0, 0, 0, 0)); + } else { + long totalSize = 0; + for (QueryCacheStats s : stats.values()) { + totalSize += s.getCacheSize(); + } + final double weight = totalSize == 0 ? 1d / stats.size() : ((double) shardStats.getCacheSize()) / totalSize; + final long additionalRamBytesUsed = Math.round(weight * sharedRamBytesUsed); + shardStats.add(new QueryCacheStats(additionalRamBytesUsed, 0, 0, 0, 0)); } - final double weight = totalSize == 0 ? 
1d / stats.size() : ((double) shardStats.getCacheSize()) / totalSize; - final long additionalRamBytesUsed = Math.round(weight * sharedRamBytesUsed); - shardStats.add(new QueryCacheStats(additionalRamBytesUsed, 0, 0, 0, 0)); return shardStats; } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 9c83a33a37712..bb880f2df58da 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -288,7 +288,7 @@ public class IndicesService extends AbstractLifecycleComponent private final Set danglingIndicesToWrite = Sets.newConcurrentHashSet(); private final boolean nodeWriteDanglingIndicesInfo; private final ValuesSourceRegistry valuesSourceRegistry; - private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; + private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory; private final BiFunction translogFactorySupplier; @Override @@ -318,7 +318,7 @@ public IndicesService( Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, - IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, Supplier repositoriesServiceSupplier ) { this.settings = settings; @@ -433,7 +433,7 @@ public IndicesService( Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, - IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, Supplier repositoriesServiceSupplier ) { this.settings = settings; diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 6f04c6cf6f665..3ab0a7539fb06 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; @@ -147,6 +148,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener { if (segrepHandler != null) { logger.warn("Override handler for allocation id {}", request.getTargetAllocationId()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index bf626ff93760c..1858449e13ae8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -401,7 +401,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha return; } startReplication( - ReplicationCheckpoint.empty(request.getShardId()), + ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()), indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git 
a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 57e667b06a223..32521fb0cd944 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.checkpoint; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -30,29 +31,32 @@ public class ReplicationCheckpoint implements Writeable, Comparable tracker.shardAllocationId.equals(id) == false) .collect(Collectors.toSet()); - final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 1, 1, 1L); - final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 2, 50L); - final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 3, 100L); + final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 1, + 1, + 1L, + Codec.getDefault().getName() + ); + final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 2, + 2, + 50L, + Codec.getDefault().getName() + ); + final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 2, + 3, + 100L, + Codec.getDefault().getName() + ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); tracker.setLatestReplicationCheckpoint(secondCheckpoint); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 014a37249612b..c4db88782638f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.shard; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; import org.opensearch.ExceptionsHelper; @@ -306,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode()); assertEquals(true, primaryShard.routingEntry().primary()); - spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard); + spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard); // Verify that checkpoint is not processed as shard routing is primary. 
         verify(spy, times(0)).startReplication(any(), any(), any());
@@ -1020,7 +1021,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun
 
     private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
         try {
-            final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId), primary);
+            final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
             listener.onResponse(
                 new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
             );
@@ -1034,7 +1035,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
         throws InterruptedException {
         CountDownLatch latch = new CountDownLatch(1);
         final SegmentReplicationTarget target = targetService.startReplication(
-            ReplicationCheckpoint.empty(replica.shardId),
+            ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
             replica,
             new SegmentReplicationTargetService.SegmentReplicationListener() {
                 @Override
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
index 0105d0dc309c2..7be86aa0d96a4 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
@@ -52,7 +52,10 @@ public void setup() {
     }
 
     public void testNewDirectory() throws IOException {
-        Settings settings = Settings.builder().put(IndexMetadata.SETTING_INDEX_UUID, "uuid_1").build();
+        Settings settings = Settings.builder()
+            .put(IndexMetadata.SETTING_INDEX_UUID, "uuid_1")
+            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "remote_store_repository")
+            .build();
         IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
         Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
         ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
@@ -66,7 +69,7 @@ public void testNewDirectory() throws IOException {
 
         when(repositoriesService.repository("remote_store_repository")).thenReturn(repository);
 
-        try (Directory directory = remoteSegmentStoreDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)) {
+        try (Directory directory = remoteSegmentStoreDirectoryFactory.newDirectory(indexSettings, shardPath)) {
             assertTrue(directory instanceof RemoteSegmentStoreDirectory);
             ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
             verify(blobStore, times(2)).blobContainer(blobPathCaptor.capture());
@@ -80,17 +83,14 @@ public void testNewDirectory() throws IOException {
     }
 
     public void testNewDirectoryRepositoryDoesNotExist() {
-        Settings settings = Settings.builder().build();
+        Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "remote_store_repository").build();
         IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
         Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
         ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
 
         when(repositoriesService.repository("remote_store_repository")).thenThrow(new RepositoryMissingException("Missing"));
 
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> remoteSegmentStoreDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)
-        );
+        assertThrows(IllegalArgumentException.class, () -> remoteSegmentStoreDirectoryFactory.newDirectory(indexSettings, shardPath));
     }
 }
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
index 956279c3ea048..49a2d50dfae06 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
@@ -64,16 +64,17 @@ public void testUploadedSegmentMetadataToString() {
         RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = new RemoteSegmentStoreDirectory.UploadedSegmentMetadata(
             "abc",
             "pqr",
-            "123456"
+            "123456",
+            1234
         );
-        assertEquals("abc::pqr::123456", metadata.toString());
+        assertEquals("abc::pqr::123456::1234", metadata.toString());
     }
 
     public void testUploadedSegmentMetadataFromString() {
         RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString(
-            "_0.cfe::_0.cfe__uuidxyz::4567"
+            "_0.cfe::_0.cfe__uuidxyz::4567::372000"
        );
-        assertEquals("_0.cfe::_0.cfe__uuidxyz::4567", metadata.toString());
+        assertEquals("_0.cfe::_0.cfe__uuidxyz::4567::372000", metadata.toString());
     }
 
     public void testGetMetadataFilename() {
@@ -141,9 +142,42 @@ public void testInitNoMetadataFile() throws IOException {
 
     private Map<String, String> getDummyMetadata(String prefix, int commitGeneration) {
         Map<String, String> metadata = new HashMap<>();
-        metadata.put(prefix + ".cfe", prefix + ".cfe::" + prefix + ".cfe__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000));
-        metadata.put(prefix + ".cfs", prefix + ".cfs::" + prefix + ".cfs__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000));
-        metadata.put(prefix + ".si", prefix + ".si::" + prefix + ".si__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000));
+        metadata.put(
+            prefix + ".cfe",
+            prefix
+                + ".cfe::"
+                + prefix
+                + ".cfe__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(512000, 1024000)
+        );
+        metadata.put(
+            prefix + ".cfs",
+            prefix
+                + ".cfs::"
+                + prefix
+                + ".cfs__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(512000, 1024000)
+        );
+        metadata.put(
+            prefix + ".si",
+            prefix
+                + ".si::"
+                + prefix
+                + ".si__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(512000, 1024000)
+        );
         metadata.put(
             "segments_" + commitGeneration,
             "segments_"
@@ -154,6 +188,8 @@ private Map<String, String> getDummyMetadata(String prefix, int commitGeneration
                 + UUIDs.base64UUID()
                 + "::"
                 + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(1024, 5120)
         );
         return metadata;
     }
@@ -250,7 +286,7 @@ public void testDeleteFileException() throws IOException {
         assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteFile("_0.si"));
     }
 
-    public void testFileLenght() throws IOException {
+    public void testFileLength() throws IOException {
         populateMetadata();
         remoteSegmentStoreDirectory.init();
 
@@ -259,9 +295,7 @@
 
         assertTrue(uploadedSegments.containsKey("_0.si"));
 
-        when(remoteDataDirectory.fileLength(startsWith("_0.si"))).thenReturn(1234L);
-
-        assertEquals(1234L, remoteSegmentStoreDirectory.fileLength("_0.si"));
+        assertEquals(uploadedSegments.get("_0.si").getLength(), remoteSegmentStoreDirectory.fileLength("_0.si"));
     }
 
     public void testFileLenghtNoSuchFile() throws IOException {
@@ -376,8 +410,8 @@ public void testContainsFile() throws IOException {
         );
 
         Map<String, String> metadata = new HashMap<>();
-        metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234");
-        metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345");
+        metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512");
+        metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024");
 
         when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata));
 
@@ -390,7 +424,7 @@ public void testContainsFile() throws IOException {
             UnsupportedOperationException.class,
             () -> uploadedSegmentMetadataMap.put(
                 "_100.si",
-                new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234")
+                new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234", 500)
             )
         );
 
@@ -531,8 +565,8 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException {
         );
 
         Map<String, String> metadata = new HashMap<>();
-        metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234");
-        metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345");
+        metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512");
+        metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024");
 
         BytesStreamOutput output = new BytesStreamOutput();
         IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096);
diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java
index 2a30e58b8802c..3a73015c25589 100644
--- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java
+++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java
@@ -59,11 +59,27 @@ private Map<String, String> getDummyData() {
         String prefix = "_0";
         expectedOutput.put(
             prefix + ".cfe",
-            prefix + ".cfe::" + prefix + ".cfe__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)
+            prefix
+                + ".cfe::"
+                + prefix
+                + ".cfe__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(1024, 2048)
         );
         expectedOutput.put(
             prefix + ".cfs",
-            prefix + ".cfs::" + prefix + ".cfs__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)
+            prefix
+                + ".cfs::"
+                + prefix
+                + ".cfs__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(1024, 2048)
         );
         return expectedOutput;
     }
diff --git a/server/src/test/java/org/opensearch/indices/IndicesQueryCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesQueryCacheTests.java
index b1a44d94aa0e1..5d293e6c598f6 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesQueryCacheTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesQueryCacheTests.java
@@ -143,6 +143,7 @@ public void testBasics() throws IOException {
         assertEquals(0L, stats.getCacheCount());
         assertEquals(0L, stats.getHitCount());
         assertEquals(0L, stats.getMissCount());
+        assertEquals(0L, stats.getMemorySizeInBytes());
 
         assertEquals(1, s.count(new DummyQuery(0)));
 
@@ -151,6 +152,7 @@
         assertEquals(1L, stats.getCacheCount());
         assertEquals(0L, stats.getHitCount());
         assertEquals(1L, stats.getMissCount());
+        assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         for (int i = 1; i < 20; ++i) {
             assertEquals(1, s.count(new DummyQuery(i)));
         }
 
@@ -161,6 +163,7 @@
         assertEquals(20L, stats.getCacheCount());
         assertEquals(0L, stats.getHitCount());
         assertEquals(20L, stats.getMissCount());
+        assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         s.count(new DummyQuery(10));
 
@@ -169,6 +172,7 @@
         assertEquals(20L, stats.getCacheCount());
         assertEquals(1L, stats.getHitCount());
         assertEquals(20L, stats.getMissCount());
+        assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         IOUtils.close(r, dir);
 
@@ -178,6 +182,7 @@
         assertEquals(20L, stats.getCacheCount());
         assertEquals(1L, stats.getHitCount());
         assertEquals(20L, stats.getMissCount());
+        assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         cache.onClose(shard);
 
@@ -187,6 +192,7 @@
         assertEquals(0L, stats.getCacheCount());
         assertEquals(0L, stats.getHitCount());
         assertEquals(0L, stats.getMissCount());
+        assertTrue(stats.getMemorySizeInBytes() >= 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         cache.close(); // this triggers some assertions
     }
@@ -227,12 +233,14 @@ public void testTwoShards() throws IOException {
         assertEquals(1L, stats1.getCacheCount());
         assertEquals(0L, stats1.getHitCount());
         assertEquals(1L, stats1.getMissCount());
+        assertTrue(stats1.getMemorySizeInBytes() >= 0L && stats1.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         QueryCacheStats stats2 = cache.getStats(shard2);
         assertEquals(0L, stats2.getCacheSize());
         assertEquals(0L, stats2.getCacheCount());
         assertEquals(0L, stats2.getHitCount());
         assertEquals(0L, stats2.getMissCount());
+        assertTrue(stats2.getMemorySizeInBytes() >= 0L && stats2.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         assertEquals(1, s2.count(new DummyQuery(0)));
 
@@ -241,12 +249,14 @@
         assertEquals(1L, stats1.getCacheCount());
         assertEquals(0L, stats1.getHitCount());
         assertEquals(1L, stats1.getMissCount());
+        assertTrue(stats1.getMemorySizeInBytes() >= 0L && stats1.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         stats2 = cache.getStats(shard2);
         assertEquals(1L, stats2.getCacheSize());
         assertEquals(1L, stats2.getCacheCount());
         assertEquals(0L, stats2.getHitCount());
         assertEquals(1L, stats2.getMissCount());
+        assertTrue(stats2.getMemorySizeInBytes() >= 0L && stats2.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         for (int i = 0; i < 20; ++i) {
             assertEquals(1, s2.count(new DummyQuery(i)));
 
@@ -257,12 +267,14 @@
         assertEquals(1L, stats1.getCacheCount());
         assertEquals(0L, stats1.getHitCount());
         assertEquals(1L, stats1.getMissCount());
+        assertTrue(stats1.getMemorySizeInBytes() >= 0L && stats1.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         stats2 = cache.getStats(shard2);
         assertEquals(10L, stats2.getCacheSize());
         assertEquals(20L, stats2.getCacheCount());
         assertEquals(1L, stats2.getHitCount());
         assertEquals(20L, stats2.getMissCount());
+        assertTrue(stats2.getMemorySizeInBytes() >= 0L && stats2.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         IOUtils.close(r1, dir1);
 
@@ -272,12 +284,14 @@ public void testTwoShards() throws IOException {
         assertEquals(1L, stats1.getCacheCount());
         assertEquals(0L, stats1.getHitCount());
         assertEquals(1L, stats1.getMissCount());
+        assertTrue(stats1.getMemorySizeInBytes() >= 0L && stats1.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         stats2 = cache.getStats(shard2);
         assertEquals(10L, stats2.getCacheSize());
         assertEquals(20L, stats2.getCacheCount());
         assertEquals(1L, stats2.getHitCount());
         assertEquals(20L, stats2.getMissCount());
+        assertTrue(stats2.getMemorySizeInBytes() >= 0L && stats2.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         cache.onClose(shard1);
 
@@ -287,12 +301,14 @@ public void testTwoShards() throws IOException {
         assertEquals(0L, stats1.getCacheCount());
         assertEquals(0L, stats1.getHitCount());
         assertEquals(0L, stats1.getMissCount());
+        assertTrue(stats1.getMemorySizeInBytes() >= 0L && stats1.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         stats2 = cache.getStats(shard2);
         assertEquals(10L, stats2.getCacheSize());
         assertEquals(20L, stats2.getCacheCount());
         assertEquals(1L, stats2.getHitCount());
         assertEquals(20L, stats2.getMissCount());
+        assertTrue(stats2.getMemorySizeInBytes() >= 0L && stats2.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         IOUtils.close(r2, dir2);
         cache.onClose(shard2);
 
@@ -303,12 +319,14 @@
         assertEquals(0L, stats1.getCacheCount());
         assertEquals(0L, stats1.getHitCount());
         assertEquals(0L, stats1.getMissCount());
+        assertTrue(stats1.getMemorySizeInBytes() >= 0L && stats1.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         stats2 = cache.getStats(shard2);
         assertEquals(0L, stats2.getCacheSize());
         assertEquals(0L, stats2.getCacheCount());
         assertEquals(0L, stats2.getHitCount());
         assertEquals(0L, stats2.getMissCount());
+        assertTrue(stats2.getMemorySizeInBytes() >= 0L && stats2.getMemorySizeInBytes() < Long.MAX_VALUE);
 
         cache.close(); // this triggers some assertions
     }
diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
index 78767ee1dcf8c..6e27a4db6afec 100644
--- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
@@ -17,6 +17,7 @@
 import org.opensearch.common.util.CancellableThreads;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.index.IndexService;
+import org.opensearch.index.codec.CodecService;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
@@ -47,7 +48,7 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase {
 
     private final IndicesService mockIndicesService = mock(IndicesService.class);
 
-    private ReplicationCheckpoint testCheckpoint;
+    private ReplicationCheckpoint testCheckpoint, olderCodecTestCheckpoint;
     private DiscoveryNode primaryDiscoveryNode;
     private DiscoveryNode replicaDiscoveryNode;
     private IndexShard primary;
@@ -73,8 +74,12 @@ public void setUp() throws Exception {
 
         ShardId testShardId = primary.shardId();
 
+        CodecService codecService = new CodecService(null, null);
+        String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName();
+
         // This mirrors the creation of the ReplicationCheckpoint inside CopyState
-        testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L);
+        testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, defaultCodecName);
+        olderCodecTestCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, "Lucene94");
         IndexService mockIndexService = mock(IndexService.class);
         when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService);
         when(mockIndexService.getShard(testShardId.id())).thenReturn(primary);
@@ -89,6 +94,44 @@ public void tearDown() throws Exception {
         super.tearDown();
     }
 
+    public void testSuccessfulCodecCompatibilityCheck() throws Exception {
+        indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
+        primary.refresh("Test");
+        OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
+        // replica checkpoint is on the same or a higher Lucene codec than the primary
+        final CheckpointInfoRequest request = new CheckpointInfoRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            testCheckpoint
+        );
+        final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
+            listener.onResponse(null);
+        };
+        // must not throw: the requested codec is compatible with the primary's
+        replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
+    }
+
+    public void testFailCodecCompatibilityCheck() throws Exception {
+        indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
+        primary.refresh("Test");
+        OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
+        // replica checkpoint is on a lower/older Lucene codec than the primary
+        final CheckpointInfoRequest request = new CheckpointInfoRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            olderCodecTestCheckpoint
+        );
+        final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
+            listener.onResponse(null);
+        };
+        try {
+            replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
+            Assert.fail("expected CancellableThreads.ExecutionCancelledException");
+        } catch (CancellableThreads.ExecutionCancelledException ex) {
+            Assert.assertTrue(ex.getMessage().contains("Requested unsupported codec version"));
+        }
+    }
+
     public void testPrepareAndSendSegments() throws IOException {
         indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
         primary.refresh("Test");
diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java
index 10b747b822819..fdd707ae88195 100644
--- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.indices.replication;
 
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.util.Version;
 import org.junit.Assert;
 import org.opensearch.action.ActionListener;
@@ -93,7 +94,13 @@ public void tearDown() throws Exception {
     }
 
     public void testGetCheckpointMetadata() {
-        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION);
+        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+            indexShard.shardId(),
+            PRIMARY_TERM,
+            SEGMENTS_GEN,
+            VERSION,
+            Codec.getDefault().getName()
+        );
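The five-argument ReplicationCheckpoint constructed above is what lets a primary vet a replica's codec before copying segments; the OngoingSegmentReplicationsTests hunks earlier in this diff assert both the compatible and incompatible outcomes. Below is a minimal sketch of that gate, under stated assumptions: CodecCompatibility and checkCodecCompatibility are hypothetical names, and plain lexicographic name comparison stands in for the real version check that lives inside OngoingSegmentReplications.prepareForReplication.

import org.opensearch.common.util.CancellableThreads;

// Illustrative only; not the implementation in this PR.
final class CodecCompatibility {
    // Reject checkpoints written by a codec older than the primary's default.
    // Lexicographic comparison happens to order names like "Lucene94" before
    // "Lucene95", but it is only a stand-in for a real codec-version check.
    static void checkCodecCompatibility(String requestedCodec, String primaryCodec) {
        if (requestedCodec.compareTo(primaryCodec) < 0) {
            // Same message substring that testFailCodecCompatibilityCheck asserts on.
            throw new CancellableThreads.ExecutionCancelledException(
                "Requested unsupported codec version " + requestedCodec
            );
        }
    }
}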
         replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, mock(ActionListener.class));
 
         CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear();
         assertEquals(1, requestList.length);
@@ -104,7 +111,13 @@ public void testGetCheckpointMetadata() {
     }
 
     public void testGetSegmentFiles() {
-        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION);
+        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+            indexShard.shardId(),
+            PRIMARY_TERM,
+            SEGMENTS_GEN,
+            VERSION,
+            Codec.getDefault().getName()
+        );
         StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST);
         replicationSource.getSegmentFiles(
             REPLICATION_ID,
@@ -126,7 +139,13 @@ public void testGetSegmentFiles() {
      */
     public void testTransportTimeoutForGetSegmentFilesAction() {
         long fileSize = (long) (Math.pow(10, 9));
-        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION);
+        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+            indexShard.shardId(),
+            PRIMARY_TERM,
+            SEGMENTS_GEN,
+            VERSION,
+            Codec.getDefault().getName()
+        );
         StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", fileSize, "checksum", Version.LATEST);
         replicationSource.getSegmentFiles(
             REPLICATION_ID,
@@ -145,7 +164,13 @@ public void testTransportTimeoutForGetSegmentFilesAction() {
 
     public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException {
         CountDownLatch latch = new CountDownLatch(1);
-        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION);
+        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+            indexShard.shardId(),
+            PRIMARY_TERM,
+            SEGMENTS_GEN,
+            VERSION,
+            Codec.getDefault().getName()
+        );
         StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST);
         replicationSource.getSegmentFiles(
             REPLICATION_ID,
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java
index 0d05b1ec8679e..41022b77b46e1 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.indices.replication;
 
+import org.apache.lucene.codecs.Codec;
 import org.opensearch.Version;
 import org.opensearch.action.ActionListener;
 import org.opensearch.cluster.node.DiscoveryNode;
@@ -55,7 +56,13 @@ public void setUp() throws Exception {
         when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard);
 
         // This mirrors the creation of the ReplicationCheckpoint inside CopyState
-        testCheckpoint = new ReplicationCheckpoint(testShardId, mockIndexShard.getOperationPrimaryTerm(), 0L, 0L);
+        testCheckpoint = new ReplicationCheckpoint(
+            testShardId,
+            mockIndexShard.getOperationPrimaryTerm(),
+            0L,
+            0L,
+            Codec.getDefault().getName()
+        );
         testThreadPool = new TestThreadPool("test", Settings.EMPTY);
         CapturingTransport transport = new CapturingTransport();
         localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
index bae0afb5bcc3b..357a88c27fc46 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
@@ -15,6 +15,7 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.CancellableThreads;
+import org.opensearch.index.codec.CodecService;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.replication.TestReplicationSource;
 import org.opensearch.index.shard.IndexShard;
@@ -62,10 +63,12 @@ public void setUp() throws Exception {
             .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
             .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName())
             .build();
+        CodecService codecService = new CodecService(null, null);
+        String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName();
         primaryShard = newStartedShard(true, settings);
         replicaShard = newShard(false, settings, new NRTReplicationEngineFactory());
         recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard));
-        checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L);
+        checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, defaultCodecName);
         SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
         replicationSource = mock(SegmentReplicationSource.class);
         when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource);
@@ -76,13 +79,15 @@ public void setUp() throws Exception {
             initialCheckpoint.getShardId(),
             initialCheckpoint.getPrimaryTerm(),
             initialCheckpoint.getSegmentsGen(),
-            initialCheckpoint.getSegmentInfosVersion() + 1
+            initialCheckpoint.getSegmentInfosVersion() + 1,
+            defaultCodecName
         );
         newPrimaryCheckpoint = new ReplicationCheckpoint(
             initialCheckpoint.getShardId(),
             initialCheckpoint.getPrimaryTerm() + 1,
             initialCheckpoint.getSegmentsGen(),
-            initialCheckpoint.getSegmentInfosVersion() + 1
+            initialCheckpoint.getSegmentInfosVersion() + 1,
+            defaultCodecName
         );
     }
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index 0c766c66413dd..a029d87f4a575 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.indices.replication;
 
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StringField;
@@ -106,7 +107,8 @@ public void setUp() throws Exception {
             spyIndexShard.shardId(),
             spyIndexShard.getPendingPrimaryTerm(),
             testSegmentInfos.getGeneration(),
-            testSegmentInfos.version
+            testSegmentInfos.version,
+            Codec.getDefault().getName()
         );
     }
diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java
index 8a67292703da0..c851edf5e1bc8 100644
--- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.indices.replication.checkpoint;
 
+import org.apache.lucene.codecs.Codec;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.support.ActionFilters;
 import org.opensearch.action.support.ActionTestUtils;
@@ -104,7 +105,7 @@ public void testPublishCheckpointActionOnPrimary() {
             mockTargetService
         );
 
-        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1);
+        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1, Codec.getDefault().getName());
 
         final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);
 
         action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> {
@@ -139,7 +140,7 @@ public void testPublishCheckpointActionOnReplica() {
             mockTargetService
         );
 
-        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1);
+        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1, Codec.getDefault().getName());
 
         final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);
diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java
index a87a8de206a39..e3b48302ae6ef 100644
--- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java
@@ -8,12 +8,14 @@
 
 package org.opensearch.indices.replication.common;
 
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.util.Version;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.index.codec.CodecService;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
 import org.opensearch.index.shard.ShardId;
@@ -49,7 +51,10 @@ public class CopyStateTests extends IndexShardTestCase {
 
     public void testCopyStateCreation() throws IOException {
         final IndexShard mockIndexShard = createMockIndexShard();
-        CopyState copyState = new CopyState(ReplicationCheckpoint.empty(mockIndexShard.shardId()), mockIndexShard);
+        CopyState copyState = new CopyState(
+            ReplicationCheckpoint.empty(mockIndexShard.shardId(), new CodecService(null, null).codec("default").getName()),
+            mockIndexShard
+        );
         ReplicationCheckpoint checkpoint = copyState.getCheckpoint();
         assertEquals(TEST_SHARD_ID, checkpoint.getShardId());
         // version was never set so this should be zero
@@ -67,7 +72,13 @@ public static IndexShard createMockIndexShard() throws IOException {
         when(mockShard.store()).thenReturn(mockStore);
 
         SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major);
-        ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(mockShard.shardId(), mockShard.getOperationPrimaryTerm(), 0L, 0L);
+        ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(
+            mockShard.shardId(),
+            mockShard.getOperationPrimaryTerm(),
+            0L,
+            0L,
+            Codec.getDefault().getName()
+        );
         final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>(
             new GatedCloseable<>(testSegmentInfos, () -> {}),
             testCheckpoint
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 7b81bd45cc8b6..80e603bd80420 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -1299,7 +1299,10 @@ public void getCheckpointMetadata(
             ActionListener<CheckpointInfoResponse> listener
         ) {
             try {
-                final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard);
+                final CopyState copyState = new CopyState(
+                    ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getDefaultCodecName()),
+                    primaryShard
+                );
                 listener.onResponse(
                     new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
                 );
@@ -1353,7 +1356,7 @@ public final List<SegmentReplicationTarget> replicateSegments(IndexShard primary
         for (IndexShard replica : replicaShards) {
             final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica);
             final SegmentReplicationTarget target = targetService.startReplication(
-                ReplicationCheckpoint.empty(replica.shardId),
+                ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
                 replica,
                 new SegmentReplicationTargetService.SegmentReplicationListener() {
                     @Override
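The RemoteSegmentStoreDirectoryTests and RemoteSegmentMetadataHandlerTests hunks above all follow from one format change: UploadedSegmentMetadata now carries the file length as a fourth "::"-separated component, so an entry reads originalFilename::uploadedFilename::checksum::length. Below is a round-trip sketch of that string format, using a simplified stand-in class rather than the real inner class of RemoteSegmentStoreDirectory.

// Simplified stand-in for RemoteSegmentStoreDirectory.UploadedSegmentMetadata,
// showing only the "::"-separated wire format the tests assert on.
final class SegmentMetadataEntry {
    static final String SEPARATOR = "::";

    final String originalFilename;
    final String uploadedFilename;
    final String checksum;
    final long length; // new in this change: on-disk length in bytes

    SegmentMetadataEntry(String originalFilename, String uploadedFilename, String checksum, long length) {
        this.originalFilename = originalFilename;
        this.uploadedFilename = uploadedFilename;
        this.checksum = checksum;
        this.length = length;
    }

    @Override
    public String toString() {
        // e.g. "abc::pqr::123456::1234", matching testUploadedSegmentMetadataToString
        return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum, String.valueOf(length));
    }

    static SegmentMetadataEntry fromString(String serialized) {
        // e.g. "_0.cfe::_0.cfe__uuidxyz::4567::372000" parses to length 372000
        String[] parts = serialized.split(SEPARATOR);
        return new SegmentMetadataEntry(parts[0], parts[1], parts[2], Long.parseLong(parts[3]));
    }
}

Keeping the length in the uploaded-segments metadata is what lets the reworked testFileLength resolve fileLength("_0.si") from the in-memory map instead of stubbing a call to the remote data directory, as the old when(remoteDataDirectory.fileLength(...)) setup did.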