From 656924e76d34ad57c98bbc70d5fa00d36eafc578 Mon Sep 17 00:00:00 2001 From: Marcus Weiner Date: Sun, 6 Feb 2022 20:59:35 +0100 Subject: [PATCH 1/2] Move pod ready check into utils --- .../servicecontroller/servicecontroller.go | 18 +++++------------- internal/utils/utils.go | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 13 deletions(-) create mode 100644 internal/utils/utils.go diff --git a/internal/servicecontroller/servicecontroller.go b/internal/servicecontroller/servicecontroller.go index f53b240..c8c346c 100644 --- a/internal/servicecontroller/servicecontroller.go +++ b/internal/servicecontroller/servicecontroller.go @@ -19,6 +19,7 @@ import ( "github.com/costela/hcloud-ip-floater/internal/config" "github.com/costela/hcloud-ip-floater/internal/fipcontroller" "github.com/costela/hcloud-ip-floater/internal/stringset" + "github.com/costela/hcloud-ip-floater/internal/utils" ) type podInformerType struct { @@ -267,7 +268,7 @@ func (sc *Controller) handleNewPod(svcKey string, newPod *corev1.Pod) error { "pod": newPod.Name, }) - if !podIsReady(newPod) { + if !utils.PodIsReady(newPod) { funcLogger.Debug("ignoring non-ready pod") return nil } @@ -288,8 +289,8 @@ func (sc *Controller) handlePodUpdate(svcKey string, oldPod, newPod *corev1.Pod) "pod": newPod.Name, }) - oldReady := podIsReady(oldPod) - newReady := podIsReady(newPod) + oldReady := utils.PodIsReady(oldPod) + newReady := utils.PodIsReady(newPod) if oldReady == newReady { funcLogger.Debug("pod readiness unchanged") @@ -320,15 +321,6 @@ func (sc *Controller) getServiceFromKey(svcKey string) (*corev1.Service, error) return svc, nil } -func podIsReady(pod *corev1.Pod) bool { - for _, condition := range pod.Status.Conditions { - if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { - return true - } - } - return false -} - func (sc *Controller) handleServiceIPs(svc *corev1.Service, svcIPs stringset.StringSet) error { // TODO: use util/workqueue to avoid blocking informer if hcloud API is slow @@ -393,7 +385,7 @@ func (sc *Controller) getServiceReadyNodes(svcKey string) ([]string, error) { nodes := make([]string, 0, len(pods)) for _, pod := range pods { - if podIsReady(pod) { + if utils.PodIsReady(pod) { nodes = append(nodes, pod.Spec.NodeName) } } diff --git a/internal/utils/utils.go b/internal/utils/utils.go new file mode 100644 index 0000000..ef15fdc --- /dev/null +++ b/internal/utils/utils.go @@ -0,0 +1,14 @@ +package utils + +import ( + corev1 "k8s.io/api/core/v1" +) + +func PodIsReady(pod *corev1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { + return true + } + } + return false +} From d591eb1aef3a542d0be73262e134f43f56c8ee1a Mon Sep 17 00:00:00 2001 From: Marcus Weiner Date: Sun, 6 Feb 2022 22:04:24 +0100 Subject: [PATCH 2/2] Implement controller to assign IP manually for pod --- README.md | 27 ++- internal/config/config.go | 1 + internal/manualcontroller/controller.go | 194 ++++++++++++++++++ .../servicecontroller/servicecontroller.go | 12 ++ main.go | 9 + 5 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 internal/manualcontroller/controller.go diff --git a/README.md b/README.md index 6a680f4..8ef7505 100644 --- a/README.md +++ b/README.md @@ -46,15 +46,38 @@ Either as command line arguments or environment variables. API token for hetzner cloud access. -### `--service-label-selector` or `HCLOUD_IP_FLOATER_SERVICE_LABEL_SELECTOR` +### `--service-label-selector` or `HCLOUD_IP_FLOATER_SERVICE_LABEL_SELECTOR` Service label selector to use when watching for kubernetes services. Any services that do not match this selector will be ignored by the controller. **Default**: `hcloud-ip-floater.cstl.dev/ignore!=true` +### `--manual-assignment-label` or `HCLOUD_IP_FLOATER_MANUAL_ASSIGNMENT_LABEL` + +This is experimental and hasn't seen a lot of production use! + +Label name used to manually assign floating IPs on a Pod. + +This can be useful when other means of routing the traffic to a pod than a load balancer are used. E.g. you could be using the [`ipvlan` CNI plugin](https://www.cni.dev/plugins/current/main/ipvlan/) with [Multus](https://github.com/k8snetworkplumbingwg/multus-cni/). +The label accepts a comma-seperated list of floating IP addresses to assign to the node the pod is on. + +Example: +```yaml +apiVersion: v1 +kind: Pod +metadata: + name: my-pod + labels: + hcloud-ip-floater.cstl.dev/floating-ip: "1.2.3.4,2.3.4.5" +``` + +This mechanism will be ignored if there is a service with the same IP present. + +**Default**: `hcloud-ip-floater.cstl.dev/floating-ip` + ### `--floating-label-selector` or `HCLOUD_IP_FLOATER_FLOATING_LABEL_SELECTOR` -Label selector for hcloud floating IPs. Floating IPs that do not match this selector will be ignored by the controller. +Label selector for hcloud floating IPs. Floating IPs that do not match this selector will be ignored by the controller. **Default**: `hcloud-ip-floater.cstl.dev/ignore!=true` diff --git a/internal/config/config.go b/internal/config/config.go index 15c364f..1c52179 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -4,6 +4,7 @@ var Global struct { LogLevel string `id:"log-level" short:"l" desc:"verbosity level for logs" default:"warn"` HCloudToken string `id:"hcloud-token" desc:"API token for HCloud access"` ServiceLabelSelector string `id:"service-label-selector" desc:"label selector used to match services" default:"hcloud-ip-floater.cstl.dev/ignore!=true"` + ManualAssignmentLabel string `id:"manual-assignment-label" desc:"pod label used to assign an IP without a service" default:"hcloud-ip-floater.cstl.dev/floating-ip"` FloatingLabelSelector string `id:"floating-label-selector" desc:"label selector used to match floating IPs" default:""` // optional MetalLB integration diff --git a/internal/manualcontroller/controller.go b/internal/manualcontroller/controller.go new file mode 100644 index 0000000..723cf3d --- /dev/null +++ b/internal/manualcontroller/controller.go @@ -0,0 +1,194 @@ +package manualcontroller + +import ( + "bytes" + "context" + "crypto/sha256" + "sort" + "strings" + "time" + + "github.com/costela/hcloud-ip-floater/internal/config" + "github.com/costela/hcloud-ip-floater/internal/fipcontroller" + "github.com/costela/hcloud-ip-floater/internal/servicecontroller" + "github.com/costela/hcloud-ip-floater/internal/stringset" + "github.com/costela/hcloud-ip-floater/internal/utils" + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type Controller struct { + Logger logrus.FieldLogger + + K8S *kubernetes.Clientset + SVCc *servicecontroller.Controller + FIPc *fipcontroller.Controller + + podInformer cache.SharedInformer +} + +func (c *Controller) Run() { + ipLabel := config.Global.ManualAssignmentLabel + + podInformerFactory := informers.NewSharedInformerFactoryWithOptions( + c.K8S, + time.Duration(config.Global.SyncSeconds)*time.Second, + informers.WithTweakListOptions(func(listOpts *metav1.ListOptions) { + listOpts.LabelSelector = ipLabel + }), + ) + podInformer := podInformerFactory.Core().V1().Pods().Informer() + c.podInformer = podInformer + + podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + c.Logger.Errorf("received unexpected object type: %T", obj) + return + } + + c.Logger.WithFields(logrus.Fields{ + "namespace": pod.Namespace, + "name": pod.Name, + "node": pod.Spec.NodeName, + }).Info("New pod") + + value := pod.Labels[ipLabel] + ips := parseIPList(value) + if len(ips) == 0 { + c.Logger.Debug("label not present or empty") + return + } + + for ip := range ips { + c.reconcileIP(ip) + } + }, + UpdateFunc: func(oldObj interface{}, newObj interface{}) { + oldPod, ok := oldObj.(*corev1.Pod) + if !ok { + c.Logger.Errorf("received unexpected object type: %T", oldObj) + return + } + newPod, ok := newObj.(*corev1.Pod) + if !ok { + c.Logger.Errorf("received unexpected object type: %T", newObj) + return + } + + c.Logger.WithFields(logrus.Fields{ + "namespace": newPod.Namespace, + "name": newPod.Name, + "node": newPod.Spec.NodeName, + }).Info("Pod updated") + + // diff label values + oldValue := oldPod.Labels[ipLabel] + oldIPs := parseIPList(oldValue) + newValue := newPod.Labels[ipLabel] + newIPs := parseIPList(newValue) + + removedIPs := oldIPs.Diff(newIPs) + for ip := range removedIPs { + c.reconcileIP(ip) + } + + for ip := range newIPs { + c.reconcileIP(ip) + } + }, + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + c.Logger.Errorf("received unexpected object type: %T", obj) + return + } + + c.Logger.WithFields(logrus.Fields{ + "namespace": pod.Namespace, + "name": pod.Name, + "node": pod.Spec.NodeName, + }).Info("Pod deleted") + + value := pod.Labels[ipLabel] + ips := parseIPList(value) + if len(ips) == 0 { + c.Logger.Debug("label not present or empty") + return + } + + for ip := range ips { + c.reconcileIP(ip) + } + }, + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + podInformer.Run(ctx.Done()) +} + +func parseIPList(value string) stringset.StringSet { + set := make(stringset.StringSet) + ips := strings.Split(value, ",") + for _, ip := range ips { + set.Add(strings.TrimSpace(ip)) + } + return set +} + +func singleSet(value string) stringset.StringSet { + set := make(stringset.StringSet) + set.Add(value) + return set +} + +// only use with a locked knownIPmu! +func (c *Controller) reconcileIP(ip string) { + log := c.Logger.WithField("ip", ip) + + if c.SVCc.HasServiceIP(ip) { + log.Warn("IP is assigned to a service, cannot use manually on a pod") + return + } + + pods := c.podInformer.GetStore().List() + + nodes := make([]string, 0, len(pods)) + for _, pod := range pods { + pod := pod.(*corev1.Pod) + if !utils.PodIsReady(pod) { + continue + } + ips := parseIPList(pod.Labels[config.Global.ManualAssignmentLabel]) + for labelIP := range ips { + if labelIP == ip { + nodes = append(nodes, pod.Spec.NodeName) + break + } + } + } + + ipSet := singleSet(ip) + + if len(nodes) == 0 { + log.Info("None of the pods are ready") + c.FIPc.ForgetAttachments(ipSet) + return + } + + sort.Slice(nodes, func(i, j int) bool { + a := sha256.Sum256([]byte(nodes[i])) + b := sha256.Sum256([]byte(nodes[j])) + return bytes.Compare(a[:], b[:]) > 0 + }) + + electedNode := nodes[0] + c.FIPc.AttachToNode(ipSet, electedNode) + log.WithField("node", electedNode).Info("Attached IP using manual assignment") +} diff --git a/internal/servicecontroller/servicecontroller.go b/internal/servicecontroller/servicecontroller.go index c8c346c..5af82c4 100644 --- a/internal/servicecontroller/servicecontroller.go +++ b/internal/servicecontroller/servicecontroller.go @@ -422,6 +422,18 @@ func (sc *Controller) forgetServiceIPs(svcKey string) { delete(sc.svcIPs, svcKey) } +func (sc *Controller) HasServiceIP(ip string) bool { + sc.svcIPsMu.Lock() + defer sc.svcIPsMu.Unlock() + + for _, ips := range sc.svcIPs { + if ips.Has(ip) { + return true + } + } + return false +} + func getLoadbalancerIPs(svc *corev1.Service) stringset.StringSet { ips := make(stringset.StringSet, len(svc.Status.LoadBalancer.Ingress)) diff --git a/main.go b/main.go index 40b7146..30b0439 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "github.com/costela/hcloud-ip-floater/internal/config" "github.com/costela/hcloud-ip-floater/internal/fipcontroller" + "github.com/costela/hcloud-ip-floater/internal/manualcontroller" "github.com/costela/hcloud-ip-floater/internal/servicecontroller" ) @@ -79,8 +80,16 @@ func main() { FIPc: fipc, } + mc := manualcontroller.Controller{ + Logger: logger, + K8S: k8s, + SVCc: &sc, + FIPc: fipc, + } + go fipc.Run() go sc.Run() + go mc.Run() select {} }