Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path/filepath"
"time"

crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"

intController "github.com/splunk/splunk-operator/internal/controller"
Expand Down Expand Up @@ -55,6 +56,7 @@ import (
"github.com/splunk/splunk-operator/internal/controller"

cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
pgmetrics "github.com/splunk/splunk-operator/pkg/postgresql/metrics"
//+kubebuilder:scaffold:imports
//extapi "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
)
Expand Down Expand Up @@ -282,18 +284,29 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "Telemetry")
os.Exit(1)
}
pgRecorder := pgmetrics.NewPrometheusRecorder()
if err := pgmetrics.Register(crmetrics.Registry); err != nil {
setupLog.Error(err, "unable to register PostgreSQL metrics")
os.Exit(1)
}
pgFleetCollector := pgmetrics.NewFleetCollector()

if err := (&controller.PostgresDatabaseReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("postgresdatabase-controller"),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("postgresdatabase-controller"),
Metrics: pgRecorder,
FleetCollector: pgFleetCollector,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PostgresDatabase")
os.Exit(1)
}
if err := (&controller.PostgresClusterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("postgrescluster-controller"),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("postgrescluster-controller"),
Metrics: pgRecorder,
FleetCollector: pgFleetCollector,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PostgresCluster")
os.Exit(1)
Expand Down
13 changes: 9 additions & 4 deletions internal/controller/postgrescluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
enterprisev4 "github.com/splunk/splunk-operator/api/v4"
clustercore "github.com/splunk/splunk-operator/pkg/postgresql/cluster/core"
pgmetrics "github.com/splunk/splunk-operator/pkg/postgresql/metrics"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -42,8 +43,10 @@ const (
// PostgresClusterReconciler reconciles PostgresCluster resources.
type PostgresClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
Scheme *runtime.Scheme
Recorder record.EventRecorder
Metrics pgmetrics.Recorder
FleetCollector *pgmetrics.FleetCollector
}

// +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -57,8 +60,10 @@ type PostgresClusterReconciler struct {
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
rc := &clustercore.ReconcileContext{Client: r.Client, Scheme: r.Scheme, Recorder: r.Recorder}
return clustercore.PostgresClusterService(ctx, rc, req)
rc := &clustercore.ReconcileContext{Client: r.Client, Scheme: r.Scheme, Recorder: r.Recorder, Metrics: r.Metrics}
result, err := clustercore.PostgresClusterService(ctx, rc, req)
r.FleetCollector.CollectClusterMetrics(ctx, r.Client, r.Metrics)
return result, err
}

// SetupWithManager registers the controller and owned resource watches.
Expand Down
14 changes: 10 additions & 4 deletions internal/controller/postgresdatabase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
enterprisev4 "github.com/splunk/splunk-operator/api/v4"
dbadapter "github.com/splunk/splunk-operator/pkg/postgresql/database/adapter"
dbcore "github.com/splunk/splunk-operator/pkg/postgresql/database/core"
pgmetrics "github.com/splunk/splunk-operator/pkg/postgresql/metrics"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -42,8 +43,10 @@ import (
// PostgresDatabaseReconciler reconciles a PostgresDatabase object.
type PostgresDatabaseReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
Scheme *runtime.Scheme
Recorder record.EventRecorder
Metrics pgmetrics.Recorder
FleetCollector *pgmetrics.FleetCollector
}

const (
Expand Down Expand Up @@ -71,8 +74,11 @@ func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
return ctrl.Result{}, err
}
rc := &dbcore.ReconcileContext{Client: r.Client, Scheme: r.Scheme, Recorder: r.Recorder}
return dbcore.PostgresDatabaseService(ctx, rc, postgresDB, dbadapter.NewDBRepository)
rc := &dbcore.ReconcileContext{Client: r.Client, Scheme: r.Scheme, Recorder: r.Recorder, Metrics: r.Metrics}
result, err := dbcore.PostgresDatabaseService(ctx, rc, postgresDB, dbadapter.NewDBRepository)
r.FleetCollector.CollectDatabaseMetrics(ctx, r.Client, r.Metrics)

return result, err
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
21 changes: 12 additions & 9 deletions pkg/postgresql/cluster/core/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
password "github.com/sethvargo/go-password/password"
enterprisev4 "github.com/splunk/splunk-operator/api/v4"
pgmetrics "github.com/splunk/splunk-operator/pkg/postgresql/metrics"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -67,7 +68,7 @@ func PostgresClusterService(ctx context.Context, rc *ReconcileContext, req ctrl.
ctx = log.IntoContext(ctx, logger)

updateStatus := func(conditionType conditionTypes, status metav1.ConditionStatus, reason conditionReasons, message string, phase reconcileClusterPhases) error {
return setStatus(ctx, c, postgresCluster, conditionType, status, reason, message, phase)
return setStatus(ctx, c, rc.Metrics, postgresCluster, conditionType, status, reason, message, phase)
}

// Finalizer handling must come before any other processing.
Expand Down Expand Up @@ -384,7 +385,7 @@ func PostgresClusterService(ctx context.Context, rc *ReconcileContext, req ctrl.
default:
oldConditions := make([]metav1.Condition, len(postgresCluster.Status.Conditions))
copy(oldConditions, postgresCluster.Status.Conditions)
if err := syncPoolerStatus(ctx, c, postgresCluster); err != nil {
if err := syncPoolerStatus(ctx, c, rc.Metrics, postgresCluster); err != nil {
logger.Error(err, "Failed to sync pooler status")
rc.emitWarning(postgresCluster, EventPoolerReconcileFailed, fmt.Sprintf("Failed to sync pooler status: %v", err))
if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed,
Expand Down Expand Up @@ -450,7 +451,7 @@ func PostgresClusterService(ctx context.Context, rc *ReconcileContext, req ctrl.
if postgresCluster.Status.Phase != nil {
oldPhase = *postgresCluster.Status.Phase
}
if err := syncStatus(ctx, c, postgresCluster, cnpgCluster); err != nil {
if err := syncStatus(ctx, c, rc.Metrics, postgresCluster, cnpgCluster); err != nil {
logger.Error(err, "Failed to sync status")
if apierrors.IsConflict(err) {
logger.Info("Conflict during status update, will requeue")
Expand Down Expand Up @@ -478,7 +479,7 @@ func PostgresClusterService(ctx context.Context, rc *ReconcileContext, req ctrl.
logger.Info("Poolers ready, syncing status")
poolerOldConditions := make([]metav1.Condition, len(postgresCluster.Status.Conditions))
copy(poolerOldConditions, postgresCluster.Status.Conditions)
_ = syncPoolerStatus(ctx, c, postgresCluster)
_ = syncPoolerStatus(ctx, c, rc.Metrics, postgresCluster)
rc.emitPoolerReadyTransition(postgresCluster, poolerOldConditions)
}
}
Expand Down Expand Up @@ -756,7 +757,7 @@ func deleteConnectionPoolers(ctx context.Context, c client.Client, cluster *ente
}

// syncPoolerStatus populates ConnectionPoolerStatus and the PoolerReady condition.
func syncPoolerStatus(ctx context.Context, c client.Client, cluster *enterprisev4.PostgresCluster) error {
func syncPoolerStatus(ctx context.Context, c client.Client, metrics pgmetrics.Recorder, cluster *enterprisev4.PostgresCluster) error {
rwPooler := &cnpgv1.Pooler{}
if err := c.Get(ctx, types.NamespacedName{
Name: poolerResourceName(cluster.Name, readWriteEndpoint),
Expand All @@ -777,13 +778,13 @@ func syncPoolerStatus(ctx context.Context, c client.Client, cluster *enterprisev
rwDesired, rwScheduled := poolerInstanceCount(rwPooler)
roDesired, roScheduled := poolerInstanceCount(roPooler)

return setStatus(ctx, c, cluster, poolerReady, metav1.ConditionTrue, reasonAllInstancesReady,
return setStatus(ctx, c, metrics, cluster, poolerReady, metav1.ConditionTrue, reasonAllInstancesReady,
fmt.Sprintf("%s: %d/%d, %s: %d/%d", readWriteEndpoint, rwScheduled, rwDesired, readOnlyEndpoint, roScheduled, roDesired),
readyClusterPhase)
}

// syncStatus maps CNPG Cluster state to PostgresCluster status.
func syncStatus(ctx context.Context, c client.Client, cluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster) error {
func syncStatus(ctx context.Context, c client.Client, metrics pgmetrics.Recorder, cluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster) error {
cluster.Status.ProvisionerRef = &corev1.ObjectReference{
APIVersion: "postgresql.cnpg.io/v1",
Kind: "Cluster",
Expand Down Expand Up @@ -836,13 +837,13 @@ func syncStatus(ctx context.Context, c client.Client, cluster *enterprisev4.Post
message = fmt.Sprintf("CNPG cluster phase: %s", cnpgCluster.Status.Phase)
}

return setStatus(ctx, c, cluster, clusterReady, condStatus, reason, message, phase)
return setStatus(ctx, c, metrics, cluster, clusterReady, condStatus, reason, message, phase)
}

// setStatus sets the phase, condition and persists the status.
// It skips the API write when the resulting status is identical to the current
// state, avoiding unnecessary etcd churn and ResourceVersion bumps on stable clusters.
func setStatus(ctx context.Context, c client.Client, cluster *enterprisev4.PostgresCluster, condType conditionTypes, status metav1.ConditionStatus, reason conditionReasons, message string, phase reconcileClusterPhases) error {
func setStatus(ctx context.Context, c client.Client, metrics pgmetrics.Recorder, cluster *enterprisev4.PostgresCluster, condType conditionTypes, status metav1.ConditionStatus, reason conditionReasons, message string, phase reconcileClusterPhases) error {
before := cluster.Status.DeepCopy()

p := string(phase)
Expand All @@ -859,6 +860,8 @@ func setStatus(ctx context.Context, c client.Client, cluster *enterprisev4.Postg
return nil
}

metrics.IncStatusTransition(pgmetrics.ControllerCluster, string(condType), string(status), string(reason))

if err := c.Status().Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to update PostgresCluster status: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/postgresql/cluster/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

enterprisev4 "github.com/splunk/splunk-operator/api/v4"
pgmetrics "github.com/splunk/splunk-operator/pkg/postgresql/metrics"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
Expand All @@ -17,6 +18,7 @@ type ReconcileContext struct {
Client client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
Metrics pgmetrics.Recorder
}

// normalizedCNPGClusterSpec is a subset of cnpgv1.ClusterSpec fields used for drift detection.
Expand Down
14 changes: 12 additions & 2 deletions pkg/postgresql/database/core/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/sethvargo/go-password/password"
enterprisev4 "github.com/splunk/splunk-operator/api/v4"
pgmetrics "github.com/splunk/splunk-operator/pkg/postgresql/metrics"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -43,7 +44,7 @@ func PostgresDatabaseService(
logger.Info("Reconciling PostgresDatabase")

updateStatus := func(conditionType conditionTypes, conditionStatus metav1.ConditionStatus, reason conditionReasons, message string, phase reconcileDBPhases) error {
return persistStatus(ctx, c, postgresDB, conditionType, conditionStatus, reason, message, phase)
return persistStatus(ctx, c, rc.Metrics, postgresDB, conditionType, conditionStatus, reason, message, phase)
}

// Finalizer: cleanup on deletion, register on creation.
Expand Down Expand Up @@ -186,6 +187,10 @@ func PostgresDatabaseService(
if err := patchManagedRoles(ctx, c, postgresDB, cluster); err != nil {
logger.Error(err, "Failed to patch users in CNPG Cluster")
rc.emitWarning(postgresDB, EventManagedRolesPatchFailed, fmt.Sprintf("Failed to patch managed roles: %v", err))
if statusErr := updateStatus(rolesReady, metav1.ConditionFalse, reasonUsersCreationFailed,
fmt.Sprintf("Failed to patch managed roles: %v", err), failedDBPhase); statusErr != nil {
logger.Error(statusErr, "Failed to update status")
}
return ctrl.Result{}, err
}
rc.emitNormal(postgresDB, EventRoleReconciliationStarted, fmt.Sprintf("Patched managed roles, waiting for %d roles to reconcile", len(desiredUsers)))
Expand Down Expand Up @@ -223,6 +228,10 @@ func PostgresDatabaseService(
if err != nil {
logger.Error(err, "Failed to reconcile CNPG Databases")
rc.emitWarning(postgresDB, EventDatabasesReconcileFailed, fmt.Sprintf("Failed to reconcile databases: %v", err))
if statusErr := updateStatus(databasesReady, metav1.ConditionFalse, reasonDatabaseReconcileFailed,
fmt.Sprintf("Failed to reconcile databases: %v", err), failedDBPhase); statusErr != nil {
logger.Error(statusErr, "Failed to update status")
}
return ctrl.Result{}, err
}
if len(adopted) > 0 {
Expand Down Expand Up @@ -493,8 +502,9 @@ func verifyDatabasesReady(ctx context.Context, c client.Client, postgresDB *ente
return notReady, nil
}

func persistStatus(ctx context.Context, c client.Client, db *enterprisev4.PostgresDatabase, conditionType conditionTypes, conditionStatus metav1.ConditionStatus, reason conditionReasons, message string, phase reconcileDBPhases) error {
func persistStatus(ctx context.Context, c client.Client, metrics pgmetrics.Recorder, db *enterprisev4.PostgresDatabase, conditionType conditionTypes, conditionStatus metav1.ConditionStatus, reason conditionReasons, message string, phase reconcileDBPhases) error {
applyStatus(db, conditionType, conditionStatus, reason, message, phase)
metrics.IncStatusTransition(pgmetrics.ControllerDatabase, string(conditionType), string(conditionStatus), string(reason))
return c.Status().Update(ctx, db)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/postgresql/database/core/database_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
enterprisev4 "github.com/splunk/splunk-operator/api/v4"
pgmetrics "github.com/splunk/splunk-operator/pkg/postgresql/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -306,7 +307,7 @@ func TestVerifyRolesReady(t *testing.T) {
},
},
},
wantErr: "user main_db_rw reconciliation failed: [reserved role]",
wantErr: "reconciling user main_db_rw: [reserved role]",
},
{
name: "returns missing roles that are not reconciled yet",
Expand Down Expand Up @@ -591,6 +592,7 @@ func TestSetStatus(t *testing.T) {
err := persistStatus(
context.Background(),
c,
&pgmetrics.NoopRecorder{},
postgresDB,
clusterReady,
metav1.ConditionTrue,
Expand Down
11 changes: 7 additions & 4 deletions pkg/postgresql/database/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"time"

enterprisev4 "github.com/splunk/splunk-operator/api/v4"
pgmetrics "github.com/splunk/splunk-operator/pkg/postgresql/metrics"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ReconcileContext bundles infrastructure dependencies injected by the controller
// ReconcileContext bundles infrastructure dependencies injected by the controller.
type ReconcileContext struct {
Client client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
Metrics pgmetrics.Recorder
}

type reconcileDBPhases string
Expand Down Expand Up @@ -76,9 +78,10 @@ const (
reasonUsersAvailable conditionReasons = "UsersAvailable"
reasonRoleConflict conditionReasons = "RoleConflict"
reasonConfigMapsCreationFailed conditionReasons = "ConfigMapsCreationFailed"
reasonConfigMapsCreated conditionReasons = "ConfigMapsCreated"
reasonPrivilegesGranted conditionReasons = "PrivilegesGranted"
reasonPrivilegesGrantFailed conditionReasons = "PrivilegesGrantFailed"
reasonConfigMapsCreated conditionReasons = "ConfigMapsCreated"
reasonDatabaseReconcileFailed conditionReasons = "DatabaseReconcileFailed"
reasonPrivilegesGranted conditionReasons = "PrivilegesGranted"
reasonPrivilegesGrantFailed conditionReasons = "PrivilegesGrantFailed"

// ClusterReady sentinel values returned by ensureClusterReady.
// Exported so the controller adapter can switch on them if needed.
Expand Down
Loading
Loading