Skip to content

Multiple kafka brokers #1658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fef90eb
adding support for multiple brokers
coolwednesday Apr 14, 2025
a2a5740
adding support for multiple-kafka-brokers
coolwednesday Apr 14, 2025
d888fa6
managing open connections to kafka multiple brokers
coolwednesday Apr 14, 2025
f165f3d
refactoring & fixing tests
coolwednesday Apr 16, 2025
fbb93c4
fixing linters
coolwednesday Apr 16, 2025
013765d
fixing linters
coolwednesday Apr 16, 2025
4e92868
Merge branch 'development' into multiple-kafka-brokers
coolwednesday Apr 16, 2025
e1cece7
Merge branch 'development' into multiple-kafka-brokers
coolwednesday Apr 18, 2025
f7e4ce9
fixing code climate issues
coolwednesday Apr 18, 2025
e0e8290
Merge branch 'development' into multiple-kafka-brokers
coolwednesday Apr 18, 2025
5bdea50
adding documentation and fixing linters
coolwednesday Apr 18, 2025
1ee999c
fixing code climate issues
coolwednesday Apr 18, 2025
2a7837d
Merge branch 'development' into multiple-kafka-brokers
coolwednesday Apr 21, 2025
75e4ba1
Merge branch 'development' into multiple-kafka-brokers
coolwednesday Apr 21, 2025
313c087
fiixng linters
coolwednesday Apr 21, 2025
192e450
Merge branch 'development' into multiple-kafka-brokers
Umang01-hash Apr 22, 2025
1c068a9
Merge branch 'development' of github.com:gofr-dev/gofr into multiple-…
Umang01-hash Apr 24, 2025
5aa3f68
fix tests
Umang01-hash Apr 24, 2025
98fb1fd
resolve review comments
Umang01-hash Apr 24, 2025
b718710
Merge branch 'development' into multiple-kafka-brokers
coolwednesday Apr 28, 2025
6a0214b
refactoring kafka initialization
coolwednesday Apr 28, 2025
6defac1
Merge branch 'development' into multiple-kafka-brokers
Umang01-hash Apr 29, 2025
db11ce8
Merge branch 'development' into multiple-kafka-brokers
coolwednesday Apr 29, 2025
c610b74
applying review suggestions:
coolwednesday Apr 30, 2025
ed10d7f
minor fix
coolwednesday Apr 30, 2025
c7d3ba9
fixing variable missrepresentation
coolwednesday Apr 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/advanced-guide/using-publisher-subscriber/page.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ that are specific for the type of message broker user wants to use.
---

- `PUBSUB_BROKER`
- Address to connect to kafka broker.
- Address to connect to kafka broker. Multiple brokers can be added as comma-separated values.
- `+`
-
- `localhost:9092`
- `localhost:9092` or `localhost:8087,localhost:8088,localhost:8089`
- Not empty string

---
Expand Down
2 changes: 1 addition & 1 deletion examples/using-subscriber/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (m *mockMetrics) IncrementCounter(ctx context.Context, name string, labels

func initializeTest(t *testing.T) {
c := kafka.New(&kafka.Config{
Broker: "localhost:9092",
Broker: []string{"localhost:9092"},
OffSet: 1,
BatchSize: kafka.DefaultBatchSize,
BatchBytes: kafka.DefaultBatchBytes,
Expand Down
4 changes: 3 additions & 1 deletion pkg/gofr/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ func (c *Container) Create(conf config.Config) {
InsecureSkipVerify: conf.Get("KAFKA_TLS_INSECURE_SKIP_VERIFY") == "true",
}

pubsubBrokers := strings.Split(conf.Get("PUBSUB_BROKER"), ",")

c.PubSub = kafka.New(&kafka.Config{
Broker: conf.Get("PUBSUB_BROKER"),
Broker: pubsubBrokers,
Partition: partition,
ConsumerGroupID: conf.Get("CONSUMER_ID"),
OffSet: offSet,
Expand Down
4 changes: 2 additions & 2 deletions pkg/gofr/container/mock_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type Mocks struct {

type options func(c *Container, ctrl *gomock.Controller) any

//nolint:revive // WithMockHTTPService returns an exported type intentionally; options are internal and subject to change.
func WithMockHTTPService(httpServiceNames ...string) options {
func WithMockHTTPService(httpServiceNames ...string) options { //nolint:revive // WithMockHTTPService returns an
// exported type intentionally; options are internal and subject to change.
return func(c *Container, ctrl *gomock.Controller) any {
mockservice := service.NewMockHTTP(ctrl)
for _, s := range httpServiceNames {
Expand Down
190 changes: 190 additions & 0 deletions pkg/gofr/datasource/pubsub/kafka/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package kafka

import (
"context"
"errors"
"net"
"strconv"
"sync"

"github.com/segmentio/kafka-go"
)

// errFailedToConnectBrokers is returned when none of the configured
// brokers could be dialed during initialization.
var errFailedToConnectBrokers = errors.New("failed to connect to any kafka brokers")

// Conn wraps a set of raw kafka-go connections, one per broker.
//
//nolint:unused // We need this wrap around for testing purposes.
type Conn struct {
	conns []*kafka.Conn
}

// initialize wires up all Kafka client components: the dialer, one
// connection per configured broker (wrapped in a multiConn), the shared
// writer, and an empty reader map that is populated lazily per topic.
func (k *kafkaClient) initialize(ctx context.Context) error {
	d, err := setupDialer(&k.config)
	if err != nil {
		return err
	}

	brokerConns, err := connectToBrokers(ctx, k.config.Broker, d, k.logger)
	if err != nil {
		return err
	}

	k.dialer = d
	k.conn = &multiConn{
		conns:  brokerConns,
		dialer: d,
	}
	k.writer = createKafkaWriter(&k.config, d, k.logger)
	k.reader = make(map[string]Reader)

	k.logger.Logf("connected to %d Kafka brokers", len(brokerConns))

	return nil
}

// getNewReader builds a kafka-go reader for the given topic, joined to
// the configured consumer group and starting at the configured offset.
func (k *kafkaClient) getNewReader(topic string) Reader {
	cfg := kafka.ReaderConfig{
		GroupID:     k.config.ConsumerGroupID,
		Brokers:     k.config.Broker,
		Topic:       topic,
		MinBytes:    10e3, // 10KB
		MaxBytes:    10e6, // 10MB
		Dialer:      k.dialer,
		StartOffset: int64(k.config.OffSet),
	}

	return kafka.NewReader(cfg)
}

// DeleteTopic removes the named topic via the multi-broker connection.
// The context parameter is currently unused.
func (k *kafkaClient) DeleteTopic(_ context.Context, name string) error {
	return k.conn.DeleteTopics(name)
}

// Controller reports the cluster's controller broker, delegating to the
// underlying multi-broker connection.
func (k *kafkaClient) Controller() (broker kafka.Broker, err error) {
	return k.conn.Controller()
}

// CreateTopic creates a topic with the given name using a single
// partition and a replication factor of 1. The context parameter is
// currently unused.
func (k *kafkaClient) CreateTopic(_ context.Context, name string) error {
	return k.conn.CreateTopics(kafka.TopicConfig{
		Topic:             name,
		NumPartitions:     1,
		ReplicationFactor: 1,
	})
}

// multiConn fans a single logical Kafka connection out over every
// configured broker. mu guards conns, which may grow when a connection
// to the controller broker is dialed on demand.
type multiConn struct {
	conns  []Connection
	dialer *kafka.Dialer
	mu     sync.RWMutex
}

// Controller returns the cluster's controller broker. It asks each live
// connection in turn and returns the first successful answer;
// errNoActiveConnections is returned when no connection is usable or
// every broker query fails.
func (m *multiConn) Controller() (kafka.Broker, error) {
	// Fix: take the read lock — CreateTopics/DeleteTopics append to
	// m.conns concurrently, so an unguarded scan is a data race.
	m.mu.RLock()
	defer m.mu.RUnlock()

	if len(m.conns) == 0 {
		return kafka.Broker{}, errNoActiveConnections
	}

	// Try all connections until we find one that works.
	for _, conn := range m.conns {
		if conn == nil {
			continue
		}

		controller, err := conn.Controller()
		if err == nil {
			return controller, nil
		}
	}

	return kafka.Broker{}, errNoActiveConnections
}

// CreateTopics creates the given topics on the cluster's controller
// broker. It resolves the controller's TCP address, reuses an
// already-open connection to it when one exists, and otherwise dials a
// new connection which is cached on m.conns (and so closed by Close).
func (m *multiConn) CreateTopics(topics ...kafka.TopicConfig) error {
	controller, err := m.Controller()
	if err != nil {
		return err
	}

	controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))

	controllerResolvedAddr, err := net.ResolveTCPAddr("tcp", controllerAddr)
	if err != nil {
		return err
	}

	// Fix: the original held only the read lock here but appended to
	// m.conns below — a write under RLock races with concurrent readers.
	// The write lock must cover both the scan and the append.
	m.mu.Lock()
	defer m.mu.Unlock()

	for _, conn := range m.conns {
		if conn == nil {
			continue
		}

		// Match IP (after resolution) and port against the controller.
		if tcpAddr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
			if tcpAddr.IP.Equal(controllerResolvedAddr.IP) && tcpAddr.Port == controllerResolvedAddr.Port {
				return conn.CreateTopics(topics...)
			}
		}
	}

	// No existing connection to the controller: dial and cache one.
	conn, err := m.dialer.DialContext(context.Background(), "tcp", controllerAddr)
	if err != nil {
		return err
	}

	m.conns = append(m.conns, conn)

	return conn.CreateTopics(topics...)
}

// DeleteTopics deletes the given topics on the cluster's controller
// broker. It resolves the controller's TCP address, reuses an
// already-open connection to it when one exists, and otherwise dials a
// new connection which is cached on m.conns (and so closed by Close).
func (m *multiConn) DeleteTopics(topics ...string) error {
	controller, err := m.Controller()
	if err != nil {
		return err
	}

	controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))

	controllerResolvedAddr, err := net.ResolveTCPAddr("tcp", controllerAddr)
	if err != nil {
		return err
	}

	// Fix: the original took no lock at all here, yet both scans m.conns
	// and appends to it — racy against every other method on multiConn.
	// The write lock covers both the scan and the append.
	m.mu.Lock()
	defer m.mu.Unlock()

	for _, conn := range m.conns {
		if conn == nil {
			continue
		}

		// Match IP (after resolution) and port against the controller.
		if tcpAddr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
			if tcpAddr.IP.Equal(controllerResolvedAddr.IP) && tcpAddr.Port == controllerResolvedAddr.Port {
				return conn.DeleteTopics(topics...)
			}
		}
	}

	// No existing connection to the controller: dial and cache one.
	conn, err := m.dialer.DialContext(context.Background(), "tcp", controllerAddr)
	if err != nil {
		return err
	}

	m.conns = append(m.conns, conn)

	return conn.DeleteTopics(topics...)
}

// Close closes every tracked broker connection and combines any close
// errors with errors.Join.
func (m *multiConn) Close() error {
	// Fix: hold the write lock — CreateTopics/DeleteTopics may append to
	// m.conns concurrently, and Close must see a stable slice.
	m.mu.Lock()
	defer m.mu.Unlock()

	var err error

	for _, conn := range m.conns {
		if conn != nil {
			err = errors.Join(err, conn.Close())
		}
	}

	return err
}
84 changes: 76 additions & 8 deletions pkg/gofr/datasource/pubsub/kafka/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,96 @@ package kafka

import (
"encoding/json"
"fmt"
"net"
"strconv"

"gofr.dev/pkg/gofr/datasource"
)

func (k *kafkaClient) Health() (health datasource.Health) {
health = datasource.Health{Details: make(map[string]any)}
func (k *kafkaClient) Health() datasource.Health {
health := datasource.Health{
Status: datasource.StatusDown,
Details: make(map[string]any),
}

if k.conn == nil {
health.Details["error"] = "invalid connection type"
return health
}

health.Status = datasource.StatusUp
k.conn.mu.RLock()
defer k.conn.mu.RUnlock()

_, err := k.conn.Controller()
if err != nil {
health.Status = datasource.StatusDown
brokerStatus, allDown := k.evaluateBrokerHealth()

if !allDown {
health.Status = datasource.StatusUp
}

health.Details["host"] = k.config.Broker
health.Details["brokers"] = brokerStatus
health.Details["backend"] = "KAFKA"
health.Details["writers"] = k.getWriterStatsAsMap()
health.Details["writer"] = k.getWriterStatsAsMap()
health.Details["readers"] = k.getReaderStatsAsMap()

return health
}

// evaluateBrokerHealth probes every broker connection and returns one
// status entry per broker, plus a flag that is true when every broker
// is down. The caller (Health) holds k.conn.mu for reading while this
// runs.
func (k *kafkaClient) evaluateBrokerHealth() ([]map[string]any, bool) {
	statuses := make([]map[string]any, 0)
	controllerAddr := ""
	everyBrokerDown := true

	for _, conn := range k.conn.conns {
		if conn == nil {
			continue
		}

		entry := checkBroker(conn, &controllerAddr)
		if entry["status"] == BrokerStatusUp {
			everyBrokerDown = false
		}

		statuses = append(statuses, entry)
	}

	return statuses, everyBrokerDown
}

// checkBroker probes a single broker connection and returns its health
// entry: broker address, up/down status, whether it is the cluster
// controller, and any probe error.
//
// controllerAddr is shared across the calls of one health sweep: the
// first live broker fills it in via a controller lookup, and every
// broker is compared against it for the "isController" flag.
func checkBroker(conn Connection, controllerAddr *string) map[string]any {
	brokerAddr := conn.RemoteAddr().String()
	// NOTE(review): "DOWN" is a bare literal while the up state uses the
	// BrokerStatusUp constant — consider a matching BrokerStatusDown.
	status := map[string]any{
		"broker":       brokerAddr,
		"status":       "DOWN",
		"isController": false,
		"error":        nil,
	}

	// ReadPartitions doubles as a liveness probe for the broker.
	_, err := conn.ReadPartitions()
	if err != nil {
		status["error"] = err.Error()
		return status
	}

	status["status"] = BrokerStatusUp

	// Resolve the controller address once per sweep (first live broker).
	if *controllerAddr == "" {
		controller, err := conn.Controller()
		if err != nil {
			status["error"] = fmt.Sprintf("controller lookup failed: %v", err)
		} else if controller.Host != "" {
			*controllerAddr = net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
		}
	}

	status["isController"] = brokerAddr == *controllerAddr

	return status
}

func (k *kafkaClient) getReaderStatsAsMap() []any {
readerStats := make([]any, 0)

Expand Down
Loading
Loading