From 39bd3bf24d06472191b6cf438f4179f45ef93a06 Mon Sep 17 00:00:00 2001 From: FelixTing Date: Tue, 7 Jan 2025 18:37:31 +0800 Subject: [PATCH] feat: Add device up/down detection Signed-off-by: FelixTing --- internal/application/callback.go | 9 +- internal/application/command.go | 39 ++++++-- internal/application/devicereturn.go | 108 ++++++++++++++++++++++ internal/autoevent/executor.go | 2 +- internal/cache/devices.go | 7 +- internal/config/types.go | 6 +- internal/container/allowedfailstracker.go | 61 ++++++++++++ internal/container/deviceservice.go | 10 +- internal/controller/http/command_test.go | 5 +- internal/syncutils/atomicint.go | 34 +++++++ pkg/service/init.go | 12 +++ 11 files changed, 275 insertions(+), 18 deletions(-) create mode 100644 internal/application/devicereturn.go create mode 100644 internal/container/allowedfailstracker.go create mode 100644 internal/syncutils/atomicint.go diff --git a/internal/application/callback.go b/internal/application/callback.go index c53cd477a..eebf49c60 100644 --- a/internal/application/callback.go +++ b/internal/application/callback.go @@ -1,6 +1,6 @@ // -*- Mode: Go; indent-tabs-mode: t -*- // -// Copyright (C) 2020-2023 IOTech Ltd +// Copyright (C) 2020-2025 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 @@ -96,6 +96,10 @@ func AddDevice(addDeviceRequest requests.AddDeviceRequest, dic *di.Container) er return errors.NewCommonEdgeX(errors.KindServerError, errMsg, err) } + config := container.ConfigurationFrom(dic.Get) + reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get) + reqFailsTracker.Set(device.Name, int(config.Device.AllowedFails)) + lc.Debugf("starting AutoEvents for device %s", device.Name) container.AutoEventManagerFrom(dic.Get).RestartForDevice(device.Name) return nil @@ -190,6 +194,9 @@ func DeleteDevice(name string, dic *di.Container) errors.EdgeX { return errors.NewCommonEdgeX(errors.KindServerError, errMsg, err) } + reqFailsTracker := 
container.AllowedRequestFailuresTrackerFrom(dic.Get) + reqFailsTracker.Remove(device.Name) + return nil } diff --git a/internal/application/command.go b/internal/application/command.go index 5752e800f..87d14c64c 100644 --- a/internal/application/command.go +++ b/internal/application/command.go @@ -1,6 +1,6 @@ // -*- Mode: Go; indent-tabs-mode: t -*- // -// Copyright (C) 2020-2023 IOTech Ltd +// Copyright (C) 2020-2025 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 @@ -34,20 +34,27 @@ import ( "github.com/edgexfoundry/go-mod-core-contracts/v4/models" ) -func GetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, regexCmd bool, dic *di.Container) (*dtos.Event, errors.EdgeX) { +func GetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, regexCmd bool, dic *di.Container) (res *dtos.Event, err errors.EdgeX) { if deviceName == "" { return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil) } if commandName == "" { return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "command is empty", nil) } + var device models.Device + defer func() { + if err != nil { + DeviceRequestFailed(deviceName, dic) + } else { + DeviceRequestSucceeded(device, dic) + } + }() - device, err := validateServiceAndDeviceState(deviceName, dic) + device, err = validateServiceAndDeviceState(deviceName, dic) if err != nil { return nil, errors.NewCommonEdgeXWrapper(err) } - var res *dtos.Event _, cmdExist := cache.Profiles().DeviceCommand(device.ProfileName, commandName) if cmdExist { res, err = readDeviceCommand(device, commandName, queryParams, dic) @@ -68,20 +75,27 @@ func GetCommand(ctx context.Context, deviceName string, commandName string, quer return res, nil } -func SetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, requests map[string]any, dic *di.Container) (*dtos.Event, errors.EdgeX) { +func SetCommand(ctx context.Context, deviceName 
string, commandName string, queryParams string, requests map[string]any, dic *di.Container) (event *dtos.Event, err errors.EdgeX) { if deviceName == "" { return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil) } if commandName == "" { return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "command is empty", nil) } + var device models.Device + defer func() { + if err != nil { + DeviceRequestFailed(deviceName, dic) + } else { + DeviceRequestSucceeded(device, dic) + } + }() - device, err := validateServiceAndDeviceState(deviceName, dic) + device, err = validateServiceAndDeviceState(deviceName, dic) if err != nil { return nil, errors.NewCommonEdgeXWrapper(err) } - var event *dtos.Event _, cmdExist := cache.Profiles().DeviceCommand(device.ProfileName, commandName) if cmdExist { event, err = writeDeviceCommand(device, commandName, queryParams, requests, dic) @@ -447,8 +461,17 @@ func validateServiceAndDeviceState(deviceName string, dic *di.Container) (models return models.Device{}, errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s locked", device.Name), nil) } // check device's OperatingState + // if it's a device return attempt, operating state is allowed to be DOWN if device.OperatingState == models.Down { - return models.Device{}, errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s OperatingState is DOWN", device.Name), nil) + err := errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s OperatingState is DOWN", device.Name), nil) + config := container.ConfigurationFrom(dic.Get) + if config.Device.AllowedFails == 0 || config.Device.DeviceDownTimeout == 0 { + return models.Device{}, err + } + reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get) + if reqFailsTracker.Value(deviceName) > 0 { + return models.Device{}, err + } } // check device's ProfileName diff --git a/internal/application/devicereturn.go b/internal/application/devicereturn.go new file 
mode 100644 index 000000000..3befdb8ac --- /dev/null +++ b/internal/application/devicereturn.go @@ -0,0 +1,108 @@ +// -*- Mode: Go; indent-tabs-mode: t -*- +// +// Copyright (C) 2025 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package application + +import ( + "context" + "time" + + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container" + "github.com/edgexfoundry/go-mod-bootstrap/v4/di" + + "github.com/edgexfoundry/go-mod-core-contracts/v4/common" + "github.com/edgexfoundry/go-mod-core-contracts/v4/models" + + "github.com/edgexfoundry/device-sdk-go/v4/internal/cache" + sdkCommon "github.com/edgexfoundry/device-sdk-go/v4/internal/common" + "github.com/edgexfoundry/device-sdk-go/v4/internal/container" +) + +func deviceReturn(deviceName string, dic *di.Container) { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + dc := bootstrapContainer.DeviceClientFrom(dic.Get) + config := container.ConfigurationFrom(dic.Get) + + for { + LOOP: + time.Sleep(time.Duration(config.Device.DeviceDownTimeout) * time.Second) + lc.Infof("Checking operational state for device: %s", deviceName) + + d, found := cache.Devices().ForName(deviceName) + if !found { + lc.Warnf("Device %s not found. Exiting retry loop.", deviceName) + return + } + + if d.OperatingState == models.Up { + lc.Infof("Device %s is already operational. Exiting retry loop.", deviceName) + return + } + + p, found := cache.Profiles().ForName(d.ProfileName) + if !found { + lc.Warnf("Device %s has no profile. 
Cannot set operational state automatically.", deviceName) + return + } + + for _, dr := range p.DeviceResources { + if dr.Properties.ReadWrite == common.ReadWrite_R || + dr.Properties.ReadWrite == common.ReadWrite_RW || + dr.Properties.ReadWrite == common.ReadWrite_WR { + _, err := GetCommand(context.Background(), deviceName, dr.Name, "", true, dic) + if err == nil { + lc.Infof("Device %s responsive: setting operational state to up.", deviceName) + sdkCommon.UpdateOperatingState(deviceName, models.Up, lc, dc) + return + } else { + lc.Errorf("Device %s unresponsive: retrying in %v seconds.", deviceName, config.Device.DeviceDownTimeout) + goto LOOP + } + } + } + lc.Infof("Device %s has no readable resources. Setting operational state to up without checking.", deviceName) + sdkCommon.UpdateOperatingState(deviceName, models.Up, lc, dc) + return + } +} + +func DeviceRequestFailed(deviceName string, dic *di.Container) { + config := container.ConfigurationFrom(dic.Get) + if config.Device.AllowedFails > 0 { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + dc := bootstrapContainer.DeviceClientFrom(dic.Get) + reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get) + + if reqFailsTracker.Decrease(deviceName) == 0 { + d, ok := cache.Devices().ForName(deviceName) + if !ok { + return + } + if d.OperatingState != models.Down { + lc.Infof("Marking device %s non-operational", deviceName) + sdkCommon.UpdateOperatingState(deviceName, models.Down, lc, dc) + } + if config.Device.DeviceDownTimeout > 0 { + lc.Warnf("Will retry device %s in %v seconds", deviceName, config.Device.DeviceDownTimeout) + go deviceReturn(deviceName, dic) + } + return + } + } +} + +func DeviceRequestSucceeded(d models.Device, dic *di.Container) { + config := container.ConfigurationFrom(dic.Get) + reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get) + if config.Device.AllowedFails > 0 && reqFailsTracker.Value(d.Name) < int(config.Device.AllowedFails) { + 
reqFailsTracker.Set(d.Name, int(config.Device.AllowedFails)) + if d.OperatingState == models.Down { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + dc := bootstrapContainer.DeviceClientFrom(dic.Get) + sdkCommon.UpdateOperatingState(d.Name, models.Up, lc, dc) + } + } +} diff --git a/internal/autoevent/executor.go b/internal/autoevent/executor.go index 473add0ad..6d7a195e9 100644 --- a/internal/autoevent/executor.go +++ b/internal/autoevent/executor.go @@ -1,6 +1,6 @@ // -*- Mode: Go; indent-tabs-mode: t -*- // -// Copyright (C) 2019-2023 IOTech Ltd +// Copyright (C) 2019-2025 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cache/devices.go b/internal/cache/devices.go index b92adcfc9..a60886354 100644 --- a/internal/cache/devices.go +++ b/internal/cache/devices.go @@ -1,6 +1,6 @@ // -*- Mode: Go; indent-tabs-mode: t -*- // -// Copyright (C) 2020-2023 IOTech Ltd +// Copyright (C) 2020-2025 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 @@ -50,13 +50,10 @@ type deviceCache struct { func newDeviceCache(devices []models.Device, dic *di.Container) DeviceCache { defaultSize := len(devices) dMap := make(map[string]*models.Device, defaultSize) - for i, d := range devices { - dMap[d.Name] = &devices[i] - } - dc = &deviceCache{deviceMap: dMap, dic: dic} lastConnectedMetrics := make(map[string]gometrics.Gauge) for _, d := range devices { + dMap[d.Name] = &d deviceMetric := gometrics.NewGauge() registerMetric(d.Name, deviceMetric, dic) lastConnectedMetrics[d.Name] = deviceMetric diff --git a/internal/config/types.go b/internal/config/types.go index b6437f81c..e05adaea9 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -1,7 +1,7 @@ // -*- mode: Go; indent-tabs-mode: t -*- // // Copyright (C) 2017-2018 Canonical Ltd -// Copyright (C) 2018-2023 IOTech Ltd +// Copyright (C) 2018-2025 IOTech Ltd // Copyright (c) 2021 Intel Corporation // // SPDX-License-Identifier: Apache-2.0 @@ -53,6 +53,10 @@ type DeviceInfo struct { 
EnableAsyncReadings bool // Labels are properties applied to the device service to help with searching Labels []string + // AllowedFails specifies the number of failed requests allowed before a device is marked as down. + AllowedFails uint + // DeviceDownTimeout specifies the duration in seconds that the Device Service will try to contact a device if it is marked as down. + DeviceDownTimeout uint } // DiscoveryInfo is a struct which contains configuration of device auto discovery. diff --git a/internal/container/allowedfailstracker.go b/internal/container/allowedfailstracker.go new file mode 100644 index 000000000..5f9271e47 --- /dev/null +++ b/internal/container/allowedfailstracker.go @@ -0,0 +1,61 @@ +// -*- Mode: Go; indent-tabs-mode: t -*- +// +// Copyright (C) 2025 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package container + +import "github.com/edgexfoundry/device-sdk-go/v4/internal/syncutils" + +// AllowedFailuresTracker wraps a map of device names to atomic integers that track the number of allowed request +// failures for each device. +type AllowedFailuresTracker struct { + data map[string]*syncutils.AtomicInt +} + +// NewAllowedFailuresTracker creates and initializes a new tracker. +func NewAllowedFailuresTracker() AllowedFailuresTracker { + return AllowedFailuresTracker{ + data: make(map[string]*syncutils.AtomicInt), + } +} + +// Get retrieves the AtomicInt for a given device name. +// Returns nil if the device does not exist. +func (aft *AllowedFailuresTracker) Get(deviceName string) *syncutils.AtomicInt { + return aft.data[deviceName] +} + +// Set initializes or updates the AtomicInt for a given device. +func (aft *AllowedFailuresTracker) Set(deviceName string, value int) { + if _, exists := aft.data[deviceName]; !exists { + aft.data[deviceName] = &syncutils.AtomicInt{} + } + aft.data[deviceName].Set(value) +} + +// Decrease decreases the AtomicInt value for a given device by 1. 
+// Returns the updated value or -1 if the device does not exist. +func (aft *AllowedFailuresTracker) Decrease(deviceName string) int { + if atomicInt, exists := aft.data[deviceName]; exists { + if atomicInt.Value() >= 0 { + return atomicInt.Decrease() + } + } + return -1 +} + +// Value retrieves the current value of the AtomicInt for a device. +// Returns -1 if the device does not exist. +func (aft *AllowedFailuresTracker) Value(deviceName string) int { + if atomicInt, exists := aft.data[deviceName]; exists { + return atomicInt.Value() + } + return -1 +} + +// Remove deletes the entry for a given device name from the tracker. +func (aft *AllowedFailuresTracker) Remove(deviceName string) { + delete(aft.data, deviceName) +} diff --git a/internal/container/deviceservice.go b/internal/container/deviceservice.go index 4a3fed8e4..00f4f1140 100644 --- a/internal/container/deviceservice.go +++ b/internal/container/deviceservice.go @@ -1,6 +1,6 @@ // -*- Mode: Go; indent-tabs-mode: t -*- // -// Copyright (C) 2020-2023 IOTech Ltd +// Copyright (C) 2020-2025 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 @@ -60,3 +60,11 @@ func DiscoveryRequestIdFrom(get di.Get) string { } return id } + +// AllowedRequestFailuresTrackerName contains the name of allowed request failures tracker in the DIC. +var AllowedRequestFailuresTrackerName = di.TypeInstanceToName(AllowedFailuresTracker{}) + +// AllowedRequestFailuresTrackerFrom helper function queries the DIC and returns a device request failures tracker. 
// AtomicInt is an int that is safe for concurrent use by multiple goroutines.
//
// The zero value is ready to use and holds 0. Because it contains a mutex, an
// AtomicInt must not be copied after first use; share it through a pointer.
type AtomicInt struct {
	// A plain Mutex is enough here: every critical section is a single load
	// or store, so a reader/writer lock would only add bookkeeping.
	mutex sync.Mutex
	value int
}

// Value returns the current value.
func (i *AtomicInt) Value() int {
	i.mutex.Lock()
	defer i.mutex.Unlock()
	return i.value
}

// Decrease subtracts 1 from the value and returns the updated value.
// There is no lower bound; the value may become negative.
func (i *AtomicInt) Decrease() int {
	i.mutex.Lock()
	defer i.mutex.Unlock()
	i.value--
	return i.value
}

// Set replaces the current value with v.
func (i *AtomicInt) Set(v int) {
	i.mutex.Lock()
	defer i.mutex.Unlock()
	i.value = v
}
reqFailsTracker.Set(d.Name, int(config.Device.AllowedFails)) + } + dic.Update(di.ServiceConstructorMap{ + container.AllowedRequestFailuresTrackerName: func(get di.Get) any { + return reqFailsTracker + }, + }) + if s.AsyncReadingsEnabled() { s.asyncCh = make(chan *models.AsyncValues, s.config.Device.AsyncBufferSize) wg.Add(1)