Skip to content

Commit

Permalink
[Streaming Indexing] Fix intermittent 'The bulk request must be termi…
Browse files Browse the repository at this point in the history
…nated by a newline [\n]' failures (#16337)

* [Streaming Indexing] Fix intermittent 'The bulk request must be terminated by a newline [\n]' failures

Signed-off-by: Andriy Redko <[email protected]>

* Address code review comments

Signed-off-by: Andriy Redko <[email protected]>

---------

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Oct 16, 2024
1 parent 6594516 commit ec7b652
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix warnings from SLF4J on startup when repository-s3 is installed ([#16194](https://github.com/opensearch-project/OpenSearch/pull/16194))
- Fix protobuf-java leak through client library dependencies ([#16254](https://github.com/opensearch-project/OpenSearch/pull/16254))
- Fix multi-search with template doesn't return status code ([#16265](https://github.com/opensearch-project/OpenSearch/pull/16265))
- [Streaming Indexing] Fix intermittent 'The bulk request must be terminated by a newline [\n]' failures [#16337](https://github.com/opensearch-project/OpenSearch/pull/16337))
- Fix wrong default value when setting `index.number_of_routing_shards` to null on index creation ([#16331](https://github.com/opensearch-project/OpenSearch/pull/16331))
- Fix disk usage exceeds threshold cluster can't spin up issue ([#15258](https://github.com/opensearch-project/OpenSearch/pull/15258)))

Expand Down
2 changes: 1 addition & 1 deletion buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ netty = 4.1.114.Final
joda = 2.12.7

# project reactor
reactor_netty = 1.1.22
reactor_netty = 1.1.23
reactor = 3.5.20

# client dependencies
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a7059b0c18ab7aa0fa9e08b48cb6a20b15c11478

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
94b294fa90aee2e88ad4337251e278aaac21362c

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a7059b0c18ab7aa0fa9e08b48cb6a20b15c11478

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
94b294fa90aee2e88ad4337251e278aaac21362c
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,36 @@ public void testStreamingLargeDocument() throws IOException {
String.format(
Locale.getDefault(),
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n",
randomAlphaOfLength(5000)
randomAlphaOfLength(7000)
)
);

final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());
}

public void testStreamingLargeDocumentThatExceedsChunkSize() throws IOException {
final Stream<String> stream = Stream.of(
String.format(
Locale.getDefault(),
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n",
randomAlphaOfLength(9000) /* the default chunk size limit is set 8k */
)
);

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti
spec -> spec.maxChunkSize(maxChunkSize.bytesAsInt())
.maxHeaderSize(maxHeaderSize.bytesAsInt())
.maxInitialLineLength(maxInitialLineLength.bytesAsInt())
.allowPartialChunks(false)
)
.handle((req, res) -> incomingRequest(req, res))
);
Expand Down

0 comments on commit ec7b652

Please sign in to comment.