diff --git a/CODEOWNERS b/CODEOWNERS index 6c46db6201a86..8038f96a09eaf 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -494,5 +494,6 @@ extensions/upstreams/tcp @ggreenway @mattklein123 /contrib/peak_ewma/load_balancing_policies/ @rroblak @UNOWNED /contrib/kae/ @Misakokoro @UNOWNED /contrib/istio @kyessenov @wbpcode @keithmattix @krinkinmu @zirain +/contrib/reverse_tunnel_reporter @agrawroh @aakugan /compat/openssl/ @tedjpoole @envoyproxy/envoy-openssl-sync diff --git a/api/BUILD b/api/BUILD index 2b36b337b1c34..4cd9d47ab049f 100644 --- a/api/BUILD +++ b/api/BUILD @@ -104,6 +104,8 @@ proto_library( "//contrib/envoy/extensions/private_key_providers/kae/v3alpha:pkg", "//contrib/envoy/extensions/private_key_providers/qat/v3alpha:pkg", "//contrib/envoy/extensions/regex_engines/hyperscan/v3alpha:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg", "//contrib/envoy/extensions/router/cluster_specifier/golang/v3alpha:pkg", "//contrib/envoy/extensions/stat_sinks/kafka/v3:pkg", "//contrib/envoy/extensions/tap_sinks/udp_sink/v3alpha:pkg", diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD new file mode 100644 index 0000000000000..6409469bbd62f --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD @@ -0,0 +1,13 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + has_services = True, + deps = [ + "//envoy/config/core/v3:pkg", + "@xds//udpa/annotations:pkg", + ], +) diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto new file mode 100644 index 0000000000000..eaa122194bc2c --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto @@ -0,0 +1,50 @@ +syntax = "proto3"; + +package envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client; + +import "google/protobuf/duration.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client"; +option java_outer_classname = "GrpcClientProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// Configuration for gRPC push-based connection event client. +// Actively pushes connection events to a cluster using grpc using some internal timing. +// [#next-free-field: 7] +message GrpcClientConfig { + // Stat prefix for this client's metrics. + string stat_prefix = 1; + + // Name of the cluster to send gRPC requests to. + // It must be present in the config otherwise the setup will throw error in the onServerInitialized. + string cluster = 2 [(validate.rules).string = {min_len: 1}]; + + // Default interval between sending batched connection events. + // Default is 5s. + google.protobuf.Duration default_send_interval = 3 [(validate.rules).duration = { + lte {seconds: 3600} + gte {nanos: 25000000} + }]; + + // Interval between connection retry attempts to the gRPC service. + // Connect timeouts are provided at the cluster level and will be handled by the http/2 client. + // How much time to wait after a failed connect before retrying. Default is 5s. + google.protobuf.Duration connect_retry_interval = 4 [(validate.rules).duration = { + lte {seconds: 3600} + gte {nanos: 25000000} + }]; + + // Maximum number of retry attempts for failed gRPC sends. + // Basically the cluster will have default_send_interval * max_retries time to respond. + // Default is 5. After this we will disconnect and try to connect again. + uint32 max_retries = 5; + + // Maximum events to buffer at any given time + // Default is 1,000,000. + uint32 max_buffer = 6; +} diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto new file mode 100644 index 0000000000000..ebc381c0c6094 --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto @@ -0,0 +1,117 @@ +syntax = "proto3"; + +package envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client; + +import "envoy/config/core/v3/base.proto"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/struct.proto"; +import "google/protobuf/timestamp.proto"; +import "google/rpc/status.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client"; +option java_outer_classname = "StreamReverseTunnelsProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Reverse Tunnel Reporting Service] + +// ReverseTunnelReportingService allows Envoy instances to report reverse tunnel +// connection state changes to a management server for monitoring and coordination. +service ReverseTunnelReportingService { + // Bidirectional stream for reporting reverse tunnel connection state changes. + // The management server can control reporting intervals and acknowledge received reports. + rpc StreamReverseTunnels(stream StreamReverseTunnelsRequest) + returns (stream StreamReverseTunnelsResponse) { + } +} + +// Request message sent by Envoy to report reverse tunnel state changes. +// [#next-free-field: 7] +message StreamReverseTunnelsRequest { + // Node identifier for the reporting Envoy instance. + // This identifies which Envoy instance is sending the report. + config.core.v3.Node node = 1 [(validate.rules).message = {required: true}]; + + // List of reverse tunnels that were established since the last report. + // Each tunnel represents a new connection from a downstream Envoy. + repeated ReverseTunnel added_tunnels = 2; + + // List of tunnel names that were disconnected since the last report. + // Only the tunnel name is needed for removal notifications. + repeated string removed_tunnel_names = 3 + [(validate.rules).repeated = {items {string {min_len: 1}}}]; + + // Optional metadata for additional context or debugging information. + // Can include deployment information, version details, etc. + google.protobuf.Struct metadata = 4; + + // Indicates whether this report contains all active tunnels (true) or + // only changes since the last report (false). Usually invoked only on server disconnects. + bool full_push = 5; + + // Unique nonce for this request to enable proper ACK/NACK handling. + // Must be non-negative and should increment for each request. + // This can also be modified to be used for checksum and tracking in the future.a + int64 nonce = 6 [(validate.rules).int64 = {gte: 0}]; +} + +// Response message sent by the management server to control reporting behavior. +message StreamReverseTunnelsResponse { + // Node identifier acknowledging which Envoy instance this response is for. + // Should match the node from the corresponding request. + config.core.v3.Node node = 1; + + // Interval at which Envoy should send tunnel state reports. + // This is used to change the reporting_interval -> no need to repeat the same value. + google.protobuf.Duration report_interval = 2 [(validate.rules).duration = {lte {seconds: 3600}}]; + + // Nonce from the request being acknowledged or rejected. + // Must match the nonce from the corresponding request. + int64 request_nonce = 3 [(validate.rules).int64 = {gte: 0}]; + + // Error details if the previous request failed processing. + // If populated, indicates the request was rejected (NACK). + // If empty, indicates successful processing (ACK). + // NACK will terminate the connection -> useful for logging rather than just some disconnect. + // So basically -> NACK then terminate. + google.rpc.Status error_detail = 4; +} + +// Represents a single reverse tunnel connection with its metadata. +message ReverseTunnel { + // Unique name to identify this tunnel connection. + // Typically formatted as "{node_id}|{cluster_id}" or similar. + // Must be unique within the reporting Envoy instance. + // This is also used for the reporting the disconnection with the associated tunnel initiator. + string name = 1 [(validate.rules).string = {min_len: 1}]; + + // Detailed information about the tunnel connection. + ReverseTunnelInfo tunnel_info = 2 [(validate.rules).message = {required: true}]; +} + +// Detailed information about a reverse tunnel connection. +message ReverseTunnelInfo { + // Identity information of the tunnel initiator (downstream Envoy). + // Contains node_id, cluster_id, and tenant_id for proper identification. + TunnelInitiatorIdentity identity = 1 [(validate.rules).message = {required: true}]; + + // Timestamp when this tunnel connection was created. + // Used for ordering events and debugging connection timing issues. + google.protobuf.Timestamp created_at = 2 [(validate.rules).timestamp = {required: true}]; +} + +message TunnelInitiatorIdentity { + // Required: Tenant identifier of the initiating Envoy instance. + string tenant_id = 1 [(validate.rules).string = {min_len: 1 max_len: 128}]; + + // Required: Cluster identifier of the initiating Envoy instance. + string cluster_id = 2 [(validate.rules).string = {min_len: 1 max_len: 128}]; + + // Required: Node identifier of the initiating Envoy instance. + string node_id = 3 [(validate.rules).string = {min_len: 1 max_len: 128}]; +} diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD new file mode 100644 index 0000000000000..5f552f08145ca --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@xds//udpa/annotations:pkg"], +) diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto new file mode 100644 index 0000000000000..2b1315d801544 --- /dev/null +++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters; + +import "google/protobuf/any.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters"; +option java_outer_classname = "EventReporterProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +message ReverseConnectionReporterClient { + // Name to use to pick out the client should match the one reported by the factory. + string name = 1 [(validate.rules).string = {min_len: 1}]; + + // Typed config for the client + google.protobuf.Any typed_config = 2 [(validate.rules).any = {required: true}]; +} + +// Configuration for the connection event reporter. +message EventReporterConfig { + // Stat prefix for this reporter's metrics. + // Metrics will be emitted as "{stat_prefix}.events_pushed", etc. + string stat_prefix = 1; + + // List of clients to report to. + repeated ReverseConnectionReporterClient clients = 2 [(validate.rules).repeated = {min_items: 1}]; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index d9358c863733e..3cd46fa04a6c4 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -43,6 +43,8 @@ proto_library( "//contrib/envoy/extensions/private_key_providers/kae/v3alpha:pkg", "//contrib/envoy/extensions/private_key_providers/qat/v3alpha:pkg", "//contrib/envoy/extensions/regex_engines/hyperscan/v3alpha:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg", "//contrib/envoy/extensions/router/cluster_specifier/golang/v3alpha:pkg", "//contrib/envoy/extensions/stat_sinks/kafka/v3:pkg", "//contrib/envoy/extensions/tap_sinks/udp_sink/v3alpha:pkg", diff --git a/contrib/contrib_build_config.bzl b/contrib/contrib_build_config.bzl index 8c7bad41f4d92..9baa8d1541cff 100644 --- a/contrib/contrib_build_config.bzl +++ b/contrib/contrib_build_config.bzl @@ -118,4 +118,10 @@ CONTRIB_EXTENSIONS = { # "envoy.upstreams.http.tcp.golang": "//contrib/golang/upstreams/http/tcp/source:config", + + # + # Reverse tunnel reporters + # + + "envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service": "//contrib/reverse_tunnel_reporter/source:config", } diff --git a/contrib/extensions_metadata.yaml b/contrib/extensions_metadata.yaml index eca1671c20e35..e2f133745f368 100644 --- a/contrib/extensions_metadata.yaml +++ b/contrib/extensions_metadata.yaml @@ -205,3 +205,11 @@ envoy.load_balancing_policies.peak_ewma: status: alpha type_urls: - envoy.extensions.load_balancing_policies.peak_ewma.v3alpha.PeakEwma +envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service: + categories: + - envoy.bootstrap + security_posture: requires_trusted_downstream_and_upstream + status: alpha + type_urls: + - envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters.EventReporterConfig + - envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client.GrpcClientConfig diff --git a/contrib/reverse_tunnel_reporter/source/BUILD b/contrib/reverse_tunnel_reporter/source/BUILD new file mode 100644 index 0000000000000..c952f5c659e33 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/BUILD @@ -0,0 +1,35 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_contrib_extension", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "reverse_tunnel_event_types", + hdrs = [ + "reverse_tunnel_event_types.h", + ], + deps = [ + "//envoy/common:pure_lib", + "//envoy/common:time_interface", + "//envoy/config:typed_config_interface", + "//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib", + "//envoy/server:factory_context_interface", + "//source/common/common:fmt_lib", + "//source/common/config:utility_lib", + "//source/common/protobuf", + "//source/common/protobuf:message_validator_lib", + ], +) + +envoy_cc_contrib_extension( + name = "config", + deps = [ + "//contrib/reverse_tunnel_reporter/source/reporters:reporters_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/reporters/BUILD b/contrib/reverse_tunnel_reporter/source/reporters/BUILD new file mode 100644 index 0000000000000..399bb153885c2 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/BUILD @@ -0,0 +1,16 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "reporters_lib", + deps = [ + "//contrib/reverse_tunnel_reporter/source/reporters/event_reporter:event_reporter_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD new file mode 100644 index 0000000000000..ace33870375d9 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD @@ -0,0 +1,30 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "event_reporter_lib", + srcs = [ + "factory.cc", + "reporter.cc", + ], + hdrs = [ + "factory.h", + "reporter.h", + ], + deps = [ + "//contrib/reverse_tunnel_reporter/source:reverse_tunnel_event_types", + "//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib", + "//envoy/registry", + "//source/common/common:logger_lib", + "//source/common/config:utility_lib", + "//source/common/protobuf:utility_lib", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg_cc_proto", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc new file mode 100644 index 0000000000000..d4f86fe296d64 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc @@ -0,0 +1,58 @@ +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h" + +#include "envoy/registry/registry.h" + +#include "source/common/config/utility.h" +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +ReverseTunnelReporterPtr +EventReporterFactory::createReporter(Server::Configuration::ServerFactoryContext& context, + ProtobufTypes::MessagePtr config) { + const auto& reporter_config = MessageUtil::downcastAndValidate( + *config, context.messageValidationVisitor()); + + std::vector clients; + clients.reserve(reporter_config.clients().size()); + for (const auto& client_config : reporter_config.clients()) { + clients.push_back(createClient(context, client_config)); + } + return std::make_unique(context, reporter_config, std::move(clients)); +} + +std::string EventReporterFactory::name() const { + return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_" + "reporter"; +} + +ProtobufTypes::MessagePtr EventReporterFactory::createEmptyConfigProto() { + return std::make_unique(); +} + +ReverseTunnelReporterClientPtr +EventReporterFactory::createClient(Server::Configuration::ServerFactoryContext& context, + const ClientConfigProto& client_config) { + auto* factory = + Config::Utility::getFactoryByName(client_config.name()); + if (!factory) { + throw EnvoyException( + fmt::format("Unknown Reporter Client Factory: '{}'. " + "Make sure it is registered as a ReverseTunnelReporterClientFactory.", + client_config.name())); + } + + auto typed_config = Config::Utility::translateAnyToFactoryConfig( + client_config.typed_config(), context.messageValidationVisitor(), *factory); + return factory->createClient(context, *typed_config); +} + +REGISTER_FACTORY(EventReporterFactory, ReverseTunnelReporterFactory); + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h new file mode 100644 index 0000000000000..14f272506c360 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h @@ -0,0 +1,37 @@ +#pragma once + +#include "envoy/extensions/bootstrap/reverse_tunnel/reverse_tunnel_reporter.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.validate.h" +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +/// Factory that builds an EventReporter from its proto config, dynamically +/// resolving each child ReverseTunnelReporterClient by name. +class EventReporterFactory : public ReverseTunnelReporterFactory { +public: + ReverseTunnelReporterPtr createReporter(Server::Configuration::ServerFactoryContext& context, + ProtobufTypes::MessagePtr config) override; + std::string name() const override; + ProtobufTypes::MessagePtr createEmptyConfigProto() override; + +private: + using ConfigProto = + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig; + using ClientConfigProto = envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters:: + ReverseConnectionReporterClient; + + ReverseTunnelReporterClientPtr createClient(Server::Configuration::ServerFactoryContext& context, + const ClientConfigProto& client_config); +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc new file mode 100644 index 0000000000000..14c0a0823e761 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc @@ -0,0 +1,125 @@ +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" + +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +EventReporter::EventReporter(Server::Configuration::ServerFactoryContext& context, + const ConfigProto& config, + std::vector&& clients) + : context_{context}, clients_{std::move(clients)}, + stats_(generateStats( + PROTOBUF_GET_STRING_OR_DEFAULT(config, stat_prefix, "reverse_tunnel_reporter"), + context.scope())) { + ENVOY_LOG(info, "EventReporter: Constructed with {} clients", clients_.size()); +} + +void EventReporter::onServerInitialized() { + ENVOY_LOG(info, "EventReporter: Initialized"); + for (auto& client : clients_) { + client->onServerInitialized(this); + } +} + +void EventReporter::reportConnectionEvent(absl::string_view node_id, absl::string_view cluster_id, + absl::string_view tenant_id) { + auto ptr = std::make_shared( + ReverseTunnelEvent::Connected{std::string(node_id), std::string(cluster_id), + std::string(tenant_id), Envoy::SystemTime::clock::now()}); + + context_.mainThreadDispatcher().post( + [this, ptr = std::move(ptr)]() mutable { this->addConnection(std::move(ptr)); }); +} + +void EventReporter::reportDisconnectionEvent(absl::string_view node_id, + absl::string_view cluster_id) { + std::string name = ReverseTunnelEvent::getName(node_id, cluster_id); + auto ptr = std::make_shared( + ReverseTunnelEvent::Disconnected{std::move(name)}); + + context_.mainThreadDispatcher().post( + [this, ptr = std::move(ptr)]() mutable { this->removeConnection(std::move(ptr)); }); +} + +// This is only served on the main thread so no locks needed. +ReverseTunnelEvent::SharedConnections EventReporter::getAllConnections() { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + stats_.reverse_tunnel_full_pulls_total_.inc(); + + ReverseTunnelEvent::SharedConnections all_connections; + all_connections.reserve(connections_.size()); + + for (auto& [key, val] : connections_) { + all_connections.push_back(val.connection); + } + return all_connections; +} + +EventReporterStats EventReporter::generateStats(const std::string& prefix, Stats::Scope& scope) { + return EventReporterStats{ALL_EVENT_REPORTER_STATS(POOL_COUNTER_PREFIX(scope, prefix), + POOL_GAUGE_PREFIX(scope, prefix))}; +} + +void EventReporter::notifyClients(ReverseTunnelEvent::BatchedEvents&& batch) { + for (auto& client : clients_) { + client->receiveEvents(batch); + } +} + +void EventReporter::addConnection(std::shared_ptr&& connection) { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + + ENVOY_LOG(info, "EventReporter: Accepted a new connection. Node: {}, Cluster: {}, Tenant: {}", + connection->node_id, connection->cluster_id, connection->tenant_id); + + std::string name = ReverseTunnelEvent::getName(connection->node_id, connection->cluster_id); + auto [it, inserted] = + connections_.try_emplace(std::move(name), ConnectionEntry{std::move(connection), 1}); + + if (inserted) { + stats_.reverse_tunnel_unique_active_.inc(); + notifyClients(ReverseTunnelEvent::BatchedEvents{{it->second.connection}, {}}); + } else { + // Multiple reverse tunnels can share the same name (same node). + // We ref-count them and only notify clients of removal when the last one disconnects. + it->second.count++; + } + + stats_.reverse_tunnel_established_total_.inc(); + stats_.reverse_tunnel_active_.inc(); +} + +void EventReporter::removeConnection( + std::shared_ptr&& disconnection) { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + + const auto& name = disconnection->name; + auto it = connections_.find(name); + + ENVOY_LOG(info, "EventReporter: Removed connection. Name: {}", name); + + if (it == connections_.end()) { + ENVOY_LOG(warn, "EventReporter: Tried to remove a connection which doesnt exist"); + return; + } + + // Only notify removal on the last ref — see addConnection for the ref-count rationale. + if (it->second.count == 1) { + connections_.erase(it); + stats_.reverse_tunnel_unique_active_.dec(); + notifyClients(ReverseTunnelEvent::BatchedEvents{{}, {disconnection}}); + } else { + it->second.count--; + } + + stats_.reverse_tunnel_closed_total_.inc(); + stats_.reverse_tunnel_active_.dec(); +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h new file mode 100644 index 0000000000000..4c07f661acf80 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h @@ -0,0 +1,66 @@ +#pragma once + +#include + +#include "envoy/stats/stats_macros.h" + +#include "source/common/common/logger.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +#define ALL_EVENT_REPORTER_STATS(COUNTER, GAUGE) \ + COUNTER(reverse_tunnel_established_total) \ + COUNTER(reverse_tunnel_closed_total) \ + COUNTER(reverse_tunnel_full_pulls_total) \ + GAUGE(reverse_tunnel_active, Accumulate) \ + GAUGE(reverse_tunnel_unique_active, Accumulate) + +struct EventReporterStats { + ALL_EVENT_REPORTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) +}; + +struct ConnectionEntry { + std::shared_ptr connection; + std::size_t count; +}; + +/// Aggregates reverse-tunnel connection/disconnection events, de-duplicates by +/// name, maintains stats, and fans out batched diffs as shared ptrs to registered clients. +class EventReporter : public ReverseTunnelReporterWithState, + public Logger::Loggable { +public: + using ConfigProto = + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig; + + EventReporter(Server::Configuration::ServerFactoryContext& context, const ConfigProto& config, + std::vector&& clients); + + void onServerInitialized() override; + void reportConnectionEvent(absl::string_view node_id, absl::string_view cluster_id, + absl::string_view tenant_id) override; + void reportDisconnectionEvent(absl::string_view node_id, absl::string_view cluster_id) override; + ReverseTunnelEvent::SharedConnections getAllConnections() override; + +private: + static EventReporterStats generateStats(const std::string& prefix, Stats::Scope& scope); + void notifyClients(ReverseTunnelEvent::BatchedEvents&& batch); + void addConnection(std::shared_ptr&& connection); + void removeConnection(std::shared_ptr&& disconnection); + + Server::Configuration::ServerFactoryContext& context_; + std::vector clients_; + EventReporterStats stats_; + // Keyed by getName(node_id, cluster_id). + absl::flat_hash_map connections_; +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h b/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h new file mode 100644 index 0000000000000..4585c55fa6f6f --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h @@ -0,0 +1,112 @@ +#pragma once + +#include +#include +#include + +#include "envoy/common/pure.h" +#include "envoy/common/time.h" +#include "envoy/config/typed_config.h" +#include "envoy/extensions/bootstrap/reverse_tunnel/reverse_tunnel_reporter.h" +#include "envoy/server/factory_context.h" + +#include "source/common/common/fmt.h" + +#include "absl/strings/str_cat.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +// The namespace holding the structs for the reverse tunnel events +namespace ReverseTunnelEvent { +// Builds the canonical connection name used as the de-duplication key in the reporter +// and as the tunnel name in the gRPC proto. +// TODO(aakugan) look into returning string_view and owning the data in the connection struct alone. +inline std::string getName(absl::string_view node_id, absl::string_view cluster_id) { + return absl::StrCat(node_id, ":", cluster_id); +} + +struct Connected { + std::string node_id; + std::string cluster_id; + std::string tenant_id; + Envoy::SystemTime created_at; +}; + +struct Disconnected { + std::string name; +}; + +using SharedConnections = std::vector>; +using SharedDisconnections = std::vector>; + +struct BatchedEvents { + SharedConnections connections; + SharedDisconnections disconnections; + + std::size_t size() const { return connections.size() + disconnections.size(); } + + void operator+=(BatchedEvents&& events) { + connections.reserve(connections.size() + events.connections.size()); + disconnections.reserve(disconnections.size() + events.disconnections.size()); + + for (auto& conn : events.connections) { + connections.push_back(std::move(conn)); + } + + for (auto& disconn : events.disconnections) { + disconnections.push_back(std::move(disconn)); + } + + events.connections.clear(); + events.disconnections.clear(); + } + + void clear() { + connections.clear(); + disconnections.clear(); + } +}; +} // namespace ReverseTunnelEvent + +// This will own the clients and expose an api for them to get the full state. +// This allows multiple clients to share data -> clients can focus on sending the data alone. +class ReverseTunnelReporterWithState : public ReverseTunnelReporter { +public: + virtual ~ReverseTunnelReporterWithState() = default; + + virtual ReverseTunnelEvent::SharedConnections getAllConnections() PURE; +}; + +using ReverseTunnelReporterWithStatePtr = std::unique_ptr; + +// This gets the ptr to the reporter for polling all the active connections. +// This also has receiveEvents to get the diff events from the reporter. +class ReverseTunnelReporterClient { +public: + virtual ~ReverseTunnelReporterClient() = default; + + virtual void onServerInitialized(ReverseTunnelReporterWithState* reporter) PURE; + + virtual void receiveEvents(ReverseTunnelEvent::BatchedEvents events) PURE; +}; + +using ReverseTunnelReporterClientPtr = std::unique_ptr; + +class ReverseTunnelReporterClientFactory : public Config::TypedFactory { +public: + virtual ReverseTunnelReporterClientPtr + createClient(Server::Configuration::ServerFactoryContext& context, + const Protobuf::Message& config) PURE; + + std::string category() const override { + return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients"; + } +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/test/reporters/BUILD b/contrib/reverse_tunnel_reporter/test/reporters/BUILD new file mode 100644 index 0000000000000..991e168a0ae8e --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/reporters/BUILD @@ -0,0 +1,24 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_test( + name = "event_reporter_test", + srcs = ["event_reporter_test.cc"], + deps = [ + "//contrib/reverse_tunnel_reporter/source/reporters/event_reporter:event_reporter_lib", + "//source/common/api:api_lib", + "//source/common/config:utility_lib", + "//test/mocks:common_lib", + "//test/mocks/server:server_mocks", + "//test/test_common:registry_lib", + "//test/test_common:test_time_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc b/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc new file mode 100644 index 0000000000000..fd869977b94c4 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc @@ -0,0 +1,567 @@ +#include "source/common/api/api_impl.h" + +#include "test/mocks/common.h" +#include "test/mocks/server/server_factory_context.h" +#include "test/test_common/registry.h" +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h" +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +class MockReverseTunnelReporterClient : public ReverseTunnelReporterClient { +public: + MockReverseTunnelReporterClient() = default; + ~MockReverseTunnelReporterClient() override = default; + + MOCK_METHOD(void, onServerInitialized, (ReverseTunnelReporterWithState*), (override)); + MOCK_METHOD(void, receiveEvents, (ReverseTunnelEvent::BatchedEvents), (override)); +}; + +class EventReporterTest : public testing::Test { +protected: + void SetUp() override { + api_ = Api::createApiForTest(); + dispatcher_ = api_->allocateDispatcher("test_thread"); + + ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher_)); + ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store_.rootScope())); + ON_CALL(context_, messageValidationVisitor()) + .WillByDefault(ReturnRef(ProtobufMessage::getStrictValidationVisitor())); + + auto mock_client1 = std::make_unique>(); + auto mock_client2 = std::make_unique>(); + mock_client1_ = mock_client1.get(); + mock_client2_ = mock_client2.get(); + + std::vector clients; + clients.push_back(std::move(mock_client1)); + clients.push_back(std::move(mock_client2)); + + EventReporter::ConfigProto config; + config.set_stat_prefix("test_prefix"); + + reporter_ = std::make_unique(context_, config, std::move(clients)); + } + + void createTestConnection(const std::string& node_id, const std::string& cluster_id, + const std::string& tenant_id = "tenant1") { + reporter_->reportConnectionEvent(node_id, cluster_id, tenant_id); + } + + void createTestDisconnection(const std::string& node_id, const std::string& cluster_id) { + reporter_->reportDisconnectionEvent(node_id, cluster_id); + } + + void runDispatcher() { dispatcher_->run(Event::Dispatcher::RunType::NonBlock); } + + uint64_t getCounterValue(const std::string& name) { + return stats_store_.counterFromString("test_prefix." + name).value(); + } + + uint64_t getGaugeValue(const std::string& name) { + return stats_store_.gaugeFromString("test_prefix." + name, Stats::Gauge::ImportMode::Accumulate) + .value(); + } + + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; + NiceMock context_; + Stats::TestUtil::TestStore stats_store_; + NiceMock* mock_client1_; + NiceMock* mock_client2_; + std::unique_ptr reporter_; +}; + +TEST_F(EventReporterTest, AddRemoveConnections) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(4) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node2", batch.connections[0]->node_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node2", "cluster2"), batch.disconnections[0]->name); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(4); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node2", "cluster2"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestDisconnection("node2", "cluster2"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +TEST_F(EventReporterTest, DuplicateConnectionHandling) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(2) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(2); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ("node1", connections[0]->node_id); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ("node1", connections[0]->node_id); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +TEST_F(EventReporterTest, PullsBeforeConnectionEvents) { + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +TEST_F(EventReporterTest, RemoveNonExistentConnection) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::warn); + MockLogSink sink(Envoy::Logger::Registry::getSink()); + + EXPECT_CALL(sink, log(_, _)) + .WillOnce(Invoke([](absl::string_view, const spdlog::details::log_msg& msg) { + EXPECT_EQ(spdlog::level::warn, msg.level); + })); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)).Times(0); + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(0); + + createTestDisconnection("nonexistent", "connection"); + runDispatcher(); + + EXPECT_EQ(0, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + })); + EXPECT_CALL(*mock_client2_, receiveEvents(_)); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, OnServerInitialized) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::info); + MockLogSink sink(Envoy::Logger::Registry::getSink()); + + EXPECT_CALL(sink, log(_, _)) + .WillOnce(Invoke([](absl::string_view, const spdlog::details::log_msg& msg) { + EXPECT_EQ(spdlog::level::info, msg.level); + })); + + EXPECT_CALL(*mock_client1_, onServerInitialized(_)); + EXPECT_CALL(*mock_client2_, onServerInitialized(_)); + + reporter_->onServerInitialized(); + + EXPECT_EQ(0, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(0, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getCounterValue("reverse_tunnel_full_pulls_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, DefaultStatPrefix) { + EventReporter::ConfigProto config; + + std::vector clients; + auto mock_client1 = std::make_unique>(); + auto mock_client2 = std::make_unique>(); + mock_client1_ = mock_client1.get(); + mock_client2_ = mock_client2.get(); + clients.push_back(std::move(mock_client1)); + clients.push_back(std::move(mock_client2)); + + auto default_reporter = std::make_unique(context_, config, std::move(clients)); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)); + EXPECT_CALL(*mock_client2_, receiveEvents(_)); + + default_reporter->reportConnectionEvent("node1", "cluster1", "tenant1"); + runDispatcher(); + + EXPECT_EQ( + 1, stats_store_.counterFromString("reverse_tunnel_reporter.reverse_tunnel_established_total") + .value()); + EXPECT_EQ(1, stats_store_ + .gaugeFromString("reverse_tunnel_reporter.reverse_tunnel_active", + Stats::Gauge::ImportMode::Accumulate) + .value()); + EXPECT_EQ(1, stats_store_ + .gaugeFromString("reverse_tunnel_reporter.reverse_tunnel_unique_active", + Stats::Gauge::ImportMode::Accumulate) + .value()); + + auto connections = default_reporter->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ( + 1, stats_store_.counterFromString("reverse_tunnel_reporter.reverse_tunnel_full_pulls_total") + .value()); +} + +TEST_F(EventReporterTest, MixedScenario) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(4) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + EXPECT_EQ("tenant_A", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node2", batch.connections[0]->node_id); + EXPECT_EQ("tenant_B", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node2", "cluster2"), batch.disconnections[0]->name); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node3", batch.connections[0]->node_id); + EXPECT_EQ("tenant_C", batch.connections[0]->tenant_id); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(4); + + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node2", "cluster2", "tenant_B"); + runDispatcher(); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(3, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node2", "cluster2", "tenant_B"); + runDispatcher(); + EXPECT_EQ(4, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(4, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(3, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node2", "cluster2"); + runDispatcher(); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestDisconnection("node2", "cluster2"); + runDispatcher(); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node3", "cluster3", "tenant_C"); + runDispatcher(); + EXPECT_EQ(5, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); + + EXPECT_EQ(5, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, LargeDuplicateCount) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(2) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + EXPECT_EQ("tenant_A", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(2); + + const int DUPLICATE_COUNT = 50; + + for (int i = 0; i < DUPLICATE_COUNT; i++) { + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + } + + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(DUPLICATE_COUNT, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + for (int i = 0; i < DUPLICATE_COUNT - 1; i++) { + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + } + + EXPECT_EQ(DUPLICATE_COUNT - 1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +// --- Factory tests --- + +class MockReverseTunnelReporterClientFactory : public ReverseTunnelReporterClientFactory { +public: + MOCK_METHOD(ReverseTunnelReporterClientPtr, createClient, + (Server::Configuration::ServerFactoryContext&, const Protobuf::Message&), (override)); + + std::string name() const override { return "mock_client_factory"; } + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return std::make_unique(); + } +}; + +class EventReporterFactoryTest : public testing::Test { +protected: + void SetUp() override { + ON_CALL(context_, messageValidationVisitor()) + .WillByDefault(ReturnRef(ProtobufMessage::getStrictValidationVisitor())); + ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store_.rootScope())); + + api_ = Api::createApiForTest(); + dispatcher_ = api_->allocateDispatcher("test_thread"); + ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher_)); + } + + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; + NiceMock context_; + Stats::TestUtil::TestStore stats_store_; + EventReporterFactory factory_; +}; + +TEST_F(EventReporterFactoryTest, Name) { + EXPECT_EQ( + "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_reporter", + factory_.name()); +} + +TEST_F(EventReporterFactoryTest, CreateEmptyConfigProto) { + auto config = factory_.createEmptyConfigProto(); + EXPECT_NE(nullptr, config); + EXPECT_NE( + nullptr, + dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig*>( + config.get())); +} + +TEST_F(EventReporterFactoryTest, CreateReporterWithRegisteredClient) { + MockReverseTunnelReporterClientFactory mock_client_factory; + Registry::InjectFactory registered(mock_client_factory); + + EXPECT_CALL(mock_client_factory, createClient(_, _)) + .WillOnce(Invoke([](Server::Configuration::ServerFactoryContext&, + const Protobuf::Message&) -> ReverseTunnelReporterClientPtr { + return std::make_unique>(); + })); + + auto config = factory_.createEmptyConfigProto(); + auto& reporter_config = dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig&>( + *config); + reporter_config.set_stat_prefix("test"); + + auto* client_entry = reporter_config.add_clients(); + client_entry->set_name("mock_client_factory"); + client_entry->mutable_typed_config()->PackFrom(Protobuf::Struct()); + + auto reporter = factory_.createReporter(context_, std::move(config)); + EXPECT_NE(nullptr, reporter); +} + +TEST_F(EventReporterFactoryTest, CreateClientWithUnknownFactoryThrows) { + auto config = factory_.createEmptyConfigProto(); + auto& reporter_config = dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig&>( + *config); + reporter_config.set_stat_prefix("test"); + + auto* client_entry = reporter_config.add_clients(); + client_entry->set_name("nonexistent_client_factory"); + client_entry->mutable_typed_config()->PackFrom(Protobuf::Struct()); + + EXPECT_THROW_WITH_REGEX(factory_.createReporter(context_, std::move(config)), EnvoyException, + "Unknown Reporter Client Factory: 'nonexistent_client_factory'"); +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/docs/root/api-v3/config/contrib/contrib.rst b/docs/root/api-v3/config/contrib/contrib.rst index ac17d06076276..5ce1c24cdb09a 100644 --- a/docs/root/api-v3/config/contrib/contrib.rst +++ b/docs/root/api-v3/config/contrib/contrib.rst @@ -21,3 +21,4 @@ Contrib extensions tap_sinks/tap_sinks load_balancing_policies/peak_ewma/peak_ewma istio/istio + reverse_tunnel_reporter/reverse_tunnel_reporter diff --git a/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst b/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst new file mode 100644 index 0000000000000..cc06a2bf352fa --- /dev/null +++ b/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst @@ -0,0 +1,9 @@ +Reverse tunnel reporter +======================= + +.. toctree:: + :glob: + :maxdepth: 2 + + ../../../extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/* + ../../../extensions/reverse_tunnel_reporters/v3alpha/reporters/*