Skip to content

Commit 206484a

Browse files
Merge pull request #1658 from coolwednesday/multiple-kafka-brokers
Multiple kafka brokers
1 parent bcdb261 commit 206484a

File tree

14 files changed

+776
-341
lines changed

14 files changed

+776
-341
lines changed

docs/advanced-guide/using-publisher-subscriber/page.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ that are specific for the type of message broker user wants to use.
4747
---
4848

4949
- `PUBSUB_BROKER`
50-
- Address to connect to kafka broker.
50+
- Address to connect to the Kafka broker. Multiple brokers can be added as comma-separated values.
5151
- `+`
5252
-
53-
- `localhost:9092`
53+
- `localhost:9092` or `localhost:8087,localhost:8088,localhost:8089`
5454
- Not empty string
5555

5656
---

examples/using-subscriber/main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (m *mockMetrics) IncrementCounter(ctx context.Context, name string, labels
2525

2626
func initializeTest(t *testing.T) {
2727
c := kafka.New(&kafka.Config{
28-
Broker: "localhost:9092",
28+
Brokers: []string{"localhost:9092"},
2929
OffSet: 1,
3030
BatchSize: kafka.DefaultBatchSize,
3131
BatchBytes: kafka.DefaultBatchBytes,

pkg/gofr/container/container.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,10 @@ func (c *Container) Create(conf config.Config) {
138138
InsecureSkipVerify: conf.Get("KAFKA_TLS_INSECURE_SKIP_VERIFY") == "true",
139139
}
140140

141+
pubsubBrokers := strings.Split(conf.Get("PUBSUB_BROKER"), ",")
142+
141143
c.PubSub = kafka.New(&kafka.Config{
142-
Broker: conf.Get("PUBSUB_BROKER"),
144+
Brokers: pubsubBrokers,
143145
Partition: partition,
144146
ConsumerGroupID: conf.Get("CONSUMER_ID"),
145147
OffSet: offSet,

pkg/gofr/container/mock_container.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ type Mocks struct {
3333

3434
type options func(c *Container, ctrl *gomock.Controller) any
3535

36-
//nolint:revive // WithMockHTTPService returns an exported type intentionally; options are internal and subject to change.
37-
func WithMockHTTPService(httpServiceNames ...string) options {
36+
func WithMockHTTPService(httpServiceNames ...string) options { //nolint:revive // WithMockHTTPService returns an
37+
// exported type intentionally; options are internal and subject to change.
3838
return func(c *Container, ctrl *gomock.Controller) any {
3939
mockservice := service.NewMockHTTP(ctrl)
4040
for _, s := range httpServiceNames {
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net"
7+
"strconv"
8+
"sync"
9+
10+
"github.com/segmentio/kafka-go"
11+
)
12+
//nolint:unused // We need this wrap around for testing purposes.
// Conn wraps the underlying kafka-go broker connections so they can be
// stubbed out in tests.
type Conn struct {
	conns []*kafka.Conn // one connection per configured broker
}
17+
18+
// initialize creates and configures all Kafka client components.
19+
func (k *kafkaClient) initialize(ctx context.Context) error {
20+
dialer, err := setupDialer(&k.config)
21+
if err != nil {
22+
return err
23+
}
24+
25+
conns, err := connectToBrokers(ctx, k.config.Brokers, dialer, k.logger)
26+
if err != nil {
27+
return err
28+
}
29+
30+
multi := &multiConn{
31+
conns: conns,
32+
dialer: dialer,
33+
}
34+
35+
writer := createKafkaWriter(&k.config, dialer, k.logger)
36+
reader := make(map[string]Reader)
37+
38+
k.logger.Logf("connected to %d Kafka brokers", len(conns))
39+
40+
k.dialer = dialer
41+
k.conn = multi
42+
k.writer = writer
43+
k.reader = reader
44+
45+
return nil
46+
}
47+
48+
func (k *kafkaClient) getNewReader(topic string) Reader {
49+
reader := kafka.NewReader(kafka.ReaderConfig{
50+
GroupID: k.config.ConsumerGroupID,
51+
Brokers: k.config.Brokers,
52+
Topic: topic,
53+
MinBytes: 10e3,
54+
MaxBytes: 10e6,
55+
Dialer: k.dialer,
56+
StartOffset: int64(k.config.OffSet),
57+
})
58+
59+
return reader
60+
}
61+
// DeleteTopic removes the named topic via the controller broker. The context
// is currently unused because the underlying connection call is synchronous.
func (k *kafkaClient) DeleteTopic(_ context.Context, name string) error {
	return k.conn.DeleteTopics(name)
}
65+
// Controller returns the current cluster controller broker, as reported by
// the first reachable connection.
func (k *kafkaClient) Controller() (broker kafka.Broker, err error) {
	return k.conn.Controller()
}
69+
// CreateTopic creates the named topic with a single partition and a
// replication factor of 1. The context is currently unused.
func (k *kafkaClient) CreateTopic(_ context.Context, name string) error {
	topics := kafka.TopicConfig{Topic: name, NumPartitions: 1, ReplicationFactor: 1}

	return k.conn.CreateTopics(topics)
}
75+
// multiConn fans admin operations out over one connection per broker.
// conns is guarded by mu; dialer is used to open a connection to the
// controller when none of the existing connections point at it.
type multiConn struct {
	conns  []Connection
	dialer *kafka.Dialer
	mu     sync.RWMutex
}
81+
82+
func (m *multiConn) Controller() (kafka.Broker, error) {
83+
if len(m.conns) == 0 {
84+
return kafka.Broker{}, errNoActiveConnections
85+
}
86+
87+
// Try all connections until we find one that works
88+
for _, conn := range m.conns {
89+
if conn == nil {
90+
continue
91+
}
92+
93+
controller, err := conn.Controller()
94+
if err == nil {
95+
return controller, nil
96+
}
97+
}
98+
99+
return kafka.Broker{}, errNoActiveConnections
100+
}
101+
102+
func (m *multiConn) CreateTopics(topics ...kafka.TopicConfig) error {
103+
controller, err := m.Controller()
104+
if err != nil {
105+
return err
106+
}
107+
108+
controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
109+
110+
controllerResolvedAddr, err := net.ResolveTCPAddr("tcp", controllerAddr)
111+
if err != nil {
112+
return err
113+
}
114+
115+
m.mu.RLock()
116+
defer m.mu.RUnlock()
117+
118+
for _, conn := range m.conns {
119+
if conn == nil {
120+
continue
121+
}
122+
123+
if tcpAddr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
124+
if tcpAddr.IP.Equal(controllerResolvedAddr.IP) && tcpAddr.Port == controllerResolvedAddr.Port {
125+
return conn.CreateTopics(topics...)
126+
}
127+
}
128+
}
129+
130+
// If not found, create a new connection
131+
conn, err := m.dialer.DialContext(context.Background(), "tcp", controllerAddr)
132+
if err != nil {
133+
return err
134+
}
135+
136+
m.conns = append(m.conns, conn)
137+
138+
return conn.CreateTopics(topics...)
139+
}
140+
141+
func (m *multiConn) DeleteTopics(topics ...string) error {
142+
controller, err := m.Controller()
143+
if err != nil {
144+
return err
145+
}
146+
147+
controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
148+
149+
controllerResolvedAddr, err := net.ResolveTCPAddr("tcp", controllerAddr)
150+
if err != nil {
151+
return err
152+
}
153+
154+
for _, conn := range m.conns {
155+
if conn == nil {
156+
continue
157+
}
158+
159+
if tcpAddr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
160+
// Match IP (after resolution) and Port
161+
if tcpAddr.IP.Equal(controllerResolvedAddr.IP) && tcpAddr.Port == controllerResolvedAddr.Port {
162+
return conn.DeleteTopics(topics...)
163+
}
164+
}
165+
}
166+
167+
// If not found, create a new connection
168+
conn, err := m.dialer.DialContext(context.Background(), "tcp", controllerAddr)
169+
if err != nil {
170+
return err
171+
}
172+
173+
m.conns = append(m.conns, conn)
174+
175+
return conn.DeleteTopics(topics...)
176+
}
177+
178+
func (m *multiConn) Close() error {
179+
var err error
180+
181+
for _, conn := range m.conns {
182+
if conn != nil {
183+
err = errors.Join(err, conn.Close())
184+
}
185+
}
186+
187+
return err
188+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
package kafka

import "errors"

// Sentinel errors returned by the kafka pubsub client. Only
// ErrConsumerGroupNotProvided is exported for callers to match with
// errors.Is; the rest are internal to this package.
var (
	ErrConsumerGroupNotProvided    = errors.New("consumer group id not provided")
	errFailedToConnectBrokers      = errors.New("failed to connect to any kafka brokers")
	errBrokerNotProvided           = errors.New("kafka broker address not provided")
	errPublisherNotConfigured      = errors.New("can't publish message. Publisher not configured or topic is empty")
	errBatchSize                   = errors.New("KAFKA_BATCH_SIZE must be greater than 0")
	errBatchBytes                  = errors.New("KAFKA_BATCH_BYTES must be greater than 0")
	errBatchTimeout                = errors.New("KAFKA_BATCH_TIMEOUT must be greater than 0")
	errClientNotConnected          = errors.New("kafka client not connected")
	errUnsupportedSASLMechanism    = errors.New("unsupported SASL mechanism")
	errSASLCredentialsMissing      = errors.New("SASL credentials missing")
	errUnsupportedSecurityProtocol = errors.New("unsupported security protocol")
	errNoActiveConnections         = errors.New("no active connections to brokers")
	errCACertFileRead              = errors.New("failed to read CA certificate file")
	errClientCertLoad              = errors.New("failed to load client certificate")
	errNotController               = errors.New("not a controller")
	errUnreachable                 = errors.New("unreachable")
)

pkg/gofr/datasource/pubsub/kafka/health.go

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,96 @@ package kafka
22

33
import (
44
"encoding/json"
5+
"fmt"
6+
"net"
7+
"strconv"
58

69
"gofr.dev/pkg/gofr/datasource"
710
)
811

// Health reports the health of the Kafka backend. Status is UP when at least
// one broker responds; per-broker detail is reported under "brokers".
func (k *kafkaClient) Health() datasource.Health {
	health := datasource.Health{
		Status:  datasource.StatusDown,
		Details: make(map[string]any),
	}

	if k.conn == nil {
		health.Details["error"] = "invalid connection type"
		return health
	}

	// Hold a read lock so the conns slice cannot be mutated while we probe.
	k.conn.mu.RLock()
	defer k.conn.mu.RUnlock()

	brokerStatus, allDown := k.evaluateBrokerHealth()

	if !allDown {
		health.Status = datasource.StatusUp
	}

	health.Details["brokers"] = brokerStatus
	health.Details["backend"] = "KAFKA"
	health.Details["writer"] = k.getWriterStatsAsMap()
	health.Details["readers"] = k.getReaderStatsAsMap()

	return health
}
2639

40+
func (k *kafkaClient) evaluateBrokerHealth() ([]map[string]any, bool) {
41+
var (
42+
statusList = make([]map[string]any, 0)
43+
controllerAddr string
44+
allDown = true
45+
)
46+
47+
for _, conn := range k.conn.conns {
48+
if conn == nil {
49+
continue
50+
}
51+
52+
status := checkBroker(conn, &controllerAddr)
53+
54+
if status["status"] == brokerStatusUp {
55+
allDown = false
56+
}
57+
58+
statusList = append(statusList, status)
59+
}
60+
61+
return statusList, allDown
62+
}
63+
64+
func checkBroker(conn Connection, controllerAddr *string) map[string]any {
65+
brokerAddr := conn.RemoteAddr().String()
66+
status := map[string]any{
67+
"broker": brokerAddr,
68+
"status": "DOWN",
69+
"isController": false,
70+
"error": nil,
71+
}
72+
73+
_, err := conn.ReadPartitions()
74+
if err != nil {
75+
status["error"] = err.Error()
76+
return status
77+
}
78+
79+
status["status"] = brokerStatusUp
80+
81+
if *controllerAddr == "" {
82+
controller, err := conn.Controller()
83+
if err != nil {
84+
status["error"] = fmt.Sprintf("controller lookup failed: %v", err)
85+
} else if controller.Host != "" {
86+
*controllerAddr = net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
87+
}
88+
}
89+
90+
status["isController"] = brokerAddr == *controllerAddr
91+
92+
return status
93+
}
94+
2795
func (k *kafkaClient) getReaderStatsAsMap() []any {
2896
readerStats := make([]any, 0)
2997

0 commit comments

Comments
 (0)