Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add device up/down detection #1672

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion internal/application/callback.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -96,6 +96,10 @@ func AddDevice(addDeviceRequest requests.AddDeviceRequest, dic *di.Container) er
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, err)
}

config := container.ConfigurationFrom(dic.Get)
reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)
reqFailsTracker.Set(device.Name, int(config.Device.AllowedFails))

lc.Debugf("starting AutoEvents for device %s", device.Name)
container.AutoEventManagerFrom(dic.Get).RestartForDevice(device.Name)
return nil
Expand Down Expand Up @@ -190,6 +194,9 @@ func DeleteDevice(name string, dic *di.Container) errors.EdgeX {
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, err)
}

reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)
reqFailsTracker.Remove(device.Name)

return nil
}

Expand Down
39 changes: 31 additions & 8 deletions internal/application/command.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -34,20 +34,27 @@ import (
"github.com/edgexfoundry/go-mod-core-contracts/v4/models"
)

func GetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, regexCmd bool, dic *di.Container) (*dtos.Event, errors.EdgeX) {
func GetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, regexCmd bool, dic *di.Container) (res *dtos.Event, err errors.EdgeX) {
if deviceName == "" {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil)
}
if commandName == "" {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "command is empty", nil)
}
var device models.Device
defer func() {
if err != nil {
DeviceRequestFailed(deviceName, dic)
} else {
DeviceRequestSucceeded(device, dic)
}
}()

device, err := validateServiceAndDeviceState(deviceName, dic)
device, err = validateServiceAndDeviceState(deviceName, dic)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}

var res *dtos.Event
_, cmdExist := cache.Profiles().DeviceCommand(device.ProfileName, commandName)
if cmdExist {
res, err = readDeviceCommand(device, commandName, queryParams, dic)
Expand All @@ -68,20 +75,27 @@ func GetCommand(ctx context.Context, deviceName string, commandName string, quer
return res, nil
}

func SetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, requests map[string]any, dic *di.Container) (*dtos.Event, errors.EdgeX) {
func SetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, requests map[string]any, dic *di.Container) (event *dtos.Event, err errors.EdgeX) {
if deviceName == "" {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil)
}
if commandName == "" {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "command is empty", nil)
}
var device models.Device
defer func() {
if err != nil {
DeviceRequestFailed(deviceName, dic)
} else {
DeviceRequestSucceeded(device, dic)
}
}()

device, err := validateServiceAndDeviceState(deviceName, dic)
device, err = validateServiceAndDeviceState(deviceName, dic)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}

var event *dtos.Event
_, cmdExist := cache.Profiles().DeviceCommand(device.ProfileName, commandName)
if cmdExist {
event, err = writeDeviceCommand(device, commandName, queryParams, requests, dic)
Expand Down Expand Up @@ -447,8 +461,17 @@ func validateServiceAndDeviceState(deviceName string, dic *di.Container) (models
return models.Device{}, errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s locked", device.Name), nil)
}
// check device's OperatingState
// if it's a device return attempt, operating state is allowed to be DOWN
if device.OperatingState == models.Down {
return models.Device{}, errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s OperatingState is DOWN", device.Name), nil)
err := errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s OperatingState is DOWN", device.Name), nil)
config := container.ConfigurationFrom(dic.Get)
if config.Device.AllowedFails == 0 || config.Device.DeviceDownTimeout == 0 {
return models.Device{}, err
}
reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)
if reqFailsTracker.Value(deviceName) > 0 {
return models.Device{}, err
}
}

// check device's ProfileName
Expand Down
108 changes: 108 additions & 0 deletions internal/application/devicereturn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package application

import (
"context"
"time"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v4/di"

"github.com/edgexfoundry/go-mod-core-contracts/v4/common"
"github.com/edgexfoundry/go-mod-core-contracts/v4/models"

"github.com/edgexfoundry/device-sdk-go/v4/internal/cache"
sdkCommon "github.com/edgexfoundry/device-sdk-go/v4/internal/common"
"github.com/edgexfoundry/device-sdk-go/v4/internal/container"
)

// deviceReturn repeatedly probes a device that has been marked DOWN and sets
// its operating state back to UP once it answers a read request.
//
// Every attempt first sleeps for Device.DeviceDownTimeout seconds and then:
//   - gives up if the device is no longer in the cache, is already UP, or its
//     profile is not in the cache;
//   - issues GetCommand for the first readable device resource of the profile;
//     on success the device is set to UP and the function returns, on failure
//     another attempt is scheduled;
//   - sets the device to UP without probing when the profile has no readable
//     resource.
//
// The function blocks until one of the exit conditions above is met, so it is
// intended to be started in its own goroutine (DeviceRequestFailed does so).
func deviceReturn(deviceName string, dic *di.Container) {
	lc := bootstrapContainer.LoggingClientFrom(dic.Get)
	dc := bootstrapContainer.DeviceClientFrom(dic.Get)
	config := container.ConfigurationFrom(dic.Get)

retry:
	for {
		// The timeout is re-read from the configuration on every attempt.
		time.Sleep(time.Duration(config.Device.DeviceDownTimeout) * time.Second)
		lc.Infof("Checking operational state for device: %s", deviceName)

		d, found := cache.Devices().ForName(deviceName)
		if !found {
			lc.Warnf("Device %s not found. Exiting retry loop.", deviceName)
			return
		}

		if d.OperatingState == models.Up {
			lc.Infof("Device %s is already operational. Exiting retry loop.", deviceName)
			return
		}

		p, found := cache.Profiles().ForName(d.ProfileName)
		if !found {
			lc.Warnf("Device %s has no profile. Cannot set operational state automatically.", deviceName)
			return
		}

		for _, dr := range p.DeviceResources {
			// Only a readable resource can be used as a probe.
			switch dr.Properties.ReadWrite {
			case common.ReadWrite_R, common.ReadWrite_RW, common.ReadWrite_WR:
			default:
				continue
			}

			// The first readable resource decides the outcome of this attempt.
			_, err := GetCommand(context.Background(), deviceName, dr.Name, "", true, dic)
			if err == nil {
				lc.Infof("Device %s responsive: setting operational state to up.", deviceName)
				sdkCommon.UpdateOperatingState(deviceName, models.Up, lc, dc)
				return
			}
			lc.Errorf("Device %s unresponsive: retrying in %v seconds.", deviceName, config.Device.DeviceDownTimeout)
			continue retry
		}

		lc.Infof("Device %s has no readable resources. Setting operational state to up without checking.", deviceName)
		sdkCommon.UpdateOperatingState(deviceName, models.Up, lc, dc)
		return
	}
}

// DeviceRequestFailed records one failed request for the named device.
//
// It does nothing when Device.AllowedFails is 0. Otherwise the device's
// remaining-failures counter is decreased; when the counter reaches exactly 0
// and the device is still in the cache, the device is marked DOWN (unless it
// already is) and, if Device.DeviceDownTimeout is greater than 0, a
// deviceReturn goroutine is started to probe the device.
func DeviceRequestFailed(deviceName string, dic *di.Container) {
	config := container.ConfigurationFrom(dic.Get)
	if config.Device.AllowedFails == 0 {
		return
	}

	logger := bootstrapContainer.LoggingClientFrom(dic.Get)
	deviceClient := bootstrapContainer.DeviceClientFrom(dic.Get)
	tracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)

	// Only the transition to exactly 0 triggers the DOWN handling.
	if remaining := tracker.Decrease(deviceName); remaining != 0 {
		return
	}

	dev, cached := cache.Devices().ForName(deviceName)
	if !cached {
		return
	}

	if dev.OperatingState != models.Down {
		logger.Infof("Marking device %s non-operational", deviceName)
		sdkCommon.UpdateOperatingState(deviceName, models.Down, logger, deviceClient)
	}

	if config.Device.DeviceDownTimeout > 0 {
		logger.Warnf("Will retry device %s in %v seconds", deviceName, config.Device.DeviceDownTimeout)
		go deviceReturn(deviceName, dic)
	}
}

// DeviceRequestSucceeded records a successful request for device d.
//
// When Device.AllowedFails is greater than 0 and the device's
// remaining-failures counter is below that limit, the counter is reset to
// Device.AllowedFails. If the supplied device additionally reports an
// operating state of DOWN, its operating state is updated to UP.
func DeviceRequestSucceeded(d models.Device, dic *di.Container) {
	config := container.ConfigurationFrom(dic.Get)
	tracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)

	limit := int(config.Device.AllowedFails)
	// Nothing to do when tracking is disabled or the counter is already full.
	if config.Device.AllowedFails == 0 || tracker.Value(d.Name) >= limit {
		return
	}

	tracker.Set(d.Name, limit)
	if d.OperatingState != models.Down {
		return
	}

	logger := bootstrapContainer.LoggingClientFrom(dic.Get)
	deviceClient := bootstrapContainer.DeviceClientFrom(dic.Get)
	sdkCommon.UpdateOperatingState(d.Name, models.Up, logger, deviceClient)
}
2 changes: 1 addition & 1 deletion internal/autoevent/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2019-2023 IOTech Ltd
// Copyright (C) 2019-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down
7 changes: 2 additions & 5 deletions internal/cache/devices.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -50,13 +50,10 @@ type deviceCache struct {
func newDeviceCache(devices []models.Device, dic *di.Container) DeviceCache {
defaultSize := len(devices)
dMap := make(map[string]*models.Device, defaultSize)
for i, d := range devices {
dMap[d.Name] = &devices[i]
}

dc = &deviceCache{deviceMap: dMap, dic: dic}
lastConnectedMetrics := make(map[string]gometrics.Gauge)
for _, d := range devices {
dMap[d.Name] = &d
deviceMetric := gometrics.NewGauge()
registerMetric(d.Name, deviceMetric, dic)
lastConnectedMetrics[d.Name] = deviceMetric
Expand Down
6 changes: 5 additions & 1 deletion internal/config/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// -*- mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2017-2018 Canonical Ltd
// Copyright (C) 2018-2023 IOTech Ltd
// Copyright (C) 2018-2025 IOTech Ltd
// Copyright (c) 2021 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -53,6 +53,10 @@ type DeviceInfo struct {
EnableAsyncReadings bool
// Labels are properties applied to the device service to help with searching
Labels []string
// AllowedFails specifies the number of failed requests allowed before a device is marked as down. A value of 0 disables failure tracking.
AllowedFails uint
// DeviceDownTimeout specifies the interval in seconds between the Device Service's attempts to contact a device that has been marked as down. A value of 0 disables these automatic retries.
DeviceDownTimeout uint
}

// DiscoveryInfo is a struct which contains configuration of device auto discovery.
Expand Down
61 changes: 61 additions & 0 deletions internal/container/allowedfailstracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package container

import (
	"sync"

	"github.com/edgexfoundry/device-sdk-go/v4/internal/syncutils"
)

// AllowedFailuresTracker wraps a map of device names to atomic integers that track the number of allowed request
// failures for each device.
//
// The tracker is handed out by value, so the lock is held through a pointer: every copy shares the same map and the
// same lock. A tracker must be created with NewAllowedFailuresTracker; the zero value is not usable.
type AllowedFailuresTracker struct {
	// mu guards data. Request handlers, background retry goroutines and device callbacks use the tracker
	// concurrently, and an unguarded Go map must not be read while it is written.
	mu   *sync.RWMutex
	data map[string]*syncutils.AtomicInt
}

// NewAllowedFailuresTracker creates and initializes a new, empty tracker.
func NewAllowedFailuresTracker() AllowedFailuresTracker {
	return AllowedFailuresTracker{
		mu:   &sync.RWMutex{},
		data: make(map[string]*syncutils.AtomicInt),
	}
}

// Get retrieves the AtomicInt for a given device name.
// Returns nil if the device does not exist.
func (aft *AllowedFailuresTracker) Get(deviceName string) *syncutils.AtomicInt {
	aft.mu.RLock()
	defer aft.mu.RUnlock()

	return aft.data[deviceName]
}

// Set initializes or updates the AtomicInt for a given device.
func (aft *AllowedFailuresTracker) Set(deviceName string, value int) {
	aft.mu.Lock()
	defer aft.mu.Unlock()

	counter, exists := aft.data[deviceName]
	if !exists {
		counter = &syncutils.AtomicInt{}
		aft.data[deviceName] = counter
	}
	counter.Set(value)
}

// Decrease decreases the AtomicInt value for a given device by 1 and returns the updated value.
// Returns -1 without changing anything if the device does not exist or its value is already negative.
func (aft *AllowedFailuresTracker) Decrease(deviceName string) int {
	// The write lock makes the check and the decrement a single step with respect to Set and other Decrease calls.
	aft.mu.Lock()
	defer aft.mu.Unlock()

	if counter, exists := aft.data[deviceName]; exists && counter.Value() >= 0 {
		return counter.Decrease()
	}
	return -1
}

// Value retrieves the current value of the AtomicInt for a device.
// Returns -1 if the device does not exist.
func (aft *AllowedFailuresTracker) Value(deviceName string) int {
	aft.mu.RLock()
	defer aft.mu.RUnlock()

	if counter, exists := aft.data[deviceName]; exists {
		return counter.Value()
	}
	return -1
}

// Remove deletes the entry for a given device name from the tracker.
// Removing a device that is not tracked is a no-op.
func (aft *AllowedFailuresTracker) Remove(deviceName string) {
	aft.mu.Lock()
	defer aft.mu.Unlock()

	delete(aft.data, deviceName)
}
10 changes: 9 additions & 1 deletion internal/container/deviceservice.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -60,3 +60,11 @@ func DiscoveryRequestIdFrom(get di.Get) string {
}
return id
}

// AllowedRequestFailuresTrackerName contains the name of the allowed request failures tracker in the DIC.
// The name is derived from the AllowedFailuresTracker type.
var AllowedRequestFailuresTrackerName = di.TypeInstanceToName(AllowedFailuresTracker{})

// AllowedRequestFailuresTrackerFrom looks up the allowed request failures tracker in the DIC through the supplied
// getter and returns it. The type assertion is unchecked, so the call panics when the value stored under
// AllowedRequestFailuresTrackerName is not an AllowedFailuresTracker.
func AllowedRequestFailuresTrackerFrom(get di.Get) AllowedFailuresTracker {
	instance := get(AllowedRequestFailuresTrackerName)
	return instance.(AllowedFailuresTracker)
}
5 changes: 4 additions & 1 deletion internal/controller/http/command_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2022-2023 IOTech Ltd
// Copyright (C) 2022-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -208,6 +208,9 @@ func mockDic() *di.Container {
bootstrapContainer.MetricsManagerInterfaceName: func(get di.Get) interface{} {
return mockMetricsManager
},
container.AllowedRequestFailuresTrackerName: func(get di.Get) any {
return container.NewAllowedFailuresTracker()
},
})

return dic
Expand Down
Loading