diff --git a/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java b/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java index fe3afa6b7..a6fe9a83d 100644 --- a/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java +++ b/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java @@ -29,7 +29,6 @@ import com.hedera.block.server.producer.ProducerBlockItemObserver; import com.hedera.block.server.producer.ProducerConfig; import com.hedera.block.server.service.ServiceStatus; -import com.hedera.hapi.block.BlockItemSetUnparsed; import com.hedera.hapi.block.BlockItemUnparsed; import com.hedera.hapi.block.PublishStreamRequestUnparsed; import com.hedera.hapi.block.PublishStreamResponse; @@ -196,9 +195,7 @@ private Bytes createSubscribeStreamResponse( private List parsePublishStreamRequest( @NonNull final Bytes message, @NonNull final RequestOptions options) throws ParseException { final PublishStreamRequestUnparsed request = PublishStreamRequestUnparsed.PROTOBUF.parse(message); - final Bytes b = request.blockItems(); - final BlockItemSetUnparsed blockItemSet = BlockItemSetUnparsed.PROTOBUF.parse(b); - return blockItemSet.blockItems(); + return request.blockItems().blockItems(); } @NonNull diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index 2473997ac..e2433df98 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -125,7 +125,9 @@ public void onNext(@NonNull final List blockItems) { livenessCalculator.refresh(); // Publish the block to the mediator - publisher.publish(blockItems); + if (!blockItems.isEmpty()) { + publisher.publish(blockItems); + } } else { LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems"); diff --git a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java index 6bedf1b71..ecea77d21 100644 --- a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java +++ b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java @@ -17,10 +17,12 @@ package com.hedera.block.server.pbj; import static com.hedera.block.server.producer.Util.getFakeHash; +import static com.hedera.block.server.util.PersistTestUtils.generateBlockItemsUnparsed; import static java.lang.System.Logger; import static java.lang.System.Logger.Level.INFO; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.events.BlockNodeEventHandler; @@ -37,17 +39,16 @@ import com.hedera.block.server.util.TestConfigUtil; import com.hedera.block.server.util.TestUtils; import com.hedera.hapi.block.Acknowledgement; -import com.hedera.hapi.block.BlockItemSet; +import com.hedera.hapi.block.BlockItemSetUnparsed; import com.hedera.hapi.block.BlockItemUnparsed; +import com.hedera.hapi.block.BlockUnparsed; import com.hedera.hapi.block.EndOfStream; import com.hedera.hapi.block.ItemAcknowledgement; +import com.hedera.hapi.block.PublishStreamRequestUnparsed; import com.hedera.hapi.block.PublishStreamResponse; import com.hedera.hapi.block.PublishStreamResponseCode; import com.hedera.hapi.block.SubscribeStreamRequest; -import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.SubscribeStreamResponseUnparsed; -import com.hedera.hapi.block.stream.Block; -import com.hedera.hapi.block.stream.BlockItem; import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.io.buffer.Bytes; import com.lmax.disruptor.BatchEventProcessor; @@ -60,9 +61,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -76,43 +79,43 @@ public class PbjBlockStreamServiceIntegrationTest { private Notifier notifier; @Mock - private Pipeline helidonPublishStreamObserver1; + private Pipeline helidonPublishStreamObserver1; @Mock - private Pipeline helidonPublishStreamObserver2; + private Pipeline helidonPublishStreamObserver2; @Mock - private Pipeline helidonPublishStreamObserver3; + private Pipeline helidonPublishStreamObserver3; @Mock private SubscribeStreamRequest subscribeStreamRequest; @Mock - private Pipeline subscribeStreamObserver1; + private Pipeline subscribeStreamObserver1; @Mock - private Pipeline subscribeStreamObserver2; + private Pipeline subscribeStreamObserver2; @Mock - private Pipeline subscribeStreamObserver3; + private Pipeline subscribeStreamObserver3; @Mock - private Pipeline subscribeStreamObserver4; + private Pipeline subscribeStreamObserver4; @Mock - private Pipeline subscribeStreamObserver5; + private Pipeline subscribeStreamObserver5; @Mock - private Pipeline subscribeStreamObserver6; + private Pipeline subscribeStreamObserver6; @Mock private WebServer webServer; @Mock - private BlockWriter> blockWriter; + private BlockWriter> blockWriter; @Mock - private BlockReader blockReader; + private BlockReader blockReader; private static final String TEMP_DIR = "block-node-unit-test-dir"; @@ -137,91 +140,104 @@ public void tearDown() { TestUtils.deleteDirectory(testPath.toFile()); } - // @Test - // public void testPublishBlockStreamRegistrationAndExecution() throws IOException, NoSuchAlgorithmException { - // - // final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - // final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) - // .build(); - // final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); - // final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( - // streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - // - // final PbjBlockStreamServiceProxy pbjBlockStreamServiceProxy = new PbjBlockStreamServiceProxy( - // streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); - // - // // Register 3 producers - // final Flow.Subscriber publishStreamObserver = - // pbjBlockStreamServiceProxy.publishBlockStream(helidonPublishStreamObserver1); - // - // pbjBlockStreamServiceProxy.publishBlockStream(helidonPublishStreamObserver2); - // pbjBlockStreamServiceProxy.publishBlockStream(helidonPublishStreamObserver3); - // - // // Register 3 consumers - // pbjBlockStreamServiceProxy.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); - // pbjBlockStreamServiceProxy.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); - // pbjBlockStreamServiceProxy.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); - // - // List blockItems = generateBlockItems(1); - // for (int i = 0; i < blockItems.size(); i++) { - // if (i == 9) { - // - // when(blockWriter.write(List.of(blockItems.get(i)))).thenReturn(Optional.of(List.of(blockItems.get(i)))); - // } else { - // when(blockWriter.write(List.of(blockItems.get(i)))).thenReturn(Optional.empty()); - // } - // } - // - // for (BlockItem blockItem : blockItems) { - // final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder() - // .blockItems(new BlockItemSet(List.of(blockItem))) - // .build(); - // - // // Calling onNext() as Helidon does with each block item for - // // the first producer. - // publishStreamObserver.onNext(publishStreamRequest); - // } - // - // // Verify all 10 BlockItems were sent to each of the 3 consumers - // verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - // .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - // verify(subscribeStreamObserver1, timeout(testTimeout).times(8)) - // .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - // verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - // .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - // - // verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - // .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - // verify(subscribeStreamObserver2, timeout(testTimeout).times(8)) - // .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - // verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - // .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - // - // verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - // .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - // verify(subscribeStreamObserver3, timeout(testTimeout).times(8)) - // .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - // verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - // .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - // - // // Only 1 response is expected per block sent - // final Acknowledgement itemAck = buildAck(List.of(blockItems.get(9))); - // final PublishStreamResponse publishStreamResponse = - // PublishStreamResponse.newBuilder().acknowledgement(itemAck).build(); - // - // // Verify all 3 producers received the response - // verify(helidonPublishStreamObserver1, timeout(testTimeout).times(1)).onNext(publishStreamResponse); - // - // verify(helidonPublishStreamObserver2, timeout(testTimeout).times(1)).onNext(publishStreamResponse); - // - // verify(helidonPublishStreamObserver3, timeout(testTimeout).times(1)).onNext(publishStreamResponse); - // - // // Close the stream as Helidon does - // publishStreamObserver.onComplete(); - // - // // verify the onCompleted() method is invoked on the wrapped StreamObserver - // verify(helidonPublishStreamObserver1, timeout(testTimeout).times(1)).onComplete(); - // } + @Test + public void testPublishBlockStreamRegistrationAndExecution() throws IOException, NoSuchAlgorithmException { + + final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); + final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); + final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + + final PbjBlockStreamServiceProxy pbjBlockStreamServiceProxy = new PbjBlockStreamServiceProxy( + streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); + + // Register 3 producers + final Bytes publishStreamRequestBytes = + PublishStreamRequestUnparsed.PROTOBUF.toBytes(PublishStreamRequestUnparsed.newBuilder() + .blockItems(BlockItemSetUnparsed.newBuilder().build()) + .build()); + final Pipeline producerPipeline = pbjBlockStreamServiceProxy.open( + PbjBlockStreamService.BlockStreamMethod.publishBlockStream, null, helidonPublishStreamObserver1); + pbjBlockStreamServiceProxy + .open(PbjBlockStreamService.BlockStreamMethod.publishBlockStream, null, helidonPublishStreamObserver2) + .onNext(publishStreamRequestBytes); + pbjBlockStreamServiceProxy + .open(PbjBlockStreamService.BlockStreamMethod.publishBlockStream, null, helidonPublishStreamObserver3) + .onNext(publishStreamRequestBytes); + + final Bytes subscribeStreamRequestBytes = SubscribeStreamRequest.PROTOBUF.toBytes( + SubscribeStreamRequest.newBuilder().startBlockNumber(1).build()); + // Register 3 consumers + pbjBlockStreamServiceProxy + .open(PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, null, subscribeStreamObserver1) + .onNext(subscribeStreamRequestBytes); + pbjBlockStreamServiceProxy + .open(PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, null, subscribeStreamObserver2) + .onNext(subscribeStreamRequestBytes); + pbjBlockStreamServiceProxy + .open(PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, null, subscribeStreamObserver3) + .onNext(subscribeStreamRequestBytes); + + List blockItems = generateBlockItemsUnparsed(1); + for (int i = 0; i < blockItems.size(); i++) { + if (i == 9) { + when(blockWriter.write(List.of(blockItems.get(i)))).thenReturn(Optional.of(List.of(blockItems.get(i)))); + } else { + when(blockWriter.write(List.of(blockItems.get(i)))).thenReturn(Optional.empty()); + } + } + + for (BlockItemUnparsed blockItem : blockItems) { + // Calling onNext() as Helidon does + final BlockItemSetUnparsed blockItemSet = + BlockItemSetUnparsed.newBuilder().blockItems(blockItem).build(); + final PublishStreamRequestUnparsed publishStreamRequest = PublishStreamRequestUnparsed.newBuilder() + .blockItems(blockItemSet) + .build(); + producerPipeline.onNext(PublishStreamRequestUnparsed.PROTOBUF.toBytes(publishStreamRequest)); + } + + // Verify all 10 BlockItems were sent to each of the 3 consumers + verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) + .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver1, timeout(testTimeout).times(8)) + .onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) + .onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) + .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver2, timeout(testTimeout).times(8)) + .onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) + .onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) + .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver3, timeout(testTimeout).times(8)) + .onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) + .onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + // Only 1 response is expected per block sent + final Acknowledgement itemAck = buildAck(List.of(blockItems.get(9))); + final PublishStreamResponse publishStreamResponse = + PublishStreamResponse.newBuilder().acknowledgement(itemAck).build(); + + // Verify all 3 producers received the response + final Bytes responseBytes = PublishStreamResponse.PROTOBUF.toBytes(publishStreamResponse); + verify(helidonPublishStreamObserver1, timeout(testTimeout).times(1)).onNext(responseBytes); + verify(helidonPublishStreamObserver2, timeout(testTimeout).times(1)).onNext(responseBytes); + verify(helidonPublishStreamObserver3, timeout(testTimeout).times(1)).onNext(responseBytes); + + // Close the stream as Helidon does + helidonPublishStreamObserver1.onComplete(); + + // verify the onCompleted() method is invoked on the wrapped StreamObserver + verify(helidonPublishStreamObserver1, timeout(testTimeout).times(1)).onComplete(); + } // @Test // public void testSubscribeBlockStream() throws IOException { @@ -568,42 +584,41 @@ public void tearDown() { // timeout(testTimeout).times(1)).onNext(expectedSubscriberStreamNotAvailable); // } - private static void verifySubscribeStreamResponse( - int numberOfBlocks, - int blockItemsToWait, - int blockItemsToSkip, - Pipeline streamObserver, - List blockItems) { - - // Each block has 10 BlockItems. Verify all the BlockItems - // in a given block per iteration. - for (int block = 0; block < numberOfBlocks; block += 10) { - - if (block < blockItemsToWait || block >= blockItemsToSkip) { - continue; - } - - final BlockItem headerBlockItem = blockItems.get(block); - final SubscribeStreamResponse headerSubStreamResponse = buildSubscribeStreamResponse(headerBlockItem); - - final BlockItem bodyBlockItem = blockItems.get(block + 1); - final SubscribeStreamResponse bodySubStreamResponse = buildSubscribeStreamResponse(bodyBlockItem); - - final BlockItem stateProofBlockItem = blockItems.get(block + 9); - final SubscribeStreamResponse stateProofStreamResponse = buildSubscribeStreamResponse(stateProofBlockItem); - - verify(streamObserver, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse); - verify(streamObserver, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse); - verify(streamObserver, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse); - } - } + // private static void verifySubscribeStreamResponse( + // int numberOfBlocks, + // int blockItemsToWait, + // int blockItemsToSkip, + // Pipeline pipeline, + // List blockItems) { + // + // // Each block has 10 BlockItems. Verify all the BlockItems + // // in a given block per iteration. + // for (int block = 0; block < numberOfBlocks; block += 10) { + // + // if (block < blockItemsToWait || block >= blockItemsToSkip) { + // continue; + // } + // + // final BlockItemUnparsed headerBlockItem = blockItems.get(block); + // final Bytes headerSubStreamResponse = buildSubscribeStreamResponse(headerBlockItem); + // + // final BlockItemUnparsed bodyBlockItem = blockItems.get(block + 1); + // final Bytes bodySubStreamResponse = buildSubscribeStreamResponse(bodyBlockItem); + // + // final BlockItemUnparsed stateProofBlockItem = blockItems.get(block + 9); + // final Bytes stateProofStreamResponse = buildSubscribeStreamResponse(stateProofBlockItem); + // + // verify(pipeline, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse); + // verify(pipeline, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse); + // verify(pipeline, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse); + // } + // } - private static SubscribeStreamResponse buildSubscribeStreamResponse(BlockItem blockItem) { - final BlockItemSet subscribeStreamResponseSet = - BlockItemSet.newBuilder().blockItems(blockItem).build(); - return SubscribeStreamResponse.newBuilder() - .blockItems(subscribeStreamResponseSet) - .build(); + private static Bytes buildSubscribeStreamResponse(BlockItemUnparsed blockItem) { + return SubscribeStreamResponseUnparsed.PROTOBUF.toBytes(SubscribeStreamResponseUnparsed.newBuilder() + .blockItems( + BlockItemSetUnparsed.newBuilder().blockItems(blockItem).build()) + .build()); } private static PublishStreamResponse buildEndOfStreamResponse() { diff --git a/stream/src/main/proto/com/hedera/hapi/block/unparsed.proto b/stream/src/main/proto/com/hedera/hapi/block/unparsed.proto index 069d0e638..66b69286f 100644 --- a/stream/src/main/proto/com/hedera/hapi/block/unparsed.proto +++ b/stream/src/main/proto/com/hedera/hapi/block/unparsed.proto @@ -9,7 +9,7 @@ option java_multiple_files = true; import "block_service.proto"; message PublishStreamRequestUnparsed { - bytes block_items = 1; + BlockItemSetUnparsed block_items = 1; } message SubscribeStreamResponseUnparsed {