diff --git a/api/v1/hypervisor_types.go b/api/v1/hypervisor_types.go index 185f736..c6bf4dc 100644 --- a/api/v1/hypervisor_types.go +++ b/api/v1/hypervisor_types.go @@ -61,10 +61,17 @@ const ( ConditionReasonReadyEvicting = "Evicting" // ConditionTypeOnboarding reasons - ConditionReasonInitial = "Initial" - ConditionReasonOnboarding = "Onboarding" - ConditionReasonTesting = "Testing" - ConditionReasonAborted = "Aborted" + ConditionReasonInitial = "Initial" + ConditionReasonOnboarding = "Onboarding" + ConditionReasonTesting = "Testing" + ConditionReasonRemovingTestAggregate = "RemovingTestAggregate" + ConditionReasonAborted = "Aborted" + + // ConditionTypeAggregatesUpdated reasons + // Note: ConditionReasonSucceeded and ConditionReasonFailed are shared with eviction_types.go + ConditionReasonTestAggregates = "TestAggregates" + ConditionReasonTerminating = "Terminating" + ConditionReasonEvictionInProgress = "EvictionInProgress" ) // HypervisorSpec defines the desired state of Hypervisor @@ -341,6 +348,10 @@ type HypervisorStatus struct { // Aggregates are the applied aggregates of the hypervisor. Aggregates []string `json:"aggregates,omitempty"` + // +kubebuilder:default:={} + // The UUIDs of the aggregates are used to apply aggregates to the hypervisor. + AggregateUUIDs []string `json:"aggregateUUIDs,omitempty"` + // InternalIP is the internal IP address of the hypervisor. 
InternalIP string `json:"internalIp,omitempty"` diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index dec733a..a55c9bd 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -351,6 +351,11 @@ func (in *HypervisorStatus) DeepCopyInto(out *HypervisorStatus) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.AggregateUUIDs != nil { + in, out := &in.AggregateUUIDs, &out.AggregateUUIDs + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]metav1.Condition, len(*in)) diff --git a/applyconfigurations/api/v1/hypervisorstatus.go b/applyconfigurations/api/v1/hypervisorstatus.go index 45c24f5..dbb1524 100644 --- a/applyconfigurations/api/v1/hypervisorstatus.go +++ b/applyconfigurations/api/v1/hypervisorstatus.go @@ -25,6 +25,7 @@ type HypervisorStatusApplyConfiguration struct { ServiceID *string `json:"serviceId,omitempty"` Traits []string `json:"traits,omitempty"` Aggregates []string `json:"aggregates,omitempty"` + AggregateUUIDs []string `json:"aggregateUUIDs,omitempty"` InternalIP *string `json:"internalIp,omitempty"` Evicted *bool `json:"evicted,omitempty"` Conditions []metav1.ConditionApplyConfiguration `json:"conditions,omitempty"` @@ -183,6 +184,16 @@ func (b *HypervisorStatusApplyConfiguration) WithAggregates(values ...string) *H return b } +// WithAggregateUUIDs adds the given value to the AggregateUUIDs field in the declarative configuration +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the AggregateUUIDs field. 
+func (b *HypervisorStatusApplyConfiguration) WithAggregateUUIDs(values ...string) *HypervisorStatusApplyConfiguration { + for i := range values { + b.AggregateUUIDs = append(b.AggregateUUIDs, values[i]) + } + return b +} + // WithInternalIP sets the InternalIP field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the InternalIP field is set to the value of the last call. diff --git a/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml b/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml index 521e67a..a899375 100644 --- a/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml +++ b/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml @@ -188,6 +188,13 @@ spec: status: description: HypervisorStatus defines the observed state of Hypervisor properties: + aggregateUUIDs: + default: [] + description: The UUIDs of the aggregates are used to apply aggregates + to the hypervisor. + items: + type: string + type: array aggregates: description: Aggregates are the applied aggregates of the hypervisor. items: diff --git a/config/crd/bases/kvm.cloud.sap_hypervisors.yaml b/config/crd/bases/kvm.cloud.sap_hypervisors.yaml index 348504c..6aa2bb4 100644 --- a/config/crd/bases/kvm.cloud.sap_hypervisors.yaml +++ b/config/crd/bases/kvm.cloud.sap_hypervisors.yaml @@ -189,6 +189,13 @@ spec: status: description: HypervisorStatus defines the observed state of Hypervisor properties: + aggregateUUIDs: + default: [] + description: The UUIDs of the aggregates are used to apply aggregates + to the hypervisor. + items: + type: string + type: array aggregates: description: Aggregates are the applied aggregates of the hypervisor. 
items: diff --git a/internal/controller/aggregates_controller.go b/internal/controller/aggregates_controller.go index 7229370..1471527 100644 --- a/internal/controller/aggregates_controller.go +++ b/internal/controller/aggregates_controller.go @@ -23,6 +23,8 @@ import ( "fmt" "slices" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -52,97 +54,116 @@ type AggregatesController struct { // +kubebuilder:rbac:groups=kvm.cloud.sap,resources=hypervisors/status,verbs=get;list;watch;create;update;patch;delete func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := logger.FromContext(ctx) hv := &kvmv1.Hypervisor{} if err := ac.Get(ctx, req.NamespacedName, hv); err != nil { return ctrl.Result{}, k8sclient.IgnoreNotFound(err) } - /// On- and off-boarding need to mess with the aggregates, so let's get out of their way - if !meta.IsStatusConditionFalse(hv.Status.Conditions, kvmv1.ConditionTypeOnboarding) || - meta.IsStatusConditionTrue(hv.Status.Conditions, kvmv1.ConditionTypeTerminating) { + // Wait for onboarding controller to populate HypervisorID and ServiceID + // before attempting to modify aggregates + if hv.Status.HypervisorID == "" || hv.Status.ServiceID == "" { return ctrl.Result{}, nil } - if slices.Equal(hv.Spec.Aggregates, hv.Status.Aggregates) { - // Nothing to be done - return ctrl.Result{}, nil - } + base := hv.DeepCopy() + desiredAggregates, desiredCondition := ac.determineDesiredState(hv) + + if !slices.Equal(desiredAggregates, hv.Status.Aggregates) { + // Apply aggregates to OpenStack and update status + uuids, err := openstack.ApplyAggregates(ctx, ac.computeClient, hv.Name, desiredAggregates) + if err != nil { + // Set error condition + condition := metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: 
kvmv1.ConditionReasonFailed, + Message: fmt.Errorf("failed to apply aggregates: %w", err).Error(), + } - aggs, err := openstack.GetAggregatesByName(ctx, ac.computeClient) - if err != nil { - err = fmt.Errorf("failed listing aggregates: %w", err) - if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil { - return ctrl.Result{}, errors.Join(err, err2) + if meta.SetStatusCondition(&hv.Status.Conditions, condition) { + if err2 := ac.Status().Patch(ctx, hv, k8sclient.MergeFromWithOptions(base, + k8sclient.MergeFromWithOptimisticLock{}), k8sclient.FieldOwner(AggregatesControllerName)); err2 != nil { + return ctrl.Result{}, errors.Join(err, err2) + } + } + return ctrl.Result{}, err } - return ctrl.Result{}, err + + hv.Status.Aggregates = desiredAggregates + hv.Status.AggregateUUIDs = uuids + } + + // Set the condition based on the determined desired state + meta.SetStatusCondition(&hv.Status.Conditions, desiredCondition) + + if equality.Semantic.DeepEqual(base, hv) { + return ctrl.Result{}, nil } - toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates) - toRemove := Difference(hv.Spec.Aggregates, hv.Status.Aggregates) - - // We need to add first the host to the aggregates, because if we first drop - // an aggregate with a filter criterion and then add a new one, we leave the host - // open for period of time. Still, this may fail due to a conflict of aggregates - // with different availability zones, so we collect all the errors and return them - // so it hopefully will converge eventually. 
- var errs []error - if len(toAdd) > 0 { - log.Info("Adding", "aggregates", toAdd) - for item := range slices.Values(toAdd) { - if err = openstack.AddToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, ""); err != nil { - errs = append(errs, err) + return ctrl.Result{}, ac.Status().Patch(ctx, hv, k8sclient.MergeFromWithOptions(base, + k8sclient.MergeFromWithOptimisticLock{}), k8sclient.FieldOwner(AggregatesControllerName)) +} + +// determineDesiredState returns the desired aggregates and the corresponding condition +// based on the hypervisor's current state. The condition status is True only when +// spec aggregates are being applied. Otherwise, it's False with a reason explaining +// why different aggregates are applied. +func (ac *AggregatesController) determineDesiredState(hv *kvmv1.Hypervisor) ([]string, metav1.Condition) { + // If terminating AND evicted, remove from all aggregates + // We must wait for eviction to complete before removing aggregates + if meta.IsStatusConditionTrue(hv.Status.Conditions, kvmv1.ConditionTypeTerminating) { + evictingCondition := meta.FindStatusCondition(hv.Status.Conditions, kvmv1.ConditionTypeEvicting) + // Only remove aggregates if eviction is complete (Evicting=False) + // If Evicting condition is not set or still True, keep current aggregates + if evictingCondition != nil && evictingCondition.Status == metav1.ConditionFalse { + return []string{}, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonTerminating, + Message: "Aggregates cleared due to termination after eviction", } } + // Still evicting or eviction not started - keep current aggregates + return hv.Status.Aggregates, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonEvictionInProgress, + Message: "Aggregates unchanged while terminating and eviction in progress", + } } - if len(toRemove) > 0 { - 
log.Info("Removing", "aggregates", toRemove) - for item := range slices.Values(toRemove) { - if err = openstack.RemoveFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item); err != nil { - errs = append(errs, err) + // If onboarding is in progress (Initial or Testing), add test aggregate + onboardingCondition := meta.FindStatusCondition(hv.Status.Conditions, kvmv1.ConditionTypeOnboarding) + if onboardingCondition != nil && onboardingCondition.Status == metav1.ConditionTrue { + if onboardingCondition.Reason == kvmv1.ConditionReasonInitial || + onboardingCondition.Reason == kvmv1.ConditionReasonTesting { + zone := hv.Labels[corev1.LabelTopologyZone] + return []string{zone, testAggregateName}, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonTestAggregates, + Message: "Test aggregate applied during onboarding instead of spec aggregates", } } - } - if errs != nil { - err = fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...)) - if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil { - return ctrl.Result{}, errors.Join(err, err2) + // If removing test aggregate, use Spec.Aggregates (no test aggregate) + if onboardingCondition.Reason == kvmv1.ConditionReasonRemovingTestAggregate { + return hv.Spec.Aggregates, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Aggregates from spec applied successfully", + } } - return ctrl.Result{}, err } - base := hv.DeepCopy() - hv.Status.Aggregates = hv.Spec.Aggregates - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + // Normal operations or onboarding complete: use Spec.Aggregates + return hv.Spec.Aggregates, metav1.Condition{ Type: kvmv1.ConditionTypeAggregatesUpdated, Status: metav1.ConditionTrue, Reason: kvmv1.ConditionReasonSucceeded, - Message: "Aggregates updated successfully", - }) - return 
ctrl.Result{}, ac.Status().Patch(ctx, hv, k8sclient.MergeFromWithOptions(base, - k8sclient.MergeFromWithOptimisticLock{}), k8sclient.FieldOwner(AggregatesControllerName)) -} - -// setErrorCondition sets the error condition on the Hypervisor status, returns error if update fails -func (ac *AggregatesController) setErrorCondition(ctx context.Context, hv *kvmv1.Hypervisor, msg string) error { - condition := metav1.Condition{ - Type: kvmv1.ConditionTypeAggregatesUpdated, - Status: metav1.ConditionFalse, - Reason: kvmv1.ConditionReasonFailed, - Message: msg, + Message: "Aggregates from spec applied successfully", } - - base := hv.DeepCopy() - if meta.SetStatusCondition(&hv.Status.Conditions, condition) { - if err := ac.Status().Patch(ctx, hv, k8sclient.MergeFromWithOptions(base, - k8sclient.MergeFromWithOptimisticLock{}), k8sclient.FieldOwner(AggregatesControllerName)); err != nil { - return err - } - } - - return nil } // SetupWithManager sets up the controller with the Manager. @@ -154,7 +175,6 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error { if ac.computeClient, err = openstack.GetServiceClient(ctx, "compute", nil); err != nil { return err } - ac.computeClient.Microversion = "2.40" // gophercloud only supports numeric ids return ctrl.NewControllerManagedBy(mgr). Named(AggregatesControllerName). diff --git a/internal/controller/aggregates_controller_test.go b/internal/controller/aggregates_controller_test.go index 7f714a3..fe60055 100644 --- a/internal/controller/aggregates_controller_test.go +++ b/internal/controller/aggregates_controller_test.go @@ -25,6 +25,7 @@ import ( "github.com/gophercloud/gophercloud/v2/testhelper/client" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -50,6 +51,7 @@ var _ = Describe("AggregatesController", func() { "availability_zone": "", "deleted": false, "id": 100001, + "uuid": "uuid-100001", "hosts": ["hv-test"] }, { @@ -57,29 +59,21 @@ var _ = Describe("AggregatesController", func() { "availability_zone": "", "deleted": false, "id": 99, + "uuid": "uuid-99", "hosts": ["hv-test"] } ] } ` - AggregatesPostBody = ` -{ - "aggregate": { - "name": "test-aggregate1", - "availability_zone": "", - "deleted": false, - "id": 42 - } -}` - AggregateRemoveHostBody = ` { "aggregate": { "name": "test-aggregate3", "availability_zone": "", "deleted": false, - "id": 99 + "id": 99, + "uuid": "uuid-99" } }` @@ -92,7 +86,8 @@ var _ = Describe("AggregatesController", func() { "hosts": [ "hv-test" ], - "id": 42 + "id": 42, + "uuid": "uuid-42" } }` ) @@ -118,6 +113,9 @@ var _ = Describe("AggregatesController", func() { hypervisor := &kvmv1.Hypervisor{ ObjectMeta: metav1.ObjectMeta{ Name: hypervisorName.Name, + Labels: map[string]string{ + corev1.LabelTopologyZone: "zone-a", + }, }, Spec: kvmv1.HypervisorSpec{ LifecycleEnabled: true, @@ -138,6 +136,12 @@ var _ = Describe("AggregatesController", func() { }) Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + By("Setting HypervisorID and ServiceID in status") + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Status.HypervisorID = "test-hypervisor-id" + hypervisor.Status.ServiceID = "test-service-id" + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + By("Creating the AggregatesController") aggregatesController = &AggregatesController{ Client: k8sClient, @@ -153,27 +157,205 @@ var _ = Describe("AggregatesController", func() { Expect(result).To(Equal(ctrl.Result{})) }) - Context("Adding new Aggregate", func() { + Context("During onboarding phase", func() { 
BeforeEach(func(ctx SpecContext) { - By("Setting a missing aggregate") + By("Setting onboarding condition to true") hypervisor := &kvmv1.Hypervisor{} Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) - hypervisor.Spec.Aggregates = []string{"test-aggregate1"} + meta.SetStatusCondition(&hypervisor.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeOnboarding, + Status: metav1.ConditionTrue, + Reason: "Testing", + Message: "Onboarding in progress", + }) + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Setting desired aggregates including test aggregate") + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Spec.Aggregates = []string{"zone-a"} Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) - By("Mocking GetAggregates to return empty list") + By("Mocking GetAggregates to return aggregates without host") + aggregateList := `{ + "aggregates": [ + { + "name": "zone-a", + "availability_zone": "zone-a", + "deleted": false, + "id": 1, + "uuid": "uuid-zone-a", + "hosts": [] + }, + { + "name": "tenant_filter_tests", + "availability_zone": "", + "deleted": false, + "id": 99, + "uuid": "uuid-test", + "hosts": [] + } + ] + }` fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregateListBodyEmpty) - Expect(err).NotTo(HaveOccurred()) + fmt.Fprint(w, aggregateList) + }) + + By("Mocking AddHost for both aggregates") + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"aggregate": {"name": "zone-a", "id": 1, "uuid": "uuid-zone-a", "hosts": ["hv-test"]}}`) + }) + fakeServer.Mux.HandleFunc("POST /os-aggregates/99/action", func(w http.ResponseWriter, r *http.Request) { + 
w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"aggregate": {"name": "tenant_filter_tests", "id": 99, "uuid": "uuid-test", "hosts": ["hv-test"]}}`) + }) + }) + + It("should add host to both specified aggregates and test aggregate", func(ctx SpecContext) { + updated := &kvmv1.Hypervisor{} + Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) + Expect(updated.Status.Aggregates).To(ConsistOf("zone-a", testAggregateName)) + Expect(updated.Status.AggregateUUIDs).To(ConsistOf("uuid-zone-a", "uuid-test")) + + // During onboarding with test aggregate, condition should be False with TestAggregates reason + Expect(meta.IsStatusConditionFalse(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) + cond := meta.FindStatusCondition(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated) + Expect(cond).NotTo(BeNil()) + Expect(cond.Reason).To(Equal(kvmv1.ConditionReasonTestAggregates)) + Expect(cond.Message).To(ContainSubstring("Test aggregate applied during onboarding")) + }) + }) + + Context("During normal operations", func() { + BeforeEach(func(ctx SpecContext) { + By("Setting desired aggregates") + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Spec.Aggregates = []string{"zone-a", "zone-b"} + Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) + + By("Mocking GetAggregates") + aggregateList := `{ + "aggregates": [ + { + "name": "zone-a", + "availability_zone": "zone-a", + "deleted": false, + "id": 1, + "uuid": "uuid-zone-a", + "hosts": [] + }, + { + "name": "zone-b", + "availability_zone": "zone-b", + "deleted": false, + "id": 2, + "uuid": "uuid-zone-b", + "hosts": [] + } + ] + }` + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, 
aggregateList) }) - By("Mocking CreateAggregate") - fakeServer.Mux.HandleFunc("POST /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + By("Mocking AddHost for both aggregates") + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregatesPostBody) + fmt.Fprint(w, `{"aggregate": {"name": "zone-a", "id": 1, "uuid": "uuid-zone-a", "hosts": ["hv-test"]}}`) + }) + fakeServer.Mux.HandleFunc("POST /os-aggregates/2/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"aggregate": {"name": "zone-b", "id": 2, "uuid": "uuid-zone-b", "hosts": ["hv-test"]}}`) + }) + }) + + It("should add host to specified aggregates without test aggregate", func(ctx SpecContext) { + updated := &kvmv1.Hypervisor{} + Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) + Expect(updated.Status.Aggregates).To(ConsistOf("zone-a", "zone-b")) + Expect(updated.Status.AggregateUUIDs).To(ConsistOf("uuid-zone-a", "uuid-zone-b")) + Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) + }) + }) + + Context("When Spec.Aggregates matches Status.Aggregates", func() { + Context("but condition is not set", func() { + BeforeEach(func(ctx SpecContext) { + By("Setting matching aggregates in spec and status") + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Spec.Aggregates = []string{"zone-a"} + hypervisor.Status.Aggregates = []string{"zone-a"} + hypervisor.Status.AggregateUUIDs = []string{"uuid-zone-a"} + Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Mocking GetAggregates to show host already in 
aggregate") + aggregateList := `{ + "aggregates": [ + { + "name": "zone-a", + "availability_zone": "zone-a", + "deleted": false, + "id": 1, + "uuid": "uuid-zone-a", + "hosts": ["hv-test"] + } + ] + }` + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateList) + }) + }) + + It("should proceed to update and set the condition", func(ctx SpecContext) { + updated := &kvmv1.Hypervisor{} + Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) + Expect(updated.Status.Aggregates).To(ConsistOf("zone-a")) + Expect(updated.Status.AggregateUUIDs).To(ConsistOf("uuid-zone-a")) + Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) + cond := meta.FindStatusCondition(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated) + Expect(cond.Reason).To(Equal(kvmv1.ConditionReasonSucceeded)) + }) + }) + }) + + Context("Adding to existing Aggregate", func() { + BeforeEach(func(ctx SpecContext) { + By("Setting a desired aggregate") + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + hypervisor.Spec.Aggregates = []string{"test-aggregate1"} + Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) + + By("Mocking GetAggregates to return aggregate without host") + aggregateList := `{ + "aggregates": [ + { + "name": "test-aggregate1", + "availability_zone": "", + "deleted": false, + "id": 42, + "uuid": "uuid-42", + "hosts": [] + } + ] + }` + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, err := fmt.Fprint(w, aggregateList) Expect(err).NotTo(HaveOccurred()) }) @@ -197,6 +379,7 @@ var _ = Describe("AggregatesController", func() { updated := 
&kvmv1.Hypervisor{} Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) Expect(updated.Status.Aggregates).To(ContainElements("test-aggregate1")) + Expect(updated.Status.AggregateUUIDs).To(ContainElements("uuid-42")) Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) }) }) @@ -239,6 +422,7 @@ var _ = Describe("AggregatesController", func() { updated := &kvmv1.Hypervisor{} Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) Expect(updated.Status.Aggregates).To(BeEmpty()) + Expect(updated.Status.AggregateUUIDs).To(BeEmpty()) Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) }) }) @@ -251,20 +435,21 @@ var _ = Describe("AggregatesController", func() { Expect(result).To(Equal(ctrl.Result{})) }) - Context("before onboarding", func() { + Context("before onboarding (missing HypervisorID and ServiceID)", func() { BeforeEach(func(ctx SpecContext) { - By("Removing the onboarding condition") + By("Removing HypervisorID and ServiceID from status") hypervisor := &kvmv1.Hypervisor{} Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) - hypervisor.Status.Conditions = []metav1.Condition{} + hypervisor.Status.HypervisorID = "" + hypervisor.Status.ServiceID = "" Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) }) - It("should neither update Aggregates and nor set status condition", func(ctx SpecContext) { + It("should return early without updating aggregates or setting condition", func(ctx SpecContext) { updated := &kvmv1.Hypervisor{} Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) Expect(updated.Status.Aggregates).To(BeEmpty()) - Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeFalse()) + Expect(meta.FindStatusCondition(updated.Status.Conditions, 
kvmv1.ConditionTypeAggregatesUpdated)).To(BeNil()) }) }) @@ -280,13 +465,25 @@ var _ = Describe("AggregatesController", func() { Message: "dontcare", }) Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Pre-setting the EvictionInProgress condition to match what controller will determine") + Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) + meta.SetStatusCondition(&hypervisor.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonEvictionInProgress, + Message: "Aggregates unchanged while terminating and eviction in progress", + }) + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) }) It("should neither update Aggregates and nor set status condition", func(ctx SpecContext) { updated := &kvmv1.Hypervisor{} Expect(aggregatesController.Client.Get(ctx, hypervisorName, updated)).To(Succeed()) Expect(updated.Status.Aggregates).To(BeEmpty()) - Expect(meta.IsStatusConditionTrue(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeFalse()) + Expect(meta.IsStatusConditionFalse(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated)).To(BeTrue()) + cond := meta.FindStatusCondition(updated.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated) + Expect(cond.Reason).To(Equal(kvmv1.ConditionReasonEvictionInProgress)) }) }) }) @@ -302,7 +499,7 @@ var _ = Describe("AggregatesController", func() { Expect(cond.Message).To(ContainSubstring(expectedMessage)) } - Context("when GetAggregates fails", func() { + Context("when ApplyAggregates fails", func() { BeforeEach(func(ctx SpecContext) { By("Setting a missing aggregate") hypervisor := &kvmv1.Hypervisor{} @@ -310,7 +507,7 @@ var _ = Describe("AggregatesController", func() { hypervisor.Spec.Aggregates = []string{"test-aggregate1"} Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) - By("Mocking GetAggregates to fail") + By("Mocking GET /os-aggregates to 
fail (first API call in ApplyAggregates)") fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) @@ -322,79 +519,7 @@ var _ = Describe("AggregatesController", func() { It("should set error condition", func(ctx SpecContext) { _, err := aggregatesController.Reconcile(ctx, reconcileRequest) Expect(err).To(HaveOccurred()) - sharedErrorConditionChecks(ctx, "failed listing aggregates") - }) - }) - - Context("when AddToAggregate fails", func() { - BeforeEach(func(ctx SpecContext) { - By("Setting a missing aggregate") - hypervisor := &kvmv1.Hypervisor{} - Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) - hypervisor.Spec.Aggregates = []string{"test-aggregate1"} - Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed()) - - By("Mocking GetAggregates") - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregateListBodyEmpty) - Expect(err).NotTo(HaveOccurred()) - }) - - By("Mocking CreateAggregate") - fakeServer.Mux.HandleFunc("POST /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregatesPostBody) - Expect(err).NotTo(HaveOccurred()) - }) - - By("Mocking AddHost to fail") - fakeServer.Mux.HandleFunc("POST /os-aggregates/42/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusConflict) - _, err := fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot add host to aggregate", "code": 409}}`) - Expect(err).NotTo(HaveOccurred()) - }) - }) - - It("should set error condition", func(ctx SpecContext) { - _, err := aggregatesController.Reconcile(ctx, reconcileRequest) 
- Expect(err).To(HaveOccurred()) - sharedErrorConditionChecks(ctx, "encountered errors during aggregate update") - }) - }) - - Context("when RemoveFromAggregate fails", func() { - BeforeEach(func(ctx SpecContext) { - By("Setting existing aggregate in status") - hypervisor := &kvmv1.Hypervisor{} - Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed()) - hypervisor.Status.Aggregates = []string{"test-aggregate2"} - Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) - - By("Mocking GetAggregates") - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregateListBodyFull) - Expect(err).NotTo(HaveOccurred()) - }) - - By("Mocking RemoveHost to fail") - fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusConflict) - _, err := fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot remove host from aggregate", "code": 409}}`) - Expect(err).NotTo(HaveOccurred()) - }) - }) - - It("should set error condition", func(ctx SpecContext) { - _, err := aggregatesController.Reconcile(ctx, reconcileRequest) - Expect(err).To(HaveOccurred()) - sharedErrorConditionChecks(ctx, "encountered errors during aggregate update") + sharedErrorConditionChecks(ctx, "failed to list aggregates") }) }) diff --git a/internal/controller/constants.go b/internal/controller/constants.go index 32126a6..5151bd2 100644 --- a/internal/controller/constants.go +++ b/internal/controller/constants.go @@ -22,4 +22,5 @@ const ( labelEvictionRequired = "cloud.sap/hypervisor-eviction-required" labelEvictionApproved = "cloud.sap/hypervisor-eviction-succeeded" labelHypervisor = "nova.openstack.cloud.sap/virt-driver" + testAggregateName = "tenant_filter_tests" ) diff --git 
a/internal/controller/decomission_controller.go b/internal/controller/decomission_controller.go index 3181adf..0a29c12 100644 --- a/internal/controller/decomission_controller.go +++ b/internal/controller/decomission_controller.go @@ -22,10 +22,8 @@ import ( "errors" "fmt" "net/http" - "slices" "github.com/gophercloud/gophercloud/v2" - "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates" "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/services" "github.com/gophercloud/gophercloud/v2/openstack/placement/v1/resourceproviders" "k8s.io/apimachinery/pkg/api/meta" @@ -115,22 +113,10 @@ func (r *NodeDecommissionReconciler) Reconcile(ctx context.Context, req ctrl.Req return r.setDecommissioningCondition(ctx, hv, msg) } - // Before removing the service, first take the node out of the aggregates, - // so when the node comes back, it doesn't up with the old associations - aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient) - if err != nil { - return r.setDecommissioningCondition(ctx, hv, fmt.Sprintf("cannot list aggregates due to %v", err)) - } - - host := hv.Name - for name, aggregate := range aggs { - if slices.Contains(aggregate.Hosts, host) { - opts := aggregates.RemoveHostOpts{Host: host} - if err = aggregates.RemoveHost(ctx, r.computeClient, aggregate.ID, opts).Err; err != nil { - msg := fmt.Sprintf("failed to remove host %v from aggregate %v due to %v", name, host, err) - return r.setDecommissioningCondition(ctx, hv, msg) - } - } + // Wait for aggregates controller to remove from all aggregates + if len(hv.Status.Aggregates) > 0 { + msg := fmt.Sprintf("Waiting for aggregates to be removed, current: %v", hv.Status.Aggregates) + return r.setDecommissioningCondition(ctx, hv, msg) } // Deleting and evicted, so better delete the service diff --git a/internal/controller/decomission_controller_test.go b/internal/controller/decomission_controller_test.go index 06956be..05ecd4f 100644 --- 
a/internal/controller/decomission_controller_test.go +++ b/internal/controller/decomission_controller_test.go @@ -38,32 +38,10 @@ import ( var _ = Describe("Decommission Controller", func() { const ( - EOF = "EOF" - serviceId = "service-1234" - hypervisorName = "node-test" - namespaceName = "namespace-test" - AggregateListWithHv = ` -{ - "aggregates": [ - { - "name": "test-aggregate2", - "availability_zone": "", - "deleted": false, - "id": 100001, - "hosts": ["node-test"] - } - ] -} -` - AggregateRemoveHostBody = ` -{ - "aggregate": { - "name": "test-aggregate2", - "availability_zone": "", - "deleted": false, - "id": 100001 - } -}` + EOF = "EOF" + serviceId = "service-1234" + hypervisorName = "node-test" + namespaceName = "namespace-test" ) var ( @@ -173,27 +151,6 @@ var _ = Describe("Decommission Controller", func() { Expect(fmt.Fprintf(w, HypervisorWithServers, serviceId, "some reason", hypervisorName)).ToNot(BeNil()) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, AggregateListWithHv) - Expect(err).NotTo(HaveOccurred()) - }) - - fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) { - Expect(r.Header.Get("Content-Type")).To(Equal("application/json")) - expectedBody := `{"remove_host":{"host":"node-test"}}` - body := make([]byte, r.ContentLength) - _, err := r.Body.Read(body) - Expect(err == nil || err.Error() == EOF).To(BeTrue()) - Expect(string(body)).To(MatchJSON(expectedBody)) - - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err = fmt.Fprint(w, AggregateRemoveHostBody) - Expect(err).NotTo(HaveOccurred()) - }) - // c48f6247-abe4-4a24-824e-ea39e108874f comes from the HypervisorWithServers const fakeServer.Mux.HandleFunc("GET /resource_providers/c48f6247-abe4-4a24-824e-ea39e108874f", func(w 
http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") @@ -249,6 +206,36 @@ var _ = Describe("Decommission Controller", func() { ), )) }) + + It("should clear Status.Aggregates when removing from all aggregates", func(ctx SpecContext) { + By("Setting initial aggregates and IDs in status") + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) + hypervisor.Status.Aggregates = []string{"zone-a", "test-aggregate"} + hypervisor.Status.ServiceID = serviceId + hypervisor.Status.HypervisorID = "c48f6247-abe4-4a24-824e-ea39e108874f" + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Reconciling - decommission controller will wait for aggregates to be cleared") + _, err := decommissionReconciler.Reconcile(ctx, reconcileReq) + Expect(err).NotTo(HaveOccurred()) + + By("Simulating aggregates controller clearing aggregates") + Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) + hypervisor.Status.Aggregates = []string{} + hypervisor.Status.AggregateUUIDs = []string{} + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) + + By("Reconciling again after aggregates are cleared") + for range 3 { + _, err := decommissionReconciler.Reconcile(ctx, reconcileReq) + Expect(err).NotTo(HaveOccurred()) + } + + By("Verifying Status.Aggregates is empty") + Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) + Expect(hypervisor.Status.Aggregates).To(BeEmpty(), "Status.Aggregates should be cleared after decommissioning") + }) }) }) }) @@ -361,42 +348,8 @@ var _ = Describe("Decommission Controller", func() { }) }) - Context("When listing aggregates fails", func() { - BeforeEach(func() { - fakeServer.Mux.HandleFunc("GET /os-hypervisors/detail", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, `{ - "hypervisors": [{ - "id": 
"c48f6247-abe4-4a24-824e-ea39e108874f", - "hypervisor_hostname": "node-test", - "hypervisor_version": 2002000, - "state": "up", - "status": "enabled", - "running_vms": 0, - "service": { - "id": "service-1234", - "host": "node-test" - } - }] - }`) - }) - - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(w, `{"error": "Internal Server Error"}`) - }) - }) - - It("should set decommissioning condition with error", func(ctx SpecContext) { - _, err := decommissionReconciler.Reconcile(ctx, reconcileReq) - Expect(err).NotTo(HaveOccurred()) - sharedDecommissioningErrorCheck(ctx, "cannot list aggregates") - }) - }) - - Context("When removing host from aggregate fails", func() { - BeforeEach(func() { + Context("When aggregates are not empty", func() { + BeforeEach(func(ctx SpecContext) { fakeServer.Mux.HandleFunc("GET /os-hypervisors/detail", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -416,59 +369,17 @@ var _ = Describe("Decommission Controller", func() { }`) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, AggregateListWithHv) - }) - - fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(w, `{"error": "Internal Server Error"}`) - }) - - // Add handlers for subsequent steps even though we expect failure earlier - fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNoContent) - }) - - fakeServer.Mux.HandleFunc("GET /resource_providers/c48f6247-abe4-4a24-824e-ea39e108874f", func(w http.ResponseWriter, r *http.Request) { - 
w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"uuid": "rp-uuid", "name": "hv-test"}`) - }) - - fakeServer.Mux.HandleFunc("GET /resource_providers/rp-uuid/allocations", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"allocations": {}}`) - }) - - fakeServer.Mux.HandleFunc("DELETE /resource_providers/rp-uuid", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusAccepted) - }) + // Simulate aggregates controller hasn't cleared aggregates yet + hypervisor := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) + hypervisor.Status.Aggregates = []string{"zone-a", "test-aggregate"} + Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed()) }) - It("should set decommissioning condition with error", func(ctx SpecContext) { + It("should wait for aggregates to be cleared", func(ctx SpecContext) { _, err := decommissionReconciler.Reconcile(ctx, reconcileReq) Expect(err).NotTo(HaveOccurred()) - - hypervisor := &kvmv1.Hypervisor{} - Expect(k8sClient.Get(ctx, resourceName, hypervisor)).To(Succeed()) - - Expect(hypervisor.Status.Conditions).To(ContainElement( - SatisfyAll( - HaveField("Type", kvmv1.ConditionTypeReady), - HaveField("Status", metav1.ConditionFalse), - HaveField("Reason", "Decommissioning"), - HaveField("Message", ContainSubstring("failed to remove host")), - ), - )) - - // Should NOT be offboarded yet - Expect(hypervisor.Status.Conditions).NotTo(ContainElement( - HaveField("Type", kvmv1.ConditionTypeOffboarded), - )) + sharedDecommissioningErrorCheck(ctx, "Waiting for aggregates to be removed") }) }) @@ -493,12 +404,6 @@ var _ = Describe("Decommission Controller", func() { }`) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - 
w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"aggregates": []}`) - }) - fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) fmt.Fprint(w, `{"error": "Internal Server Error"}`) @@ -533,12 +438,6 @@ var _ = Describe("Decommission Controller", func() { }`) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"aggregates": []}`) - }) - fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) }) @@ -577,12 +476,6 @@ var _ = Describe("Decommission Controller", func() { }`) }) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - fmt.Fprint(w, `{"aggregates": []}`) - }) - fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) }) diff --git a/internal/controller/onboarding_controller.go b/internal/controller/onboarding_controller.go index 142c26d..38ed45e 100644 --- a/internal/controller/onboarding_controller.go +++ b/internal/controller/onboarding_controller.go @@ -53,7 +53,6 @@ var errRequeue = errors.New("requeue requested") const ( defaultWaitTime = 1 * time.Minute - testAggregateName = "tenant_filter_tests" testProjectName = "test" testDomainName = "cc3test" testImageName = "cirros-d240801-kvm" @@ -127,13 +126,15 @@ func (r *OnboardingController) Reconcile(ctx context.Context, req ctrl.Request) switch status.Reason { case kvmv1.ConditionReasonInitial: - return ctrl.Result{}, r.initialOnboarding(ctx, hv, computeHost) + return ctrl.Result{}, r.initialOnboarding(ctx, hv) case kvmv1.ConditionReasonTesting: if 
hv.Spec.SkipTests { return r.completeOnboarding(ctx, computeHost, hv) } else { return r.smokeTest(ctx, hv, computeHost) } + case kvmv1.ConditionReasonRemovingTestAggregate: + return r.completeOnboarding(ctx, computeHost, hv) default: // Nothing to be done return ctrl.Result{}, nil @@ -191,40 +192,17 @@ func (r *OnboardingController) abortOnboarding(ctx context.Context, hv *kvmv1.Hy return r.patchStatus(ctx, hv, base) } -func (r *OnboardingController) initialOnboarding(ctx context.Context, hv *kvmv1.Hypervisor, host string) error { +func (r *OnboardingController) initialOnboarding(ctx context.Context, hv *kvmv1.Hypervisor) error { zone, found := hv.Labels[corev1.LabelTopologyZone] if !found || zone == "" { return fmt.Errorf("cannot find availability-zone label %v on node", corev1.LabelTopologyZone) } - aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient) - if err != nil { - return fmt.Errorf("cannot list aggregates %w", err) - } - - if err = openstack.AddToAggregate(ctx, r.computeClient, aggs, host, zone, zone); err != nil { - return fmt.Errorf("failed to agg to availability-zone aggregate %w", err) - } - - err = openstack.AddToAggregate(ctx, r.computeClient, aggs, host, testAggregateName, "") - if err != nil { - return fmt.Errorf("failed to agg to test aggregate %w", err) - } - - var errs []error - for aggregateName, aggregate := range aggs { - if aggregateName == testAggregateName || aggregateName == zone { - continue - } - if slices.Contains(aggregate.Hosts, host) { - if err := openstack.RemoveFromAggregate(ctx, r.computeClient, aggs, host, aggregateName); err != nil { - errs = append(errs, err) - } - } - } - - if len(errs) > 0 { - return fmt.Errorf("failed to remove host %v from aggregates due to %w", host, errors.Join(errs...)) + // Wait for aggregates controller to apply the desired state (zone and test aggregate) + expectedAggregates := []string{zone, testAggregateName} + if !slices.Equal(hv.Status.Aggregates, expectedAggregates) { + // Aggregates 
not yet applied, requeue + return errRequeue } // The service may be forced down previously due to an HA event, @@ -330,6 +308,37 @@ func (r *OnboardingController) smokeTest(ctx context.Context, hv *kvmv1.Hypervis func (r *OnboardingController) completeOnboarding(ctx context.Context, host string, hv *kvmv1.Hypervisor) (ctrl.Result, error) { log := logger.FromContext(ctx) + // Check if we're in the RemovingTestAggregate phase + onboardingCondition := meta.FindStatusCondition(hv.Status.Conditions, kvmv1.ConditionTypeOnboarding) + if onboardingCondition != nil && onboardingCondition.Reason == kvmv1.ConditionReasonRemovingTestAggregate { + // We're waiting for aggregates controller to sync + if !meta.IsStatusConditionTrue(hv.Status.Conditions, kvmv1.ConditionTypeAggregatesUpdated) { + log.Info("waiting for aggregates to be updated", "condition", kvmv1.ConditionTypeAggregatesUpdated) + return ctrl.Result{RequeueAfter: defaultWaitTime}, nil + } + + // Aggregates have been synced, mark onboarding as complete + log.Info("aggregates updated successfully", "aggregates", hv.Status.Aggregates) + base := hv.DeepCopy() + + meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeOnboarding, + Status: metav1.ConditionFalse, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Onboarding completed", + }) + + meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeReady, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonReadyReady, + Message: "Hypervisor is ready", + }) + + return ctrl.Result{}, r.patchStatus(ctx, hv, base) + } + + // First time in completeOnboarding - clean up and prepare for aggregate sync err := r.deleteTestServers(ctx, host) if err != nil { base := hv.DeepCopy() @@ -345,17 +354,7 @@ func (r *OnboardingController) completeOnboarding(ctx context.Context, host stri return ctrl.Result{}, err } - aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient) - if err != nil { - 
return ctrl.Result{}, fmt.Errorf("failed to get aggregates %w", err) - } - - err = openstack.RemoveFromAggregate(ctx, r.computeClient, aggs, host, testAggregateName) - if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to remove from test aggregate %w", err) - } - log.Info("removed from test-aggregate", "name", testAggregateName) - + // Enable HA service before marking onboarding complete err = enableInstanceHA(hv) log.Info("enabled instance-ha") if err != nil { @@ -363,21 +362,16 @@ func (r *OnboardingController) completeOnboarding(ctx context.Context, host stri } base := hv.DeepCopy() - // Set hypervisor ready condition - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: kvmv1.ConditionTypeReady, - Status: metav1.ConditionTrue, - Reason: kvmv1.ConditionReasonReadyReady, - Message: "Hypervisor is ready", - }) - // set onboarding condition completed + // Mark onboarding as removing test aggregate - signals aggregates controller to use Spec.Aggregates meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ Type: kvmv1.ConditionTypeOnboarding, - Status: metav1.ConditionFalse, - Reason: kvmv1.ConditionReasonSucceeded, - Message: "Onboarding completed", + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonRemovingTestAggregate, + Message: "Removing test aggregate", }) + + // Patch status to signal aggregates controller return ctrl.Result{}, r.patchStatus(ctx, hv, base) } diff --git a/internal/controller/onboarding_controller_test.go b/internal/controller/onboarding_controller_test.go index bc2fb89..d902a65 100644 --- a/internal/controller/onboarding_controller_test.go +++ b/internal/controller/onboarding_controller_test.go @@ -83,97 +83,11 @@ const ( var _ = Describe("Onboarding Controller", func() { const ( - region = "test-region" - availabilityZone = "test-az" - hypervisorName = "test-host" - serviceId = "service-id" - hypervisorId = "c48f6247-abe4-4a24-824e-ea39e108874f" - aggregatesBodyInitial = `{ - "aggregates": 
[ - { - "name": "test-az", - "availability_zone": "test-az", - "deleted": false, - "id": 100001, - "hosts": [] - }, - { - "name": "tenant_filter_tests", - "availability_zone": "", - "deleted": false, - "id": 99, - "hosts": [] - } - ] -}` - - aggregatesBodyUnexpected = `{ - "aggregates": [ - { - "name": "test-az", - "availability_zone": "test-az", - "deleted": false, - "id": 100001, - "hosts": [] - }, - { - "name": "tenant_filter_tests", - "availability_zone": "", - "deleted": false, - "id": 99, - "hosts": [] - }, - { - "name": "unexpected", - "availability_zone": "", - "deleted": false, - "id": -1, - "hosts": ["test-host"] - } - ] -}` - - aggregatesBodySetup = `{ - "aggregates": [ - { - "name": "test-az", - "availability_zone": "test-az", - "deleted": false, - "id": 100001, - "hosts": ["test-host"] - }, - { - "name": "tenant_filter_tests", - "availability_zone": "", - "deleted": false, - "id": 99, - "hosts": ["test-host"] - } - ] -}` - addedHostToAzBody = `{ - "aggregate": { - "name": "test-az", - "availability_zone": "test-az", - "deleted": false, - "hosts": [ - "test-host" - ], - "id": 100001 - } -}` - - addedHostToTestBody = `{ - "aggregate": { - "name": "tenant_filter_tests", - "availability_zone": "", - "deleted": false, - "hosts": [ - "test-host" - ], - "id": 99 - } -}` + region = "test-region" + availabilityZone = "test-az" + hypervisorName = "test-host" + serviceId = "service-id" + hypervisorId = "c48f6247-abe4-4a24-824e-ea39e108874f" flavorDetailsBody = `{ "flavors": [ @@ -325,20 +239,6 @@ var _ = Describe("Onboarding Controller", func() { Expect(fmt.Fprintf(w, HypervisorWithServers, serviceId, "", hypervisorName)).ToNot(BeNil()) }) - fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, addedHostToAzBody) - Expect(err).NotTo(HaveOccurred()) - }) - - fakeServer.Mux.HandleFunc("POST 
/os-aggregates/99/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, addedHostToTestBody) - Expect(err).NotTo(HaveOccurred()) - }) - fakeServer.Mux.HandleFunc("PUT /os-services/service-id", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -348,15 +248,6 @@ var _ = Describe("Onboarding Controller", func() { }) When("it is a clean setup", func() { - BeforeEach(func() { - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, aggregatesBodyInitial) - Expect(err).NotTo(HaveOccurred()) - }) - }) - It("should set the Service- and HypervisorId from Nova", func(ctx SpecContext) { By("Reconciling the created resource") err := reconcileLoop(ctx, 1) @@ -368,10 +259,20 @@ var _ = Describe("Onboarding Controller", func() { }) It("should update the status accordingly", func(ctx SpecContext) { - By("Reconciling the created resource") - err := reconcileLoop(ctx, 2) + By("Reconciling the created resource to set IDs") + err := reconcileLoop(ctx, 1) Expect(err).NotTo(HaveOccurred()) + + By("Simulating aggregates controller setting aggregates") hv := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) + hv.Status.Aggregates = []string{availabilityZone, testAggregateName} + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + + By("Reconciling again to process onboarding") + err = reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) Expect(hv.Status.Conditions).To(ContainElements( SatisfyAll( @@ -389,22 +290,6 @@ var _ = Describe("Onboarding Controller", func() { }) When("it the host is already in an unexpected aggregate", func() { - 
BeforeEach(func() { - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, aggregatesBodyUnexpected) - Expect(err).NotTo(HaveOccurred()) - }) - - fakeServer.Mux.HandleFunc("POST /os-aggregates/-1/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, addedHostToTestBody) - Expect(err).NotTo(HaveOccurred()) - }) - }) - It("should set the Service- and HypervisorId from Nova", func(ctx SpecContext) { By("Reconciling the created resource") err := reconcileLoop(ctx, 1) @@ -416,10 +301,20 @@ var _ = Describe("Onboarding Controller", func() { }) It("should update the status accordingly", func(ctx SpecContext) { - By("Reconciling the created resource") - err := reconcileLoop(ctx, 2) + By("Reconciling the created resource to set IDs") + err := reconcileLoop(ctx, 1) Expect(err).NotTo(HaveOccurred()) + + By("Simulating aggregates controller setting aggregates") hv := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) + hv.Status.Aggregates = []string{availabilityZone, testAggregateName} + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + + By("Reconciling again to process onboarding") + err = reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) Expect(hv.Status.Conditions).To(ContainElements( SatisfyAll( @@ -455,20 +350,6 @@ var _ = Describe("Onboarding Controller", func() { }) Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) - fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, aggregatesBodySetup) - Expect(err).NotTo(HaveOccurred()) - }) - - 
fakeServer.Mux.HandleFunc("PUT /os-aggregates", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, aggregatesBodySetup) - Expect(err).NotTo(HaveOccurred()) - }) - fakeServer.Mux.HandleFunc("PUT /os-services/service-id", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -490,13 +371,6 @@ var _ = Describe("Onboarding Controller", func() { Expect(err).NotTo(HaveOccurred()) }) - fakeServer.Mux.HandleFunc("POST /os-aggregates/99/action", func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprint(w, addedHostToTestBody) - Expect(err).NotTo(HaveOccurred()) - }) - fakeServer.Mux.HandleFunc("POST /instance-ha", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -551,14 +425,44 @@ var _ = Describe("Onboarding Controller", func() { Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) hv.Spec.SkipTests = true Expect(k8sClient.Update(ctx, hv)).To(Succeed()) + + // Simulate aggregates being set by aggregates controller + hv.Status.Aggregates = []string{availabilityZone, testAggregateName} + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) }) It("should update the conditions", func(ctx SpecContext) { - By("Reconciling the created resource") - err := reconcileLoop(ctx, 3) + By("Reconciling to move from Initial to Testing state") + err := reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Reconciling again to call completeOnboarding and set RemovingTestAggregate") + err = reconcileLoop(ctx, 1) Expect(err).NotTo(HaveOccurred()) + + By("Verifying onboarding is in RemovingTestAggregate state") hv := &kvmv1.Hypervisor{} Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) + onboardingCond := 
meta.FindStatusCondition(hv.Status.Conditions, kvmv1.ConditionTypeOnboarding) + Expect(onboardingCond).NotTo(BeNil()) + Expect(onboardingCond.Reason).To(Equal(kvmv1.ConditionReasonRemovingTestAggregate)) + + By("Simulating aggregates controller updating aggregates and setting condition") + hv.Status.Aggregates = []string{availabilityZone} + meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Aggregates updated successfully", + }) + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + + By("Reconciling once more to complete onboarding and set Ready") + err = reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying final state") + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) Expect(hv.Status.Conditions).To(ContainElements( SatisfyAll( HaveField("Type", kvmv1.ConditionTypeReady), @@ -578,13 +482,37 @@ var _ = Describe("Onboarding Controller", func() { Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) hv.Spec.SkipTests = false Expect(k8sClient.Update(ctx, hv)).To(Succeed()) + + // Simulate aggregates being set by aggregates controller + hv.Status.Aggregates = []string{availabilityZone, testAggregateName} + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) }) It("should update the conditions", func(ctx SpecContext) { - By("Reconciling the created resource") - err := reconcileLoop(ctx, 3) + By("Reconciling to run tests") + err := reconcileLoop(ctx, 1) Expect(err).NotTo(HaveOccurred()) + + By("Reconciling again after server is active") + err = reconcileLoop(ctx, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Simulating aggregates controller setting condition after removing test aggregate") hv := &kvmv1.Hypervisor{} + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) + hv.Status.Aggregates = []string{availabilityZone} + 
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ + Type: kvmv1.ConditionTypeAggregatesUpdated, + Status: metav1.ConditionTrue, + Reason: kvmv1.ConditionReasonSucceeded, + Message: "Aggregates updated successfully", + }) + Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed()) + + By("Reconciling to complete onboarding after aggregates condition is set") + err = reconcileLoop(ctx, 5) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient.Get(ctx, namespacedName, hv)).To(Succeed()) Expect(hv.Status.Conditions).To(ContainElements( SatisfyAll( diff --git a/internal/openstack/aggregates.go b/internal/openstack/aggregates.go index 1025ada..c051f88 100644 --- a/internal/openstack/aggregates.go +++ b/internal/openstack/aggregates.go @@ -16,6 +16,7 @@ package openstack import ( "context" + "errors" "fmt" "slices" @@ -25,83 +26,98 @@ import ( "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates" ) -// GetAggregatesByName retrieves all aggregates from nova and returns them as a map keyed by name. -func GetAggregatesByName(ctx context.Context, serviceClient *gophercloud.ServiceClient) (map[string]*aggregates.Aggregate, error) { +// ApplyAggregates ensures a host is in exactly the specified aggregates. +// +// The function performs a two-phase operation to maintain security: +// 1. First, adds the host to all desired aggregates (if not already present) +// 2. Then, removes the host from any aggregates it shouldn't be in +// +// This ordering prevents leaving the host unprotected between operations when +// aggregates have filter criteria. However, conflicts may still occur with +// aggregates in different availability zones, in which case errors are collected +// and returned together for eventual convergence. +// +// All specified aggregates must already exist in OpenStack. If any desired +// aggregate is not found, an error is returned listing the missing aggregates. +// +// Pass an empty list to remove the host from all aggregates. 
+func ApplyAggregates(ctx context.Context, serviceClient *gophercloud.ServiceClient, host string, desiredAggregates []string) ([]string, error) { + log := logger.FromContext(ctx) + + oldMicroVersion := serviceClient.Microversion + serviceClient.Microversion = "2.93" // Something bigger than 2.41 for UUIDs + defer func() { + serviceClient.Microversion = oldMicroVersion + }() + + // Fetch all aggregates pages, err := aggregates.List(serviceClient).AllPages(ctx) if err != nil { - return nil, fmt.Errorf("cannot list aggregates due to %w", err) + return nil, fmt.Errorf("failed to list aggregates: %w", err) } - aggs, err := aggregates.ExtractAggregates(pages) + allAggregates, err := aggregates.ExtractAggregates(pages) if err != nil { - return nil, fmt.Errorf("cannot list aggregates due to %w", err) + return nil, fmt.Errorf("failed to extract aggregates: %w", err) } - aggregateMap := make(map[string]*aggregates.Aggregate, len(aggs)) - for _, aggregate := range aggs { - aggregateMap[aggregate.Name] = &aggregate + // Build desired set for lookups + desiredSet := make(map[string]bool, len(desiredAggregates)) + for _, name := range desiredAggregates { + desiredSet[name] = true } - return aggregateMap, nil -} -// AddToAggregate adds the given host to the named aggregate, creating the aggregate if it does not yet exist. 
-func AddToAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name, zone string) (err error) { - aggregate, found := aggs[name] - log := logger.FromContext(ctx) - if !found { - aggregate, err = aggregates.Create(ctx, serviceClient, - aggregates.CreateOpts{ - Name: name, - AvailabilityZone: zone, - }).Extract() - if err != nil { - return fmt.Errorf("failed to create aggregate %v due to %w", name, err) + var uuids []string + var errs []error + var toRemove []aggregates.Aggregate + + // Single pass: handle adds immediately, collect removes for later + for _, agg := range allAggregates { + hostInAggregate := slices.Contains(agg.Hosts, host) + aggregateDesired := desiredSet[agg.Name] + + if aggregateDesired { + // Mark as found + delete(desiredSet, agg.Name) + uuids = append(uuids, agg.UUID) + + if !hostInAggregate { + // Add host to this aggregate + log.Info("Adding to aggregate", "aggregate", agg.Name, "host", host) + _, err := aggregates.AddHost(ctx, serviceClient, agg.ID, aggregates.AddHostOpts{Host: host}).Extract() + if err != nil { + errs = append(errs, fmt.Errorf("failed to add host %v to aggregate %v: %w", host, agg.Name, err)) + } + } + } else if hostInAggregate { + // Collect for removal (after all adds complete) + toRemove = append(toRemove, agg) } - aggs[name] = aggregate - } - - if slices.Contains(aggregate.Hosts, host) { - log.Info("Found host in aggregate", "host", host, "name", name) - return nil - } - - result, err := aggregates.AddHost(ctx, serviceClient, aggregate.ID, aggregates.AddHostOpts{Host: host}).Extract() - if err != nil { - return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err) } - log.Info("Added host to aggregate", "host", host, "name", name) - aggs[name] = result - - return nil -} -// RemoveFromAggregate removes the given host from the named aggregate. 
-func RemoveFromAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name string) error { - aggregate, found := aggs[name] - log := logger.FromContext(ctx) - if !found { - log.Info("cannot find aggregate", "name", name) - return nil - } - - found = false - for _, aggHost := range aggregate.Hosts { - if aggHost == host { - found = true + // Error if any desired aggregates don't exist + if len(desiredSet) > 0 { + var missing []string + for name := range desiredSet { + missing = append(missing, name) } + errs = append(errs, fmt.Errorf("aggregates not found: %v", missing)) } - if !found { - log.Info("cannot find host in aggregate", "host", host, "name", name) - return nil + // Remove host from unwanted aggregates (after all adds complete) + if len(toRemove) > 0 { + for _, agg := range toRemove { + log.Info("Removing from aggregate", "aggregate", agg.Name, "host", host) + _, err := aggregates.RemoveHost(ctx, serviceClient, agg.ID, aggregates.RemoveHostOpts{Host: host}).Extract() + if err != nil { + errs = append(errs, fmt.Errorf("failed to remove host %v from aggregate %v: %w", host, agg.Name, err)) + } + } } - result, err := aggregates.RemoveHost(ctx, serviceClient, aggregate.ID, aggregates.RemoveHostOpts{Host: host}).Extract() - if err != nil { - return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err) + if len(errs) > 0 { + return nil, errors.Join(errs...) 
+ } else { + return uuids, nil } - aggs[name] = result - log.Info("removed host from aggregate", "host", host, "name", name) - - return nil } diff --git a/internal/openstack/aggregates_test.go b/internal/openstack/aggregates_test.go new file mode 100644 index 0000000..d4bdcef --- /dev/null +++ b/internal/openstack/aggregates_test.go @@ -0,0 +1,378 @@ +/* +SPDX-FileCopyrightText: Copyright 2025 SAP SE or an SAP affiliate company and cobaltcore-dev contributors +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openstack + +import ( + "context" + "fmt" + "net/http" + + "github.com/gophercloud/gophercloud/v2/testhelper" + "github.com/gophercloud/gophercloud/v2/testhelper/client" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("ApplyAggregates", func() { + const ( + aggregateListWithHost = `{ + "aggregates": [ + { + "name": "agg1", + "availability_zone": "az1", + "deleted": false, + "id": 1, + "uuid": "uuid-agg1", + "hosts": ["test-host"] + }, + { + "name": "agg2", + "availability_zone": "az2", + "deleted": false, + "id": 2, + "uuid": "uuid-agg2", + "hosts": ["test-host"] + }, + { + "name": "agg3", + "availability_zone": "az3", + "deleted": false, + "id": 3, + "uuid": "uuid-agg3", + "hosts": [] + } + ] + }` + + aggregateAddHostResponse = `{ + "aggregate": { + "name": "agg3", + "availability_zone": "az3", + "deleted": false, + "id": 3, + "uuid": "uuid-agg3", + "hosts": ["test-host"] + } + }` + + aggregateRemoveHostResponse = `{ + "aggregate": { + "name": "agg1", + "availability_zone": "az1", + "deleted": false, + "id": 1, + "uuid": "uuid-agg1", + "hosts": [] + } + }` + ) + + var ( + fakeServer testhelper.FakeServer + ctx context.Context + ) + + BeforeEach(func() { + fakeServer = testhelper.SetupHTTP() + ctx = context.Background() + }) + + AfterEach(func() { + fakeServer.Teardown() + }) + + Context("when adding host to new aggregate", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateAddHostResponse) + }) + }) + + It("should add host to agg3", func() { + serviceClient := client.ServiceClient(fakeServer) + uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"}) + Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg1", "uuid-agg2", "uuid-agg3")) + }) + }) + + 
Context("when removing host from aggregate", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateRemoveHostResponse) + }) + }) + + It("should remove host from agg1", func() { + serviceClient := client.ServiceClient(fakeServer) + uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"}) + Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg2")) + }) + }) + + Context("when removing host from all aggregates", func() { + var removeCalls int + + BeforeEach(func() { + removeCalls = 0 + + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + removeCalls++ + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateRemoveHostResponse) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/2/action", func(w http.ResponseWriter, r *http.Request) { + removeCalls++ + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"aggregate": {"name": "agg2", "id": 2, "uuid": "uuid-agg2", "hosts": []}}`) + }) + }) + + It("should remove host from all aggregates", func() { + serviceClient := client.ServiceClient(fakeServer) + uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{}) + Expect(err).NotTo(HaveOccurred()) + 
Expect(removeCalls).To(Equal(2)) + Expect(uuids).To(BeEmpty()) + }) + }) + + Context("when host already in desired aggregates", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + }) + + It("should not make any changes", func() { + serviceClient := client.ServiceClient(fakeServer) + uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2"}) + Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg1", "uuid-agg2")) + }) + }) + + Context("when adding and removing simultaneously", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateAddHostResponse) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateRemoveHostResponse) + }) + }) + + It("should replace agg1 with agg3", func() { + serviceClient := client.ServiceClient(fakeServer) + uuids, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"}) + Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg2", "uuid-agg3")) + }) + }) + + Context("when listing aggregates fails", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + 
fmt.Fprint(w, `{"error": "Internal Server Error"}`) + }) + }) + + It("should return an error", func() { + serviceClient := client.ServiceClient(fakeServer) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to list aggregates")) + }) + }) + + Context("when adding host fails", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot add host", "code": 409}}`) + }) + }) + + It("should return an error", func() { + serviceClient := client.ServiceClient(fakeServer) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to add host")) + }) + }) + + Context("when removing host fails", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot remove host", "code": 409}}`) + }) + }) + + It("should return an error", func() { + serviceClient := client.ServiceClient(fakeServer) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to remove host")) 
+ }) + }) + + Context("when host not in any aggregates", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateAddHostResponse) + }) + }) + + It("should add new host to aggregate", func() { + serviceClient := client.ServiceClient(fakeServer) + uuids, err := ApplyAggregates(ctx, serviceClient, "new-host", []string{"agg3"}) + Expect(err).NotTo(HaveOccurred()) + Expect(uuids).To(ConsistOf("uuid-agg3")) + }) + }) + + Context("when both adding and removing fail", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + + // Add to agg3 fails + fakeServer.Mux.HandleFunc("POST /os-aggregates/3/action", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot add host", "code": 409}}`) + }) + + // Remove from agg1 fails + fakeServer.Mux.HandleFunc("POST /os-aggregates/1/action", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot remove host", "code": 409}}`) + }) + }) + + It("should return combined errors", func() { + serviceClient := client.ServiceClient(fakeServer) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"}) + Expect(err).To(HaveOccurred()) + // Verify it's a joined error with multiple failures + 
Expect(err.Error()).To(And(ContainSubstring("failed to add host"), ContainSubstring("failed to remove host"))) + }) + }) + + Context("when desired aggregate does not exist", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + }) + + It("should return an error about missing aggregate", func() { + serviceClient := client.ServiceClient(fakeServer) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg4"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("aggregates not found")) + Expect(err.Error()).To(ContainSubstring("agg4")) + }) + }) + + Context("when multiple desired aggregates do not exist", func() { + BeforeEach(func() { + fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, aggregateListWithHost) + }) + }) + + It("should return an error listing all missing aggregates", func() { + serviceClient := client.ServiceClient(fakeServer) + _, err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg4", "agg5", "agg6"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("aggregates not found")) + // Check that all missing aggregates are mentioned in the error + Expect(err.Error()).To(ContainSubstring("agg4")) + Expect(err.Error()).To(ContainSubstring("agg5")) + Expect(err.Error()).To(ContainSubstring("agg6")) + }) + }) +}) diff --git a/internal/openstack/suite_test.go b/internal/openstack/suite_test.go new file mode 100644 index 0000000..8a41f2b --- /dev/null +++ b/internal/openstack/suite_test.go @@ -0,0 +1,30 @@ +/* +SPDX-FileCopyrightText: Copyright 2025 SAP SE or an SAP affiliate company and cobaltcore-dev contributors 
+SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openstack + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestOpenstack(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Openstack Suite") +}