Add bulk ingest edge instrumentation
bryanlb committed Dec 21, 2023
1 parent 883c1e1 commit e6b9e72
Showing 5 changed files with 30 additions and 21 deletions.
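At its core, the commit registers a Micrometer Counter when the bulk API is constructed and increments it with each request's UTF-8 byte length before any parsing happens. Below is a minimal, self-contained sketch of that pattern; the class name IncomingBytesSketch, the handle method, and the use of SimpleMeterRegistry are illustrative assumptions, while the metric name and the measure-then-parse flow mirror the diff:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.nio.charset.StandardCharsets;

public class IncomingBytesSketch {
  private final Counter incomingByteTotal;

  public IncomingBytesSketch(MeterRegistry meterRegistry) {
    // Register the counter once; looking up the same name later returns the same counter.
    this.incomingByteTotal = meterRegistry.counter("kaldb_preprocessor_incoming_byte");
  }

  public void handle(String bulkRequest) {
    // Encode the body to UTF-8 once, record its size, and reuse the same bytes downstream.
    byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
    incomingByteTotal.increment(bulkRequestBytes.length);
    // ... hand bulkRequestBytes to the parser, as the diff does with BulkApiRequestParser.parseRequest
  }

  public static void main(String[] args) {
    MeterRegistry registry = new SimpleMeterRegistry();
    IncomingBytesSketch api = new IncomingBytesSketch(registry);
    api.handle("{ \"index\": {\"_index\": \"test\"} }\n{ \"field1\": \"value1\" }\n");
    // Prints the UTF-8 byte length of the request body.
    System.out.println(registry.counter("kaldb_preprocessor_incoming_byte").count());
  }
}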
13 changes: 11 additions & 2 deletions BulkIngestApi.java
@@ -8,6 +8,9 @@
 import com.linecorp.armeria.server.annotation.Post;
 import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
 import com.slack.service.murron.trace.Trace;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import org.slf4j.Logger;
@@ -22,13 +25,17 @@ public class BulkIngestApi {
   private static final Logger LOG = LoggerFactory.getLogger(BulkIngestApi.class);
   private final BulkIngestKafkaProducer bulkIngestKafkaProducer;
   private final DatasetRateLimitingService datasetRateLimitingService;
+  private final Counter incomingByteTotal;
+  private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";

   public BulkIngestApi(
       BulkIngestKafkaProducer bulkIngestKafkaProducer,
-      DatasetRateLimitingService datasetRateLimitingService) {
+      DatasetRateLimitingService datasetRateLimitingService,
+      MeterRegistry meterRegistry) {

     this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
     this.datasetRateLimitingService = datasetRateLimitingService;
+    this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
   }

   @Blocking
@@ -37,7 +44,9 @@ public HttpResponse addDocument(String bulkRequest) {
     // 1. Kaldb does not support the concept of "updates". It's always an add.
     // 2. The "index" is used as the span name
     try {
-      Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequest);
+      byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
+      incomingByteTotal.increment(bulkRequestBytes.length);
+      Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes);

       // todo - our rate limiter doesn't have a way to acquire permits across multiple datasets
       // so today as a limitation we reject any request that has documents against multiple indexes
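Note that incomingByteTotal is incremented before the body is parsed or rate-limited, so kaldb_preprocessor_incoming_byte should count every byte that reaches the edge, including requests that later fail parsing or are rejected for spanning multiple datasets.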
8 changes: 3 additions & 5 deletions BulkApiRequestParser.java
@@ -4,7 +4,6 @@
 import com.slack.kaldb.writer.SpanFormatter;
 import com.slack.service.murron.trace.Trace;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
@@ -33,7 +32,7 @@ public class BulkApiRequestParser {

   private static final String SERVICE_NAME_KEY = "service_name";

-  public static Map<String, List<Trace.Span>> parseRequest(String postBody) throws IOException {
+  public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws IOException {
     return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
   }

@@ -110,12 +109,11 @@ protected static IngestDocument convertRequestToDocument(IndexRequest indexReque
     // and transform it
   }

-  protected static List<IndexRequest> parseBulkRequest(String postBody) throws IOException {
+  protected static List<IndexRequest> parseBulkRequest(byte[] postBody) throws IOException {
     List<IndexRequest> indexRequests = new ArrayList<>();
     BulkRequest bulkRequest = new BulkRequest();
     // calls parse under the hood
-    byte[] bytes = postBody.getBytes(StandardCharsets.UTF_8);
-    bulkRequest.add(bytes, 0, bytes.length, null, MediaTypeRegistry.JSON);
+    bulkRequest.add(postBody, 0, postBody.length, null, MediaTypeRegistry.JSON);
     List<DocWriteRequest<?>> requests = bulkRequest.requests();
     for (DocWriteRequest<?> request : requests) {
       if (request.opType() == DocWriteRequest.OpType.INDEX) {
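With parseRequest and parseBulkRequest now taking byte[], the payload is encoded to UTF-8 exactly once, in BulkIngestApi: the same array that is measured for the counter flows through to BulkRequest.add, which accepts raw bytes plus an explicit MediaTypeRegistry.JSON content type.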
2 changes: 1 addition & 1 deletion kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
@@ -399,7 +399,7 @@ private static Set<Service> getServices(
       services.add(datasetRateLimitingService);

       BulkIngestApi openSearchBulkApiService =
-          new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService);
+          new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
       armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
     } else {
       PreprocessorService preprocessorService =
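Since this is the same meterRegistry the rest of Kaldb's services report to, the new edge counter should be exported alongside the existing Kaldb metrics without any additional wiring.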
26 changes: 14 additions & 12 deletions BulkApiRequestParserTest.java
@@ -7,6 +7,7 @@
 import com.slack.service.murron.trace.Trace;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
@@ -18,15 +19,16 @@

 public class BulkApiRequestParserTest {

-  private String getRawQueryString(String filename) throws IOException {
+  private byte[] getRawQueryBytes(String filename) throws IOException {
     return Resources.toString(
-        Resources.getResource(String.format("opensearchRequest/bulk/%s.ndjson", filename)),
-        Charset.defaultCharset());
+            Resources.getResource(String.format("opensearchRequest/bulk/%s.ndjson", filename)),
+            Charset.defaultCharset())
+        .getBytes(StandardCharsets.UTF_8);
   }

   @Test
   public void testSimpleIndexRequest() throws Exception {
-    String rawRequest = getRawQueryString("index_simple");
+    byte[] rawRequest = getRawQueryBytes("index_simple");

     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
     assertThat(indexRequests.size()).isEqualTo(1);
@@ -53,7 +55,7 @@ public void testSimpleIndexRequest() throws Exception {

   @Test
   public void testIndexNoFields() throws Exception {
-    String rawRequest = getRawQueryString("index_no_fields");
+    byte[] rawRequest = getRawQueryBytes("index_no_fields");

     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);

@@ -76,7 +78,7 @@ public void testIndexNoFields() throws Exception {

   @Test
   public void testIndexNoFieldsNoId() throws Exception {
-    String rawRequest = getRawQueryString("index_no_fields_no_id");
+    byte[] rawRequest = getRawQueryBytes("index_no_fields_no_id");

     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);

@@ -99,7 +101,7 @@ public void testIndexNoFieldsNoId() throws Exception {

   @Test
   public void testIndexEmptyRequest() throws Exception {
-    String rawRequest = getRawQueryString("index_empty_request");
+    byte[] rawRequest = getRawQueryBytes("index_empty_request");

     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);

@@ -110,14 +112,14 @@

   @Test
   public void testOtherBulkRequests() throws Exception {
-    String rawRequest = getRawQueryString("non_index");
+    byte[] rawRequest = getRawQueryBytes("non_index");
     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
     assertThat(indexRequests.size()).isEqualTo(0);
   }

   @Test
   public void testIndexRequestWithSpecialChars() throws Exception {
-    String rawRequest = getRawQueryString("index_request_with_special_chars");
+    byte[] rawRequest = getRawQueryBytes("index_request_with_special_chars");
     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
     assertThat(indexRequests.size()).isEqualTo(1);
     Map<String, List<Trace.Span>> indexDocs =
@@ -139,7 +141,7 @@ public void testIndexRequestWithSpecialChars() throws Exception {

   @Test
   public void testBulkRequests() throws Exception {
-    String rawRequest = getRawQueryString("bulk_requests");
+    byte[] rawRequest = getRawQueryBytes("bulk_requests");
     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
     assertThat(indexRequests.size()).isEqualTo(1);

@@ -162,7 +164,7 @@ public void testBulkRequests() throws Exception {

   @Test
   public void testUpdatesAgainstTwoIndexes() throws Exception {
-    String rawRequest = getRawQueryString("two_indexes");
+    byte[] rawRequest = getRawQueryBytes("two_indexes");
     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
     assertThat(indexRequests.size()).isEqualTo(2);

@@ -178,7 +180,7 @@ public void testUpdatesAgainstTwoIndexes() throws Exception {

   @Test
   public void testTraceSpanGeneratedTimestamp() throws IOException {
-    String rawRequest = getRawQueryString("index_simple");
+    byte[] rawRequest = getRawQueryBytes("index_simple");

     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
     assertThat(indexRequests.size()).isEqualTo(1);
2 changes: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ public void bootstrapCluster() throws Exception {
     datasetRateLimitingService.awaitRunning(DEFAULT_START_STOP_DURATION);
     bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION);

-    bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService);
+    bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
   }

   // I looked at making this a @BeforeEach. it's possible if you annotate a test with a @Tag and
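With the registry injected here, a test can read the new edge metric straight off the same meterRegistry. A hedged sketch of such an assertion, reusing this test's bulkApi and meterRegistry fields (the assertion itself is not part of this commit):

    String request = "{ \"index\": {\"_index\": \"testindex\"} }\n{ \"field1\": \"value1\" }\n";
    bulkApi.addDocument(request);

    // The counter is incremented before parsing, so it should equal the UTF-8 size of the body.
    assertThat(meterRegistry.counter("kaldb_preprocessor_incoming_byte").count())
        .isEqualTo((double) request.getBytes(StandardCharsets.UTF_8).length);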
