diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index 55d65be8..b2f07aa0 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -68,8 +68,7 @@ type Endpoint struct { Channel string } Kafka struct { - Host string - Port int + Brokers []string TopicName string Auth string SSL bool @@ -424,21 +423,24 @@ func parseEndpoint(s string) (Endpoint, error) { if endpoint.Protocol == Kafka { // Parsing connection from URL string - hp := strings.Split(s, ":") - switch len(hp) { - default: - return endpoint, errors.New("invalid kafka url") - case 1: - endpoint.Kafka.Host = hp[0] - endpoint.Kafka.Port = 9092 - case 2: - n, err := strconv.ParseUint(hp[1], 10, 16) - if err != nil { - return endpoint, errors.New("invalid kafka url port") + for _, broker := range strings.Split(s, ",") { + broker = strings.TrimSpace(broker) + if broker == "" { + return endpoint, errors.New("invalid kafka url") + } + hp := strings.Split(broker, ":") + switch len(hp) { + default: + return endpoint, errors.New("invalid kafka url") + case 1: + endpoint.Kafka.Brokers = append(endpoint.Kafka.Brokers, hp[0]+":9092") + case 2: + _, err := strconv.ParseUint(hp[1], 10, 16) + if err != nil { + return endpoint, errors.New("invalid kafka url port") + } + endpoint.Kafka.Brokers = append(endpoint.Kafka.Brokers, broker) } - - endpoint.Kafka.Host = hp[0] - endpoint.Kafka.Port = int(n) } // Parsing Kafka queue name diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 97479dfc..5f4cfa7f 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -72,7 +72,6 @@ func (conn *KafkaConn) Send(msg string) error { sarama.Logger = lg.New(log.Output(), "[sarama] ", 0) } - uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port) if conn.conn == nil { cfg := sarama.NewConfig() @@ -162,7 +161,7 @@ func (conn *KafkaConn) Send(msg string) error { } } - c, err := sarama.NewSyncProducer([]string{uri}, cfg) + c, err := sarama.NewSyncProducer(conn.ep.Kafka.Brokers, cfg) if err != nil { cfg.MetricRegistry.UnregisterAll() return err diff --git a/internal/server/hooks.go b/internal/server/hooks.go index 6af683be..32b1f2a6 100644 --- a/internal/server/hooks.go +++ b/internal/server/hooks.go @@ -45,8 +45,7 @@ func (s *Server) cmdSetHook(msg *Message) ( if vs, urls, ok = tokenval(vs); !ok || urls == "" { return NOMessage, d, errInvalidNumberOfArguments } - for _, url := range strings.Split(urls, ",") { - url = strings.TrimSpace(url) + for _, url := range splitEndpointURLs(urls) { err := s.epc.Validate(url) if err != nil { log.Errorf("sethook: %v", err) @@ -717,3 +716,23 @@ func (h *Hook) proc() (ok bool) { } return true } + +// splitEndpointURLs splits endpoint URLs on commas, but rejoins parts +// that don't have a scheme prefix (e.g. kafka broker addresses). +func splitEndpointURLs(s string) []string { + parts := strings.Split(s, ",") + var urls []string + for _, part := range parts { + part = strings.TrimSpace(part) + if len(urls) > 0 && !hasSchemePrefix(part) { + urls[len(urls)-1] += "," + part + } else { + urls = append(urls, part) + } + } + return urls +} + +func hasSchemePrefix(s string) bool { + return strings.Contains(s, "://") || strings.HasPrefix(s, "Endpoint=") +}