Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions api/v4/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 37 additions & 23 deletions internal/controller/postgresdatabase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package controller

import (
"context"
"reflect"

cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
enterprisev4 "github.com/splunk/splunk-operator/api/v4"
dbadapter "github.com/splunk/splunk-operator/pkg/postgresql/database/adapter"
dbcore "github.com/splunk/splunk-operator/pkg/postgresql/database/core"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -95,31 +95,45 @@ func (r *PostgresDatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&enterprisev4.PostgresDatabase{}, builder.WithPredicates(
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return !reflect.DeepEqual(
e.ObjectOld.GetFinalizers(),
e.ObjectNew.GetFinalizers(),
)
},
},
),
)).
Owns(&cnpgv1.Database{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event.CreateEvent) bool { return false },
})).
Owns(&corev1.Secret{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event.CreateEvent) bool { return false },
})).
Owns(&corev1.ConfigMap{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event.CreateEvent) bool { return false },
})).
WithEventFilter(predicate.Funcs{GenericFunc: func(event.GenericEvent) bool { return false }}).
For(&enterprisev4.PostgresDatabase{}, builder.WithPredicates(postgresDatabasePredicator())).
Owns(&cnpgv1.Database{}, builder.WithPredicates(postgresDatabaseCNPGDatabasePredicator())).
Owns(&corev1.Secret{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
Owns(&corev1.ConfigMap{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
Named("postgresdatabase").
WithOptions(controller.Options{
MaxConcurrentReconciles: DatabaseTotalWorker,
}).
Complete(r)
}

func postgresDatabasePredicator() predicate.Predicate {
return predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if !equality.Semantic.DeepEqual(e.ObjectOld.GetDeletionTimestamp(), e.ObjectNew.GetDeletionTimestamp()) {
return true
}
return !equality.Semantic.DeepEqual(e.ObjectOld.GetFinalizers(), e.ObjectNew.GetFinalizers())
},
},
)
}

func postgresDatabaseCNPGDatabasePredicator() predicate.Predicate {
return predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj, okOld := e.ObjectOld.(*cnpgv1.Database)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats the difference between this predicate and GenerationChangedPredicate?

Copy link
Copy Markdown
Author

@DmytroPI-dev DmytroPI-dev Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If my understanding is correct, GenerationChangedPredicate is triggered only when CNPG Database spec changes. If we have status-only update, generation might not always change. Owner reference changes on the owned CNPG Database would also stop triggering reconciliation.

newObj, okNew := e.ObjectNew.(*cnpgv1.Database)
if !okOld || !okNew {
return true
}
return !equality.Semantic.DeepEqual(oldObj.Status.Applied, newObj.Status.Applied) ||
ownerReferencesChanged(oldObj, newObj)
},
},
)
}
201 changes: 199 additions & 2 deletions internal/controller/postgresdatabase_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down Expand Up @@ -314,14 +316,20 @@ func seedConflictScenario(ctx context.Context, namespace, resourceName, clusterN
}

func seedOwnedDatabaseArtifacts(ctx context.Context, namespace, resourceName, clusterName string, postgresDB *enterprisev4.PostgresDatabase, dbNames ...string) {

ownerReferences := ownedByPostgresDatabase(postgresDB)

for _, dbName := range dbNames {
Expect(k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: adminSecretNameForTest(resourceName, dbName),
Namespace: namespace,
OwnerReferences: ownerReferences,
},
Data: map[string][]byte{
"username": []byte(adminRoleNameForTest(dbName)),
"password": []byte("test-password"),
},
})).To(Succeed())

Expect(k8sClient.Create(ctx, &corev1.Secret{
Expand All @@ -330,6 +338,10 @@ func seedOwnedDatabaseArtifacts(ctx context.Context, namespace, resourceName, cl
Namespace: namespace,
OwnerReferences: ownerReferences,
},
Data: map[string][]byte{
"username": []byte(rwRoleNameForTest(dbName)),
"password": []byte("test-password"),
},
})).To(Succeed())

Expect(k8sClient.Create(ctx, &corev1.ConfigMap{
Expand Down Expand Up @@ -389,14 +401,41 @@ func expectStatusCondition(current *enterprisev4.PostgresDatabase, conditionType

func expectReadyStatus(current *enterprisev4.PostgresDatabase, generation int64, expectedDatabase enterprisev4.DatabaseInfo) {
expectStatusPhase(current, phaseReady)
Expect(current.Status.ObservedGeneration).NotTo(BeNil())
Expect(*current.Status.ObservedGeneration).To(Equal(generation))
Expect(current.Status.Databases).To(HaveLen(1))
Expect(current.Status.Databases[0].Name).To(Equal(expectedDatabase.Name))
Expect(current.Status.Databases[0].Ready).To(Equal(expectedDatabase.Ready))
Expect(current.Status.Databases[0].AdminUserSecretRef).NotTo(BeNil())
Expect(current.Status.Databases[0].RWUserSecretRef).NotTo(BeNil())
Expect(current.Status.Databases[0].ConfigMapRef).NotTo(BeNil())
Expect(current.Status.ObservedGeneration).NotTo(BeNil())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that supposed to be removed from the general logic?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we removed observedGeneration to act as a database readiness check, but left it to have an idea know what is the CRD version status was emitted for. See here

Expect(*current.Status.ObservedGeneration).To(Equal(generation))
}

func reconcilePostgresDatabaseToReady(ctx context.Context, scenario readyClusterScenario, poolerEnabled bool) *enterprisev4.PostgresDatabase {
seedReadyClusterScenario(ctx, scenario, poolerEnabled)

result, err := reconcilePostgresDatabase(ctx, scenario.requestName)
expectEmptyReconcileResult(result, err)

current := expectFinalizerAdded(ctx, scenario.requestName)
seedExistingDatabaseStatus(ctx, current, scenario.dbName)

result, err = reconcilePostgresDatabase(ctx, scenario.requestName)
expectReconcileResult(result, err, 15*time.Second)
expectProvisionedArtifacts(ctx, scenario, current)
expectManagedRolesPatched(ctx, scenario)

result, err = reconcilePostgresDatabase(ctx, scenario.requestName)
expectReconcileResult(result, err, 15*time.Second)
cnpgDatabase := expectCNPGDatabaseCreated(ctx, scenario, current)
markCNPGDatabaseApplied(ctx, cnpgDatabase)

result, err = reconcilePostgresDatabase(ctx, scenario.requestName)
expectEmptyReconcileResult(result, err)

current = fetchPostgresDatabase(ctx, scenario.requestName)
expectReadyStatus(current, current.Generation, enterprisev4.DatabaseInfo{Name: scenario.dbName, Ready: true})
return current
}

var _ = Describe("PostgresDatabase Controller", Label("postgres"), func() {
Expand Down Expand Up @@ -501,6 +540,164 @@ var _ = Describe("PostgresDatabase Controller", Label("postgres"), func() {
})
})

When("owned resource drift occurs after the PostgresDatabase is ready", func() {
It("repairs configmap content drift", func() {
scenario := newReadyClusterScenario(namespace, "configmap-drift", "tenant-cluster", "tenant-cnpg", "appdb")
owner := reconcilePostgresDatabaseToReady(ctx, scenario, false)

configMap := &corev1.ConfigMap{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-%s-config", scenario.resourceName, scenario.dbName), Namespace: scenario.namespace}, configMap)).To(Succeed())
configMap.Data["rw-host"] = "unexpected.example"
Expect(k8sClient.Update(ctx, configMap)).To(Succeed())

result, err := reconcilePostgresDatabase(ctx, scenario.requestName)
expectEmptyReconcileResult(result, err)

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}, configMap)).To(Succeed())
Expect(configMap.Data).To(HaveKeyWithValue("rw-host", "tenant-rw."+scenario.namespace+".svc.cluster.local"))

current := fetchPostgresDatabase(ctx, scenario.requestName)
expectReadyStatus(current, current.Generation, enterprisev4.DatabaseInfo{Name: scenario.dbName, Ready: true})
Expect(metav1.IsControlledBy(configMap, owner)).To(BeTrue())
})

It("recreates a deleted configmap", func() {
scenario := newReadyClusterScenario(namespace, "configmap-delete", "tenant-cluster", "tenant-cnpg", "appdb")
reconcilePostgresDatabaseToReady(ctx, scenario, false)

configMapName := fmt.Sprintf("%s-%s-config", scenario.resourceName, scenario.dbName)
Expect(k8sClient.Delete(ctx, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: configMapName, Namespace: scenario.namespace},
})).To(Succeed())

result, err := reconcilePostgresDatabase(ctx, scenario.requestName)
expectEmptyReconcileResult(result, err)

configMap := &corev1.ConfigMap{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: configMapName, Namespace: scenario.namespace}, configMap)).To(Succeed())
Expect(configMap.Data).To(HaveKeyWithValue("rw-host", "tenant-rw."+scenario.namespace+".svc.cluster.local"))
})

It("does not recreate a deleted managed user secret", func() {
scenario := newReadyClusterScenario(namespace, "secret-delete", "tenant-cluster", "tenant-cnpg", "appdb")
reconcilePostgresDatabaseToReady(ctx, scenario, false)

secretName := fmt.Sprintf("%s-%s-admin", scenario.resourceName, scenario.dbName)
Expect(k8sClient.Delete(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: secretName, Namespace: scenario.namespace},
})).To(Succeed())

result, err := reconcilePostgresDatabase(ctx, scenario.requestName)
expectReconcileResult(result, err, 15*time.Second)

current := fetchPostgresDatabase(ctx, scenario.requestName)
expectStatusPhase(current, "Provisioning")
expectStatusCondition(current, "SecretsReady", metav1.ConditionFalse, "SecretsDriftDetected")

missing := &corev1.Secret{}
err = k8sClient.Get(ctx, types.NamespacedName{Name: secretName, Namespace: scenario.namespace}, missing)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})

It("re-attaches ownership when a managed user secret loses its owner reference", func() {
scenario := newReadyClusterScenario(namespace, "secret-adopt", "tenant-cluster", "tenant-cnpg", "appdb")
owner := reconcilePostgresDatabaseToReady(ctx, scenario, false)

secret := &corev1.Secret{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-%s-admin", scenario.resourceName, scenario.dbName), Namespace: scenario.namespace}, secret)).To(Succeed())
secret.OwnerReferences = nil
Expect(k8sClient.Update(ctx, secret)).To(Succeed())

result, err := reconcilePostgresDatabase(ctx, scenario.requestName)
expectEmptyReconcileResult(result, err)

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: secret.Name, Namespace: secret.Namespace}, secret)).To(Succeed())
Expect(metav1.IsControlledBy(secret, owner)).To(BeTrue())

current := fetchPostgresDatabase(ctx, scenario.requestName)
expectReadyStatus(current, current.Generation, enterprisev4.DatabaseInfo{Name: scenario.dbName, Ready: true})
})

It("creates secrets and configmaps for a newly added database while preserving existing ones", func() {
scenario := newReadyClusterScenario(namespace, "new-database", "tenant-cluster", "tenant-cnpg", "appdb")
current := reconcilePostgresDatabaseToReady(ctx, scenario, false)

current.Spec.Databases = append(current.Spec.Databases, enterprisev4.DatabaseDefinition{Name: "analytics"})
Expect(k8sClient.Update(ctx, current)).To(Succeed())

result, err := reconcilePostgresDatabase(ctx, scenario.requestName)
expectReconcileResult(result, err, 15*time.Second)

for _, secretName := range []string{
fmt.Sprintf("%s-analytics-admin", scenario.resourceName),
fmt.Sprintf("%s-analytics-rw", scenario.resourceName),
} {
secret := &corev1.Secret{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: secretName, Namespace: scenario.namespace}, secret)).To(Succeed())
}

configMap := &corev1.ConfigMap{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-analytics-config", scenario.resourceName), Namespace: scenario.namespace}, configMap)).To(Succeed())
Expect(configMap.Data).To(HaveKeyWithValue("dbname", "analytics"))

existingSecret := &corev1.Secret{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-%s-admin", scenario.resourceName, scenario.dbName), Namespace: scenario.namespace}, existingSecret)).To(Succeed())
})
})

When("postgresdatabase secondary-resource predicates run", func() {
It("treats cnpg database applied-state, create, and delete changes as drift triggers", func() {
pred := postgresDatabaseCNPGDatabasePredicator()

oldApplied := true
newApplied := false
Expect(pred.Create(event.CreateEvent{})).To(BeTrue())
Expect(pred.Update(event.UpdateEvent{
ObjectOld: &cnpgv1.Database{Status: cnpgv1.DatabaseStatus{Applied: &oldApplied}},
ObjectNew: &cnpgv1.Database{Status: cnpgv1.DatabaseStatus{Applied: &newApplied}},
})).To(BeTrue())
Expect(pred.Delete(event.DeleteEvent{})).To(BeTrue())
})

It("ignores cnpg database updates that do not change readiness or ownership", func() {
pred := postgresDatabaseCNPGDatabasePredicator()

applied := true
Expect(pred.Update(event.UpdateEvent{
ObjectOld: &cnpgv1.Database{
ObjectMeta: metav1.ObjectMeta{Name: "db", Namespace: "test"},
Status: cnpgv1.DatabaseStatus{Applied: &applied},
},
ObjectNew: &cnpgv1.Database{
ObjectMeta: metav1.ObjectMeta{Name: "db", Namespace: "test"},
Status: cnpgv1.DatabaseStatus{Applied: &applied},
},
})).To(BeFalse())
})

It("treats secret create, update, and delete events as drift triggers", func() {
pred := predicate.ResourceVersionChangedPredicate{}

Expect(pred.Create(event.CreateEvent{})).To(BeTrue())
Expect(pred.Update(event.UpdateEvent{
ObjectOld: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "test", ResourceVersion: "1"}},
ObjectNew: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "test", ResourceVersion: "2"}},
})).To(BeTrue())
Expect(pred.Delete(event.DeleteEvent{})).To(BeTrue())
})

It("treats configmap create, update, and delete events as drift triggers", func() {
pred := predicate.ResourceVersionChangedPredicate{}

Expect(pred.Create(event.CreateEvent{})).To(BeTrue())
Expect(pred.Update(event.UpdateEvent{
ObjectOld: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "config", Namespace: "test", ResourceVersion: "1"}},
ObjectNew: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "config", Namespace: "test", ResourceVersion: "2"}},
})).To(BeTrue())
Expect(pred.Delete(event.DeleteEvent{})).To(BeTrue())
})
})

When("role ownership conflicts exist", func() {
It("marks the resource failed and stops provisioning dependent resources", func() {
resourceName := "conflict-cluster"
Expand Down
2 changes: 1 addition & 1 deletion pkg/postgresql/cluster/core/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,11 @@ func PostgresClusterService(ctx context.Context, rc *ReconcileContext, req ctrl.
oldPhase = *postgresCluster.Status.Phase
}
if err := syncStatus(ctx, c, postgresCluster, cnpgCluster); err != nil {
logger.Error(err, "Failed to sync status")
if apierrors.IsConflict(err) {
logger.Info("Conflict during status update, will requeue")
return ctrl.Result{Requeue: true}, nil
}
logger.Error(err, "Failed to sync status")
return ctrl.Result{}, fmt.Errorf("failed to sync status: %w", err)
}
var newPhase string
Expand Down
Loading
Loading