Skip to content

Commit

Permalink
refactor to remove deprecated code
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasferrandiz committed Dec 16, 2024
1 parent daa12ca commit f7c3ef9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 24 deletions.
11 changes: 5 additions & 6 deletions pkg/controlloop/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type PodController struct {
netAttachDefLister nadlister.NetworkAttachmentDefinitionLister
broadcaster record.EventBroadcaster
recorder record.EventRecorder
workqueue workqueue.RateLimitingInterface
workqueue workqueue.TypedRateLimitingInterface[*v1.Pod]
mountPath string
cleanupFunc garbageCollector
}
Expand Down Expand Up @@ -108,9 +108,8 @@ func newPodController(k8sCoreClient kubernetes.Interface, wbClient wbclientset.I
networksInformer := netAttachDefInformer.Informer()
podsInformer := k8sPodFilteredInformer.Informer()

queue := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
ipReconcilerQueueName)
queue := workqueue.NewTypedRateLimitingQueue[*v1.Pod](
workqueue.DefaultTypedControllerRateLimiter[*v1.Pod]())

podsInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -166,7 +165,7 @@ func (pc *PodController) processNextWorkItem() bool {
}
defer pc.workqueue.Done(queueItem)

pod := queueItem.(*v1.Pod)
pod := queueItem
err := pc.garbageCollectPodIPs(pod)
logging.Verbosef("result of garbage collecting pods: %+v", err)
pc.handleResult(pod, err)
Expand Down Expand Up @@ -344,7 +343,7 @@ func (pc *PodController) addressGarbageCollectionFailed(pod *v1.Pod, err error)
}
}

func onPodDelete(queue workqueue.RateLimitingInterface, obj interface{}) {
func onPodDelete(queue workqueue.TypedRateLimitingInterface[*v1.Pod], obj interface{}) {
pod, err := podFromTombstone(obj)
if err != nil {
logging.Errorf("cannot create pod object from %v on pod delete: %v", obj, err)
Expand Down
27 changes: 9 additions & 18 deletions pkg/node-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Controller struct {
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
workqueue workqueue.TypedRateLimitingInterface[string]

// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
Expand Down Expand Up @@ -103,9 +103,9 @@ func NewController(
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
ratelimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)

c := &Controller{
Expand All @@ -121,7 +121,7 @@ func NewController(
nadInformer: nadInformer,
nadLister: nadInformer.Lister(),
nadSynced: nadInformer.Informer().HasSynced,
workqueue: workqueue.NewRateLimitingQueue(ratelimiter),
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
sortResults: sortResults,
whereaboutsNamespace: whereaboutsNamespace,
Expand Down Expand Up @@ -253,29 +253,20 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
}

// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
err := func(key string) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
defer c.workqueue.Done(key)
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}

// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(ctx, key); err != nil {
Expand All @@ -285,7 +276,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
c.workqueue.Forget(key)
logger.Info("Successfully synced", "resourceName", key)
return nil
}(obj)
Expand Down

0 comments on commit f7c3ef9

Please sign in to comment.