diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 13d78f72c6ef..4c3a21d08966 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2744,4 +2744,19 @@ static void getAuthorizationFailureCount(Object source) { @LogMessage(id = 601781, value = "User {} is getting authorization failure count on target resource: {}", level = LogMessage.Level.INFO) void getAuthorizationFailureCount(String user, Object source); + + static void startFederation(Object source) { + BASE_LOGGER.startFederation(getCaller(), source); + } + + @LogMessage(id = 601782, value = "User {} is starting a federation on target resource: {}", level = LogMessage.Level.INFO) + void startFederation(String user, Object source); + + static void stopFederation(Object source) { + BASE_LOGGER.stopFederation(getCaller(), source); + } + + @LogMessage(id = 601783, value = "User {} is stopping a federation on target resource: {}", level = LogMessage.Level.INFO) + void stopFederation(String user, Object source); + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java new file mode 100644 index 000000000000..547854df91fe --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + + +/** + * A BrokerConnectionControl is used to manage a BrokerConnections. + */ +public interface BrokerConnectionControl extends ActiveMQComponentControl { + + + /** + * Returns {@code true} if this connection is open, {@code false} else. + */ + @Attribute(desc = "whether this connection is open") + boolean isOpen(); + + /** + * Returns the name of this broker connection + */ + @Attribute(desc = "name of this broker connection") + String getName(); + + /** + * Returns the connection uri for this broker connection. + */ + @Attribute(desc = "connection uri for this broker connection") + String getUri(); + + /** + * Returns the user this broker connection is using. + */ + @Attribute(desc = "the user this broker connection is using") + String getUser(); + + /** + * Returns the protocol this broker connection is using. + */ + @Attribute(desc = "protocol this broker connection is using") + String getProtocol(); + + /** + * Returns the retry interval used by this broker connection. + */ + @Attribute(desc = "retry interval used by this broker connection") + long getRetryInterval(); + + /** + * Returns the number of reconnection attempts used by this broker connection. + */ + @Attribute(desc = "number of reconnection attempts used by this broker connection") + int getReconnectAttempts(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java new file mode 100644 index 000000000000..64e732477ab1 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A federationControl is used to manage a federation. + */ +public interface FederationControl extends ActiveMQComponentControl { + + /** + * Returns the name of this federation + */ + @Attribute(desc = "name of this federation") + String getName(); + + /** + * Returns the name of the user the federation is associated with. + */ + @Attribute(desc = "name of the user the federation is associated with") + String getUser(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java new file mode 100644 index 000000000000..5ff800b816ea --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A BridgeControl is used to manage a federation stream. + */ +public interface FederationRemoteConsumerControl { + + /** + * Returns the name of the queue that is being federated too + */ + @Attribute(desc = "name of the queue that is being federated too") + String getQueueName(); + + /** + * Returns the address this remote consumer will forward messages from. + */ + @Attribute(desc = "address this remote consumer will forward messages from") + String getAddress(); + + + /** + * Returns the priority of this remote consumer will consumer messages. + */ + @Attribute(desc = "address this remote consumer will consumer messages") + int getPriority(); + + /** + * Returns the routing type associated with this address. + * */ + @Attribute(desc = "routing type for this address") + String getRoutingType(); + + /** + * Returns the filter string associated with this remote consumer. + */ + @Attribute(desc = "filter string associated with this remote consumer") + String getFilterString(); + + /** + * Returns the queue filter string associated with this remote consumer. + */ + @Attribute(desc = "queue filter string associated with this remote consumer") + String getQueueFilterString(); + + /** + * Returns the fully qualified queue name associated with this remote consumer. + */ + @Attribute(desc = "fully qualified queue name associated with this remote consumer") + String getFqqn(); + + /** + * Returns the number of messages that have been federated for this address. + */ + @Attribute(desc = "number of messages that have been federated for this address") + long getFederatedMessageCount(); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java new file mode 100644 index 000000000000..6b4eb294c3ce --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A BridgeControl is used to manage a federation stream. + */ +public interface FederationStreamControl extends ActiveMQComponentControl { + + /** + * Returns the name of this federation stream + */ + @Attribute(desc = "name of this federation stream") + String getName(); + + /** + * Returns any list of static connectors used by this federation stream + */ + @Attribute(desc = "list of static connectors used by this federation stream") + String[] getStaticConnectors() throws Exception; + + /** + * Returns the name of the discovery group used by this federation stream. + */ + @Attribute(desc = "name of the discovery group used by this federation stream") + String getDiscoveryGroupName(); + + /** + * Returns the retry interval used by this federation stream. + */ + @Attribute(desc = "retry interval used by this federation stream") + long getRetryInterval(); + + /** + * Returns the retry interval multiplier used by this federation stream. + */ + @Attribute(desc = "retry interval multiplier used by this federation stream") + double getRetryIntervalMultiplier(); + + /** + * Returns the max retry interval used by this federation stream. + */ + @Attribute(desc = "max retry interval used by this federation stream") + long getMaxRetryInterval(); + + /** + * Returns the number of reconnection attempts used by this federation stream. + */ + @Attribute(desc = "number of reconnection attempts used by this federation stream") + int getReconnectAttempts(); + + /** + * Returns {@code true} if steam allows a shared connection, {@code false} else. + */ + @Attribute(desc = "whether this stream will allow the connection to be shared") + boolean isSharedConnection(); + + /** + * Returns {@code true} if this connection is configured to pull, {@code false} else. + */ + @Attribute(desc = "whether this connection is configured to pull") + boolean isPull(); + + /** + * Returns {@code true} the connection is configured for HA, {@code false} else. + */ + @Attribute(desc = "whether this connection is configured for HA") + boolean isHA(); + + /** + * Returns the name of the user the federation is associated with + * + */ + @Attribute(desc = "name of the user the federation is associated with") + String getUser(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java index 2d103f591685..79df5f9d0a9c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java @@ -126,6 +126,15 @@ public ObjectName getBroadcastGroupObjectName(final String name) throws Exceptio return createObjectName("broadcast-group", name); } + /** + * Returns the ObjectName used by BrokerConnectionControl. + * + * @see BrokerConnectionControl + */ + public ObjectName getBrokerConnectionObjectName(String name) throws Exception { + return createObjectName("broker-connection", name); + } + /** * Returns the ObjectName used by BridgeControl. * @@ -135,6 +144,15 @@ public ObjectName getBridgeObjectName(final String name) throws Exception { return createObjectName("bridge", name); } + /** + * Returns the ObjectName used by FederationControl. + * + * @see FederationControl + */ + public ObjectName getFederationObjectName(String name) throws Exception { + return createObjectName("federation", name); + } + /** * Returns the ObjectName used by ClusterConnectionControl. * @@ -169,4 +187,23 @@ public ObjectName getManagementContextObjectName() throws Exception { public ObjectName getSecurityObjectName() throws Exception { return ObjectName.getInstance("hawtio:type=security,area=jmx,name=ArtemisJMXSecurity"); } + + /** + * Returns the ObjectName used by FederationStreamControl. + * + * @see FederationStreamControl + */ + public ObjectName getFederationStreamObjectName(SimpleString federationName,SimpleString streamName) throws Exception { + return ObjectName.getInstance(String.format("%s,component=federations,name=%s,streamName=%s", getActiveMQServerName(), ObjectName.quote(federationName.toString()), ObjectName.quote(streamName.toString().toLowerCase()))); + } + + /** + * Returns the ObjectName used by FederationRemoteConsumerControl. + * + * @see FederationRemoteConsumerControl + */ + public ObjectName getFederationRemoteConsumerObjectName( final SimpleString federationName,final SimpleString streamName,final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception { + return ObjectName.getInstance(String.format("%s,component=federations,name=%s,streamName=%s,address=%s,subcomponent=queues,routing-type=%s,queue=%s", getActiveMQServerName(),ObjectName.quote(federationName.toString()),ObjectName.quote(streamName.toString()), ObjectName.quote(address.toString()), ObjectName.quote(routingType.toString().toLowerCase()), ObjectName.quote(queueName.toString()))); + } + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java index e07f99ce4e72..c6f24d4c9b6a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java @@ -34,8 +34,16 @@ public final class ResourceNames { public static final String ADDRESS = "address."; + public static final String BROKER_CONNECTION = "brokerconnection."; + public static final String BRIDGE = "bridge."; + public static final String FEDERATION = "federation."; + + public static final String FEDERATION_STREAM = "federationstream."; + + public static final String FEDERATION_REMOTE_CONSUMER = "federationremoteconsumer."; + public static final String ACCEPTOR = "acceptor."; public static final String DIVERT = "divert."; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index f47cf26dd367..38603bc6a33d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -194,6 +194,15 @@ public AMQPBrokerConnectConfiguration getConfiguration() { return brokerConnectConfiguration; } + @Override + public boolean isOpen() { + if(connection!=null) { + return connection.isOpen(); + } else { + return false; + } + } + @Override public boolean isStarted() { return started; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java index 73f77b9cf63e..e8c1d49e995e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java @@ -89,7 +89,7 @@ private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server); amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection); server.registerBrokerConnection(amqpBrokerConnection); - + server.getManagementService().registerBrokerConnection(amqpBrokerConnection); if (start) { amqpBrokerConnection.start(); } @@ -142,6 +142,7 @@ public void updateConfiguration(List configurati connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } @@ -183,6 +184,7 @@ public void updateConfiguration(List configurati connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } } @@ -211,6 +213,7 @@ public void stop() throws Exception { connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 7076a2273615..6776ce2d66de 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -4000,6 +4000,8 @@ public String listBrokerConnections() { obj.add("name", brokerConnection.getName()); obj.add("protocol", brokerConnection.getProtocol()); obj.add("started", brokerConnection.isStarted()); + obj.add("uri", brokerConnection.getConfiguration().getUri()); + obj.add("open", brokerConnection.isOpen()); connections.add(obj.build()); } return connections.build().toString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java new file mode 100644 index 000000000000..b02c006f1fc1 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.BrokerConnection; +import org.apache.activemq.artemis.logs.AuditLogger; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; +import javax.management.NotCompliantMBeanException; + +public class BrokerConnectionControlImpl extends AbstractControl implements BrokerConnectionControl { + private final BrokerConnection brokerConnection; + + public BrokerConnectionControlImpl(BrokerConnection brokerConnection, StorageManager storageManager) throws NotCompliantMBeanException { + super(BrokerConnectionControl.class, storageManager); + this.brokerConnection = brokerConnection; + } + + @Override + public boolean isStarted() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.isStarted(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isOpen() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.brokerConnection); //TODO + } + clearIO(); + try { + return brokerConnection.isOpen(); + } finally { + blockOnIO(); + } + } + + @Override + public void start() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startBrokerConnection(this.brokerConnection.getName()); + } + clearIO(); + try { + brokerConnection.start(); + } finally { + blockOnIO(); + } + } + + @Override + public void stop() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startBrokerConnection(this.brokerConnection.getName()); + } + clearIO(); + try { + brokerConnection.stop(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getName(); + } finally { + blockOnIO(); + } + } + + @Override + public String getUri() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getStaticConnectors(this.brokerConnection); //TODO + } + clearIO(); + try { + return brokerConnection.getConfiguration().getUri(); + } finally { + blockOnIO(); + } + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getUser(); + } finally { + blockOnIO(); + } + } + + @Override + public String getProtocol() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getProducerWindowSize(this.brokerConnection);//TODO + } + clearIO(); + try { + return brokerConnection.getProtocol(); + } finally { + blockOnIO(); + } + } + + @Override + public long getRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryInterval(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public int getReconnectAttempts() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getReconnectAttempts(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getReconnectAttempts(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(BrokerConnectionControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(BrokerConnectionControl.class); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java new file mode 100644 index 000000000000..86760473360b --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +import org.apache.activemq.artemis.api.core.management.FederationControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.logs.AuditLogger; + + +public class FederationControlImpl extends AbstractControl implements FederationControl { + private final Federation federation; + + + public FederationControlImpl(final Federation federation, final StorageManager storageManager) throws Exception { + super(FederationControl.class, storageManager); + this.federation = federation; + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.federation); + } + clearIO(); + try { + return federation.getConfig().getCredentials().getUser(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.federation); + } + clearIO(); + try { + return federation.getName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isStarted() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.federation); + } + clearIO(); + try { + return federation.isStarted(); + } finally { + blockOnIO(); + } + } + + @Override + public void start() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startFederation(this.federation); + } + clearIO(); + try { + federation.start(); + } finally { + blockOnIO(); + } + } + + @Override + public void stop() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.stopFederation(this.federation); + } + clearIO(); + try { + federation.stop(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationControl.class); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java new file mode 100644 index 000000000000..fd6944b10a05 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + + +import org.apache.activemq.artemis.api.core.management.FederationRemoteConsumerControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.logs.AuditLogger; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +public class FederationRemoteConsumerControlImpl extends AbstractControl implements FederationRemoteConsumerControl { + + private final FederatedConsumerKey federatedConsumerKey; + private final FederatedQueueConsumer federatedConsumer; + + public FederationRemoteConsumerControlImpl(final FederatedQueueConsumer federatedConsumer, final StorageManager storageManager) throws Exception { + super(FederationRemoteConsumerControl.class, storageManager); + this.federatedConsumerKey = federatedConsumer.getKey(); + this.federatedConsumer = federatedConsumer; + } + + + @Override + public String getAddress() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getAddress(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getAddress().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public int getPriority() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMessageExpiryThreadPriority(federatedConsumer); //TODO + } + clearIO(); + try { + return federatedConsumerKey.getPriority(); + } finally { + blockOnIO(); + } + } + + @Override + public String getRoutingType() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRoutingType(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getRoutingType().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getFilterString() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getFilterString(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getFilterString().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getQueueName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getQueueName(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getQueueName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getQueueFilterString() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getFilterString(federatedConsumer); //TODO + } + clearIO(); + try { + return federatedConsumerKey.getQueueFilterString().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getFqqn() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(federatedConsumer); //TODO + } + clearIO(); + try { + return federatedConsumerKey.getFqqn().toString(); + } finally { + blockOnIO(); + } + } + + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationRemoteConsumerControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationRemoteConsumerControl.class); + } + + @Override + public long getFederatedMessageCount() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMessageCount(federatedConsumer); + } + clearIO(); + try { + return federatedConsumer.federatedMessageCount(); + } finally { + blockOnIO(); + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java new file mode 100644 index 000000000000..6e72155c9f67 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + + +import org.apache.activemq.artemis.api.core.management.FederationStreamControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.FederationStream; +import org.apache.activemq.artemis.logs.AuditLogger; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +public class FederationStreamControlImpl extends AbstractControl implements FederationStreamControl { + + private final FederationStream federationStream; + + public FederationStreamControlImpl(final FederationStream federationStream, final StorageManager storageManager) throws Exception { + super(FederationStreamControl.class, storageManager); + this.federationStream = federationStream; + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.federationStream); + } + clearIO(); + try { + return federationStream.getConfig().getConnectionConfiguration().getUsername(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.federationStream); + } + clearIO(); + try { + return federationStream.getName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String[] getStaticConnectors() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getStaticConnectors(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getStaticConnectors().toArray(new String[0]); + } finally { + blockOnIO(); + } + } + + @Override + public String getDiscoveryGroupName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getDiscoveryGroupName(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getDiscoveryGroupName(); + } finally { + blockOnIO(); + } + } + + @Override + public long getRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryInterval(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public double getRetryIntervalMultiplier() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryIntervalMultiplier(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getRetryIntervalMultiplier(); + } finally { + blockOnIO(); + } + } + + @Override + public long getMaxRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMaxRetryInterval(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getMaxRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public int getReconnectAttempts() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getReconnectAttempts(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getReconnectAttempts(); + } finally { + blockOnIO(); + } + } + + + @Override + public boolean isSharedConnection() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isSharedStore(this.federationStream); //TODO + } + clearIO(); + try { + return federationStream.getConnection().isSharedConnection(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isPull() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isHA(this.federationStream); //TODO + } + clearIO(); + try { + return federationStream.getConnection().isPull(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isHA() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isHA(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().isHA(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isStarted() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().isStarted(); + } finally { + blockOnIO(); + } + } + + + @Override + public void start() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startFederation(this.federationStream); //TODO + } + clearIO(); + try { + federationStream.start(); + } finally { + blockOnIO(); + } + } + + @Override + public void stop() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.stopFederation(this.federationStream); //TODO + } + clearIO(); + try { + federationStream.stop(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationStreamControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationStreamControl.class); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java index 7251eaad23eb..28c57a3ace21 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java @@ -35,4 +35,9 @@ public interface BrokerConnection extends ActiveMQComponent { */ BrokerConnectConfiguration getConfiguration(); + /** + * @return if the connection is currently open. + */ + boolean isOpen(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java index 8cb020b87674..333010753be5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java @@ -117,6 +117,15 @@ public synchronized void createRemoteConsumer(FederatedConsumerKey key, Transfor remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback); remoteQueueConsumer.start(); remoteQueueConsumers.put(key, remoteQueueConsumer); + try { + server.getManagementService().registerFederationRemoteConsumer(remoteQueueConsumer); + final FederatedQueueConsumer finalRemoteQueueConsumer = remoteQueueConsumer; + server.callBrokerFederationPlugins(plugin -> plugin.beforeCloseFederatedQueueConsumer(finalRemoteQueueConsumer)); + } catch (Exception t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError("beforeCloseFederatedQueueConsumer", t); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + if (server.hasBrokerFederationPlugins()) { try { @@ -147,6 +156,12 @@ public synchronized void removeRemoteConsumer(FederatedConsumerKey key) { if (remoteQueueConsumer.decrementCount() <= 0) { remoteQueueConsumer.close(); remoteQueueConsumers.remove(key); + try { + server.getManagementService().unregisterFederationRemoteConsumer(federation.getName(), upstream.getName(), key.getAddress(), key.getQueueName(), key.getRoutingType()); + }catch (Exception e) + { + //TODO + } } if (server.hasBrokerFederationPlugins()) { try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java index d9d945a95e74..8580efbd0815 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java @@ -52,4 +52,6 @@ static int getNextDelay(int delay, int delayMultiplier, int delayMax) { void start(); void close(); + + long federatedMessageCount(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java index 2b854d9c5a1d..57e147eb520c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; @@ -65,6 +66,8 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi private ClientSessionFactoryInternal clientSessionFactory; private ClientSession clientSession; private ClientConsumerInternal clientConsumer; + + private final AtomicLong federatedMessageCount = new AtomicLong(); private final AtomicInteger pendingPullCredit = new AtomicInteger(); private QueueHandle queueHandle; @@ -246,6 +249,11 @@ public synchronized void close() { } } + @Override + public long federatedMessageCount() { + return federatedMessageCount.get(); + } + private void scheduleDisconnect(int delay) { scheduledExecutorService.schedule(() -> { try { @@ -304,6 +312,7 @@ public void onMessage(ClientMessage clientMessage) { message = transformer == null ? message : transformer.transform(message); if (message != null) { server.getPostOffice().route(message, true); + federatedMessageCount.incrementAndGet(); } clientMessage.acknowledge(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java index e9b974ccca95..9eaf027ecb50 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java @@ -80,9 +80,11 @@ public synchronized void stop() { for (FederationUpstream connection : upstreams.values()) { connection.stop(); + server.getManagementService().unregisterFederationStream(connection.getFederation().getName(), connection.getName()); } for (FederationDownstream connection : downstreams.values()) { connection.stop(); + server.getManagementService().unregisterFederationStream(connection.getFederation().getName(),connection.getName()); } upstreams.clear(); downstreams.clear(); @@ -109,12 +111,15 @@ public synchronized boolean undeploy(String name) { FederationUpstream federationConnection = upstreams.remove(name); if (federationConnection != null) { federationConnection.stop(); + server.getManagementService().unregisterFederationStream(federationConnection.getFederation().getName(),federationConnection.getName()); } FederationDownstream federationConnectionDownstream = downstreams.remove(name); if (federationConnectionDownstream != null) { federationConnectionDownstream.undeploy(); federationConnectionDownstream.stop(); + server.getManagementService().unregisterFederationStream(federationConnectionDownstream.getFederation().getName(),federationConnectionDownstream.getName()); } + server.unregisterBrokerConnection(name); return true; } @@ -137,6 +142,7 @@ public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfi private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) { FederationUpstream upstream = new FederationUpstream(server, this, name, upstreamConfiguration); + server.getManagementService().registerFederationStream(upstream); upstreams.put(name, upstream); if (state == FederationManager.State.STARTED) { upstream.start(); @@ -180,6 +186,8 @@ private synchronized FederationDownstream deploy(String name, FederationDownstre if (state == FederationManager.State.STARTED) { downstream.start(); } + server.registerBrokerConnection(new FederatedBrokerConnection(downstream)); + server.getManagementService().registerFederationStream(downstream); return downstream; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java index 9e213590cfb3..a455c1b06d53 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java @@ -23,11 +23,14 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.server.management.ManagementService; public class FederationManager implements ActiveMQComponent { private final ActiveMQServer server; + private final ManagementService managementService; + private Map federations = new HashMap<>(); private State state; @@ -47,6 +50,7 @@ enum State { public FederationManager(final ActiveMQServer server) { this.server = server; + this.managementService = server.getManagementService(); //TODO should this be passed in the constructor? } @Override @@ -55,6 +59,7 @@ public synchronized void start() throws ActiveMQException { deploy(); for (Federation federation : federations.values()) { federation.start(); + managementService.registerFederation(federation); } state = State.STARTED; } @@ -67,6 +72,7 @@ public synchronized void stop() { for (Federation federation : federations.values()) { federation.stop(); + managementService.unregisterFederation(federation.getConfig().getName()); } federations.clear(); state = State.STOPPED; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java index 3e543b5b257a..1fe372ce6d70 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java @@ -41,9 +41,13 @@ import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.routing.ConnectionRouter; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; @@ -124,6 +128,18 @@ void registerBroadcastGroup(BroadcastGroup broadcastGroup, void unregisterBridge(String name) throws Exception; + void registerFederation(Federation federation); + + void unregisterFederation(String name); + + void registerFederationStream(FederationStream federationStream); + + void unregisterFederationStream(SimpleString federationName, SimpleString streamName); + + void registerFederationRemoteConsumer(FederatedQueueConsumer federatedQueueConsumer); + + void unregisterFederationRemoteConsumer(SimpleString name, SimpleString streamName,SimpleString address, SimpleString queueName, RoutingType routingType) ; + void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception; void unregisterCluster(String name) throws Exception; @@ -145,4 +161,8 @@ void registerBroadcastGroup(BroadcastGroup broadcastGroup, Object getAttribute(String resourceName, String attribute, SecurityAuth auth); Object invokeOperation(String resourceName, String operation, Object[] params, SecurityAuth auth) throws Exception; + + void registerBrokerConnection(BrokerConnection brokerConnection) throws Exception; + + void unregisterBrokerConnection(String name) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 113418a5cc8b..7b755efe2620 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -52,9 +52,11 @@ import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl; import org.apache.activemq.artemis.api.core.management.BridgeControl; +import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl; import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.ConnectionRouterControl; import org.apache.activemq.artemis.api.core.management.DivertControl; +import org.apache.activemq.artemis.api.core.management.FederationControl; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; @@ -67,9 +69,13 @@ import org.apache.activemq.artemis.core.management.impl.BaseBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.BridgeControlImpl; import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImpl; +import org.apache.activemq.artemis.core.management.impl.BrokerConnectionControlImpl; import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl; import org.apache.activemq.artemis.core.management.impl.ConnectionRouterControlImpl; import org.apache.activemq.artemis.core.management.impl.DivertControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationRemoteConsumerControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationStreamControlImpl; import org.apache.activemq.artemis.core.management.impl.JGroupsChannelBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.JGroupsFileBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; @@ -88,12 +94,16 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; import org.apache.activemq.artemis.core.server.management.GuardInvocationHandler; @@ -418,6 +428,20 @@ public void unregisterBroadcastGroup(final String name) throws Exception { unregisterFromRegistry(ResourceNames.BROADCAST_GROUP + name); } + + @Override + public void registerBrokerConnection(BrokerConnection brokerConnection) throws Exception { + BrokerConnectionControl control = new BrokerConnectionControlImpl(brokerConnection, storageManager); + registerInJMX(objectNameBuilder.getBrokerConnectionObjectName(brokerConnection.getName()), control); + registerInRegistry(ResourceNames.BROKER_CONNECTION + brokerConnection.getName(), control); + } + + @Override + public void unregisterBrokerConnection(String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getBrokerConnectionObjectName(name)); + unregisterFromRegistry(ResourceNames.BROKER_CONNECTION + name); + } + @Override public void registerBridge(final Bridge bridge) throws Exception { bridge.setNotificationService(this); @@ -432,6 +456,81 @@ public void unregisterBridge(final String name) throws Exception { unregisterFromRegistry(ResourceNames.BRIDGE + name); } + @Override + public void registerFederation(final Federation federation) { + try { + + FederationControl control = new FederationControlImpl(federation, storageManager); + registerInJMX(objectNameBuilder.getFederationObjectName(federation.getConfig().getName()), control); + registerInRegistry(ResourceNames.FEDERATION + federation.getName(), control); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + + @Override + public void unregisterFederation(final String name) { + try { + + unregisterFromJMX(objectNameBuilder.getFederationObjectName(name)); + unregisterFromRegistry(ResourceNames.FEDERATION + name); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + + @Override + public void registerFederationStream(final FederationStream federationStream) { + try { + + logger.info("Registering federationStream {} for {} with hashcode {}",federationStream.getName(),federationStream.getFederation().getName(),federationStream.hashCode()); + FederationStreamControlImpl control = new FederationStreamControlImpl(federationStream, storageManager); + registerInJMX(objectNameBuilder.getFederationStreamObjectName(federationStream.getFederation().getName(), federationStream.getName()), control); + registerInRegistry(ResourceNames.FEDERATION_STREAM + federationStream.getName(), control); + } catch (Exception e) { + throw new RuntimeException("Error registering: " + federationStream.getName()+" for "+federationStream.getFederation().getName(),e); + } + } + + @Override + public void unregisterFederationStream(final SimpleString federationName, final SimpleString streamName) { + try { + + unregisterFromJMX(objectNameBuilder.getFederationStreamObjectName(federationName, streamName)); + unregisterFromRegistry(ResourceNames.FEDERATION_STREAM + streamName); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + + @Override + public void registerFederationRemoteConsumer(final FederatedQueueConsumer federatedQueueConsumer) { + try { + + FederationRemoteConsumerControlImpl control = new FederationRemoteConsumerControlImpl(federatedQueueConsumer, storageManager); + + registerInJMX(objectNameBuilder.getFederationRemoteConsumerObjectName(federatedQueueConsumer.getFederation().getName(),federatedQueueConsumer.getFederationUpstream().getName(),federatedQueueConsumer.getKey().getAddress(), federatedQueueConsumer.getKey().getQueueName(), federatedQueueConsumer.getKey().getRoutingType()), control); + registerInRegistry(ResourceNames.FEDERATION_REMOTE_CONSUMER + federatedQueueConsumer.getKey().getQueueName(), control); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + + @Override + public void unregisterFederationRemoteConsumer(final SimpleString name, final SimpleString streamName, final SimpleString address, final SimpleString queueName, RoutingType routingType) { + try { + unregisterFromJMX(objectNameBuilder.getFederationRemoteConsumerObjectName(name,streamName, address, queueName, routingType)); + unregisterFromRegistry(ResourceNames.FEDERATION_REMOTE_CONSUMER + name); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + @Override public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception { ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java index e2ffb3f23195..1ac51934c25d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java @@ -45,9 +45,13 @@ import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.management.GuardInvocationHandler; import org.apache.activemq.artemis.core.server.routing.ConnectionRouter; import org.apache.activemq.artemis.core.server.cluster.Bridge; @@ -316,6 +320,36 @@ public void unregisterBridge(String name) throws Exception { } + @Override + public void registerFederation(Federation federation) { + + } + + @Override + public void unregisterFederation(String name) { + + } + + @Override + public void registerFederationStream(FederationStream federationStream) { + + } + + @Override + public void unregisterFederationStream(SimpleString federationName, SimpleString streamName) { + + } + + @Override + public void registerFederationRemoteConsumer(FederatedQueueConsumer federatedQueueConsumer) { + + } + + @Override + public void unregisterFederationRemoteConsumer(SimpleString name, SimpleString streamame,SimpleString address, SimpleString queueName, RoutingType routingType) { + + } + @Override public void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception { @@ -372,6 +406,16 @@ public Object invokeOperation(String resourceName, String operation, Object[] pa return null; } + @Override + public void registerBrokerConnection(BrokerConnection brokerConnection) { + + } + + @Override + public void unregisterBrokerConnection(String name) { + + } + @Override public void start() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index aaa50c78370e..7321c68a183e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -6092,6 +6092,12 @@ public boolean isStarted() { public BrokerConnectConfiguration getConfiguration() { return null; } + + @Override + public boolean isOpen() { + return false; + } + } Fake fake = new Fake("fake" + UUIDGenerator.getInstance().generateStringUUID()); server.registerBrokerConnection(fake);