Skip to content

Commit

Permalink
Fix mqtt connection issues (#1052)
Browse files Browse the repository at this point in the history
  • Loading branch information
MertCingoz authored Dec 18, 2024
1 parent 66c7673 commit 77883a9
Show file tree
Hide file tree
Showing 57 changed files with 272 additions and 950 deletions.
17 changes: 8 additions & 9 deletions bin/container
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ git_prefix=${git_url%.*}
git_prefix=${git_prefix#*.com/}
git_repo=${git_prefix#*.com:}

GCP_PROJECT=$(gcloud config get project)
GCP_PROJECT=$(gcloud config get project) || true
REPOSITORY=ghcr.io/${git_repo}
TEMPLATES=$(cd etc; ls k8s_*.yaml) || true

Expand Down Expand Up @@ -100,7 +100,7 @@ if [[ -n $prep ]]; then

LIBFILE=build/libs/*-1.0-SNAPSHOT-all.jar
build_time=`date --utc -Imin -r $LIBFILE` || true
[[ -n $build_time ]] || build_time=unknown
[[ -n $build_time ]] || build_time="1970-01-01T00:00:00Z"

cat <<EOF > var/deployed_version.json
{
Expand All @@ -116,14 +116,13 @@ EOF
fi

if [[ -n $build ]]; then
echo Cleaning old images...
images=$(docker images | fgrep $REPOSITORY | awk '{print $1":"$2}' | fgrep -v '<none>') || true
[[ -n $images ]] && docker rmi $images || true

echo Building Dockerfile.$target
echo docker build -f Dockerfile.$target -t $target .
docker build -f Dockerfile.$target -t $target .
docker tag $target $udmi_ref
echo docker build -f Dockerfile.$target -t $udmi_ref .
docker build -f Dockerfile.$target -t $udmi_ref -t $target .

echo Cleaning dangling images...
images=$(docker images -f "dangling=true" -q)
[[ -n $images ]] && docker rmi $images
fi

if [[ -n $push ]]; then
Expand Down
11 changes: 7 additions & 4 deletions common/src/main/java/com/google/udmi/util/CertManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class CertManager {
private final File keyFile;
private final File crtFile;
private final char[] password;
private final boolean isSsl;
private final Transport transport;

{
Security.addProvider(new BouncyCastleProvider());
Expand All @@ -60,9 +60,9 @@ public class CertManager {
public CertManager(File caCrtFile, File clientDir, Transport transport,
String passString, Consumer<String> logging) {
this.caCrtFile = caCrtFile;
isSsl = Transport.SSL.equals(transport);
this.transport = transport;

if (isSsl) {
if (Transport.SSL.equals(transport)) {
String prefix = keyPrefix(clientDir);
crtFile = new File(clientDir, prefix + "_private.crt");
keyFile = new File(clientDir, prefix + "_private.pem");
Expand Down Expand Up @@ -144,7 +144,10 @@ public SSLSocketFactory getCertSocketFactory() throws Exception {
*/
public SocketFactory getSocketFactory() {
try {
if (!isSsl) {
if (!Transport.SSL.equals(transport)) {
if (Transport.TCP.equals(transport)) {
return SocketFactory.getDefault();
}
return SSLSocketFactory.getDefault();
}
return getCertSocketFactory();
Expand Down
1 change: 1 addition & 0 deletions contrib/mango/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
data/
11 changes: 2 additions & 9 deletions contrib/mango/mango_docker
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
#!/bin/sh
set -o errexit -o nounset

cd "$(dirname "$0")"/mount

MANGO_VERSION=5.2.0
curl -sSL https://store.mango-os.com/downloads/m2m2-udmi-${MANGO_VERSION}.zip -o udmi.zip
rm -rf udmi && unzip -q udmi.zip -d udmi && rm udmi.zip

docker run --rm \
-p 8080:8080 \
-p 8443:8443 \
-v ./data:/opt/mango-data \
-v ./udmi:/opt/mango/web/modules/udmi \
ghcr.io/radixiot/mango:${MANGO_VERSION}
-v "$(dirname "$0")"/data:/opt/mango-data \
ghcr.io/radixiot/mango:latest
1 change: 0 additions & 1 deletion contrib/mango/mount/.gitignore

This file was deleted.

2 changes: 1 addition & 1 deletion etc/test_itemized.out
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
1 RESULT fail gateway gateway_proxy_events BETA 0/10 Failed waiting until All proxy devices received pointset
1 RESULT pass gateway gateway_proxy_state PREVIEW 10/10 Sequence complete
1 RESULT pass gateway gateway_proxy_state PREVIEW 10/10 Sequence complete
1 RESULT fail discovery.scan scan_single_future PREVIEW 0/10 Failed check that discovery events were valid: bacnet scan_addr snoozle does not match expression [1-9][0-9]*
1 RESULT fail discovery.scan scan_single_future PREVIEW 0/10 Failed check that discovery events were valid: bacnet scan_addr snoozle does not match expression 0|[1-9][0-9]*
1 RESULT pass discovery.scan scan_periodic_now_enumerate PREVIEW 10/10 Sequence complete
4 RESULT pass gateway gateway_proxy_events BETA 10/10 Sequence complete
1 RESULT pass gateway gateway_proxy_events BETA 10/10 Sequence complete
Expand Down
52 changes: 36 additions & 16 deletions etc/validator.out
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,15 @@ sites/udmi_site_model/out/devices/AHU-22/state.out
"sub_folder" : "update",
"sub_type" : "state",
"status" : {
"message" : "missing pointset subblock",
"detail" : "While validating pointset message at REDACTED_TIMESTAMP",
"message" : "1 schema violations found",
"detail" : "state_update: 1 schema violations found; /system: object has missing required properties ([\"serial_no\"])",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
},
"errors" : [ {
"message" : "missing pointset subblock",
"detail" : "While validating pointset message at REDACTED_TIMESTAMP",
"message" : "1 schema violations found",
"detail" : "state_update: 1 schema violations found; /system: object has missing required properties ([\"serial_no\"])",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
Expand Down Expand Up @@ -382,12 +382,19 @@ sites/udmi_site_model/out/devices/AHU-22/state_system.out
"sub_folder" : "system",
"sub_type" : "state",
"status" : {
"message" : "Successful validation",
"category" : "validation.device.receive",
"message" : "1 schema violations found",
"detail" : "state_system: 1 schema violations found; object has missing required properties ([\"serial_no\"])",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 200
"level" : 500
},
"errors" : [ ]
"errors" : [ {
"message" : "1 schema violations found",
"detail" : "state_system: 1 schema violations found; object has missing required properties ([\"serial_no\"])",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
} ]
}
::::::::::::::
sites/udmi_site_model/out/devices/GAT-123/config.out
Expand Down Expand Up @@ -787,14 +794,20 @@ sites/udmi_site_model/out/devices/SNS-4/state.out
"sub_folder" : "update",
"sub_type" : "state",
"status" : {
"message" : "missing pointset subblock",
"detail" : "While validating pointset message at REDACTED_TIMESTAMP",
"category" : "validation.device.schema",
"message" : "Multiple validation errors",
"detail" : "1 schema violations found; Device has missing points: split_threshold, triangulating_axis",
"category" : "validation.device.multiple",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
},
"errors" : [ {
"message" : "missing pointset subblock",
"message" : "1 schema violations found",
"detail" : "state_update: 1 schema violations found; /system: object has missing required properties ([\"serial_no\"])",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
}, {
"message" : "Device has missing points: split_threshold, triangulating_axis",
"detail" : "While validating pointset message at REDACTED_TIMESTAMP",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
Expand Down Expand Up @@ -837,10 +850,17 @@ sites/udmi_site_model/out/devices/SNS-4/state_system.out
"sub_folder" : "system",
"sub_type" : "state",
"status" : {
"message" : "Successful validation",
"category" : "validation.device.receive",
"message" : "1 schema violations found",
"detail" : "state_system: 1 schema violations found; object has missing required properties ([\"serial_no\"])",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 200
"level" : 500
},
"errors" : [ ]
"errors" : [ {
"message" : "1 schema violations found",
"detail" : "state_system: 1 schema violations found; object has missing required properties ([\"serial_no\"])",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
} ]
}
17 changes: 10 additions & 7 deletions pubber/src/main/java/daq/pubber/Pubber.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,10 @@ private boolean attemptConnection() {
isConnected = false;
deviceManager.stop();
super.stop();
if (deviceTarget == null) {
throw new RuntimeException("Mqtt publisher not initialized");
if (deviceTarget == null || !deviceTarget.isActive()) {
error("Mqtt publisher not active");
disconnectMqtt();
initializeMqtt();
}
registerMessageHandlers();
connect();
Expand All @@ -455,10 +457,12 @@ public void shutdown() {
deviceState.system.operation.mode = SystemMode.SHUTDOWN;
}

super.shutdown();
ifNotNullThen(deviceManager, dm -> captureExceptions("device manager shutdown", dm::shutdown));
captureExceptions("publishing shutdown state", this::publishSynchronousState);
captureExceptions("disconnecting mqtt", this::disconnectMqtt);
if (isConnected()) {
captureExceptions("Publishing shutdown state", this::publishSynchronousState);
}
ifNotNullThen(deviceManager, dm -> captureExceptions("Device manager shutdown", dm::shutdown));
captureExceptions("Pubber sender shutdown", super::shutdown);
captureExceptions("Disconnecting mqtt", this::disconnectMqtt);
}

@Override
Expand Down Expand Up @@ -536,7 +540,6 @@ public void persistEndpoint(EndpointConfiguration endpoint) {
writePersistentStore();
}


@Override
public void resetConnection(String targetEndpoint) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import udmi.lib.base.ManagerBase;
import udmi.lib.client.LocalnetManager;
import udmi.lib.intf.ManagerHost;
import udmi.schema.DiscoveryEvents;
import udmi.schema.FamilyLocalnetState;
import udmi.schema.Level;
import udmi.schema.Metadata;
import udmi.schema.PointPointsetModel;
Expand All @@ -30,9 +32,10 @@
/**
* Provides for the bacnet family of stuffs.
*/
public class BacnetProvider extends ManagerBase implements PubberFamilyProvider {
public class PubberBacnetProvider extends ManagerBase implements PubberFamilyProvider {

private static final int BACNET_DISCOVERY_RATE_SEC = 1;
private final LocalnetManager localnetHost;
private final Deque<String> toReport = new ArrayDeque<>();
private Map<String, Metadata> bacnetDevices;
private BiConsumer<String, DiscoveryEvents> publisher;
Expand All @@ -41,10 +44,16 @@ public class BacnetProvider extends ManagerBase implements PubberFamilyProvider
/**
* Provider for metadata-based (simulated) bacnet discovery.
*/
public BacnetProvider(ManagerHost host, String family, String deviceId) {
public PubberBacnetProvider(ManagerHost host, String family, String deviceId) {
super(host, deviceId);
checkState(family.equals(BACNET));
updateInterval(BACNET_DISCOVERY_RATE_SEC);
localnetHost = ((LocalnetManager) host);
}

private void addStateMapEntry() {
FamilyLocalnetState stateEntry = new FamilyLocalnetState();
stateEntry.addr = getBacnetAddr(getMetadata(deviceId));
localnetHost.update(BACNET, stateEntry);
}

@Override
Expand Down Expand Up @@ -87,12 +96,12 @@ public synchronized void startScan(boolean enumerate,
toReport.addAll(bacnetDevices.keySet());
this.publisher = publisher;
this.enumerate = enumerate;
startPeriodicSend();
updateInterval(BACNET_DISCOVERY_RATE_SEC);
}

@Override
public synchronized void stopScan() {
cancelPeriodicSend();
super.stop();
}

@Override
Expand All @@ -117,5 +126,6 @@ public void setSiteModel(SiteModel siteModel) {
bacnetDevices = siteModel.allMetadata().entrySet().stream()
.filter(entry -> nonNull(getBacnetAddr(entry.getValue())))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
addStateMapEntry();
}
}
3 changes: 2 additions & 1 deletion pubber/src/main/java/daq/pubber/PubberGatewayManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public void setMetadata(Metadata metadata) {

@Override
public void activate() {
ifNotNullThen(proxyDevices, p -> p.values().forEach(ProxyDeviceHost::activate));
ifNotNullThen(proxyDevices, p -> p.values()
.parallelStream().forEach(ProxyDeviceHost::activate));
}

@Override
Expand Down
23 changes: 22 additions & 1 deletion pubber/src/main/java/daq/pubber/PubberIpProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@ static String getDefaultInterfaceStatic(List<String> routeLines) {
return currentInterface.get();
}

/**
* Try to use first match per family.
*
* <pre>
* ip addr show eth0
* 149: eth0@if150: *BROADCAST,MULTICAST,UP,LOWER_UP,M-DOWN> mtu 1500 qdisc noqueue state UP
* link/ether 02:42:ac:11:00:10 brd ff:ff:ff:ff:ff:ff
* inet 10.0.0.10/24 brd 10.0.0.255 scope global eth0
* valid_lft forever preferred_lft forever
* inet6 fd00:1234:abc:1::10/120 scope global flags 02
* valid_lft forever preferred_lft forever
* inet6 fe80::42:acff:fe11:10/64 scope link
* valid_lft forever preferred_lft forever
*
* ip -6 route
* fd00:1234:abc:1::/120 dev eth0 metric 256
* fe80::/64 dev eth0 metric 256
* default via fd00:1234:abc:1::1 dev eth0 metric 1024
* multicast ff00::/8 dev eth0 metric 256
* </pre>
*/
@VisibleForTesting
static Map<String, String> getInterfaceAddressesStatic(List<String> strings) {
Map<String, String> interfaceMap = new HashMap<>();
Expand All @@ -83,7 +104,7 @@ static Map<String, String> getInterfaceAddressesStatic(List<String> strings) {
Matcher matcher = pattern.matcher(line);
if (matcher.matches()) {
String family = IFACE_MAP.get(matcher.group(1));
interfaceMap.put(family, matcher.group(2));
interfaceMap.putIfAbsent(family, matcher.group(2));
}
}
});
Expand Down
Loading

0 comments on commit 77883a9

Please sign in to comment.