diff --git a/bin/container b/bin/container index 39c6f1c25e..1906391b17 100755 --- a/bin/container +++ b/bin/container @@ -20,7 +20,7 @@ git_prefix=${git_url%.*} git_prefix=${git_prefix#*.com/} git_repo=${git_prefix#*.com:} -GCP_PROJECT=$(gcloud config get project) +GCP_PROJECT=$(gcloud config get project) || true REPOSITORY=ghcr.io/${git_repo} TEMPLATES=$(cd etc; ls k8s_*.yaml) || true @@ -100,7 +100,7 @@ if [[ -n $prep ]]; then LIBFILE=build/libs/*-1.0-SNAPSHOT-all.jar build_time=`date --utc -Imin -r $LIBFILE` || true - [[ -n $build_time ]] || build_time=unknown + [[ -n $build_time ]] || build_time="1970-01-01T00:00:00Z" cat < var/deployed_version.json { @@ -116,14 +116,13 @@ EOF fi if [[ -n $build ]]; then - echo Cleaning old images... - images=$(docker images | fgrep $REPOSITORY | awk '{print $1":"$2}' | fgrep -v '') || true - [[ -n $images ]] && docker rmi $images || true - echo Building Dockerfile.$target - echo docker build -f Dockerfile.$target -t $target . - docker build -f Dockerfile.$target -t $target . - docker tag $target $udmi_ref + echo docker build -f Dockerfile.$target -t $udmi_ref . + docker build -f Dockerfile.$target -t $udmi_ref -t $target . + + echo Cleaning dangling images... + images=$(docker images -f "dangling=true" -q) + [[ -n $images ]] && docker rmi $images fi if [[ -n $push ]]; then diff --git a/common/src/main/java/com/google/udmi/util/CertManager.java b/common/src/main/java/com/google/udmi/util/CertManager.java index 92737dcae8..7f10d440f5 100644 --- a/common/src/main/java/com/google/udmi/util/CertManager.java +++ b/common/src/main/java/com/google/udmi/util/CertManager.java @@ -48,7 +48,7 @@ public class CertManager { private final File keyFile; private final File crtFile; private final char[] password; - private final boolean isSsl; + private final Transport transport; { Security.addProvider(new BouncyCastleProvider()); @@ -60,9 +60,9 @@ public class CertManager { public CertManager(File caCrtFile, File clientDir, Transport transport, String passString, Consumer logging) { this.caCrtFile = caCrtFile; - isSsl = Transport.SSL.equals(transport); + this.transport = transport; - if (isSsl) { + if (Transport.SSL.equals(transport)) { String prefix = keyPrefix(clientDir); crtFile = new File(clientDir, prefix + "_private.crt"); keyFile = new File(clientDir, prefix + "_private.pem"); @@ -144,7 +144,10 @@ public SSLSocketFactory getCertSocketFactory() throws Exception { */ public SocketFactory getSocketFactory() { try { - if (!isSsl) { + if (!Transport.SSL.equals(transport)) { + if (Transport.TCP.equals(transport)) { + return SocketFactory.getDefault(); + } return SSLSocketFactory.getDefault(); } return getCertSocketFactory(); diff --git a/contrib/mango/.gitignore b/contrib/mango/.gitignore new file mode 100644 index 0000000000..8fce603003 --- /dev/null +++ b/contrib/mango/.gitignore @@ -0,0 +1 @@ +data/ diff --git a/contrib/mango/mango_docker b/contrib/mango/mango_docker index ac1136866d..3e2a816243 100755 --- a/contrib/mango/mango_docker +++ b/contrib/mango/mango_docker @@ -1,15 +1,8 @@ #!/bin/sh set -o errexit -o nounset -cd "$(dirname "$0")"/mount - -MANGO_VERSION=5.2.0 -curl -sSL https://store.mango-os.com/downloads/m2m2-udmi-${MANGO_VERSION}.zip -o udmi.zip -rm -rf udmi && unzip -q udmi.zip -d udmi && rm udmi.zip - docker run --rm \ -p 8080:8080 \ -p 8443:8443 \ - -v ./data:/opt/mango-data \ - -v ./udmi:/opt/mango/web/modules/udmi \ - ghcr.io/radixiot/mango:${MANGO_VERSION} + -v "$(dirname "$0")"/data:/opt/mango-data \ + ghcr.io/radixiot/mango:latest diff --git a/contrib/mango/mount/.gitignore b/contrib/mango/mount/.gitignore deleted file mode 100644 index 72e8ffc0db..0000000000 --- a/contrib/mango/mount/.gitignore +++ /dev/null @@ -1 +0,0 @@ -* diff --git a/etc/test_itemized.out b/etc/test_itemized.out index 8d555a4ebe..cd4b5d04ae 100644 --- a/etc/test_itemized.out +++ b/etc/test_itemized.out @@ -49,7 +49,7 @@ 1 RESULT fail gateway gateway_proxy_events BETA 0/10 Failed waiting until All proxy devices received pointset 1 RESULT pass gateway gateway_proxy_state PREVIEW 10/10 Sequence complete 1 RESULT pass gateway gateway_proxy_state PREVIEW 10/10 Sequence complete -1 RESULT fail discovery.scan scan_single_future PREVIEW 0/10 Failed check that discovery events were valid: bacnet scan_addr snoozle does not match expression [1-9][0-9]* +1 RESULT fail discovery.scan scan_single_future PREVIEW 0/10 Failed check that discovery events were valid: bacnet scan_addr snoozle does not match expression 0|[1-9][0-9]* 1 RESULT pass discovery.scan scan_periodic_now_enumerate PREVIEW 10/10 Sequence complete 4 RESULT pass gateway gateway_proxy_events BETA 10/10 Sequence complete 1 RESULT pass gateway gateway_proxy_events BETA 10/10 Sequence complete diff --git a/etc/validator.out b/etc/validator.out index 339156c505..da6b3a0c04 100644 --- a/etc/validator.out +++ b/etc/validator.out @@ -339,15 +339,15 @@ sites/udmi_site_model/out/devices/AHU-22/state.out "sub_folder" : "update", "sub_type" : "state", "status" : { - "message" : "missing pointset subblock", - "detail" : "While validating pointset message at REDACTED_TIMESTAMP", + "message" : "1 schema violations found", + "detail" : "state_update: 1 schema violations found; /system: object has missing required properties ([\"serial_no\"])", "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 }, "errors" : [ { - "message" : "missing pointset subblock", - "detail" : "While validating pointset message at REDACTED_TIMESTAMP", + "message" : "1 schema violations found", + "detail" : "state_update: 1 schema violations found; /system: object has missing required properties ([\"serial_no\"])", "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 @@ -382,12 +382,19 @@ sites/udmi_site_model/out/devices/AHU-22/state_system.out "sub_folder" : "system", "sub_type" : "state", "status" : { - "message" : "Successful validation", - "category" : "validation.device.receive", + "message" : "1 schema violations found", + "detail" : "state_system: 1 schema violations found; object has missing required properties ([\"serial_no\"])", + "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", - "level" : 200 + "level" : 500 }, - "errors" : [ ] + "errors" : [ { + "message" : "1 schema violations found", + "detail" : "state_system: 1 schema violations found; object has missing required properties ([\"serial_no\"])", + "category" : "validation.device.schema", + "timestamp" : "REDACTED_TIMESTAMP", + "level" : 500 + } ] } :::::::::::::: sites/udmi_site_model/out/devices/GAT-123/config.out @@ -787,14 +794,20 @@ sites/udmi_site_model/out/devices/SNS-4/state.out "sub_folder" : "update", "sub_type" : "state", "status" : { - "message" : "missing pointset subblock", - "detail" : "While validating pointset message at REDACTED_TIMESTAMP", - "category" : "validation.device.schema", + "message" : "Multiple validation errors", + "detail" : "1 schema violations found; Device has missing points: split_threshold, triangulating_axis", + "category" : "validation.device.multiple", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 }, "errors" : [ { - "message" : "missing pointset subblock", + "message" : "1 schema violations found", + "detail" : "state_update: 1 schema violations found; /system: object has missing required properties ([\"serial_no\"])", + "category" : "validation.device.schema", + "timestamp" : "REDACTED_TIMESTAMP", + "level" : 500 + }, { + "message" : "Device has missing points: split_threshold, triangulating_axis", "detail" : "While validating pointset message at REDACTED_TIMESTAMP", "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", @@ -837,10 +850,17 @@ sites/udmi_site_model/out/devices/SNS-4/state_system.out "sub_folder" : "system", "sub_type" : "state", "status" : { - "message" : "Successful validation", - "category" : "validation.device.receive", + "message" : "1 schema violations found", + "detail" : "state_system: 1 schema violations found; object has missing required properties ([\"serial_no\"])", + "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", - "level" : 200 + "level" : 500 }, - "errors" : [ ] + "errors" : [ { + "message" : "1 schema violations found", + "detail" : "state_system: 1 schema violations found; object has missing required properties ([\"serial_no\"])", + "category" : "validation.device.schema", + "timestamp" : "REDACTED_TIMESTAMP", + "level" : 500 + } ] } diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java index 287d92fbd1..f0d9f1fd78 100644 --- a/pubber/src/main/java/daq/pubber/Pubber.java +++ b/pubber/src/main/java/daq/pubber/Pubber.java @@ -430,8 +430,10 @@ private boolean attemptConnection() { isConnected = false; deviceManager.stop(); super.stop(); - if (deviceTarget == null) { - throw new RuntimeException("Mqtt publisher not initialized"); + if (deviceTarget == null || !deviceTarget.isActive()) { + error("Mqtt publisher not active"); + disconnectMqtt(); + initializeMqtt(); } registerMessageHandlers(); connect(); @@ -455,10 +457,12 @@ public void shutdown() { deviceState.system.operation.mode = SystemMode.SHUTDOWN; } - super.shutdown(); - ifNotNullThen(deviceManager, dm -> captureExceptions("device manager shutdown", dm::shutdown)); - captureExceptions("publishing shutdown state", this::publishSynchronousState); - captureExceptions("disconnecting mqtt", this::disconnectMqtt); + if (isConnected()) { + captureExceptions("Publishing shutdown state", this::publishSynchronousState); + } + ifNotNullThen(deviceManager, dm -> captureExceptions("Device manager shutdown", dm::shutdown)); + captureExceptions("Pubber sender shutdown", super::shutdown); + captureExceptions("Disconnecting mqtt", this::disconnectMqtt); } @Override @@ -536,7 +540,6 @@ public void persistEndpoint(EndpointConfiguration endpoint) { writePersistentStore(); } - @Override public void resetConnection(String targetEndpoint) { try { diff --git a/pubber/src/main/java/daq/pubber/BacnetProvider.java b/pubber/src/main/java/daq/pubber/PubberBacnetProvider.java similarity index 86% rename from pubber/src/main/java/daq/pubber/BacnetProvider.java rename to pubber/src/main/java/daq/pubber/PubberBacnetProvider.java index c4ba34d140..716e93abd7 100644 --- a/pubber/src/main/java/daq/pubber/BacnetProvider.java +++ b/pubber/src/main/java/daq/pubber/PubberBacnetProvider.java @@ -20,8 +20,10 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; import udmi.lib.base.ManagerBase; +import udmi.lib.client.LocalnetManager; import udmi.lib.intf.ManagerHost; import udmi.schema.DiscoveryEvents; +import udmi.schema.FamilyLocalnetState; import udmi.schema.Level; import udmi.schema.Metadata; import udmi.schema.PointPointsetModel; @@ -30,9 +32,10 @@ /** * Provides for the bacnet family of stuffs. */ -public class BacnetProvider extends ManagerBase implements PubberFamilyProvider { +public class PubberBacnetProvider extends ManagerBase implements PubberFamilyProvider { private static final int BACNET_DISCOVERY_RATE_SEC = 1; + private final LocalnetManager localnetHost; private final Deque toReport = new ArrayDeque<>(); private Map bacnetDevices; private BiConsumer publisher; @@ -41,10 +44,16 @@ public class BacnetProvider extends ManagerBase implements PubberFamilyProvider /** * Provider for metadata-based (simulated) bacnet discovery. */ - public BacnetProvider(ManagerHost host, String family, String deviceId) { + public PubberBacnetProvider(ManagerHost host, String family, String deviceId) { super(host, deviceId); checkState(family.equals(BACNET)); - updateInterval(BACNET_DISCOVERY_RATE_SEC); + localnetHost = ((LocalnetManager) host); + } + + private void addStateMapEntry() { + FamilyLocalnetState stateEntry = new FamilyLocalnetState(); + stateEntry.addr = getBacnetAddr(getMetadata(deviceId)); + localnetHost.update(BACNET, stateEntry); } @Override @@ -87,12 +96,12 @@ public synchronized void startScan(boolean enumerate, toReport.addAll(bacnetDevices.keySet()); this.publisher = publisher; this.enumerate = enumerate; - startPeriodicSend(); + updateInterval(BACNET_DISCOVERY_RATE_SEC); } @Override public synchronized void stopScan() { - cancelPeriodicSend(); + super.stop(); } @Override @@ -117,5 +126,6 @@ public void setSiteModel(SiteModel siteModel) { bacnetDevices = siteModel.allMetadata().entrySet().stream() .filter(entry -> nonNull(getBacnetAddr(entry.getValue()))) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + addStateMapEntry(); } } diff --git a/pubber/src/main/java/daq/pubber/PubberGatewayManager.java b/pubber/src/main/java/daq/pubber/PubberGatewayManager.java index 27184a447c..e0225ec02b 100644 --- a/pubber/src/main/java/daq/pubber/PubberGatewayManager.java +++ b/pubber/src/main/java/daq/pubber/PubberGatewayManager.java @@ -47,7 +47,8 @@ public void setMetadata(Metadata metadata) { @Override public void activate() { - ifNotNullThen(proxyDevices, p -> p.values().forEach(ProxyDeviceHost::activate)); + ifNotNullThen(proxyDevices, p -> p.values() + .parallelStream().forEach(ProxyDeviceHost::activate)); } @Override diff --git a/pubber/src/main/java/daq/pubber/PubberIpProvider.java b/pubber/src/main/java/daq/pubber/PubberIpProvider.java index 96b569f793..c8b1864a77 100644 --- a/pubber/src/main/java/daq/pubber/PubberIpProvider.java +++ b/pubber/src/main/java/daq/pubber/PubberIpProvider.java @@ -75,6 +75,27 @@ static String getDefaultInterfaceStatic(List routeLines) { return currentInterface.get(); } + /** + * Try to use first match per family. + * + *
+   * ip addr show eth0
+   * 149: eth0@if150: *BROADCAST,MULTICAST,UP,LOWER_UP,M-DOWN> mtu 1500 qdisc noqueue state UP
+   *     link/ether 02:42:ac:11:00:10 brd ff:ff:ff:ff:ff:ff
+   *     inet 10.0.0.10/24 brd 10.0.0.255 scope global eth0
+   *        valid_lft forever preferred_lft forever
+   *     inet6 fd00:1234:abc:1::10/120 scope global flags 02
+   *        valid_lft forever preferred_lft forever
+   *     inet6 fe80::42:acff:fe11:10/64 scope link
+   *        valid_lft forever preferred_lft forever
+   *
+   * ip -6 route
+   * fd00:1234:abc:1::/120 dev eth0  metric 256
+   * fe80::/64 dev eth0  metric 256
+   * default via fd00:1234:abc:1::1 dev eth0  metric 1024
+   * multicast ff00::/8 dev eth0  metric 256
+   * 
+ */ @VisibleForTesting static Map getInterfaceAddressesStatic(List strings) { Map interfaceMap = new HashMap<>(); @@ -83,7 +104,7 @@ static Map getInterfaceAddressesStatic(List strings) { Matcher matcher = pattern.matcher(line); if (matcher.matches()) { String family = IFACE_MAP.get(matcher.group(1)); - interfaceMap.put(family, matcher.group(2)); + interfaceMap.putIfAbsent(family, matcher.group(2)); } } }); diff --git a/pubber/src/main/java/daq/pubber/PubberLocalnetManager.java b/pubber/src/main/java/daq/pubber/PubberLocalnetManager.java index 661912e5e6..3ee53cc88b 100644 --- a/pubber/src/main/java/daq/pubber/PubberLocalnetManager.java +++ b/pubber/src/main/java/daq/pubber/PubberLocalnetManager.java @@ -1,5 +1,7 @@ package daq.pubber; +import static com.google.udmi.util.GeneralUtils.catchToNull; + import com.google.udmi.util.SiteModel; import java.util.HashMap; import java.util.Map; @@ -11,6 +13,7 @@ import udmi.lib.intf.ManagerHost; import udmi.schema.LocalnetConfig; import udmi.schema.LocalnetState; +import udmi.schema.Metadata; import udmi.schema.PubberConfiguration; /** @@ -27,7 +30,7 @@ public class PubberLocalnetManager extends PubberManager implements LocalnetMana ProtocolFamily.IPV_4, PubberIpProvider.class, ProtocolFamily.IPV_6, PubberIpProvider.class, ProtocolFamily.ETHER, PubberIpProvider.class, - ProtocolFamily.BACNET, BacnetProvider.class); + ProtocolFamily.BACNET, PubberBacnetProvider.class); /** * Create a new container with the given host. @@ -36,13 +39,7 @@ public PubberLocalnetManager(ManagerHost host, PubberConfiguration configuration super(host, configuration); localnetState = new LocalnetState(); localnetState.families = new HashMap<>(); - localnetProviders = new HashMap<>(); - LOCALNET_PROVIDERS.forEach((family, providerClass) -> { - if (host instanceof Pubber || providerClass != PubberIpProvider.class) { - localnetProviders.put(family, instantiateProvider(family)); - } - }); } /** @@ -58,7 +55,27 @@ PubberFamilyProvider instantiateProvider(String family) { } } + /** + * Set site model. + */ public void setSiteModel(SiteModel siteModel) { + LOCALNET_PROVIDERS.forEach((family, providerClass) -> { + if (providerClass == PubberVendorProvider.class) { + localnetProviders.put(family, instantiateProvider(family)); + return; + } + if (providerClass == PubberBacnetProvider.class && host instanceof Pubber) { + localnetProviders.put(family, instantiateProvider(family)); + return; + } + if (providerClass == PubberIpProvider.class && host instanceof Pubber pubberHost) { + Metadata metadata = siteModel.getMetadata(getDeviceId()); + String addr = catchToNull(() -> metadata.localnet.families.get(family).addr); + if (addr != null) { + localnetProviders.put(family, instantiateProvider(family)); + } + } + }); localnetProviders.forEach((key, value) -> value.setSiteModel(siteModel)); } diff --git a/pubber/src/main/java/daq/pubber/PubberPointsetManager.java b/pubber/src/main/java/daq/pubber/PubberPointsetManager.java index 917560602a..5384e549b4 100644 --- a/pubber/src/main/java/daq/pubber/PubberPointsetManager.java +++ b/pubber/src/main/java/daq/pubber/PubberPointsetManager.java @@ -39,7 +39,7 @@ public class PubberPointsetManager extends PubberManager implements PointsetMana private static final Map DEFAULT_POINTS = ImmutableMap.of( "recalcitrant_angle", makePointPointsetModel(true, 50, 50, "Celsius"), "faulty_finding", makePointPointsetModel(true, 40, 0, "deg"), - "superimposition_reading", makePointPointsetModel(false) + "superimposition_reading", new PointPointsetModel() ); private final ExtraPointsetEvent pointsetEvent = new ExtraPointsetEvent(); private final Map managedPoints = new HashMap<>(); @@ -65,11 +65,6 @@ private static PointPointsetModel makePointPointsetModel(boolean writable, int v return pointMetadata; } - private static PointPointsetModel makePointPointsetModel(boolean writable) { - PointPointsetModel pointMetadata = new PointPointsetModel(); - return pointMetadata; - } - private AbstractPoint makePoint(String name, PointPointsetModel point) { if (BOOLEAN_UNITS.contains(point.units)) { return new PubberRandomBoolean(name, point); @@ -88,11 +83,6 @@ public PointPointsetState getPointState(AbstractPoint point) { return ifTrueGet(options.noPointState, PointPointsetState::new, pointState); } - @Override - public void restorePoint(String pointName) { - PointsetManager.super.restorePoint(pointName); - } - /** * Set the underlying static model for this pointset. This is information that would NOT be * normally available for a device, but would, e.g. be programmed directly into a device. It's @@ -181,9 +171,8 @@ public void updatePointsetPointsConfig(PointsetConfig config) { "state/config pointset mismatch"); // Special testing provisions for forcing an extra point (designed to cause a violation). - ifNotNullThen(options.extraPoint, - extraPoint -> pointsetEvent.points.put(extraPoint, - udmi.lib.client.PointsetManager.extraPointsetEvent())); + ifNotNullThen(options.extraPoint, extraPoint -> pointsetEvent.points.put(extraPoint, + PointsetManager.extraPointsetEvent())); // Mark device state as dirty, so the system will send a consolidated state update. updateState(); diff --git a/pubber/src/main/java/daq/pubber/PubberProxyDevice.java b/pubber/src/main/java/daq/pubber/PubberProxyDevice.java index 45fef9ef42..fd15c39ede 100644 --- a/pubber/src/main/java/daq/pubber/PubberProxyDevice.java +++ b/pubber/src/main/java/daq/pubber/PubberProxyDevice.java @@ -47,22 +47,24 @@ private static PubberConfiguration makeProxyConfiguration(ManagerHost host, Stri @Override public void shutdown() { deviceManager.shutdown(); + isActive().set(false); } @Override public void stop() { deviceManager.stop(); + isActive().set(false); } private void publishDirtyState() { if (stateDirty.getAndSet(false)) { - pubberHost.publish(deviceId, deviceState); + publish(deviceId, deviceState); } } @Override public void publish(String targetId, Object message) { - pubberHost.publish(targetId, message); + publish(message); } @Override diff --git a/pubber/src/main/java/daq/pubber/PubberSystemManager.java b/pubber/src/main/java/daq/pubber/PubberSystemManager.java index 376f194c0e..460b12688d 100644 --- a/pubber/src/main/java/daq/pubber/PubberSystemManager.java +++ b/pubber/src/main/java/daq/pubber/PubberSystemManager.java @@ -73,7 +73,9 @@ public PubberSystemManager(ManagerHost host, PubberConfiguration configuration) systemState.operation.operational = true; systemState.operation.mode = SystemMode.INITIAL; - systemState.serial_no = configuration.serialNo; + if (host instanceof Pubber) { + systemState.serial_no = configuration.serialNo; + } systemState.last_config = new Date(0); ifNotNullThen(options.extraField, value -> systemState.extraField = value); diff --git a/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java b/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java index 2a57345c9b..a2573c9fd5 100644 --- a/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java +++ b/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java @@ -27,6 +27,7 @@ import static udmi.lib.base.MqttDevice.ERRORS_TOPIC; import static udmi.lib.base.MqttDevice.STATE_TOPIC; import static udmi.lib.base.MqttPublisher.DEFAULT_CONFIG_WAIT_SEC; +import static udmi.lib.client.SystemManager.UDMI_PUBLISHER_LOG_CATEGORY; import static udmi.schema.BlobsetConfig.SystemBlobsets.IOT_ENDPOINT_CONFIG; import com.google.common.collect.ImmutableMap; @@ -218,14 +219,6 @@ private void sendPartialState() { publishStateMessage(dupeState); } - private void sendDupeState() { - State dupeState = new State(); - dupeState.system = getDeviceState().system; - dupeState.timestamp = getDeviceState().timestamp; - dupeState.version = getDeviceState().version; - publishStateMessage(dupeState); - } - @Override default void publish(String targetId, Object message) { publishDeviceMessage(targetId, message); @@ -266,7 +259,6 @@ static String getGatewayId(String targetId, PubberConfiguration configuration) { * whether the device is a gateway or a proxy device. */ default void registerMessageHandlers() { - getDeviceTarget().unregisterHandlers(); getDeviceTarget().registerHandler(CONFIG_TOPIC, this::configHandler, Config.class); String gatewayId = getGatewayId(getDeviceId(), getConfig()); if (isGatewayDevice()) { @@ -280,7 +272,16 @@ default void registerMessageHandlers() { } } + /** + * Get MqttDevice for given proxy. + * + * @param proxyId Proxy device id + * @return MqttDevice + */ default MqttDevice getMqttDevice(String proxyId) { + if (getDeviceTarget() == null) { + return null; + } return new MqttDevice(proxyId, getDeviceTarget()); } @@ -322,10 +323,12 @@ default void publisherHandler(String type, String phase, Throwable cause, String if (isTrue(getConfig().options.barfConfig)) { error("Restarting system because of restart-on-error configuration setting"); getDeviceManager().systemLifecycle(SystemMode.RESTART); + return; } } String usePhase = isTrue(getOptions().badCategory) ? "apply" : phase; - String category = format(SYSTEM_CATEGORY_FORMAT, type, usePhase); + String category = type == null ? UDMI_PUBLISHER_LOG_CATEGORY : + format(SYSTEM_CATEGORY_FORMAT, type, usePhase); Entry report = entryFromException(category, cause); getDeviceManager().localLog(report); publishLogMessage(report, targetId); @@ -399,7 +402,12 @@ private String exceptionDetail(Throwable e) { return buffer.toString(); } - private void configHandler(Config config) { + /** + * Configures the handler with the given configuration. + * + * @param config The configuration to be applied. + */ + default void configHandler(Config config) { try { configPreprocess(getDeviceId(), config); debug(format("Config update %s%s", getDeviceId(), getDeviceManager().getTestingTag()), @@ -456,6 +464,7 @@ private void processConfigUpdate(Config configMsg) { if (configMsg.system == null && isTrue(getConfig().options.barfConfig)) { error("Empty config system block and configured to restart on bad config!"); getDeviceManager().systemLifecycle(SystemMode.RESTART); + return; } GeneralUtils.copyFields(configMsg, getDeviceConfig(), true); info(format("%s received config %s", getTimestamp(), isoConvert(configMsg.timestamp))); @@ -483,7 +492,6 @@ default void checkSmokyFailure() { } } - /** * Deferred config actions. */ @@ -491,14 +499,16 @@ default void deferredConfigActions() { if (!isConnected()) { return; } - - getDeviceManager().maybeRestartSystem(); - - // Do redirect after restart system check, since this might take a long time. - maybeRedirectEndpoint(); + DeviceManager deviceManager = getDeviceManager(); + deviceManager.maybeRestartSystem(); + SystemState systemState = deviceManager.getSystemManager().getSystemState(); + SystemMode mode = catchToNull(() -> systemState.operation.mode); + if (SystemMode.INITIAL.equals(mode) || SystemMode.ACTIVE.equals(mode)) { + // Do redirect after restart system check, since this might take a long time. + maybeRedirectEndpoint(getExtractedEndpoint()); + } } - /** * For testing, if configured, send a slate of bad messages for testing by the message handling * infrastructure. Uses the sekrit REPLACE_MESSAGE_WITH field to sneak bad output into the pipe. @@ -618,11 +628,11 @@ private void removeBlobsetBlobState(SystemBlobsets blobId) { * Attempts to redirect the endpoint based on configuration settings and handles redirection * logic. */ - default void maybeRedirectEndpoint() { + default void maybeRedirectEndpoint(EndpointConfiguration extractedEndpoint) { String redirectRegistry = getConfig().options.redirectRegistry; String currentSignature = toJsonString(getConfig().endpoint); String extractedSignature = - redirectRegistry == null ? toJsonString(getExtractedEndpoint()) + redirectRegistry == null ? toJsonString(extractedEndpoint) : redirectedEndpoint(redirectRegistry); if (extractedSignature == null) { @@ -638,18 +648,18 @@ default void maybeRedirectEndpoint() { return; // No need to redirect anything! } - if (getExtractedEndpoint() != null) { - if (!Objects.equals(endpointState.generation, getExtractedEndpoint().generation)) { + if (extractedEndpoint != null) { + if (!Objects.equals(endpointState.generation, extractedEndpoint.generation)) { notice("Starting new endpoint generation"); endpointState.phase = null; endpointState.status = null; - endpointState.generation = getExtractedEndpoint().generation; + endpointState.generation = extractedEndpoint.generation; } - if (getExtractedEndpoint().error != null) { + if (extractedEndpoint.error != null) { setAttemptedEndpoint(extractedSignature); endpointState.phase = BlobPhase.FINAL; - Exception applyError = new RuntimeException(getExtractedEndpoint().error); + Exception applyError = new RuntimeException(extractedEndpoint.error); endpointState.status = exceptionStatus(applyError, Category.BLOBSET_BLOB_APPLY); publishSynchronousState(); return; @@ -663,7 +673,7 @@ default void maybeRedirectEndpoint() { endpointState.phase = BlobPhase.APPLY; publishSynchronousState(); resetConnection(extractedSignature); - persistEndpoint(getExtractedEndpoint()); + persistEndpoint(extractedEndpoint); endpointState.phase = BlobPhase.FINAL; markStateDirty(); } catch (Exception e) { @@ -683,7 +693,6 @@ default void maybeRedirectEndpoint() { String getWorkingEndpoint(); - void setAttemptedEndpoint(String s); String getAttemptedEndpoint(); diff --git a/pubber/src/main/java/udmi/lib/base/ListPublisher.java b/pubber/src/main/java/udmi/lib/base/ListPublisher.java index 310f78e78e..f904e87ad7 100644 --- a/pubber/src/main/java/udmi/lib/base/ListPublisher.java +++ b/pubber/src/main/java/udmi/lib/base/ListPublisher.java @@ -55,12 +55,11 @@ public void registerHandler(String deviceId, String topicSuffix, Consumer handler, Class messageType) { Consumer foo = (Consumer) handler; Class clazz = (Class) messageType; - handlers.put(topicSuffix, new SimpleEntry<>(foo, clazz)); - } - - @Override - public void unregisterHandlers() { - handlers.clear(); + if (handler == null) { + handlers.remove(topicSuffix); + } else { + handlers.put(topicSuffix, new SimpleEntry<>(foo, clazz)); + } } @Override diff --git a/pubber/src/main/java/udmi/lib/base/ManagerBase.java b/pubber/src/main/java/udmi/lib/base/ManagerBase.java index 5c9e8b45b2..6a4e46a048 100644 --- a/pubber/src/main/java/udmi/lib/base/ManagerBase.java +++ b/pubber/src/main/java/udmi/lib/base/ManagerBase.java @@ -183,21 +183,24 @@ protected synchronized void startPeriodicSend() { protected synchronized void cancelPeriodicSend() { if (periodicSender != null) { - try { - warn(format("Terminating %s %s sender", deviceId, this.getClass().getSimpleName())); - periodicSender.cancel(false); - } catch (Exception e) { - throw new RuntimeException("While cancelling executor", e); - } finally { - periodicSender = null; - } + warn(format("Terminating %s %s sender", deviceId, this.getClass().getSimpleName())); + cancelFuture(periodicSender); + periodicSender = null; } if (initialUpdate != null) { - initialUpdate.cancel(false); + cancelFuture(initialUpdate); initialUpdate = null; } } + private void cancelFuture(ScheduledFuture future) { + try { + future.cancel(false); + } catch (Exception e) { + throw new RuntimeException("While cancelling future", e); + } + } + private void stopExecutor() { try { executor.shutdown(); diff --git a/pubber/src/main/java/udmi/lib/base/MqttDevice.java b/pubber/src/main/java/udmi/lib/base/MqttDevice.java index 5d1d30f642..4d88472b57 100644 --- a/pubber/src/main/java/udmi/lib/base/MqttDevice.java +++ b/pubber/src/main/java/udmi/lib/base/MqttDevice.java @@ -62,10 +62,6 @@ public void registerHandler(String topicSuffix, Consumer handler, Class publishCore(deviceId, topicSuffix, data, callback)); + safeSleep(ATTACH_DELAY_MS); + if (isActive()) { + publisherExecutor.submit(() -> publishCore(deviceId, topicSuffix, data, callback)); + } return; } if (callback != null) { @@ -219,9 +222,15 @@ private void publishCore(String deviceId, String topicSuffix, Object data, Runna if (mqttClients.isEmpty()) { warn("Last client closed, shutting down connection."); close(); + shutdown(); + // Force reconnect to address potential bad states + onError.accept(new ConnectionClosedException()); } - } else { + } else if (getGatewayId().equals(deviceId)) { close(); + shutdown(); + // Force reconnect to address potential bad states + onError.accept(new ConnectionClosedException()); } } } @@ -281,7 +290,6 @@ public void close() { try { warn("Closing publisher connection"); mqttClients.keySet().forEach(this::closeMqttClient); - unregisterHandlers(); } catch (Exception e) { error("While closing publisher", deviceId, null, "close", e); } @@ -308,7 +316,7 @@ private void validateCloudIotOptions() { private MqttClient newProxyClient(String deviceId) { String gatewayId = getGatewayId(); info(format("Connecting device %s through gateway %s", deviceId, gatewayId)); - final MqttClient mqttClient = getConnectedClient(gatewayId, true); + final MqttClient mqttClient = getConnectedClient(gatewayId); long timeToWait = mqttClient.getTimeToWait(); try { startupLatchWait(connectionLatch, "gateway startup exchange"); @@ -499,20 +507,13 @@ public void registerHandler(String deviceId, String mqttSuffix, info(format("Removing handler %s", handlerKey)); handlers.remove(handlerKey); handlersType.remove(handlerKey); - } else if (handlers.put(handlerKey, (Consumer) handler) == null) { + } else { + handlers.put(handlerKey, (Consumer) handler); info(format("Registered handler for %s as %s", handlerKey, messageType.getSimpleName())); handlersType.put(handlerKey, (Class) messageType); - } else { - throw new IllegalStateException("Overwriting existing handler " + handlerKey); } } - @Override - public void unregisterHandlers() { - handlers.clear(); - handlersType.clear(); - } - private String getHandlerKey(String topic) { return format(HANDLER_KEY_FORMAT, registryId, topic); } @@ -533,7 +534,7 @@ private String getDeviceId(String topic) { public void connect(String targetId, boolean clean) { ifTrueThen(clean, () -> closeMqttClient(targetId)); - getConnectedClient(targetId, true); + getConnectedClient(targetId); } private void success(String message, String deviceId, String type, String phase) { @@ -570,9 +571,9 @@ private boolean sendMessage(String deviceId, String mqttTopic, private MqttClient getActiveClient(String targetId) { checkAuthentication(targetId); - MqttClient connectedClient = getConnectedClient(targetId, false); - if (connectedClient != null && connectedClient.isConnected()) { - return connectedClient; + MqttClient client = getConnectedClient(targetId); + if (client.isConnected()) { + return client; } return null; } @@ -594,23 +595,21 @@ private void checkAuthentication(String targetId) { warn("Authentication retry time reached for " + authId); reauthTimes.remove(authId); synchronized (mqttClients) { - MqttClient client = cleanClients(authId); try { - client.disconnect(); - client.close(); + close(); + shutdown(); + // Force reconnect to address potential bad states + onError.accept(new ConnectionClosedException()); } catch (Exception e) { throw new RuntimeException("While trying to reconnect mqtt client", e); } } } - private MqttClient getConnectedClient(String deviceId, boolean proxyActiveOnly) { + private MqttClient getConnectedClient(String deviceId) { try { synchronized (mqttClients) { if (isProxyDevice(deviceId)) { - if (!proxyActiveOnly && !mqttClients.containsKey(deviceId)) { - return null; - } return mqttClients.computeIfAbsent(deviceId, this::newProxyClient); } return mqttClients.computeIfAbsent(deviceId, this::newDirectClient); @@ -725,6 +724,8 @@ public void connectionLost(Throwable cause) { boolean connected = cleanClients(deviceId).isConnected(); warn("MQTT Connection Lost: " + connected + cause); close(); + shutdown(); + // Force reconnect to address potential bad states onError.accept(new ConnectionClosedException()); } diff --git a/pubber/src/main/java/udmi/lib/client/DiscoveryManager.java b/pubber/src/main/java/udmi/lib/client/DiscoveryManager.java index 3e5888ec91..35cb427588 100644 --- a/pubber/src/main/java/udmi/lib/client/DiscoveryManager.java +++ b/pubber/src/main/java/udmi/lib/client/DiscoveryManager.java @@ -10,11 +10,11 @@ import static java.lang.Math.floorMod; import static java.lang.String.format; import static java.util.Optional.ofNullable; -import static java.util.function.Predicate.not; import static udmi.schema.FamilyDiscoveryState.Phase.PENDING; import static udmi.schema.FamilyDiscoveryState.Phase.STOPPED; import java.time.Instant; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -31,8 +31,6 @@ * Discovery client. */ public interface DiscoveryManager extends SubBlockManager { - - /** * Determines whether enumeration to a specific depth level is required. * @@ -59,8 +57,8 @@ default void updateDiscoveryScan(Map raw) { Map families = ofNullable(raw).orElse(Map.of()); ifNullThen(getDiscoveryState().families, () -> getDiscoveryState().families = new HashMap<>()); - List toRemove = getDiscoveryState().families.keySet().stream() - .filter(not(families::containsKey)).toList(); + List toRemove = new ArrayList<>(getDiscoveryState().families.keySet()); + toRemove.removeIf(families::containsKey); toRemove.forEach(this::removeDiscoveryScan); families.keySet().forEach(this::scheduleDiscoveryScan); @@ -221,5 +219,4 @@ default boolean shouldEnumerate(String family) { void startDiscoveryScan(String family, Date scanGeneration); void updateDiscoveryEnumeration(DiscoveryConfig config); - } diff --git a/pubber/src/main/java/udmi/lib/client/ProxyDeviceHost.java b/pubber/src/main/java/udmi/lib/client/ProxyDeviceHost.java index 042b37a798..6acb68e7b6 100644 --- a/pubber/src/main/java/udmi/lib/client/ProxyDeviceHost.java +++ b/pubber/src/main/java/udmi/lib/client/ProxyDeviceHost.java @@ -1,8 +1,10 @@ package udmi.lib.client; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; +import static com.google.udmi.util.JsonUtil.safeSleep; import static java.lang.String.format; import static udmi.lib.base.ManagerBase.updateStateHolder; +import static udmi.lib.base.MqttPublisher.DEFAULT_CONFIG_WAIT_SEC; import java.util.concurrent.atomic.AtomicBoolean; import udmi.lib.base.MqttDevice; @@ -36,16 +38,23 @@ public interface ProxyDeviceHost extends ManagerHost, ManagerLog { * Logs information or errors based on the success or failure of these operations. */ default void activate() { - try { - isActive().set(false); - info("Activating proxy device " + getDeviceId()); - MqttDevice mqttDevice = getUdmiPublisher().getMqttDevice(getDeviceId()); - mqttDevice.registerHandler(MqttDevice.CONFIG_TOPIC, this::configHandler, Config.class); - mqttDevice.connect(getDeviceId()); - getDeviceManager().activate(); - isActive().set(true); - } catch (Exception e) { - error(format("Could not connect proxy device %s: %s", getDeviceId(), friendlyStackTrace(e))); + while (getUdmiPublisher().isConnected() && !isActive().get()) { + try { + isActive().set(false); + info("Activating proxy device " + getDeviceId()); + MqttDevice mqttDevice = getUdmiPublisher().getMqttDevice(getDeviceId()); + if (mqttDevice == null) { + throw new RuntimeException("Publisher is not connected"); + } + mqttDevice.registerHandler(MqttDevice.CONFIG_TOPIC, this::configHandler, Config.class); + mqttDevice.connect(getDeviceId()); + getDeviceManager().activate(); + isActive().set(true); + } catch (Exception e) { + error(format("Could not connect proxy device %s: %s", + getDeviceId(), friendlyStackTrace(e))); + safeSleep(DEFAULT_CONFIG_WAIT_SEC * 1000); + } } } @@ -76,7 +85,6 @@ default void update(Object update) { getStateDirty().set(true); } - @Override default FamilyProvider getLocalnetProvider(String family) { return getManagerHost().getLocalnetProvider(family); @@ -88,9 +96,7 @@ default void setMetadata(Metadata metadata) { void error(String message); - AtomicBoolean getStateDirty(); State getDeviceState(); - } diff --git a/pubber/src/main/java/udmi/lib/client/SystemManager.java b/pubber/src/main/java/udmi/lib/client/SystemManager.java index 660a4f8e0d..8ecf985e27 100644 --- a/pubber/src/main/java/udmi/lib/client/SystemManager.java +++ b/pubber/src/main/java/udmi/lib/client/SystemManager.java @@ -110,6 +110,7 @@ default void maybeRestartSystem() { && SystemMode.RESTART.equals(configMode)) { error("System mode requesting device restart"); systemLifecycle(SystemMode.RESTART); + return; } if (SystemMode.ACTIVE.equals(configMode)) { diff --git a/pubber/src/main/java/udmi/lib/intf/Publisher.java b/pubber/src/main/java/udmi/lib/intf/Publisher.java index d0dbcc5809..f07fe8de4d 100644 --- a/pubber/src/main/java/udmi/lib/intf/Publisher.java +++ b/pubber/src/main/java/udmi/lib/intf/Publisher.java @@ -27,11 +27,6 @@ public interface Publisher { void registerHandler(String deviceId, String topicSuffix, Consumer handler, Class messageType); - /** - * Unregister handlers. - */ - void unregisterHandlers(); - /** * Connect the given device id. * diff --git a/pubber/src/main/java/udmi/lib/intf/UdmiPublisher.java b/pubber/src/main/java/udmi/lib/intf/UdmiPublisher.java index b03d77bd3d..5b138d1d9a 100644 --- a/pubber/src/main/java/udmi/lib/intf/UdmiPublisher.java +++ b/pubber/src/main/java/udmi/lib/intf/UdmiPublisher.java @@ -8,6 +8,8 @@ */ public interface UdmiPublisher extends ManagerHost { + boolean isConnected(); + MqttDevice getMqttDevice(String deviceId); void configPreprocess(String deviceId, Config config); diff --git a/pubber/src/test/java/daq/pubber/PubberTest.java b/pubber/src/test/java/daq/pubber/PubberTest.java index ac11d8ade4..f9881fdd48 100644 --- a/pubber/src/test/java/daq/pubber/PubberTest.java +++ b/pubber/src/test/java/daq/pubber/PubberTest.java @@ -196,9 +196,9 @@ public void extractedEndpointConfigBlob() { } @Test - public void redirectEndpoint() throws InterruptedException { + public void redirectEndpoint() { configurePubberEndpoint(); - pubber.maybeRedirectEndpoint(); + pubber.maybeRedirectEndpoint(pubber.getExtractedEndpoint()); assertEquals(BlobPhase.FINAL, pubber.getDeviceState().blobset.blobs.get(IOT_ENDPOINT_CONFIG.value()).phase); Date initialGeneration = pubber.getDeviceState().blobset.blobs.get( @@ -206,7 +206,7 @@ public void redirectEndpoint() throws InterruptedException { assertNotEquals(null, initialGeneration); configurePubberRedirect(); - pubber.maybeRedirectEndpoint(); + pubber.maybeRedirectEndpoint(pubber.getExtractedEndpoint()); assertEquals(BlobPhase.FINAL, pubber.getDeviceState().blobset.blobs.get(IOT_ENDPOINT_CONFIG.value()).phase); Date redirectGeneration = pubber.getDeviceState().blobset.blobs.get( diff --git a/tests/sites/missing/devices/AHU-22/expected/errors.map b/tests/sites/missing/devices/AHU-22/expected/errors.map index edbfc00a79..6c9ae65ca3 100644 --- a/tests/sites/missing/devices/AHU-22/expected/errors.map +++ b/tests/sites/missing/devices/AHU-22/expected/errors.map @@ -1,10 +1,10 @@ Exceptions for AHU-22 While converting device config While converting point filter_differential_pressure_sensor - protocol ref FAXLE12.present_value does not match expression bacnet://[1-9][0-9]*/([A-Z]{2,4}):([1-9][0-9]*)(#[_a-z]+)? + protocol ref FAXLE12.present_value does not match expression bacnet://(0|[1-9][0-9]*)/([A-Z]{2,4}):(0|[1-9][0-9]*)(#[_a-z]+)? While converting device config While converting point filter_alarm_pressure_status - protocol ref MCN8.present_value does not match expression bacnet://[1-9][0-9]*/([A-Z]{2,4}):([1-9][0-9]*)(#[_a-z]+)? + protocol ref MCN8.present_value does not match expression bacnet://(0|[1-9][0-9]*)/([A-Z]{2,4}):(0|[1-9][0-9]*)(#[_a-z]+)? While converting device config While converting point filter_differential_pressure - protocol ref AI2.differential does not match expression bacnet://[1-9][0-9]*/([A-Z]{2,4}):([1-9][0-9]*)(#[_a-z]+)? + protocol ref AI2.differential does not match expression bacnet://(0|[1-9][0-9]*)/([A-Z]{2,4}):(0|[1-9][0-9]*)(#[_a-z]+)? diff --git a/udmis/build.gradle b/udmis/build.gradle index 96b90662fb..ce88919556 100644 --- a/udmis/build.gradle +++ b/udmis/build.gradle @@ -60,6 +60,7 @@ jacocoTestReport { checkstyle { ignoreFailures = false maxWarnings = 0 + configFile = file('../etc/checkstyle.xml') } checkstyleMain.source = 'src/main/java' diff --git a/udmis/config/checkstyle/checkstyle.xml b/udmis/config/checkstyle/checkstyle.xml deleted file mode 100644 index df33c6b25e..0000000000 --- a/udmis/config/checkstyle/checkstyle.xml +++ /dev/nulldiff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java b/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java index 3e28c8ca2d..42d5717100 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java @@ -6,7 +6,6 @@ import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.safeSleep; -import static com.google.udmi.util.JsonUtil.stringifyTerse; import static com.google.udmi.util.JsonUtil.toMap; import static java.lang.String.format; import static java.util.Objects.requireNonNull; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/MessageDispatcher.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/MessageDispatcher.java index f054bc2b74..e478edef33 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/MessageDispatcher.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/MessageDispatcher.java @@ -2,12 +2,10 @@ import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; -import static com.google.udmi.util.GeneralUtils.ifTrueGet; import com.google.bos.udmi.service.messaging.impl.MessageDispatcherImpl; import java.util.AbstractMap.SimpleEntry; import java.util.Collection; -import java.util.Map; import java.util.function.Consumer; import udmi.schema.EndpointConfiguration; import udmi.schema.Envelope; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/FileMessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/FileMessagePipe.java index 3afbd62333..ebea385d3f 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/FileMessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/FileMessagePipe.java @@ -5,7 +5,6 @@ import static com.google.udmi.util.JsonUtil.JSON_EXT; import static com.google.udmi.util.JsonUtil.loadFileString; import static com.google.udmi.util.JsonUtil.safeSleep; -import static com.google.udmi.util.JsonUtil.toMap; import static com.google.udmi.util.JsonUtil.toStringMap; import static java.lang.String.format; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java index 8c37828da7..52568fd93b 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java @@ -14,8 +14,6 @@ import com.google.bos.udmi.service.messaging.MessagePipe; import com.google.common.collect.ImmutableMap; -import com.google.udmi.util.GeneralUtils; -import com.google.udmi.util.JsonUtil; import java.io.File; import java.util.HashMap; import java.util.Map; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java b/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java index 9cda6b6aa7..fa25924777 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java @@ -20,7 +20,6 @@ import com.google.bos.udmi.service.core.ComponentName; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.udmi.util.GeneralUtils; import com.google.udmi.util.JsonUtil; import java.io.PrintStream; import java.time.Duration; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java index 421000f9a7..5d4ff36fb9 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java @@ -3,7 +3,6 @@ import static com.google.api.client.util.Preconditions.checkState; import static com.google.bos.udmi.service.core.DistributorPipe.clientId; import static com.google.udmi.util.GeneralUtils.CSV_JOINER; -import static com.google.udmi.util.GeneralUtils.encodeBase64; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.isNullOrNotEmpty; diff --git a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipeTest.java index 52941d441a..3eeaad33bf 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipeTest.java @@ -2,7 +2,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.bos.udmi.service.messaging.StateUpdate; import com.google.bos.udmi.service.messaging.impl.MessageBase.Bundle; @@ -59,4 +58,4 @@ void publishUntyped() { assertThrows(Exception.class, () -> testSend(new BespokeObject()), "missing expected exception"); } -} \ No newline at end of file +} diff --git a/validator/build.gradle b/validator/build.gradle index e1d46d2f6e..d18cf638a1 100644 --- a/validator/build.gradle +++ b/validator/build.gradle @@ -59,6 +59,7 @@ jacocoTestReport { checkstyle { ignoreFailures = false maxWarnings = 0 + configFile = file('../etc/checkstyle.xml') } checkstyleMain.source = 'src/main/java' diff --git a/validator/config/checkstyle/checkstyle.xml b/validator/config/checkstyle/checkstyle.xml deleted file mode 100644 index df33c6b25e..0000000000 --- a/validator/config/checkstyle/checkstyle.xml +++ /dev/nulldiff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java index c6096d5bc3..2a17953c8e 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java @@ -5,7 +5,6 @@ import com.google.daq.mqtt.util.MessagePublisher; import com.google.daq.mqtt.validator.Validator; -import com.google.udmi.util.GeneralUtils; import com.google.udmi.util.JsonUtil; import com.google.udmi.util.SiteModel; import java.util.HashMap; diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/ProxyTarget.java b/validator/src/main/java/com/google/bos/iot/core/proxy/ProxyTarget.java index 5e26a218e1..a125f18066 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/ProxyTarget.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/ProxyTarget.java @@ -11,7 +11,6 @@ import com.google.api.services.cloudiot.v1.model.Device; import com.google.cloud.ServiceOptions; import com.google.daq.mqtt.util.MessagePublisher; -import com.google.udmi.util.GeneralUtils; import java.time.Duration; import java.time.LocalDateTime; import java.util.Base64; diff --git a/validator/src/main/java/com/google/daq/mqtt/registrar/Summarizer.java b/validator/src/main/java/com/google/daq/mqtt/registrar/Summarizer.java index dfc4432a66..db228f5516 100644 --- a/validator/src/main/java/com/google/daq/mqtt/registrar/Summarizer.java +++ b/validator/src/main/java/com/google/daq/mqtt/registrar/Summarizer.java @@ -16,7 +16,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.stream.Collectors; import udmi.schema.CloudModel; abstract class Summarizer { diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/Feature.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/Feature.java index ba3a9850e7..72482e253b 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/Feature.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/Feature.java @@ -4,8 +4,6 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import java.lang.reflect.Method; -import java.util.function.Supplier; import udmi.schema.Bucket; import udmi.schema.FeatureDiscovery.FeatureStage; diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java index e552d251d1..584cddff1a 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java @@ -365,7 +365,7 @@ private static void setupSequencer() { cloudRegion = exeConfig.cloud_region; registryId = SiteModel.getRegistryActual(exeConfig); - deviceMetadata = readDeviceMetadata(); + deviceMetadata = readDeviceMetadata(getDeviceId()); serialNo = ofNullable(exeConfig.serial_no) .orElseGet(() -> GeneralUtils.catchToNull(() -> deviceMetadata.system.serial_no)); @@ -438,9 +438,15 @@ static void resetState() { validationState = null; } - private static Metadata readDeviceMetadata() { - File moddataFile = siteModel.getSubdirectory(format(OUT_DEVICE_FORMAT, getDeviceId())); - File metadataFile = siteModel.getDeviceFile(getDeviceId(), METADATA_JSON); + /** + * Read device metadata from site model. + * + * @param deviceId deviceId + * @return Metadata + */ + public static Metadata readDeviceMetadata(String deviceId) { + File moddataFile = siteModel.getSubdirectory(format(OUT_DEVICE_FORMAT, deviceId)); + File metadataFile = siteModel.getDeviceFile(deviceId, METADATA_JSON); System.err.println("Checking for modified metadata file " + moddataFile.getAbsolutePath()); File useFile = moddataFile.exists() ? moddataFile : metadataFile; System.err.println("Reading device metadata file " + useFile.getPath()); diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/GatewaySequences.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/GatewaySequences.java index 6f5d94387b..cfdb66be13 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/GatewaySequences.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/GatewaySequences.java @@ -1,6 +1,7 @@ package com.google.daq.mqtt.sequencer.sequences; import static com.google.common.collect.Sets.difference; +import static com.google.daq.mqtt.util.ConfigManager.configFrom; import static com.google.daq.mqtt.util.TimePeriodConstants.TWO_MINUTES_MS; import static com.google.udmi.util.GeneralUtils.CSV_JOINER; import static java.lang.String.format; @@ -28,7 +29,7 @@ import udmi.schema.Config; import udmi.schema.Envelope.SubFolder; import udmi.schema.FeatureDiscovery.FeatureStage; -import udmi.schema.PointsetConfig; +import udmi.schema.Metadata; /** * Specific tests for logical gateway devices. This is not the same as proxied devices (devices that @@ -108,9 +109,8 @@ private void updateProxyConfig(String proxyId) { } private Config makeDefaultConfig(String id) { - Config config = new Config(); - config.pointset = new PointsetConfig(); - return config; + Metadata proxyMetadata = readDeviceMetadata(id); + return configFrom(proxyMetadata).deviceConfig(); } private Set receivedDevices(Set proxyIds, SubFolder subFolder) { diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ProxiedSequences.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ProxiedSequences.java index b3a4dc85da..21cc402134 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ProxiedSequences.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ProxiedSequences.java @@ -21,8 +21,6 @@ import udmi.schema.Level; import udmi.schema.PointPointsetConfig; -; - /** * Validate pointset related functionality for proxied devices. */ diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/SystemSequences.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/SystemSequences.java index ae8bfd7070..0b0ce45420 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/SystemSequences.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/SystemSequences.java @@ -5,7 +5,6 @@ import static java.lang.String.format; import static udmi.schema.Bucket.SYSTEM; import static udmi.schema.FeatureDiscovery.FeatureStage.ALPHA; -import static udmi.schema.FeatureDiscovery.FeatureStage.BETA; import static udmi.schema.FeatureDiscovery.FeatureStage.STABLE; import com.google.daq.mqtt.sequencer.Feature; diff --git a/validator/src/main/java/com/google/daq/mqtt/util/Diagnoser.java b/validator/src/main/java/com/google/daq/mqtt/util/Diagnoser.java index a8fe50a079..618d6f8000 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/Diagnoser.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/Diagnoser.java @@ -1,11 +1,8 @@ package com.google.daq.mqtt.util; -import com.google.udmi.util.JsonUtil; import com.google.udmi.util.SiteModel; -import java.io.File; import java.util.ArrayList; import java.util.List; -import udmi.schema.ExecutionConfiguration; /** * Simple utility to diagnose basic configuration problems. diff --git a/validator/src/main/java/com/google/daq/mqtt/util/ImpulseRunningAverage.java b/validator/src/main/java/com/google/daq/mqtt/util/ImpulseRunningAverage.java index 4666fbab9d..0958fb9a1a 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/ImpulseRunningAverage.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/ImpulseRunningAverage.java @@ -1,7 +1,5 @@ package com.google.daq.mqtt.util; -import static java.lang.String.format; - /** * Basic class to gather and manage time statistics. */ diff --git a/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java b/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java index f8173ca547..37f95dd6e0 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java @@ -16,7 +16,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import udmi.schema.State; @@ -240,4 +239,4 @@ public void mapSemanticKey(String keyPath, String keyName, String description, public boolean isInitialized() { return previous != null; } -} \ No newline at end of file +} diff --git a/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java b/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java index ba7f02b1c1..6b1f2c5dd9 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.google.api.client.util.Base64; -import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.NotFoundException; import com.google.bos.iot.core.proxy.IotReflectorClient; import com.google.cloud.Tuple; diff --git a/validator/src/main/java/com/google/daq/mqtt/util/providers/BacnetFamilyProvider.java b/validator/src/main/java/com/google/daq/mqtt/util/providers/BacnetFamilyProvider.java index 2e63b73f0e..7523275fd2 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/providers/BacnetFamilyProvider.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/providers/BacnetFamilyProvider.java @@ -13,9 +13,9 @@ */ public class BacnetFamilyProvider implements FamilyProvider { - private static final Pattern BACNET_ADDR = Pattern.compile("[1-9][0-9]*"); + private static final Pattern BACNET_ADDR = Pattern.compile("0|[1-9][0-9]*"); private static final Pattern BACNET_REF = Pattern.compile( - "bacnet://[1-9][0-9]*/([A-Z]{2,4}):([1-9][0-9]*)(#[_a-z]+)?"); + "bacnet://(0|[1-9][0-9]*)/([A-Z]{2,4}):(0|[1-9][0-9]*)(#[_a-z]+)?"); @Override public String familyKey() { diff --git a/validator/src/main/java/com/google/daq/mqtt/validator/ReportingDevice.java b/validator/src/main/java/com/google/daq/mqtt/validator/ReportingDevice.java index 1d86939cbd..f85996f04b 100644 --- a/validator/src/main/java/com/google/daq/mqtt/validator/ReportingDevice.java +++ b/validator/src/main/java/com/google/daq/mqtt/validator/ReportingDevice.java @@ -2,15 +2,11 @@ import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY; import static com.google.udmi.util.Common.SUBTYPE_PROPERTY_KEY; -import static com.google.udmi.util.GeneralUtils.ifNotNullThen; -import static com.google.udmi.util.GeneralUtils.ifTrueThen; import static com.google.udmi.util.JsonUtil.convertTo; import static java.lang.String.format; import static java.util.Optional.ofNullable; import static org.junit.Assert.assertTrue; -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableSet; import com.google.daq.mqtt.util.ExceptionList; import com.google.daq.mqtt.util.ValidationException; import com.google.udmi.util.Common; @@ -23,15 +19,10 @@ import java.util.Set; import java.util.stream.Collectors; import udmi.schema.Category; -import udmi.schema.DiscoveryEvents; -import udmi.schema.DiscoveryState; import udmi.schema.Entry; import udmi.schema.Envelope.SubType; import udmi.schema.Level; import udmi.schema.Metadata; -import udmi.schema.PointsetEvents; -import udmi.schema.PointsetState; -import udmi.schema.State; /** * Encapsulation of device data for a basic reporting device. diff --git a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java index 01b50c6288..c4c805d28e 100644 --- a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java +++ b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java @@ -6,7 +6,6 @@ import static com.google.udmi.util.Common.CATEGORY_PROPERTY_KEY; import static com.google.udmi.util.Common.DEVICE_ID_KEY; import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; -import static com.google.udmi.util.Common.REGISTRY_ID_PROPERTY_KEY; import static com.google.udmi.util.Common.SOURCE_KEY; import static com.google.udmi.util.Common.SOURCE_SEPARATOR; import static com.google.udmi.util.Common.SOURCE_SEPARATOR_REGEX; @@ -29,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.google.api.client.util.Base64; -import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.NotFoundException; import com.google.bos.iot.core.proxy.IotReflectorClient; import com.google.cloud.pubsub.v1.AckReplyConsumer; diff --git a/validator/src/test/java/com/google/daq/mqtt/mapping/MappingAgentTest.java b/validator/src/test/java/com/google/daq/mqtt/mapping/MappingAgentTest.java index 0e725cfdc2..57c8e3781a 100644 --- a/validator/src/test/java/com/google/daq/mqtt/mapping/MappingAgentTest.java +++ b/validator/src/test/java/com/google/daq/mqtt/mapping/MappingAgentTest.java @@ -2,7 +2,6 @@ import static com.google.daq.mqtt.TestCommon.GATEWAY_ID; import static com.google.daq.mqtt.TestCommon.REGISTRY_ID; -import static com.google.daq.mqtt.TestCommon.SITE_DIR; import static com.google.udmi.util.SiteModel.MOCK_PROJECT; import static org.junit.Assert.assertEquals; @@ -41,4 +40,4 @@ private ExecutionConfiguration getExecutionConfig() { executionConfiguration.site_name = REGISTRY_ID; return executionConfiguration; } -} \ No newline at end of file +} diff --git a/validator/src/test/java/com/google/daq/mqtt/util/BacnetFamilyProviderTest.java b/validator/src/test/java/com/google/daq/mqtt/util/BacnetFamilyProviderTest.java index 6290d88f27..6e75458a99 100644 --- a/validator/src/test/java/com/google/daq/mqtt/util/BacnetFamilyProviderTest.java +++ b/validator/src/test/java/com/google/daq/mqtt/util/BacnetFamilyProviderTest.java @@ -17,13 +17,14 @@ */ public class BacnetFamilyProviderTest { - public static final Set GOOD_ADDRS = ImmutableSet.of("10", "23", "9273123"); + public static final Set GOOD_ADDRS = ImmutableSet.of("0", "10", "23", "4194302"); public static final Set BAD_ADDRS = ImmutableSet.of( "", "x", "snoop", "01293", "0x9122", "B87AC9", "87a8c"); public static final Set GOOD_REFERENCES = ImmutableSet.of( "bacnet://291842/AI:2#present_value", "bacnet://29212/AI:2#something_else", + "bacnet://0/DO:0", "bacnet://1/AI:2", "bacnet://291842/BO:21"); @@ -71,4 +72,4 @@ public void bacnet_addr_validation() { .filter(GeneralUtils::isNotEmpty).toList(); assertEquals("Not enough validation errors", BAD_ADDRS.size(), badErrors.size()); } -} \ No newline at end of file +} diff --git a/validator/src/test/java/com/google/daq/mqtt/util/RunningAverageBaseTest.java b/validator/src/test/java/com/google/daq/mqtt/util/RunningAverageBaseTest.java index 45678be0de..8d04ab76fd 100644 --- a/validator/src/test/java/com/google/daq/mqtt/util/RunningAverageBaseTest.java +++ b/validator/src/test/java/com/google/daq/mqtt/util/RunningAverageBaseTest.java @@ -2,7 +2,6 @@ import static org.junit.Assert.assertEquals; -import com.google.common.util.concurrent.AtomicDouble; import org.junit.Test; /** @@ -30,4 +29,4 @@ public void sampleTest() { runningAverage.update(10, 0); assertEquals("zero ten", 0.15977278341120008, runningAverage.get(), ASSERT_DELTA); } -} \ No newline at end of file +} diff --git a/validator/src/test/java/com/google/daq/mqtt/validator/PlaybackTest.java b/validator/src/test/java/com/google/daq/mqtt/validator/PlaybackTest.java index 414cf96fe7..4b7f02c7a8 100644 --- a/validator/src/test/java/com/google/daq/mqtt/validator/PlaybackTest.java +++ b/validator/src/test/java/com/google/daq/mqtt/validator/PlaybackTest.java @@ -1,7 +1,6 @@ package com.google.daq.mqtt.validator; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static udmi.schema.Level.INFO; import com.google.daq.mqtt.TestCommon;