From 0e792b511ab84054f70cfc351919f9a22ca02d5f Mon Sep 17 00:00:00 2001 From: Ryan Yeats Date: Mon, 9 Sep 2024 16:04:31 -0700 Subject: [PATCH] Add core federation and broker connections to the management console --- .../activemq/artemis/logs/AuditLogger.java | 57 ++++++ .../management/BrokerConnectionControl.java | 66 ++++++ .../core/management/FederationControl.java | 36 ++++ .../FederationRemoteConsumerControl.java | 71 +++++++ .../management/FederationStreamControl.java | 90 +++++++++ .../core/management/ObjectNameBuilder.java | 42 ++++ .../api/core/management/ResourceNames.java | 8 + .../amqp/connect/AMQPBrokerConnection.java | 9 + .../connect/AMQPBrokerConnectionManager.java | 5 +- .../impl/ActiveMQServerControlImpl.java | 2 + .../impl/BrokerConnectionControlImpl.java | 177 ++++++++++++++++ .../impl/FederationControlImpl.java | 110 ++++++++++ .../FederationRemoteConsumerControlImpl.java | 156 ++++++++++++++ .../impl/FederationStreamControlImpl.java | 190 ++++++++++++++++++ .../core/impl/CoreProtocolManager.java | 18 +- .../artemis/core/server/BrokerConnection.java | 5 + .../server/federation/FederatedAbstract.java | 14 ++ .../federation/FederatedQueueConsumer.java | 2 + .../FederatedQueueConsumerImpl.java | 8 + .../core/server/federation/Federation.java | 18 +- .../server/federation/FederationManager.java | 12 +- .../server/management/ManagementService.java | 24 +++ .../impl/ManagementServiceImpl.java | 68 +++++++ .../group/impl/ClusteredResetMockTest.java | 48 +++++ .../management/ActiveMQServerControlTest.java | 6 + 25 files changed, 1226 insertions(+), 16 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index d37927fe1331..315e3db004d8 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2744,4 +2744,61 @@ static void getAuthorizationFailureCount(Object source) { @LogMessage(id = 601781, value = "User {} is getting authorization failure count on target resource: {}", level = LogMessage.Level.INFO) void getAuthorizationFailureCount(String user, Object source); + + static void startFederation(Object source) { + BASE_LOGGER.startFederation(getCaller(), source); + } + + @LogMessage(id = 601782, value = "User {} is starting a federation on target resource: {}", level = LogMessage.Level.INFO) + void startFederation(String user, Object source); + + static void stopFederation(Object source) { + BASE_LOGGER.stopFederation(getCaller(), source); + } + + @LogMessage(id = 601783, value = "User {} is stopping a federation on target resource: {}", level = LogMessage.Level.INFO) + void stopFederation(String user, Object source); + + static void isSharedConnection(Object source) { + BASE_LOGGER.isSharedConnection(getCaller(), source); + } + + @LogMessage(id = 601784, value = "User {} is querying isSharedConnection on target resource: {}", level = LogMessage.Level.INFO) + void isSharedConnection(String user, Object source); + + static void isPull(Object source) { + BASE_LOGGER.isPull(getCaller(), source); + } + + @LogMessage(id = 601785, value = "User {} is querying isPull on target resource: {}", level = LogMessage.Level.INFO) + void isPull(String user, Object source); + + static void getPriority(Object source) { + BASE_LOGGER.getPriority(getCaller(), source); + } + + @LogMessage(id = 601786, value = "User {} is getting priority on target resource: {}", level = LogMessage.Level.INFO) + void getPriority(String user, Object source); + + static void isOpen(Object source) { + BASE_LOGGER.isOpen(getCaller(), source); + } + + @LogMessage(id = 601787, value = "User {} is querying isOpen on target resource: {}", level = LogMessage.Level.INFO) + void isOpen(String user, Object source); + + static void getUri(Object source) { + BASE_LOGGER.getUri(getCaller(), source); + } + + @LogMessage(id = 601788, value = "User {} is getting uri on target resource: {}", level = LogMessage.Level.INFO) + void getUri(String user, Object source); + + static void getProtocol(Object source) { + BASE_LOGGER.getProtocol(getCaller(), source); + } + + @LogMessage(id = 601789, value = "User {} is getting protocol on target resource: {}", level = LogMessage.Level.INFO) + void getProtocol(String user, Object source); + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java new file mode 100644 index 000000000000..fe119bb61fd5 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A BrokerConnectionControl is used to manage a BrokerConnections. + */ +public interface BrokerConnectionControl extends ActiveMQComponentControl { + + /** + * Returns {@code true} if this connection is open, {@code false} else. + */ + @Attribute(desc = "whether this connection is open") + boolean isOpen(); + + /** + * Returns the name of this broker connection + */ + @Attribute(desc = "name of this broker connection") + String getName(); + + /** + * Returns the connection uri for this broker connection. + */ + @Attribute(desc = "connection uri for this broker connection") + String getUri(); + + /** + * Returns the user this broker connection is using. + */ + @Attribute(desc = "the user this broker connection is using") + String getUser(); + + /** + * Returns the protocol this broker connection is using. + */ + @Attribute(desc = "protocol this broker connection is using") + String getProtocol(); + + /** + * Returns the retry interval used by this broker connection. + */ + @Attribute(desc = "retry interval used by this broker connection") + long getRetryInterval(); + + /** + * Returns the number of reconnection attempts used by this broker connection. + */ + @Attribute(desc = "number of reconnection attempts used by this broker connection") + int getReconnectAttempts(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java new file mode 100644 index 000000000000..64e732477ab1 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A federationControl is used to manage a federation. + */ +public interface FederationControl extends ActiveMQComponentControl { + + /** + * Returns the name of this federation + */ + @Attribute(desc = "name of this federation") + String getName(); + + /** + * Returns the name of the user the federation is associated with. + */ + @Attribute(desc = "name of the user the federation is associated with") + String getUser(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java new file mode 100644 index 000000000000..46d29cebb570 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A BridgeControl is used to manage a federation stream. + */ +public interface FederationRemoteConsumerControl { + + /** + * Returns the name of the queue that is being federated too + */ + @Attribute(desc = "name of the queue that is being federated too") + String getQueueName(); + + /** + * Returns the address this remote consumer will forward messages from. + */ + @Attribute(desc = "address this remote consumer will forward messages from") + String getAddress(); + + /** + * Returns the priority of this remote consumer will consumer messages. + */ + @Attribute(desc = "address this remote consumer will consumer messages") + int getPriority(); + + /** + * Returns the routing type associated with this address. + */ + @Attribute(desc = "routing type for this address") + String getRoutingType(); + + /** + * Returns the filter string associated with this remote consumer. + */ + @Attribute(desc = "filter string associated with this remote consumer") + String getFilterString(); + + /** + * Returns the queue filter string associated with this remote consumer. + */ + @Attribute(desc = "queue filter string associated with this remote consumer") + String getQueueFilterString(); + + /** + * Returns the fully qualified queue name associated with this remote consumer. + */ +// @Attribute(desc = "fully qualified queue name associated with this remote consumer") +// String getFqqn(); + + /** + * Returns the number of messages that have been federated for this address. + */ + @Attribute(desc = "number of messages that have been federated for this address") + long getFederatedMessageCount(); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java new file mode 100644 index 000000000000..8faffefef802 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A BridgeControl is used to manage a federation stream. + */ +public interface FederationStreamControl { + + /** + * Returns the name of this federation stream + */ + @Attribute(desc = "name of this federation stream") + String getName(); + + /** + * Returns any list of static connectors used by this federation stream + */ + @Attribute(desc = "list of static connectors used by this federation stream") + String[] getStaticConnectors() throws Exception; + + /** + * Returns the name of the discovery group used by this federation stream. + */ + @Attribute(desc = "name of the discovery group used by this federation stream") + String getDiscoveryGroupName(); + + /** + * Returns the retry interval used by this federation stream. + */ + @Attribute(desc = "retry interval used by this federation stream") + long getRetryInterval(); + + /** + * Returns the retry interval multiplier used by this federation stream. + */ + @Attribute(desc = "retry interval multiplier used by this federation stream") + double getRetryIntervalMultiplier(); + + /** + * Returns the max retry interval used by this federation stream. + */ + @Attribute(desc = "max retry interval used by this federation stream") + long getMaxRetryInterval(); + + /** + * Returns the number of reconnection attempts used by this federation stream. + */ + @Attribute(desc = "number of reconnection attempts used by this federation stream") + int getReconnectAttempts(); + + /** + * Returns {@code true} if steam allows a shared connection, {@code false} else. + */ + @Attribute(desc = "whether this stream will allow the connection to be shared") + boolean isSharedConnection(); + + /** + * Returns {@code true} if this connection is configured to pull, {@code false} else. + */ + @Attribute(desc = "whether this connection is configured to pull") + boolean isPull(); + + /** + * Returns {@code true} the connection is configured for HA, {@code false} else. + */ + @Attribute(desc = "whether this connection is configured for HA") + boolean isHA(); + + /** + * Returns the name of the user the federation is associated with + */ + @Attribute(desc = "name of the user the federation is associated with") + String getUser(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java index 2d103f591685..ab955b5ba198 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java @@ -126,6 +126,15 @@ public ObjectName getBroadcastGroupObjectName(final String name) throws Exceptio return createObjectName("broadcast-group", name); } + /** + * Returns the ObjectName used by BrokerConnectionControl. + * + * @see BrokerConnectionControl + */ + public ObjectName getBrokerConnectionObjectName(String name) throws Exception { + return createObjectName("broker-connection", name); + } + /** * Returns the ObjectName used by BridgeControl. * @@ -135,6 +144,15 @@ public ObjectName getBridgeObjectName(final String name) throws Exception { return createObjectName("bridge", name); } + /** + * Returns the ObjectName used by FederationControl. + * + * @see FederationControl + */ + public ObjectName getFederationObjectName(String name) throws Exception { + return createObjectName("federation", name); + } + /** * Returns the ObjectName used by ClusterConnectionControl. * @@ -169,4 +187,28 @@ public ObjectName getManagementContextObjectName() throws Exception { public ObjectName getSecurityObjectName() throws Exception { return ObjectName.getInstance("hawtio:type=security,area=jmx,name=ArtemisJMXSecurity"); } + + /** + * Returns the ObjectName used by FederationStreamControl. + * + * @see FederationStreamControl + */ + public ObjectName getFederationStreamObjectName(SimpleString federationName, + SimpleString streamName) throws Exception { + return ObjectName.getInstance(String.format("%s,component=federations,name=%s,streamName=%s", getActiveMQServerName(), ObjectName.quote(federationName.toString()), ObjectName.quote(streamName.toString().toLowerCase()))); + } + + /** + * Returns the ObjectName used by FederationRemoteConsumerControl. + * + * @see FederationRemoteConsumerControl + */ + public ObjectName getFederationRemoteConsumerObjectName(final SimpleString federationName, + final SimpleString streamName, + final SimpleString address, + final SimpleString queueName, + RoutingType routingType) throws Exception { + return ObjectName.getInstance(String.format("%s,component=federations,name=%s,streamName=%s,address=%s,subcomponent=queues,routing-type=%s,queue=%s", getActiveMQServerName(), ObjectName.quote(federationName.toString()), ObjectName.quote(streamName.toString()), ObjectName.quote(address.toString()), ObjectName.quote(routingType.toString().toLowerCase()), ObjectName.quote(queueName.toString()))); + } + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java index e07f99ce4e72..c6f24d4c9b6a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java @@ -34,8 +34,16 @@ public final class ResourceNames { public static final String ADDRESS = "address."; + public static final String BROKER_CONNECTION = "brokerconnection."; + public static final String BRIDGE = "bridge."; + public static final String FEDERATION = "federation."; + + public static final String FEDERATION_STREAM = "federationstream."; + + public static final String FEDERATION_REMOTE_CONSUMER = "federationremoteconsumer."; + public static final String ACCEPTOR = "acceptor."; public static final String DIVERT = "divert."; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index f47cf26dd367..9ed3edc028e8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -194,6 +194,15 @@ public AMQPBrokerConnectConfiguration getConfiguration() { return brokerConnectConfiguration; } + @Override + public boolean isOpen() { + if (connection != null) { + return connection.isOpen(); + } else { + return false; + } + } + @Override public boolean isStarted() { return started; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java index 73f77b9cf63e..e8c1d49e995e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java @@ -89,7 +89,7 @@ private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server); amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection); server.registerBrokerConnection(amqpBrokerConnection); - + server.getManagementService().registerBrokerConnection(amqpBrokerConnection); if (start) { amqpBrokerConnection.start(); } @@ -142,6 +142,7 @@ public void updateConfiguration(List configurati connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } @@ -183,6 +184,7 @@ public void updateConfiguration(List configurati connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } } @@ -211,6 +213,7 @@ public void stop() throws Exception { connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 3c8e7449a8a1..b15c0f7ef749 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -4018,6 +4018,8 @@ public String listBrokerConnections() { obj.add("name", brokerConnection.getName()); obj.add("protocol", brokerConnection.getProtocol()); obj.add("started", brokerConnection.isStarted()); + obj.add("uri", brokerConnection.getConfiguration().getUri()); + obj.add("open", brokerConnection.isOpen()); connections.add(obj.build()); } return connections.build().toString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java new file mode 100644 index 000000000000..a3fd0709931b --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; +import javax.management.NotCompliantMBeanException; + +import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.BrokerConnection; +import org.apache.activemq.artemis.logs.AuditLogger; + +public class BrokerConnectionControlImpl extends AbstractControl implements BrokerConnectionControl { + + private final BrokerConnection brokerConnection; + + public BrokerConnectionControlImpl(BrokerConnection brokerConnection, + StorageManager storageManager) throws NotCompliantMBeanException { + super(BrokerConnectionControl.class, storageManager); + this.brokerConnection = brokerConnection; + } + + @Override + public boolean isStarted() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.isStarted(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isOpen() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isOpen(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.isOpen(); + } finally { + blockOnIO(); + } + } + + @Override + public void start() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startBrokerConnection(this.brokerConnection.getName()); + } + clearIO(); + try { + brokerConnection.start(); + } finally { + blockOnIO(); + } + } + + @Override + public void stop() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startBrokerConnection(this.brokerConnection.getName()); + } + clearIO(); + try { + brokerConnection.stop(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getName(); + } finally { + blockOnIO(); + } + } + + @Override + public String getUri() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUri(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getUri(); + } finally { + blockOnIO(); + } + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getUser(); + } finally { + blockOnIO(); + } + } + + @Override + public String getProtocol() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getProtocol(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getProtocol(); + } finally { + blockOnIO(); + } + } + + @Override + public long getRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryInterval(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public int getReconnectAttempts() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getReconnectAttempts(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getReconnectAttempts(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(BrokerConnectionControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(BrokerConnectionControl.class); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java new file mode 100644 index 000000000000..75f1d1fdeb56 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +import org.apache.activemq.artemis.api.core.management.FederationControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.logs.AuditLogger; + +public class FederationControlImpl extends AbstractControl implements FederationControl { + + private final Federation federation; + + public FederationControlImpl(final Federation federation, final StorageManager storageManager) throws Exception { + super(FederationControl.class, storageManager); + this.federation = federation; + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.federation); + } + clearIO(); + try { + return federation.getConfig().getCredentials().getUser(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.federation); + } + clearIO(); + try { + return federation.getName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isStarted() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.federation); + } + clearIO(); + try { + return federation.isStarted(); + } finally { + blockOnIO(); + } + } + + @Override + public void start() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startFederation(this.federation); + } + clearIO(); + try { + federation.start(); + } finally { + blockOnIO(); + } + } + + @Override + public void stop() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.stopFederation(this.federation); + } + clearIO(); + try { + federation.stop(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationControl.class); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java new file mode 100644 index 000000000000..ae6dacb624ca --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +import org.apache.activemq.artemis.api.core.management.FederationRemoteConsumerControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.logs.AuditLogger; + +public class FederationRemoteConsumerControlImpl extends AbstractControl implements FederationRemoteConsumerControl { + + private final FederatedConsumerKey federatedConsumerKey; + private final FederatedQueueConsumer federatedConsumer; + + public FederationRemoteConsumerControlImpl(final FederatedQueueConsumer federatedConsumer, + final StorageManager storageManager) throws Exception { + super(FederationRemoteConsumerControl.class, storageManager); + this.federatedConsumerKey = federatedConsumer.getKey(); + this.federatedConsumer = federatedConsumer; + } + + @Override + public String getAddress() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getAddress(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getAddress().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public int getPriority() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getPriority(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getPriority(); + } finally { + blockOnIO(); + } + } + + @Override + public String getRoutingType() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRoutingType(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getRoutingType().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getFilterString() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getFilterString(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getFilterString().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getQueueName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getQueueName(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getQueueName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getQueueFilterString() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getFilterString(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getQueueFilterString().toString(); + } finally { + blockOnIO(); + } + } + + /* + @Override + public String getFqqn() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getFqqn(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getFqqn().toString(); + } finally { + blockOnIO(); + } + } +*/ + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationRemoteConsumerControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationRemoteConsumerControl.class); + } + + @Override + public long getFederatedMessageCount() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMessageCount(federatedConsumer); + } + clearIO(); + try { + return federatedConsumer.federatedMessageCount(); + } finally { + blockOnIO(); + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java new file mode 100644 index 000000000000..6ef1d0695b64 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +import org.apache.activemq.artemis.api.core.management.FederationStreamControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.FederationStream; +import org.apache.activemq.artemis.logs.AuditLogger; + +public class FederationStreamControlImpl extends AbstractControl implements FederationStreamControl { + + private final FederationStream federationStream; + + public FederationStreamControlImpl(final FederationStream federationStream, + final StorageManager storageManager) throws Exception { + super(FederationStreamControl.class, storageManager); + this.federationStream = federationStream; + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.federationStream); + } + clearIO(); + try { + return federationStream.getConfig().getConnectionConfiguration().getUsername(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.federationStream); + } + clearIO(); + try { + return federationStream.getName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String[] getStaticConnectors() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getStaticConnectors(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getStaticConnectors().toArray(new String[0]); + } finally { + blockOnIO(); + } + } + + @Override + public String getDiscoveryGroupName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getDiscoveryGroupName(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getDiscoveryGroupName(); + } finally { + blockOnIO(); + } + } + + @Override + public long getRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryInterval(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public double getRetryIntervalMultiplier() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryIntervalMultiplier(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getRetryIntervalMultiplier(); + } finally { + blockOnIO(); + } + } + + @Override + public long getMaxRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMaxRetryInterval(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getMaxRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public int getReconnectAttempts() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getReconnectAttempts(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getReconnectAttempts(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isSharedConnection() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isSharedConnection(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().isSharedConnection(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isPull() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isPull(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().isPull(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isHA() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isHA(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().isHA(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationStreamControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationStreamControl.class); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index 9f155025b602..08b626861907 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -477,19 +477,31 @@ public String toString() { //Register close and failure listeners, if the initial downstream connection goes down then we //want to terminate the upstream connection rc.addCloseListener(() -> { - server.getFederationManager().undeploy(config.getName()); + try { + server.getFederationManager().undeploy(config.getName()); + } catch (Throwable ignored) { + logger.debug(ignored.getMessage(), ignored); + } }); rc.addFailureListener(new FailureListener() { @Override public void connectionFailed(ActiveMQException exception, boolean failedOver) { - server.getFederationManager().undeploy(config.getName()); + try { + server.getFederationManager().undeploy(config.getName()); + } catch (Throwable ignored) { + logger.debug(ignored.getMessage(), ignored); + } } @Override public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { - server.getFederationManager().undeploy(config.getName()); + try { + server.getFederationManager().undeploy(config.getName()); + } catch (Throwable ignored) { + logger.debug(ignored.getMessage(), ignored); + } } }); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java index 7251eaad23eb..28c57a3ace21 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java @@ -35,4 +35,9 @@ public interface BrokerConnection extends ActiveMQComponent { */ BrokerConnectConfiguration getConfiguration(); + /** + * @return if the connection is currently open. + */ + boolean isOpen(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java index 8cb020b87674..5d7b71ac9194 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server.federation; +import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Map; @@ -27,8 +28,11 @@ import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl.ClientSessionCallback; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class FederatedAbstract implements ActiveMQServerBasePlugin { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final WildcardConfiguration DEFAULT_WILDCARD_CONFIGURATION = new WildcardConfiguration(); protected final Federation federation; @@ -115,6 +119,11 @@ public synchronized void createRemoteConsumer(FederatedConsumerKey key, Transfor } } remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback); + try { + server.getManagementService().registerFederationRemoteConsumer(remoteQueueConsumer); + } catch (Throwable ignored) { + logger.debug(ignored.getMessage(), ignored); + } remoteQueueConsumer.start(); remoteQueueConsumers.put(key, remoteQueueConsumer); @@ -147,6 +156,11 @@ public synchronized void removeRemoteConsumer(FederatedConsumerKey key) { if (remoteQueueConsumer.decrementCount() <= 0) { remoteQueueConsumer.close(); remoteQueueConsumers.remove(key); + try { + server.getManagementService().unregisterFederationRemoteConsumer(federation.getName(), upstream.getName(), key.getAddress(), key.getQueueName(), key.getRoutingType()); + } catch (Throwable ignored) { + logger.debug(ignored.getMessage(), ignored); + } } if (server.hasBrokerFederationPlugins()) { try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java index d9d945a95e74..8580efbd0815 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java @@ -52,4 +52,6 @@ static int getNextDelay(int delay, int delayMultiplier, int delayMax) { void start(); void close(); + + long federatedMessageCount(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java index 2b854d9c5a1d..a1b7ad054578 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; @@ -65,6 +66,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi private ClientSessionFactoryInternal clientSessionFactory; private ClientSession clientSession; private ClientConsumerInternal clientConsumer; + private final AtomicLong federatedMessageCount = new AtomicLong(); private final AtomicInteger pendingPullCredit = new AtomicInteger(); private QueueHandle queueHandle; @@ -246,6 +248,11 @@ public synchronized void close() { } } + @Override + public long federatedMessageCount() { + return federatedMessageCount.get(); + } + private void scheduleDisconnect(int delay) { scheduledExecutorService.schedule(() -> { try { @@ -304,6 +311,7 @@ public void onMessage(ClientMessage clientMessage) { message = transformer == null ? message : transformer.transform(message); if (message != null) { server.getPostOffice().route(message, true); + federatedMessageCount.incrementAndGet(); } clientMessage.acknowledge(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java index e9b974ccca95..05401200fbe7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java @@ -62,7 +62,7 @@ public Federation(final ActiveMQServer server, final FederationConfiguration con this.name = SimpleString.of(config.getName()); } - public synchronized void start() throws ActiveMQException { + public synchronized void start() throws Exception { if (state == FederationManager.State.STARTED) return; deploy(); for (FederationUpstream connection : upstreams.values()) { @@ -89,7 +89,7 @@ public synchronized void stop() { state = FederationManager.State.STOPPED; } - public synchronized void deploy() throws ActiveMQException { + public synchronized void deploy() throws Exception { for (FederationUpstreamConfiguration upstreamConfiguration : config.getUpstreamConfigurations()) { deploy(upstreamConfiguration, config.getFederationPolicyMap()); } @@ -105,20 +105,22 @@ public boolean isStarted() { return state == FederationManager.State.STARTED; } - public synchronized boolean undeploy(String name) { + public synchronized boolean undeploy(String name) throws Exception { FederationUpstream federationConnection = upstreams.remove(name); if (federationConnection != null) { federationConnection.stop(); + server.getManagementService().unregisterFederationStream(federationConnection.getFederation().getName(),federationConnection.getName()); } FederationDownstream federationConnectionDownstream = downstreams.remove(name); if (federationConnectionDownstream != null) { federationConnectionDownstream.undeploy(); federationConnectionDownstream.stop(); + server.getManagementService().unregisterFederationStream(federationConnectionDownstream.getFederation().getName(),federationConnectionDownstream.getName()); } return true; } - public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfiguration, Map federationPolicyMap) throws ActiveMQException { + public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfiguration, Map federationPolicyMap) throws Exception { String name = upstreamConfiguration.getName(); FederationUpstream upstream = upstreams.get(name); @@ -135,8 +137,9 @@ public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfi return true; } - private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) { + private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) throws Exception { FederationUpstream upstream = new FederationUpstream(server, this, name, upstreamConfiguration); + server.getManagementService().registerFederationStream(upstream); upstreams.put(name, upstream); if (state == FederationManager.State.STARTED) { upstream.start(); @@ -144,7 +147,7 @@ private synchronized FederationUpstream deploy(String name, FederationUpstreamCo return upstream; } - public synchronized boolean deploy(FederationDownstreamConfiguration downstreamConfiguration, Map federationPolicyMap) throws ActiveMQException { + public synchronized boolean deploy(FederationDownstreamConfiguration downstreamConfiguration, Map federationPolicyMap) throws Exception { String name = downstreamConfiguration.getName(); FederationDownstream downstream = downstreams.get(name); @@ -161,7 +164,7 @@ public synchronized boolean deploy(FederationDownstreamConfiguration downstreamC return true; } - private synchronized FederationDownstream deploy(String name, FederationDownstreamConfiguration downstreamConfiguration) { + private synchronized FederationDownstream deploy(String name, FederationDownstreamConfiguration downstreamConfiguration) throws Exception { //If we have a matching upstream connection already configured then use it for the initiating downstream connection FederationConnection connection = null; if (downstreamConfiguration.getConnectionConfiguration().isShareConnection()) { @@ -176,6 +179,7 @@ private synchronized FederationDownstream deploy(String name, FederationDownstre } FederationDownstream downstream = new FederationDownstream(server, this, name, downstreamConfiguration, connection); + server.getManagementService().registerFederationStream(downstream); downstreams.put(name, downstream); if (state == FederationManager.State.STARTED) { downstream.start(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java index 9e213590cfb3..49d3b5ce0f20 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java @@ -50,7 +50,7 @@ public FederationManager(final ActiveMQServer server) { } @Override - public synchronized void start() throws ActiveMQException { + public synchronized void start() throws Exception { if (state == State.STARTED) return; deploy(); for (Federation federation : federations.values()) { @@ -77,7 +77,7 @@ public boolean isStarted() { return state == State.STARTED; } - public synchronized void deploy() throws ActiveMQException { + public synchronized void deploy() throws Exception { for (FederationConfiguration federationConfiguration : server.getConfiguration().getFederationConfigurations()) { deploy(federationConfiguration); } @@ -86,17 +86,18 @@ public synchronized void deploy() throws ActiveMQException { } } - public synchronized boolean undeploy(String name) { + public synchronized boolean undeploy(String name) throws Exception { Federation federation = federations.remove(name); if (federation != null) { federation.stop(); } + server.getManagementService().unregisterFederation(federation.getConfig().getName()); return true; } - public synchronized boolean deploy(FederationConfiguration federationConfiguration) throws ActiveMQException { + public synchronized boolean deploy(FederationConfiguration federationConfiguration) throws Exception { Federation federation = federations.get(federationConfiguration.getName()); if (federation == null) { federation = newFederation(federationConfiguration); @@ -108,8 +109,9 @@ public synchronized boolean deploy(FederationConfiguration federationConfigurati return true; } - private synchronized Federation newFederation(FederationConfiguration federationConfiguration) throws ActiveMQException { + private synchronized Federation newFederation(FederationConfiguration federationConfiguration) throws Exception { Federation federation = new Federation(server, federationConfiguration); + server.getManagementService().registerFederation(federation); federations.put(federationConfiguration.getName(), federation); if (state == State.STARTED) { federation.start(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java index 3e543b5b257a..d9716706bbbb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java @@ -41,9 +41,13 @@ import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.routing.ConnectionRouter; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; @@ -124,6 +128,22 @@ void registerBroadcastGroup(BroadcastGroup broadcastGroup, void unregisterBridge(String name) throws Exception; + void registerFederation(Federation federation) throws Exception; + + void unregisterFederation(String name) throws Exception; + + void registerFederationStream(FederationStream federationStream) throws Exception; + + void unregisterFederationStream(SimpleString federationName, SimpleString streamName) throws Exception; + + void registerFederationRemoteConsumer(FederatedQueueConsumer federatedQueueConsumer) throws Exception; + + void unregisterFederationRemoteConsumer(SimpleString name, + SimpleString streamName, + SimpleString address, + SimpleString queueName, + RoutingType routingType) throws Exception; + void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception; void unregisterCluster(String name) throws Exception; @@ -145,4 +165,8 @@ void registerBroadcastGroup(BroadcastGroup broadcastGroup, Object getAttribute(String resourceName, String attribute, SecurityAuth auth); Object invokeOperation(String resourceName, String operation, Object[] params, SecurityAuth auth) throws Exception; + + void registerBrokerConnection(BrokerConnection brokerConnection) throws Exception; + + void unregisterBrokerConnection(String name) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 113418a5cc8b..2841aabed4cb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -52,9 +52,11 @@ import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl; import org.apache.activemq.artemis.api.core.management.BridgeControl; +import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl; import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.ConnectionRouterControl; import org.apache.activemq.artemis.api.core.management.DivertControl; +import org.apache.activemq.artemis.api.core.management.FederationControl; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; @@ -67,9 +69,13 @@ import org.apache.activemq.artemis.core.management.impl.BaseBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.BridgeControlImpl; import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImpl; +import org.apache.activemq.artemis.core.management.impl.BrokerConnectionControlImpl; import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl; import org.apache.activemq.artemis.core.management.impl.ConnectionRouterControlImpl; import org.apache.activemq.artemis.core.management.impl.DivertControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationRemoteConsumerControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationStreamControlImpl; import org.apache.activemq.artemis.core.management.impl.JGroupsChannelBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.JGroupsFileBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; @@ -88,12 +94,16 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; import org.apache.activemq.artemis.core.server.management.GuardInvocationHandler; @@ -418,6 +428,19 @@ public void unregisterBroadcastGroup(final String name) throws Exception { unregisterFromRegistry(ResourceNames.BROADCAST_GROUP + name); } + @Override + public void registerBrokerConnection(BrokerConnection brokerConnection) throws Exception { + BrokerConnectionControl control = new BrokerConnectionControlImpl(brokerConnection, storageManager); + registerInJMX(objectNameBuilder.getBrokerConnectionObjectName(brokerConnection.getName()), control); + registerInRegistry(ResourceNames.BROKER_CONNECTION + brokerConnection.getName(), control); + } + + @Override + public void unregisterBrokerConnection(String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getBrokerConnectionObjectName(name)); + unregisterFromRegistry(ResourceNames.BROKER_CONNECTION + name); + } + @Override public void registerBridge(final Bridge bridge) throws Exception { bridge.setNotificationService(this); @@ -432,6 +455,51 @@ public void unregisterBridge(final String name) throws Exception { unregisterFromRegistry(ResourceNames.BRIDGE + name); } + @Override + public void registerFederation(final Federation federation) throws Exception { + FederationControl control = new FederationControlImpl(federation, storageManager); + registerInJMX(objectNameBuilder.getFederationObjectName(federation.getConfig().getName()), control); + registerInRegistry(ResourceNames.FEDERATION + federation.getName(), control); + } + + @Override + public void unregisterFederation(final String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getFederationObjectName(name)); + unregisterFromRegistry(ResourceNames.FEDERATION + name); + } + + @Override + public void registerFederationStream(final FederationStream federationStream) throws Exception { + logger.info("Registering federationStream {} for {} with hashcode {}", federationStream.getName(), federationStream.getFederation().getName(), federationStream.hashCode()); + FederationStreamControlImpl control = new FederationStreamControlImpl(federationStream, storageManager); + registerInJMX(objectNameBuilder.getFederationStreamObjectName(federationStream.getFederation().getName(), federationStream.getName()), control); + registerInRegistry(ResourceNames.FEDERATION_STREAM + federationStream.getName(), control); + } + + @Override + public void unregisterFederationStream(final SimpleString federationName, final SimpleString streamName) throws Exception { + unregisterFromJMX(objectNameBuilder.getFederationStreamObjectName(federationName, streamName)); + unregisterFromRegistry(ResourceNames.FEDERATION_STREAM + streamName); + } + + @Override + public void registerFederationRemoteConsumer(final FederatedQueueConsumer federatedQueueConsumer) throws Exception { + FederationRemoteConsumerControlImpl control = new FederationRemoteConsumerControlImpl(federatedQueueConsumer, storageManager); + + registerInJMX(objectNameBuilder.getFederationRemoteConsumerObjectName(federatedQueueConsumer.getFederation().getName(), federatedQueueConsumer.getFederationUpstream().getName(), federatedQueueConsumer.getKey().getAddress(), federatedQueueConsumer.getKey().getQueueName(), federatedQueueConsumer.getKey().getRoutingType()), control); + registerInRegistry(ResourceNames.FEDERATION_REMOTE_CONSUMER + federatedQueueConsumer.getKey().getQueueName(), control); + } + + @Override + public void unregisterFederationRemoteConsumer(final SimpleString name, + final SimpleString streamName, + final SimpleString address, + final SimpleString queueName, + RoutingType routingType) throws Exception { + unregisterFromJMX(objectNameBuilder.getFederationRemoteConsumerObjectName(name, streamName, address, queueName, routingType)); + unregisterFromRegistry(ResourceNames.FEDERATION_REMOTE_CONSUMER + name); + } + @Override public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception { ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java index e2ffb3f23195..ccde63d13dfa 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java @@ -45,9 +45,13 @@ import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.management.GuardInvocationHandler; import org.apache.activemq.artemis.core.server.routing.ConnectionRouter; import org.apache.activemq.artemis.core.server.cluster.Bridge; @@ -316,6 +320,40 @@ public void unregisterBridge(String name) throws Exception { } + @Override + public void registerFederation(Federation federation) { + + } + + @Override + public void unregisterFederation(String name) { + + } + + @Override + public void registerFederationStream(FederationStream federationStream) { + + } + + @Override + public void unregisterFederationStream(SimpleString federationName, SimpleString streamName) { + + } + + @Override + public void registerFederationRemoteConsumer(FederatedQueueConsumer federatedQueueConsumer) { + + } + + @Override + public void unregisterFederationRemoteConsumer(SimpleString name, + SimpleString streamame, + SimpleString address, + SimpleString queueName, + RoutingType routingType) { + + } + @Override public void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception { @@ -372,6 +410,16 @@ public Object invokeOperation(String resourceName, String operation, Object[] pa return null; } + @Override + public void registerBrokerConnection(BrokerConnection brokerConnection) { + + } + + @Override + public void unregisterBrokerConnection(String name) { + + } + @Override public void start() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 1e7f56e339f6..18b4f24510ec 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -6149,6 +6149,12 @@ public boolean isStarted() { public BrokerConnectConfiguration getConfiguration() { return null; } + + @Override + public boolean isOpen() { + return false; + } + } Fake fake = new Fake("fake" + UUIDGenerator.getInstance().generateStringUUID()); server.registerBrokerConnection(fake);