diff --git a/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledArtemisClient.java b/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledArtemisClient.java index 5fd0187c..e53b85cc 100644 --- a/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledArtemisClient.java +++ b/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledArtemisClient.java @@ -74,7 +74,15 @@ public Object executeCommand() { return executeCommand(Constants.DURATION_3_MINUTES); } + public Object executeCommand(boolean disableOutput) { + return executeCommand(Constants.DURATION_3_MINUTES, disableOutput); + } + public Object executeCommand(long maxTimeout) { + return executeCommand(maxTimeout, false); + } + + public Object executeCommand(long maxTimeout, boolean disableOutput) { String cmdOutput; String[] command = constructClientCommand(); cmdOutput = (String) deployableClient.getExecutor().executeCommand(maxTimeout, command); @@ -82,7 +90,9 @@ public Object executeCommand(long maxTimeout) { LOGGER.debug("[PERF] Client detected, to see it's output use trace logging."); LOGGER.trace(cmdOutput); } else { - LOGGER.debug(cmdOutput); + if (!disableOutput) { + LOGGER.debug(cmdOutput); + } } return parseOutput(cmdOutput); } diff --git a/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledClientOptions.java b/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledClientOptions.java index 32daa24a..ab2c6535 100644 --- a/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledClientOptions.java +++ b/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledClientOptions.java @@ -21,6 +21,7 @@ public class BundledClientOptions { Boolean multicast = false; Protocol protocol; int timeout = 90; + public boolean disableOutput; public BundledClientOptions withDeployableClient(DeployableClient deployableClient) { this.deployableClient = deployableClient; @@ -81,6 +82,11 @@ public BundledClientOptions withTimeout(int timeout) { return this; } + public BundledClientOptions withDisabledOutput(boolean disableOutput) { + this.disableOutput = disableOutput; + return this; + } + public DeployableClient getDeployableClient() { return deployableClient; } diff --git a/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledMessagingClient.java b/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledMessagingClient.java index 253da4ca..d2dbe370 100644 --- a/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledMessagingClient.java +++ b/common/src/main/java/io/brokerqe/claire/clients/bundled/BundledMessagingClient.java @@ -35,6 +35,7 @@ public abstract class BundledMessagingClient implements MessagingClient { private int sentMessages = 0; private Executor subscriberExecutor; private int timeout; + private boolean disableOutput; public BundledMessagingClient(BundledClientOptions options) { @@ -50,6 +51,7 @@ public BundledMessagingClient(BundledClientOptions options) { this.persistenceDisabled = options.persistenceDisabled; this.isMulticast = options.multicast; this.timeout = options.timeout; + this.disableOutput = options.disableOutput; } abstract String getProtocol(); @@ -163,7 +165,9 @@ public int receiveMessages(long duration) { String cmdOutput; String[] command = constructClientCommand(CONSUMER); cmdOutput = (String) deployableClient.getExecutor().executeCommand(duration, command); - LOGGER.debug("[{}] {}", deployableClient.getContainerName(), cmdOutput); + if (!disableOutput) { + LOGGER.debug("[{}] {}", deployableClient.getContainerName(), cmdOutput); + } return parseMessageCount(cmdOutput, CONSUMER); } } diff --git a/operator-suite/src/main/java/io/brokerqe/claire/ResourceManager.java b/operator-suite/src/main/java/io/brokerqe/claire/ResourceManager.java index 6ecee52a..96b18066 100644 --- a/operator-suite/src/main/java/io/brokerqe/claire/ResourceManager.java +++ b/operator-suite/src/main/java/io/brokerqe/claire/ResourceManager.java @@ -847,7 +847,8 @@ public static MessagingClient createMessagingClient(ClientType clientType, Pod e .withPersistenceDisabled(persistenceDisabled) .withDestinationQueue(queue) .withDestinationUrl(serviceUrl) - .withMulticast(multicast); + .withMulticast(multicast) + .withDisabledOutput(true); if (clientType.equals(ClientType.BUNDLED_AMQP)) { messagingClient = new BundledAmqpMessagingClient(options); } else if (clientType.equals(ClientType.BUNDLED_CORE)) { diff --git a/operator-suite/src/test/java/io/brokerqe/claire/AbstractSystemTests.java b/operator-suite/src/test/java/io/brokerqe/claire/AbstractSystemTests.java index b955a88f..dd3f482e 100644 --- a/operator-suite/src/test/java/io/brokerqe/claire/AbstractSystemTests.java +++ b/operator-suite/src/test/java/io/brokerqe/claire/AbstractSystemTests.java @@ -618,7 +618,6 @@ public Map browseMessages(Pod brokerPod, String fqqn, String us public Map browseMessages(Pod brokerPod, String fqqn, int messageCount, String username, String password) { String namespace = brokerPod.getMetadata().getNamespace(); - LOGGER.debug("[{}] Browse messages in {} on {}", namespace, fqqn, brokerPod.getMetadata().getName()); Map commandOptions = new HashMap<>(Map.of( "destination", fqqn, "url", "tcp://" + brokerPod.getMetadata().getName() + ":61616" @@ -630,7 +629,7 @@ public Map browseMessages(Pod brokerPod, String fqqn, int messa DeployableClient deployableClient = new BundledClientDeployment(brokerPod.getMetadata().getNamespace(), brokerPod); BundledArtemisClient browserClient = new BundledArtemisClient(deployableClient, ArtemisCommand.BROWSE_CLIENT, username, password, commandOptions); - Map browsedMessages = (Map) browserClient.executeCommand(); + Map browsedMessages = (Map) browserClient.executeCommand(true); LOGGER.debug("[{}] Browsed {} messages from {} on {}", namespace, browsedMessages.get(-1), fqqn, brokerPod.getMetadata().getName()); return browsedMessages; } diff --git a/operator-suite/src/test/java/io/brokerqe/claire/federation/MirroringUnsecuredTests.java b/operator-suite/src/test/java/io/brokerqe/claire/federation/MirroringUnsecuredTests.java index 732053a0..7447b6d2 100644 --- a/operator-suite/src/test/java/io/brokerqe/claire/federation/MirroringUnsecuredTests.java +++ b/operator-suite/src/test/java/io/brokerqe/claire/federation/MirroringUnsecuredTests.java @@ -4,7 +4,6 @@ */ package io.brokerqe.claire.federation; -import io.amq.broker.v1beta1.ActiveMQArtemis; import io.amq.broker.v1beta1.ActiveMQArtemisBuilder; import io.brokerqe.claire.ArtemisConstants; import io.brokerqe.claire.ArtemisVersion; @@ -15,6 +14,8 @@ import io.brokerqe.claire.clients.MessagingClient; import io.brokerqe.claire.junit.TestValidSince; import io.fabric8.kubernetes.api.model.Pod; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -32,6 +33,27 @@ @TestValidSince(ArtemisVersion.VERSION_2_33) public class MirroringUnsecuredTests extends MirroringTests { + List prodBrokerPods; + List drBrokerPods; + Pod prodBrokerPod; + Pod drBrokerPod; + private int totalReceivedA = 0; + private int totalReceivedB = 0; + private int totalSentA = 0; + private int totalSentB = 0; + + @BeforeEach + void resetCounters() { + totalReceivedA = 0; + totalReceivedB = 0; + totalSentA = 0; + totalSentB = 0; + prodBrokerPods = null; + drBrokerPods = null; + prodBrokerPod = null; + drBrokerPod = null; + } + void setupDeployment(int size) { getClient().createSecretEncodedData(prodNamespace, LOGGER_SECRET_NAME, Map.of(ArtemisConstants.LOGGING_PROPERTIES_CONFIG_KEY, TestUtils.getFileContentAsBase64(DEBUG_LOG_FILE)), true); getClient().createSecretEncodedData(drNamespace, LOGGER_SECRET_NAME, Map.of(ArtemisConstants.LOGGING_PROPERTIES_CONFIG_KEY, TestUtils.getFileContentAsBase64(DEBUG_LOG_FILE)), true); @@ -58,7 +80,7 @@ void setupDeployment(int size) { .editOrNewSpec() .withNewDeploymentPlan() .withSize(size) - .withClustered(true) + .withClustered(false) // federation + mirring does not need clustered .withPersistenceEnabled(true) .withMessageMigration(true) .withManagementRBACEnabled(true) @@ -66,7 +88,6 @@ void setupDeployment(int size) { .withJournalType("aio") .withEnableMetricsPlugin(true) .withJolokiaAgentEnabled(true) - .withClustered(true) .editOrNewExtraMounts() .withSecrets(LOGGER_SECRET_NAME) .endExtraMounts() @@ -137,7 +158,7 @@ void setupDeployment(int size) { .editOrNewSpec() .withNewDeploymentPlan() .withSize(size) - .withClustered(true) + .withClustered(false) // federation + mirroring does not need clustered .withPersistenceEnabled(true) .withMessageMigration(true) .withManagementRBACEnabled(true) @@ -145,7 +166,6 @@ void setupDeployment(int size) { .withJournalType("aio") .withEnableMetricsPlugin(true) .withJolokiaAgentEnabled(true) - .withClustered(true) .editOrNewExtraMounts() .withSecrets(LOGGER_SECRET_NAME) .endExtraMounts() @@ -234,8 +254,9 @@ void simpleMirroringTest() { prodBroker = ResourceManager.addToBrokerProperties(prodBroker, deployAddresses, false); // make sure given addresses are present on DR broker - ActiveMQArtemis brk = ResourceManager.getArtemisClient().inNamespace(prodNamespace).resource(prodBroker).get(); - brk.getStatus().getConditions().contains(ArtemisConstants.CONDITION_TYPE_BROKER_PROPERTIES_APPLIED); + // TODO: make use of CONDITION_TYPE_BROKER_PROPERTIES_APPLIED? +// ActiveMQArtemis brk = ResourceManager.getArtemisClient().inNamespace(prodNamespace).resource(prodBroker).get(); +// assertTrue(brk.getStatus().getConditions().contains(ArtemisConstants.CONDITION_TYPE_BROKER_PROPERTIES_APPLIED)); checkMessageCount(prodNamespace, prodBrokerPod, "my-deletion1", 0, ADMIN, ADMIN_PASS); checkMessageCount(drNamespace, drBrokerPod, "my-deletion1", 0, ADMIN, ADMIN_PASS); checkMessageCount(drNamespace, drBrokerPod, "my-deletion2", 0, ADMIN, ADMIN_PASS); @@ -338,6 +359,7 @@ void addressFilteringTest() { } @Test + @Disabled("ENTMQBR-9474") void scaleUpDownTest() { setupDeployment(1); int scaleUpSize = 4; @@ -347,14 +369,14 @@ void scaleUpDownTest() { drBroker = doArtemisScale(drNamespace, drBroker, 1, scaleUpSize); // Send few messages & do checks - int initialCountA = 2; // 200 - int initialCountB = 1; // 50 + int initialCountA = 200; + int initialCountB = 50; int scaleupCountA = 150; int scaleupCountB = 40; - List prodBrokerPods = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName()); - List drBrokerPods = getClient().listPodsByPrefixName(drNamespace, drBroker.getMetadata().getName()); + prodBrokerPods = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName()); + drBrokerPods = getClient().listPodsByPrefixName(drNamespace, drBroker.getMetadata().getName()); // Send messages int brokerCount = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName()).size(); @@ -362,58 +384,62 @@ void scaleUpDownTest() { Pod prodBrokerPodI = prodBrokerPods.get(i); Pod drBrokerPodI = drBrokerPods.get(i); - LOGGER.info("[{}] Send {} messages to {}", prodNamespace, initialCountA, prodBrokerPodI.getMetadata().getName()); + LOGGER.info("[{}] Send {} messages to {}/{}", prodNamespace, initialCountA, prodBrokerPodI.getMetadata().getName(), addressA); MessagingClient prodClientA = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPodI, allDefaultPort, addressA, initialCountA, ADMIN, ADMIN_PASS); - int sent0 = prodClientA.sendMessages(); - assertThat("Sent different amount of messages than expected", sent0, equalTo(initialCountA)); + int sent0A = prodClientA.sendMessages(); + assertThat("Sent different amount of messages than expected", sent0A, equalTo(initialCountA)); + totalSentA += sent0A; - LOGGER.info("[{}] Send {} messages to {}", prodNamespace, initialCountB, prodBrokerPodI.getMetadata().getName()); + LOGGER.info("[{}] Send {} messages to {}/{}", prodNamespace, initialCountB, prodBrokerPodI.getMetadata().getName(), addressB); MessagingClient prodClientB = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPodI, allDefaultPort, addressB, initialCountB, ADMIN, ADMIN_PASS); int sent0B = prodClientB.sendMessages(); assertThat("Sent different amount of messages than expected", sent0B, equalTo(initialCountB)); + totalSentB += sent0B; } - Pod prodBrokerPod = prodBrokerPods.get(0); - Pod drBrokerPod = drBrokerPods.get(0); - - checkClusteredMessageCount(prodBroker, drBroker, addressA, initialCountA * scaleUpSize, ADMIN, ADMIN_PASS); - checkClusteredMessageCount(prodBroker, drBroker, addressB, initialCountB * scaleUpSize, ADMIN, ADMIN_PASS); -// -// // Receive messages -// LOGGER.info("[{}] Receive {} messages from {}", drNamespace, initialCountA, prodBrokerPod.getMetadata().getName()); -// MessagingClient prodClientA1 = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPod, allDefaultPort, addressA, initialCountA - scaleupCountA, ADMIN, ADMIN_PASS); -// int receivedA1 = prodClientA1.receiveMessages(); -// assertThat("Received different amount of messages than expected", receivedA1, equalTo(initialCountA - scaleupCountA)); -//// assertThat("Sent & received different amount of messages than expected", sent0, equalTo(received1)); -// -// LOGGER.info("[{}] Receive {} messages from {}", drNamespace, initialCountA, prodBrokerPod.getMetadata().getName()); -// MessagingClient prodClientB1 = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPod, allDefaultPort, addressB, initialCountB - scaleupCountB, ADMIN, ADMIN_PASS); -// int receivedB1 = prodClientB1.receiveMessages(); -// assertThat("Received different amount of messages than expected", receivedB1, equalTo(initialCountB - scaleupCountB)); -// -// getQueueStats(prodNamespace, prodBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true); -// getQueueStats(drNamespace, drBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true); -// checkClusteredMessageCount(prodBroker, drBroker, addressA, initialCountA * scaleUpSize - 50, ADMIN, ADMIN_PASS); -// checkClusteredMessageCount(prodBroker, drBroker, addressB, initialCountB * scaleUpSize - 10, ADMIN, ADMIN_PASS); - - // TODO scaledown does not work properly? // Scale down to 2 - getQueueStats(prodNamespace, prodBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true); - getQueueStats(drNamespace, drBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true); + updateCheckDeployment(initialCountA * scaleUpSize, initialCountB * scaleUpSize); + receiveMessagesAB(prodBrokerPod, initialCountA - scaleupCountA, initialCountB - scaleupCountB); + + // Scale down to 2 prodBroker = doArtemisScale(prodNamespace, prodBroker, scaleUpSize, scaleDownSize, true); drBroker = doArtemisScale(drNamespace, drBroker, scaleUpSize, scaleDownSize, true); + // BUG: fails expected 750, got 400 messages. Message migration is not working with clustered -> false + updateCheckDeployment(initialCountA * scaleUpSize - 50, initialCountB * scaleUpSize - 10); + receiveMessagesAB(prodBrokerPod, 100, 30); - // Refresh variables - prodBrokerPods = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName()); - drBrokerPods = getClient().listPodsByPrefixName(drNamespace, drBroker.getMetadata().getName()); - prodBrokerPod = prodBrokerPods.get(0); - drBrokerPod = drBrokerPods.get(0); + // Scale down to 1 + prodBroker = doArtemisScale(prodNamespace, prodBroker, scaleDownSize, 1); + drBroker = doArtemisScale(drNamespace, drBroker, scaleDownSize, 1); + updateCheckDeployment(initialCountA * scaleUpSize, initialCountB * scaleUpSize); + receiveMessagesAB(prodBrokerPod, 50, 10); - getQueueStats(prodNamespace, prodBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true); - getQueueStats(drNamespace, drBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true); -// waitForScaleDownDrainer(testNamespace, operator.getOperatorName(), brokerName, Constants.DURATION_2_MINUTES, initialSize, scaledDownSize); - // Send few messages & do checks -// checkClusteredMessageCount(prodBroker, drBroker, addressA, initialCountA * scaleUpSize, ADMIN, ADMIN_PASS); -// checkClusteredMessageCount(prodBroker, drBroker, addressB, initialCountB * scaleUpSize, ADMIN, ADMIN_PASS); + assertThat("Total sentA != totalReceivedA!", totalSentA, equalTo(totalReceivedA)); + assertThat("Total sentB != totalReceivedB!", totalSentB, equalTo(totalReceivedB)); teardownDeployment(false); } + + private void updateCheckDeployment(int expectedA, int expectedB) { + prodBrokerPods = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName()); + drBrokerPods = getClient().listPodsByPrefixName(drNamespace, drBroker.getMetadata().getName()); + prodBrokerPod = prodBrokerPods.get(prodBrokerPods.size() - 1); + drBrokerPod = drBrokerPods.get(drBrokerPods.size() - 1); + Map> statsProd = getQueueStats(prodNamespace, prodBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true); + Map> statsDr = getQueueStats(drNamespace, drBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true); + checkClusteredMessageCount(prodBroker, drBroker, addressA, expectedA, ADMIN, ADMIN_PASS); + checkClusteredMessageCount(prodBroker, drBroker, addressB, expectedB, ADMIN, ADMIN_PASS); + } + + private void receiveMessagesAB(Pod prodBrokerPod, int receiveA, int receiveB) { + LOGGER.info("[{}] Receive {} messages from {}/{}", drNamespace, receiveA, prodBrokerPod.getMetadata().getName(), addressA); + MessagingClient prodClientA1 = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPod, allDefaultPort, addressA, receiveA, ADMIN, ADMIN_PASS); + int receivedA1 = prodClientA1.receiveMessages(); + assertThat("Received different amount of messages than expected", receivedA1, equalTo(50)); + totalReceivedA += receivedA1; + + LOGGER.info("[{}] Receive {} messages from {}/{}", drNamespace, receiveB, prodBrokerPod.getMetadata().getName(), addressB); + MessagingClient prodClientB1 = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPod, allDefaultPort, addressB, receiveB, ADMIN, ADMIN_PASS); + int receivedB1 = prodClientB1.receiveMessages(); + assertThat("Received different amount of messages than expected", receivedB1, equalTo(10)); + totalReceivedB += receivedB1; + } }