Skip to content

Commit

Permalink
GH#323: Fix syslogStream tests (#423)
Browse files Browse the repository at this point in the history
* update rlp_03 to 9.0.0; fix syslogStreamTest; add NO_PRECEDING_AGGREGATE CommandProperty to throw error if using syslog stream with preceding aggregate.

* fix TeragrepSyslogStep.toString() to contain parameters instead of hashCode; change assertion to equals.
  • Loading branch information
eemhu authored Dec 2, 2024
1 parent 01bfd88 commit 19221ea
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<teragrep.pth_03.version>9.2.0</teragrep.pth_03.version>
<teragrep.pth_06.version>3.3.2</teragrep.pth_06.version>
<teragrep.rlp_01.version>4.0.1</teragrep.rlp_01.version>
<teragrep.rlp_03.version>1.7.6</teragrep.rlp_03.version>
<teragrep.rlp_03.version>9.0.0</teragrep.rlp_03.version>
</properties>
<dependencies>
<!--DPL-dependencies -->
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/teragrep/pth10/ast/StepList.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ private void analyze() {
}
}

if (step.hasProperty(AbstractStep.CommandProperty.NO_PRECEDING_AGGREGATE)) {
if (aggregateCount > 0) {
throw new RuntimeException("Step '" + step + "' cannot be used after aggregations!");
}
}

if (step.hasProperty(AbstractStep.CommandProperty.SEQUENTIAL_ONLY)) {
LOGGER.info("[Analyze] Sequential only command: <{}>", step);
// set the breakpoint just once
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/teragrep/pth10/steps/AbstractStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public enum CommandProperty {
IGNORE_DEFAULT_SORTING, // Command applies a certain order to the rows
SEQUENTIAL_ONLY, // Works only in Sequential mode (forEachBatch)
AGGREGATE, // If there are multiple aggregate commands, switch to sequential mode is necessary
REQUIRE_PRECEDING_AGGREGATE // this command requires an aggregate command before it
REQUIRE_PRECEDING_AGGREGATE, // this command requires an aggregate command before it
NO_PRECEDING_AGGREGATE // command does not allow an aggregate command before it
}

protected final Set<CommandProperty> properties = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class TeragrepSyslogStep extends AbstractStep {
public TeragrepSyslogStep(String relpHost, int relpPort) {
this.relpHost = relpHost;
this.relpPort = relpPort;
this.properties.add(CommandProperty.NO_PRECEDING_AGGREGATE);
}

@Override
Expand All @@ -75,4 +76,9 @@ public Dataset<Row> get(Dataset<Row> dataset) {

return dataset.map(syslogStreamer, dataset.exprEnc());
}

@Override
public String toString() {
return String.format("TeragrepSyslogStep{relpHost=%s, relpPort=%d}", relpHost, relpPort);
}
}
144 changes: 101 additions & 43 deletions src/test/java/com/teragrep/pth10/SyslogStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,15 @@
*/
package com.teragrep.pth10;

import com.teragrep.rlp_03.Server;
import com.teragrep.rlp_03.SyslogFrameProcessor;
import org.apache.spark.sql.streaming.StreamingQueryException;
import com.teragrep.net_01.channel.socket.PlainFactory;
import com.teragrep.net_01.eventloop.EventLoop;
import com.teragrep.net_01.eventloop.EventLoopFactory;
import com.teragrep.net_01.server.Server;
import com.teragrep.net_01.server.ServerFactory;
import com.teragrep.rlp_03.frame.FrameDelegationClockFactory;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
Expand All @@ -57,9 +63,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Tests for | teragrep exec syslog stream Uses streaming datasets
Expand All @@ -71,6 +83,14 @@ public class SyslogStreamTest {

private static final Logger LOGGER = LoggerFactory.getLogger(SyslogStreamTest.class);

private final List<String> messages = new ArrayList<>();
private final int listenPort = 9999;

private Server server;
private EventLoop eventLoop;
private Thread eventLoopThread;
private ExecutorService executorService;

private final String testFile = "src/test/resources/regexTransformationTest_data*.jsonl"; // * to make the path into a directory path
private final StructType testSchema = new StructType(new StructField[] {
new StructField("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()),
Expand All @@ -95,74 +115,112 @@ void setEnv() {
@BeforeEach
void setUp() {
this.streamingTestUtil.setUp();
messages.clear();
serverSetup();
}

@AfterEach
void tearDown() {
this.streamingTestUtil.tearDown();
eventLoop.stop();
Assertions.assertDoesNotThrow(() -> eventLoopThread.join());
executorService.shutdown();
Assertions.assertDoesNotThrow(() -> server.close());
}

private void serverSetup() {
executorService = Executors.newFixedThreadPool(1);

Consumer<FrameContext> syslogConsumer = new Consumer<FrameContext>() {

// NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections
@Override
public synchronized void accept(FrameContext frameContext) {
messages.add(frameContext.relpFrame().payload().toString());
}
};

/*
* New instance of the frameDelegate is provided for every connection
*/
Supplier<FrameDelegate> frameDelegateSupplier = () -> new DefaultFrameDelegate(syslogConsumer);

/*
* EventLoop is used to notice any events from the connections
*/
EventLoopFactory eventLoopFactory = new EventLoopFactory();
try {
eventLoop = eventLoopFactory.create();
}
catch (IOException e) {
throw new RuntimeException(e);
}

eventLoopThread = new Thread(eventLoop);
/*
* eventLoopThread must run, otherwise nothing will be processed
*/
eventLoopThread.start();

/*
* ServerFactory is used to create server instances
*/
ServerFactory serverFactory = new ServerFactory(
eventLoop,
executorService,
new PlainFactory(),
new FrameDelegationClockFactory(frameDelegateSupplier)
);

try {
server = serverFactory.create(listenPort);
System.out.println("server started at port <" + listenPort + ">");
}
catch (IOException ioException) {
throw new UncheckedIOException(ioException);
}
}

// ----------------------------------------
// Tests
// ----------------------------------------

@Disabled(value = "RLP-03 has to be updated") /* FIXME: Update rlp_03 library to work with new rlp_01 version! */
@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
) // teragrep exec syslog stream
public void syslogStreamSendingTest() {
final int expectedSyslogs = 10;
AtomicInteger numberOfSyslogMessagesSent = new AtomicInteger();
AtomicReferenceArray<String> arrayOfSyslogs = new AtomicReferenceArray<>(expectedSyslogs);

final Consumer<byte[]> cbFunction = (message) -> {
LOGGER.debug("Server received the following syslog message:\n <[{}]>\n-----", new String(message));
Assertions.assertTrue(numberOfSyslogMessagesSent.get() <= expectedSyslogs);
arrayOfSyslogs.set(numberOfSyslogMessagesSent.getAndIncrement(), new String(message));
};

final int port = 9999;
final Server server = new Server(port, new SyslogFrameProcessor(cbFunction));
Assertions.assertDoesNotThrow(server::start);

streamingTestUtil
.performDPLTest(
"index=index_A | teragrep exec syslog stream host 127.0.0.1 port " + port, testFile, ds -> {
LOGGER.debug("Syslog msgs = <{}>", numberOfSyslogMessagesSent.get());
Assertions.assertEquals(expectedSyslogs, numberOfSyslogMessagesSent.get());

for (int i = 0; i < expectedSyslogs; i++) {
String s = arrayOfSyslogs.get(i);
for (int j = 0; j < expectedSyslogs; j++) {
if (i == j)
continue;
Assertions.assertFalse(arrayOfSyslogs.compareAndSet(j, s, s));
}

}
Assertions.assertAll("stop server", server::stop);
"index=index_A | teragrep exec syslog stream host 127.0.0.1 port " + listenPort, testFile,
ds -> {
LOGGER.debug("Syslog msgs = <{}>", messages.size());
Assertions.assertEquals(10, messages.size());
Assertions.assertEquals(10, new HashSet<>(messages).size());
}
);
}

@Disabled(value = "RLP-03 has to be updated") // FIXME: update rlp_03
@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
) // teragrep exec syslog stream, with preceding aggregation command
public void syslogStreamSendingFailureTest() {
RuntimeException rte = streamingTestUtil
.performThrowingDPLTest(
RuntimeException.class,
"index=index_A | stats count(_raw) as craw | teragrep exec syslog stream host 127.1.0.1 port 9998",
testFile, ds -> {
}
);

Assertions.assertNotNull(rte);
Assertions
.assertThrows(
StreamingQueryException.class,
() -> streamingTestUtil
.performDPLTest(
"index=index_A | stats count(_raw) as craw | teragrep exec syslog stream host 127.0.0.1 port 9998",
testFile, ds -> {
}
)
.assertEquals(
rte.getMessage(),
"Step 'TeragrepSyslogStep{relpHost=127.1.0.1, relpPort=9998}' cannot be used after aggregations!"
);
}
}

0 comments on commit 19221ea

Please sign in to comment.