diff --git a/CODEOWNERS b/CODEOWNERS index 6c46db6201a86..8038f96a09eaf 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -494,5 +494,6 @@ extensions/upstreams/tcp @ggreenway @mattklein123 /contrib/peak_ewma/load_balancing_policies/ @rroblak @UNOWNED /contrib/kae/ @Misakokoro @UNOWNED /contrib/istio @kyessenov @wbpcode @keithmattix @krinkinmu @zirain +/contrib/reverse_tunnel_reporter @agrawroh @aakugan /compat/openssl/ @tedjpoole @envoyproxy/envoy-openssl-sync diff --git a/api/BUILD b/api/BUILD index 2b36b337b1c34..4cd9d47ab049f 100644 --- a/api/BUILD +++ b/api/BUILD @@ -104,6 +104,8 @@ proto_library( "//contrib/envoy/extensions/private_key_providers/kae/v3alpha:pkg", "//contrib/envoy/extensions/private_key_providers/qat/v3alpha:pkg", "//contrib/envoy/extensions/regex_engines/hyperscan/v3alpha:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg", "//contrib/envoy/extensions/router/cluster_specifier/golang/v3alpha:pkg", "//contrib/envoy/extensions/stat_sinks/kafka/v3:pkg", "//contrib/envoy/extensions/tap_sinks/udp_sink/v3alpha:pkg", diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD new file mode 100644 index 0000000000000..6409469bbd62f --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD @@ -0,0 +1,13 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. 
+ +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + has_services = True, + deps = [ + "//envoy/config/core/v3:pkg", + "@xds//udpa/annotations:pkg", + ], +) diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto new file mode 100644 index 0000000000000..eaa122194bc2c --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto @@ -0,0 +1,50 @@ +syntax = "proto3"; + +package envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client; + +import "google/protobuf/duration.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client"; +option java_outer_classname = "GrpcClientProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// Configuration for gRPC push-based connection event client. +// Actively pushes connection events to a cluster over gRPC on an internal timer. +// [#next-free-field: 7] +message GrpcClientConfig { + // Stat prefix for this client's metrics. + string stat_prefix = 1; + + // Name of the cluster to send gRPC requests to. + // It must be present in the config, otherwise setup will throw an error in onServerInitialized. + string cluster = 2 [(validate.rules).string = {min_len: 1}]; + + // Default interval between sending batched connection events. + // Default is 5s. 
+ google.protobuf.Duration default_send_interval = 3 [(validate.rules).duration = { + lte {seconds: 3600} + gte {nanos: 25000000} + }]; + + // Interval between connection retry attempts to the gRPC service. + // Connect timeouts are provided at the cluster level and will be handled by the http/2 client. + // How much time to wait after a failed connect before retrying. Default is 5s. + google.protobuf.Duration connect_retry_interval = 4 [(validate.rules).duration = { + lte {seconds: 3600} + gte {nanos: 25000000} + }]; + + // Maximum number of retry attempts for failed gRPC sends. + // Basically the cluster will have default_send_interval * max_retries time to respond. + // Default is 5. After this we will disconnect and try to connect again. + uint32 max_retries = 5; + + // Maximum events to buffer at any given time + // Default is 1,000,000. + uint32 max_buffer = 6; +} diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto new file mode 100644 index 0000000000000..ebc381c0c6094 --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto @@ -0,0 +1,117 @@ +syntax = "proto3"; + +package envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client; + +import "envoy/config/core/v3/base.proto"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/struct.proto"; +import "google/protobuf/timestamp.proto"; +import "google/rpc/status.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client"; +option java_outer_classname = "StreamReverseTunnelsProto"; +option java_multiple_files = true; +option go_package = 
"github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Reverse Tunnel Reporting Service] + +// ReverseTunnelReportingService allows Envoy instances to report reverse tunnel +// connection state changes to a management server for monitoring and coordination. +service ReverseTunnelReportingService { + // Bidirectional stream for reporting reverse tunnel connection state changes. + // The management server can control reporting intervals and acknowledge received reports. + rpc StreamReverseTunnels(stream StreamReverseTunnelsRequest) + returns (stream StreamReverseTunnelsResponse) { + } +} + +// Request message sent by Envoy to report reverse tunnel state changes. +// [#next-free-field: 7] +message StreamReverseTunnelsRequest { + // Node identifier for the reporting Envoy instance. + // This identifies which Envoy instance is sending the report. + config.core.v3.Node node = 1 [(validate.rules).message = {required: true}]; + + // List of reverse tunnels that were established since the last report. + // Each tunnel represents a new connection from a downstream Envoy. + repeated ReverseTunnel added_tunnels = 2; + + // List of tunnel names that were disconnected since the last report. + // Only the tunnel name is needed for removal notifications. + repeated string removed_tunnel_names = 3 + [(validate.rules).repeated = {items {string {min_len: 1}}}]; + + // Optional metadata for additional context or debugging information. + // Can include deployment information, version details, etc. + google.protobuf.Struct metadata = 4; + + // Indicates whether this report contains all active tunnels (true) or + // only changes since the last report (false). Usually invoked only on server disconnects. + bool full_push = 5; + + // Unique nonce for this request to enable proper ACK/NACK handling. 
+ // Must be non-negative and should increment for each request. + // This can also be modified to be used for checksum and tracking in the future. + int64 nonce = 6 [(validate.rules).int64 = {gte: 0}]; +} + +// Response message sent by the management server to control reporting behavior. +message StreamReverseTunnelsResponse { + // Node identifier acknowledging which Envoy instance this response is for. + // Should match the node from the corresponding request. + config.core.v3.Node node = 1; + + // Interval at which Envoy should send tunnel state reports. + // This is used to change the reporting_interval -> no need to repeat the same value. + google.protobuf.Duration report_interval = 2 [(validate.rules).duration = {lte {seconds: 3600}}]; + + // Nonce from the request being acknowledged or rejected. + // Must match the nonce from the corresponding request. + int64 request_nonce = 3 [(validate.rules).int64 = {gte: 0}]; + + // Error details if the previous request failed processing. + // If populated, indicates the request was rejected (NACK). + // If empty, indicates successful processing (ACK). + // NACK will terminate the connection -> useful for logging rather than just some disconnect. + // So basically -> NACK then terminate. + google.rpc.Status error_detail = 4; +} + +// Represents a single reverse tunnel connection with its metadata. +message ReverseTunnel { + // Unique name to identify this tunnel connection. + // Typically formatted as "{node_id}|{cluster_id}" or similar. + // Must be unique within the reporting Envoy instance. + // This is also used for reporting the disconnection of the associated tunnel initiator. + string name = 1 [(validate.rules).string = {min_len: 1}]; + + // Detailed information about the tunnel connection. + ReverseTunnelInfo tunnel_info = 2 [(validate.rules).message = {required: true}]; +} + +// Detailed information about a reverse tunnel connection. 
+message ReverseTunnelInfo { + // Identity information of the tunnel initiator (downstream Envoy). + // Contains node_id, cluster_id, and tenant_id for proper identification. + TunnelInitiatorIdentity identity = 1 [(validate.rules).message = {required: true}]; + + // Timestamp when this tunnel connection was created. + // Used for ordering events and debugging connection timing issues. + google.protobuf.Timestamp created_at = 2 [(validate.rules).timestamp = {required: true}]; +} + +message TunnelInitiatorIdentity { + // Required: Tenant identifier of the initiating Envoy instance. + string tenant_id = 1 [(validate.rules).string = {min_len: 1 max_len: 128}]; + + // Required: Cluster identifier of the initiating Envoy instance. + string cluster_id = 2 [(validate.rules).string = {min_len: 1 max_len: 128}]; + + // Required: Node identifier of the initiating Envoy instance. + string node_id = 3 [(validate.rules).string = {min_len: 1 max_len: 128}]; +} diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD new file mode 100644 index 0000000000000..5f552f08145ca --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. 
+ +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@xds//udpa/annotations:pkg"], +) diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto new file mode 100644 index 0000000000000..2b1315d801544 --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters; + +import "google/protobuf/any.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters"; +option java_outer_classname = "EventReporterProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +message ReverseConnectionReporterClient { + // Name to use to pick out the client should match the one reported by the factory. + string name = 1 [(validate.rules).string = {min_len: 1}]; + + // Typed config for the client + google.protobuf.Any typed_config = 2 [(validate.rules).any = {required: true}]; +} + +// Configuration for the connection event reporter. +message EventReporterConfig { + // Stat prefix for this reporter's metrics. + // Metrics will be emitted as "{stat_prefix}.events_pushed", etc. + string stat_prefix = 1; + + // List of clients to report to. 
+ repeated ReverseConnectionReporterClient clients = 2 [(validate.rules).repeated = {min_items: 1}]; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index d9358c863733e..3cd46fa04a6c4 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -43,6 +43,8 @@ proto_library( "//contrib/envoy/extensions/private_key_providers/kae/v3alpha:pkg", "//contrib/envoy/extensions/private_key_providers/qat/v3alpha:pkg", "//contrib/envoy/extensions/regex_engines/hyperscan/v3alpha:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg", "//contrib/envoy/extensions/router/cluster_specifier/golang/v3alpha:pkg", "//contrib/envoy/extensions/stat_sinks/kafka/v3:pkg", "//contrib/envoy/extensions/tap_sinks/udp_sink/v3alpha:pkg", diff --git a/contrib/contrib_build_config.bzl b/contrib/contrib_build_config.bzl index 8c7bad41f4d92..9baa8d1541cff 100644 --- a/contrib/contrib_build_config.bzl +++ b/contrib/contrib_build_config.bzl @@ -118,4 +118,10 @@ CONTRIB_EXTENSIONS = { # "envoy.upstreams.http.tcp.golang": "//contrib/golang/upstreams/http/tcp/source:config", + + # + # Reverse tunnel reporters + # + + "envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service": "//contrib/reverse_tunnel_reporter/source:config", } diff --git a/contrib/extensions_metadata.yaml b/contrib/extensions_metadata.yaml index eca1671c20e35..e2f133745f368 100644 --- a/contrib/extensions_metadata.yaml +++ b/contrib/extensions_metadata.yaml @@ -205,3 +205,11 @@ envoy.load_balancing_policies.peak_ewma: status: alpha type_urls: - envoy.extensions.load_balancing_policies.peak_ewma.v3alpha.PeakEwma +envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service: + categories: + - envoy.bootstrap + security_posture: requires_trusted_downstream_and_upstream + status: alpha + type_urls: + - envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters.EventReporterConfig + - 
envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client.GrpcClientConfig diff --git a/contrib/reverse_tunnel_reporter/examples/BUILD b/contrib/reverse_tunnel_reporter/examples/BUILD new file mode 100644 index 0000000000000..2d1e66fbd5a01 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/examples/BUILD @@ -0,0 +1,15 @@ +load("//bazel:envoy_build_system.bzl", "envoy_cc_binary", "envoy_contrib_package") + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_binary( + name = "test_server", + srcs = ["test_server.cc"], + deps = [ + "@com_github_grpc_grpc//:grpc++", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg_cc_grpc", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg_cc_proto", + ], +) diff --git a/contrib/reverse_tunnel_reporter/examples/responder-reporter-envoy.yaml b/contrib/reverse_tunnel_reporter/examples/responder-reporter-envoy.yaml new file mode 100644 index 0000000000000..f0eae2c59a9f3 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/examples/responder-reporter-envoy.yaml @@ -0,0 +1,160 @@ +--- +node: + id: upstream-node + cluster: upstream-cluster + +# Enable reverse connection bootstrap extension +bootstrap_extensions: +- name: envoy.bootstrap.reverse_tunnel.upstream_socket_interface + typed_config: + "@type": >- + type.googleapis.com/envoy.extensions.bootstrap.reverse_tunnel.upstream_socket_interface.v3.UpstreamReverseConnectionSocketInterface + stat_prefix: "upstream_reverse_connection" + reporter_config: + name: "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_reporter" + typed_config: + "@type": >- + type.googleapis.com/envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters.EventReporterConfig + stat_prefix: "reverse_connection_reporter" + clients: + - name: "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client" + typed_config: + "@type": >- + 
type.googleapis.com/envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client.GrpcClientConfig + stat_prefix: "reverse_connection_grpc_client" + cluster: "connection_reporting_service" + default_send_interval: "10s" + connect_retry_interval: "5s" + max_retries: 3 + max_buffer: 100000 + +static_resources: + listeners: + # Accepts reverse tunnel requests + - name: rev_conn_api_listener + address: + socket_address: + address: 0.0.0.0 + port_value: 9000 + filter_chains: + - filters: + - name: envoy.filters.network.reverse_tunnel + typed_config: + "@type": >- + type.googleapis.com/envoy.extensions.filters.network.reverse_tunnel.v3.ReverseTunnel + ping_interval: 2s + + # Listener that will route the downstream request to the reverse connection cluster + - name: egress_listener + address: + socket_address: + address: 0.0.0.0 + port_value: 8085 + filter_chains: + - filters: + - name: envoy.http_connection_manager + typed_config: + "@type": >- + type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: egress_http + route_config: + virtual_hosts: + - name: backend + domains: + - "*" + routes: + - match: + prefix: "/downstream_service" + route: + cluster: reverse_connection_cluster + http_filters: + - name: envoy.filters.http.lua + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua + inline_code: | + function envoy_on_request(request_handle) + local headers = request_handle:headers() + local node_id = headers:get("x-node-id") + local cluster_id = headers:get("x-cluster-id") + + local host_id = "" + + -- Priority 1: x-node-id header + if node_id then + host_id = node_id + request_handle:logInfo("Using x-node-id as host_id: " .. host_id) + -- Priority 2: x-cluster-id header + elseif cluster_id then + host_id = cluster_id + request_handle:logInfo("Using x-cluster-id as host_id: " .. 
host_id) + else + request_handle:logError("No valid headers found: x-node-id or x-cluster-id") + return + end + + headers:add("x-computed-host-id", host_id) + end + - name: envoy.filters.http.router + typed_config: + "@type": >- + type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + + # Cluster used to write requests to cached sockets + clusters: + - name: reverse_connection_cluster + connect_timeout: 200s + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.reverse_connection + typed_config: + "@type": >- + type.googleapis.com/envoy.extensions.clusters.reverse_connection.v3.ReverseConnectionClusterConfig + cleanup_interval: 60s + # This is the actual host ID that will be used by the reverse connection cluster to look up a socket. + # The reverse connection cluster checks if there are cached sockets for this cluster, if so, it will + # use the socket. Otherwise, it assumes this is a downstream node and looks for cached sockets with + # this as the node instead. 
+ host_id_format: "%REQ(x-computed-host-id)%" + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": >- + type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {} + + - name: connection_reporting_service + connect_timeout: 30s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: connection_reporting_service + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 9097 + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": >- + type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {} + +admin: + access_log: + - name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "/dev/stdout" + address: + socket_address: + address: 0.0.0.0 + port_value: 8888 + +layered_runtime: + layers: + - name: layer + static_layer: + re2.max_program_size.error_level: 1000 + envoy.reloadable_features.reverse_conn_force_local_reply: true diff --git a/contrib/reverse_tunnel_reporter/examples/test_server.cc b/contrib/reverse_tunnel_reporter/examples/test_server.cc new file mode 100644 index 0000000000000..f7205eba4721b --- /dev/null +++ b/contrib/reverse_tunnel_reporter/examples/test_server.cc @@ -0,0 +1,68 @@ +// NOLINT(namespace-envoy) +// This is an example server which receives the reports and prints them out as an example for +// receivers to build their servers. Use this for quick manual validation if needed. 
+#include +#include +#include + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.grpc.pb.h" +#include "grpc++/grpc++.h" +#include "grpc++/server.h" +#include "grpc++/server_builder.h" +#include "grpc++/server_context.h" + +using envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client:: + StreamReverseTunnelsRequest; +using envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client:: + StreamReverseTunnelsResponse; + +constexpr int PORT = 9097; + +class ReverseTunnelServiceImpl final + : public envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client:: + ReverseTunnelReportingService::Service { +public: + grpc::Status StreamReverseTunnels( + grpc::ServerContext* /*context*/, + grpc::ServerReaderWriter* stream) + override { + + StreamReverseTunnelsRequest request; + while (stream->Read(&request)) { + std::cout << "Received: nonce=" << request.nonce() << ", full_push=" << request.full_push() + << ", added=" << request.added_tunnels_size() + << ", removed=" << request.removed_tunnel_names_size() << std::endl; + + StreamReverseTunnelsResponse response; + response.set_request_nonce(request.nonce()); + + // Optionally set report_interval to control client send rate + response.mutable_report_interval()->set_seconds(1); + + if (!stream->Write(response)) { + std::cerr << "Failed to write response" << std::endl; + break; + } + std::cout << "Sent ACK for nonce=" << request.nonce() << std::endl; + } + + std::cout << "Stream closed" << std::endl; + return grpc::Status::OK; + } +}; + +int main(int, char**) { + std::string server_address = "0.0.0.0:" + std::to_string(PORT); + + ReverseTunnelServiceImpl service; + grpc::ServerBuilder builder; + + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + + std::unique_ptr server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << 
std::endl; + + server->Wait(); + return 0; +} diff --git a/contrib/reverse_tunnel_reporter/source/BUILD b/contrib/reverse_tunnel_reporter/source/BUILD new file mode 100644 index 0000000000000..6890914b2f78a --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/BUILD @@ -0,0 +1,36 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_contrib_extension", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "reverse_tunnel_event_types", + hdrs = [ + "reverse_tunnel_event_types.h", + ], + deps = [ + "//envoy/common:pure_lib", + "//envoy/common:time_interface", + "//envoy/config:typed_config_interface", + "//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib", + "//envoy/server:factory_context_interface", + "//source/common/common:fmt_lib", + "//source/common/config:utility_lib", + "//source/common/protobuf", + "//source/common/protobuf:message_validator_lib", + ], +) + +envoy_cc_contrib_extension( + name = "config", + deps = [ + "//contrib/reverse_tunnel_reporter/source/clients:clients_lib", + "//contrib/reverse_tunnel_reporter/source/reporters:reporters_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/clients/BUILD b/contrib/reverse_tunnel_reporter/source/clients/BUILD new file mode 100644 index 0000000000000..e8886c7f0e7e5 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/BUILD @@ -0,0 +1,16 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "clients_lib", + deps = [ + "//contrib/reverse_tunnel_reporter/source/clients/grpc_client:grpc_client_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/BUILD b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/BUILD new file mode 100644 index 0000000000000..17adeffe19274 --- /dev/null +++ 
b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/BUILD @@ -0,0 +1,29 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "grpc_client_lib", + srcs = [ + "client.cc", + "factory.cc", + ], + hdrs = [ + "client.h", + "factory.h", + ], + deps = [ + "//contrib/reverse_tunnel_reporter/source:reverse_tunnel_event_types", + "//envoy/registry", + "//source/common/common:logger_lib", + "//source/common/grpc:typed_async_client_lib", + "//source/common/protobuf:utility_lib", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg_cc_proto", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.cc b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.cc new file mode 100644 index 0000000000000..f7ad7bf77a7ac --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.cc @@ -0,0 +1,284 @@ +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +GrpcClientConfig::GrpcClientConfig(const GrpcConfigProto& proto_config) + : stat_prefix(PROTOBUF_GET_STRING_OR_DEFAULT(proto_config, stat_prefix, + "reverse_tunnel_reporter_client.grpc_client")), + cluster(proto_config.cluster()), + send_interval(PROTOBUF_GET_MS_OR_DEFAULT(proto_config, default_send_interval, 5000)), + connect_retry_interval( + PROTOBUF_GET_MS_OR_DEFAULT(proto_config, connect_retry_interval, 5000)), + max_retries(proto_config.max_retries() ? proto_config.max_retries() : 5), + max_buffer(proto_config.max_buffer() ? 
proto_config.max_buffer() : 1000000) {} + +GrpcClient::GrpcClient(Server::Configuration::ServerFactoryContext& context, + const GrpcConfigProto& config) + : context_{context}, config_{config}, + service_method_{*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client" + ".ReverseTunnelReportingService.StreamReverseTunnels")}, + stats_(context_, config_.stat_prefix, config_.cluster) { + ENVOY_LOG(info, + "GrpcClient: constructed: cluster={}, send_interval={}ms, retry_interval={}ms, " + "max_retries={}, max_buffer={}", + config_.cluster, config_.send_interval.count(), config_.connect_retry_interval.count(), + config_.max_retries, config_.max_buffer); + + send_timer_ = context.mainThreadDispatcher().createTimer([this]() { send(false); }); + + retry_timer_ = context.mainThreadDispatcher().createTimer([this]() { connect(); }); + + stats_.send_interval_gauge_.set(config_.send_interval.count()); +} + +void GrpcClient::onServerInitialized(ReverseTunnelReporterWithState* reporter) { + reporter_ = reporter; + + envoy::config::core::v3::GrpcService grpc_service; + grpc_service.mutable_envoy_grpc()->set_cluster_name(config_.cluster); + + auto thread_local_cluster = context_.clusterManager().getThreadLocalCluster(config_.cluster); + if (!thread_local_cluster) { + ENVOY_LOG(error, "GrpcClient: cluster '{}' not found, cannot initialize", config_.cluster); + return; + } + + auto result = context_.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient( + grpc_service, thread_local_cluster->info()->statsScope(), false); + if (!result.ok()) { + ENVOY_LOG(error, "GrpcClient: failed to create gRPC async client: {}", + result.status().message()); + return; + } + + async_client_ = Grpc::AsyncClient(result.value()); + ENVOY_LOG(info, "GrpcClient: initialized: cluster={}", config_.cluster); + + initialized_ = true; + connect(); +} + +void GrpcClient::receiveEvents(ReverseTunnelEvent::BatchedEvents 
events) { + // Either we errored out of the initialized -> prevent infinite growth. + // Or the onServerInitialized has not been called yet -> no worries we will do a full push on + // connect. + if (!initialized_) { + ENVOY_LOG(debug, "GrpcClient: not initialized, cannot receive events"); + return; + } + + if ((events.size() + queued_events_.size()) > config_.max_buffer) { + ENVOY_LOG(error, + "GrpcClient: buffer overflow: cluster={}, queued={}, incoming={}, max_buffer={}", + config_.cluster, queued_events_.size(), events.size(), config_.max_buffer); + + stats_.events_dropped_counter_.add(events.size()); + + // Only disconnect if the stream is alive. If already disconnected, calling disconnect() + // would re-arm the retry timer, delaying the reconnect that is already scheduled. + if (stream_ != nullptr) { + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, + GrpcDisconnectionReason::DisconnectReason::BUFFER_OVERFLOW)) + .inc(); + disconnect(); + } + return; + } + + const auto incoming_conns = events.connections.size(); + const auto incoming_disconns = events.disconnections.size(); + stats_.queued_events_counter_.add(events.size()); + queued_events_ += std::move(events); + ENVOY_LOG(debug, "GrpcClient: enqueued: cluster={}, +={}, -={}, queued_now={}", config_.cluster, + incoming_conns, incoming_disconns, queued_events_.size()); +} + +void GrpcClient::onReceiveMessage(Grpc::ResponsePtr&& message) { + const auto resp_nonce = message->request_nonce(); + + if (message->has_error_detail()) { + ENVOY_LOG(error, "GrpcClient: NACK: cluster={}, nonce={}", config_.cluster, resp_nonce); + + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Aborted, + GrpcDisconnectionReason::DisconnectReason::NACK_RECEIVED)) + .inc(); + return disconnect(); + } + + // A server cannot ACK a nonce we never sent. If this fires the server has a bug. 
+ ASSERT( + resp_nonce <= nonce_current_, + fmt::format("server acked nonce {} but we only sent up to {}", resp_nonce, nonce_current_)); + + // Valid ACK: must be newer than the last acked watermark and within what we've sent. + if (resp_nonce > nonce_acked_ && resp_nonce <= nonce_current_) { + stats_.acks_received_counter_.inc(); + nonce_acked_ = resp_nonce; + stats_.nonce_acked_gauge_.set(nonce_acked_); + + // The server may dynamically adjust our send cadence via report_interval in each ACK. + // We floor it at kMinSendInterval to prevent tight send loops. + auto new_interval = std::chrono::milliseconds( + PROTOBUF_GET_MS_OR_DEFAULT(*message, report_interval, config_.send_interval.count())); + config_.send_interval = std::max(new_interval, kMinSendInterval); + stats_.send_interval_gauge_.set(config_.send_interval.count()); + + ENVOY_LOG(debug, "GrpcClient: ACK: cluster={}, nonce={}", config_.cluster, resp_nonce); + } else { + ENVOY_LOG(debug, "GrpcClient: out-of-order ACK: cluster={}, nonce={}, expected_range=[{}, {}]", + config_.cluster, resp_nonce, nonce_acked_ + 1, nonce_current_); + stats_.out_of_order_acks_counter_.inc(); + } +} + +void GrpcClient::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { + // Even a graceful close is unexpected — the server should keep the stream open indefinitely. 
+ if (status == Grpc::Status::WellKnownGrpcStatus::Ok) { + ENVOY_LOG(error, "GrpcClient: remote close (ok): cluster={}, message={}", config_.cluster, + message); + } else { + ENVOY_LOG(error, "GrpcClient: remote close: cluster={}, status={}, message={}", config_.cluster, + status, message); + } + + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(status, GrpcDisconnectionReason::DisconnectReason::REMOTE_CLOSE)) + .inc(); + disconnect(); +} + +void GrpcClient::connect() { + ENVOY_LOG(info, "GrpcClient: connecting: cluster={}", config_.cluster); + + stream_ = async_client_.start(service_method_, *this, Http::AsyncClient::StreamOptions()); + if (stream_ == nullptr) { + ENVOY_LOG(error, "GrpcClient: stream creation failed: cluster={}", config_.cluster); + + stats_ + .getCounter( + stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Internal, + GrpcDisconnectionReason::DisconnectReason::STREAM_CREATION_FAILED)) + .inc(); + return disconnect(); + } + + // New stream, new nonce epoch. Stale nonces from the previous stream are meaningless. + nonce_acked_ = nonce_current_ = 0; + stats_.nonce_current_gauge_.set(0); + stats_.nonce_acked_gauge_.set(0); + + stats_.connection_attempts_counter_.inc(); + ENVOY_LOG(info, "GrpcClient: connected: cluster={}", config_.cluster); + send(true); +} + +void GrpcClient::disconnect() { + if (stream_ != nullptr) { + stream_.resetStream(); + stream_ = nullptr; + } + + // Stop the send loop — no point sending on a dead stream. The retry timer will reconnect. + send_timer_->disableTimer(); + setTimer(retry_timer_, config_.connect_retry_interval); + ENVOY_LOG(debug, "GrpcClient: disconnect, scheduled reconnect: cluster={}, retry_in_ms={}", + config_.cluster, config_.connect_retry_interval.count()); +} + +void GrpcClient::send(bool full_push) { + ASSERT(stream_ != nullptr); + // Too many in-flight unacked messages — the server is likely dead or stuck. Disconnect + // and let the retry timer establish a fresh stream. 
+ if ((nonce_current_ - nonce_acked_) > config_.max_retries) { + ENVOY_LOG(error, "GrpcClient: too many unacked requests: cluster={}, nonce={}", config_.cluster, + nonce_current_); + + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded, + GrpcDisconnectionReason::DisconnectReason::MAX_RETRIES_EXCEEDED)) + .inc(); + return disconnect(); + } + + ENVOY_LOG(debug, "GrpcClient: sending: cluster={}, full_push={}, queued_now={}", config_.cluster, + full_push, queued_events_.size()); + stats_.send_attempts_counter_.inc(); + stream_.sendMessage(constructMessage(full_push), false); + + setTimer(send_timer_, config_.send_interval); +} + +StreamTunnelsReq GrpcClient::constructMessage(bool full_push) { + // Full push replaces the pending diff queue with a complete snapshot from the reporter. + // Any queued diffs are stale at this point because the full snapshot supersedes them. + if (full_push) { + stats_.events_dropped_counter_.add(queued_events_.size()); + queued_events_ = ReverseTunnelEvent::BatchedEvents{{reporter_->getAllConnections()}, {}}; + stats_.queued_events_counter_.add(queued_events_.size()); + ENVOY_LOG(info, "GrpcClient: full_push queued: cluster={}, queued_now={}", config_.cluster, + queued_events_.size()); + } + + StreamTunnelsReq message; + + auto* node = message.mutable_node(); + node->set_id(context_.localInfo().nodeName()); + node->set_cluster(context_.localInfo().clusterName()); + + auto* added_tunnels = message.mutable_added_tunnels(); + for (auto& conn : queued_events_.connections) { + auto* new_tunnel = added_tunnels->Add(); + new_tunnel->set_name(ReverseTunnelEvent::getName(conn->node_id, conn->cluster_id)); + + auto* tunnel_info = new_tunnel->mutable_tunnel_info(); + TimestampUtil::systemClockToTimestamp(conn->created_at, *tunnel_info->mutable_created_at()); + + auto* tunnel_id = tunnel_info->mutable_identity(); + tunnel_id->set_tenant_id(conn->tenant_id); + 
tunnel_id->set_cluster_id(conn->cluster_id); + tunnel_id->set_node_id(conn->node_id); + } + + auto* removed_tunnels = message.mutable_removed_tunnel_names(); + for (auto& disconn : queued_events_.disconnections) { + *removed_tunnels->Add() = disconn->name; + } + + message.set_full_push(full_push); + message.set_nonce(++nonce_current_); + stats_.nonce_current_gauge_.set(nonce_current_); + + ENVOY_LOG(debug, + "GrpcClient: built request: cluster={}, full_push={}, add={}, remove={}, nonce={}", + config_.cluster, full_push, queued_events_.connections.size(), + queued_events_.disconnections.size(), message.nonce()); + + stats_.sent_accepted_cnt_counter_.add(queued_events_.connections.size()); + stats_.sent_removed_cnt_counter_.add(queued_events_.disconnections.size()); + queued_events_.clear(); + + return message; +} + +void GrpcClient::setTimer(Event::TimerPtr& timer, const std::chrono::milliseconds& ms) { + if (timer->enabled()) + timer->disableTimer(); + + timer->enableTimer(ms); +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h new file mode 100644 index 0000000000000..5dccd9c0a1785 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h @@ -0,0 +1,233 @@ +#pragma once + +#include +#include +#include + +#include "envoy/event/timer.h" +#include "envoy/grpc/async_client.h" + +#include "source/common/common/logger.h" +#include "source/common/grpc/typed_async_client.h" +#include "source/common/protobuf/utility.h" +#include "source/common/stats/symbol_table.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.pb.h" +#include 
"contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +namespace GrpcDisconnectionReason { + +#define ITEMS(X) \ + X(BUFFER_OVERFLOW, buffer_overflow) \ + X(MAX_RETRIES_EXCEEDED, max_retries_exceeded) \ + X(NACK_RECEIVED, nack_received) \ + X(REMOTE_CLOSE, remote_close) \ + X(STREAM_CREATION_FAILED, stream_creation_failed) + +#define ENUM_DECLARE(name, str) name, +enum class DisconnectReason { ITEMS(ENUM_DECLARE) COUNT }; +#undef ENUM_DECLARE + +#define ENUM_STRING(name, str) #str, +constexpr std::array(DisconnectReason::COUNT)> + DisconnectReasonStrings = {ITEMS(ENUM_STRING)}; +#undef ENUM_STRING + +constexpr absl::string_view toString(DisconnectReason r) { + return DisconnectReasonStrings[static_cast(r)]; +} + +} // namespace GrpcDisconnectionReason + +using GrpcConfigProto = + envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client::GrpcClientConfig; +using StreamTunnelsReq = envoy::extensions::reverse_tunnel_reporters::v3alpha::clients:: + grpc_client::StreamReverseTunnelsRequest; +using StreamTunnelsResp = envoy::extensions::reverse_tunnel_reporters::v3alpha::clients:: + grpc_client::StreamReverseTunnelsResponse; + +// Floor for the server-adjustable send interval to prevent tight loops. +static constexpr std::chrono::milliseconds kMinSendInterval{25}; + +/// Parsed and validated configuration from the GrpcClientConfig proto, with defaults applied. +struct GrpcClientConfig { + std::string stat_prefix; + std::string cluster; + std::chrono::milliseconds send_interval; + std::chrono::milliseconds connect_retry_interval; + uint32_t max_retries; + uint32_t max_buffer; + + explicit GrpcClientConfig(const GrpcConfigProto& config); +}; + +/// Bidirectional gRPC streaming client that reports reverse-tunnel connection +/// state to a remote ReverseTunnelReportingService. 
+/// +/// Protocol: +/// - On connect (and every reconnect) the client does a full push of all +/// known connections obtained from the reporter. +/// - Between connects the client sends incremental diffs (new connections / +/// removals) on a periodic send timer. +/// - Each request carries an incrementing nonce. The server ACKs by echoing +/// the nonce; a NACK carries an error_detail. If too many nonces remain +/// unacked the client disconnects and reconnects. +/// - The server may adjust the send interval via report_interval in its ACK; +/// the client clamps it to kMinSendInterval to prevent tight loops. +/// +/// Lifecycle: constructed by GrpcClientFactory, initialized via +/// onServerInitialized() which creates the gRPC channel and opens the first +/// stream and starts the send cycle. Empty messages are also sent (considered as heartbeat). +class GrpcClient : public ReverseTunnelReporterClient, + public Logger::Loggable, + public Grpc::AsyncStreamCallbacks { +public: + GrpcClient(Server::Configuration::ServerFactoryContext& context, const GrpcConfigProto& config); + + // ReverseTunnelReporterClient overrides + void onServerInitialized(ReverseTunnelReporterWithState* reporter) override; + + void receiveEvents(ReverseTunnelEvent::BatchedEvents events) override; + + // RawAsyncStreamCallbacks overrides + void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} + + void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {} + + void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} + + void onReceiveMessage(Grpc::ResponsePtr&& message) override; + + void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; + +private: + // Actions + void connect(); + + void disconnect(); + + void send(bool full_push); + + // Helpers + StreamTunnelsReq constructMessage(bool full_push); + + void setTimer(Event::TimerPtr& timer, const std::chrono::milliseconds& ms); + + // config + 
Server::Configuration::ServerFactoryContext& context_; + GrpcClientConfig config_; + ReverseTunnelReporterWithState* reporter_{nullptr}; + + // State management + ReverseTunnelEvent::BatchedEvents queued_events_; + int64_t nonce_current_{0}; // Monotonically increasing, bumped on every sendMessage. + int64_t nonce_acked_{0}; // High watermark of server-acknowledged nonces. + // Guards against processing events when onServerInitialized() failed (cluster not + // found, client creation error). Without this the client silently queues to nowhere. + bool initialized_{false}; + + // grpc client and stream requirements + Grpc::AsyncClient async_client_; + Grpc::AsyncStream stream_{}; + const Protobuf::MethodDescriptor& service_method_; + + // timers + Event::TimerPtr retry_timer_; + Event::TimerPtr send_timer_; + + struct GrpcClientStats { + GrpcClientStats(Server::Configuration::ServerFactoryContext& context, + const std::string& stat_prefix, const std::string& cluster_name) + : context_{context}, stat_name_pool_(context.scope().symbolTable()), + stat_prefix_(stat_name_pool_.add(stat_prefix)), + + disconnects_(stat_name_pool_.add("disconnects")), + + status_code_(stat_name_pool_.add("status_code")), + disconnect_reason_(stat_name_pool_.add("disconnect_reason")), + cluster_label_(stat_name_pool_.add("cluster")), + cluster_value_(stat_name_pool_.add(cluster_name)), + + connection_attempts_counter_( + getCounter(stat_name_pool_.add("connection_attempts"), getTags())), + acks_received_counter_(getCounter(stat_name_pool_.add("acks_received"), getTags())), + send_attempts_counter_(getCounter(stat_name_pool_.add("send_attempts"), getTags())), + sent_accepted_cnt_counter_( + getCounter(stat_name_pool_.add("sent_accepted_cnt"), getTags())), + sent_removed_cnt_counter_(getCounter(stat_name_pool_.add("sent_removed_cnt"), getTags())), + events_dropped_counter_(getCounter(stat_name_pool_.add("events_dropped"), getTags())), + 
queued_events_counter_(getCounter(stat_name_pool_.add("queued_events"), getTags())), + out_of_order_acks_counter_( + getCounter(stat_name_pool_.add("out_of_order_acks"), getTags())), + + send_interval_gauge_(getGauge(stat_name_pool_.add("send_interval"), getTags())), + nonce_current_gauge_(getGauge(stat_name_pool_.add("nonce_current"), getTags())), + nonce_acked_gauge_(getGauge(stat_name_pool_.add("nonce_acked"), getTags())) {} + + Stats::StatNameTagVector getTags() { + return Stats::StatNameTagVector{ + {cluster_label_, cluster_value_}, + }; + } + + Stats::StatNameTagVector getTags(Grpc::Status::GrpcStatus status, + GrpcDisconnectionReason::DisconnectReason reason) { + Stats::StatName status_value = stat_name_pool_.add(std::to_string(status)); + Stats::StatName reason_value = stat_name_pool_.add(GrpcDisconnectionReason::toString(reason)); + + return Stats::StatNameTagVector{ + {cluster_label_, cluster_value_}, + {status_code_, status_value}, + {disconnect_reason_, reason_value}, + }; + } + + Stats::Counter& getCounter(const Stats::StatName& name, Stats::StatNameTagVector&& tags) { + return Stats::Utility::counterFromStatNames(context_.scope(), {stat_prefix_, name}, tags); + } + + Stats::Gauge& getGauge(const Stats::StatName& name, Stats::StatNameTagVector&& tags) { + return Stats::Utility::gaugeFromStatNames(context_.scope(), {stat_prefix_, name}, + Stats::Gauge::ImportMode::NeverImport, tags); + } + + Server::Configuration::ServerFactoryContext& context_; + Stats::StatNamePool stat_name_pool_; + const Stats::StatName stat_prefix_; + + const Stats::StatName disconnects_; + + const Stats::StatName status_code_; + const Stats::StatName disconnect_reason_; + const Stats::StatName cluster_label_; + const Stats::StatName cluster_value_; + + Stats::Counter& connection_attempts_counter_; + Stats::Counter& acks_received_counter_; + Stats::Counter& send_attempts_counter_; + Stats::Counter& sent_accepted_cnt_counter_; + Stats::Counter& sent_removed_cnt_counter_; + 
Stats::Counter& events_dropped_counter_; + Stats::Counter& queued_events_counter_; + Stats::Counter& out_of_order_acks_counter_; + + Stats::Gauge& send_interval_gauge_; + Stats::Gauge& nonce_current_gauge_; + Stats::Gauge& nonce_acked_gauge_; + }; + + GrpcClientStats stats_; + + friend class GrpcClientTest; +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.cc b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.cc new file mode 100644 index 0000000000000..440ac5ac68da9 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.cc @@ -0,0 +1,36 @@ +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h" + +#include "envoy/registry/registry.h" + +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +ReverseTunnelReporterClientPtr +GrpcClientFactory::createClient(Server::Configuration::ServerFactoryContext& context, + const Protobuf::Message& config) { + const auto& grpc_client_config = + MessageUtil::downcastAndValidate( + config, context.messageValidationVisitor()); + return std::make_unique(context, grpc_client_config); +} + +std::string GrpcClientFactory::name() const { + return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client"; +} + +ProtobufTypes::MessagePtr GrpcClientFactory::createEmptyConfigProto() { + return std::make_unique(); +} + +REGISTER_FACTORY(GrpcClientFactory, ReverseTunnelReporterClientFactory); + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h new file mode 100644 index 0000000000000..765fc5a2d95a7 --- 
/dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h @@ -0,0 +1,27 @@ +#pragma once + +#include "source/common/protobuf/utility.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.pb.validate.h" +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +class GrpcClientFactory : public ReverseTunnelReporterClientFactory { +public: + ReverseTunnelReporterClientPtr createClient(Server::Configuration::ServerFactoryContext& context, + const Protobuf::Message& config) override; + + std::string name() const override; + + ProtobufTypes::MessagePtr createEmptyConfigProto() override; +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/BUILD b/contrib/reverse_tunnel_reporter/source/reporters/BUILD new file mode 100644 index 0000000000000..399bb153885c2 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/BUILD @@ -0,0 +1,16 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "reporters_lib", + deps = [ + "//contrib/reverse_tunnel_reporter/source/reporters/event_reporter:event_reporter_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD new file mode 100644 index 0000000000000..ace33870375d9 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD @@ -0,0 +1,30 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + 
"envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "event_reporter_lib", + srcs = [ + "factory.cc", + "reporter.cc", + ], + hdrs = [ + "factory.h", + "reporter.h", + ], + deps = [ + "//contrib/reverse_tunnel_reporter/source:reverse_tunnel_event_types", + "//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib", + "//envoy/registry", + "//source/common/common:logger_lib", + "//source/common/config:utility_lib", + "//source/common/protobuf:utility_lib", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg_cc_proto", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc new file mode 100644 index 0000000000000..d4f86fe296d64 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc @@ -0,0 +1,58 @@ +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h" + +#include "envoy/registry/registry.h" + +#include "source/common/config/utility.h" +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +ReverseTunnelReporterPtr +EventReporterFactory::createReporter(Server::Configuration::ServerFactoryContext& context, + ProtobufTypes::MessagePtr config) { + const auto& reporter_config = MessageUtil::downcastAndValidate( + *config, context.messageValidationVisitor()); + + std::vector clients; + clients.reserve(reporter_config.clients().size()); + for (const auto& client_config : reporter_config.clients()) { + clients.push_back(createClient(context, client_config)); + } + return std::make_unique(context, reporter_config, std::move(clients)); +} + +std::string EventReporterFactory::name() const { + return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_" + 
"reporter"; +} + +ProtobufTypes::MessagePtr EventReporterFactory::createEmptyConfigProto() { + return std::make_unique(); +} + +ReverseTunnelReporterClientPtr +EventReporterFactory::createClient(Server::Configuration::ServerFactoryContext& context, + const ClientConfigProto& client_config) { + auto* factory = + Config::Utility::getFactoryByName(client_config.name()); + if (!factory) { + throw EnvoyException( + fmt::format("Unknown Reporter Client Factory: '{}'. " + "Make sure it is registered as a ReverseTunnelReporterClientFactory.", + client_config.name())); + } + + auto typed_config = Config::Utility::translateAnyToFactoryConfig( + client_config.typed_config(), context.messageValidationVisitor(), *factory); + return factory->createClient(context, *typed_config); +} + +REGISTER_FACTORY(EventReporterFactory, ReverseTunnelReporterFactory); + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h new file mode 100644 index 0000000000000..14f272506c360 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h @@ -0,0 +1,37 @@ +#pragma once + +#include "envoy/extensions/bootstrap/reverse_tunnel/reverse_tunnel_reporter.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.validate.h" +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +/// Factory that builds an EventReporter from its proto config, dynamically +/// resolving each child ReverseTunnelReporterClient 
by name. +class EventReporterFactory : public ReverseTunnelReporterFactory { +public: + ReverseTunnelReporterPtr createReporter(Server::Configuration::ServerFactoryContext& context, + ProtobufTypes::MessagePtr config) override; + std::string name() const override; + ProtobufTypes::MessagePtr createEmptyConfigProto() override; + +private: + using ConfigProto = + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig; + using ClientConfigProto = envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters:: + ReverseConnectionReporterClient; + + ReverseTunnelReporterClientPtr createClient(Server::Configuration::ServerFactoryContext& context, + const ClientConfigProto& client_config); +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc new file mode 100644 index 0000000000000..14c0a0823e761 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc @@ -0,0 +1,125 @@ +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" + +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +EventReporter::EventReporter(Server::Configuration::ServerFactoryContext& context, + const ConfigProto& config, + std::vector&& clients) + : context_{context}, clients_{std::move(clients)}, + stats_(generateStats( + PROTOBUF_GET_STRING_OR_DEFAULT(config, stat_prefix, "reverse_tunnel_reporter"), + context.scope())) { + ENVOY_LOG(info, "EventReporter: Constructed with {} clients", clients_.size()); +} + +void EventReporter::onServerInitialized() { + ENVOY_LOG(info, "EventReporter: Initialized"); + for (auto& client : clients_) { + client->onServerInitialized(this); + } +} + +void 
EventReporter::reportConnectionEvent(absl::string_view node_id, absl::string_view cluster_id, + absl::string_view tenant_id) { + auto ptr = std::make_shared( + ReverseTunnelEvent::Connected{std::string(node_id), std::string(cluster_id), + std::string(tenant_id), Envoy::SystemTime::clock::now()}); + + context_.mainThreadDispatcher().post( + [this, ptr = std::move(ptr)]() mutable { this->addConnection(std::move(ptr)); }); +} + +void EventReporter::reportDisconnectionEvent(absl::string_view node_id, + absl::string_view cluster_id) { + std::string name = ReverseTunnelEvent::getName(node_id, cluster_id); + auto ptr = std::make_shared( + ReverseTunnelEvent::Disconnected{std::move(name)}); + + context_.mainThreadDispatcher().post( + [this, ptr = std::move(ptr)]() mutable { this->removeConnection(std::move(ptr)); }); +} + +// This is only served on the main thread so no locks needed. +ReverseTunnelEvent::SharedConnections EventReporter::getAllConnections() { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + stats_.reverse_tunnel_full_pulls_total_.inc(); + + ReverseTunnelEvent::SharedConnections all_connections; + all_connections.reserve(connections_.size()); + + for (auto& [key, val] : connections_) { + all_connections.push_back(val.connection); + } + return all_connections; +} + +EventReporterStats EventReporter::generateStats(const std::string& prefix, Stats::Scope& scope) { + return EventReporterStats{ALL_EVENT_REPORTER_STATS(POOL_COUNTER_PREFIX(scope, prefix), + POOL_GAUGE_PREFIX(scope, prefix))}; +} + +void EventReporter::notifyClients(ReverseTunnelEvent::BatchedEvents&& batch) { + for (auto& client : clients_) { + client->receiveEvents(batch); + } +} + +void EventReporter::addConnection(std::shared_ptr&& connection) { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + + ENVOY_LOG(info, "EventReporter: Accepted a new connection. 
Node: {}, Cluster: {}, Tenant: {}", + connection->node_id, connection->cluster_id, connection->tenant_id); + + std::string name = ReverseTunnelEvent::getName(connection->node_id, connection->cluster_id); + auto [it, inserted] = + connections_.try_emplace(std::move(name), ConnectionEntry{std::move(connection), 1}); + + if (inserted) { + stats_.reverse_tunnel_unique_active_.inc(); + notifyClients(ReverseTunnelEvent::BatchedEvents{{it->second.connection}, {}}); + } else { + // Multiple reverse tunnels can share the same name (same node). + // We ref-count them and only notify clients of removal when the last one disconnects. + it->second.count++; + } + + stats_.reverse_tunnel_established_total_.inc(); + stats_.reverse_tunnel_active_.inc(); +} + +void EventReporter::removeConnection( + std::shared_ptr&& disconnection) { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + + const auto& name = disconnection->name; + auto it = connections_.find(name); + + ENVOY_LOG(info, "EventReporter: Removed connection. Name: {}", name); + + if (it == connections_.end()) { + ENVOY_LOG(warn, "EventReporter: Tried to remove a connection which doesnt exist"); + return; + } + + // Only notify removal on the last ref — see addConnection for the ref-count rationale. 
+ if (it->second.count == 1) { + connections_.erase(it); + stats_.reverse_tunnel_unique_active_.dec(); + notifyClients(ReverseTunnelEvent::BatchedEvents{{}, {disconnection}}); + } else { + it->second.count--; + } + + stats_.reverse_tunnel_closed_total_.inc(); + stats_.reverse_tunnel_active_.dec(); +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h new file mode 100644 index 0000000000000..4c07f661acf80 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h @@ -0,0 +1,66 @@ +#pragma once + +#include + +#include "envoy/stats/stats_macros.h" + +#include "source/common/common/logger.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +#define ALL_EVENT_REPORTER_STATS(COUNTER, GAUGE) \ + COUNTER(reverse_tunnel_established_total) \ + COUNTER(reverse_tunnel_closed_total) \ + COUNTER(reverse_tunnel_full_pulls_total) \ + GAUGE(reverse_tunnel_active, Accumulate) \ + GAUGE(reverse_tunnel_unique_active, Accumulate) + +struct EventReporterStats { + ALL_EVENT_REPORTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) +}; + +struct ConnectionEntry { + std::shared_ptr connection; + std::size_t count; +}; + +/// Aggregates reverse-tunnel connection/disconnection events, de-duplicates by +/// name, maintains stats, and fans out batched diffs as shared ptrs to registered clients. 
+class EventReporter : public ReverseTunnelReporterWithState, + public Logger::Loggable { +public: + using ConfigProto = + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig; + + EventReporter(Server::Configuration::ServerFactoryContext& context, const ConfigProto& config, + std::vector&& clients); + + void onServerInitialized() override; + void reportConnectionEvent(absl::string_view node_id, absl::string_view cluster_id, + absl::string_view tenant_id) override; + void reportDisconnectionEvent(absl::string_view node_id, absl::string_view cluster_id) override; + ReverseTunnelEvent::SharedConnections getAllConnections() override; + +private: + static EventReporterStats generateStats(const std::string& prefix, Stats::Scope& scope); + void notifyClients(ReverseTunnelEvent::BatchedEvents&& batch); + void addConnection(std::shared_ptr&& connection); + void removeConnection(std::shared_ptr&& disconnection); + + Server::Configuration::ServerFactoryContext& context_; + std::vector clients_; + EventReporterStats stats_; + // Keyed by getName(node_id, cluster_id). 
+ absl::flat_hash_map connections_; +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h b/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h new file mode 100644 index 0000000000000..4585c55fa6f6f --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h @@ -0,0 +1,112 @@ +#pragma once + +#include +#include +#include + +#include "envoy/common/pure.h" +#include "envoy/common/time.h" +#include "envoy/config/typed_config.h" +#include "envoy/extensions/bootstrap/reverse_tunnel/reverse_tunnel_reporter.h" +#include "envoy/server/factory_context.h" + +#include "source/common/common/fmt.h" + +#include "absl/strings/str_cat.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +// The namespace holding the structs for the reverse tunnel events +namespace ReverseTunnelEvent { +// Builds the canonical connection name used as the de-duplication key in the reporter +// and as the tunnel name in the gRPC proto. +// TODO(aakugan) look into returning string_view and owning the data in the connection struct alone. 
+inline std::string getName(absl::string_view node_id, absl::string_view cluster_id) { + return absl::StrCat(node_id, ":", cluster_id); +} + +struct Connected { + std::string node_id; + std::string cluster_id; + std::string tenant_id; + Envoy::SystemTime created_at; +}; + +struct Disconnected { + std::string name; +}; + +using SharedConnections = std::vector>; +using SharedDisconnections = std::vector>; + +struct BatchedEvents { + SharedConnections connections; + SharedDisconnections disconnections; + + std::size_t size() const { return connections.size() + disconnections.size(); } + + void operator+=(BatchedEvents&& events) { + connections.reserve(connections.size() + events.connections.size()); + disconnections.reserve(disconnections.size() + events.disconnections.size()); + + for (auto& conn : events.connections) { + connections.push_back(std::move(conn)); + } + + for (auto& disconn : events.disconnections) { + disconnections.push_back(std::move(disconn)); + } + + events.connections.clear(); + events.disconnections.clear(); + } + + void clear() { + connections.clear(); + disconnections.clear(); + } +}; +} // namespace ReverseTunnelEvent + +// This will own the clients and expose an api for them to get the full state. +// This allows multiple clients to share data -> clients can focus on sending the data alone. +class ReverseTunnelReporterWithState : public ReverseTunnelReporter { +public: + virtual ~ReverseTunnelReporterWithState() = default; + + virtual ReverseTunnelEvent::SharedConnections getAllConnections() PURE; +}; + +using ReverseTunnelReporterWithStatePtr = std::unique_ptr; + +// This gets the ptr to the reporter for polling all the active connections. +// This also has receiveEvents to get the diff events from the reporter. 
+class ReverseTunnelReporterClient { +public: + virtual ~ReverseTunnelReporterClient() = default; + + virtual void onServerInitialized(ReverseTunnelReporterWithState* reporter) PURE; + + virtual void receiveEvents(ReverseTunnelEvent::BatchedEvents events) PURE; +}; + +using ReverseTunnelReporterClientPtr = std::unique_ptr; + +class ReverseTunnelReporterClientFactory : public Config::TypedFactory { +public: + virtual ReverseTunnelReporterClientPtr + createClient(Server::Configuration::ServerFactoryContext& context, + const Protobuf::Message& config) PURE; + + std::string category() const override { + return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients"; + } +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/test/clients/BUILD b/contrib/reverse_tunnel_reporter/test/clients/BUILD new file mode 100644 index 0000000000000..74fff54daa5ee --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/clients/BUILD @@ -0,0 +1,71 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_cc_test_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_test( + name = "grpc_client_test", + srcs = ["grpc_client_test.cc"], + deps = [ + "//contrib/reverse_tunnel_reporter/source/clients/grpc_client:grpc_client_lib", + "//envoy/grpc:async_client_interface", + "//envoy/registry", + "//source/common/api:api_lib", + "//source/common/protobuf:utility_lib_header", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/server:server_factory_context_mocks", + "//test/mocks/upstream:cluster_info_mocks", + "//test/mocks/upstream:thread_local_cluster_mocks", + "//test/test_common:registry_lib", + "//test/test_common:utility_lib", + ], +) + +envoy_cc_test_library( + name = "integration_test_utils_lib", + hdrs = ["integration_test_utils.h"], + deps = [ 
+ "//source/common/common:fmt_lib", + "//test/config:utility_lib", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg_cc_proto", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", + "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/config/listener/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/bootstrap/reverse_tunnel/upstream_socket_interface/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/http/router/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/network/reverse_tunnel/v3:pkg_cc_proto", + ], +) + +envoy_cc_test( + name = "grpc_client_integration_test", + srcs = ["grpc_client_integration_test.cc"], + deps = [ + ":integration_test_utils_lib", + "//contrib/reverse_tunnel_reporter/source:config", + "//source/common/thread_local:thread_local_lib", + "//source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface:reverse_connection_resolver_lib", + "//source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface:reverse_tunnel_initiator_lib", + "//source/extensions/bootstrap/reverse_tunnel/upstream_socket_interface:reverse_tunnel_acceptor_lib", + "//source/extensions/filters/http/router:config", + "//source/extensions/filters/network/http_connection_manager:config", + "//source/extensions/filters/network/reverse_tunnel:config", + "//test/integration:integration_lib", + "@com_github_grpc_grpc//:grpc++", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg_cc_grpc", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg_cc_proto", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg_cc_proto", + 
"@envoy_api//envoy/config/listener/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/upstreams/http/v3:pkg_cc_proto", + ], +) diff --git a/contrib/reverse_tunnel_reporter/test/clients/grpc_client_integration_test.cc b/contrib/reverse_tunnel_reporter/test/clients/grpc_client_integration_test.cc new file mode 100644 index 0000000000000..abd32a6eb9f86 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/clients/grpc_client_integration_test.cc @@ -0,0 +1,477 @@ +#include +#include +#include + +#include "envoy/config/listener/v3/listener.pb.h" +#include "envoy/extensions/upstreams/http/v3/http_protocol_options.pb.h" + +#include "test/integration/integration.h" +#include "test/integration/utility.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.grpc.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h" +#include "contrib/reverse_tunnel_reporter/test/clients/integration_test_utils.h" +#include "grpc++/grpc++.h" +#include "grpc++/server.h" +#include "grpc++/server_builder.h" +#include "grpc++/server_context.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +constexpr absl::string_view reporterName = + "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_reporter"; +constexpr absl::string_view grpcClient = + "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client"; +constexpr std::chrono::milliseconds serverWait{5}; +constexpr std::size_t sendInterval{500}; +constexpr std::size_t maxRetries{3}; + +using envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client::GrpcClientConfig; +using envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client:: + ReverseTunnelReportingService; 
+using envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client:: + StreamReverseTunnelsRequest; +using envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client:: + StreamReverseTunnelsResponse; +using envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig; +using envoy::extensions::upstreams::http::v3::HttpProtocolOptions; + +class TestingService final : public ReverseTunnelReportingService::Service { + Event::Dispatcher* dispatcher_; + std::function callback_; + +public: + grpc::Status StreamReverseTunnels( + grpc::ServerContext* /*context*/, + grpc::ServerReaderWriter* stream) + override { + StreamReverseTunnelsRequest request; + int cnt{0}; + + ENVOY_LOG_MISC(error, "GrpcClientIntegrationTest: Status=Connected"); + + while (stream->Read(&request)) { + cnt++; + StreamReverseTunnelsResponse response; + response.set_request_nonce(request.nonce()); + + if (!stream->Write(response)) { + ENVOY_LOG_MISC(error, "GrpcClientIntegrationTest: Unable to send the response: {}", + response.DebugString()); + break; + } + + if (dispatcher_) { + dispatcher_->post( + [this, request = std::move(request)]() mutable { callback_(std::move(request)); }); + } + } + + ENVOY_LOG_MISC(error, "GrpcClientIntegrationTest: Stream ended, total messages: {}", cnt); + + return grpc::Status::OK; + } + + void set(Event::Dispatcher* dispatcher, + std::function& callback) { + dispatcher_ = dispatcher; + callback_ = callback; + } +}; + +struct GrpcServer { + std::unique_ptr server_; + std::thread server_thread_; + TestingService service_; + + GrpcServer(Event::Dispatcher* dispatcher, + std::function& callback) { + std::string server_address = fmt::format("{}:{}", localhost, reportingPort); + service_.set(dispatcher, callback); + + grpc::ServerBuilder builder; + + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service_); + + server_ = std::unique_ptr(builder.BuildAndStart()); + + 
server_thread_ = std::thread([this] { server_->Wait(); }); + } + + ~GrpcServer() { + auto deadline = std::chrono::system_clock::now() + serverWait; // NO_CHECK_FORMAT(real_time) + server_->Shutdown(deadline); + server_thread_.join(); + } +}; + +class GrpcClientIntegrationTest : public testing::TestWithParam, + public BaseIntegrationTest { +public: + struct RcEvent { + std::string name; + bool connected; + + bool operator<(const RcEvent& other) const noexcept { + if (name != other.name) { + return name < other.name; + } + + return connected < other.connected; + } + + bool operator==(const RcEvent& other) const { + return name == other.name && connected == other.connected; + } + }; + + GrpcClientIntegrationTest() + : BaseIntegrationTest(GetParam(), ConfigHelper::baseConfigNoListeners()) { + callback_ = [this](StreamReverseTunnelsRequest&& req) { this->callback(std::move(req)); }; + } + + void initialize() override { + requests_.clear(); + use_lds_ = true; + + addBootstrapExtension(getUpstreamExtension(getReporterConfig()), config_helper_); + addBootstrapExtension(getDownstreamExtension(), config_helper_); + addCluster(getDownstreamCluster(), config_helper_); + addListener(getUpstreamListener(), config_helper_, current_listeners_); + + auto cluster = getUpstreamCluster(); + addCluster(getHttp2Cluster(cluster), config_helper_); + + BaseIntegrationTest::initialize(); + + current_config_ = ConfigHelper{version_, config_helper_.bootstrap()}; + } + +protected: + EventReporterConfig getReporterConfig() { + EventReporterConfig cfg; + cfg.set_stat_prefix(reporterName); + + auto* client = cfg.add_clients(); + client->set_name(grpcClient); + + GrpcClientConfig grpc_cfg; + grpc_cfg.set_stat_prefix("reverse_connection_grpc_client"); + grpc_cfg.set_cluster(upstreamCluster); + *(grpc_cfg.mutable_default_send_interval()) = + Protobuf::util::TimeUtil::MillisecondsToDuration(sendInterval); + *(grpc_cfg.mutable_connect_retry_interval()) = + 
Protobuf::util::TimeUtil::MillisecondsToDuration(sendInterval); + grpc_cfg.set_max_retries(maxRetries); + grpc_cfg.set_max_buffer(1'000'000); + client->mutable_typed_config()->PackFrom(grpc_cfg); + + return cfg; + } + + void updateLds(ConfigHelper& new_config) { + new_config.setLds(std::to_string(++cur_version_)); + // Wait for up to a minute for the values to propagate. + test_server_->waitForGaugeEq("listener_manager.total_listeners_active", current_listeners_, + std::chrono::seconds(60)); + current_config_ = std::move(new_config); + } + + void addListenerLds(Listener&& listener) { + ConfigHelper new_config{version_, current_config_.bootstrap()}; + addListener(std::move(listener), new_config, current_listeners_); + updateLds(new_config); + } + + void addListenerLds(std::vector&& listeners) { + ConfigHelper new_config{version_, current_config_.bootstrap()}; + + for (auto& listener : listeners) { + addListener(std::move(listener), new_config, current_listeners_); + } + + updateLds(new_config); + } + + void removeListenerLds(const std::string& name) { + ConfigHelper new_config{version_, current_config_.bootstrap()}; + removeListener(name, new_config, current_listeners_); + updateLds(new_config); + } + + void removeListenerLds(std::vector& names) { + ConfigHelper new_config{version_, current_config_.bootstrap()}; + + for (auto& name : names) { + removeListener(name, new_config, current_listeners_); + } + + updateLds(new_config); + } + + void callback(StreamReverseTunnelsRequest&& req) { requests_.push_back(std::move(req)); } + + std::vector getEvents() { + std::vector events; + + for (auto& req : requests_) { + for (auto& conn : req.added_tunnels()) { + events.push_back({conn.name(), true}); + } + + for (auto& conn : req.removed_tunnel_names()) { + events.push_back({conn, false}); + } + } + + return events; + } + + void log(std::vector& vals) { + std::string message = ""; + + for (auto& event : vals) { + message += fmt::format("Name: {}, Connected: {}", event.name, 
event.connected); + } + + ENVOY_LOG_MISC(error, "\nEvents:\n{}", message); + } + + void validateEqual(std::chrono::milliseconds ms, std::vector& expectations) { + timeSystem().advanceTimeWait(ms); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + + auto events = getEvents(); + std::sort(begin(events), end(events)); + std::sort(begin(expectations), end(expectations)); + + ENVOY_LOG(debug, "validateEqual: actual={}, expected={}", events.size(), expectations.size()); + bool ans = events == expectations; + + EXPECT_EQ(ans, 1); + } + + std::vector expectConnected(std::vector node_ids) { + std::vector events; + + for (auto& node_id : node_ids) { + events.push_back({fmt::format("{}:{}", node_id, downstreamCluster), true}); + } + + return events; + } + + std::vector expectDisconnected(std::vector node_ids) { + std::vector events; + + for (auto& node_id : node_ids) { + events.push_back({fmt::format("{}:{}", node_id, downstreamCluster), false}); + } + + return events; + } + + std::function callback_; + std::unique_ptr grpc_server_; + std::vector requests_; + + std::vector expectations; + + int current_listeners_{0}; + int cur_version_{0}; + + ConfigHelper current_config_{version_, config_helper_.bootstrap()}; +}; + +std::vector& +operator+=(std::vector& lhs, + const std::vector& rhs) { + lhs.insert(lhs.end(), rhs.begin(), rhs.end()); + return lhs; +} + +INSTANTIATE_TEST_SUITE_P(IpVersions, GrpcClientIntegrationTest, + testing::ValuesIn({Network::Address::IpVersion::v4}), + TestUtility::ipTestParamsToString); + +/** + * Test: HappyPath + * Tests the standard lifecycle: gRPC stream establishment, reporting new + * tunnel additions via LDS, and reporting removals when listeners are deleted. 
+ */ +TEST_P(GrpcClientIntegrationTest, HappyPath) { + grpc_server_ = std::make_unique(dispatcher_.get(), callback_); + initialize(); + + std::vector expectations = expectConnected({"node-1", "node-2"}); + + addListenerLds(getDownstreamListener("node-1", 1)); + addListenerLds(getDownstreamListener("node-2", 1)); + + // Give envoy time to make the rc, connect to the test server, timer to fire + // and send the updates. + validateEqual(std::chrono::milliseconds(sendInterval * 3), expectations); + + removeListenerLds("node-1"); + removeListenerLds("node-2"); + + expectations += expectDisconnected({"node-1", "node-2"}); + + validateEqual(std::chrono::milliseconds(sendInterval * 3), expectations); +} + +/** + * Test: ServerReconnect + * Tests that Envoy performs a "Full State Push" upon reconnection. + * Even if tunnels didn't change, they must be re-reported on a new gRPC stream. + */ +TEST_P(GrpcClientIntegrationTest, ServerReconnect) { + grpc_server_ = std::make_unique(dispatcher_.get(), callback_); + initialize(); + + std::vector expectations = expectConnected({"node-1", "node-2"}); + + addListenerLds(getDownstreamListener("node-1", 1)); + addListenerLds(getDownstreamListener("node-2", 1)); + + validateEqual(std::chrono::milliseconds(sendInterval * 3), expectations); + + grpc_server_ = std::make_unique(dispatcher_.get(), callback_); + + expectations += expectConnected({"node-1", "node-2"}); + + // Time to disconnect, connect and then full push. + validateEqual(std::chrono::milliseconds(sendInterval * 5), expectations); + + removeListenerLds("node-1"); + removeListenerLds("node-2"); + + expectations += expectDisconnected({"node-1", "node-2"}); + + validateEqual(std::chrono::milliseconds(sendInterval * 3), expectations); +} + +/** + * Test: EventsWhenServerIsDead + * Tests "Event Convergence." If state changes (Add/Remove) happen while the + * server is down, the client must report the final ground truth once back online. 
+ */ +TEST_P(GrpcClientIntegrationTest, EventsWhenServerIsDead) { + grpc_server_ = std::make_unique(dispatcher_.get(), callback_); + initialize(); + + std::vector expectations = expectConnected({"node-1", "node-2"}); + + addListenerLds(getDownstreamListener("node-1", 1)); + addListenerLds(getDownstreamListener("node-2", 1)); + + validateEqual(std::chrono::milliseconds(sendInterval * 3), expectations); + + grpc_server_ = nullptr; + + addListenerLds(getDownstreamListener("node-3", 1)); + removeListenerLds("node-1"); + + expectations += expectConnected({"node-2", "node-3"}); + + // Wait for the connections to establish and drain + timeSystem().advanceTimeWait(std::chrono::milliseconds(sendInterval * 5)); + + grpc_server_ = std::make_unique(dispatcher_.get(), callback_); + validateEqual(std::chrono::milliseconds(sendInterval * 5), expectations); + + removeListenerLds("node-2"); + removeListenerLds("node-3"); + + expectations += expectDisconnected({"node-2", "node-3"}); + + validateEqual(std::chrono::milliseconds(sendInterval * 3), expectations); +} + +/** + * Test: ServerDead + * Tests the gRPC client's retry engine. It verifies that the client + * continues to attempt connections based on 'connect_retry_interval' forever. + * We have limited to max_retries + 2 as we had to stop somewhere. + */ +TEST_P(GrpcClientIntegrationTest, ServerDead) { + initialize(); + // First one on server init. + test_server_->waitForCounterEq( + "reverse_connection_grpc_client.connection_attempts.cluster.upstreamCluster", 1); + + for (std::size_t i = 0; i <= maxRetries; i++) { + // Wait for the timer to fire first. + timeSystem().advanceTimeWait(std::chrono::milliseconds(sendInterval)); + test_server_->waitForCounterEq( + "reverse_connection_grpc_client.connection_attempts.cluster.upstreamCluster", i + 2); + } +} + +/** + * Test: ServerLate + * Tests the "Catch-up" scenario. 
Verifies that if tunnels exist before the + * reporting service is reachable, the client pushes the state immediately on connection. + */ +TEST_P(GrpcClientIntegrationTest, ServerLate) { + initialize(); + + addListenerLds(getDownstreamListener("node-1", 1)); + addListenerLds(getDownstreamListener("node-2", 1)); + + std::vector expectations = expectConnected({"node-1", "node-2"}); + + timeSystem().advanceTimeWait(std::chrono::milliseconds(sendInterval * 3)); + + grpc_server_ = std::make_unique(dispatcher_.get(), callback_); + validateEqual(std::chrono::milliseconds(sendInterval * 3), expectations); + + removeListenerLds("node-1"); + removeListenerLds("node-2"); + + expectations += expectDisconnected({"node-1", "node-2"}); + + validateEqual(std::chrono::milliseconds(sendInterval * 3), expectations); +} + +/// Only run the load test in optimized builds. +#if defined(NDEBUG) +TEST_P(GrpcClientIntegrationTest, LoadTest) { + grpc_server_ = std::make_unique(dispatcher_.get(), callback_); + initialize(); + + // Limited to 1000 to reduce test flakiness. + int sz = 1000; + std::vector nodes(sz); + for (int i = 0; i < sz; i++) { + nodes[i] = fmt::format("node-{}", i); + } + + std::vector expectations = expectConnected(nodes); + + std::vector listeners(sz); + for (int i = 0; i < sz; i++) { + listeners[i] = getDownstreamListener(nodes[i], 1); + } + + addListenerLds(std::move(listeners)); + // Pure overhead of running the client should be minimal. + validateEqual(std::chrono::milliseconds(sendInterval * 5), expectations); + + expectations += expectDisconnected(nodes); + removeListenerLds(nodes); + // Allow more time for the discovery of listener removal and then propogation. 
+ validateEqual(std::chrono::milliseconds(sendInterval * 10), expectations); +} +#endif // defined(NDEBUG) + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/test/clients/grpc_client_test.cc b/contrib/reverse_tunnel_reporter/test/clients/grpc_client_test.cc new file mode 100644 index 0000000000000..47843ec64a47a --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/clients/grpc_client_test.cc @@ -0,0 +1,835 @@ +#include + +#include "envoy/grpc/async_client.h" +#include "envoy/registry/registry.h" + +#include "source/common/grpc/common.h" +#include "source/common/protobuf/utility.h" + +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/server/server_factory_context.h" +#include "test/mocks/upstream/cluster_info.h" +#include "test/mocks/upstream/thread_local_cluster.h" +#include "test/test_common/utility.h" + +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h" +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +class MockReverseTunnelReporter : public ReverseTunnelReporterWithState { +public: + MOCK_METHOD(void, onServerInitialized, (), (override)); + MOCK_METHOD(void, reportConnectionEvent, + (absl::string_view, absl::string_view, absl::string_view), (override)); + MOCK_METHOD(void, reportDisconnectionEvent, (absl::string_view, absl::string_view), (override)); + MOCK_METHOD(ReverseTunnelEvent::SharedConnections, getAllConnections, (), (override)); +}; + +GrpcConfigProto mock_config() { + GrpcConfigProto config_; + + config_.set_cluster("test_cluster"); + 
config_.mutable_default_send_interval()->set_seconds(2); + config_.mutable_connect_retry_interval()->set_seconds(2); + config_.set_max_retries(2); + config_.set_max_buffer(1000); + config_.set_stat_prefix("test.grpc_client"); + + return config_; +} + +ReverseTunnelEvent::SharedConnections make_connections(std::vector node_ids) { + ReverseTunnelEvent::SharedConnections connections; + std::string cluster = "test_cluster"; + std::string tenant = "test_tenant"; + + for (const auto& node : node_ids) { + auto conn = std::make_shared(ReverseTunnelEvent::Connected{ + node, cluster, tenant, std::chrono::system_clock::time_point(std::chrono::seconds(1))}); + connections.push_back(std::move(conn)); + } + + return connections; +} + +ReverseTunnelEvent::SharedDisconnections make_disconnections(std::vector node_ids) { + std::string cluster = "test_cluster"; + ReverseTunnelEvent::SharedDisconnections disconnections; + + for (const auto& node : node_ids) { + auto disconn = std::make_shared( + ReverseTunnelEvent::Disconnected{ReverseTunnelEvent::getName(node, cluster)}); + disconnections.push_back(std::move(disconn)); + } + + return disconnections; +} + +StreamTunnelsResp validate_req(Buffer::InstancePtr& request, + const ReverseTunnelEvent::BatchedEvents& actual, bool full_push) { + StreamTunnelsReq req; + bool success = Grpc::Common::parseBufferInstance(std::move(request), req); + EXPECT_EQ(success, true); + + EXPECT_EQ(req.added_tunnels_size(), actual.connections.size()); + EXPECT_EQ(req.removed_tunnel_names_size(), actual.disconnections.size()); + + for (std::size_t i = 0; i < actual.connections.size(); i++) + EXPECT_EQ(actual.connections[i]->node_id, + req.added_tunnels(i).tunnel_info().identity().node_id()); + + for (std::size_t i = 0; i < actual.disconnections.size(); i++) + EXPECT_EQ(actual.disconnections[i]->name, req.removed_tunnel_names(i)); + + EXPECT_EQ(full_push, req.full_push()); + + StreamTunnelsResp resp; + resp.set_request_nonce(req.nonce()); + + return resp; +} + 
+Protobuf::Duration getHalfDuration(const Protobuf::Duration& dur) { + return Protobuf::util::TimeUtil::MillisecondsToDuration( + DurationUtil::durationToMilliseconds(dur) / 2); +} + +class GrpcClientTest : public testing::Test { +public: + GrpcClientTest() {} + + void SetUp() override { + api_ = Api::createApiForTest(time_system_); + dispatcher_ = api_->allocateDispatcher("test_thread"); + + ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher_)); + ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store_.rootScope())); + ON_CALL(context_, clusterManager()).WillByDefault(ReturnRef(cm_)); + + ON_CALL(context_, localInfo()).WillByDefault(ReturnRef(local_info_)); + ON_CALL(local_info_, nodeName()).WillByDefault(ReturnRef(node_id)); + ON_CALL(local_info_, clusterName()).WillByDefault(ReturnRef(cluster_id)); + + ON_CALL(cm_, getThreadLocalCluster(_)).WillByDefault(Return(&thread_local_cluster_)); + ON_CALL(thread_local_cluster_, info()).WillByDefault(Return(cluster_info_)); + ON_CALL(*cluster_info_, statsScope()).WillByDefault(ReturnRef(*stats_store_.rootScope())); + + ON_CALL(cm_, grpcAsyncClientManager()).WillByDefault(ReturnRef(manager_)); + ON_CALL(manager_, getOrCreateRawAsyncClient(_, _, _)) + .WillByDefault(Return(absl::StatusOr(async_client_))); + } + +protected: + void inc_time(const Protobuf::Duration& dur) { + time_system_.advanceTimeAsyncImpl( + std::chrono::milliseconds(DurationUtil::durationToMilliseconds(dur))); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + void get_stream(int times) { + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)) + .Times(times) + .WillRepeatedly(Invoke([this](absl::string_view, absl::string_view, + Grpc::RawAsyncStreamCallbacks& callbacks, + const Http::AsyncClient::StreamOptions&) { + callbacks_ = &callbacks; + return async_stream_.get(); + })); + } + + GrpcClient::GrpcClientStats getStats() { + return GrpcClient::GrpcClientStats{context_, config_.stat_prefix(), 
config_.cluster()}; + } + + std::shared_ptr> async_client_{ + std::make_shared>()}; + std::unique_ptr> async_stream_{ + std::make_unique>()}; + Grpc::RawAsyncStreamCallbacks* callbacks_; + + NiceMock context_; + NiceMock cm_; + NiceMock thread_local_cluster_; + NiceMock local_info_; + NiceMock manager_; + std::shared_ptr> cluster_info_{ + std::make_shared>()}; + + Api::ApiPtr api_; + Event::SimulatedTimeSystem time_system_; + Event::DispatcherPtr dispatcher_; + Stats::IsolatedStoreImpl stats_store_; + + std::string node_id{"tunnel-v2"}; + std::string cluster_id{"tunnel-v2"}; + + GrpcConfigProto config_{mock_config()}; + NiceMock mock_reporter_; +}; + +// Check the connection behaviour on server initialization (infinite retries) +TEST_F(GrpcClientTest, RetryAttemptsOnStreamCreationFailure) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + // The connection attempts shld not be bound by anything. + // Making it config_.max_retries + 2 for a simple check of not bound by max_retries. + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)) + .Times(config_.max_retries() + 2) + .WillRepeatedly(Return(nullptr)); + + client.onServerInitialized(&mock_reporter_); + + for (std::size_t i = 0; i < config_.max_retries() + 1; i++) { + inc_time(config_.connect_retry_interval()); + } + + // Not incremented because no connection attempt was successful. + // startRaw => nullptr. + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 0); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 0); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), 0); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), 0); + EXPECT_EQ(stats_ + .getCounter(stats_.disconnects_, + stats_.getTags( + Grpc::Status::WellKnownGrpcStatus::Internal, + GrpcDisconnectionReason::DisconnectReason::STREAM_CREATION_FAILED)) + .value(), + config_.max_retries() + 2); +} + +// Checks the happy path -> server connect and full push. 
+TEST_F(GrpcClientTest, ClientSendsFullPushOnConnect) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events{make_connections({"node_1"}), {}}; + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Return(events.connections)); + + get_stream(1); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([&events](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, true); + EXPECT_EQ(resp.request_nonce(), 1); + })); + + client.onServerInitialized(&mock_reporter_); + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.acks_received_counter_.value(), 0); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.events_dropped_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), 1); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), 0); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), 1); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), 0); + EXPECT_EQ(stats_.send_interval_gauge_.value(), + DurationUtil::durationToMilliseconds(config_.default_send_interval())); + EXPECT_EQ(stats_.sent_accepted_cnt_counter_.value(), 1); + EXPECT_EQ(stats_.sent_removed_cnt_counter_.value(), 0); +} + +// Checks the happy path -> Server up connect and send the diff after the full push +TEST_F(GrpcClientTest, ClientSendsDiffAfterFullPush) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + int cur = 0, total = 4; + + ReverseTunnelEvent::BatchedEvents batches[] = { + ReverseTunnelEvent::BatchedEvents{make_connections({"node_1"}), {}}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_2"}), + make_disconnections({"node_1"})}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_3", "node_4"}), {}}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_5"}), + make_disconnections({"node_3"})}}; + + get_stream(1); + + EXPECT_CALL(mock_reporter_, 
getAllConnections()).WillOnce(Invoke([&batches]() { + return batches[0].connections; + })); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(total) + .WillRepeatedly(Invoke([this, &batches, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, batches[cur], cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + client.onServerInitialized(&mock_reporter_); + cur++; + + for (; cur < total; cur++) { + client.receiveEvents(batches[cur]); + inc_time(config_.default_send_interval()); + } + + int total_events = 0; + for (int i = 0; i < total; i++) { + total_events += batches[i].size(); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.acks_received_counter_.value(), total); + EXPECT_EQ(stats_.send_attempts_counter_.value(), total); + EXPECT_EQ(stats_.sent_accepted_cnt_counter_.value(), 5); + EXPECT_EQ(stats_.sent_removed_cnt_counter_.value(), 2); + EXPECT_EQ(stats_.events_dropped_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), total_events); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), 0); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), total); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), total); + EXPECT_EQ(stats_.send_interval_gauge_.value(), + DurationUtil::durationToMilliseconds(config_.default_send_interval())); +} + +// Check the happy path -> config changes from the server response is applied +TEST_F(GrpcClientTest, ReportIntervalChangesReflectInClient) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + int cur = 0; + + ReverseTunnelEvent::BatchedEvents events; + + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Invoke([&events]() { + return events.connections; + })); + + // This should be called 3 times. + // Once for the first message and then twice for the next two increments. 
+ EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(3) + .WillRepeatedly(Invoke([this, &events, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, cur == 0); + EXPECT_EQ(resp.request_nonce(), ++cur); + + if (cur == 1) { + *resp.mutable_report_interval() = getHalfDuration(config_.default_send_interval()); + } + + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_EQ(stats_.send_interval_gauge_.value(), + DurationUtil::durationToMilliseconds(config_.default_send_interval())); + client.onServerInitialized(&mock_reporter_); + EXPECT_EQ(stats_.send_interval_gauge_.value(), + DurationUtil::durationToMilliseconds(getHalfDuration(config_.default_send_interval()))); + + // This is already scheduled from the next time we will use the half interval for sending. + inc_time(config_.default_send_interval()); + inc_time(getHalfDuration(config_.default_send_interval())); + + EXPECT_EQ(stats_.send_attempts_counter_.value(), 3); +} + +// Check edge case -> Full push and then diffs on reconnect +TEST_F(GrpcClientTest, FullPushAndDiffOnReconnect) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + int cur = 0, total = 4; + + ReverseTunnelEvent::BatchedEvents batches[] = { + ReverseTunnelEvent::BatchedEvents{make_connections({"node_1"}), {}}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_2"}), + make_disconnections({"node_1"})}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_3", "node_4"}), {}}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_5"}), + make_disconnections({"node_3"})}}; + + // 2 stream creations: initial connect + reconnect after remote close. + get_stream(2); + + // getAllConnections is called once per full push (initial + reconnect). 
+ EXPECT_CALL(mock_reporter_, getAllConnections()).Times(2).WillRepeatedly(Invoke([&batches]() { + return batches[0].connections; + })); + + // total + 1: the reconnect triggers an extra full push on top of the normal total sends. + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(total + 1) + .WillRepeatedly(Invoke([this, &batches, &cur](Buffer::InstancePtr& request, bool) { + // cur is still 0 during both the initial connect and the reconnect full push, + // so both correctly validate as full_push=true against batches[0]. + auto resp = validate_req(request, batches[cur], cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + client.onServerInitialized(&mock_reporter_); + + // disconnect and reconnect -> sends the full push automatically + callbacks_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unknown, "Testing"); + inc_time(config_.connect_retry_interval()); + cur++; // cur becomes 1 only after the reconnect full push has already fired. + + for (; cur < total; cur++) { + client.receiveEvents(batches[cur]); + inc_time(config_.default_send_interval()); + } + + int total_sz = 0; + for (int i = 0; i < total; i++) { + total_sz += batches[i].size(); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 2); + EXPECT_EQ(stats_.acks_received_counter_.value(), total + 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), total + 1); + // 6 = 1 (initial full push) + 1 (reconnect full push) + 1+2+1 from batches[1..3]. + EXPECT_EQ(stats_.sent_accepted_cnt_counter_.value(), 6); + EXPECT_EQ(stats_.sent_removed_cnt_counter_.value(), 2); + EXPECT_EQ(stats_.events_dropped_counter_.value(), 0); + // +batches[0].size(): the reconnect full push re-queues the initial connections. 
+ EXPECT_EQ(stats_.queued_events_counter_.value(), total_sz + batches[0].size()); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), 0); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), total); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), total); + EXPECT_EQ(stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Unknown, + GrpcDisconnectionReason::DisconnectReason::REMOTE_CLOSE)) + .value(), + 1); +} + +// Check edge case -> Disconnect on deadline exceeded +TEST_F(GrpcClientTest, DisconnectOnTooManyUnAckedRequests) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + std::size_t cur = 0; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + // max_retries + 1: the initial connect sends once, then max_retries timer ticks each send once. + // On the next timer tick send() sees (nonce_current_ - nonce_acked_) > max_retries and + // disconnects before calling sendMessage, so the total successful sends is max_retries + 1. + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(config_.max_retries() + 1) + .WillRepeatedly(Invoke([&events, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + + // No ACK is sent. It should eventually disconnect. + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + cur++; + + // max_retries + 2: we need max_retries timer ticks for the sends, plus one more tick + // to trigger the disconnect check. The first send happens on connect (cur=0). 
+ for (; cur < config_.max_retries() + 2; cur++) { + inc_time(config_.default_send_interval()); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.acks_received_counter_.value(), 0); + EXPECT_EQ(stats_.send_attempts_counter_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), 0); + EXPECT_EQ(stats_ + .getCounter( + stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded, + GrpcDisconnectionReason::DisconnectReason::MAX_RETRIES_EXCEEDED)) + .value(), + 1); +} + +// Check edge case -> Disconnect with Server on NACK. +TEST_F(GrpcClientTest, DisconnectOnNack) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([this, &events](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, true); + EXPECT_EQ(resp.request_nonce(), 1); + resp.mutable_error_detail()->set_code(Grpc::Status::WellKnownGrpcStatus::Unavailable); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 1); + EXPECT_EQ( + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Aborted, + GrpcDisconnectionReason::DisconnectReason::NACK_RECEIVED)) + .value(), + 1); +} + +// Check edge case -> Disconnect on Buffer Full (Also ensure that full push has no limits) +TEST_F(GrpcClientTest, DisconnectOnBufferFull) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + std::vector nodes; + for (std::size_t i = 0; i < config_.max_buffer() + 1; i++) { + nodes.push_back("node_" 
+ std::to_string(i)); + } + + ReverseTunnelEvent::BatchedEvents connect_events{make_connections(nodes), {}}; + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce([&connect_events]() { + return connect_events.connections; + }); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([this, &connect_events](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, connect_events, true); + EXPECT_EQ(resp.request_nonce(), 1); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + client.receiveEvents(connect_events); + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.events_dropped_counter_.value(), nodes.size()); + EXPECT_EQ(stats_.sent_accepted_cnt_counter_.value(), nodes.size()); + EXPECT_EQ(stats_.sent_removed_cnt_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), nodes.size()); + EXPECT_EQ( + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, + GrpcDisconnectionReason::DisconnectReason::BUFFER_OVERFLOW)) + .value(), + 1); +} + +// Check edge case -> Prev Nonce ignored +TEST_F(GrpcClientTest, OutOfOrderNonce) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + std::size_t cur = 0; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + // Send nonce=0 (already acked) for all responses to trigger out-of-order. + // nonce=0 is always <= nonce_acked_ (which starts at 0), so every response lands + // in the else branch and increments out_of_order_acks_counter_. + // Same +1/+2 arithmetic as DisconnectOnTooManyUnAckedRequests: max_retries + 1 sends + // succeed before the disconnect fires. 
+ EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(config_.max_retries() + 1) + .WillRepeatedly(Invoke([this, &events, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + + resp.set_request_nonce(0); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + cur++; + + for (; cur < config_.max_retries() + 2; cur++) { + inc_time(config_.default_send_interval()); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), 0); + EXPECT_EQ(stats_ + .getCounter( + stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded, + GrpcDisconnectionReason::DisconnectReason::MAX_RETRIES_EXCEEDED)) + .value(), + 1); +} + +// Check edge case -> Skip Nonce +TEST_F(GrpcClientTest, SkipNonce) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + std::size_t cur = 0; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + // max_retries + 2: the normal max_retries + 1 sends that would trigger disconnect, + // but a late ACK at iteration max_retries advances nonce_acked_ and buys one more send. + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(config_.max_retries() + 2) + .WillRepeatedly(Invoke([this, &events, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + + // Only ACK the nonce at iteration max_retries, proving a single late ACK + // advances the watermark and prevents disconnect. 
+ if (cur == config_.max_retries()) { + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + } + })); + + client.onServerInitialized(&mock_reporter_); + cur++; + + for (; cur < config_.max_retries() + 2; cur++) { + inc_time(config_.default_send_interval()); + } + + // After the stream is up, retroactively send ACKs for earlier nonces. + // These are all below nonce_acked_ now, so they count as out-of-order. + for (std::size_t i = 1; i <= config_.max_retries(); i++) { + StreamTunnelsResp resp; + resp.set_request_nonce(i); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), config_.max_retries() + 2); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), config_.max_retries()); + EXPECT_EQ(stats_.acks_received_counter_.value(), 1); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), config_.max_retries() + 2); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), config_.max_retries() + 1); +} + +// Edge case -> Remote close Status Ok +TEST_F(GrpcClientTest, OkRemoteClose) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([&events](Buffer::InstancePtr& request, bool) { + auto response = validate_req(request, events, true); + EXPECT_EQ(response.request_nonce(), 1); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + callbacks_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Ok, "Testing"); + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 1); + EXPECT_EQ(stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Ok, + GrpcDisconnectionReason::DisconnectReason::REMOTE_CLOSE)) + .value(), + 1); +} + +// --- 
Hardening tests --- + +TEST_F(GrpcClientTest, ReceiveEventsBeforeInitialized) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events{make_connections({"node_1"}), {}}; + client.receiveEvents(std::move(events)); + + EXPECT_EQ(stats_.queued_events_counter_.value(), 0); + EXPECT_EQ(stats_.events_dropped_counter_.value(), 0); +} + +TEST_F(GrpcClientTest, ClusterNotFoundLogsAndReturns) { + GrpcClient client{context_, config_}; + + EXPECT_CALL(cm_, getThreadLocalCluster(_)).WillOnce(Return(nullptr)); + + client.onServerInitialized(&mock_reporter_); + + ReverseTunnelEvent::BatchedEvents events{make_connections({"node_1"}), {}}; + client.receiveEvents(std::move(events)); + + auto stats_{getStats()}; + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), 0); +} + +TEST_F(GrpcClientTest, ClientCreationFailureLogsAndReturns) { + GrpcClient client{context_, config_}; + + EXPECT_CALL(manager_, getOrCreateRawAsyncClient(_, _, _)) + .WillOnce(Return(absl::InvalidArgumentError("Bad Karma"))); + + client.onServerInitialized(&mock_reporter_); + + ReverseTunnelEvent::BatchedEvents events{make_connections({"node_1"}), {}}; + client.receiveEvents(std::move(events)); + + auto stats_{getStats()}; + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), 0); +} + +TEST_F(GrpcClientTest, BufferOverflowWhileDisconnectedDoesNotRearmRetry) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + std::vector nodes; + for (std::size_t i = 0; i < config_.max_buffer() + 1; i++) { + nodes.push_back("node_" + std::to_string(i)); + } + + ReverseTunnelEvent::BatchedEvents big_batch{make_connections(nodes), {}}; + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Return(big_batch.connections)); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([this, 
&big_batch](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, big_batch, true); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + + // First overflow: stream is alive, should disconnect. + client.receiveEvents(ReverseTunnelEvent::BatchedEvents{make_connections(nodes), {}}); + EXPECT_EQ( + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, + GrpcDisconnectionReason::DisconnectReason::BUFFER_OVERFLOW)) + .value(), + 1); + + // Second overflow: stream is null, should NOT increment disconnect counter. + client.receiveEvents(ReverseTunnelEvent::BatchedEvents{make_connections(nodes), {}}); + EXPECT_EQ( + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, + GrpcDisconnectionReason::DisconnectReason::BUFFER_OVERFLOW)) + .value(), + 1); + + // Events still counted as dropped both times. + EXPECT_EQ(stats_.events_dropped_counter_.value(), nodes.size() * 2); +} + +TEST_F(GrpcClientTest, MinSendIntervalFloor) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Return(events.connections)); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([this, &events](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, true); + + // Server tries to set interval to 10ms, below the 50ms floor. + *resp.mutable_report_interval() = Protobuf::util::TimeUtil::MillisecondsToDuration(10); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + client.onServerInitialized(&mock_reporter_); + + // Should be clamped to kMinSendInterval (50ms), not the server's 10ms. 
+ EXPECT_EQ(stats_.send_interval_gauge_.value(), kMinSendInterval.count()); +} + +// Verify default values when proto fields are unset/zero. +TEST_F(GrpcClientTest, ConfigDefaults) { + GrpcConfigProto bare_config; + bare_config.set_cluster("test_cluster"); + + GrpcClientConfig parsed(bare_config); + + EXPECT_EQ(parsed.stat_prefix, "reverse_tunnel_reporter_client.grpc_client"); + EXPECT_EQ(parsed.cluster, "test_cluster"); + EXPECT_EQ(parsed.send_interval.count(), 5000); + EXPECT_EQ(parsed.connect_retry_interval.count(), 5000); + EXPECT_EQ(parsed.max_retries, 5); + EXPECT_EQ(parsed.max_buffer, 1000000); +} + +class GrpcClientFactoryTest : public testing::Test { +public: + void SetUp() override { + factory_ = Registry::FactoryRegistry::getFactory( + "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client"); + ASSERT_NE(nullptr, factory_); + + ON_CALL(context_, messageValidationVisitor()) + .WillByDefault(ReturnRef(ProtobufMessage::getStrictValidationVisitor())); + } + +protected: + ReverseTunnelReporterClientFactory* factory_{}; + NiceMock context_; +}; + +TEST_F(GrpcClientFactoryTest, Name) { + EXPECT_EQ("envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client", + factory_->name()); +} + +TEST_F(GrpcClientFactoryTest, CreateEmptyConfigProto) { + auto config = factory_->createEmptyConfigProto(); + EXPECT_NE(nullptr, config); + EXPECT_NE(nullptr, dynamic_cast(config.get())); +} + +TEST_F(GrpcClientFactoryTest, CreateClientReturnsNonNull) { + Api::ApiPtr api = Api::createApiForTest(); + Event::DispatcherPtr dispatcher = api->allocateDispatcher("test"); + ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher)); + Stats::IsolatedStoreImpl stats_store; + ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store.rootScope())); + + GrpcConfigProto config; + config.set_cluster("test_cluster"); + + auto client = factory_->createClient(context_, config); + EXPECT_NE(nullptr, client); +} + 
+} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/test/clients/integration_test_utils.h b/contrib/reverse_tunnel_reporter/test/clients/integration_test_utils.h new file mode 100644 index 0000000000000..1085f5b92d270 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/clients/integration_test_utils.h @@ -0,0 +1,230 @@ +#pragma once + +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" +#include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/config/core/v3/extension.pb.h" +#include "envoy/config/listener/v3/listener.pb.h" +#include "envoy/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/v3/downstream_reverse_connection_socket_interface.pb.h" +#include "envoy/extensions/bootstrap/reverse_tunnel/upstream_socket_interface/v3/upstream_reverse_connection_socket_interface.pb.h" +#include "envoy/extensions/filters/http/router/v3/router.pb.h" +#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/extensions/filters/network/reverse_tunnel/v3/reverse_tunnel.pb.h" + +#include "source/common/common/fmt.h" + +#include "test/config/utility.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +constexpr absl::string_view localhost = "127.0.0.1"; +constexpr absl::string_view anyhost = "0.0.0.0"; +constexpr absl::string_view downstreamExtension = + "envoy.bootstrap.reverse_tunnel.downstream_socket_interface"; +constexpr absl::string_view upstreamExtension = + "envoy.bootstrap.reverse_tunnel.upstream_socket_interface"; +constexpr absl::string_view downstreamCluster = "downstreamCluster"; +constexpr absl::string_view upstreamCluster = "upstreamCluster"; +constexpr absl::string_view upstreamListener = "upstreamListener"; +constexpr 
absl::string_view upstreamFilter = "envoy.filters.network.reverse_tunnel"; +constexpr absl::string_view downstreamTenant = "downstreamTenant"; +constexpr absl::string_view downstreamResolver = "envoy.resolvers.reverse_connection"; + +constexpr int upstreamPort = 9000; +constexpr int reportingPort = 8082; + +using envoy::config::bootstrap::v3::Bootstrap; +using envoy::config::cluster::v3::Cluster; +using envoy::config::core::v3::TypedExtensionConfig; +using envoy::config::listener::v3::Listener; +using envoy::extensions::bootstrap::reverse_tunnel::downstream_socket_interface::v3:: + DownstreamReverseConnectionSocketInterface; +using envoy::extensions::bootstrap::reverse_tunnel::upstream_socket_interface::v3:: + UpstreamReverseConnectionSocketInterface; +using envoy::extensions::filters::network::reverse_tunnel::v3::ReverseTunnel; +using envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig; + +inline TypedExtensionConfig getDownstreamExtension() { + DownstreamReverseConnectionSocketInterface cfg; + cfg.set_stat_prefix(downstreamExtension); + cfg.set_enable_detailed_stats(true); + + TypedExtensionConfig ext; + ext.set_name(downstreamExtension); + ext.mutable_typed_config()->PackFrom(cfg); + + return ext; +} + +inline TypedExtensionConfig getUpstreamExtension(const EventReporterConfig& config) { + UpstreamReverseConnectionSocketInterface cfg; + cfg.set_stat_prefix(upstreamExtension); + cfg.set_enable_detailed_stats(true); + const std::string reporterName = + "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_reporter"; + auto* reporter = cfg.mutable_reporter_config(); + reporter->set_name(reporterName); + reporter->mutable_typed_config()->PackFrom(config); + + TypedExtensionConfig ext; + ext.set_name(upstreamExtension); + ext.mutable_typed_config()->PackFrom(cfg); + + return ext; +} + +inline Cluster getDownstreamCluster() { + Cluster cluster; + cluster.set_name(downstreamCluster); + 
cluster.set_type(Cluster::STATIC); + cluster.mutable_connect_timeout()->set_seconds(30); + cluster.mutable_load_assignment()->set_cluster_name(downstreamCluster); + + auto* sa = cluster.mutable_load_assignment() + ->mutable_endpoints() + ->Add() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + + sa->set_address(localhost); + sa->set_port_value(upstreamPort); + + return cluster; +} + +inline Cluster getUpstreamCluster() { + Cluster cluster; + cluster.set_name(upstreamCluster); + cluster.set_type(Cluster::STATIC); + cluster.mutable_connect_timeout()->set_seconds(30); + cluster.mutable_load_assignment()->set_cluster_name(upstreamCluster); + + auto* sa = cluster.mutable_load_assignment() + ->mutable_endpoints() + ->Add() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + + sa->set_address(localhost); + sa->set_port_value(reportingPort); + + return cluster; +} + +inline Listener getUpstreamListener() { + Listener listener; + listener.set_name(upstreamListener); + + auto* sa = listener.mutable_address()->mutable_socket_address(); + sa->set_address(anyhost); + sa->set_port_value(upstreamPort); + + auto* filter = listener.add_filter_chains()->add_filters(); + filter->set_name(upstreamFilter); + + ReverseTunnel rtFilter; + rtFilter.mutable_ping_interval()->set_seconds(300); // No ping timeouts. 
+ + filter->mutable_typed_config()->PackFrom(rtFilter); + + return listener; +} + +inline Listener getDownstreamListener(const std::string& name, int num_listeners) { + Listener listener; + listener.set_name(name); + + auto* sa = listener.mutable_address()->mutable_socket_address(); + sa->set_address(fmt::format("rc://{}:{}:{}@{}:{}", name, downstreamCluster, downstreamTenant, + downstreamCluster, num_listeners)); + sa->set_port_value(0); + sa->set_resolver_name(downstreamResolver); + + listener.mutable_listener_filters_timeout()->set_seconds(0); + + auto* filter_chain = listener.add_filter_chains(); + auto* filter = filter_chain->add_filters(); + filter->set_name("envoy.filters.network.http_connection_manager"); + + envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager hcm; + hcm.set_stat_prefix(name); + + auto* route_config = hcm.mutable_route_config(); + auto* vh = route_config->add_virtual_hosts(); + vh->set_name(name); + vh->add_domains("*"); + + auto* route = vh->add_routes(); + route->mutable_match()->set_prefix("/"); + auto* direct_response = route->mutable_direct_response(); + direct_response->set_status(200); + direct_response->mutable_body()->set_inline_string("reverse connection listener OK"); + + auto* http_filter = hcm.add_http_filters(); + http_filter->set_name("envoy.filters.http.router"); + envoy::extensions::filters::http::router::v3::Router router_cfg; + http_filter->mutable_typed_config()->PackFrom(router_cfg); + + filter->mutable_typed_config()->PackFrom(hcm); + + return listener; +} + +void addCluster(Cluster&& cluster, ConfigHelper& config_helper) { + config_helper.addConfigModifier([cluster = std::move(cluster)](Bootstrap& bootstrap) { + *(bootstrap.mutable_static_resources()->mutable_clusters()->Add()) = cluster; + }); +} + +void addListener(Listener&& listener, ConfigHelper& config_helper, int& current) { + config_helper.addConfigModifier([listener = std::move(listener)](Bootstrap& bootstrap) { + 
*(bootstrap.mutable_static_resources()->mutable_listeners()->Add()) = listener; + }); + + ++current; +} + +void addBootstrapExtension(TypedExtensionConfig&& extension, ConfigHelper& config_helper) { + config_helper.addConfigModifier([extension = std::move(extension)](Bootstrap& bootstrap) { + *(bootstrap.mutable_bootstrap_extensions()->Add()) = extension; + }); +} + +void removeListener(const std::string& name, ConfigHelper& config_helper, int& current) { + config_helper.addConfigModifier([&name](Bootstrap& bootstrap) { + auto* listeners = bootstrap.mutable_static_resources()->mutable_listeners(); + + for (int i = 0; i < listeners->size(); ++i) { + if (listeners->Get(i).name() == name) { + listeners->DeleteSubrange(i, 1); + break; + } + } + }); + + --current; +} + +Cluster getHttp2Cluster(Cluster& cluster) { + ConfigHelper::HttpProtocolOptions http2_options; + http2_options.mutable_explicit_http_config()->mutable_http2_protocol_options(); + + (*cluster.mutable_typed_extension_protocol_options()) + ["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"] + .PackFrom(http2_options); + + return cluster; +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/test/reporters/BUILD b/contrib/reverse_tunnel_reporter/test/reporters/BUILD new file mode 100644 index 0000000000000..991e168a0ae8e --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/reporters/BUILD @@ -0,0 +1,24 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_test( + name = "event_reporter_test", + srcs = ["event_reporter_test.cc"], + deps = [ + "//contrib/reverse_tunnel_reporter/source/reporters/event_reporter:event_reporter_lib", + "//source/common/api:api_lib", + "//source/common/config:utility_lib", + "//test/mocks:common_lib", + "//test/mocks/server:server_mocks", + 
"//test/test_common:registry_lib", + "//test/test_common:test_time_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc b/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc new file mode 100644 index 0000000000000..fd869977b94c4 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc @@ -0,0 +1,567 @@ +#include "source/common/api/api_impl.h" + +#include "test/mocks/common.h" +#include "test/mocks/server/server_factory_context.h" +#include "test/test_common/registry.h" +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h" +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +class MockReverseTunnelReporterClient : public ReverseTunnelReporterClient { +public: + MockReverseTunnelReporterClient() = default; + ~MockReverseTunnelReporterClient() override = default; + + MOCK_METHOD(void, onServerInitialized, (ReverseTunnelReporterWithState*), (override)); + MOCK_METHOD(void, receiveEvents, (ReverseTunnelEvent::BatchedEvents), (override)); +}; + +class EventReporterTest : public testing::Test { +protected: + void SetUp() override { + api_ = Api::createApiForTest(); + dispatcher_ = api_->allocateDispatcher("test_thread"); + + ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher_)); + ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store_.rootScope())); + ON_CALL(context_, messageValidationVisitor()) + 
.WillByDefault(ReturnRef(ProtobufMessage::getStrictValidationVisitor())); + + auto mock_client1 = std::make_unique>(); + auto mock_client2 = std::make_unique>(); + mock_client1_ = mock_client1.get(); + mock_client2_ = mock_client2.get(); + + std::vector clients; + clients.push_back(std::move(mock_client1)); + clients.push_back(std::move(mock_client2)); + + EventReporter::ConfigProto config; + config.set_stat_prefix("test_prefix"); + + reporter_ = std::make_unique(context_, config, std::move(clients)); + } + + void createTestConnection(const std::string& node_id, const std::string& cluster_id, + const std::string& tenant_id = "tenant1") { + reporter_->reportConnectionEvent(node_id, cluster_id, tenant_id); + } + + void createTestDisconnection(const std::string& node_id, const std::string& cluster_id) { + reporter_->reportDisconnectionEvent(node_id, cluster_id); + } + + void runDispatcher() { dispatcher_->run(Event::Dispatcher::RunType::NonBlock); } + + uint64_t getCounterValue(const std::string& name) { + return stats_store_.counterFromString("test_prefix." + name).value(); + } + + uint64_t getGaugeValue(const std::string& name) { + return stats_store_.gaugeFromString("test_prefix." 
+ name, Stats::Gauge::ImportMode::Accumulate) + .value(); + } + + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; + NiceMock context_; + Stats::TestUtil::TestStore stats_store_; + NiceMock* mock_client1_; + NiceMock* mock_client2_; + std::unique_ptr reporter_; +}; + +TEST_F(EventReporterTest, AddRemoveConnections) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(4) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node2", batch.connections[0]->node_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node2", "cluster2"), batch.disconnections[0]->name); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(4); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node2", "cluster2"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(2, 
connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestDisconnection("node2", "cluster2"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +TEST_F(EventReporterTest, DuplicateConnectionHandling) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(2) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(2); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, 
getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ("node1", connections[0]->node_id); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ("node1", connections[0]->node_id); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +TEST_F(EventReporterTest, PullsBeforeConnectionEvents) { + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +TEST_F(EventReporterTest, RemoveNonExistentConnection) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::warn); + MockLogSink sink(Envoy::Logger::Registry::getSink()); + + EXPECT_CALL(sink, log(_, _)) + .WillOnce(Invoke([](absl::string_view, const spdlog::details::log_msg& msg) { + EXPECT_EQ(spdlog::level::warn, msg.level); + })); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)).Times(0); + EXPECT_CALL(*mock_client2_, 
receiveEvents(_)).Times(0); + + createTestDisconnection("nonexistent", "connection"); + runDispatcher(); + + EXPECT_EQ(0, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + })); + EXPECT_CALL(*mock_client2_, receiveEvents(_)); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, OnServerInitialized) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::info); + MockLogSink sink(Envoy::Logger::Registry::getSink()); + + EXPECT_CALL(sink, log(_, _)) + .WillOnce(Invoke([](absl::string_view, const spdlog::details::log_msg& msg) { + EXPECT_EQ(spdlog::level::info, msg.level); + })); + + EXPECT_CALL(*mock_client1_, onServerInitialized(_)); + EXPECT_CALL(*mock_client2_, onServerInitialized(_)); + + reporter_->onServerInitialized(); + + EXPECT_EQ(0, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(0, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getCounterValue("reverse_tunnel_full_pulls_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, DefaultStatPrefix) { + EventReporter::ConfigProto config; + + std::vector clients; + auto mock_client1 = std::make_unique>(); + auto 
mock_client2 = std::make_unique>(); + mock_client1_ = mock_client1.get(); + mock_client2_ = mock_client2.get(); + clients.push_back(std::move(mock_client1)); + clients.push_back(std::move(mock_client2)); + + auto default_reporter = std::make_unique(context_, config, std::move(clients)); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)); + EXPECT_CALL(*mock_client2_, receiveEvents(_)); + + default_reporter->reportConnectionEvent("node1", "cluster1", "tenant1"); + runDispatcher(); + + EXPECT_EQ( + 1, stats_store_.counterFromString("reverse_tunnel_reporter.reverse_tunnel_established_total") + .value()); + EXPECT_EQ(1, stats_store_ + .gaugeFromString("reverse_tunnel_reporter.reverse_tunnel_active", + Stats::Gauge::ImportMode::Accumulate) + .value()); + EXPECT_EQ(1, stats_store_ + .gaugeFromString("reverse_tunnel_reporter.reverse_tunnel_unique_active", + Stats::Gauge::ImportMode::Accumulate) + .value()); + + auto connections = default_reporter->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ( + 1, stats_store_.counterFromString("reverse_tunnel_reporter.reverse_tunnel_full_pulls_total") + .value()); +} + +TEST_F(EventReporterTest, MixedScenario) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(4) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + EXPECT_EQ("tenant_A", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node2", batch.connections[0]->node_id); + EXPECT_EQ("tenant_B", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + 
EXPECT_EQ(ReverseTunnelEvent::getName("node2", "cluster2"), batch.disconnections[0]->name); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node3", batch.connections[0]->node_id); + EXPECT_EQ("tenant_C", batch.connections[0]->tenant_id); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(4); + + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node2", "cluster2", "tenant_B"); + runDispatcher(); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(3, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node2", "cluster2", "tenant_B"); + runDispatcher(); + EXPECT_EQ(4, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(4, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(3, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + 
EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node2", "cluster2"); + runDispatcher(); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestDisconnection("node2", "cluster2"); + runDispatcher(); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node3", "cluster3", "tenant_C"); + runDispatcher(); + EXPECT_EQ(5, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); + + EXPECT_EQ(5, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, LargeDuplicateCount) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(2) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + EXPECT_EQ("tenant_A", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(2); + + const int DUPLICATE_COUNT = 50; + + for 
(int i = 0; i < DUPLICATE_COUNT; i++) { + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + } + + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(DUPLICATE_COUNT, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + for (int i = 0; i < DUPLICATE_COUNT - 1; i++) { + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + } + + EXPECT_EQ(DUPLICATE_COUNT - 1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +// --- Factory tests --- + +class MockReverseTunnelReporterClientFactory : public ReverseTunnelReporterClientFactory { +public: + MOCK_METHOD(ReverseTunnelReporterClientPtr, createClient, + (Server::Configuration::ServerFactoryContext&, const Protobuf::Message&), (override)); + + std::string name() const override { return "mock_client_factory"; } + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return std::make_unique(); + } +}; + +class EventReporterFactoryTest : public testing::Test { 
+protected: + void SetUp() override { + ON_CALL(context_, messageValidationVisitor()) + .WillByDefault(ReturnRef(ProtobufMessage::getStrictValidationVisitor())); + ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store_.rootScope())); + + api_ = Api::createApiForTest(); + dispatcher_ = api_->allocateDispatcher("test_thread"); + ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher_)); + } + + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; + NiceMock context_; + Stats::TestUtil::TestStore stats_store_; + EventReporterFactory factory_; +}; + +TEST_F(EventReporterFactoryTest, Name) { + EXPECT_EQ( + "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_reporter", + factory_.name()); +} + +TEST_F(EventReporterFactoryTest, CreateEmptyConfigProto) { + auto config = factory_.createEmptyConfigProto(); + EXPECT_NE(nullptr, config); + EXPECT_NE( + nullptr, + dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig*>( + config.get())); +} + +TEST_F(EventReporterFactoryTest, CreateReporterWithRegisteredClient) { + MockReverseTunnelReporterClientFactory mock_client_factory; + Registry::InjectFactory registered(mock_client_factory); + + EXPECT_CALL(mock_client_factory, createClient(_, _)) + .WillOnce(Invoke([](Server::Configuration::ServerFactoryContext&, + const Protobuf::Message&) -> ReverseTunnelReporterClientPtr { + return std::make_unique>(); + })); + + auto config = factory_.createEmptyConfigProto(); + auto& reporter_config = dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig&>( + *config); + reporter_config.set_stat_prefix("test"); + + auto* client_entry = reporter_config.add_clients(); + client_entry->set_name("mock_client_factory"); + client_entry->mutable_typed_config()->PackFrom(Protobuf::Struct()); + + auto reporter = factory_.createReporter(context_, std::move(config)); + EXPECT_NE(nullptr, reporter); +} + 
+TEST_F(EventReporterFactoryTest, CreateClientWithUnknownFactoryThrows) { + auto config = factory_.createEmptyConfigProto(); + auto& reporter_config = dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig&>( + *config); + reporter_config.set_stat_prefix("test"); + + auto* client_entry = reporter_config.add_clients(); + client_entry->set_name("nonexistent_client_factory"); + client_entry->mutable_typed_config()->PackFrom(Protobuf::Struct()); + + EXPECT_THROW_WITH_REGEX(factory_.createReporter(context_, std::move(config)), EnvoyException, + "Unknown Reporter Client Factory: 'nonexistent_client_factory'"); +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/docs/root/api-v3/config/contrib/contrib.rst b/docs/root/api-v3/config/contrib/contrib.rst index ac17d06076276..5ce1c24cdb09a 100644 --- a/docs/root/api-v3/config/contrib/contrib.rst +++ b/docs/root/api-v3/config/contrib/contrib.rst @@ -21,3 +21,4 @@ Contrib extensions tap_sinks/tap_sinks load_balancing_policies/peak_ewma/peak_ewma istio/istio + reverse_tunnel_reporter/reverse_tunnel_reporter diff --git a/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst b/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst new file mode 100644 index 0000000000000..cc06a2bf352fa --- /dev/null +++ b/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst @@ -0,0 +1,9 @@ +Reverse tunnel reporter +======================= + +.. 
toctree:: + :glob: + :maxdepth: 2 + + ../../../extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/* + ../../../extensions/reverse_tunnel_reporters/v3alpha/reporters/* diff --git a/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.cc b/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.cc index ea24e8f9cb3ca..6ed8ec5249978 100644 --- a/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.cc +++ b/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.cc @@ -538,8 +538,7 @@ void ReverseTunnelFilter::processAcceptedConnection(absl::string_view node_id, // Report the connection to the extension -> reporter. if (auto extension = socket_manager->getUpstreamExtension()) { - extension->reportConnection(std::string(node_id), std::string(cluster_id), - std::string(tenant_id)); + extension->reportConnection(socket_node_id, socket_cluster_id, tenant_id); } }