diff --git a/pom.xml b/pom.xml index 91ff521..d8f8ec5 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ 9.2.0 3.3.2 4.0.1 - 1.7.6 + 9.0.0 diff --git a/src/main/java/com/teragrep/pth10/ast/StepList.java b/src/main/java/com/teragrep/pth10/ast/StepList.java index 846fd32..58874ea 100644 --- a/src/main/java/com/teragrep/pth10/ast/StepList.java +++ b/src/main/java/com/teragrep/pth10/ast/StepList.java @@ -237,6 +237,12 @@ private void analyze() { } } + if (step.hasProperty(AbstractStep.CommandProperty.NO_PRECEDING_AGGREGATE)) { + if (aggregateCount > 0) { + throw new RuntimeException("Step '" + step + "' cannot be used after aggregations!"); + } + } + if (step.hasProperty(AbstractStep.CommandProperty.SEQUENTIAL_ONLY)) { LOGGER.info("[Analyze] Sequential only command: <{}>", step); // set the breakpoint just once diff --git a/src/main/java/com/teragrep/pth10/steps/AbstractStep.java b/src/main/java/com/teragrep/pth10/steps/AbstractStep.java index 5c0f19a..e16e209 100644 --- a/src/main/java/com/teragrep/pth10/steps/AbstractStep.java +++ b/src/main/java/com/teragrep/pth10/steps/AbstractStep.java @@ -59,7 +59,8 @@ public enum CommandProperty { IGNORE_DEFAULT_SORTING, // Command applies a certain order to the rows SEQUENTIAL_ONLY, // Works only in Sequential mode (forEachBatch) AGGREGATE, // If there are multiple aggregate commands, switch to sequential mode is necessary - REQUIRE_PRECEDING_AGGREGATE // this command requires an aggregate command before it + REQUIRE_PRECEDING_AGGREGATE, // this command requires an aggregate command before it + NO_PRECEDING_AGGREGATE // command does not allow an aggregate command before it } protected final Set properties = new HashSet<>(); diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepSyslogStep.java b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepSyslogStep.java index 9fb72e0..b12d285 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepSyslogStep.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepSyslogStep.java @@ -66,6 +66,7 @@ public class TeragrepSyslogStep extends AbstractStep { public TeragrepSyslogStep(String relpHost, int relpPort) { this.relpHost = relpHost; this.relpPort = relpPort; + this.properties.add(CommandProperty.NO_PRECEDING_AGGREGATE); } @Override @@ -75,4 +76,9 @@ public Dataset get(Dataset dataset) { return dataset.map(syslogStreamer, dataset.exprEnc()); } + + @Override + public String toString() { + return String.format("TeragrepSyslogStep{relpHost=%s, relpPort=%d}", relpHost, relpPort); + } } diff --git a/src/test/java/com/teragrep/pth10/SyslogStreamTest.java b/src/test/java/com/teragrep/pth10/SyslogStreamTest.java index 76692dc..febbdaa 100644 --- a/src/test/java/com/teragrep/pth10/SyslogStreamTest.java +++ b/src/test/java/com/teragrep/pth10/SyslogStreamTest.java @@ -45,9 +45,15 @@ */ package com.teragrep.pth10; -import com.teragrep.rlp_03.Server; -import com.teragrep.rlp_03.SyslogFrameProcessor; -import org.apache.spark.sql.streaming.StreamingQueryException; +import com.teragrep.net_01.channel.socket.PlainFactory; +import com.teragrep.net_01.eventloop.EventLoop; +import com.teragrep.net_01.eventloop.EventLoopFactory; +import com.teragrep.net_01.server.Server; +import com.teragrep.net_01.server.ServerFactory; +import com.teragrep.rlp_03.frame.FrameDelegationClockFactory; +import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate; +import com.teragrep.rlp_03.frame.delegate.FrameContext; +import com.teragrep.rlp_03.frame.delegate.FrameDelegate; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; @@ -57,9 +63,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Consumer; +import java.util.function.Supplier; /** * Tests for | teragrep exec syslog stream Uses streaming datasets @@ -71,6 +83,14 @@ public class SyslogStreamTest { private static final Logger LOGGER = LoggerFactory.getLogger(SyslogStreamTest.class); + private final List messages = new ArrayList<>(); + private final int listenPort = 9999; + + private Server server; + private EventLoop eventLoop; + private Thread eventLoopThread; + private ExecutorService executorService; + private final String testFile = "src/test/resources/regexTransformationTest_data*.jsonl"; // * to make the path into a directory path private final StructType testSchema = new StructType(new StructField[] { new StructField("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()), @@ -95,74 +115,112 @@ void setEnv() { @BeforeEach void setUp() { this.streamingTestUtil.setUp(); + messages.clear(); + serverSetup(); } @AfterEach void tearDown() { this.streamingTestUtil.tearDown(); + eventLoop.stop(); + Assertions.assertDoesNotThrow(() -> eventLoopThread.join()); + executorService.shutdown(); + Assertions.assertDoesNotThrow(() -> server.close()); + } + + private void serverSetup() { + executorService = Executors.newFixedThreadPool(1); + + Consumer syslogConsumer = new Consumer() { + + // NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections + @Override + public synchronized void accept(FrameContext frameContext) { + messages.add(frameContext.relpFrame().payload().toString()); + } + }; + + /* + * New instance of the frameDelegate is provided for every connection + */ + Supplier frameDelegateSupplier = () -> new DefaultFrameDelegate(syslogConsumer); + + /* + * EventLoop is used to notice any events from the connections + */ + EventLoopFactory eventLoopFactory = new EventLoopFactory(); + try { + eventLoop = eventLoopFactory.create(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + eventLoopThread = new Thread(eventLoop); + /* + * eventLoopThread must run, otherwise nothing will be processed + */ + eventLoopThread.start(); + + /* + * ServerFactory is used to create server instances + */ + ServerFactory serverFactory = new ServerFactory( + eventLoop, + executorService, + new PlainFactory(), + new FrameDelegationClockFactory(frameDelegateSupplier) + ); + + try { + server = serverFactory.create(listenPort); + System.out.println("server started at port <" + listenPort + ">"); + } + catch (IOException ioException) { + throw new UncheckedIOException(ioException); + } } // ---------------------------------------- // Tests // ---------------------------------------- - @Disabled(value = "RLP-03 has to be updated") /* FIXME: Update rlp_03 library to work with new rlp_01 version! */ @Test @DisabledIfSystemProperty( named = "skipSparkTest", matches = "true" ) // teragrep exec syslog stream public void syslogStreamSendingTest() { - final int expectedSyslogs = 10; - AtomicInteger numberOfSyslogMessagesSent = new AtomicInteger(); - AtomicReferenceArray arrayOfSyslogs = new AtomicReferenceArray<>(expectedSyslogs); - - final Consumer cbFunction = (message) -> { - LOGGER.debug("Server received the following syslog message:\n <[{}]>\n-----", new String(message)); - Assertions.assertTrue(numberOfSyslogMessagesSent.get() <= expectedSyslogs); - arrayOfSyslogs.set(numberOfSyslogMessagesSent.getAndIncrement(), new String(message)); - }; - - final int port = 9999; - final Server server = new Server(port, new SyslogFrameProcessor(cbFunction)); - Assertions.assertDoesNotThrow(server::start); - streamingTestUtil .performDPLTest( - "index=index_A | teragrep exec syslog stream host 127.0.0.1 port " + port, testFile, ds -> { - LOGGER.debug("Syslog msgs = <{}>", numberOfSyslogMessagesSent.get()); - Assertions.assertEquals(expectedSyslogs, numberOfSyslogMessagesSent.get()); - - for (int i = 0; i < expectedSyslogs; i++) { - String s = arrayOfSyslogs.get(i); - for (int j = 0; j < expectedSyslogs; j++) { - if (i == j) - continue; - Assertions.assertFalse(arrayOfSyslogs.compareAndSet(j, s, s)); - } - - } - Assertions.assertAll("stop server", server::stop); + "index=index_A | teragrep exec syslog stream host 127.0.0.1 port " + listenPort, testFile, + ds -> { + LOGGER.debug("Syslog msgs = <{}>", messages.size()); + Assertions.assertEquals(10, messages.size()); + Assertions.assertEquals(10, new HashSet<>(messages).size()); } ); } - @Disabled(value = "RLP-03 has to be updated") // FIXME: update rlp_03 @Test @DisabledIfSystemProperty( named = "skipSparkTest", matches = "true" ) // teragrep exec syslog stream, with preceding aggregation command public void syslogStreamSendingFailureTest() { + RuntimeException rte = streamingTestUtil + .performThrowingDPLTest( + RuntimeException.class, + "index=index_A | stats count(_raw) as craw | teragrep exec syslog stream host 127.1.0.1 port 9998", + testFile, ds -> { + } + ); + + Assertions.assertNotNull(rte); Assertions - .assertThrows( - StreamingQueryException.class, - () -> streamingTestUtil - .performDPLTest( - "index=index_A | stats count(_raw) as craw | teragrep exec syslog stream host 127.0.0.1 port 9998", - testFile, ds -> { - } - ) + .assertEquals( + rte.getMessage(), + "Step 'TeragrepSyslogStep{relpHost=127.1.0.1, relpPort=9998}' cannot be used after aggregations!" ); } }