Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions cmd/thv-operator/api/v1alpha1/mcpserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ const (
ConditionReasonSessionStorageNotApplicable = "SessionStorageWarningNotApplicable"
)

// ConditionRateLimitConfigValid indicates whether the rate limit configuration is valid.
const ConditionRateLimitConfigValid = "RateLimitConfigValid"

const (
// ConditionReasonRateLimitConfigValid indicates the rate limit configuration is valid.
ConditionReasonRateLimitConfigValid = "RateLimitConfigValid"
// ConditionReasonRateLimitPerUserRequiresAuth indicates perUser rate limiting requires authentication.
ConditionReasonRateLimitPerUserRequiresAuth = "PerUserRequiresAuth"
// ConditionReasonRateLimitNotApplicable indicates rate limiting is not configured.
ConditionReasonRateLimitNotApplicable = "RateLimitNotApplicable"
)

// SessionStorageProviderRedis is the provider name for Redis-backed session storage.
const SessionStorageProviderRedis = "redis"

Expand All @@ -155,6 +167,8 @@ const SessionStorageProviderRedis = "redis"
// +kubebuilder:validation:XValidation:rule="!(has(self.oidcConfig) && has(self.oidcConfigRef))",message="oidcConfig and oidcConfigRef are mutually exclusive; use oidcConfigRef to reference a shared MCPOIDCConfig"
// +kubebuilder:validation:XValidation:rule="!(has(self.telemetry) && has(self.telemetryConfigRef))",message="telemetry and telemetryConfigRef are mutually exclusive; migrate to telemetryConfigRef"
// +kubebuilder:validation:XValidation:rule="!has(self.rateLimiting) || (has(self.sessionStorage) && self.sessionStorage.provider == 'redis')",message="rateLimiting requires sessionStorage with provider 'redis'"
// +kubebuilder:validation:XValidation:rule="!(has(self.rateLimiting) && has(self.rateLimiting.perUser)) || has(self.oidcConfig) || has(self.oidcConfigRef) || has(self.externalAuthConfigRef)",message="rateLimiting.perUser requires authentication (oidcConfig, oidcConfigRef, or externalAuthConfigRef)"
// +kubebuilder:validation:XValidation:rule="!has(self.rateLimiting) || !has(self.rateLimiting.tools) || self.rateLimiting.tools.all(t, !has(t.perUser)) || has(self.oidcConfig) || has(self.oidcConfigRef) || has(self.externalAuthConfigRef)",message="per-tool perUser rate limiting requires authentication (oidcConfig, oidcConfigRef, or externalAuthConfigRef)"
//
//nolint:lll // CEL validation rules exceed line length limit
type MCPServerSpec struct {
Expand Down Expand Up @@ -488,16 +502,23 @@ type SessionStorageConfig struct {
}

// RateLimitConfig defines rate limiting configuration for an MCP server.
// At least one of shared or tools must be configured.
// At least one of shared, perUser, or tools must be configured.
//
// +kubebuilder:validation:XValidation:rule="has(self.shared) || (has(self.tools) && size(self.tools) > 0)",message="at least one of shared or tools must be configured"
// +kubebuilder:validation:XValidation:rule="has(self.shared) || has(self.perUser) || (has(self.tools) && size(self.tools) > 0)",message="at least one of shared, perUser, or tools must be configured"
//
//nolint:lll // CEL validation rules exceed line length limit
type RateLimitConfig struct {
// Shared defines a token bucket shared across all users for the entire server.
// Shared is a token bucket shared across all users for the entire server.
// +optional
Shared *RateLimitBucket `json:"shared,omitempty"`

// PerUser is a token bucket applied independently to each authenticated user
// at the server level. Requires authentication to be enabled.
// Each unique userID creates Redis keys that expire after 2x refillPeriod.
// Memory formula: unique_users_per_TTL_window * (1 + num_tools_with_per_user_limits) keys.
// +optional
PerUser *RateLimitBucket `json:"perUser,omitempty"`

// Tools defines per-tool rate limit overrides.
// Each entry applies additional rate limits to calls targeting a specific tool name.
// A request must pass both the server-level limit and the per-tool limit.
Expand All @@ -507,7 +528,8 @@ type RateLimitConfig struct {
Tools []ToolRateLimitConfig `json:"tools,omitempty"`
}

// RateLimitBucket defines a token bucket configuration.
// RateLimitBucket defines a token bucket configuration with a maximum capacity
// and a refill period. Used by both shared (global) and per-user rate limits.
type RateLimitBucket struct {
// MaxTokens is the maximum number of tokens (bucket capacity).
// This is also the burst size: the maximum number of requests that can be served
Expand All @@ -524,15 +546,24 @@ type RateLimitBucket struct {
}

// ToolRateLimitConfig defines rate limits for a specific tool.
// At least one of shared or perUser must be configured.
//
// +kubebuilder:validation:XValidation:rule="has(self.shared) || has(self.perUser)",message="at least one of shared or perUser must be configured"
//
//nolint:lll // kubebuilder marker exceeds line length
type ToolRateLimitConfig struct {
// Name is the MCP tool name this limit applies to.
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinLength=1
Name string `json:"name"`

// Shared defines a token bucket shared across all users for this specific tool.
// +kubebuilder:validation:Required
Shared *RateLimitBucket `json:"shared"`
// Shared token bucket for this specific tool.
// +optional
Shared *RateLimitBucket `json:"shared,omitempty"`

// PerUser token bucket configuration for this tool.
// +optional
PerUser *RateLimitBucket `json:"perUser,omitempty"`
}

// Permission profile types
Expand Down
10 changes: 10 additions & 0 deletions cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 57 additions & 1 deletion cmd/thv-operator/controllers/mcpserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,10 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Validate CABundleRef if specified
r.validateCABundleRef(ctx, mcpServer)

// Validate stdio replica cap and session storage requirements
// Validate stdio replica cap, session storage, and rate limit config
r.validateStdioReplicaCap(ctx, mcpServer)
r.validateSessionStorageForReplicas(ctx, mcpServer)
r.validateRateLimitConfig(ctx, mcpServer)

// Validate PodTemplateSpec early - before other validations
// This ensures we fail fast if the spec is invalid
Expand Down Expand Up @@ -2289,6 +2290,61 @@ func (r *MCPServerReconciler) validateSessionStorageForReplicas(ctx context.Cont
}
}

// setRateLimitConfigCondition sets the RateLimitConfigValid status condition.
func setRateLimitConfigCondition(mcpServer *mcpv1alpha1.MCPServer, status metav1.ConditionStatus, reason, message string) {
meta.SetStatusCondition(&mcpServer.Status.Conditions, metav1.Condition{
Type: mcpv1alpha1.ConditionRateLimitConfigValid,
Status: status,
Reason: reason,
Message: message,
ObservedGeneration: mcpServer.Generation,
})
}

// validateRateLimitConfig validates that per-user rate limiting has authentication enabled.
// Sets the RateLimitConfigValid condition. This is defense-in-depth only; CEL admission
// validation is the primary gate. Reconciliation continues even when the condition is False
// because per-user buckets are silently skipped when userID is empty (graceful degradation).
func (r *MCPServerReconciler) validateRateLimitConfig(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) {
rl := mcpServer.Spec.RateLimiting
if rl == nil {
setRateLimitConfigCondition(mcpServer, metav1.ConditionTrue,
mcpv1alpha1.ConditionReasonRateLimitNotApplicable,
"rate limiting is not configured")
if err := r.Status().Update(ctx, mcpServer); err != nil {
log.FromContext(ctx).Error(err, "Failed to update MCPServer status after rate limit validation")
}
return
}

authEnabled := mcpServer.Spec.OIDCConfig != nil ||
mcpServer.Spec.OIDCConfigRef != nil ||
mcpServer.Spec.ExternalAuthConfigRef != nil

hasPerUser := rl.PerUser != nil
if !hasPerUser {
for _, t := range rl.Tools {
if t.PerUser != nil {
hasPerUser = true
break
}
}
}

if hasPerUser && !authEnabled {
setRateLimitConfigCondition(mcpServer, metav1.ConditionFalse,
mcpv1alpha1.ConditionReasonRateLimitPerUserRequiresAuth,
"perUser rate limiting requires authentication to be enabled (oidcConfig, oidcConfigRef, or externalAuthConfigRef)")
} else {
setRateLimitConfigCondition(mcpServer, metav1.ConditionTrue,
mcpv1alpha1.ConditionReasonRateLimitConfigValid,
"rate limit configuration is valid")
}
if err := r.Status().Update(ctx, mcpServer); err != nil {
log.FromContext(ctx).Error(err, "Failed to update MCPServer status after rate limit validation")
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *MCPServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Create a handler that maps MCPExternalAuthConfig changes to MCPServer reconciliation requests
Expand Down
153 changes: 153 additions & 0 deletions cmd/thv-operator/controllers/mcpserver_replicas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,3 +979,156 @@ func TestUpdateMCPServerStatusExcludesTerminatingPods(t *testing.T) {
assert.Equal(t, int32(2), updatedMCPServer.Status.ReadyReplicas,
"ReadyReplicas should exclude terminating pods")
}

func TestRateLimitConfigValidation(t *testing.T) {
t.Parallel()

tests := []struct {
name string
spec mcpv1alpha1.MCPServerSpec
expectStatus metav1.ConditionStatus
expectReason string
}{
{
name: "no-rate-limiting",
spec: mcpv1alpha1.MCPServerSpec{
Image: "test-image:latest",
Transport: "sse",
ProxyPort: 8080,
},
expectStatus: metav1.ConditionTrue,
expectReason: mcpv1alpha1.ConditionReasonRateLimitNotApplicable,
},
{
name: "peruser-with-auth",
spec: mcpv1alpha1.MCPServerSpec{
Image: "test-image:latest",
Transport: "sse",
ProxyPort: 8080,
SessionStorage: &mcpv1alpha1.SessionStorageConfig{
Provider: mcpv1alpha1.SessionStorageProviderRedis,
Address: "redis:6379",
},
OIDCConfig: &mcpv1alpha1.OIDCConfigRef{Type: "kubernetes"},
RateLimiting: &mcpv1alpha1.RateLimitConfig{
PerUser: &mcpv1alpha1.RateLimitBucket{
MaxTokens: 100,
RefillPeriod: metav1.Duration{Duration: time.Minute},
},
},
},
expectStatus: metav1.ConditionTrue,
expectReason: mcpv1alpha1.ConditionReasonRateLimitConfigValid,
},
{
name: "peruser-without-auth",
spec: mcpv1alpha1.MCPServerSpec{
Image: "test-image:latest",
Transport: "sse",
ProxyPort: 8080,
SessionStorage: &mcpv1alpha1.SessionStorageConfig{
Provider: mcpv1alpha1.SessionStorageProviderRedis,
Address: "redis:6379",
},
RateLimiting: &mcpv1alpha1.RateLimitConfig{
PerUser: &mcpv1alpha1.RateLimitBucket{
MaxTokens: 100,
RefillPeriod: metav1.Duration{Duration: time.Minute},
},
},
},
expectStatus: metav1.ConditionFalse,
expectReason: mcpv1alpha1.ConditionReasonRateLimitPerUserRequiresAuth,
},
{
name: "per-tool-peruser-without-auth",
spec: mcpv1alpha1.MCPServerSpec{
Image: "test-image:latest",
Transport: "sse",
ProxyPort: 8080,
SessionStorage: &mcpv1alpha1.SessionStorageConfig{
Provider: mcpv1alpha1.SessionStorageProviderRedis,
Address: "redis:6379",
},
RateLimiting: &mcpv1alpha1.RateLimitConfig{
Tools: []mcpv1alpha1.ToolRateLimitConfig{
{
Name: "search",
PerUser: &mcpv1alpha1.RateLimitBucket{
MaxTokens: 10,
RefillPeriod: metav1.Duration{Duration: time.Minute},
},
},
},
},
},
expectStatus: metav1.ConditionFalse,
expectReason: mcpv1alpha1.ConditionReasonRateLimitPerUserRequiresAuth,
},
{
name: "shared-only-no-auth",
spec: mcpv1alpha1.MCPServerSpec{
Image: "test-image:latest",
Transport: "sse",
ProxyPort: 8080,
SessionStorage: &mcpv1alpha1.SessionStorageConfig{
Provider: mcpv1alpha1.SessionStorageProviderRedis,
Address: "redis:6379",
},
RateLimiting: &mcpv1alpha1.RateLimitConfig{
Shared: &mcpv1alpha1.RateLimitBucket{
MaxTokens: 1000,
RefillPeriod: metav1.Duration{Duration: time.Minute},
},
},
},
expectStatus: metav1.ConditionTrue,
expectReason: mcpv1alpha1.ConditionReasonRateLimitConfigValid,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

name := "rl-" + tt.name
namespace := testNamespaceDefault

mcpServer := &mcpv1alpha1.MCPServer{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: tt.spec,
}

testScheme := createTestScheme()
fakeClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(mcpServer).
WithStatusSubresource(&mcpv1alpha1.MCPServer{}).
Build()

reconciler := newTestMCPServerReconciler(fakeClient, testScheme, kubernetes.PlatformKubernetes)

_, err := reconciler.Reconcile(t.Context(), ctrl.Request{
NamespacedName: types.NamespacedName{Name: name, Namespace: namespace},
})
require.NoError(t, err)

updated := &mcpv1alpha1.MCPServer{}
err = fakeClient.Get(t.Context(), types.NamespacedName{Name: name, Namespace: namespace}, updated)
require.NoError(t, err)

var found bool
for _, cond := range updated.Status.Conditions {
if cond.Type == mcpv1alpha1.ConditionRateLimitConfigValid {
found = true
assert.Equal(t, tt.expectStatus, cond.Status)
assert.Equal(t, tt.expectReason, cond.Reason)
}
}
assert.True(t, found, "ConditionRateLimitConfigValid condition should be set")
})
}
}
Loading
Loading