Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/controller/apiserver/apiserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
Expand Down Expand Up @@ -185,11 +186,13 @@ func Add(mgr manager.Manager, opts options.ControllerOptions) error {

// Watch DatastoreMigration CRs so the apiserver controller reacts promptly
// to migration phase changes (e.g., goes hands-off during Migrating).
// Uses ResourceVersionChangedPredicate because migration phase transitions
// are status-only updates that don't bump generation.
go utils.WaitToAddResourceWatch(c, opts.K8sClientset, log, r.migrationWatchReady, []client.Object{
&datastoremigration.DatastoreMigration{
TypeMeta: metav1.TypeMeta{Kind: "DatastoreMigration", APIVersion: "migration.projectcalico.org/v1beta1"},
},
})
}, predicate.ResourceVersionChangedPredicate{})

log.V(5).Info("Controller created and Watches setup")
return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/installation/core_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,13 @@ func Add(mgr manager.Manager, opts options.ControllerOptions) error {

// Watch DatastoreMigration CRs so the installation controller re-reconciles when
// migration state changes (e.g., Converged → triggers env var injection on components).
// This is a deferred watch since the CRD may not be installed.
// Uses ResourceVersionChangedPredicate because migration phase transitions
// are status-only updates that don't bump generation.
go utils.WaitToAddResourceWatch(c, opts.K8sClientset, log, ri.migrationWatchReady, []client.Object{
&datastoremigration.DatastoreMigration{
TypeMeta: metav1.TypeMeta{Kind: "DatastoreMigration", APIVersion: "migration.projectcalico.org/v1beta1"},
},
})
}, predicate.ResourceVersionChangedPredicate{})

return nil
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/controller/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,13 +683,20 @@ type resourceWatchContext struct {
}

// WaitToAddResourceWatch will check if the required CRD APIs are available and if so, it will add a watch for the
// resource. The completion of this operation will be signaled on a ready channel
func WaitToAddResourceWatch(controller ctrlruntime.Controller, c kubernetes.Interface, log logr.Logger, flag *ReadyFlag, objs []client.Object) {
// resource. The completion of this operation will be signaled on a ready channel.
// An optional predicate can be provided to override the default generation-based predicate for all
// watched objects. This is useful for resources whose meaningful changes are status-only updates
// that don't bump generation (e.g., DatastoreMigration phase transitions).
func WaitToAddResourceWatch(controller ctrlruntime.Controller, c kubernetes.Interface, log logr.Logger, flag *ReadyFlag, objs []client.Object, predicates ...predicate.Predicate) {
// Track resources left to watch and establish their watch context.
resourcesToWatch := map[client.Object]resourceWatchContext{}
for _, obj := range objs {
pred := createPredicateForObject(obj)
if len(predicates) > 0 {
pred = predicate.And(predicates...)
}
resourcesToWatch[obj] = resourceWatchContext{
predicate: createPredicateForObject(obj),
predicate: pred,
logger: ContextLoggerForResource(log, obj),
}
}
Expand Down
116 changes: 116 additions & 0 deletions pkg/controller/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"

Expand Down Expand Up @@ -328,6 +332,32 @@ func (m *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*me
return args.Get(0).(*metav1.APIResourceList), nil
}

// mockController implements ctrlruntime.Controller for testing watch functions.
type mockController struct {
mock.Mock
}

func (m *mockController) WatchObject(obj client.Object, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error {
args := m.Called(obj, eventhandler, predicates)
return args.Error(0)
}

func (m *mockController) Reconcile(_ context.Context, _ reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
}

func (m *mockController) Watch(_ source.Source) error {
return nil
}

func (m *mockController) Start(_ context.Context) error {
return nil
}

func (m *mockController) GetLogger() logr.Logger {
return logr.Discard()
}

var _ = Describe("CreatePredicateForObject", func() {
var objMeta metav1.Object

Expand Down Expand Up @@ -504,3 +534,89 @@ var _ = Describe("CreatePredicateForObject", func() {
&opv1.Authentication{Spec: opv1.AuthenticationSpec{OIDC: &opv1.AuthenticationOIDC{Type: opv1.OIDCTypeDex}}}, true),
)
})

var _ = Describe("WaitToAddResourceWatch with custom predicates", func() {
var (
ctrl *mockController
k8sClient fakeClient
disc *fakeDiscovery
log logr.Logger
obj client.Object
)

testGV := "test.example.com/v1"

BeforeEach(func() {
ctrl = new(mockController)
disc = new(fakeDiscovery)
k8sClient = fakeClient{discovery: disc}
log = logf.Log.WithName("resource-watch-test")

obj = &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "TestResource",
APIVersion: testGV,
},
}
})

It("should use the provided predicate instead of the default", func() {
disc.On("ServerResourcesForGroupVersion", testGV).Return(&metav1.APIResourceList{
APIResources: []metav1.APIResource{{Kind: "TestResource"}},
})
ctrl.On("WatchObject", mock.Anything, mock.Anything, mock.Anything).Return(nil)

flag := &ReadyFlag{}
WaitToAddResourceWatch(ctrl, k8sClient, log, flag, []client.Object{obj}, predicate.ResourceVersionChangedPredicate{})

Expect(flag.IsReady()).To(BeTrue())

// Verify that WatchObject was called with a predicate (the custom one, not the default
// generation-based one). The custom predicate is wrapped via predicate.And(), so we
// just verify it was called and fires on resource version changes.
ctrl.AssertCalled(GinkgoT(), "WatchObject", mock.Anything, mock.Anything, mock.Anything)
})

It("should work with a nil flag", func() {
disc.On("ServerResourcesForGroupVersion", testGV).Return(&metav1.APIResourceList{
APIResources: []metav1.APIResource{{Kind: "TestResource"}},
})
ctrl.On("WatchObject", mock.Anything, mock.Anything, mock.Anything).Return(nil)

// Should not panic with nil flag.
WaitToAddResourceWatch(ctrl, k8sClient, log, nil, []client.Object{obj}, predicate.ResourceVersionChangedPredicate{})
ctrl.AssertExpectations(GinkgoT())
})

It("should retry when the CRD is not yet available", func() {
// First call: CRD not available. Second call: CRD available.
disc.On("ServerResourcesForGroupVersion", testGV).Return(&metav1.APIResourceList{
APIResources: []metav1.APIResource{{Kind: "SomethingElse"}},
}).Once()
disc.On("ServerResourcesForGroupVersion", testGV).Return(&metav1.APIResourceList{
APIResources: []metav1.APIResource{{Kind: "TestResource"}},
}).Once()
ctrl.On("WatchObject", mock.Anything, mock.Anything, mock.Anything).Return(nil)

flag := &ReadyFlag{}
WaitToAddResourceWatch(ctrl, k8sClient, log, flag, []client.Object{obj}, predicate.ResourceVersionChangedPredicate{})

Expect(flag.IsReady()).To(BeTrue())
disc.AssertNumberOfCalls(GinkgoT(), "ServerResourcesForGroupVersion", 2)
})

It("should retry when WatchObject fails", func() {
disc.On("ServerResourcesForGroupVersion", testGV).Return(&metav1.APIResourceList{
APIResources: []metav1.APIResource{{Kind: "TestResource"}},
})
// First WatchObject call fails, second succeeds.
ctrl.On("WatchObject", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("cache not started")).Once()
ctrl.On("WatchObject", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

flag := &ReadyFlag{}
WaitToAddResourceWatch(ctrl, k8sClient, log, flag, []client.Object{obj}, predicate.ResourceVersionChangedPredicate{})

Expect(flag.IsReady()).To(BeTrue())
ctrl.AssertNumberOfCalls(GinkgoT(), "WatchObject", 2)
})
})
Loading