From 39d3a158f8c29bd1ab3989ae06c7250a59ab75c1 Mon Sep 17 00:00:00 2001 From: Fabian Wiesel Date: Thu, 19 Feb 2026 10:42:25 +0100 Subject: [PATCH 1/2] Extract aggregate logic to ApplyAggregates Almost the same logic was used in the aggregates and the onboarding controller, with the variation that the onboarding controller was also creating the zone aggregate, and the aggregates controller kept aggregates modified outside of the controller untouched. Let's drop that functionality and expect that all aggregates are somehow created externally (i.e. by another controller), and then we can combine that logic in one function, that also can be called by the offboarding controller. Cortex wants to rely on the aggregates in the status, so keeping extra aggregates is unsupported either way. --- internal/controller/aggregates_controller.go | 40 +-- .../controller/aggregates_controller_test.go | 78 +---- internal/controller/decomission_controller.go | 21 +- .../controller/decomission_controller_test.go | 30 +- internal/controller/onboarding_controller.go | 46 +-- .../controller/onboarding_controller_test.go | 14 +- internal/openstack/aggregates.go | 63 ++++ internal/openstack/aggregates_test.go | 329 ++++++++++++++++++ internal/openstack/suite_test.go | 30 ++ 9 files changed, 457 insertions(+), 194 deletions(-) create mode 100644 internal/openstack/aggregates_test.go create mode 100644 internal/openstack/suite_test.go diff --git a/internal/controller/aggregates_controller.go b/internal/controller/aggregates_controller.go index 7229370d..bb7c4050 100644 --- a/internal/controller/aggregates_controller.go +++ b/internal/controller/aggregates_controller.go @@ -52,7 +52,6 @@ type AggregatesController struct { // +kubebuilder:rbac:groups=kvm.cloud.sap,resources=hypervisors/status,verbs=get;list;watch;create;update;patch;delete func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := logger.FromContext(ctx) hv := 
&kvmv1.Hypervisor{} if err := ac.Get(ctx, req.NamespacedName, hv); err != nil { return ctrl.Result{}, k8sclient.IgnoreNotFound(err) @@ -69,44 +68,9 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - aggs, err := openstack.GetAggregatesByName(ctx, ac.computeClient) + err := openstack.ApplyAggregates(ctx, ac.computeClient, hv.Name, hv.Spec.Aggregates) if err != nil { - err = fmt.Errorf("failed listing aggregates: %w", err) - if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil { - return ctrl.Result{}, errors.Join(err, err2) - } - return ctrl.Result{}, err - } - - toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates) - toRemove := Difference(hv.Spec.Aggregates, hv.Status.Aggregates) - - // We need to add first the host to the aggregates, because if we first drop - // an aggregate with a filter criterion and then add a new one, we leave the host - // open for period of time. Still, this may fail due to a conflict of aggregates - // with different availability zones, so we collect all the errors and return them - // so it hopefully will converge eventually. 
- var errs []error - if len(toAdd) > 0 { - log.Info("Adding", "aggregates", toAdd) - for item := range slices.Values(toAdd) { - if err = openstack.AddToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, ""); err != nil { - errs = append(errs, err) - } - } - } - - if len(toRemove) > 0 { - log.Info("Removing", "aggregates", toRemove) - for item := range slices.Values(toRemove) { - if err = openstack.RemoveFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item); err != nil { - errs = append(errs, err) - } - } - } - - if errs != nil { - err = fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...)) + err = fmt.Errorf("failed to apply aggregates: %w", err) if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil { return ctrl.Result{}, errors.Join(err, err2) } diff --git a/internal/controller/aggregates_controller_test.go b/internal/controller/aggregates_controller_test.go index 7f714a34..3bb1b1d8 100644 --- a/internal/controller/aggregates_controller_test.go +++ b/internal/controller/aggregates_controller_test.go @@ -302,7 +302,7 @@ var _ = Describe("AggregatesController", func() { Expect(cond.Message).To(ContainSubstring(expectedMessage)) } - Context("when GetAggregates fails", func() { + Context("when ApplyAggregates fails", func() { BeforeEach(func(ctx SpecContext) { By("Setting a missing aggregate") hypervisor := &kvmv1.Hypervisor{} @@ -310,7 +310,7 @@ var _ = Describe("AggregatesController", func() { hypervisor.Spec.Aggregates = []string{"test-aggregate1"} Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) - By("Mocking GetAggregates to fail") + By("Mocking GET /os-aggregates to fail (first API call in ApplyAggregates)") fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) @@ -322,79 +322,7 @@ var _ = Describe("AggregatesController", func() { It("should set error condition", 
func(ctx SpecContext) { _, err := aggregatesController.Reconcile(ctx, reconcileRequest) Expect(err).To(HaveOccurred()) - sharedErrorConditionChecks(ctx, "failed listing aggregates") - }) - }) - - Context("when AddToAggregate fails", func() { - BeforeEach(func(ctx SpecContext) { - By("Setting a missing aggregate") - hypervisor := &kvmv1.Hypervisor{} - Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) - hypervisor.Spec.Aggregates = []string{"test-aggregate1"} - Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) - - By("Mocking GetAggregates") - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregateListBodyEmpty) - Expect(err).NotTo(HaveOccurred()) - }) - - By("Mocking CreateAggregate") - fakeServer.Mux.HandleFunc("POST /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregatesPostBody) - Expect(err).NotTo(HaveOccurred()) - }) - - By("Mocking AddHost to fail") - fakeServer.Mux.HandleFunc("POST /os-aggregates/42/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusConflict) - _, err := fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot add host to aggregate", "code": 409}}`) - Expect(err).NotTo(HaveOccurred()) - }) - }) - - It("should set error condition", func(ctx SpecContext) { - _, err := aggregatesController.Reconcile(ctx, reconcileRequest) - Expect(err).To(HaveOccurred()) - sharedErrorConditionChecks(ctx, "encountered errors during aggregate update") - }) - }) - - Context("when RemoveFromAggregate fails", func() { - BeforeEach(func(ctx SpecContext) { - By("Setting existing aggregate in status") - hypervisor := &kvmv1.Hypervisor{} - Expect(k8sClient.Get(ctx, 
hypervisorName, hypervisor)).To(Succeed()) - hypervisor.Status.Aggregates = []string{"test-aggregate2"} - Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) - - By("Mocking GetAggregates") - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregateListBodyFull) - Expect(err).NotTo(HaveOccurred()) - }) - - By("Mocking RemoveHost to fail") - fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusConflict) - _, err := fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot remove host from aggregate", "code": 409}}`) - Expect(err).NotTo(HaveOccurred()) - }) - }) - - It("should set error condition", func(ctx SpecContext) { - _, err := aggregatesController.Reconcile(ctx, reconcileRequest) - Expect(err).To(HaveOccurred()) - sharedErrorConditionChecks(ctx, "encountered errors during aggregate update") + sharedErrorConditionChecks(ctx, "failed to get aggregates") }) }) diff --git a/internal/controller/decomission_controller.go b/internal/controller/decomission_controller.go index 3181adf4..2988a852 100644 --- a/internal/controller/decomission_controller.go +++ b/internal/controller/decomission_controller.go @@ -22,10 +22,8 @@ import ( "errors" "fmt" "net/http" - "slices" "github.com/gophercloud/gophercloud/v2" - "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates" "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/services" "github.com/gophercloud/gophercloud/v2/openstack/placement/v1/resourceproviders" "k8s.io/apimachinery/pkg/api/meta" @@ -115,22 +113,11 @@ func (r *NodeDecommissionReconciler) Reconcile(ctx context.Context, req ctrl.Req return r.setDecommissioningCondition(ctx, hv, msg) } - // Before removing the service, first take the node 
out of the aggregates, - // so when the node comes back, it doesn't up with the old associations - aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient) - if err != nil { - return r.setDecommissioningCondition(ctx, hv, fmt.Sprintf("cannot list aggregates due to %v", err)) - } - + // Before removing the service, first take the node out of all aggregates, + // so when the node comes back, it doesn't end up with the old associations host := hv.Name - for name, aggregate := range aggs { - if slices.Contains(aggregate.Hosts, host) { - opts := aggregates.RemoveHostOpts{Host: host} - if err = aggregates.RemoveHost(ctx, r.computeClient, aggregate.ID, opts).Err; err != nil { - msg := fmt.Sprintf("failed to remove host %v from aggregate %v due to %v", name, host, err) - return r.setDecommissioningCondition(ctx, hv, msg) - } - } + if err := openstack.ApplyAggregates(ctx, r.computeClient, host, []string{}); err != nil { + return r.setDecommissioningCondition(ctx, hv, fmt.Sprintf("failed to remove host from aggregates: %v", err)) } // Deleting and evicted, so better delete the service diff --git a/internal/controller/decomission_controller_test.go b/internal/controller/decomission_controller_test.go index 06956bec..db695857 100644 --- a/internal/controller/decomission_controller_test.go +++ b/internal/controller/decomission_controller_test.go @@ -395,7 +395,7 @@ var _ = Describe("Decommission Controller", func() { }) }) - Context("When removing host from aggregate fails", func() { + Context("When ApplyAggregates fails", func() { BeforeEach(func() { fakeServer.Mux.HandleFunc("GET /os-hypervisors/detail", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") @@ -416,37 +416,11 @@ var _ = Describe("Decommission Controller", func() { }`) }) + // Mock only the first API call in ApplyAggregates (GET /os-aggregates) to fail fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - 
w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, AggregateListWithHv) - }) - - fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) fmt.Fprint(w, `{"error": "Internal Server Error"}`) }) - - // Add handlers for subsequent steps even though we expect failure earlier - fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNoContent) - }) - - fakeServer.Mux.HandleFunc("GET /resource_providers/c48f6247-abe4-4a24-824e-ea39e108874f", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"uuid": "rp-uuid", "name": "hv-test"}`) - }) - - fakeServer.Mux.HandleFunc("GET /resource_providers/rp-uuid/allocations", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"allocations": {}}`) - }) - - fakeServer.Mux.HandleFunc("DELETE /resource_providers/rp-uuid", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusAccepted) - }) }) It("should set decommissioning condition with error", func(ctx SpecContext) { diff --git a/internal/controller/onboarding_controller.go b/internal/controller/onboarding_controller.go index 142c26d5..e8bd24a7 100644 --- a/internal/controller/onboarding_controller.go +++ b/internal/controller/onboarding_controller.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "net/http" - "slices" "strings" "time" @@ -197,34 +196,10 @@ func (r *OnboardingController) initialOnboarding(ctx context.Context, hv *kvmv1. 
return fmt.Errorf("cannot find availability-zone label %v on node", corev1.LabelTopologyZone) } - aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient) - if err != nil { - return fmt.Errorf("cannot list aggregates %w", err) - } - - if err = openstack.AddToAggregate(ctx, r.computeClient, aggs, host, zone, zone); err != nil { - return fmt.Errorf("failed to agg to availability-zone aggregate %w", err) - } - - err = openstack.AddToAggregate(ctx, r.computeClient, aggs, host, testAggregateName, "") - if err != nil { - return fmt.Errorf("failed to agg to test aggregate %w", err) - } - - var errs []error - for aggregateName, aggregate := range aggs { - if aggregateName == testAggregateName || aggregateName == zone { - continue - } - if slices.Contains(aggregate.Hosts, host) { - if err := openstack.RemoveFromAggregate(ctx, r.computeClient, aggs, host, aggregateName); err != nil { - errs = append(errs, err) - } - } - } - - if len(errs) > 0 { - return fmt.Errorf("failed to remove host %v from aggregates due to %w", host, errors.Join(errs...)) + // Apply the desired aggregate state (zone and test aggregate only) + desiredAggregates := []string{zone, testAggregateName} + if err := openstack.ApplyAggregates(ctx, r.computeClient, host, desiredAggregates); err != nil { + return fmt.Errorf("failed to apply aggregates: %w", err) } // The service may be forced down previously due to an HA event, @@ -345,14 +320,15 @@ func (r *OnboardingController) completeOnboarding(ctx context.Context, host stri return ctrl.Result{}, err } - aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient) - if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to get aggregates %w", err) + zone, found := hv.Labels[corev1.LabelTopologyZone] + if !found || zone == "" { + return ctrl.Result{}, fmt.Errorf("cannot find availability-zone label %v on node", corev1.LabelTopologyZone) } - err = openstack.RemoveFromAggregate(ctx, r.computeClient, aggs, host, testAggregateName) - if err != nil 
{ - return ctrl.Result{}, fmt.Errorf("failed to remove from test aggregate %w", err) + // Remove host from test aggregate by only keeping the zone aggregate + desiredAggregates := []string{zone} + if err := openstack.ApplyAggregates(ctx, r.computeClient, host, desiredAggregates); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to apply aggregates: %w", err) } log.Info("removed from test-aggregate", "name", testAggregateName) diff --git a/internal/controller/onboarding_controller_test.go b/internal/controller/onboarding_controller_test.go index bc2fb895..c5706bed 100644 --- a/internal/controller/onboarding_controller_test.go +++ b/internal/controller/onboarding_controller_test.go @@ -455,6 +455,8 @@ var _ = Describe("Onboarding Controller", func() { }) Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + // Mock for ApplyAggregates during completeOnboarding + // Returns aggregates with host in both test-az and tenant_filter_tests fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -490,10 +492,20 @@ var _ = Describe("Onboarding Controller", func() { Expect(err).NotTo(HaveOccurred()) }) + // Mock for removing host from test aggregate (ApplyAggregates will call this) fakeServer.Mux.HandleFunc("POST /os-aggregates/99/action", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, addedHostToTestBody) + // Return aggregate without the host after removal + _, err := fmt.Fprint(w, `{ + "aggregate": { + "name": "tenant_filter_tests", + "availability_zone": "", + "deleted": false, + "hosts": [], + "id": 99 + } + }`) Expect(err).NotTo(HaveOccurred()) }) diff --git a/internal/openstack/aggregates.go b/internal/openstack/aggregates.go index 1025adac..b7a67e7f 100644 --- a/internal/openstack/aggregates.go +++ b/internal/openstack/aggregates.go 
@@ -16,6 +16,7 @@ package openstack import ( "context" + "errors" "fmt" "slices" @@ -105,3 +106,65 @@ func RemoveFromAggregate(ctx context.Context, serviceClient *gophercloud.Service return nil } + +// ApplyAggregates ensures a host is in exactly the specified aggregates. +// It adds the host to missing aggregates and removes it from extra ones. +// Pass an empty list to remove the host from all aggregates. +// Aggregates must already exist - this function will not create them. +func ApplyAggregates(ctx context.Context, serviceClient *gophercloud.ServiceClient, host string, desiredAggregates []string) error { + log := logger.FromContext(ctx) + + aggs, err := GetAggregatesByName(ctx, serviceClient) + if err != nil { + return fmt.Errorf("failed to get aggregates: %w", err) + } + + // Build desired set for O(1) lookups + desiredSet := make(map[string]bool, len(desiredAggregates)) + for _, name := range desiredAggregates { + desiredSet[name] = true + } + + // We need to add the host to aggregates first, because if we first drop + // an aggregate with a filter criterion and then add a new one, we leave the host + // open for a period of time. Still, this may fail due to a conflict of aggregates + // with different availability zones, so we collect all the errors and return them + // so it hopefully will converge eventually. 
+ var errs []error + var toRemove []string + + // First, add to any desired aggregates (including creating them if needed) + for _, name := range desiredAggregates { + aggregate, exists := aggs[name] + if !exists || !slices.Contains(aggregate.Hosts, host) { + // Aggregate doesn't exist or host not in it - add immediately + log.Info("Adding to aggregate", "aggregate", name, "host", host) + if err := AddToAggregate(ctx, serviceClient, aggs, host, name, ""); err != nil { + errs = append(errs, err) + } + } + } + + // Second, collect aggregates to remove from (host is in but shouldn't be) + for name, aggregate := range aggs { + if slices.Contains(aggregate.Hosts, host) && !desiredSet[name] { + toRemove = append(toRemove, name) + } + } + + // Remove after all additions are complete + if len(toRemove) > 0 { + log.Info("Removing from aggregates", "aggregates", toRemove, "host", host) + for _, name := range toRemove { + if err := RemoveFromAggregate(ctx, serviceClient, aggs, host, name); err != nil { + errs = append(errs, err) + } + } + } + + if len(errs) > 0 { + return fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...)) + } + + return nil +} diff --git a/internal/openstack/aggregates_test.go b/internal/openstack/aggregates_test.go new file mode 100644 index 00000000..d0758bce --- /dev/null +++ b/internal/openstack/aggregates_test.go @@ -0,0 +1,329 @@ +/* +SPDX-FileCopyrightText: Copyright 2025 SAP SE or an SAP affiliate company and cobaltcore-dev contributors +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openstack + +import ( + "context" + "fmt" + "net/http" + + "github.com/gophercloud/gophercloud/v2/testhelper" + "github.com/gophercloud/gophercloud/v2/testhelper/client" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("ApplyAggregates", func() { + const ( + aggregateListWithHost = `{ + "aggregates": [ + { + "name": "agg1", + "availability_zone": "az1", + "deleted": false, + "id": 1, + "hosts": ["test-host"] + }, + { + "name": "agg2", + "availability_zone": "az2", + "deleted": false, + "id": 2, + "hosts": ["test-host"] + }, + { + "name": "agg3", + "availability_zone": "az3", + "deleted": false, + "id": 3, + "hosts": [] + } + ] + }` + + aggregateAddHostResponse = `{ + "aggregate": { + "name": "agg3", + "availability_zone": "az3", + "deleted": false, + "id": 3, + "hosts": ["test-host"] + } + }` + + aggregateRemoveHostResponse = `{ + "aggregate": { + "name": "agg1", + "availability_zone": "az1", + "deleted": false, + "id": 1, + "hosts": [] + } + }` + ) + + var ( + fakeServer testhelper.FakeServer + ctx context.Context + ) + + BeforeEach(func() { + fakeServer = testhelper.SetupHTTP() + ctx = context.Background() + }) + + AfterEach(func() { + fakeServer.Teardown() + }) + + Context("when adding host to new aggregate", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateAddHostResponse) + }) + }) + + It("should add host to agg3", func() { + serviceClient := client.ServiceClient(fakeServer) + err := 
ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"}) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when removing host from aggregate", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateRemoveHostResponse) + }) + }) + + It("should remove host from agg1", func() { + serviceClient := client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"}) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when removing host from all aggregates", func() { + var removeCalls int + + BeforeEach(func() { + removeCalls = 0 + + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + removeCalls++ + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateRemoveHostResponse) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/2/action", func(w http.ResponseWriter, r *http.Request) { + removeCalls++ + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"aggregate": {"name": "agg2", "id": 2, "hosts": []}}`) + }) + }) + + It("should remove host from all aggregates", func() { + serviceClient := client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "test-host", []string{}) + 
Expect(err).NotTo(HaveOccurred()) + Expect(removeCalls).To(Equal(2)) + }) + }) + + Context("when host already in desired aggregates", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + }) + + It("should not make any changes", func() { + serviceClient := client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2"}) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when adding and removing simultaneously", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateAddHostResponse) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateRemoveHostResponse) + }) + }) + + It("should replace agg1 with agg3", func() { + serviceClient := client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"}) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when listing aggregates fails", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, `{"error": "Internal Server Error"}`) + }) + }) + + It("should return an error", func() { + serviceClient := 
client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get aggregates")) + }) + }) + + Context("when adding host fails", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot add host", "code": 409}}`) + }) + }) + + It("should return an error", func() { + serviceClient := client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update")) + }) + }) + + Context("when removing host fails", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot remove host", "code": 409}}`) + }) + }) + + It("should return an error", func() { + serviceClient := client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update")) + }) + }) + + Context("when host not in any aggregates", func() { + BeforeEach(func() { 
+ fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateAddHostResponse) + }) + }) + + It("should add new host to aggregate", func() { + serviceClient := client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "new-host", []string{"agg3"}) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when both adding and removing fail", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + // Add to agg3 fails + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot add host", "code": 409}}`) + }) + + // Remove from agg1 fails + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot remove host", "code": 409}}`) + }) + }) + + It("should return combined errors", func() { + serviceClient := client.ServiceClient(fakeServer) + err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update")) + // Verify it's a joined error with multiple failures + Expect(err.Error()).To(Or(ContainSubstring("Cannot add"), ContainSubstring("Cannot remove"))) + }) 
+ }) +}) diff --git a/internal/openstack/suite_test.go b/internal/openstack/suite_test.go new file mode 100644 index 00000000..8a41f2b0 --- /dev/null +++ b/internal/openstack/suite_test.go @@ -0,0 +1,30 @@ +/* +SPDX-FileCopyrightText: Copyright 2025 SAP SE or an SAP affiliate company and cobaltcore-dev contributors +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openstack + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestOpenstack(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Openstack Suite") +} From 56118954fcae6e1479737bba9d4c129aef07c727 Mon Sep 17 00:00:00 2001 From: Fabian Wiesel Date: Thu, 26 Feb 2026 11:00:15 +0100 Subject: [PATCH 2/2] Only Aggregate controller changes aggregates and store uuids For Cortex, we want the uuids in addition to the names, so keep track of them in the status. Let's keep only the aggregates controller changing aggregates. 
--- api/v1/hypervisor_types.go | 19 +- api/v1/zz_generated.deepcopy.go | 5 + .../api/v1/hypervisorstatus.go | 11 + .../crds/hypervisor-crd.yaml | 7 + .../crd/bases/kvm.cloud.sap_hypervisors.yaml | 7 + internal/controller/aggregates_controller.go | 126 ++++++--- .../controller/aggregates_controller_test.go | 253 +++++++++++++++-- internal/controller/constants.go | 1 + internal/controller/decomission_controller.go | 9 +- .../controller/decomission_controller_test.go | 167 +++--------- internal/controller/onboarding_controller.go | 78 ++++-- .../controller/onboarding_controller_test.go | 258 ++++++------------ internal/openstack/aggregates.go | 177 +++++------- internal/openstack/aggregates_test.go | 81 ++++-- 14 files changed, 674 insertions(+), 525 deletions(-) diff --git a/api/v1/hypervisor_types.go b/api/v1/hypervisor_types.go index 185f7365..c6bf4dc1 100644 --- a/api/v1/hypervisor_types.go +++ b/api/v1/hypervisor_types.go @@ -61,10 +61,17 @@ const ( ConditionReasonReadyEvicting = "Evicting" // ConditionTypeOnboarding reasons - ConditionReasonInitial = "Initial" - ConditionReasonOnboarding = "Onboarding" - ConditionReasonTesting = "Testing" - ConditionReasonAborted = "Aborted" + ConditionReasonInitial = "Initial" + ConditionReasonOnboarding = "Onboarding" + ConditionReasonTesting = "Testing" + ConditionReasonRemovingTestAggregate = "RemovingTestAggregate" + ConditionReasonAborted = "Aborted" + + // ConditionTypeAggregatesUpdated reasons + // Note: ConditionReasonSucceeded and ConditionReasonFailed are shared with eviction_types.go + ConditionReasonTestAggregates = "TestAggregates" + ConditionReasonTerminating = "Terminating" + ConditionReasonEvictionInProgress = "EvictionInProgress" ) // HypervisorSpec defines the desired state of Hypervisor @@ -341,6 +348,10 @@ type HypervisorStatus struct { // Aggregates are the applied aggregates of the hypervisor. 
Aggregates []string `json:"aggregates,omitempty"` + // +kubebuilder:default:={} + // The UUIDs of the aggregates are used to apply aggregates to the hypervisor. + AggregateUUIDs []string `json:"aggregateUUIDs,omitempty"` + // InternalIP is the internal IP address of the hypervisor. InternalIP string `json:"internalIp,omitempty"` diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index dec733a9..a55c9bde 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -351,6 +351,11 @@ func (in *HypervisorStatus) DeepCopyInto(out *HypervisorStatus) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.AggregateUUIDs != nil { + in, out := &in.AggregateUUIDs, &out.AggregateUUIDs + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]metav1.Condition, len(*in)) diff --git a/applyconfigurations/api/v1/hypervisorstatus.go b/applyconfigurations/api/v1/hypervisorstatus.go index 45c24f55..dbb15244 100644 --- a/applyconfigurations/api/v1/hypervisorstatus.go +++ b/applyconfigurations/api/v1/hypervisorstatus.go @@ -25,6 +25,7 @@ type HypervisorStatusApplyConfiguration struct { ServiceID *string `json:"serviceId,omitempty"` Traits []string `json:"traits,omitempty"` Aggregates []string `json:"aggregates,omitempty"` + AggregateUUIDs []string `json:"aggregateUUIDs,omitempty"` InternalIP *string `json:"internalIp,omitempty"` Evicted *bool `json:"evicted,omitempty"` Conditions []metav1.ConditionApplyConfiguration `json:"conditions,omitempty"` @@ -183,6 +184,16 @@ func (b *HypervisorStatusApplyConfiguration) WithAggregates(values ...string) *H return b } +// WithAggregateUUIDs adds the given value to the AggregateUUIDs field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. 
+// If called multiple times, values provided by each call will be appended to the AggregateUUIDs field. +func (b *HypervisorStatusApplyConfiguration) WithAggregateUUIDs(values ...string) *HypervisorStatusApplyConfiguration { + for i := range values { + b.AggregateUUIDs = append(b.AggregateUUIDs, values[i]) + } + return b +} + // WithInternalIP sets the InternalIP field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the InternalIP field is set to the value of the last call. diff --git a/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml b/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml index 521e67a3..a8993754 100644 --- a/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml +++ b/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml @@ -188,6 +188,13 @@ spec: status: description: HypervisorStatus defines the observed state of Hypervisor properties: + aggregateUUIDs: + default: [] + description: The UUIDs of the aggregates are used to apply aggregates + to the hypervisor. + items: + type: string + type: array aggregates: description: Aggregates are the applied aggregates of the hypervisor. items: diff --git a/config/crd/bases/kvm.cloud.sap_hypervisors.yaml b/config/crd/bases/kvm.cloud.sap_hypervisors.yaml index 348504c7..6aa2bb49 100644 --- a/config/crd/bases/kvm.cloud.sap_hypervisors.yaml +++ b/config/crd/bases/kvm.cloud.sap_hypervisors.yaml @@ -189,6 +189,13 @@ spec: status: description: HypervisorStatus defines the observed state of Hypervisor properties: + aggregateUUIDs: + default: [] + description: The UUIDs of the aggregates are used to apply aggregates + to the hypervisor. + items: + type: string + type: array aggregates: description: Aggregates are the applied aggregates of the hypervisor. 
items: diff --git a/internal/controller/aggregates_controller.go b/internal/controller/aggregates_controller.go index bb7c4050..14715272 100644 --- a/internal/controller/aggregates_controller.go +++ b/internal/controller/aggregates_controller.go @@ -23,6 +23,8 @@ import ( "fmt" "slices" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -57,56 +59,111 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, k8sclient.IgnoreNotFound(err) } - /// On- and off-boarding need to mess with the aggregates, so let's get out of their way - if !meta.IsStatusConditionFalse(hv.Status.Conditions, kvmv1.ConditionTypeOnboarding) || - meta.IsStatusConditionTrue(hv.Status.Conditions, kvmv1.ConditionTypeTerminating) { + // Wait for onboarding controller to populate HypervisorID and ServiceID + // before attempting to modify aggregates + if hv.Status.HypervisorID == "" || hv.Status.ServiceID == "" { return ctrl.Result{}, nil } - if slices.Equal(hv.Spec.Aggregates, hv.Status.Aggregates) { - // Nothing to be done - return ctrl.Result{}, nil + base := hv.DeepCopy() + desiredAggregates, desiredCondition := ac.determineDesiredState(hv) + + if !slices.Equal(desiredAggregates, hv.Status.Aggregates) { + // Apply aggregates to OpenStack and update status + uuids, err := openstack.ApplyAggregates(ctx, ac.computeClient, hv.Name, desiredAggregates) + if err != nil { + // Set error condition + condition := metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonFailed, + Message: fmt.Errorf("failed to apply aggregates: %w", err).Error(), + } + + if meta.SetStatusCondition(&hv.Status.Conditions, condition) { + if err2 := ac.Status().Patch(ctx, hv, k8sclient.MergeFromWithOptions(base, + k8sclient.MergeFromWithOptimisticLock{}), 
k8sclient.FieldOwner(AggregatesControllerName)); err2 != nil { + return ctrl.Result{}, errors.Join(err, err2) + } + } + return ctrl.Result{}, err + } + + hv.Status.Aggregates = desiredAggregates + hv.Status.AggregateUUIDs = uuids } - err := openstack.ApplyAggregates(ctx, ac.computeClient, hv.Name, hv.Spec.Aggregates) - if err != nil { - err = fmt.Errorf("failed to apply aggregates: %w", err) - if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil { - return ctrl.Result{}, errors.Join(err, err2) - } - return ctrl.Result{}, err + // Set the condition based on the determined desired state + meta.SetStatusCondition(&hv.Status.Conditions, desiredCondition) + + if equality.Semantic.DeepEqual(base, hv) { + return ctrl.Result{}, nil } - base := hv.DeepCopy() - hv.Status.Aggregates = hv.Spec.Aggregates - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: kvmv1.ConditionTypeAggregatesUpdated, - Status: metav1.ConditionTrue, - Reason: kvmv1.ConditionReasonSucceeded, - Message: "Aggregates updated successfully", - }) return ctrl.Result{}, ac.Status().Patch(ctx, hv, k8sclient.MergeFromWithOptions(base, k8sclient.MergeFromWithOptimisticLock{}), k8sclient.FieldOwner(AggregatesControllerName)) } -// setErrorCondition sets the error condition on the Hypervisor status, returns error if update fails -func (ac *AggregatesController) setErrorCondition(ctx context.Context, hv *kvmv1.Hypervisor, msg string) error { - condition := metav1.Condition{ - Type: kvmv1.ConditionTypeAggregatesUpdated, - Status: metav1.ConditionFalse, - Reason: kvmv1.ConditionReasonFailed, - Message: msg, +// determineDesiredState returns the desired aggregates and the corresponding condition +// based on the hypervisor's current state. The condition status is True only when +// spec aggregates are being applied. Otherwise, it's False with a reason explaining +// why different aggregates are applied. 
+func (ac *AggregatesController) determineDesiredState(hv *kvmv1.Hypervisor) ([]string, metav1.Condition) { + // If terminating AND evicted, remove from all aggregates + // We must wait for eviction to complete before removing aggregates + if meta.IsStatusConditionTrue(hv.Status.Conditions, kvmv1.ConditionTypeTerminating) { + evictingCondition := meta.FindStatusCondition(hv.Status.Conditions, kvmv1.ConditionTypeEvicting) + // Only remove aggregates if eviction is complete (Evicting=False) + // If Evicting condition is not set or still True, keep current aggregates + if evictingCondition != nil && evictingCondition.Status == metav1.ConditionFalse { + return []string{}, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonTerminating, + Message: "Aggregates cleared due to termination after eviction", + } + } + // Still evicting or eviction not started - keep current aggregates + return hv.Status.Aggregates, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonEvictionInProgress, + Message: "Aggregates unchanged while terminating and eviction in progress", + } } - base := hv.DeepCopy() - if meta.SetStatusCondition(&hv.Status.Conditions, condition) { - if err := ac.Status().Patch(ctx, hv, k8sclient.MergeFromWithOptions(base, - k8sclient.MergeFromWithOptimisticLock{}), k8sclient.FieldOwner(AggregatesControllerName)); err != nil { - return err + // If onboarding is in progress (Initial or Testing), add test aggregate + onboardingCondition := meta.FindStatusCondition(hv.Status.Conditions, kvmv1.ConditionTypeOnboarding) + if onboardingCondition != nil && onboardingCondition.Status == metav1.ConditionTrue { + if onboardingCondition.Reason == kvmv1.ConditionReasonInitial || + onboardingCondition.Reason == kvmv1.ConditionReasonTesting { + zone := hv.Labels[corev1.LabelTopologyZone] + return []string{zone, testAggregateName}, 
metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonTestAggregates, + Message: "Test aggregate applied during onboarding instead of spec aggregates", + } + } + + // If removing test aggregate, use Spec.Aggregates (no test aggregate) + if onboardingCondition.Reason == kvmv1.ConditionReasonRemovingTestAggregate { + return hv.Spec.Aggregates, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Aggregates from spec applied successfully", + } } } - return nil + // Normal operations or onboarding complete: use Spec.Aggregates + return hv.Spec.Aggregates, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Aggregates from spec applied successfully", + } } // SetupWithManager sets up the controller with the Manager. @@ -118,7 +175,6 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error { if ac.computeClient, err = openstack.GetServiceClient(ctx, "compute", nil); err != nil { return err } - ac.computeClient.Microversion = "2.40" // gophercloud only supports numeric ids return ctrl.NewControllerManagedBy(mgr). Named(AggregatesControllerName). diff --git a/internal/controller/aggregates_controller_test.go b/internal/controller/aggregates_controller_test.go index 3bb1b1d8..fe60055c 100644 --- a/internal/controller/aggregates_controller_test.go +++ b/internal/controller/aggregates_controller_test.go @@ -25,6 +25,7 @@ import ( "github.com/gophercloud/gophercloud/v2/testhelper/client" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -50,6 +51,7 @@ var _ = Describe("AggregatesController", func() { "availability_zone": "", "deleted": false, "id": 100001, + "uuid": "uuid-100001", "hosts": ["hv-test"] }, { @@ -57,29 +59,21 @@ var _ = Describe("AggregatesController", func() { "availability_zone": "", "deleted": false, "id": 99, + "uuid": "uuid-99", "hosts": ["hv-test"] } ] } ` - AggregatesPostBody = ` -{ - "aggregate": { - "name": "test-aggregate1", - "availability_zone": "", - "deleted": false, - "id": 42 - } -}` - AggregateRemoveHostBody = ` { "aggregate": { "name": "test-aggregate3", "availability_zone": "", "deleted": false, - "id": 99 + "id": 99, + "uuid": "uuid-99" } }` @@ -92,7 +86,8 @@ var _ = Describe("AggregatesController", func() { "hosts": [ "hv-test" ], - "id": 42 + "id": 42, + "uuid": "uuid-42" } }` ) @@ -118,6 +113,9 @@ var _ = Describe("AggregatesController", func() { hypervisor := &kvmv1.Hypervisor{ ObjectMeta: metav1.ObjectMeta{ Name: hypervisorName.Name, + Labels: map[string]string{ + corev1.LabelTopologyZone: "zone-a", + }, }, Spec: kvmv1.HypervisorSpec{ LifecycleEnabled: true, @@ -138,6 +136,12 @@ var _ = Describe("AggregatesController", func() { }) Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + By("Setting HypervisorID and ServiceID in status") + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Status.HypervisorID = "test-hypervisor-id" + hypervisor.Status.ServiceID = "test-service-id" + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + By("Creating the AggregatesController") aggregatesController = &AggregatesController{ Client: k8sClient, @@ -153,27 +157,205 @@ var _ = Describe("AggregatesController", func() { Expect(result).To(Equal(ctrl.Result{})) }) - Context("Adding new Aggregate", func() { + Context("During onboarding phase", func() { 
BeforeEach(func(ctx SpecContext) { - By("Setting a missing aggregate") + By("Setting onboarding condition to true") hypervisor := &kvmv1.Hypervisor{} Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) - hypervisor.Spec.Aggregates = []string{"test-aggregate1"} + meta.SetStatusCondition(&hypervisor.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeOnboarding, + Status: metav1.ConditionTrue, + Reason: "Testing", + Message: "Onboarding in progress", + }) + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Setting desired aggregates including test aggregate") + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Spec.Aggregates = []string{"zone-a"} Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) - By("Mocking GetAggregates to return empty list") + By("Mocking GetAggregates to return aggregates without host") + aggregateList := `{ + "aggregates": [ + { + "name": "zone-a", + "availability_zone": "zone-a", + "deleted": false, + "id": 1, + "uuid": "uuid-zone-a", + "hosts": [] + }, + { + "name": "tenant_filter_tests", + "availability_zone": "", + "deleted": false, + "id": 99, + "uuid": "uuid-test", + "hosts": [] + } + ] + }` fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregateListBodyEmpty) - Expect(err).NotTo(HaveOccurred()) + fmt.Fprint(w, aggregateList) + }) + + By("Mocking AddHost for both aggregates") + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"aggregate": {"name": "zone-a", "id": 1, "uuid": "uuid-zone-a", "hosts": ["hv-test"]}}`) + }) + fakeServer.Mux.HandleFunc("POST /os-aggregates/99/action", func(w http.ResponseWriter, r *http.Request) { + 
w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"aggregate": {"name": "tenant_filter_tests", "id": 99, "uuid": "uuid-test", "hosts": ["hv-test"]}}`) + }) + }) + + It("should add host to both specified aggregates and test aggregate", func(ctx SpecContext) { + updated := &kvmv1.Hypervisor{} + Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) + Expect(updated.Status.Aggregates).To(ConsistOf("zone-a", testAggregateName)) + Expect(updated.Status.AggregateUUIDs).To(ConsistOf("uuid-zone-a", "uuid-test")) + + // During onboarding with test aggregate, condition should be False with TestAggregates reason + Expect(meta.IsStatusConditionFalse(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) + cond := meta.FindStatusCondition(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated) + Expect(cond).NotTo(BeNil()) + Expect(cond.Reason).To(Equal(kvmv1.ConditionReasonTestAggregates)) + Expect(cond.Message).To(ContainSubstring("Test aggregate applied during onboarding")) + }) + }) + + Context("During normal operations", func() { + BeforeEach(func(ctx SpecContext) { + By("Setting desired aggregates") + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Spec.Aggregates = []string{"zone-a", "zone-b"} + Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) + + By("Mocking GetAggregates") + aggregateList := `{ + "aggregates": [ + { + "name": "zone-a", + "availability_zone": "zone-a", + "deleted": false, + "id": 1, + "uuid": "uuid-zone-a", + "hosts": [] + }, + { + "name": "zone-b", + "availability_zone": "zone-b", + "deleted": false, + "id": 2, + "uuid": "uuid-zone-b", + "hosts": [] + } + ] + }` + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, 
aggregateList) }) - By("Mocking CreateAggregate") - fakeServer.Mux.HandleFunc("POST /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + By("Mocking AddHost for both aggregates") + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"aggregate": {"name": "zone-a", "id": 1, "uuid": "uuid-zone-a", "hosts": ["hv-test"]}}`) + }) + fakeServer.Mux.HandleFunc("POST /os-aggregates/2/action", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregatesPostBody) + fmt.Fprint(w, `{"aggregate": {"name": "zone-b", "id": 2, "uuid": "uuid-zone-b", "hosts": ["hv-test"]}}`) + }) + }) + + It("should add host to specified aggregates without test aggregate", func(ctx SpecContext) { + updated := &kvmv1.Hypervisor{} + Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) + Expect(updated.Status.Aggregates).To(ConsistOf("zone-a", "zone-b")) + Expect(updated.Status.AggregateUUIDs).To(ConsistOf("uuid-zone-a", "uuid-zone-b")) + Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) + }) + }) + + Context("When Spec.Aggregates matches Status.Aggregates", func() { + Context("but condition is not set", func() { + BeforeEach(func(ctx SpecContext) { + By("Setting matching aggregates in spec and status") + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Spec.Aggregates = []string{"zone-a"} + hypervisor.Status.Aggregates = []string{"zone-a"} + hypervisor.Status.AggregateUUIDs = []string{"uuid-zone-a"} + Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Mocking GetAggregates to show host already in 
aggregate") + aggregateList := `{ + "aggregates": [ + { + "name": "zone-a", + "availability_zone": "zone-a", + "deleted": false, + "id": 1, + "uuid": "uuid-zone-a", + "hosts": ["hv-test"] + } + ] + }` + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateList) + }) + }) + + It("should proceed to update and set the condition", func(ctx SpecContext) { + updated := &kvmv1.Hypervisor{} + Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) + Expect(updated.Status.Aggregates).To(ConsistOf("zone-a")) + Expect(updated.Status.AggregateUUIDs).To(ConsistOf("uuid-zone-a")) + Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) + cond := meta.FindStatusCondition(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated) + Expect(cond.Reason).To(Equal(kvmv1.ConditionReasonSucceeded)) + }) + }) + }) + + Context("Adding to existing Aggregate", func() { + BeforeEach(func(ctx SpecContext) { + By("Setting a desired aggregate") + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Spec.Aggregates = []string{"test-aggregate1"} + Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) + + By("Mocking GetAggregates to return aggregate without host") + aggregateList := `{ + "aggregates": [ + { + "name": "test-aggregate1", + "availability_zone": "", + "deleted": false, + "id": 42, + "uuid": "uuid-42", + "hosts": [] + } + ] + }` + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, err := fmt.Fprint(w, aggregateList) Expect(err).NotTo(HaveOccurred()) }) @@ -197,6 +379,7 @@ var _ = Describe("AggregatesController", func() { updated := 
&kvmv1.Hypervisor{} Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) Expect(updated.Status.Aggregates).To(ContainElements("test-aggregate1")) + Expect(updated.Status.AggregateUUIDs).To(ContainElements("uuid-42")) Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) }) }) @@ -239,6 +422,7 @@ var _ = Describe("AggregatesController", func() { updated := &kvmv1.Hypervisor{} Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) Expect(updated.Status.Aggregates).To(BeEmpty()) + Expect(updated.Status.AggregateUUIDs).To(BeEmpty()) Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) }) }) @@ -251,20 +435,21 @@ var _ = Describe("AggregatesController", func() { Expect(result).To(Equal(ctrl.Result{})) }) - Context("before onboarding", func() { + Context("before onboarding (missing HypervisorID and ServiceID)", func() { BeforeEach(func(ctx SpecContext) { - By("Removing the onboarding condition") + By("Removing HypervisorID and ServiceID from status") hypervisor := &kvmv1.Hypervisor{} Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) - hypervisor.Status.Conditions = []metav1.Condition{} + hypervisor.Status.HypervisorID = "" + hypervisor.Status.ServiceID = "" Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) }) - It("should neither update Aggregates and nor set status condition", func(ctx SpecContext) { + It("should return early without updating aggregates or setting condition", func(ctx SpecContext) { updated := &kvmv1.Hypervisor{} Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) Expect(updated.Status.Aggregates).To(BeEmpty()) - Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeFalse()) + Expect(meta.FindStatusCondition(updated.Status.Conditions, 
kvmv1.ConditionTypeAggregatesUpdated)).To(BeNil()) }) }) @@ -280,13 +465,25 @@ var _ = Describe("AggregatesController", func() { Message: "dontcare", }) Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Pre-setting the EvictionInProgress condition to match what controller will determine") + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + meta.SetStatusCondition(&hypervisor.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonEvictionInProgress, + Message: "Aggregates unchanged while terminating and eviction in progress", + }) + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) }) It("should neither update Aggregates and nor set status condition", func(ctx SpecContext) { updated := &kvmv1.Hypervisor{} Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) Expect(updated.Status.Aggregates).To(BeEmpty()) - Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeFalse()) + Expect(meta.IsStatusConditionFalse(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) + cond := meta.FindStatusCondition(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated) + Expect(cond.Reason).To(Equal(kvmv1.ConditionReasonEvictionInProgress)) }) }) }) @@ -322,7 +519,7 @@ var _ = Describe("AggregatesController", func() { It("should set error condition", func(ctx SpecContext) { _, err := aggregatesController.Reconcile(ctx, reconcileRequest) Expect(err).To(HaveOccurred()) - sharedErrorConditionChecks(ctx, "failed to get aggregates") + sharedErrorConditionChecks(ctx, "failed to list aggregates") }) }) diff --git a/internal/controller/constants.go b/internal/controller/constants.go index 32126a65..5151bd28 100644 --- a/internal/controller/constants.go +++ b/internal/controller/constants.go @@ -22,4 +22,5 @@ const ( 
labelEvictionRequired = "cloud.sap/hypervisor-eviction-required" labelEvictionApproved = "cloud.sap/hypervisor-eviction-succeeded" labelHypervisor = "nova.openstack.cloud.sap/virt-driver" + testAggregateName = "tenant_filter_tests" ) diff --git a/internal/controller/decomission_controller.go b/internal/controller/decomission_controller.go index 2988a852..0a29c126 100644 --- a/internal/controller/decomission_controller.go +++ b/internal/controller/decomission_controller.go @@ -113,11 +113,10 @@ func (r *NodeDecommissionReconciler) Reconcile(ctx context.Context, req ctrl.Req return r.setDecommissioningCondition(ctx, hv, msg) } - // Before removing the service, first take the node out of all aggregates, - // so when the node comes back, it doesn't end up with the old associations - host := hv.Name - if err := openstack.ApplyAggregates(ctx, r.computeClient, host, []string{}); err != nil { - return r.setDecommissioningCondition(ctx, hv, fmt.Sprintf("failed to remove host from aggregates: %v", err)) + // Wait for aggregates controller to remove from all aggregates + if len(hv.Status.Aggregates) > 0 { + msg := fmt.Sprintf("Waiting for aggregates to be removed, current: %v", hv.Status.Aggregates) + return r.setDecommissioningCondition(ctx, hv, msg) } // Deleting and evicted, so better delete the service diff --git a/internal/controller/decomission_controller_test.go b/internal/controller/decomission_controller_test.go index db695857..05ecd4f5 100644 --- a/internal/controller/decomission_controller_test.go +++ b/internal/controller/decomission_controller_test.go @@ -38,32 +38,10 @@ import ( var _ = Describe("Decommission Controller", func() { const ( - EOF = "EOF" - serviceId = "service-1234" - hypervisorName = "node-test" - namespaceName = "namespace-test" - AggregateListWithHv = ` -{ - "aggregates": [ - { - "name": "test-aggregate2", - "availability_zone": "", - "deleted": false, - "id": 100001, - "hosts": ["node-test"] - } - ] -} -` - AggregateRemoveHostBody = ` -{ - 
"aggregate": { - "name": "test-aggregate2", - "availability_zone": "", - "deleted": false, - "id": 100001 - } -}` + EOF = "EOF" + serviceId = "service-1234" + hypervisorName = "node-test" + namespaceName = "namespace-test" ) var ( @@ -173,27 +151,6 @@ var _ = Describe("Decommission Controller", func() { Expect(fmt.Fprintf(w, HypervisorWithServers, serviceId, "some reason", hypervisorName)).ToNot(BeNil()) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregateListWithHv) - Expect(err).NotTo(HaveOccurred()) - }) - - fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) { - Expect(r.Header.Get("Content-Type")).To(Equal("application/json")) - expectedBody := `{"remove_host":{"host":"node-test"}}` - body := make([]byte, r.ContentLength) - _, err := r.Body.Read(body) - Expect(err == nil || err.Error() == EOF).To(BeTrue()) - Expect(string(body)).To(MatchJSON(expectedBody)) - - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err = fmt.Fprint(w, AggregateRemoveHostBody) - Expect(err).NotTo(HaveOccurred()) - }) - // c48f6247-abe4-4a24-824e-ea39e108874f comes from the HypervisorWithServers const fakeServer.Mux.HandleFunc("GET /resource_providers/c48f6247-abe4-4a24-824e-ea39e108874f", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") @@ -249,6 +206,36 @@ var _ = Describe("Decommission Controller", func() { ), )) }) + + It("should clear Status.Aggregates when removing from all aggregates", func(ctx SpecContext) { + By("Setting initial aggregates and IDs in status") + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) + hypervisor.Status.Aggregates = []string{"zone-a", "test-aggregate"} + hypervisor.Status.ServiceID = serviceId 
+ hypervisor.Status.HypervisorID = "c48f6247-abe4-4a24-824e-ea39e108874f" + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Reconciling - decommission controller will wait for aggregates to be cleared") + _, err := decommissionReconciler.Reconcile(ctx, reconcileReq) + Expect(err).NotTo(HaveOccurred()) + + By("Simulating aggregates controller clearing aggregates") + Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) + hypervisor.Status.Aggregates = []string{} + hypervisor.Status.AggregateUUIDs = []string{} + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Reconciling again after aggregates are cleared") + for range 3 { + _, err := decommissionReconciler.Reconcile(ctx, reconcileReq) + Expect(err).NotTo(HaveOccurred()) + } + + By("Verifying Status.Aggregates is empty") + Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) + Expect(hypervisor.Status.Aggregates).To(BeEmpty(), "Status.Aggregates should be cleared after decommissioning") + }) }) }) }) @@ -361,42 +348,8 @@ var _ = Describe("Decommission Controller", func() { }) }) - Context("When listing aggregates fails", func() { - BeforeEach(func() { - fakeServer.Mux.HandleFunc("GET /os-hypervisors/detail", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, `{ - "hypervisors": [{ - "id": "c48f6247-abe4-4a24-824e-ea39e108874f", - "hypervisor_hostname": "node-test", - "hypervisor_version": 2002000, - "state": "up", - "status": "enabled", - "running_vms": 0, - "service": { - "id": "service-1234", - "host": "node-test" - } - }] - }`) - }) - - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(w, `{"error": "Internal Server Error"}`) - }) - }) - - It("should set decommissioning condition with error", func(ctx SpecContext) { - _, err := 
decommissionReconciler.Reconcile(ctx, reconcileReq) - Expect(err).NotTo(HaveOccurred()) - sharedDecommissioningErrorCheck(ctx, "cannot list aggregates") - }) - }) - - Context("When ApplyAggregates fails", func() { - BeforeEach(func() { + Context("When aggregates are not empty", func() { + BeforeEach(func(ctx SpecContext) { fakeServer.Mux.HandleFunc("GET /os-hypervisors/detail", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -416,33 +369,17 @@ var _ = Describe("Decommission Controller", func() { }`) }) - // Mock only the first API call in ApplyAggregates (GET /os-aggregates) to fail - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(w, `{"error": "Internal Server Error"}`) - }) + // Simulate aggregates controller hasn't cleared aggregates yet + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) + hypervisor.Status.Aggregates = []string{"zone-a", "test-aggregate"} + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) }) - It("should set decommissioning condition with error", func(ctx SpecContext) { + It("should wait for aggregates to be cleared", func(ctx SpecContext) { _, err := decommissionReconciler.Reconcile(ctx, reconcileReq) Expect(err).NotTo(HaveOccurred()) - - hypervisor := &kvmv1.Hypervisor{} - Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) - - Expect(hypervisor.Status.Conditions).To(ContainElement( - SatisfyAll( - HaveField("Type", kvmv1.ConditionTypeReady), - HaveField("Status", metav1.ConditionFalse), - HaveField("Reason", "Decommissioning"), - HaveField("Message", ContainSubstring("failed to remove host")), - ), - )) - - // Should NOT be offboarded yet - Expect(hypervisor.Status.Conditions).NotTo(ContainElement( - HaveField("Type", kvmv1.ConditionTypeOffboarded), - )) + 
sharedDecommissioningErrorCheck(ctx, "Waiting for aggregates to be removed") }) }) @@ -467,12 +404,6 @@ var _ = Describe("Decommission Controller", func() { }`) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"aggregates": []}`) - }) - fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) fmt.Fprint(w, `{"error": "Internal Server Error"}`) @@ -507,12 +438,6 @@ var _ = Describe("Decommission Controller", func() { }`) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"aggregates": []}`) - }) - fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) }) @@ -551,12 +476,6 @@ var _ = Describe("Decommission Controller", func() { }`) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"aggregates": []}`) - }) - fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) }) diff --git a/internal/controller/onboarding_controller.go b/internal/controller/onboarding_controller.go index e8bd24a7..38ed45ee 100644 --- a/internal/controller/onboarding_controller.go +++ b/internal/controller/onboarding_controller.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "net/http" + "slices" "strings" "time" @@ -52,7 +53,6 @@ var errRequeue = errors.New("requeue requested") const ( defaultWaitTime = 1 * time.Minute - testAggregateName = "tenant_filter_tests" testProjectName = 
"test" testDomainName = "cc3test" testImageName = "cirros-d240801-kvm" @@ -126,13 +126,15 @@ func (r *OnboardingController) Reconcile(ctx context.Context, req ctrl.Request) switch status.Reason { case kvmv1.ConditionReasonInitial: - return ctrl.Result{}, r.initialOnboarding(ctx, hv, computeHost) + return ctrl.Result{}, r.initialOnboarding(ctx, hv) case kvmv1.ConditionReasonTesting: if hv.Spec.SkipTests { return r.completeOnboarding(ctx, computeHost, hv) } else { return r.smokeTest(ctx, hv, computeHost) } + case kvmv1.ConditionReasonRemovingTestAggregate: + return r.completeOnboarding(ctx, computeHost, hv) default: // Nothing to be done return ctrl.Result{}, nil @@ -190,16 +192,17 @@ func (r *OnboardingController) abortOnboarding(ctx context.Context, hv *kvmv1.Hy return r.patchStatus(ctx, hv, base) } -func (r *OnboardingController) initialOnboarding(ctx context.Context, hv *kvmv1.Hypervisor, host string) error { +func (r *OnboardingController) initialOnboarding(ctx context.Context, hv *kvmv1.Hypervisor) error { zone, found := hv.Labels[corev1.LabelTopologyZone] if !found || zone == "" { return fmt.Errorf("cannot find availability-zone label %v on node", corev1.LabelTopologyZone) } - // Apply the desired aggregate state (zone and test aggregate only) - desiredAggregates := []string{zone, testAggregateName} - if err := openstack.ApplyAggregates(ctx, r.computeClient, host, desiredAggregates); err != nil { - return fmt.Errorf("failed to apply aggregates: %w", err) + // Wait for aggregates controller to apply the desired state (zone and test aggregate) + expectedAggregates := []string{zone, testAggregateName} + if !slices.Equal(hv.Status.Aggregates, expectedAggregates) { + // Aggregates not yet applied, requeue + return errRequeue } // The service may be forced down previously due to an HA event, @@ -305,6 +308,37 @@ func (r *OnboardingController) smokeTest(ctx context.Context, hv *kvmv1.Hypervis func (r *OnboardingController) completeOnboarding(ctx context.Context, host 
string, hv *kvmv1.Hypervisor) (ctrl.Result, error) { log := logger.FromContext(ctx) + // Check if we're in the RemovingTestAggregate phase + onboardingCondition := meta.FindStatusCondition(hv.Status.Conditions, kvmv1.ConditionTypeOnboarding) + if onboardingCondition != nil && onboardingCondition.Reason == kvmv1.ConditionReasonRemovingTestAggregate { + // We're waiting for aggregates controller to sync + if !meta.IsStatusConditionTrue(hv.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated) { + log.Info("waiting for aggregates to be updated", "condition", kvmv1.ConditionTypeAggregatesUpdated) + return ctrl.Result{RequeueAfter: defaultWaitTime}, nil + } + + // Aggregates have been synced, mark onboarding as complete + log.Info("aggregates updated successfully", "aggregates", hv.Status.Aggregates) + base := hv.DeepCopy() + + meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeOnboarding, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Onboarding completed", + }) + + meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeReady, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonReadyReady, + Message: "Hypervisor is ready", + }) + + return ctrl.Result{}, r.patchStatus(ctx, hv, base) + } + + // First time in completeOnboarding - clean up and prepare for aggregate sync err := r.deleteTestServers(ctx, host) if err != nil { base := hv.DeepCopy() @@ -320,18 +354,7 @@ func (r *OnboardingController) completeOnboarding(ctx context.Context, host stri return ctrl.Result{}, err } - zone, found := hv.Labels[corev1.LabelTopologyZone] - if !found || zone == "" { - return ctrl.Result{}, fmt.Errorf("cannot find availability-zone label %v on node", corev1.LabelTopologyZone) - } - - // Remove host from test aggregate by only keeping the zone aggregate - desiredAggregates := []string{zone} - if err := openstack.ApplyAggregates(ctx, r.computeClient, host, 
desiredAggregates); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to apply aggregates: %w", err) - } - log.Info("removed from test-aggregate", "name", testAggregateName) - + // Enable HA service before marking onboarding complete err = enableInstanceHA(hv) log.Info("enabled instance-ha") if err != nil { @@ -339,21 +362,16 @@ func (r *OnboardingController) completeOnboarding(ctx context.Context, host stri } base := hv.DeepCopy() - // Set hypervisor ready condition - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: kvmv1.ConditionTypeReady, - Status: metav1.ConditionTrue, - Reason: kvmv1.ConditionReasonReadyReady, - Message: "Hypervisor is ready", - }) - // set onboarding condition completed + // Mark onboarding as removing test aggregate - signals aggregates controller to use Spec.Aggregates meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ Type: kvmv1.ConditionTypeOnboarding, - Status: metav1.ConditionFalse, - Reason: kvmv1.ConditionReasonSucceeded, - Message: "Onboarding completed", + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonRemovingTestAggregate, + Message: "Removing test aggregate", }) + + // Patch status to signal aggregates controller return ctrl.Result{}, r.patchStatus(ctx, hv, base) } diff --git a/internal/controller/onboarding_controller_test.go b/internal/controller/onboarding_controller_test.go index c5706bed..d902a654 100644 --- a/internal/controller/onboarding_controller_test.go +++ b/internal/controller/onboarding_controller_test.go @@ -83,97 +83,11 @@ const ( var _ = Describe("Onboarding Controller", func() { const ( - region = "test-region" - availabilityZone = "test-az" - hypervisorName = "test-host" - serviceId = "service-id" - hypervisorId = "c48f6247-abe4-4a24-824e-ea39e108874f" - aggregatesBodyInitial = `{ - "aggregates": [ - { - "name": "test-az", - "availability_zone": "test-az", - "deleted": false, - "id": 100001, - "hosts": [] - }, - { - "name": "tenant_filter_tests", - 
"availability_zone": "", - "deleted": false, - "id": 99, - "hosts": [] - } - ] -}` - - aggregatesBodyUnexpected = `{ - "aggregates": [ - { - "name": "test-az", - "availability_zone": "test-az", - "deleted": false, - "id": 100001, - "hosts": [] - }, - { - "name": "tenant_filter_tests", - "availability_zone": "", - "deleted": false, - "id": 99, - "hosts": [] - }, - { - "name": "unexpected", - "availability_zone": "", - "deleted": false, - "id": -1, - "hosts": ["test-host"] - } - ] -}` - - aggregatesBodySetup = `{ - "aggregates": [ - { - "name": "test-az", - "availability_zone": "test-az", - "deleted": false, - "id": 100001, - "hosts": ["test-host"] - }, - { - "name": "tenant_filter_tests", - "availability_zone": "", - "deleted": false, - "id": 99, - "hosts": ["test-host"] - } - ] -}` - addedHostToAzBody = `{ - "aggregate": { - "name": "test-az", - "availability_zone": "test-az", - "deleted": false, - "hosts": [ - "test-host" - ], - "id": 100001 - } -}` - - addedHostToTestBody = `{ - "aggregate": { - "name": "tenant_filter_tests", - "availability_zone": "", - "deleted": false, - "hosts": [ - "test-host" - ], - "id": 99 - } -}` + region = "test-region" + availabilityZone = "test-az" + hypervisorName = "test-host" + serviceId = "service-id" + hypervisorId = "c48f6247-abe4-4a24-824e-ea39e108874f" flavorDetailsBody = `{ "flavors": [ @@ -325,20 +239,6 @@ var _ = Describe("Onboarding Controller", func() { Expect(fmt.Fprintf(w, HypervisorWithServers, serviceId, "", hypervisorName)).ToNot(BeNil()) }) - fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, addedHostToAzBody) - Expect(err).NotTo(HaveOccurred()) - }) - - fakeServer.Mux.HandleFunc("POST /os-aggregates/99/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err 
:= fmt.Fprint(w, addedHostToTestBody) - Expect(err).NotTo(HaveOccurred()) - }) - fakeServer.Mux.HandleFunc("PUT /os-services/service-id", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -348,15 +248,6 @@ var _ = Describe("Onboarding Controller", func() { }) When("it is a clean setup", func() { - BeforeEach(func() { - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, aggregatesBodyInitial) - Expect(err).NotTo(HaveOccurred()) - }) - }) - It("should set the Service- and HypervisorId from Nova", func(ctx SpecContext) { By("Reconciling the created resource") err := reconcileLoop(ctx, 1) @@ -368,10 +259,20 @@ var _ = Describe("Onboarding Controller", func() { }) It("should update the status accordingly", func(ctx SpecContext) { - By("Reconciling the created resource") - err := reconcileLoop(ctx, 2) + By("Reconciling the created resource to set IDs") + err := reconcileLoop(ctx, 1) Expect(err).NotTo(HaveOccurred()) + + By("Simulating aggregates controller setting aggregates") hv := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) + hv.Status.Aggregates = []string{availabilityZone, testAggregateName} + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + + By("Reconciling again to process onboarding") + err = reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) Expect(hv.Status.Conditions).To(ContainElements( SatisfyAll( @@ -389,22 +290,6 @@ var _ = Describe("Onboarding Controller", func() { }) When("it the host is already in an unexpected aggregate", func() { - BeforeEach(func() { - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - 
w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, aggregatesBodyUnexpected) - Expect(err).NotTo(HaveOccurred()) - }) - - fakeServer.Mux.HandleFunc("POST /os-aggregates/-1/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, addedHostToTestBody) - Expect(err).NotTo(HaveOccurred()) - }) - }) - It("should set the Service- and HypervisorId from Nova", func(ctx SpecContext) { By("Reconciling the created resource") err := reconcileLoop(ctx, 1) @@ -416,10 +301,20 @@ var _ = Describe("Onboarding Controller", func() { }) It("should update the status accordingly", func(ctx SpecContext) { - By("Reconciling the created resource") - err := reconcileLoop(ctx, 2) + By("Reconciling the created resource to set IDs") + err := reconcileLoop(ctx, 1) Expect(err).NotTo(HaveOccurred()) + + By("Simulating aggregates controller setting aggregates") hv := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) + hv.Status.Aggregates = []string{availabilityZone, testAggregateName} + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + + By("Reconciling again to process onboarding") + err = reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) Expect(hv.Status.Conditions).To(ContainElements( SatisfyAll( @@ -455,22 +350,6 @@ var _ = Describe("Onboarding Controller", func() { }) Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) - // Mock for ApplyAggregates during completeOnboarding - // Returns aggregates with host in both test-az and tenant_filter_tests - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, aggregatesBodySetup) - Expect(err).NotTo(HaveOccurred()) - }) - - fakeServer.Mux.HandleFunc("PUT /os-aggregates", 
func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, aggregatesBodySetup) - Expect(err).NotTo(HaveOccurred()) - }) - fakeServer.Mux.HandleFunc("PUT /os-services/service-id", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -492,23 +371,6 @@ var _ = Describe("Onboarding Controller", func() { Expect(err).NotTo(HaveOccurred()) }) - // Mock for removing host from test aggregate (ApplyAggregates will call this) - fakeServer.Mux.HandleFunc("POST /os-aggregates/99/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - // Return aggregate without the host after removal - _, err := fmt.Fprint(w, `{ - "aggregate": { - "name": "tenant_filter_tests", - "availability_zone": "", - "deleted": false, - "hosts": [], - "id": 99 - } - }`) - Expect(err).NotTo(HaveOccurred()) - }) - fakeServer.Mux.HandleFunc("POST /instance-ha", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -563,14 +425,44 @@ var _ = Describe("Onboarding Controller", func() { Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) hv.Spec.SkipTests = true Expect(k8sClient.Update(ctx, hv)).To(Succeed()) + + // Simulate aggregates being set by aggregates controller + hv.Status.Aggregates = []string{availabilityZone, testAggregateName} + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) }) It("should update the conditions", func(ctx SpecContext) { - By("Reconciling the created resource") - err := reconcileLoop(ctx, 3) + By("Reconciling to move from Initial to Testing state") + err := reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Reconciling again to call completeOnboarding and set RemovingTestAggregate") + err = reconcileLoop(ctx, 1) 
Expect(err).NotTo(HaveOccurred()) + + By("Verifying onboarding is in RemovingTestAggregate state") hv := &kvmv1.Hypervisor{} Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) + onboardingCond := meta.FindStatusCondition(hv.Status.Conditions, kvmv1.ConditionTypeOnboarding) + Expect(onboardingCond).NotTo(BeNil()) + Expect(onboardingCond.Reason).To(Equal(kvmv1.ConditionReasonRemovingTestAggregate)) + + By("Simulating aggregates controller updating aggregates and setting condition") + hv.Status.Aggregates = []string{availabilityZone} + meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Aggregates updated successfully", + }) + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + + By("Reconciling once more to complete onboarding and set Ready") + err = reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying final state") + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) Expect(hv.Status.Conditions).To(ContainElements( SatisfyAll( HaveField("Type", kvmv1.ConditionTypeReady), @@ -590,13 +482,37 @@ var _ = Describe("Onboarding Controller", func() { Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) hv.Spec.SkipTests = false Expect(k8sClient.Update(ctx, hv)).To(Succeed()) + + // Simulate aggregates being set by aggregates controller + hv.Status.Aggregates = []string{availabilityZone, testAggregateName} + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) }) It("should update the conditions", func(ctx SpecContext) { - By("Reconciling the created resource") - err := reconcileLoop(ctx, 3) + By("Reconciling to run tests") + err := reconcileLoop(ctx, 1) Expect(err).NotTo(HaveOccurred()) + + By("Reconciling again after server is active") + err = reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Simulating aggregates controller setting condition after 
removing test aggregate") hv := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) + hv.Status.Aggregates = []string{availabilityZone} + meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Aggregates updated successfully", + }) + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + + By("Reconciling to complete onboarding after aggregates condition is set") + err = reconcileLoop(ctx, 5) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) Expect(hv.Status.Conditions).To(ContainElements( SatisfyAll( diff --git a/internal/openstack/aggregates.go b/internal/openstack/aggregates.go index b7a67e7f..c051f883 100644 --- a/internal/openstack/aggregates.go +++ b/internal/openstack/aggregates.go @@ -26,145 +26,98 @@ import ( "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates" ) -// GetAggregatesByName retrieves all aggregates from nova and returns them as a map keyed by name. -func GetAggregatesByName(ctx context.Context, serviceClient *gophercloud.ServiceClient) (map[string]*aggregates.Aggregate, error) { - pages, err := aggregates.List(serviceClient).AllPages(ctx) - if err != nil { - return nil, fmt.Errorf("cannot list aggregates due to %w", err) - } - - aggs, err := aggregates.ExtractAggregates(pages) - if err != nil { - return nil, fmt.Errorf("cannot list aggregates due to %w", err) - } - - aggregateMap := make(map[string]*aggregates.Aggregate, len(aggs)) - for _, aggregate := range aggs { - aggregateMap[aggregate.Name] = &aggregate - } - return aggregateMap, nil -} - -// AddToAggregate adds the given host to the named aggregate, creating the aggregate if it does not yet exist. 
-func AddToAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name, zone string) (err error) { - aggregate, found := aggs[name] - log := logger.FromContext(ctx) - if !found { - aggregate, err = aggregates.Create(ctx, serviceClient, - aggregates.CreateOpts{ - Name: name, - AvailabilityZone: zone, - }).Extract() - if err != nil { - return fmt.Errorf("failed to create aggregate %v due to %w", name, err) - } - aggs[name] = aggregate - } - - if slices.Contains(aggregate.Hosts, host) { - log.Info("Found host in aggregate", "host", host, "name", name) - return nil - } - - result, err := aggregates.AddHost(ctx, serviceClient, aggregate.ID, aggregates.AddHostOpts{Host: host}).Extract() - if err != nil { - return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err) - } - log.Info("Added host to aggregate", "host", host, "name", name) - aggs[name] = result - - return nil -} - -// RemoveFromAggregate removes the given host from the named aggregate. -func RemoveFromAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name string) error { - aggregate, found := aggs[name] +// ApplyAggregates ensures a host is in exactly the specified aggregates. +// +// The function performs a two-phase operation to maintain security: +// 1. First, adds the host to all desired aggregates (if not already present) +// 2. Then, removes the host from any aggregates it shouldn't be in +// +// This ordering prevents leaving the host unprotected between operations when +// aggregates have filter criteria. However, conflicts may still occur with +// aggregates in different availability zones, in which case errors are collected +// and returned together for eventual convergence. +// +// All specified aggregates must already exist in OpenStack. If any desired +// aggregate is not found, an error is returned listing the missing aggregates. 
+// +// Pass an empty list to remove the host from all aggregates. +func ApplyAggregates(ctx context.Context, serviceClient *gophercloud.ServiceClient, host string, desiredAggregates []string) ([]string, error) { log := logger.FromContext(ctx) - if !found { - log.Info("cannot find aggregate", "name", name) - return nil - } - found = false - for _, aggHost := range aggregate.Hosts { - if aggHost == host { - found = true - } - } + oldMicroVersion := serviceClient.Microversion + serviceClient.Microversion = "2.93" // Something bigger than 2.41 for UUIDs + defer func() { + serviceClient.Microversion = oldMicroVersion + }() - if !found { - log.Info("cannot find host in aggregate", "host", host, "name", name) - return nil - } - - result, err := aggregates.RemoveHost(ctx, serviceClient, aggregate.ID, aggregates.RemoveHostOpts{Host: host}).Extract() + // Fetch all aggregates + pages, err := aggregates.List(serviceClient).AllPages(ctx) if err != nil { - return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err) + return nil, fmt.Errorf("failed to list aggregates: %w", err) } - aggs[name] = result - log.Info("removed host from aggregate", "host", host, "name", name) - return nil -} - -// ApplyAggregates ensures a host is in exactly the specified aggregates. -// It adds the host to missing aggregates and removes it from extra ones. -// Pass an empty list to remove the host from all aggregates. -// Aggregates must already exist - this function will not create them. 
-func ApplyAggregates(ctx context.Context, serviceClient *gophercloud.ServiceClient, host string, desiredAggregates []string) error { - log := logger.FromContext(ctx) - - aggs, err := GetAggregatesByName(ctx, serviceClient) + allAggregates, err := aggregates.ExtractAggregates(pages) if err != nil { - return fmt.Errorf("failed to get aggregates: %w", err) + return nil, fmt.Errorf("failed to extract aggregates: %w", err) } - // Build desired set for O(1) lookups + // Build desired set for lookups desiredSet := make(map[string]bool, len(desiredAggregates)) for _, name := range desiredAggregates { desiredSet[name] = true } - // We need to add the host to aggregates first, because if we first drop - // an aggregate with a filter criterion and then add a new one, we leave the host - // open for a period of time. Still, this may fail due to a conflict of aggregates - // with different availability zones, so we collect all the errors and return them - // so it hopefully will converge eventually. 
+ var uuids []string var errs []error - var toRemove []string - - // First, add to any desired aggregates (including creating them if needed) - for _, name := range desiredAggregates { - aggregate, exists := aggs[name] - if !exists || !slices.Contains(aggregate.Hosts, host) { - // Aggregate doesn't exist or host not in it - add immediately - log.Info("Adding to aggregate", "aggregate", name, "host", host) - if err := AddToAggregate(ctx, serviceClient, aggs, host, name, ""); err != nil { - errs = append(errs, err) + var toRemove []aggregates.Aggregate + + // Single pass: handle adds immediately, collect removes for later + for _, agg := range allAggregates { + hostInAggregate := slices.Contains(agg.Hosts, host) + aggregateDesired := desiredSet[agg.Name] + + if aggregateDesired { + // Mark as found + delete(desiredSet, agg.Name) + uuids = append(uuids, agg.UUID) + + if !hostInAggregate { + // Add host to this aggregate + log.Info("Adding to aggregate", "aggregate", agg.Name, "host", host) + _, err := aggregates.AddHost(ctx, serviceClient, agg.ID, aggregates.AddHostOpts{Host: host}).Extract() + if err != nil { + errs = append(errs, fmt.Errorf("failed to add host %v to aggregate %v: %w", host, agg.Name, err)) + } } + } else if hostInAggregate { + // Collect for removal (after all adds complete) + toRemove = append(toRemove, agg) } } - // Second, collect aggregates to remove from (host is in but shouldn't be) - for name, aggregate := range aggs { - if slices.Contains(aggregate.Hosts, host) && !desiredSet[name] { - toRemove = append(toRemove, name) + // Error if any desired aggregates don't exist + if len(desiredSet) > 0 { + var missing []string + for name := range desiredSet { + missing = append(missing, name) } + errs = append(errs, fmt.Errorf("aggregates not found: %v", missing)) } - // Remove after all additions are complete + // Remove host from unwanted aggregates (after all adds complete) if len(toRemove) > 0 { - log.Info("Removing from aggregates", "aggregates", 
toRemove, "host", host) - for _, name := range toRemove { - if err := RemoveFromAggregate(ctx, serviceClient, aggs, host, name); err != nil { - errs = append(errs, err) + for _, agg := range toRemove { + log.Info("Removing from aggregate", "aggregate", agg.Name, "host", host) + _, err := aggregates.RemoveHost(ctx, serviceClient, agg.ID, aggregates.RemoveHostOpts{Host: host}).Extract() + if err != nil { + errs = append(errs, fmt.Errorf("failed to remove host %v from aggregate %v: %w", host, agg.Name, err)) } } } if len(errs) > 0 { - return fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...)) + return nil, errors.Join(errs...) + } else { + return uuids, nil } - - return nil } diff --git a/internal/openstack/aggregates_test.go b/internal/openstack/aggregates_test.go index d0758bce..d4bdcefe 100644 --- a/internal/openstack/aggregates_test.go +++ b/internal/openstack/aggregates_test.go @@ -37,6 +37,7 @@ var _ = Describe("ApplyAggregates", func() { "availability_zone": "az1", "deleted": false, "id": 1, + "uuid": "uuid-agg1", "hosts": ["test-host"] }, { @@ -44,6 +45,7 @@ var _ = Describe("ApplyAggregates", func() { "availability_zone": "az2", "deleted": false, "id": 2, + "uuid": "uuid-agg2", "hosts": ["test-host"] }, { @@ -51,6 +53,7 @@ var _ = Describe("ApplyAggregates", func() { "availability_zone": "az3", "deleted": false, "id": 3, + "uuid": "uuid-agg3", "hosts": [] } ] @@ -62,6 +65,7 @@ var _ = Describe("ApplyAggregates", func() { "availability_zone": "az3", "deleted": false, "id": 3, + "uuid": "uuid-agg3", "hosts": ["test-host"] } }` @@ -72,6 +76,7 @@ var _ = Describe("ApplyAggregates", func() { "availability_zone": "az1", "deleted": false, "id": 1, + "uuid": "uuid-agg1", "hosts": [] } }` @@ -108,8 +113,9 @@ var _ = Describe("ApplyAggregates", func() { It("should add host to agg3", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"}) + 
uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"}) Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg1", "uuid-agg2", "uuid-agg3")) }) }) @@ -130,8 +136,9 @@ var _ = Describe("ApplyAggregates", func() { It("should remove host from agg1", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"}) + uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"}) Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg2")) }) }) @@ -158,15 +165,16 @@ var _ = Describe("ApplyAggregates", func() { removeCalls++ w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"aggregate": {"name": "agg2", "id": 2, "hosts": []}}`) + fmt.Fprint(w, `{"aggregate": {"name": "agg2", "id": 2, "uuid": "uuid-agg2", "hosts": []}}`) }) }) It("should remove host from all aggregates", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{}) + uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{}) Expect(err).NotTo(HaveOccurred()) Expect(removeCalls).To(Equal(2)) + Expect(uuids).To(BeEmpty()) }) }) @@ -181,8 +189,9 @@ var _ = Describe("ApplyAggregates", func() { It("should not make any changes", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2"}) + uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2"}) Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg1", "uuid-agg2")) }) }) @@ -209,8 +218,9 @@ var _ = Describe("ApplyAggregates", func() { It("should replace agg1 with agg3", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"}) + uuids, err := 
ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"}) Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg2", "uuid-agg3")) }) }) @@ -224,9 +234,9 @@ var _ = Describe("ApplyAggregates", func() { It("should return an error", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1"}) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1"}) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("failed to get aggregates")) + Expect(err.Error()).To(ContainSubstring("failed to list aggregates")) }) }) @@ -246,9 +256,9 @@ var _ = Describe("ApplyAggregates", func() { It("should return an error", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"}) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"}) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update")) + Expect(err.Error()).To(ContainSubstring("failed to add host")) }) }) @@ -268,9 +278,9 @@ var _ = Describe("ApplyAggregates", func() { It("should return an error", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"}) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"}) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update")) + Expect(err.Error()).To(ContainSubstring("failed to remove host")) }) }) @@ -291,8 +301,9 @@ var _ = Describe("ApplyAggregates", func() { It("should add new host to aggregate", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "new-host", []string{"agg3"}) + uuids, err := ApplyAggregates(ctx, serviceClient, "new-host", 
[]string{"agg3"}) Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg3")) }) }) @@ -319,11 +330,49 @@ var _ = Describe("ApplyAggregates", func() { It("should return combined errors", func() { serviceClient := client.ServiceClient(fakeServer) - err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"}) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"}) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update")) // Verify it's a joined error with multiple failures - Expect(err.Error()).To(Or(ContainSubstring("Cannot add"), ContainSubstring("Cannot remove"))) + Expect(err.Error()).To(And(ContainSubstring("failed to add host"), ContainSubstring("failed to remove host"))) + }) + }) + + Context("when desired aggregate does not exist", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + }) + + It("should return an error about missing aggregate", func() { + serviceClient := client.ServiceClient(fakeServer) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg4"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("aggregates not found")) + Expect(err.Error()).To(ContainSubstring("agg4")) + }) + }) + + Context("when multiple desired aggregates do not exist", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + }) + + It("should return an error listing all missing aggregates", func() { + serviceClient := client.ServiceClient(fakeServer) + _, err := ApplyAggregates(ctx, 
serviceClient, "test-host", []string{"agg4", "agg5", "agg6"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("aggregates not found")) + // Check that all missing aggregates are mentioned in the error + Expect(err.Error()).To(ContainSubstring("agg4")) + Expect(err.Error()).To(ContainSubstring("agg5")) + Expect(err.Error()).To(ContainSubstring("agg6")) }) }) })