From 307398296b6ea219b9c56a44c56545b8c1ab6247 Mon Sep 17 00:00:00 2001 From: Michal T Date: Fri, 10 Nov 2023 10:19:29 +0100 Subject: [PATCH] MQTT Pub Sub correct implementation --- .../claire/clients/MessagingClient.java | 1 + .../bundled/BundledMessagingClient.java | 4 ++ .../clients/container/BaseJMSClient.java | 1 + .../claire/clients/container/MqttClient.java | 51 ++++++++++++------- .../clients/container/SystemtestClient.java | 8 ++- .../claire/clients/StClientDeployment.java | 6 +-- .../brokerqe/claire/AbstractSystemTests.java | 7 ++- .../io/brokerqe/claire/smoke/SmokeTests.java | 9 ++-- 8 files changed, 57 insertions(+), 30 deletions(-) diff --git a/common/src/main/java/io/brokerqe/claire/clients/MessagingClient.java b/common/src/main/java/io/brokerqe/claire/clients/MessagingClient.java index d83cc6db..0f34f1bd 100644 --- a/common/src/main/java/io/brokerqe/claire/clients/MessagingClient.java +++ b/common/src/main/java/io/brokerqe/claire/clients/MessagingClient.java @@ -15,6 +15,7 @@ public interface MessagingClient { int sendMessages(); int receiveMessages(); void subscribe(); + void unsubscribe(); Object getSentMessages(); Object getReceivedMessages(); boolean compareMessages(); diff --git a/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledMessagingClient.java b/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledMessagingClient.java index 50473aab..0a4a7373 100644 --- a/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledMessagingClient.java +++ b/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledMessagingClient.java @@ -168,6 +168,10 @@ public void subscribe() { subscriberExecutor.execBackgroundCommand(command); } + public void unsubscribe() { + throw new UnsupportedOperationException("[" + deployableClient.getContainerName() + "] Unsubscribe not supported on bundled clients"); + } + public int getSubscribedMessages() { String cmdOutput = subscriberExecutor.getBackgroundCommandData(5); return parseMessageCount(cmdOutput, CONSUMER); diff --git a/common/src/main/java/io/brokerqe/claire/clients/container/BaseJMSClient.java b/common/src/main/java/io/brokerqe/claire/clients/container/BaseJMSClient.java index 9f1a1169..39fdd65d 100644 --- a/common/src/main/java/io/brokerqe/claire/clients/container/BaseJMSClient.java +++ b/common/src/main/java/io/brokerqe/claire/clients/container/BaseJMSClient.java @@ -175,6 +175,7 @@ public void subscribe() { public int getSubscribedMessages() { String cmdOutput = subscriberExecWatch.getBackgroundCommandData(5); this.receivedMessages = parseMessages(cmdOutput); + LOGGER.debug("[{}][RX] \n{}", deployableClient.getContainerName(), cmdOutput); return receivedMessages.size(); } diff --git a/common/src/main/java/io/brokerqe/claire/clients/container/MqttClient.java b/common/src/main/java/io/brokerqe/claire/clients/container/MqttClient.java index 5e7a5e60..aa87566b 100644 --- a/common/src/main/java/io/brokerqe/claire/clients/container/MqttClient.java +++ b/common/src/main/java/io/brokerqe/claire/clients/container/MqttClient.java @@ -39,10 +39,11 @@ public abstract class MqttClient extends SystemtestClient { private String keystore; private String saslMechanism; private boolean secured; - private Executor subscriberExecWatch; + private Executor subscriberExecutor; private List sentMessages; private List receivedMessages; - private final String identifier = TestUtils.generateRandomName(); + private final String identifierSender = TestUtils.generateRandomName(); + private final String identifierReceiver = identifierSender + "receiver"; private String randomContent = null; String getCliExecutableBase() { @@ -117,16 +118,16 @@ private void assignBrokerUri(String brokerUri) { @Override public int sendMessages() { String cmdOutput; - String[] command = constructClientCommand(MessagingClient.SENDER); + String[] command = constructClientCommand(SENDER); cmdOutput = (String) deployableClient.getExecutor().executeCommand(Constants.DURATION_3_MINUTES, command); LOGGER.debug("[{}][TX] \n{}", deployableClient.getContainerName(), cmdOutput); - this.sentMessages = parseMessages(cmdOutput); + this.sentMessages = parseMessages(cmdOutput, SENDER); return sentMessages.size(); } @Override public int receiveMessages() { - if (subscriberExecWatch != null) { + if (subscriberExecutor != null) { // executed client on background return getSubscribedMessages(); } else { @@ -135,7 +136,7 @@ public int receiveMessages() { String[] command = constructClientCommand(MessagingClient.RECEIVER); cmdOutput = (String) deployableClient.getExecutor().executeCommand(Constants.DURATION_3_MINUTES, command); LOGGER.debug("[{}][RX] \n{}", deployableClient.getContainerName(), cmdOutput); - this.receivedMessages = parseMessages(cmdOutput); + this.receivedMessages = parseMessages(cmdOutput, RECEIVER); return receivedMessages.size(); } } @@ -148,12 +149,22 @@ public void subscribe() { } else { command = constructClientCommand(MessagingClient.RECEIVER, 60); } - subscriberExecWatch = deployableClient.getExecutor(); - subscriberExecWatch.execBackgroundCommand(command); + subscriberExecutor = deployableClient.getExecutor(); + subscriberExecutor.execBackgroundCommand(command); LOGGER.debug("[{}][SUBSCRIBE] Sleeping for while to ensure subscriber is connected before moving forward", deployableClient.getContainerName()); TestUtils.threadSleep(Constants.DURATION_5_SECONDS); } + @Override + public void unsubscribe() { + String[] command = {"sh", "-c", + String.format("for proc in /proc/[0-9]*/cmdline; " + + "do echo $(basename $(dirname $proc)) $(cat $proc | tr \"\\0\" \" \"); done | " + + "grep %s | grep -v grep | cut -d ' ' -f 1 | xargs kill", identifierReceiver)}; + deployableClient.getExecutor().executeCommand(command); + LOGGER.debug("[{}][UNSUBSCRIBE] MQTT Client with {}", deployableClient.getContainerName(), identifierReceiver); + } + public String testBroker() { String cmdOutput; String[] command = constructClientCommand("test"); @@ -205,8 +216,8 @@ public boolean compareMessages() { } public int getSubscribedMessages() { - String cmdOutput = subscriberExecWatch.getBackgroundCommandData(5); - this.receivedMessages = parseMessages(cmdOutput); + String cmdOutput = subscriberExecutor.getBackgroundCommandData(5); + this.receivedMessages = parseMessages(cmdOutput, RECEIVER); return receivedMessages.size(); } @@ -225,11 +236,11 @@ private String[] constructClientCommandOptions(String clientType) { // we need to use debug mode, to be able to parse sent/received message if (clientType.equals(SENDER)) { options = senderOptions; - options.put("identifier", identifier); + options.put("identifier", identifierSender); commandBuild.append("publish -d --userProperty id=" + TestUtils.getRandomString(6) + ""); } else if (clientType.equals(RECEIVER)) { options = receiverOptions; - options.put("identifier", identifier); + options.put("identifier", identifierReceiver); commandBuild.append("subscribe -d"); } else if (clientType.equals("test")) { options = testOptions; @@ -255,6 +266,7 @@ private String[] constructClientCommand(String clientType, int timeout) { // cli-hivemq-mqtt-subscribe -t test -h 10.128.3.9 -p 1883 --mqttVersion=5 -d -i=tralal --qos=2 String command; String client; + String identifier; if (!destinationAddress.equals(destinationQueue)) { LOGGER.warn("[{}] MQTT client does not support FQQN (!?) Using provided address instead {}", deployableClient.getContainerName(), destinationAddress); } @@ -262,8 +274,10 @@ private String[] constructClientCommand(String clientType, int timeout) { if (clientType.equals(SENDER)) { client = getCliExecutableBase() + "-publish"; randomContent = TestUtils.getRandomString(26); + identifier = identifierSender; } else if (clientType.equals(RECEIVER)) { client = getCliExecutableBase() + "-subscribe"; + identifier = identifierReceiver; } else { throw new ClaireRuntimeException("Unsupported client type!" + clientType); } @@ -297,16 +311,15 @@ private String[] constructClientCommand(String clientType, int timeout) { return command.split(" "); } - private List parseMessages(String output) { + private List parseMessages(String output, String clientType) { if (output == null) { throw new ClaireRuntimeException("Provided unexpected empty/null command output!"); } List jsonMessages = new ArrayList<>(); - List lines = List.of(output.split("\n")); + List lines = List.of(output.replaceAll("\\s+", " ").split("Client ")); Map data = new HashMap<>(); - Map userProperties = new HashMap<>(); for (String line : lines) { - if (line.contains("sending PUBLISH") || line.contains("received PUBLISH")) { + if (clientType.equals(SENDER) && line.contains("sending PUBLISH") || clientType.equals(RECEIVER) && line.contains("received PUBLISH")) { String workLine = line.substring(line.indexOf("MqttPublish{") + "MqttPublish{".length(), line.length() - 1); if (workLine.contains("userProperties=[(")) { String userProps = workLine.substring(workLine.indexOf("userProperties=[") + "userProperties=[".length(), workLine.lastIndexOf(")]") + 1); @@ -322,8 +335,10 @@ private List parseMessages(String output) { String[] splitted = item.split("="); data.put(splitted[0].trim(), splitted[1].trim()); } - String content = line.substring(line.indexOf("('") + 2, line.indexOf("')")); - data.put("content", content); + if (line.contains("content")) { + String content = line.substring(line.indexOf("('") + 2, line.indexOf("')")); + data.put("content", content); + } jsonMessages.add(new JSONObject(data)); break; } diff --git a/common/src/main/java/io/brokerqe/claire/clients/container/SystemtestClient.java b/common/src/main/java/io/brokerqe/claire/clients/container/SystemtestClient.java index 57ea52ab..66f43628 100644 --- a/common/src/main/java/io/brokerqe/claire/clients/container/SystemtestClient.java +++ b/common/src/main/java/io/brokerqe/claire/clients/container/SystemtestClient.java @@ -19,6 +19,7 @@ public abstract class SystemtestClient implements MessagingClient { protected DeployableClient deployableClient; protected Map senderOptions = null; protected Map receiverOptions = null; + Logger logger = LoggerFactory.getLogger(MessagingClient.class); public boolean compareMessages(Object sentMessagesObject, Object receivedMessagesObject) { if (sentMessagesObject == null || receivedMessagesObject == null) { @@ -31,7 +32,6 @@ public boolean compareMessages(Object sentMessagesObject, Object receivedMessage public boolean compareMessages(List sentMessages, List receivedMessages) { // Method compares only number of sent and received messages and real comparison of messageIDs (if is present in other group) - Logger logger = LoggerFactory.getLogger(MessagingClient.class); if (sentMessages.size() != receivedMessages.size()) { logger.warn("[{}] Sent {} and received {} messages are not same!", deployableClient.getContainerName(), sentMessages.size(), receivedMessages.size()); return false; @@ -58,4 +58,10 @@ public boolean compareMessages(List sentMessages, List r return true; } } + + @Override + public void unsubscribe() { + logger.warn("[{}] Unsubscribe not supported for systemtests-clients (only mqtt client)", deployableClient.getContainerName()); + throw new UnsupportedOperationException("[" + deployableClient.getContainerName() + "] Unsubscribe not supported on systemtest clients clients"); + } } diff --git a/operator-suite/src/main/java/io/brokerqe/claire/clients/StClientDeployment.java b/operator-suite/src/main/java/io/brokerqe/claire/clients/StClientDeployment.java index 4344302d..61cdc928 100644 --- a/operator-suite/src/main/java/io/brokerqe/claire/clients/StClientDeployment.java +++ b/operator-suite/src/main/java/io/brokerqe/claire/clients/StClientDeployment.java @@ -28,7 +28,6 @@ public abstract class StClientDeployment implements KubernetesDeployableClient { private static final Logger LOGGER = LoggerFactory.getLogger(StClientDeployment.class); private final String namespace; static KubeClient kubeClient = ResourceManager.getKubeClient(); - private ExecutorOperator executor; private Pod pod; private Deployment deployment; @@ -60,10 +59,7 @@ public String getContainerName() { @Override public Executor getExecutor() { - if (this.executor == null) { - this.executor = new ExecutorOperator(getContainer()); - } - return this.executor; + return new ExecutorOperator(getContainer()); } @Override diff --git a/operator-suite/src/test/java/io/brokerqe/claire/AbstractSystemTests.java b/operator-suite/src/test/java/io/brokerqe/claire/AbstractSystemTests.java index 09ec047e..95700dee 100644 --- a/operator-suite/src/test/java/io/brokerqe/claire/AbstractSystemTests.java +++ b/operator-suite/src/test/java/io/brokerqe/claire/AbstractSystemTests.java @@ -315,9 +315,12 @@ public void testMessaging(ClientType clientType, String namespace, Pod brokerPod } else if (clientType.equals(ClientType.BUNDLED_CORE)) { messagingClient = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, brokerPod, "61616", address, messages, username, password); } else if (clientType.equals(ClientType.ST_AMQP_QPID_JMS)) { - clients = ResourceManager.deployClientsContainer(namespace); Pod clientsPod = getClient().getFirstPodByPrefixName(namespace, Constants.PREFIX_SYSTEMTESTS_CLIENTS); - messagingClient = ResourceManager.createMessagingClient(ClientType.BUNDLED_AMQP, clientsPod, brokerPod.getStatus().getPodIP(), "5672", address, messages, username, password); + if (clientsPod == null) { + clients = ResourceManager.deployClientsContainer(namespace); + clientsPod = getClient().getFirstPodByPrefixName(namespace, Constants.PREFIX_SYSTEMTESTS_CLIENTS); + } + messagingClient = ResourceManager.createMessagingClient(ClientType.ST_AMQP_QPID_JMS, clientsPod, brokerPod.getStatus().getPodIP(), "5672", address, messages, username, password); } else { throw new ClaireRuntimeException("Unknown/Unsupported client type!" + clientType); } diff --git a/operator-suite/src/test/java/io/brokerqe/claire/smoke/SmokeTests.java b/operator-suite/src/test/java/io/brokerqe/claire/smoke/SmokeTests.java index e2fcf3fc..32ee47e6 100644 --- a/operator-suite/src/test/java/io/brokerqe/claire/smoke/SmokeTests.java +++ b/operator-suite/src/test/java/io/brokerqe/claire/smoke/SmokeTests.java @@ -121,7 +121,6 @@ void sendReceiveAMQPMessageTest() { } @Test - @Disabled("MQTT Output Parsing is broken") @Tag(Constants.TAG_SMOKE) void sendReceiveSystemTestsClientMessageTest() { Deployment clients = ResourceManager.deployClientsContainer(testNamespace); @@ -147,18 +146,20 @@ void sendReceiveSystemTestsClientMessageTest() { assertThat(messagingClient.compareMessages(), is(true)); // Subscriber - Publisher - LOGGER.info("[{}] Starting AMQP subscriber - publisher test", testNamespace); + LOGGER.info("[{}] Starting Bundled AMQP subscriber - publisher test", testNamespace); testMessaging(ClientType.BUNDLED_AMQP, testNamespace, brokerPod, myAddress, 10); + LOGGER.info("[{}] Starting SystemTest clients AMQP subscriber - publisher test", testNamespace); + testMessaging(ClientType.ST_AMQP_QPID_JMS, testNamespace, brokerPod, myAddress, 10); // MQTT Subscriber - Publisher - LOGGER.info("[{}] Starting MQTT subscriber - publisher test", testNamespace); + LOGGER.info("[{}] Starting SystemTest clients MQTT subscriber - publisher test", testNamespace); msgsExpected = 1; - // Subscriber - Publisher MessagingClient messagingMqttClient = ResourceManager.createMessagingClient(ClientType.ST_MQTT_V5, clientsPod, brokerPod.getStatus().getPodIP(), "5672", myAddress, msgsExpected); messagingMqttClient.subscribe(); sent = messagingMqttClient.sendMessages(); + messagingMqttClient.unsubscribe(); received = messagingMqttClient.receiveMessages(); assertThat(sent, equalTo(msgsExpected)); assertThat(sent, equalTo(received));