diff --git a/cmd/thv-operator/api/v1alpha1/mcpserver_types.go b/cmd/thv-operator/api/v1alpha1/mcpserver_types.go index dd665f4250..c2ff2f9a53 100644 --- a/cmd/thv-operator/api/v1alpha1/mcpserver_types.go +++ b/cmd/thv-operator/api/v1alpha1/mcpserver_types.go @@ -147,6 +147,18 @@ const ( ConditionReasonSessionStorageNotApplicable = "SessionStorageWarningNotApplicable" ) +// ConditionRateLimitConfigValid indicates whether the rate limit configuration is valid. +const ConditionRateLimitConfigValid = "RateLimitConfigValid" + +const ( + // ConditionReasonRateLimitConfigValid indicates the rate limit configuration is valid. + ConditionReasonRateLimitConfigValid = "RateLimitConfigValid" + // ConditionReasonRateLimitPerUserRequiresAuth indicates perUser rate limiting requires authentication. + ConditionReasonRateLimitPerUserRequiresAuth = "PerUserRequiresAuth" + // ConditionReasonRateLimitNotApplicable indicates rate limiting is not configured. + ConditionReasonRateLimitNotApplicable = "RateLimitNotApplicable" +) + // SessionStorageProviderRedis is the provider name for Redis-backed session storage. const SessionStorageProviderRedis = "redis" @@ -155,6 +167,8 @@ const SessionStorageProviderRedis = "redis" // +kubebuilder:validation:XValidation:rule="!(has(self.oidcConfig) && has(self.oidcConfigRef))",message="oidcConfig and oidcConfigRef are mutually exclusive; use oidcConfigRef to reference a shared MCPOIDCConfig" // +kubebuilder:validation:XValidation:rule="!(has(self.telemetry) && has(self.telemetryConfigRef))",message="telemetry and telemetryConfigRef are mutually exclusive; migrate to telemetryConfigRef" // +kubebuilder:validation:XValidation:rule="!has(self.rateLimiting) || (has(self.sessionStorage) && self.sessionStorage.provider == 'redis')",message="rateLimiting requires sessionStorage with provider 'redis'" +// +kubebuilder:validation:XValidation:rule="!(has(self.rateLimiting) && has(self.rateLimiting.perUser)) || has(self.oidcConfig) || has(self.oidcConfigRef) || has(self.externalAuthConfigRef)",message="rateLimiting.perUser requires authentication (oidcConfig, oidcConfigRef, or externalAuthConfigRef)" +// +kubebuilder:validation:XValidation:rule="!has(self.rateLimiting) || !has(self.rateLimiting.tools) || self.rateLimiting.tools.all(t, !has(t.perUser)) || has(self.oidcConfig) || has(self.oidcConfigRef) || has(self.externalAuthConfigRef)",message="per-tool perUser rate limiting requires authentication (oidcConfig, oidcConfigRef, or externalAuthConfigRef)" // //nolint:lll // CEL validation rules exceed line length limit type MCPServerSpec struct { @@ -488,16 +502,23 @@ type SessionStorageConfig struct { } // RateLimitConfig defines rate limiting configuration for an MCP server. -// At least one of shared or tools must be configured. +// At least one of shared, perUser, or tools must be configured. // -// +kubebuilder:validation:XValidation:rule="has(self.shared) || (has(self.tools) && size(self.tools) > 0)",message="at least one of shared or tools must be configured" +// +kubebuilder:validation:XValidation:rule="has(self.shared) || has(self.perUser) || (has(self.tools) && size(self.tools) > 0)",message="at least one of shared, perUser, or tools must be configured" // //nolint:lll // CEL validation rules exceed line length limit type RateLimitConfig struct { - // Shared defines a token bucket shared across all users for the entire server. + // Shared is a token bucket shared across all users for the entire server. // +optional Shared *RateLimitBucket `json:"shared,omitempty"` + // PerUser is a token bucket applied independently to each authenticated user + // at the server level. Requires authentication to be enabled. + // Each unique userID creates Redis keys that expire after 2x refillPeriod. + // Memory formula: unique_users_per_TTL_window * (1 + num_tools_with_per_user_limits) keys. + // +optional + PerUser *RateLimitBucket `json:"perUser,omitempty"` + // Tools defines per-tool rate limit overrides. // Each entry applies additional rate limits to calls targeting a specific tool name. // A request must pass both the server-level limit and the per-tool limit. @@ -507,7 +528,8 @@ type RateLimitConfig struct { Tools []ToolRateLimitConfig `json:"tools,omitempty"` } -// RateLimitBucket defines a token bucket configuration. +// RateLimitBucket defines a token bucket configuration with a maximum capacity +// and a refill period. Used by both shared (global) and per-user rate limits. type RateLimitBucket struct { // MaxTokens is the maximum number of tokens (bucket capacity). // This is also the burst size: the maximum number of requests that can be served @@ -524,15 +546,24 @@ type RateLimitBucket struct { } // ToolRateLimitConfig defines rate limits for a specific tool. +// At least one of shared or perUser must be configured. +// +// +kubebuilder:validation:XValidation:rule="has(self.shared) || has(self.perUser)",message="at least one of shared or perUser must be configured" +// +//nolint:lll // kubebuilder marker exceeds line length type ToolRateLimitConfig struct { // Name is the MCP tool name this limit applies to. // +kubebuilder:validation:Required // +kubebuilder:validation:MinLength=1 Name string `json:"name"` - // Shared defines a token bucket shared across all users for this specific tool. - // +kubebuilder:validation:Required - Shared *RateLimitBucket `json:"shared"` + // Shared token bucket for this specific tool. + // +optional + Shared *RateLimitBucket `json:"shared,omitempty"` + + // PerUser token bucket configuration for this tool. + // +optional + PerUser *RateLimitBucket `json:"perUser,omitempty"` } // Permission profile types diff --git a/cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go b/cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go index 83408b7eb8..bd254deac7 100644 --- a/cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -2649,6 +2649,11 @@ func (in *RateLimitConfig) DeepCopyInto(out *RateLimitConfig) { *out = new(RateLimitBucket) **out = **in } + if in.PerUser != nil { + in, out := &in.PerUser, &out.PerUser + *out = new(RateLimitBucket) + **out = **in + } if in.Tools != nil { in, out := &in.Tools, &out.Tools *out = make([]ToolRateLimitConfig, len(*in)) @@ -3188,6 +3193,11 @@ func (in *ToolRateLimitConfig) DeepCopyInto(out *ToolRateLimitConfig) { *out = new(RateLimitBucket) **out = **in } + if in.PerUser != nil { + in, out := &in.PerUser, &out.PerUser + *out = new(RateLimitBucket) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ToolRateLimitConfig. diff --git a/cmd/thv-operator/controllers/mcpserver_controller.go b/cmd/thv-operator/controllers/mcpserver_controller.go index 29ddff9b2d..18192eaccb 100644 --- a/cmd/thv-operator/controllers/mcpserver_controller.go +++ b/cmd/thv-operator/controllers/mcpserver_controller.go @@ -196,9 +196,10 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // Validate CABundleRef if specified r.validateCABundleRef(ctx, mcpServer) - // Validate stdio replica cap and session storage requirements + // Validate stdio replica cap, session storage, and rate limit config r.validateStdioReplicaCap(ctx, mcpServer) r.validateSessionStorageForReplicas(ctx, mcpServer) + r.validateRateLimitConfig(ctx, mcpServer) // Validate PodTemplateSpec early - before other validations // This ensures we fail fast if the spec is invalid @@ -2289,6 +2290,61 @@ func (r *MCPServerReconciler) validateSessionStorageForReplicas(ctx context.Cont } } +// setRateLimitConfigCondition sets the RateLimitConfigValid status condition. +func setRateLimitConfigCondition(mcpServer *mcpv1alpha1.MCPServer, status metav1.ConditionStatus, reason, message string) { + meta.SetStatusCondition(&mcpServer.Status.Conditions, metav1.Condition{ + Type: mcpv1alpha1.ConditionRateLimitConfigValid, + Status: status, + Reason: reason, + Message: message, + ObservedGeneration: mcpServer.Generation, + }) +} + +// validateRateLimitConfig validates that per-user rate limiting has authentication enabled. +// Sets the RateLimitConfigValid condition. This is defense-in-depth only; CEL admission +// validation is the primary gate. Reconciliation continues even when the condition is False +// because per-user buckets are silently skipped when userID is empty (graceful degradation). +func (r *MCPServerReconciler) validateRateLimitConfig(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) { + rl := mcpServer.Spec.RateLimiting + if rl == nil { + setRateLimitConfigCondition(mcpServer, metav1.ConditionTrue, + mcpv1alpha1.ConditionReasonRateLimitNotApplicable, + "rate limiting is not configured") + if err := r.Status().Update(ctx, mcpServer); err != nil { + log.FromContext(ctx).Error(err, "Failed to update MCPServer status after rate limit validation") + } + return + } + + authEnabled := mcpServer.Spec.OIDCConfig != nil || + mcpServer.Spec.OIDCConfigRef != nil || + mcpServer.Spec.ExternalAuthConfigRef != nil + + hasPerUser := rl.PerUser != nil + if !hasPerUser { + for _, t := range rl.Tools { + if t.PerUser != nil { + hasPerUser = true + break + } + } + } + + if hasPerUser && !authEnabled { + setRateLimitConfigCondition(mcpServer, metav1.ConditionFalse, + mcpv1alpha1.ConditionReasonRateLimitPerUserRequiresAuth, + "perUser rate limiting requires authentication to be enabled (oidcConfig, oidcConfigRef, or externalAuthConfigRef)") + } else { + setRateLimitConfigCondition(mcpServer, metav1.ConditionTrue, + mcpv1alpha1.ConditionReasonRateLimitConfigValid, + "rate limit configuration is valid") + } + if err := r.Status().Update(ctx, mcpServer); err != nil { + log.FromContext(ctx).Error(err, "Failed to update MCPServer status after rate limit validation") + } +} + // SetupWithManager sets up the controller with the Manager. func (r *MCPServerReconciler) SetupWithManager(mgr ctrl.Manager) error { // Create a handler that maps MCPExternalAuthConfig changes to MCPServer reconciliation requests diff --git a/cmd/thv-operator/controllers/mcpserver_replicas_test.go b/cmd/thv-operator/controllers/mcpserver_replicas_test.go index 6d3ef53b5f..08d9326e98 100644 --- a/cmd/thv-operator/controllers/mcpserver_replicas_test.go +++ b/cmd/thv-operator/controllers/mcpserver_replicas_test.go @@ -979,3 +979,156 @@ func TestUpdateMCPServerStatusExcludesTerminatingPods(t *testing.T) { assert.Equal(t, int32(2), updatedMCPServer.Status.ReadyReplicas, "ReadyReplicas should exclude terminating pods") } + +func TestRateLimitConfigValidation(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + spec mcpv1alpha1.MCPServerSpec + expectStatus metav1.ConditionStatus + expectReason string + }{ + { + name: "no-rate-limiting", + spec: mcpv1alpha1.MCPServerSpec{ + Image: "test-image:latest", + Transport: "sse", + ProxyPort: 8080, + }, + expectStatus: metav1.ConditionTrue, + expectReason: mcpv1alpha1.ConditionReasonRateLimitNotApplicable, + }, + { + name: "peruser-with-auth", + spec: mcpv1alpha1.MCPServerSpec{ + Image: "test-image:latest", + Transport: "sse", + ProxyPort: 8080, + SessionStorage: &mcpv1alpha1.SessionStorageConfig{ + Provider: mcpv1alpha1.SessionStorageProviderRedis, + Address: "redis:6379", + }, + OIDCConfig: &mcpv1alpha1.OIDCConfigRef{Type: "kubernetes"}, + RateLimiting: &mcpv1alpha1.RateLimitConfig{ + PerUser: &mcpv1alpha1.RateLimitBucket{ + MaxTokens: 100, + RefillPeriod: metav1.Duration{Duration: time.Minute}, + }, + }, + }, + expectStatus: metav1.ConditionTrue, + expectReason: mcpv1alpha1.ConditionReasonRateLimitConfigValid, + }, + { + name: "peruser-without-auth", + spec: mcpv1alpha1.MCPServerSpec{ + Image: "test-image:latest", + Transport: "sse", + ProxyPort: 8080, + SessionStorage: &mcpv1alpha1.SessionStorageConfig{ + Provider: mcpv1alpha1.SessionStorageProviderRedis, + Address: "redis:6379", + }, + RateLimiting: &mcpv1alpha1.RateLimitConfig{ + PerUser: &mcpv1alpha1.RateLimitBucket{ + MaxTokens: 100, + RefillPeriod: metav1.Duration{Duration: time.Minute}, + }, + }, + }, + expectStatus: metav1.ConditionFalse, + expectReason: mcpv1alpha1.ConditionReasonRateLimitPerUserRequiresAuth, + }, + { + name: "per-tool-peruser-without-auth", + spec: mcpv1alpha1.MCPServerSpec{ + Image: "test-image:latest", + Transport: "sse", + ProxyPort: 8080, + SessionStorage: &mcpv1alpha1.SessionStorageConfig{ + Provider: mcpv1alpha1.SessionStorageProviderRedis, + Address: "redis:6379", + }, + RateLimiting: &mcpv1alpha1.RateLimitConfig{ + Tools: []mcpv1alpha1.ToolRateLimitConfig{ + { + Name: "search", + PerUser: &mcpv1alpha1.RateLimitBucket{ + MaxTokens: 10, + RefillPeriod: metav1.Duration{Duration: time.Minute}, + }, + }, + }, + }, + }, + expectStatus: metav1.ConditionFalse, + expectReason: mcpv1alpha1.ConditionReasonRateLimitPerUserRequiresAuth, + }, + { + name: "shared-only-no-auth", + spec: mcpv1alpha1.MCPServerSpec{ + Image: "test-image:latest", + Transport: "sse", + ProxyPort: 8080, + SessionStorage: &mcpv1alpha1.SessionStorageConfig{ + Provider: mcpv1alpha1.SessionStorageProviderRedis, + Address: "redis:6379", + }, + RateLimiting: &mcpv1alpha1.RateLimitConfig{ + Shared: &mcpv1alpha1.RateLimitBucket{ + MaxTokens: 1000, + RefillPeriod: metav1.Duration{Duration: time.Minute}, + }, + }, + }, + expectStatus: metav1.ConditionTrue, + expectReason: mcpv1alpha1.ConditionReasonRateLimitConfigValid, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + name := "rl-" + tt.name + namespace := testNamespaceDefault + + mcpServer := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: tt.spec, + } + + testScheme := createTestScheme() + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(mcpServer). + WithStatusSubresource(&mcpv1alpha1.MCPServer{}). + Build() + + reconciler := newTestMCPServerReconciler(fakeClient, testScheme, kubernetes.PlatformKubernetes) + + _, err := reconciler.Reconcile(t.Context(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}, + }) + require.NoError(t, err) + + updated := &mcpv1alpha1.MCPServer{} + err = fakeClient.Get(t.Context(), types.NamespacedName{Name: name, Namespace: namespace}, updated) + require.NoError(t, err) + + var found bool + for _, cond := range updated.Status.Conditions { + if cond.Type == mcpv1alpha1.ConditionRateLimitConfigValid { + found = true + assert.Equal(t, tt.expectStatus, cond.Status) + assert.Equal(t, tt.expectReason, cond.Reason) + } + } + assert.True(t, found, "ConditionRateLimitConfigValid condition should be set") + }) + } +} diff --git a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpservers.yaml b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpservers.yaml index 4a7059b1fe..f111475c81 100644 --- a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpservers.yaml +++ b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpservers.yaml @@ -502,8 +502,33 @@ spec: RateLimiting defines rate limiting configuration for the MCP server. Requires Redis session storage to be configured for distributed rate limiting. properties: + perUser: + description: |- + PerUser is a token bucket applied independently to each authenticated user + at the server level. Requires authentication to be enabled. + Each unique userID creates Redis keys that expire after 2x refillPeriod. + Memory formula: unique_users_per_TTL_window * (1 + num_tools_with_per_user_limits) keys. + properties: + maxTokens: + description: |- + MaxTokens is the maximum number of tokens (bucket capacity). + This is also the burst size: the maximum number of requests that can be served + instantaneously before the bucket is depleted. + format: int32 + minimum: 1 + type: integer + refillPeriod: + description: |- + RefillPeriod is the duration to fully refill the bucket from zero to maxTokens. + The effective refill rate is maxTokens / refillPeriod tokens per second. + Format: Go duration string (e.g., "1m0s", "30s", "1h0m0s"). + type: string + required: + - maxTokens + - refillPeriod + type: object shared: - description: Shared defines a token bucket shared across all users + description: Shared is a token bucket shared across all users for the entire server. properties: maxTokens: @@ -530,17 +555,39 @@ spec: Each entry applies additional rate limits to calls targeting a specific tool name. A request must pass both the server-level limit and the per-tool limit. items: - description: ToolRateLimitConfig defines rate limits for a specific - tool. + description: |- + ToolRateLimitConfig defines rate limits for a specific tool. + At least one of shared or perUser must be configured. properties: name: description: Name is the MCP tool name this limit applies to. minLength: 1 type: string + perUser: + description: PerUser token bucket configuration for this + tool. + properties: + maxTokens: + description: |- + MaxTokens is the maximum number of tokens (bucket capacity). + This is also the burst size: the maximum number of requests that can be served + instantaneously before the bucket is depleted. + format: int32 + minimum: 1 + type: integer + refillPeriod: + description: |- + RefillPeriod is the duration to fully refill the bucket from zero to maxTokens. + The effective refill rate is maxTokens / refillPeriod tokens per second. + Format: Go duration string (e.g., "1m0s", "30s", "1h0m0s"). + type: string + required: + - maxTokens + - refillPeriod + type: object shared: - description: Shared defines a token bucket shared across - all users for this specific tool. + description: Shared token bucket for this specific tool. properties: maxTokens: description: |- @@ -562,17 +609,19 @@ spec: type: object required: - name - - shared type: object + x-kubernetes-validations: + - message: at least one of shared or perUser must be configured + rule: has(self.shared) || has(self.perUser) type: array x-kubernetes-list-map-keys: - name x-kubernetes-list-type: map type: object x-kubernetes-validations: - - message: at least one of shared or tools must be configured - rule: has(self.shared) || (has(self.tools) && size(self.tools) > - 0) + - message: at least one of shared, perUser, or tools must be configured + rule: has(self.shared) || has(self.perUser) || (has(self.tools) + && size(self.tools) > 0) replicas: description: |- Replicas is the desired number of proxy runner (thv run) pod replicas. @@ -965,6 +1014,15 @@ spec: - message: rateLimiting requires sessionStorage with provider 'redis' rule: '!has(self.rateLimiting) || (has(self.sessionStorage) && self.sessionStorage.provider == ''redis'')' + - message: rateLimiting.perUser requires authentication (oidcConfig, oidcConfigRef, + or externalAuthConfigRef) + rule: '!(has(self.rateLimiting) && has(self.rateLimiting.perUser)) || + has(self.oidcConfig) || has(self.oidcConfigRef) || has(self.externalAuthConfigRef)' + - message: per-tool perUser rate limiting requires authentication (oidcConfig, + oidcConfigRef, or externalAuthConfigRef) + rule: '!has(self.rateLimiting) || !has(self.rateLimiting.tools) || self.rateLimiting.tools.all(t, + !has(t.perUser)) || has(self.oidcConfig) || has(self.oidcConfigRef) + || has(self.externalAuthConfigRef)' status: description: MCPServerStatus defines the observed state of MCPServer properties: diff --git a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpservers.yaml b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpservers.yaml index a7232513e7..a00102f26f 100644 --- a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpservers.yaml +++ b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpservers.yaml @@ -505,8 +505,33 @@ spec: RateLimiting defines rate limiting configuration for the MCP server. Requires Redis session storage to be configured for distributed rate limiting. properties: + perUser: + description: |- + PerUser is a token bucket applied independently to each authenticated user + at the server level. Requires authentication to be enabled. + Each unique userID creates Redis keys that expire after 2x refillPeriod. + Memory formula: unique_users_per_TTL_window * (1 + num_tools_with_per_user_limits) keys. + properties: + maxTokens: + description: |- + MaxTokens is the maximum number of tokens (bucket capacity). + This is also the burst size: the maximum number of requests that can be served + instantaneously before the bucket is depleted. + format: int32 + minimum: 1 + type: integer + refillPeriod: + description: |- + RefillPeriod is the duration to fully refill the bucket from zero to maxTokens. + The effective refill rate is maxTokens / refillPeriod tokens per second. + Format: Go duration string (e.g., "1m0s", "30s", "1h0m0s"). + type: string + required: + - maxTokens + - refillPeriod + type: object shared: - description: Shared defines a token bucket shared across all users + description: Shared is a token bucket shared across all users for the entire server. properties: maxTokens: @@ -533,17 +558,39 @@ spec: Each entry applies additional rate limits to calls targeting a specific tool name. A request must pass both the server-level limit and the per-tool limit. items: - description: ToolRateLimitConfig defines rate limits for a specific - tool. + description: |- + ToolRateLimitConfig defines rate limits for a specific tool. + At least one of shared or perUser must be configured. properties: name: description: Name is the MCP tool name this limit applies to. minLength: 1 type: string + perUser: + description: PerUser token bucket configuration for this + tool. + properties: + maxTokens: + description: |- + MaxTokens is the maximum number of tokens (bucket capacity). + This is also the burst size: the maximum number of requests that can be served + instantaneously before the bucket is depleted. + format: int32 + minimum: 1 + type: integer + refillPeriod: + description: |- + RefillPeriod is the duration to fully refill the bucket from zero to maxTokens. + The effective refill rate is maxTokens / refillPeriod tokens per second. + Format: Go duration string (e.g., "1m0s", "30s", "1h0m0s"). + type: string + required: + - maxTokens + - refillPeriod + type: object shared: - description: Shared defines a token bucket shared across - all users for this specific tool. + description: Shared token bucket for this specific tool. properties: maxTokens: description: |- @@ -565,17 +612,19 @@ spec: type: object required: - name - - shared type: object + x-kubernetes-validations: + - message: at least one of shared or perUser must be configured + rule: has(self.shared) || has(self.perUser) type: array x-kubernetes-list-map-keys: - name x-kubernetes-list-type: map type: object x-kubernetes-validations: - - message: at least one of shared or tools must be configured - rule: has(self.shared) || (has(self.tools) && size(self.tools) > - 0) + - message: at least one of shared, perUser, or tools must be configured + rule: has(self.shared) || has(self.perUser) || (has(self.tools) + && size(self.tools) > 0) replicas: description: |- Replicas is the desired number of proxy runner (thv run) pod replicas. @@ -968,6 +1017,15 @@ spec: - message: rateLimiting requires sessionStorage with provider 'redis' rule: '!has(self.rateLimiting) || (has(self.sessionStorage) && self.sessionStorage.provider == ''redis'')' + - message: rateLimiting.perUser requires authentication (oidcConfig, oidcConfigRef, + or externalAuthConfigRef) + rule: '!(has(self.rateLimiting) && has(self.rateLimiting.perUser)) || + has(self.oidcConfig) || has(self.oidcConfigRef) || has(self.externalAuthConfigRef)' + - message: per-tool perUser rate limiting requires authentication (oidcConfig, + oidcConfigRef, or externalAuthConfigRef) + rule: '!has(self.rateLimiting) || !has(self.rateLimiting.tools) || self.rateLimiting.tools.all(t, + !has(t.perUser)) || has(self.oidcConfig) || has(self.oidcConfigRef) + || has(self.externalAuthConfigRef)' status: description: MCPServerStatus defines the observed state of MCPServer properties: diff --git a/docs/operator/crd-api.md b/docs/operator/crd-api.md index b9f5a49f18..26e5a2f47f 100644 --- a/docs/operator/crd-api.md +++ b/docs/operator/crd-api.md @@ -2905,7 +2905,8 @@ _Appears in:_ -RateLimitBucket defines a token bucket configuration. +RateLimitBucket defines a token bucket configuration with a maximum capacity +and a refill period. Used by both shared (global) and per-user rate limits. @@ -2924,7 +2925,7 @@ _Appears in:_ RateLimitConfig defines rate limiting configuration for an MCP server. -At least one of shared or tools must be configured. +At least one of shared, perUser, or tools must be configured. @@ -2933,7 +2934,8 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | -| `shared` _[api.v1alpha1.RateLimitBucket](#apiv1alpha1ratelimitbucket)_ | Shared defines a token bucket shared across all users for the entire server. | | Optional: \{\}
| +| `shared` _[api.v1alpha1.RateLimitBucket](#apiv1alpha1ratelimitbucket)_ | Shared is a token bucket shared across all users for the entire server. | | Optional: \{\}
| +| `perUser` _[api.v1alpha1.RateLimitBucket](#apiv1alpha1ratelimitbucket)_ | PerUser is a token bucket applied independently to each authenticated user
at the server level. Requires authentication to be enabled.
Each unique userID creates Redis keys that expire after 2x refillPeriod.
Memory formula: unique_users_per_TTL_window * (1 + num_tools_with_per_user_limits) keys. | | Optional: \{\}
| | `tools` _[api.v1alpha1.ToolRateLimitConfig](#apiv1alpha1toolratelimitconfig) array_ | Tools defines per-tool rate limit overrides.
Each entry applies additional rate limits to calls targeting a specific tool name.
A request must pass both the server-level limit and the per-tool limit. | | Optional: \{\}
| @@ -3418,6 +3420,7 @@ _Appears in:_ ToolRateLimitConfig defines rate limits for a specific tool. +At least one of shared or perUser must be configured. @@ -3427,7 +3430,8 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | | `name` _string_ | Name is the MCP tool name this limit applies to. | | MinLength: 1
Required: \{\}
| -| `shared` _[api.v1alpha1.RateLimitBucket](#apiv1alpha1ratelimitbucket)_ | Shared defines a token bucket shared across all users for this specific tool. | | Required: \{\}
| +| `shared` _[api.v1alpha1.RateLimitBucket](#apiv1alpha1ratelimitbucket)_ | Shared token bucket for this specific tool. | | Optional: \{\}
| +| `perUser` _[api.v1alpha1.RateLimitBucket](#apiv1alpha1ratelimitbucket)_ | PerUser token bucket configuration for this tool. | | Optional: \{\}
| #### api.v1alpha1.URLSource diff --git a/docs/server/docs.go b/docs/server/docs.go index 2319396919..fdf70f16ec 100644 --- a/docs/server/docs.go +++ b/docs/server/docs.go @@ -45,7 +45,7 @@ const docTemplate = `{ "type": "object" }, "github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket": { - "description": "Shared defines a token bucket shared across all users for this specific tool.\n+kubebuilder:validation:Required", + "description": "PerUser token bucket configuration for this tool.\n+optional", "properties": { "maxTokens": { "description": "MaxTokens is the maximum number of tokens (bucket capacity).\nThis is also the burst size: the maximum number of requests that can be served\ninstantaneously before the bucket is depleted.\n+kubebuilder:validation:Required\n+kubebuilder:validation:Minimum=1", @@ -60,6 +60,9 @@ const docTemplate = `{ "github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitConfig": { "description": "RateLimitConfig contains the CRD rate limiting configuration.\nWhen set, rate limiting middleware is added to the proxy middleware chain.", "properties": { + "perUser": { + "$ref": "#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket" + }, "shared": { "$ref": "#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket" }, @@ -80,6 +83,9 @@ const docTemplate = `{ "description": "Name is the MCP tool name this limit applies to.\n+kubebuilder:validation:Required\n+kubebuilder:validation:MinLength=1", "type": "string" }, + "perUser": { + "$ref": "#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket" + }, "shared": { "$ref": "#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket" } diff --git a/docs/server/swagger.json b/docs/server/swagger.json index e7aa68b69e..95e69cd871 100644 --- a/docs/server/swagger.json +++ b/docs/server/swagger.json @@ -38,7 +38,7 @@ "type": "object" }, "github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket": { - "description": "Shared defines a token bucket shared across all users for this specific tool.\n+kubebuilder:validation:Required", + "description": "PerUser token bucket configuration for this tool.\n+optional", "properties": { "maxTokens": { "description": "MaxTokens is the maximum number of tokens (bucket capacity).\nThis is also the burst size: the maximum number of requests that can be served\ninstantaneously before the bucket is depleted.\n+kubebuilder:validation:Required\n+kubebuilder:validation:Minimum=1", @@ -53,6 +53,9 @@ "github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitConfig": { "description": "RateLimitConfig contains the CRD rate limiting configuration.\nWhen set, rate limiting middleware is added to the proxy middleware chain.", "properties": { + "perUser": { + "$ref": "#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket" + }, "shared": { "$ref": "#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket" }, @@ -73,6 +76,9 @@ "description": "Name is the MCP tool name this limit applies to.\n+kubebuilder:validation:Required\n+kubebuilder:validation:MinLength=1", "type": "string" }, + "perUser": { + "$ref": "#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket" + }, "shared": { "$ref": "#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket" } diff --git a/docs/server/swagger.yaml b/docs/server/swagger.yaml index e12b491388..6227788b23 100644 --- a/docs/server/swagger.yaml +++ b/docs/server/swagger.yaml @@ -33,8 +33,8 @@ components: type: object github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket: description: |- - Shared defines a token bucket shared across all users for this specific tool. - +kubebuilder:validation:Required + PerUser token bucket configuration for this tool. + +optional properties: maxTokens: description: |- @@ -52,6 +52,8 @@ components: RateLimitConfig contains the CRD rate limiting configuration. When set, rate limiting middleware is added to the proxy middleware chain. properties: + perUser: + $ref: '#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket' shared: $ref: '#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket' tools: @@ -75,6 +77,8 @@ components: +kubebuilder:validation:Required +kubebuilder:validation:MinLength=1 type: string + perUser: + $ref: '#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket' shared: $ref: '#/components/schemas/github_com_stacklok_toolhive_cmd_thv-operator_api_v1alpha1.RateLimitBucket' type: object diff --git a/pkg/ratelimit/limiter.go b/pkg/ratelimit/limiter.go index 6a4415bb02..0ff06f412f 100644 --- a/pkg/ratelimit/limiter.go +++ b/pkg/ratelimit/limiter.go @@ -22,7 +22,7 @@ import ( type Limiter interface { // Allow checks whether a request is permitted. // toolName is the MCP tool being called (empty for non-tool requests). - // userID is the authenticated user (reserved for #4550, currently no-op). + // userID is the authenticated user (empty for unauthenticated requests). Allow(ctx context.Context, toolName, userID string) (*Decision, error) } @@ -54,6 +54,14 @@ func NewLimiter(client redis.Cmdable, namespace, name string, crd *v1alpha1.Rate l.serverBucket = b } + if crd.PerUser != nil { + spec, err := newBucketSpec(namespace, name, crd.PerUser) + if err != nil { + return nil, fmt.Errorf("perUser bucket: %w", err) + } + l.perUserSpec = &spec + } + for _, t := range crd.Tools { if t.Shared != nil { b, err := newBucket(namespace, name, "shared:tool:"+t.Name, t.Shared) @@ -65,24 +73,43 @@ func NewLimiter(client redis.Cmdable, namespace, name string, crd *v1alpha1.Rate } l.toolBuckets[t.Name] = b } + if t.PerUser != nil { + spec, err := newBucketSpec(namespace, name, t.PerUser) + if err != nil { + return nil, fmt.Errorf("tool %q perUser bucket: %w", t.Name, err) + } + if l.perUserTools == nil { + l.perUserTools = make(map[string]bucketSpec) + } + l.perUserTools[t.Name] = spec + } } return l, nil } +// bucketSpec holds deferred bucket parameters for per-user buckets that are +// created on the fly in Allow() because the userID is not known at construction time. +type bucketSpec struct { + namespace string + serverName string + maxTokens int32 + refillPeriod time.Duration +} + // limiter is the concrete implementation of Limiter. type limiter struct { client redis.Cmdable - serverBucket *bucket.TokenBucket // nil when no global server limit - toolBuckets map[string]*bucket.TokenBucket // tool name -> bucket + serverBucket *bucket.TokenBucket // nil when no shared server limit + toolBuckets map[string]*bucket.TokenBucket // tool name -> shared bucket + perUserSpec *bucketSpec // nil when no server-level per-user limit + perUserTools map[string]bucketSpec // tool name -> per-user bucket spec; nil when none } // Allow atomically checks all applicable rate limit buckets for the request. // Tokens are only consumed if ALL buckets have sufficient capacity, preventing -// a rejected per-tool call from draining the server-level budget. -func (l *limiter) Allow(ctx context.Context, toolName, _ string) (*Decision, error) { - // TODO(#4550): per-user rate limiting — currently ignored. - +// a rejected per-tool or per-user call from draining other budgets. +func (l *limiter) Allow(ctx context.Context, toolName, userID string) (*Decision, error) { // Collect applicable buckets in priority order. var buckets []*bucket.TokenBucket if l.serverBucket != nil { @@ -94,6 +121,36 @@ func (l *limiter) Allow(ctx context.Context, toolName, _ string) (*Decision, err } } + // Per-user buckets are created on the fly because userID is request-scoped. + // bucket.New only allocates a struct — all state lives in Redis, so creating + // a new TokenBucket per request is safe (no local state to lose). + // + // Key prefixes deviate from RFC THV-0057 to prevent cross-type collisions: + // RFC uses "user:{userId}:tool:{toolName}" for both scopes, but a userID + // containing ":tool:" would collide with the per-tool key. Instead we use + // distinct prefixes: "user:" for server-level, "user-tool:" for tool-level. + if userID != "" { + if l.perUserSpec != nil { + s := l.perUserSpec + buckets = append(buckets, bucket.New( + s.namespace, s.serverName, + "user:"+userID, + s.maxTokens, s.refillPeriod, + )) + } + if toolName != "" && l.perUserTools != nil { + if s, ok := l.perUserTools[toolName]; ok { + // Key prefix "user-tool:" is distinct from "user:" to prevent + // collisions when a userID contains delimiter characters. + buckets = append(buckets, bucket.New( + s.namespace, s.serverName, + "user-tool:"+toolName+":"+userID, + s.maxTokens, s.refillPeriod, + )) + } + } + } + if len(buckets) == 0 { return &Decision{Allowed: true}, nil } @@ -119,14 +176,38 @@ func (noopLimiter) Allow(context.Context, string, string) (*Decision, error) { return &Decision{Allowed: true}, nil } -// newBucket validates a CRD bucket spec and creates a TokenBucket. -func newBucket(namespace, serverName, suffix string, b *v1alpha1.RateLimitBucket) (*bucket.TokenBucket, error) { +// validateBucketCRD checks that a CRD bucket spec has valid parameters. +func validateBucketCRD(b *v1alpha1.RateLimitBucket) (int32, time.Duration, error) { if b.MaxTokens < 1 { - return nil, fmt.Errorf("maxTokens must be >= 1, got %d", b.MaxTokens) + return 0, 0, fmt.Errorf("maxTokens must be >= 1, got %d", b.MaxTokens) } d := b.RefillPeriod.Duration if d <= 0 { - return nil, fmt.Errorf("refillPeriod must be positive, got %s", d) + return 0, 0, fmt.Errorf("refillPeriod must be positive, got %s", d) + } + return b.MaxTokens, d, nil +} + +// newBucket validates a CRD bucket spec and creates a TokenBucket. +func newBucket(namespace, serverName, suffix string, b *v1alpha1.RateLimitBucket) (*bucket.TokenBucket, error) { + maxTokens, refillPeriod, err := validateBucketCRD(b) + if err != nil { + return nil, err + } + return bucket.New(namespace, serverName, suffix, maxTokens, refillPeriod), nil +} + +// newBucketSpec validates a CRD bucket spec and creates a deferred bucketSpec +// for per-user buckets that are materialized at Allow() time. +func newBucketSpec(namespace, serverName string, b *v1alpha1.RateLimitBucket) (bucketSpec, error) { + maxTokens, refillPeriod, err := validateBucketCRD(b) + if err != nil { + return bucketSpec{}, err } - return bucket.New(namespace, serverName, suffix, b.MaxTokens, d), nil + return bucketSpec{ + namespace: namespace, + serverName: serverName, + maxTokens: maxTokens, + refillPeriod: refillPeriod, + }, nil } diff --git a/pkg/ratelimit/limiter_test.go b/pkg/ratelimit/limiter_test.go index ce68954cdf..a825303d99 100644 --- a/pkg/ratelimit/limiter_test.go +++ b/pkg/ratelimit/limiter_test.go @@ -4,7 +4,6 @@ package ratelimit import ( - "context" "testing" "time" @@ -34,7 +33,7 @@ func TestNewLimiter_NilCRDReturnsNoop(t *testing.T) { l, err := NewLimiter(client, "ns", "srv", nil) require.NoError(t, err) - d, err := l.Allow(context.Background(), "anything", "user-a") + d, err := l.Allow(t.Context(), "anything", "user-a") require.NoError(t, err) assert.True(t, d.Allowed) } @@ -74,7 +73,7 @@ func TestNewLimiter_ZeroDuration(t *testing.T) { func TestLimiter_ServerGlobalExhausted(t *testing.T) { t.Parallel() client, _ := newTestClient(t) - ctx := context.Background() + ctx := t.Context() crd := &v1alpha1.RateLimitConfig{ Shared: &v1alpha1.RateLimitBucket{MaxTokens: 2, RefillPeriod: metav1.Duration{Duration: time.Minute}}, @@ -97,7 +96,7 @@ func TestLimiter_ServerGlobalExhausted(t *testing.T) { func TestLimiter_PerToolIsolation(t *testing.T) { t.Parallel() client, _ := newTestClient(t) - ctx := context.Background() + ctx := t.Context() crd := &v1alpha1.RateLimitConfig{ Tools: []v1alpha1.ToolRateLimitConfig{ @@ -127,7 +126,7 @@ func TestLimiter_PerToolIsolation(t *testing.T) { func TestLimiter_ServerAndPerToolBothRequired(t *testing.T) { t.Parallel() client, _ := newTestClient(t) - ctx := context.Background() + ctx := t.Context() crd := &v1alpha1.RateLimitConfig{ Shared: &v1alpha1.RateLimitBucket{MaxTokens: 5, RefillPeriod: metav1.Duration{Duration: time.Minute}}, @@ -170,32 +169,190 @@ func TestLimiter_RedisUnavailableReturnsError(t *testing.T) { mr.Close() - _, err = l.Allow(context.Background(), "", "") + _, err = l.Allow(t.Context(), "", "") assert.Error(t, err) } -func TestLimiter_UserIDNoOp(t *testing.T) { +func TestLimiter_PerUserServerLevel(t *testing.T) { t.Parallel() client, _ := newTestClient(t) - ctx := context.Background() + ctx := t.Context() crd := &v1alpha1.RateLimitConfig{ - Shared: &v1alpha1.RateLimitBucket{MaxTokens: 2, RefillPeriod: metav1.Duration{Duration: time.Minute}}, + PerUser: &v1alpha1.RateLimitBucket{MaxTokens: 2, RefillPeriod: metav1.Duration{Duration: time.Minute}}, } l, err := NewLimiter(client, "ns", "srv", crd) require.NoError(t, err) - // Different users share the same global bucket (per-user not yet implemented). + // User A exhausts their 2 tokens. + for range 2 { + d, err := l.Allow(ctx, "", "user-a") + require.NoError(t, err) + require.True(t, d.Allowed) + } d, err := l.Allow(ctx, "", "user-a") require.NoError(t, err) - require.True(t, d.Allowed) + assert.False(t, d.Allowed) + assert.Greater(t, d.RetryAfter, time.Duration(0)) + // User B is independent — still has full budget. d, err = l.Allow(ctx, "", "user-b") require.NoError(t, err) + assert.True(t, d.Allowed) +} + +func TestLimiter_PerToolPerUserIsolation(t *testing.T) { + t.Parallel() + client, _ := newTestClient(t) + ctx := t.Context() + + crd := &v1alpha1.RateLimitConfig{ + Tools: []v1alpha1.ToolRateLimitConfig{ + { + Name: "search", + PerUser: &v1alpha1.RateLimitBucket{MaxTokens: 1, RefillPeriod: metav1.Duration{Duration: time.Minute}}, + }, + }, + } + l, err := NewLimiter(client, "ns", "srv", crd) + require.NoError(t, err) + + // User A uses their 1 token for "search". + d, err := l.Allow(ctx, "search", "user-a") + require.NoError(t, err) require.True(t, d.Allowed) - // Third call from any user is rejected. - d, err = l.Allow(ctx, "", "user-c") + // User A rejected for "search". + d, err = l.Allow(ctx, "search", "user-a") + require.NoError(t, err) + assert.False(t, d.Allowed) + + // User B can still use "search". + d, err = l.Allow(ctx, "search", "user-b") + require.NoError(t, err) + assert.True(t, d.Allowed) + + // User A can use a different tool (no limit configured for "list"). + d, err = l.Allow(ctx, "list", "user-a") + require.NoError(t, err) + assert.True(t, d.Allowed) +} + +func TestLimiter_ServerAndToolPerUserBothRequired(t *testing.T) { + t.Parallel() + client, _ := newTestClient(t) + ctx := t.Context() + + crd := &v1alpha1.RateLimitConfig{ + PerUser: &v1alpha1.RateLimitBucket{MaxTokens: 5, RefillPeriod: metav1.Duration{Duration: time.Minute}}, + Tools: []v1alpha1.ToolRateLimitConfig{ + { + Name: "search", + PerUser: &v1alpha1.RateLimitBucket{MaxTokens: 2, RefillPeriod: metav1.Duration{Duration: time.Minute}}, + }, + }, + } + l, err := NewLimiter(client, "ns", "srv", crd) + require.NoError(t, err) + + // User A makes 2 "search" calls — both pass. + for range 2 { + d, err := l.Allow(ctx, "search", "user-a") + require.NoError(t, err) + require.True(t, d.Allowed) + } + + // Third "search" rejected by per-tool per-user limit (server per-user still has 3). + d, err := l.Allow(ctx, "search", "user-a") + require.NoError(t, err) + assert.False(t, d.Allowed) + + // "list" (no per-tool limit) still allowed for user A. + d, err = l.Allow(ctx, "list", "user-a") + require.NoError(t, err) + assert.True(t, d.Allowed) +} + +func TestLimiter_PerUserRejectionDoesNotDrainShared(t *testing.T) { + t.Parallel() + client, _ := newTestClient(t) + ctx := t.Context() + + // Shared: 3 tokens, PerUser: 1 token. + // A noisy user hitting their per-user limit must not consume shared tokens. + crd := &v1alpha1.RateLimitConfig{ + Shared: &v1alpha1.RateLimitBucket{MaxTokens: 3, RefillPeriod: metav1.Duration{Duration: time.Minute}}, + PerUser: &v1alpha1.RateLimitBucket{MaxTokens: 1, RefillPeriod: metav1.Duration{Duration: time.Minute}}, + } + l, err := NewLimiter(client, "ns", "srv", crd) + require.NoError(t, err) + + // User A: first call passes (shared=2, user-a=0). + d, err := l.Allow(ctx, "", "user-a") + require.NoError(t, err) + require.True(t, d.Allowed) + + // User A: second call rejected by per-user limit. Shared must NOT be drained. + d, err = l.Allow(ctx, "", "user-a") require.NoError(t, err) assert.False(t, d.Allowed) + + // Users B and C should each succeed (shared still has 2 tokens). + d, err = l.Allow(ctx, "", "user-b") + require.NoError(t, err) + assert.True(t, d.Allowed, "user-b should not be blocked — shared bucket should not have been drained by user-a's rejected request") + + d, err = l.Allow(ctx, "", "user-c") + require.NoError(t, err) + assert.True(t, d.Allowed, "user-c should not be blocked — shared bucket should still have tokens") + + // Now shared is exhausted (3 consumed: a, b, c). User D is rejected by shared. + d, err = l.Allow(ctx, "", "user-d") + require.NoError(t, err) + assert.False(t, d.Allowed, "user-d should be rejected — shared bucket is now exhausted") +} + +func TestLimiter_RedisUnavailablePerUser(t *testing.T) { + t.Parallel() + client, mr := newTestClient(t) + + crd := &v1alpha1.RateLimitConfig{ + PerUser: &v1alpha1.RateLimitBucket{MaxTokens: 10, RefillPeriod: metav1.Duration{Duration: time.Minute}}, + } + l, err := NewLimiter(client, "ns", "srv", crd) + require.NoError(t, err) + + mr.Close() + + _, err = l.Allow(t.Context(), "", "user-a") + assert.Error(t, err) +} + +func TestNewLimiter_PerUserZeroMaxTokens(t *testing.T) { + t.Parallel() + client, _ := newTestClient(t) + + crd := &v1alpha1.RateLimitConfig{ + PerUser: &v1alpha1.RateLimitBucket{MaxTokens: 0, RefillPeriod: metav1.Duration{Duration: time.Minute}}, + } + _, err := NewLimiter(client, "ns", "srv", crd) + assert.Error(t, err) + assert.Contains(t, err.Error(), "perUser bucket: maxTokens must be >= 1") +} + +func TestNewLimiter_ToolPerUserZeroDuration(t *testing.T) { + t.Parallel() + client, _ := newTestClient(t) + + crd := &v1alpha1.RateLimitConfig{ + Tools: []v1alpha1.ToolRateLimitConfig{ + { + Name: "search", + PerUser: &v1alpha1.RateLimitBucket{MaxTokens: 5, RefillPeriod: metav1.Duration{Duration: 0}}, + }, + }, + } + _, err := NewLimiter(client, "ns", "srv", crd) + assert.Error(t, err) + assert.Contains(t, err.Error(), `tool "search" perUser bucket: refillPeriod must be positive`) }