Skip to content

Commit a8e74f3

Browse files
authored
Revert client consumer group (#1027)
* Revert "Fixes for consumer group (#1022)" This reverts commit a5f270d. * Revert "Refactor Consumer Group to use Client (#947)" This reverts commit ee37c7f.
1 parent a5f270d commit a8e74f3

14 files changed

+709
-845
lines changed

conn.go

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,16 @@ const (
133133
ReadCommitted IsolationLevel = 1
134134
)
135135

136-
// DefaultClientID is the default value used as ClientID of kafka
137-
// connections.
138-
var DefaultClientID string
136+
var (
137+
// DefaultClientID is the default value used as ClientID of kafka
138+
// connections.
139+
DefaultClientID string
140+
)
139141

140142
func init() {
141143
progname := filepath.Base(os.Args[0])
142144
hostname, _ := os.Hostname()
143145
DefaultClientID = fmt.Sprintf("%s@%s (github.com/segmentio/kafka-go)", progname, hostname)
144-
DefaultTransport.(*Transport).ClientID = DefaultClientID
145146
}
146147

147148
// NewConn returns a new kafka connection for the given topic and partition.
@@ -262,12 +263,10 @@ func (c *Conn) Controller() (broker Broker, err error) {
262263
}
263264
for _, brokerMeta := range res.Brokers {
264265
if brokerMeta.NodeID == res.ControllerID {
265-
broker = Broker{
266-
ID: int(brokerMeta.NodeID),
266+
broker = Broker{ID: int(brokerMeta.NodeID),
267267
Port: int(brokerMeta.Port),
268268
Host: brokerMeta.Host,
269-
Rack: brokerMeta.Rack,
270-
}
269+
Rack: brokerMeta.Rack}
271270
break
272271
}
273272
}
@@ -323,6 +322,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
323322
err := c.readOperation(
324323
func(deadline time.Time, id int32) error {
325324
return c.writeRequest(findCoordinator, v0, id, request)
325+
326326
},
327327
func(deadline time.Time, size int) error {
328328
return expectZeroSize(func() (remain int, err error) {
@@ -340,6 +340,32 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
340340
return response, nil
341341
}
342342

343+
// heartbeat sends a heartbeat message required by consumer groups
344+
//
345+
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
346+
func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
347+
var response heartbeatResponseV0
348+
349+
err := c.writeOperation(
350+
func(deadline time.Time, id int32) error {
351+
return c.writeRequest(heartbeat, v0, id, request)
352+
},
353+
func(deadline time.Time, size int) error {
354+
return expectZeroSize(func() (remain int, err error) {
355+
return (&response).readFrom(&c.rbuf, size)
356+
}())
357+
},
358+
)
359+
if err != nil {
360+
return heartbeatResponseV0{}, err
361+
}
362+
if response.ErrorCode != 0 {
363+
return heartbeatResponseV0{}, Error(response.ErrorCode)
364+
}
365+
366+
return response, nil
367+
}
368+
343369
// joinGroup attempts to join a consumer group
344370
//
345371
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
@@ -726,8 +752,9 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
726752
// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
727753
// with the default values in ReadBatchConfig except for minBytes and maxBytes.
728754
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
755+
729756
var adjustedDeadline time.Time
730-
maxFetch := int(c.fetchMaxBytes)
757+
var maxFetch = int(c.fetchMaxBytes)
731758

732759
if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
733760
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
@@ -933,6 +960,7 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
933960
// connection. If there are none, the method fetches all partitions of the kafka
934961
// cluster.
935962
func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
963+
936964
if len(topics) == 0 {
937965
if len(c.topic) != 0 {
938966
defaultTopics := [...]string{c.topic}
@@ -1132,10 +1160,11 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11321160
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
11331161
switch produceVersion {
11341162
case v7:
1135-
recordBatch, err := newRecordBatch(
1136-
codec,
1137-
msgs...,
1138-
)
1163+
recordBatch, err :=
1164+
newRecordBatch(
1165+
codec,
1166+
msgs...,
1167+
)
11391168
if err != nil {
11401169
return err
11411170
}
@@ -1150,10 +1179,11 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11501179
recordBatch,
11511180
)
11521181
case v3:
1153-
recordBatch, err := newRecordBatch(
1154-
codec,
1155-
msgs...,
1156-
)
1182+
recordBatch, err :=
1183+
newRecordBatch(
1184+
codec,
1185+
msgs...,
1186+
)
11571187
if err != nil {
11581188
return err
11591189
}
@@ -1218,6 +1248,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
12181248
}
12191249
return size, err
12201250
}
1251+
12211252
})
12221253
if err != nil {
12231254
return size, err
@@ -1577,7 +1608,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
15771608
return nil, err
15781609
}
15791610
if version == v1 {
1580-
request := saslAuthenticateRequestV0{Data: data}
1611+
var request = saslAuthenticateRequestV0{Data: data}
15811612
var response saslAuthenticateResponseV0
15821613

15831614
err := c.writeOperation(

0 commit comments

Comments
 (0)