Skip to content

Commit

Permalink
feat: optimize blockstream processing (#358)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs authored Nov 25, 2024
1 parent c2fb9d0 commit 544d983
Show file tree
Hide file tree
Showing 44 changed files with 1,353 additions and 1,243 deletions.
1 change: 1 addition & 0 deletions server/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
- "8080:8080"
- "5005:5005"
- "9999:9999"
- "8849:8849"

cadvisor:
image: "gcr.io/cadvisor/cadvisor:v0.47.0"
Expand Down
6 changes: 5 additions & 1 deletion server/docker/logging.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ io.helidon.common.level=INFO
#com.hedera.block.level=FINE
#com.hedera.block.server.level=FINE

# Configure specific loggers
# Configure specific Block Node loggers
#com.hedera.block.server.producer.ProducerBlockItemObserver.level=FINE
#com.hedera.block.server.mediator.LiveStreamMediatorImpl.level=FINE
#com.hedera.block.server.persistence.storage.write.BlockAsDirWriter.level=FINE
#com.hedera.block.server.consumer.ConsumerStreamResponseObserver.level=FINE
#com.hedera.block.server.pbj.PbjBlockStreamServiceProxy.level=FINE

# Helidon PBJ Plugin loggers
#com.hedera.pbj.grpc.helidon.PbjProtocolHandler.level=FINE

# Console handler configuration
handlers = java.util.logging.ConsoleHandler
Expand Down
5 changes: 3 additions & 2 deletions server/docker/update-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ echo "REGISTRY_PREFIX=" >> .env
echo "BLOCKNODE_STORAGE_ROOT_PATH=/app/storage" >> .env

if [ true = "$is_debug" ]; then
# wait for debugger to attach
echo "SERVER_OPTS='-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'" >> .env
# The server will wait for the debugger to attach on port 5005
# JProfiler can attach on port 8849
echo "SERVER_OPTS='-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' -agentpath:/path/to/libjprofilerti.so=port=8849 " >> .env
fi

if [ true = "$is_smoke_test" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.metrics.BlockNodeMetricTypes;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.hedera.pbj.runtime.OneOf;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.InstantSource;
Expand All @@ -43,13 +43,14 @@
* by Helidon). The ConsumerBlockItemObserver implements the BlockNodeEventHandler interface so the
* Disruptor can invoke the onEvent() method when a new SubscribeStreamResponse is available.
*/
public class ConsumerStreamResponseObserver implements BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>> {
public class ConsumerStreamResponseObserver
implements BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>> {

private final Logger LOGGER = System.getLogger(getClass().getName());

private final MetricsService metricsService;
private final Flow.Subscriber<? super SubscribeStreamResponse> subscribeStreamResponseObserver;
private final SubscriptionHandler<SubscribeStreamResponse> subscriptionHandler;
private final Flow.Subscriber<? super SubscribeStreamResponseUnparsed> subscribeStreamResponseObserver;
private final SubscriptionHandler<SubscribeStreamResponseUnparsed> subscriptionHandler;

private final AtomicBoolean isResponsePermitted = new AtomicBoolean(true);
private final ResponseSender statusResponseSender = new StatusResponseSender();
Expand All @@ -74,8 +75,8 @@ public class ConsumerStreamResponseObserver implements BlockNodeEventHandler<Obj
*/
public ConsumerStreamResponseObserver(
@NonNull final InstantSource producerLivenessClock,
@NonNull final SubscriptionHandler<SubscribeStreamResponse> subscriptionHandler,
@NonNull final Flow.Subscriber<? super SubscribeStreamResponse> subscribeStreamResponseObserver,
@NonNull final SubscriptionHandler<SubscribeStreamResponseUnparsed> subscriptionHandler,
@NonNull final Flow.Subscriber<? super SubscribeStreamResponseUnparsed> subscribeStreamResponseObserver,
@NonNull final BlockNodeContext blockNodeContext) {

this.livenessCalculator = new LivenessCalculator(
Expand All @@ -102,7 +103,8 @@ public ConsumerStreamResponseObserver(
* @param b true if the event is the last in the sequence
*/
@Override
public void onEvent(@NonNull final ObjectEvent<SubscribeStreamResponse> event, final long l, final boolean b) {
public void onEvent(
@NonNull final ObjectEvent<SubscribeStreamResponseUnparsed> event, final long l, final boolean b) {

// Only send the response if the consumer has not cancelled
// or closed the stream.
Expand All @@ -114,7 +116,7 @@ public void onEvent(@NonNull final ObjectEvent<SubscribeStreamResponse> event, f
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
livenessCalculator.refresh();

final SubscribeStreamResponse subscribeStreamResponse = event.get();
final SubscribeStreamResponseUnparsed subscribeStreamResponse = event.get();
final ResponseSender responseSender = getResponseSender(subscribeStreamResponse);
responseSender.send(subscribeStreamResponse);
}
Expand All @@ -127,9 +129,10 @@ public boolean isTimeoutExpired() {
}

@NonNull
private ResponseSender getResponseSender(@NonNull final SubscribeStreamResponse subscribeStreamResponse) {
private ResponseSender getResponseSender(@NonNull final SubscribeStreamResponseUnparsed subscribeStreamResponse) {

final OneOf<SubscribeStreamResponse.ResponseOneOfType> responseType = subscribeStreamResponse.response();
final OneOf<SubscribeStreamResponseUnparsed.ResponseOneOfType> responseType =
subscribeStreamResponse.response();
return switch (responseType.kind()) {
case STATUS -> {
isResponsePermitted.set(false);
Expand All @@ -142,13 +145,13 @@ private ResponseSender getResponseSender(@NonNull final SubscribeStreamResponse
}

private interface ResponseSender {
void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse);
void send(@NonNull final SubscribeStreamResponseUnparsed subscribeStreamResponse);
}

private final class BlockItemsResponseSender implements ResponseSender {
private boolean streamStarted = false;

public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse) {
public void send(@NonNull final SubscribeStreamResponseUnparsed subscribeStreamResponse) {

if (subscribeStreamResponse.blockItems() == null) {
final String message = PROTOCOL_VIOLATION_MESSAGE.formatted(
Expand All @@ -157,21 +160,19 @@ public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse)
throw new IllegalArgumentException(message);
}

final List<BlockItem> blockItems =
final List<BlockItemUnparsed> blockItems =
Objects.requireNonNull(subscribeStreamResponse.blockItems()).blockItems();

// Only start sending BlockItems after we've reached
// the beginning of a block.
if (!streamStarted && blockItems.getFirst().hasBlockHeader()) {
LOGGER.log(
DEBUG,
"Sending BlockItem Batch downstream for block: "
+ blockItems.getFirst().blockHeader().number());
final BlockItemUnparsed firstBlockItem = blockItems.getFirst();
if (!streamStarted && firstBlockItem.hasBlockHeader()) {
streamStarted = true;
}

if (streamStarted) {
metricsService
.get(BlockNodeMetricTypes.Counter.LiveBlockItemsReceived)
.get(BlockNodeMetricTypes.Counter.LiveBlockItemsConsumed)
.add(blockItems.size());
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}
Expand All @@ -181,7 +182,7 @@ public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse)
// TODO: Implement another StatusResponseSender that will unsubscribe the observer once the
// status code is fixed.
private final class StatusResponseSender implements ResponseSender {
public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse) {
public void send(@NonNull final SubscribeStreamResponseUnparsed subscribeStreamResponse) {
LOGGER.log(DEBUG, "Sending SubscribeStreamResponse downstream: " + subscribeStreamResponse);
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
subscribeStreamResponseObserver.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package com.hedera.block.server.mediator;

import com.hedera.block.server.notifier.Notifiable;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import java.util.List;

/**
* Use this interface to combine the contract for mediating the live stream of blocks from the
* Hedera network with the contract to be notified of critical system events.
*/
public interface LiveStreamMediator
extends StreamMediator<List<BlockItem>, SubscribeStreamResponse>, Notifiable {}
extends StreamMediator<List<BlockItemUnparsed>, SubscribeStreamResponseUnparsed>, Notifiable {}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.persistence.storage.write.BlockWriter;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.lmax.disruptor.BatchEventProcessor;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Map;
Expand All @@ -41,16 +41,15 @@ public class LiveStreamMediatorBuilder {
private final ServiceStatus serviceStatus;

private Map<
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponseUnparsed>>>
subscribers;

/** The initial capacity of the subscriber map. */
private static final int SUBSCRIBER_INIT_CAPACITY = 32;

private LiveStreamMediatorBuilder(
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
@NonNull final BlockNodeContext blockNodeContext, @NonNull final ServiceStatus serviceStatus) {
this.subscribers = new ConcurrentHashMap<>(SUBSCRIBER_INIT_CAPACITY);
this.blockNodeContext = blockNodeContext;
this.serviceStatus = serviceStatus;
Expand All @@ -67,8 +66,7 @@ private LiveStreamMediatorBuilder(
*/
@NonNull
public static LiveStreamMediatorBuilder newBuilder(
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
@NonNull final BlockNodeContext blockNodeContext, @NonNull final ServiceStatus serviceStatus) {
return new LiveStreamMediatorBuilder(blockNodeContext, serviceStatus);
}

Expand All @@ -85,8 +83,8 @@ public static LiveStreamMediatorBuilder newBuilder(
public LiveStreamMediatorBuilder subscribers(
@NonNull
final Map<
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponseUnparsed>>>
subscribers) {
this.subscribers = subscribers;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.BlockItemSet;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.BlockItemSetUnparsed;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.lmax.disruptor.BatchEventProcessor;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;
Expand All @@ -46,7 +46,8 @@
* subscribers as they arrive via a RingBuffer maintained in the base class and persists the block
* items to a store.
*/
class LiveStreamMediatorImpl extends SubscriptionHandlerBase<SubscribeStreamResponse> implements LiveStreamMediator {
class LiveStreamMediatorImpl extends SubscriptionHandlerBase<SubscribeStreamResponseUnparsed>
implements LiveStreamMediator {

private final Logger LOGGER = System.getLogger(getClass().getName());

Expand All @@ -69,8 +70,8 @@ class LiveStreamMediatorImpl extends SubscriptionHandlerBase<SubscribeStreamResp
LiveStreamMediatorImpl(
@NonNull
final Map<
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponseUnparsed>>>
subscribers,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockNodeContext blockNodeContext) {
Expand All @@ -96,25 +97,26 @@ class LiveStreamMediatorImpl extends SubscriptionHandlerBase<SubscribeStreamResp
* consumers
*/
@Override
public void publish(@NonNull final List<BlockItem> blockItems) {
public void publish(@NonNull final List<BlockItemUnparsed> blockItems) {

if (serviceStatus.isRunning()) {

// Publish the block for all subscribers to receive
LOGGER.log(DEBUG, "Publishing BlockItem");
final BlockItemSet blockItemsSet =
BlockItemSet.newBuilder().blockItems(blockItems).build();
final var subscribeStreamResponse = SubscribeStreamResponse.newBuilder()
final BlockItemSetUnparsed blockItemsSet =
BlockItemSetUnparsed.newBuilder().blockItems(blockItems).build();

final var subscribeStreamResponse = SubscribeStreamResponseUnparsed.newBuilder()
.blockItems(blockItemsSet)
.build();

LOGGER.log(DEBUG, "Publishing BlockItems: " + blockItems.size());
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));

long remainingCapacity = ringBuffer.remainingCapacity();
metricsService.get(MediatorRingBufferRemainingCapacity).set(remainingCapacity);

// Increment the block item counter by all block items published
metricsService.get(LiveBlockItems).add(blockItems.size());

} else {
LOGGER.log(ERROR, "StreamMediator is not accepting BlockItems");
}
Expand All @@ -133,16 +135,16 @@ public void notifyUnrecoverableError() {
LOGGER.log(ERROR, "Sending an error response to end the stream for all consumers.");

// Publish an end of stream response to all downstream consumers
final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse();
final SubscribeStreamResponseUnparsed endStreamResponse = buildEndStreamResponse();
ringBuffer.publishEvent((event, sequence) -> event.set(endStreamResponse));
}

@NonNull
private static SubscribeStreamResponse buildEndStreamResponse() {
private static SubscribeStreamResponseUnparsed buildEndStreamResponse() {
// The current spec does not contain a generic error code for
// SubscribeStreamResponseCode.
// TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code?
return SubscribeStreamResponse.newBuilder()
return SubscribeStreamResponseUnparsed.newBuilder()
.status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.notifier.Notifiable;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
Expand Down Expand Up @@ -61,7 +61,7 @@ static LiveStreamMediator providesLiveStreamMediator(
*/
@Binds
@Singleton
SubscriptionHandler<SubscribeStreamResponse> bindSubscriptionHandler(
SubscriptionHandler<SubscribeStreamResponseUnparsed> bindSubscriptionHandler(
@NonNull final LiveStreamMediator liveStreamMediator);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;

Expand All @@ -51,27 +51,27 @@ public NoOpLiveStreamMediator(@NonNull final BlockNodeContext blockNodeContext)
* {@inheritDoc}
*/
@Override
public void publish(@NonNull List<BlockItem> blockItems) {
public void publish(@NonNull List<BlockItemUnparsed> blockItems) {
metricsService.get(LiveBlockItems).add(blockItems.size());
}

/**
* {@inheritDoc}
*/
@Override
public void subscribe(@NonNull BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {}
public void subscribe(@NonNull BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>> handler) {}

/**
* {@inheritDoc}
*/
@Override
public void unsubscribe(@NonNull BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {}
public void unsubscribe(@NonNull BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>> handler) {}

/**
* {@inheritDoc}
*/
@Override
public boolean isSubscribed(@NonNull BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
public boolean isSubscribed(@NonNull BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>> handler) {
return false;
}

Expand Down
Loading

0 comments on commit 544d983

Please sign in to comment.