Skip to content

Commit

Permalink
feat: Add device up/down detection
Browse files Browse the repository at this point in the history
Signed-off-by: FelixTing <felix@iotechsys.com>
  • Loading branch information
FelixTing committed Jan 7, 2025
1 parent 0d02b50 commit c8f10ab
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 11 deletions.
18 changes: 16 additions & 2 deletions internal/application/command.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -44,6 +44,7 @@ func GetCommand(ctx context.Context, deviceName string, commandName string, quer

device, err := validateServiceAndDeviceState(deviceName, dic)
if err != nil {
DeviceRequestFailed(deviceName, dic)
return nil, errors.NewCommonEdgeXWrapper(err)
}

Expand All @@ -58,8 +59,10 @@ func GetCommand(ctx context.Context, deviceName string, commandName string, quer
}

if err != nil {
DeviceRequestFailed(deviceName, dic)
return nil, errors.NewCommonEdgeXWrapper(err)
}
DeviceRequestSucceeded(device, dic)

lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Debugf("GET Device Command successfully. Device: %s, Source: %s, %s: %s", deviceName, commandName, common.CorrelationHeader, utils.FromContext(ctx, common.CorrelationHeader))
Expand All @@ -78,6 +81,7 @@ func SetCommand(ctx context.Context, deviceName string, commandName string, quer

device, err := validateServiceAndDeviceState(deviceName, dic)
if err != nil {
DeviceRequestFailed(deviceName, dic)
return nil, errors.NewCommonEdgeXWrapper(err)
}

Expand All @@ -90,8 +94,10 @@ func SetCommand(ctx context.Context, deviceName string, commandName string, quer
}

if err != nil {
DeviceRequestFailed(deviceName, dic)
return nil, errors.NewCommonEdgeXWrapper(err)
}
DeviceRequestSucceeded(device, dic)

lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Debugf("SET Device Command successfully. Device: %s, Source: %s, %s: %s", deviceName, commandName, common.CorrelationHeader, utils.FromContext(ctx, common.CorrelationHeader))
Expand Down Expand Up @@ -447,8 +453,16 @@ func validateServiceAndDeviceState(deviceName string, dic *di.Container) (models
return models.Device{}, errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s locked", device.Name), nil)
}
// check device's OperatingState
// if it's a device return attempt, operating state is allowed to be DOWN
if device.OperatingState == models.Down {
return models.Device{}, errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s OperatingState is DOWN", device.Name), nil)
err := errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s OperatingState is DOWN", device.Name), nil)
config := container.ConfigurationFrom(dic.Get)
if config.Device.AllowedFails == 0 || config.Device.DevDownTime == 0 {
return models.Device{}, err
}
if device.Retries > 0 {
return models.Device{}, err
}
}

// check device's ProfileName
Expand Down
107 changes: 107 additions & 0 deletions internal/application/opstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package application

import (
"context"
"time"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v4/di"

"github.com/edgexfoundry/go-mod-core-contracts/v4/common"
"github.com/edgexfoundry/go-mod-core-contracts/v4/models"

"github.com/edgexfoundry/device-sdk-go/v4/internal/cache"
sdkCommon "github.com/edgexfoundry/device-sdk-go/v4/internal/common"
"github.com/edgexfoundry/device-sdk-go/v4/internal/container"
)

func deviceReturn(deviceName string, dic *di.Container) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
dc := bootstrapContainer.DeviceClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)

for {
time.Sleep(time.Duration(config.Device.DevDownTime) * time.Second)
lc.Infof("Checking operational state for device: %s", deviceName)

d, found := cache.Devices().ForName(deviceName)
if !found {
lc.Warnf("Device %s not found. Exiting retry loop.", deviceName)
return
}

if d.OperatingState == models.Up {
lc.Infof("Device %s is already operational. Exiting retry loop.", deviceName)
return
}

p, found := cache.Profiles().ForName(d.ProfileName)
if !found {
lc.Warnf("Device %s has no profile. Cannot set operational state automatically.", deviceName)
return
}

for _, dr := range p.DeviceResources {
if dr.Properties.ReadWrite == common.ReadWrite_R ||
dr.Properties.ReadWrite == common.ReadWrite_RW ||
dr.Properties.ReadWrite == common.ReadWrite_WR {
_, err := GetCommand(context.Background(), deviceName, dr.Name, "", true, dic)
if err == nil {
lc.Infof("Device %s responsive: setting operational state to up.", deviceName)
sdkCommon.UpdateOperatingState(deviceName, models.Up, lc, dc)
return
} else {
lc.Errorf("Device %s unresponsive: retrying in %v seconds.", deviceName, config.Device.DevDownTime)
go deviceReturn(deviceName, dic)
return
}
}
}
lc.Warnf("Device %s has no readable resources, cannot be set operational automatically.", deviceName)
return
}
}

func DeviceRequestFailed(deviceName string, dic *di.Container) {
config := container.ConfigurationFrom(dic.Get)
if config.Device.AllowedFails > 0 {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
dc := bootstrapContainer.DeviceClientFrom(dic.Get)

cache.Devices().DecreaseRetriesByName(deviceName)
d, ok := cache.Devices().ForName(deviceName)
if !ok {
return
}

if d.Retries == 0 {
if d.OperatingState != models.Down {
lc.Infof("Marking device %s non-operational", deviceName)
sdkCommon.UpdateOperatingState(deviceName, models.Down, lc, dc)
}
if config.Device.DevDownTime > 0 {
lc.Warnf("Will retry device %s in %v seconds", deviceName, config.Device.DevDownTime)
go deviceReturn(deviceName, dic)
}
return
}
}
}

func DeviceRequestSucceeded(d models.Device, dic *di.Container) {
config := container.ConfigurationFrom(dic.Get)
if config.Device.AllowedFails > 0 && d.Retries < int(config.Device.AllowedFails) {
cache.Devices().ResetRetriesByName(d.Name)
if d.OperatingState == models.Down {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
dc := bootstrapContainer.DeviceClientFrom(dic.Get)
sdkCommon.UpdateOperatingState(d.Name, models.Up, lc, dc)
}
}
}
2 changes: 1 addition & 1 deletion internal/autoevent/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2019-2023 IOTech Ltd
// Copyright (C) 2019-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down
48 changes: 41 additions & 7 deletions internal/cache/devices.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/edgexfoundry/device-sdk-go/v4/internal/container"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v4/di"
"github.com/edgexfoundry/go-mod-core-contracts/v4/errors"
Expand All @@ -38,6 +39,8 @@ type DeviceCache interface {
UpdateAdminState(name string, state models.AdminState) errors.EdgeX
SetLastConnectedByName(name string)
GetLastConnectedByName(name string) int64
DecreaseRetriesByName(name string)
ResetRetriesByName(name string)
}

type deviceCache struct {
Expand All @@ -50,13 +53,12 @@ type deviceCache struct {
func newDeviceCache(devices []models.Device, dic *di.Container) DeviceCache {
defaultSize := len(devices)
dMap := make(map[string]*models.Device, defaultSize)
for i, d := range devices {
dMap[d.Name] = &devices[i]
}

config := container.ConfigurationFrom(dic.Get)
dc = &deviceCache{deviceMap: dMap, dic: dic}
lastConnectedMetrics := make(map[string]gometrics.Gauge)
for _, d := range devices {
d.Retries = int(config.Device.AllowedFails)
dMap[d.Name] = &d
deviceMetric := gometrics.NewGauge()
registerMetric(d.Name, deviceMetric, dic)
lastConnectedMetrics[d.Name] = deviceMetric
Expand Down Expand Up @@ -118,6 +120,9 @@ func (d *deviceCache) Add(device models.Device) errors.EdgeX {
d.mutex.Lock()
defer d.mutex.Unlock()

config := container.ConfigurationFrom(d.dic.Get)
device.Retries = int(config.Device.AllowedFails)

return d.add(device)
}

Expand All @@ -141,8 +146,11 @@ func (d *deviceCache) Update(device models.Device) errors.EdgeX {
d.mutex.Lock()
defer d.mutex.Unlock()

if err := d.removeByName(device.Name); err != nil {
return err
if olddev, exists := d.deviceMap[device.Name]; exists {
device.Retries = olddev.Retries
if err := d.removeByName(device.Name); err != nil {
return err
}
}
return d.add(device)
}
Expand Down Expand Up @@ -223,3 +231,29 @@ func (d *deviceCache) GetLastConnectedByName(name string) int64 {
g := d.lastConnected[name]
return g.Value()
}

func (d *deviceCache) DecreaseRetriesByName(name string) {
d.mutex.Lock()
defer d.mutex.Unlock()

device, ok := d.deviceMap[name]
if !ok {
return
}

if device.Retries >= 0 {
device.Retries--
}
}

func (d *deviceCache) ResetRetriesByName(name string) {
d.mutex.Lock()
defer d.mutex.Unlock()

device, ok := d.deviceMap[name]
if !ok {
return
}
config := container.ConfigurationFrom(d.dic.Get)
device.Retries = int(config.Device.AllowedFails)
}
6 changes: 5 additions & 1 deletion internal/config/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// -*- mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2017-2018 Canonical Ltd
// Copyright (C) 2018-2023 IOTech Ltd
// Copyright (C) 2018-2025 IOTech Ltd
// Copyright (c) 2021 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -53,6 +53,10 @@ type DeviceInfo struct {
EnableAsyncReadings bool
// Labels are properties applied to the device service to help with searching
Labels []string
// AllowedFails specifies the number of failed requests allowed before a device is marked as down.
AllowedFails uint
// DevDownTime specifies the duration in seconds that the Device Service will try to contact a device if it is marked as down.
DevDownTime uint
}

// DiscoveryInfo is a struct which contains configuration of device auto discovery.
Expand Down

0 comments on commit c8f10ab

Please sign in to comment.