Skip to content

Commit

Permalink
MQTT Pub Sub correct implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
michalxo committed Nov 10, 2023
1 parent a203fbb commit 3073982
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface MessagingClient {
int sendMessages();
int receiveMessages();
void subscribe();
void unsubscribe();
Object getSentMessages();
Object getReceivedMessages();
boolean compareMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ public void subscribe() {
subscriberExecutor.execBackgroundCommand(command);
}

public void unsubscribe() {
throw new UnsupportedOperationException("[" + deployableClient.getContainerName() + "] Unsubscribe not supported on bundled clients");
}

public int getSubscribedMessages() {
String cmdOutput = subscriberExecutor.getBackgroundCommandData(5);
return parseMessageCount(cmdOutput, CONSUMER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void subscribe() {
public int getSubscribedMessages() {
String cmdOutput = subscriberExecWatch.getBackgroundCommandData(5);
this.receivedMessages = parseMessages(cmdOutput);
LOGGER.debug("[{}][RX] \n{}", deployableClient.getContainerName(), cmdOutput);
return receivedMessages.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ public abstract class MqttClient extends SystemtestClient {
private String keystore;
private String saslMechanism;
private boolean secured;
private Executor subscriberExecWatch;
private Executor subscriberExecutor;
private List<JSONObject> sentMessages;
private List<JSONObject> receivedMessages;
private final String identifier = TestUtils.generateRandomName();
private final String identifierSender = TestUtils.generateRandomName();
private final String identifierReceiver = identifierSender + "receiver";
private String randomContent = null;

String getCliExecutableBase() {
Expand Down Expand Up @@ -117,16 +118,16 @@ private void assignBrokerUri(String brokerUri) {
@Override
public int sendMessages() {
String cmdOutput;
String[] command = constructClientCommand(MessagingClient.SENDER);
String[] command = constructClientCommand(SENDER);
cmdOutput = (String) deployableClient.getExecutor().executeCommand(Constants.DURATION_3_MINUTES, command);
LOGGER.debug("[{}][TX] \n{}", deployableClient.getContainerName(), cmdOutput);
this.sentMessages = parseMessages(cmdOutput);
this.sentMessages = parseMessages(cmdOutput, SENDER);
return sentMessages.size();
}

@Override
public int receiveMessages() {
if (subscriberExecWatch != null) {
if (subscriberExecutor != null) {
// executed client on background
return getSubscribedMessages();
} else {
Expand All @@ -135,7 +136,7 @@ public int receiveMessages() {
String[] command = constructClientCommand(MessagingClient.RECEIVER);
cmdOutput = (String) deployableClient.getExecutor().executeCommand(Constants.DURATION_3_MINUTES, command);
LOGGER.debug("[{}][RX] \n{}", deployableClient.getContainerName(), cmdOutput);
this.receivedMessages = parseMessages(cmdOutput);
this.receivedMessages = parseMessages(cmdOutput, RECEIVER);
return receivedMessages.size();
}
}
Expand All @@ -148,12 +149,22 @@ public void subscribe() {
} else {
command = constructClientCommand(MessagingClient.RECEIVER, 60);
}
subscriberExecWatch = deployableClient.getExecutor();
subscriberExecWatch.execBackgroundCommand(command);
subscriberExecutor = deployableClient.getExecutor();
subscriberExecutor.execBackgroundCommand(command);
LOGGER.debug("[{}][SUBSCRIBE] Sleeping for while to ensure subscriber is connected before moving forward", deployableClient.getContainerName());
TestUtils.threadSleep(Constants.DURATION_5_SECONDS);
}

@Override
public void unsubscribe() {
String[] command = {"sh", "-c",
String.format("for proc in /proc/[0-9]*/cmdline; " +
"do echo $(basename $(dirname $proc)) $(cat $proc | tr \"\\0\" \" \"); done | " +
"grep %s | grep -v grep | cut -d ' ' -f 1 | xargs kill", identifierReceiver)};
deployableClient.getExecutor().executeCommand(command);
LOGGER.debug("[{}][UNSUBSCRIBE] MQTT Client with {}", deployableClient.getContainerName(), identifierReceiver);
}

public String testBroker() {
String cmdOutput;
String[] command = constructClientCommand("test");
Expand Down Expand Up @@ -205,8 +216,8 @@ public boolean compareMessages() {
}

public int getSubscribedMessages() {
String cmdOutput = subscriberExecWatch.getBackgroundCommandData(5);
this.receivedMessages = parseMessages(cmdOutput);
String cmdOutput = subscriberExecutor.getBackgroundCommandData(5);
this.receivedMessages = parseMessages(cmdOutput, RECEIVER);
return receivedMessages.size();
}

Expand All @@ -225,11 +236,11 @@ private String[] constructClientCommandOptions(String clientType) {
// we need to use debug mode, to be able to parse sent/received message
if (clientType.equals(SENDER)) {
options = senderOptions;
options.put("identifier", identifier);
options.put("identifier", identifierSender);
commandBuild.append("publish -d --userProperty id=" + TestUtils.getRandomString(6) + "");
} else if (clientType.equals(RECEIVER)) {
options = receiverOptions;
options.put("identifier", identifier);
options.put("identifier", identifierReceiver);
commandBuild.append("subscribe -d");
} else if (clientType.equals("test")) {
options = testOptions;
Expand All @@ -255,15 +266,18 @@ private String[] constructClientCommand(String clientType, int timeout) {
// cli-hivemq-mqtt-subscribe -t test -h 10.128.3.9 -p 1883 --mqttVersion=5 -d -i=tralal --qos=2
String command;
String client;
String identifier;
if (!destinationAddress.equals(destinationQueue)) {
LOGGER.warn("[{}] MQTT client does not support FQQN (!?) Using provided address instead {}", deployableClient.getContainerName(), destinationAddress);
}
clientDestination = destinationAddress;
if (clientType.equals(SENDER)) {
client = getCliExecutableBase() + "-publish";
randomContent = TestUtils.getRandomString(26);
identifier = identifierSender;
} else if (clientType.equals(RECEIVER)) {
client = getCliExecutableBase() + "-subscribe";
identifier = identifierReceiver;
} else {
throw new ClaireRuntimeException("Unsupported client type!" + clientType);
}
Expand Down Expand Up @@ -297,16 +311,15 @@ private String[] constructClientCommand(String clientType, int timeout) {
return command.split(" ");
}

private List<JSONObject> parseMessages(String output) {
private List<JSONObject> parseMessages(String output, String clientType) {
if (output == null) {
throw new ClaireRuntimeException("Provided unexpected empty/null command output!");
}
List<JSONObject> jsonMessages = new ArrayList<>();
List<String> lines = List.of(output.split("\n"));
List<String> lines = List.of(output.replaceAll("\\s+", " ").split("Client "));
Map<String, String> data = new HashMap<>();
Map<String, String> userProperties = new HashMap<>();
for (String line : lines) {
if (line.contains("sending PUBLISH") || line.contains("received PUBLISH")) {
if (clientType.equals(SENDER) && line.contains("sending PUBLISH") || clientType.equals(RECEIVER) && line.contains("received PUBLISH")) {
String workLine = line.substring(line.indexOf("MqttPublish{") + "MqttPublish{".length(), line.length() - 1);
if (workLine.contains("userProperties=[(")) {
String userProps = workLine.substring(workLine.indexOf("userProperties=[") + "userProperties=[".length(), workLine.lastIndexOf(")]") + 1);
Expand All @@ -322,8 +335,10 @@ private List<JSONObject> parseMessages(String output) {
String[] splitted = item.split("=");
data.put(splitted[0].trim(), splitted[1].trim());
}
String content = line.substring(line.indexOf("('") + 2, line.indexOf("')"));
data.put("content", content);
if (line.contains("content")) {
String content = line.substring(line.indexOf("('") + 2, line.indexOf("')"));
data.put("content", content);
}
jsonMessages.add(new JSONObject(data));
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public abstract class SystemtestClient implements MessagingClient {
protected DeployableClient deployableClient;
protected Map<String, String> senderOptions = null;
protected Map<String, String> receiverOptions = null;
Logger logger = LoggerFactory.getLogger(MessagingClient.class);

public boolean compareMessages(Object sentMessagesObject, Object receivedMessagesObject) {
if (sentMessagesObject == null || receivedMessagesObject == null) {
Expand All @@ -31,7 +32,6 @@ public boolean compareMessages(Object sentMessagesObject, Object receivedMessage

public boolean compareMessages(List<JSONObject> sentMessages, List<JSONObject> receivedMessages) {
// Method compares only number of sent and received messages and real comparison of messageIDs (if is present in other group)
Logger logger = LoggerFactory.getLogger(MessagingClient.class);
if (sentMessages.size() != receivedMessages.size()) {
logger.warn("[{}] Sent {} and received {} messages are not same!", deployableClient.getContainerName(), sentMessages.size(), receivedMessages.size());
return false;
Expand All @@ -58,4 +58,10 @@ public boolean compareMessages(List<JSONObject> sentMessages, List<JSONObject> r
return true;
}
}

@Override
public void unsubscribe() {
logger.warn("[{}] Unsubscribe not supported for systemtests-clients (only mqtt client)", deployableClient.getContainerName());
throw new UnsupportedOperationException("[" + deployableClient.getContainerName() + "] Unsubscribe not supported on systemtest clients clients");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public abstract class StClientDeployment implements KubernetesDeployableClient {
private static final Logger LOGGER = LoggerFactory.getLogger(StClientDeployment.class);
private final String namespace;
static KubeClient kubeClient = ResourceManager.getKubeClient();
private ExecutorOperator executor;
private Pod pod;
private Deployment deployment;

Expand Down Expand Up @@ -60,10 +59,7 @@ public String getContainerName() {

@Override
public Executor getExecutor() {
if (this.executor == null) {
this.executor = new ExecutorOperator(getContainer());
}
return this.executor;
return new ExecutorOperator(getContainer());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,12 @@ public void testMessaging(ClientType clientType, String namespace, Pod brokerPod
} else if (clientType.equals(ClientType.BUNDLED_CORE)) {
messagingClient = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, brokerPod, "61616", address, messages, username, password);
} else if (clientType.equals(ClientType.ST_AMQP_QPID_JMS)) {
clients = ResourceManager.deployClientsContainer(namespace);
Pod clientsPod = getClient().getFirstPodByPrefixName(namespace, Constants.PREFIX_SYSTEMTESTS_CLIENTS);
messagingClient = ResourceManager.createMessagingClient(ClientType.BUNDLED_AMQP, clientsPod, brokerPod.getStatus().getPodIP(), "5672", address, messages, username, password);
if (clientsPod == null) {
clients = ResourceManager.deployClientsContainer(namespace);
clientsPod = getClient().getFirstPodByPrefixName(namespace, Constants.PREFIX_SYSTEMTESTS_CLIENTS);
}
messagingClient = ResourceManager.createMessagingClient(ClientType.ST_AMQP_QPID_JMS, clientsPod, brokerPod.getStatus().getPodIP(), "5672", address, messages, username, password);
} else {
throw new ClaireRuntimeException("Unknown/Unsupported client type!" + clientType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ void sendReceiveAMQPMessageTest() {
}

@Test
@Disabled("MQTT Output Parsing is broken")
@Tag(Constants.TAG_SMOKE)
void sendReceiveSystemTestsClientMessageTest() {
Deployment clients = ResourceManager.deployClientsContainer(testNamespace);
Expand All @@ -147,18 +146,20 @@ void sendReceiveSystemTestsClientMessageTest() {
assertThat(messagingClient.compareMessages(), is(true));

// Subscriber - Publisher
LOGGER.info("[{}] Starting AMQP subscriber - publisher test", testNamespace);
LOGGER.info("[{}] Starting Bundled AMQP subscriber - publisher test", testNamespace);
testMessaging(ClientType.BUNDLED_AMQP, testNamespace, brokerPod, myAddress, 10);

LOGGER.info("[{}] Starting SystemTest clients AMQP subscriber - publisher test", testNamespace);
testMessaging(ClientType.ST_AMQP_QPID_JMS, testNamespace, brokerPod, myAddress, 10);

// MQTT Subscriber - Publisher
LOGGER.info("[{}] Starting MQTT subscriber - publisher test", testNamespace);
LOGGER.info("[{}] Starting SystemTest clients MQTT subscriber - publisher test", testNamespace);
msgsExpected = 1;
// Subscriber - Publisher
MessagingClient messagingMqttClient = ResourceManager.createMessagingClient(ClientType.ST_MQTT_V5, clientsPod,
brokerPod.getStatus().getPodIP(), "5672", myAddress, msgsExpected);
messagingMqttClient.subscribe();
sent = messagingMqttClient.sendMessages();
messagingMqttClient.unsubscribe();
received = messagingMqttClient.receiveMessages();
assertThat(sent, equalTo(msgsExpected));
assertThat(sent, equalTo(received));
Expand Down

0 comments on commit 3073982

Please sign in to comment.