diff --git a/pkg/controlloop/pod.go b/pkg/controlloop/pod.go
index 666b20e9b..d4dca35fa 100644
--- a/pkg/controlloop/pod.go
+++ b/pkg/controlloop/pod.go
@@ -73,7 +73,7 @@ type PodController struct {
 	netAttachDefLister nadlister.NetworkAttachmentDefinitionLister
 	broadcaster        record.EventBroadcaster
 	recorder           record.EventRecorder
-	workqueue          workqueue.RateLimitingInterface
+	workqueue          workqueue.TypedRateLimitingInterface[*v1.Pod]
 	mountPath          string
 	cleanupFunc        garbageCollector
 }
@@ -108,9 +108,8 @@ func newPodController(k8sCoreClient kubernetes.Interface, wbClient wbclientset.I
 	networksInformer := netAttachDefInformer.Informer()
 	podsInformer := k8sPodFilteredInformer.Informer()
 
-	queue := workqueue.NewNamedRateLimitingQueue(
-		workqueue.DefaultControllerRateLimiter(),
-		ipReconcilerQueueName)
+	queue := workqueue.NewTypedRateLimitingQueue[*v1.Pod](
+		workqueue.DefaultTypedControllerRateLimiter[*v1.Pod]())
 
 	podsInformer.AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
@@ -166,7 +165,7 @@ func (pc *PodController) processNextWorkItem() bool {
 	}
 	defer pc.workqueue.Done(queueItem)
 
-	pod := queueItem.(*v1.Pod)
+	pod := queueItem
 	err := pc.garbageCollectPodIPs(pod)
 	logging.Verbosef("result of garbage collecting pods: %+v", err)
 	pc.handleResult(pod, err)
@@ -344,7 +343,7 @@ func (pc *PodController) addressGarbageCollectionFailed(pod *v1.Pod, err error)
 	}
 }
 
-func onPodDelete(queue workqueue.RateLimitingInterface, obj interface{}) {
+func onPodDelete(queue workqueue.TypedRateLimitingInterface[*v1.Pod], obj interface{}) {
 	pod, err := podFromTombstone(obj)
 	if err != nil {
 		logging.Errorf("cannot create pod object from %v on pod delete: %v", obj, err)
diff --git a/pkg/node-controller/controller.go b/pkg/node-controller/controller.go
index f5e30757d..d751a1022 100644
--- a/pkg/node-controller/controller.go
+++ b/pkg/node-controller/controller.go
@@ -69,7 +69,7 @@ type Controller struct {
 	// means we can ensure we only process a fixed amount of resources at a
 	// time, and makes it easy to ensure we are never processing the same item
 	// simultaneously in two different workers.
-	workqueue workqueue.RateLimitingInterface
+	workqueue workqueue.TypedRateLimitingInterface[string]
 	// recorder is an event recorder for recording Event resources to the
 	// Kubernetes API.
@@ -103,9 +103,9 @@ func NewController(
 	eventBroadcaster.StartStructuredLogging(0)
 	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
-	ratelimiter := workqueue.NewMaxOfRateLimiter(
-		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
+	ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
+		workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
+		&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
 	)
 
 	c := &Controller{
@@ -121,7 +121,7 @@ func NewController(
 		nadInformer:          nadInformer,
 		nadLister:            nadInformer.Lister(),
 		nadSynced:            nadInformer.Informer().HasSynced,
-		workqueue:            workqueue.NewRateLimitingQueue(ratelimiter),
+		workqueue:            workqueue.NewTypedRateLimitingQueue(ratelimiter),
 		recorder:             recorder,
 		sortResults:          sortResults,
 		whereaboutsNamespace: whereaboutsNamespace,
@@ -253,29 +253,20 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 	}
 
 	// We wrap this block in a func so we can defer c.workqueue.Done.
-	err := func(obj interface{}) error {
+	err := func(key string) error {
 		// We call Done here so the workqueue knows we have finished
 		// processing this item. We also must remember to call Forget if we
 		// do not want this work item being re-queued. For example, we do
 		// not call Forget if a transient error occurs, instead the item is
 		// put back on the workqueue and attempted again after a back-off
 		// period.
-		defer c.workqueue.Done(obj)
-		var key string
-		var ok bool
+		defer c.workqueue.Done(key)
 		// We expect strings to come off the workqueue. These are of the
 		// form namespace/name. We do this as the delayed nature of the
 		// workqueue means the items in the informer cache may actually be
 		// more up to date that when the item was initially put onto the
 		// workqueue.
-		if key, ok = obj.(string); !ok {
-			// As the item in the workqueue is actually invalid, we call
-			// Forget here else we'd go into a loop of attempting to
-			// process a work item that is invalid.
-			c.workqueue.Forget(obj)
-			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
-			return nil
-		}
+
 		// Run the syncHandler, passing it the namespace/name string of the
 		// Foo resource to be synced.
 		if err := c.syncHandler(ctx, key); err != nil {
@@ -285,7 +276,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 		}
 		// Finally, if no error occurs we Forget this item so it does not
 		// get queued again until another change happens.
-		c.workqueue.Forget(obj)
+		c.workqueue.Forget(key)
 		logger.Info("Successfully synced", "resourceName", key)
 		return nil
 	}(obj)
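
All of the hunks above apply the same migration: client-go's generic (typed) workqueue replaces the interface{}-based one, so Get() hands the worker a *v1.Pod or a string directly and the type assertion (plus its invalid-item handling) can be dropped. The sketch below is not part of this change; it is a minimal, self-contained illustration of that worker pattern with a string-keyed queue, as in controller.go, assuming a client-go release that ships the typed workqueue API (introduced around v0.30). processItem and the example keys are hypothetical stand-ins, and the early ShutDown() only exists so the example drains and exits; a real controller ties shutdown to a stop signal.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processItem is a hypothetical stand-in for a sync handler such as
// Controller.syncHandler; a real worker would resolve the namespace/name
// key through a lister and reconcile the object.
func processItem(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func main() {
	// The queue is parameterized on the item type, so Get returns a string
	// directly and no type assertion is needed in the worker loop.
	queue := workqueue.NewTypedRateLimitingQueue[string](
		workqueue.DefaultTypedControllerRateLimiter[string]())

	queue.Add("default/example-pod")
	queue.Add("kube-system/another-pod")

	// In a real controller ShutDown is driven by a stop signal; calling it
	// here lets Get drain the items already queued and then report shutdown.
	queue.ShutDown()

	for {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		func() {
			// Done must be called for every item returned by Get.
			defer queue.Done(key)
			if err := processItem(key); err != nil {
				// Transient failure: re-queue with rate-limited backoff.
				queue.AddRateLimited(key)
				return
			}
			// Success: drop the rate limiter's failure history for this key.
			queue.Forget(key)
		}()
	}
}

The *v1.Pod-keyed queue in pod.go follows the same shape; only the type argument changes, which works because pointer types are comparable.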