Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into MetadataMultiType
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Dec 11, 2024
2 parents c5cb306 + 6d3b225 commit e501f09
Show file tree
Hide file tree
Showing 88 changed files with 2,973 additions and 352 deletions.
2 changes: 1 addition & 1 deletion DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies {
implementation project(":RFS")
implementation project(":transformation")
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.BytesRef;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
Expand All @@ -36,6 +37,7 @@
import static org.mockito.Mockito.when;

@Slf4j
@Disabled("https://opensearch.atlassian.net/browse/MIGRATIONS-2254")
public class PerformanceVerificationTest {

@Test
Expand Down
4 changes: 3 additions & 1 deletion RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ dependencies {

implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down Expand Up @@ -62,7 +64,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'

testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,17 @@ protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletes
return reader;
}

protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean isLive) {
protected RfsLuceneDocument getDocument(IndexReader reader, int docSegId, boolean isLive) {
Document document;
try {
document = reader.document(docId);
document = reader.document(docSegId);

Check failure on line 167 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java

View workflow job for this annotation

GitHub Actions / Run SonarQube Analysis

java:S1874

Remove this use of "document"; it is deprecated.
} catch (IOException e) {
log.atError().setCause(e).setMessage("Failed to read document at Lucene index location {}")
.addArgument(docId).log();
log.atError()
.setCause(e)
.setMessage("Failed to read document segment id {} from source {}")
.addArgument(docSegId)
.addArgument(indexDirectoryPath)
.log();
return null;
}

Expand Down Expand Up @@ -207,18 +211,21 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
}
}
if (id == null) {
log.atError().setMessage("Document with index {} does not have an id. Skipping")
.addArgument(docId).log();
log.atWarn().setMessage("Skipping document segment id {} from source {}, it does not have an referenceable id.")
.addArgument(docSegId)
.addArgument(indexDirectoryPath)
.log();
return null; // Skip documents with missing id
}

if (sourceBytes == null || sourceBytes.bytes.length == 0) {
log.atWarn().setMessage("Document {} doesn't have the _source field enabled")
.addArgument(id).log();
log.atWarn().setMessage("Skipping document segment id {} document id {} from source {}, it doesn't have the _source field enabled.")
.addArgument(docSegId)
.addArgument(id)
.addArgument(indexDirectoryPath)
.log();
return null; // Skip these
}

log.atDebug().setMessage("Reading document {}").addArgument(id).log();
} catch (RuntimeException e) {
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
Expand All @@ -232,7 +239,10 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
return null; // Skip these
}

log.atDebug().setMessage("Document {} read successfully").addArgument(id).log();
log.atDebug().setMessage("Document id {} from source {} read successfully.")
.addArgument(id)
.addArgument(indexDirectoryPath)
.log();
return new RfsLuceneDocument(id, type, sourceBytes.utf8ToString(), routing);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public SearchClusterContainer(final ContainerVersion version) {
super(DockerImageName.parse(version.imageName));
this.withExposedPorts(9200, 9300)
.withEnv(version.getInitializationType().getEnvVariables())
.waitingFor(Wait.forHttp("/").forPort(9200).forStatusCode(200).withStartupTimeout(Duration.ofSeconds(20)));
.waitingFor(Wait.forHttp("/").forPort(9200).forStatusCode(200).withStartupTimeout(Duration.ofMinutes(5)));

this.containerVersion = version;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317" #--transformer-config-base64 W3sgIkpzb25Kb2x0VHJhbnNmb3JtZXJQcm92aWRlciI6ClsKICB7CiAgICAic2NyaXB0IjogewogICAgICAib3BlcmF0aW9uIjogInNoaWZ0IiwKICAgICAgInNwZWMiOiB7CiAgICAgICAgIm1ldGhvZCI6ICJtZXRob2QiLAogICAgICAgICJVUkkiOiAiVVJJIiwKICAgICAgICAiaGVhZGVycyI6ICJoZWFkZXJzIiwKICAgICAgICAicGF5bG9hZCI6IHsKICAgICAgICAgICJpbmxpbmVkSnNvbkJvZHkiOiB7CiAgICAgICAgICAgICJ0b3AiOiB7CiAgICAgICAgICAgICAgInRhZ1RvRXhjaXNlIjogewogICAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiIAogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAiKiI6ICJwYXlsb2FkLmlubGluZWRKc29uQm9keS4mIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIH0sIAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPXNwbGl0KCcvZXh0cmFUaGluZ1RvUmVtb3ZlJyxAKDEsJikpIgogICAgIH0KICB9CiB9LAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPWpvaW4oJycsQCgxLCYpKSIKICAgICB9CiAgfQogfQpdCn1dCg=="

# command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 "
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config '[{\"TypeMappingSanitizationTransformerProvider\":{\"sourceProperties\":{\"version\":{\"major\":7,\"minor\":10}}}}]'"
opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
environment:
Expand Down
4 changes: 2 additions & 2 deletions TrafficCapture/trafficReplayer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ transform to add GZIP encoding and another to apply a new header would be config
```

To run only one transformer without any configuration, the `--transformer-config` argument can simply
be set to the name of the transformer (e.g. 'JsonTransformerForOpenSearch23PlusTargetTransformerProvider',
be set to the name of the transformer (e.g. 'TypeMappingSanitizationTransformerProvider',
without quotes or any json surrounding it).

The user can also specify a file to read the transformations from using the `--transformer-config-file` option, or
pass the configuration as an argument via `--transformer-config-base64`. The `transformer-config` options
are mutually exclusive; only one of them may be specified.

Some simple transformations are included to change headers to add compression or to force an HTTP message payload to
be chunked. Another transformer, [JsonTypeMappingTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java),
be chunked. Another transformer, [TypeMappingsSanitizationTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/jsonTypeMappingsSanitizationTransformer/src/main/java/org/opensearch/migrations/transform/TypeMappingsSanitizationTransformer.java),
is a work-in-progress to excise type mapping references from URIs and message payloads since versions of OpenSearch
greater than 2.3 do not support them.

Expand Down
5 changes: 3 additions & 2 deletions TrafficCapture/trafficReplayer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ dependencies {
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down Expand Up @@ -59,7 +60,7 @@ dependencies {
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5'
testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} else {
content.release();
}
} else if (msg instanceof HttpMessage) {
}
if (msg instanceof HttpMessage) { // this & HttpContent are interfaces & 'Full' messages implement both
message = (HttpMessage) msg;
}
if (msg instanceof LastHttpContent) {
Expand All @@ -206,16 +207,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
var finalMsg = (message instanceof HttpRequest)
? new DefaultFullHttpRequest(message.protocolVersion(),
((HttpRequest) message).method(),
((HttpRequest) message).uri(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders())
((HttpRequest) message).method(),
((HttpRequest) message).uri(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders())
: new DefaultFullHttpResponse(message.protocolVersion(),
((HttpResponse)message).status(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders());
((HttpResponse)message).status(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders());
super.channelRead(ctx, finalMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class ParsedHttpMessagesAsDicts {
public static final String STATUS_CODE_KEY = "Status-Code";
public static final String RESPONSE_TIME_MS_KEY = "response_time_ms";
public static final String EXCEPTION_KEY_STRING = "Exception";
public static final String REQUEST_URI_KEY = "Request-URI";
public static final String METHOD_KEY = "Method";
public static final String HTTP_VERSION_KEY = "HTTP-Version";
public static final String PAYLOAD_KEY = "payload";

public final Optional<Map<String, Object>> sourceRequestOp;
public final Optional<Map<String, Object>> sourceResponseOp;
Expand Down Expand Up @@ -183,15 +187,15 @@ private static Map<String, Object> convertRequest(
var message = (HttpJsonRequestWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("Request-URI", message.path());
map.put("Method", message.method());
map.put("HTTP-Version", message.protocol());
map.put(REQUEST_URI_KEY, message.path());
map.put(METHOD_KEY, message.method());
map.put(HTTP_VERSION_KEY, message.protocol());
context.setMethod(message.method());
context.setEndpoint(message.path());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
Expand Down Expand Up @@ -223,14 +227,14 @@ private static Map<String, Object> convertResponse(
var message = (HttpJsonResponseWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("HTTP-Version", message.protocol());
map.put(HTTP_VERSION_KEY, message.protocol());
map.put(STATUS_CODE_KEY, Integer.parseInt(message.code()));
map.put("Reason-Phrase", message.reason());
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public static String getTransactionSummaryStringPreamble() {
.add("SOURCE_STATUS_CODE/TARGET_STATUS_CODE...")
.add("SOURCE_RESPONSE_SIZE_BYTES/TARGET_RESPONSE_SIZE_BYTES...")
.add("SOURCE_LATENCY_MS/TARGET_LATENCY_MS...")
.add("METHOD...")
.add("URI...")
.toString();
}

Expand Down Expand Up @@ -218,6 +220,16 @@ public static String toTransactionSummaryString(
transformStreamToString(parsed.targetResponseList.stream(),
r -> "" + r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY))
)
// method
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.METHOD_KEY))
.orElse(MISSING_STR))
// uri
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.REQUEST_URI_KEY))
.orElse(MISSING_STR))
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ public Object getNextTopLevelObject() throws IOException {
pushCompletedValue(parser.getText());
break;
case VALUE_NUMBER_INT:
pushCompletedValue(parser.getIntValue());
pushCompletedValue(parser.getLongValue());
break;
case VALUE_NUMBER_FLOAT:
pushCompletedValue(parser.getFloatValue());
pushCompletedValue(parser.getDoubleValue());
break;
case NOT_AVAILABLE:
// pipeline stall - need more data
Expand Down
Loading

0 comments on commit e501f09

Please sign in to comment.