Skip to content

Commit

Permalink
Initial Framework Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Apr 20, 2023
1 parent 5bbb99a commit ba3153a
Show file tree
Hide file tree
Showing 30 changed files with 765 additions and 78 deletions.
9 changes: 0 additions & 9 deletions .idea/inspectionProfiles/Project_Default.xml

This file was deleted.

11 changes: 0 additions & 11 deletions .idea/runConfigurations/Debug_OpenSearch.xml

This file was deleted.

18 changes: 2 additions & 16 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ClusterConfiguration {
String distribution = 'archive'

@Input
int numNodes = 1
int numNodes = 2

@Input
int numBwcNodes = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class RunTask extends DefaultTestClustersTask {
private static final int DEFAULT_DEBUG_PORT = 5005;
public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:";

private Boolean debug = false;
private Boolean debug = true;

private Boolean debugServer = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ static List<String> systemJvmOptions() {
* networkaddress.cache.ttl; can be set to -1 to cache forever.
*/
"-Dopensearch.networkaddress.cache.ttl=60",
"-Djava.security.policy=/home/rishma/ws/pa/performance-analyzer/config/opensearch_security.policy",
"-Djdk.attach.allowAttachSelf=true",
"-Dclk.tck=100",
"--add-opens=jdk.attach/sun.tools.attach=ALL-UNNAMED",

/*
* Cache ttl in seconds for negative DNS lookups noting that this overrides the JDK security property
* networkaddress.cache.negative ttl; set to -1 to cache forever.
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ systemProp.org.gradle.dependency.duplicate.project.detection=false

# Enforce the build to fail on deprecated gradle api usage
systemProp.org.gradle.warning.mode=fail

kotlin.stdlib.default.dependency=false
# forcing to use TLS1.2 to avoid failure in vault
# see https://github.com/hashicorp/vault/issues/8750#issuecomment-631236121
systemProp.jdk.tls.client.protocols=TLSv1.2
Expand Down
1 change: 1 addition & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ apply plugin: 'opensearch.testclusters'
testClusters {
runTask {
testDistribution = 'archive'
numberOfNodes = 2
}
}

Expand Down
25 changes: 25 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,31 @@ dependencies {
api "org.apache.logging.log4j:log4j-jul:${versions.log4j}"
api "org.apache.logging.log4j:log4j-core:${versions.log4j}", optional

api 'io.opentelemetry:opentelemetry-api:1.23.1'
api 'io.opentelemetry:opentelemetry-api-logs:1.23.1-alpha'
api 'io.opentelemetry:opentelemetry-api-events:1.23.1-alpha'
api 'io.opentelemetry:opentelemetry-sdk:1.23.1'
api 'io.opentelemetry:opentelemetry-sdk-metrics:1.23.1'
api 'io.opentelemetry:opentelemetry-sdk-common:1.23.1'
api 'io.opentelemetry:opentelemetry-sdk-trace:1.23.1'
api 'io.opentelemetry:opentelemetry-context:1.23.1'
api 'io.opentelemetry:opentelemetry-sdk-logs:1.23.1-alpha'
api 'io.opentelemetry:opentelemetry-semconv:1.23.1-alpha'

// exporters
api 'io.opentelemetry:opentelemetry-exporter-common:1.23.1'
//implementation 'io.opentelemetry:opentelemetry-exporter-jaeger:1.23.1'
api 'io.opentelemetry:opentelemetry-exporter-otlp:1.23.1'
api 'io.opentelemetry:opentelemetry-exporter-otlp-common:1.23.1'
api("com.squareup.okhttp3:okhttp:4.10.0")
api 'org.jetbrains.kotlin:kotlin-stdlib:1.6.20'
api 'com.squareup.okio:okio-jvm:3.0.0'

//implementation "com.squareup.okio:okio:2.8.0"

//implementation 'io.opentelemetry:opentelemetry-exporter-logging:1.23.1'
//implementation 'io.opentelemetry:opentelemetry-exporter-jaeger:1.23.1'

// jna
api "net.java.dev.jna:jna:${versions.jna}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.node.Node;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.TaskAwareRunnable;
import org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper;

import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -128,7 +129,6 @@ public static PrioritizedOpenSearchThreadPoolExecutor newSinglePrioritizing(
) {
return new PrioritizedOpenSearchThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
}

public static OpenSearchThreadPoolExecutor newScaling(
String name,
int min,
Expand Down Expand Up @@ -354,7 +354,7 @@ public void execute(Runnable command) {
* @return an {@link ExecutorService} that executes submitted tasks on the current thread
*/
public static ExecutorService newDirectExecutorService() {
return DIRECT_EXECUTOR_SERVICE;
return OpenTelemetryContextWrapper.wrapTask(DIRECT_EXECUTOR_SERVICE);
}

public static String threadName(Settings settings, String namePrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ final String getName() {
}

@SuppressForbidden(reason = "properly rethrowing errors, see OpenSearchExecutors.rethrowErrors")
OpenSearchThreadPoolExecutor(
protected OpenSearchThreadPoolExecutor(
String name,
int corePoolSize,
int maximumPoolSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.seqno;

import io.opentelemetry.context.Context;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -63,6 +64,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.tracing.opentelemetry.OTelContextPreservingActionListener;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -129,8 +131,9 @@ final void sync(
String primaryAllocationId,
long primaryTerm,
RetentionLeases retentionLeases,
ActionListener<ReplicationResponse> listener
ActionListener<ReplicationResponse> listener1
) {
final ActionListener<ReplicationResponse> listener = new OTelContextPreservingActionListener<>(listener1, Context.current());
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.RunUnderPrimaryPermit;
import org.opensearch.tracing.opentelemetry.OpenTelemetryService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.IntSupplier;

/**
* This handler is used for node-to-node peer recovery when the recovery target is a replica/ or a relocating primary
Expand Down Expand Up @@ -106,6 +109,7 @@ && isTargetSameHistory()
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
if (retentionLeaseRef.get() == null) {

createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
} else {
sendFileStep.onResponse(SendFileResult.EMPTY);
Expand Down Expand Up @@ -144,15 +148,23 @@ && isTargetSameHistory()
// If the target previously had a copy of this shard then a file-based recovery might move its global
// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
// new one later on in the recovery.
shard.removePeerRecoveryRetentionLease(
request.targetNode().getId(),
BiFunction<Object[], ActionListener<?>, Void> removePeerRecoveryRetentionLeaseFun =
(args, actionListener) -> {
shard.removePeerRecoveryRetentionLease((String) args[0],
(ActionListener<ReplicationResponse>) actionListener);
return null;
};
OpenTelemetryService.callFunctionAndStartSpan(
"removePeerRecoveryRetentionLease",
removePeerRecoveryRetentionLeaseFun,
new ThreadedActionListener<>(
logger,
shard.getThreadPool(),
ThreadPool.Names.GENERIC,
deleteRetentionLeaseStep,
false
)
),
request.targetNode().getId()
);
} catch (RetentionLeaseNotFoundException e) {
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
Expand All @@ -162,7 +174,21 @@ && isTargetSameHistory()

deleteRetentionLeaseStep.whenComplete(ignored -> {
assert Transports.assertNotTransportThread(this + "[phase1]");
phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep, false);
BiFunction<Object[], ActionListener<?>, Void> phase1Fun =
(args, actionListener) -> {
phase1((IndexCommit)args[0], (long) args[1], (IntSupplier) args[2],
(ActionListener<SendFileResult>) actionListener, (boolean) args[3]);
return null;
};
OpenTelemetryService.callFunctionAndStartSpan(
"phase1",
phase1Fun,
sendFileStep,
wrappedSafeCommit.get(),
startingSeqNo,
(IntSupplier)(() -> estimateNumOps),
false
);
}, onFailure);

} catch (final Exception e) {
Expand All @@ -174,7 +200,17 @@ && isTargetSameHistory()
sendFileStep.whenComplete(r -> {
assert Transports.assertNotTransportThread(this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
BiFunction<Object[], ActionListener<?>, Void> prepareTargetForTranslogFun =
(args, actionListener) -> {
prepareTargetForTranslog((int)args[0], (ActionListener<TimeValue>) actionListener);
return null;
};
OpenTelemetryService.callFunctionAndStartSpan(
"prepareTargetForTranslog",
prepareTargetForTranslogFun,
prepareEngineStep,
countNumberOfHistoryOperations(startingSeqNo)
);
}, onFailure);

prepareEngineStep.whenComplete(prepareEngineTime -> {
Expand Down Expand Up @@ -213,18 +249,30 @@ && isTargetSameHistory()
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
final RetentionLeases retentionLeases = shard.getRetentionLeases();
final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion();
phase2(
BiFunction<Object[], ActionListener<?>, Void> phase2Fun =
(args, actionListener) -> {
try {
phase2((long) args[0], (long) args[1], (Translog.Snapshot) args[2], (long)args[3], (long)args[4],
(RetentionLeases)args[5], (long)args[6], (ActionListener<SendSnapshotResult>) actionListener);
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
};
OpenTelemetryService.callFunctionAndStartSpan(
"phase2",
phase2Fun,
sendSnapshotStep,
startingSeqNo,
endingSeqNo,
phase2Snapshot,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
sendSnapshotStep
mappingVersionOnPrimary
);

}, onFailure);

finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.indices.recovery;

import io.opentelemetry.api.trace.Span;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
Expand All @@ -57,6 +58,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.tracing.opentelemetry.OpenTelemetryService;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;
Expand All @@ -68,6 +70,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;

/**
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
Expand Down Expand Up @@ -157,9 +163,14 @@ public void clusterChanged(ClusterChangedEvent event) {
}

private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
Span span = Span.current();
span.setAttribute(stringKey("index-name"), request.shardId().getIndexName());
span.setAttribute(longKey("shard-id"), request.shardId().id());
span.setAttribute(stringKey("source-node"), request.sourceNode().getId());
span.setAttribute(stringKey("target-node"), request.targetNode().getId());

final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());

final ShardRouting routingEntry = shard.routingEntry();

if (routingEntry.primary() == false || routingEntry.active() == false) {
Expand All @@ -183,6 +194,7 @@ private void recover(StartRecoveryRequest request, ActionListener<RecoveryRespon
request.shardId().id(),
request.targetNode()
);

handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));
}

Expand All @@ -202,7 +214,12 @@ private void reestablish(ReestablishRecoveryRequest request, ActionListener<Reco
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
@Override
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request));
BiFunction<Object[], ActionListener<?>, Void> recoverFunction = (args, actionListener) -> {
recover((StartRecoveryRequest) args[0], (ActionListener<RecoveryResponse>) actionListener);
return null;
};
OpenTelemetryService.callFunctionAndStartSpan("recover", recoverFunction,
new ChannelActionListener<>(channel, Actions.START_RECOVERY, request), request);
}
}

Expand Down
Loading

0 comments on commit ba3153a

Please sign in to comment.