
Commit 148d893

Use unsafe.SliceData()
1 parent f0d709f commit 148d893

File tree

3 files changed: +10 −71 lines

kafka/event.go (+1 −1)
kafka/handle.go (+2 −7)
kafka/producer.go (+7 −63)

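For context (this note is not part of the diff): unsafe.SliceData was added in Go 1.20 and returns a pointer to a slice's underlying array. It returns nil for a nil slice and a non-nil pointer otherwise, and it never panics, unlike the &slice[0] idiom it replaces here, which requires a non-empty slice. A minimal, cgo-free sketch of that behavior, illustrative only:

package main

import (
	"fmt"
	"unsafe"
)

func main() {
	var nilSlice []byte     // nil slice
	empty := []byte{}       // non-nil, length 0
	data := []byte{1, 2, 3} // ordinary slice

	// unsafe.SliceData returns the pointer to the slice's underlying array:
	// nil for a nil slice, non-nil otherwise. It never panics, unlike &slice[0].
	fmt.Println(unsafe.SliceData(nilSlice) == nil) // true  -> maps to NULL in C
	fmt.Println(unsafe.SliceData(empty) == nil)    // false -> non-NULL, length 0
	fmt.Println(unsafe.SliceData(data) == nil)     // false -> points at data[0]
}

That nil-in, NULL-out behavior is what lets the commit delete the null/empty special-casing in producer.go below.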

kafka/event.go

+1 −1

@@ -224,7 +224,7 @@ out:
 		// or to the global Producer.Events channel, or none.
 		rkmessages := make([]*C.rd_kafka_message_t, int(C.rd_kafka_event_message_count(rkev)))

-		cnt := int(C.rd_kafka_event_message_array(rkev, (**C.rd_kafka_message_t)(unsafe.Pointer(&rkmessages[0])), C.size_t(len(rkmessages))))
+		cnt := int(C.rd_kafka_event_message_array(rkev, unsafe.SliceData(rkmessages), C.size_t(len(rkmessages))))

 		for _, rkmessage := range rkmessages[:cnt] {
 			msg := h.newMessageFromC(rkmessage)
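Worth noting (illustrative, not from the diff): unsafe.SliceData is generic over the element type, so for a []*C.rd_kafka_message_t it already yields a **C.rd_kafka_message_t and the old double conversion through unsafe.Pointer drops out. A rough sketch with msgT standing in for the cgo type, which is only available inside the package:

package main

import (
	"fmt"
	"unsafe"
)

type msgT struct{} // stand-in for C.rd_kafka_message_t

func main() {
	rkmessages := make([]*msgT, 4)

	// Old pattern: cast the address of the first element.
	oldPtr := (**msgT)(unsafe.Pointer(&rkmessages[0]))

	// New pattern: SliceData already has the matching type, **msgT here.
	newPtr := unsafe.SliceData(rkmessages)

	fmt.Println(oldPtr == newPtr) // true
}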

kafka/handle.go

+2 −7

@@ -309,14 +309,9 @@ func (h *handle) setOAuthBearerToken(oauthBearerToken OAuthBearerToken) error {
 		extensionSize++
 	}

-	var cExtensionsToUse **C.char
-	if extensionSize > 0 {
-		cExtensionsToUse = (**C.char)(unsafe.Pointer(&cExtensions[0]))
-	}
-
 	cErr := C.rd_kafka_oauthbearer_set_token(h.rk, cTokenValue,
-		C.int64_t(oauthBearerToken.Expiration.UnixNano()/(1000*1000)), cPrincipal,
-		cExtensionsToUse, C.size_t(extensionSize), cErrstr, cErrstrSize)
+		C.int64_t(oauthBearerToken.Expiration.UnixMilli()), cPrincipal,
+		unsafe.SliceData(cExtensions), C.size_t(extensionSize), cErrstr, cErrstrSize)
 	if cErr == C.RD_KAFKA_RESP_ERR_NO_ERROR {
 		return nil
 	}
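Two independent cleanups land here (a side note, not part of the diff): Expiration.UnixMilli(), available since Go 1.17, replaces the hand-rolled UnixNano()/(1000*1000), and unsafe.SliceData(cExtensions) can be passed unconditionally, because it evaluates to nil when the slice is nil and extensionSize is 0 in that case, so librdkafka never dereferences it (this assumes cExtensions stays nil when no extensions are set, which the removed guard implied). A tiny sketch of the timestamp equivalence:

package main

import (
	"fmt"
	"time"
)

func main() {
	expiration := time.Now().Add(time.Hour)

	// The old and new expressions produce the same millisecond value.
	oldMs := expiration.UnixNano() / (1000 * 1000)
	newMs := expiration.UnixMilli()
	fmt.Println(oldMs == newMs) // true
}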

kafka/producer.go

+7 −63

@@ -70,13 +70,11 @@ void free_tmphdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt) {
 rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
          rd_kafka_topic_t *rkt, int32_t partition,
          int msgflags,
-         int valIsNull, void *val, size_t val_len,
-         int keyIsNull, void *key, size_t key_len,
+         void *valp, size_t val_len,
+         void *keyp, size_t key_len,
          int64_t timestamp,
          tmphdr_t *tmphdrs, size_t tmphdrsCnt,
          uintptr_t cgoid) {
-   void *valp = valIsNull ? NULL : val;
-   void *keyp = keyIsNull ? NULL : key;
 #ifdef RD_KAFKA_V_TIMESTAMP
    rd_kafka_resp_err_t err;
 #ifdef RD_KAFKA_V_HEADERS

@@ -179,55 +177,6 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)

 	crkt := p.handle.getRkt(*msg.TopicPartition.Topic)

-	// Three problems:
-	//  1) There's a difference between an empty Value or Key (length 0, proper pointer) and
-	//     a null Value or Key (length 0, null pointer).
-	//  2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
-	//     dereference can't be performed on a nil slice.
-	//  3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
-	//     in the call to the C function.
-	//
-	// Solution:
-	//  Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
-	//  point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
-	//  works.
-	//  Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
-	//  to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
-	//
-	var valp []byte
-	var keyp []byte
-	oneByte := []byte{0}
-	var valIsNull C.int
-	var keyIsNull C.int
-	var valLen int
-	var keyLen int
-
-	if msg.Value == nil {
-		valIsNull = 1
-		valLen = 0
-		valp = oneByte
-	} else {
-		valLen = len(msg.Value)
-		if valLen > 0 {
-			valp = msg.Value
-		} else {
-			valp = oneByte
-		}
-	}
-
-	if msg.Key == nil {
-		keyIsNull = 1
-		keyLen = 0
-		keyp = oneByte
-	} else {
-		keyLen = len(msg.Key)
-		if keyLen > 0 {
-			keyp = msg.Key
-		} else {
-			keyp = oneByte
-		}
-	}
-
 	var cgoid int

 	// Per-message state that needs to be retained through the C code:

@@ -242,7 +191,7 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)

 	var timestamp int64
 	if !msg.Timestamp.IsZero() {
-		timestamp = msg.Timestamp.UnixNano() / 1000000
+		timestamp = msg.Timestamp.UnixMilli()
 	}

 	// Convert headers to C-friendly tmphdrs

@@ -270,20 +219,15 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)
 				tmphdrs[n].size = C.ssize_t(-1)
 			}
 		}
-	} else {
-		// no headers, need a dummy tmphdrs of size 1 to avoid index
-		// out of bounds panic in do_produce() call below.
-		// tmphdrsCnt will be 0.
-		tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
 	}

 	cErr := C.do_produce(p.handle.rk, crkt,
 		C.int32_t(msg.TopicPartition.Partition),
 		C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
-		valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
-		keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
+		unsafe.Pointer(unsafe.SliceData(msg.Value)), C.size_t(len(msg.Value)),
+		unsafe.Pointer(unsafe.SliceData(msg.Key)), C.size_t(len(msg.Key)),
 		C.int64_t(timestamp),
-		(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
+		(*C.tmphdr_t)(unsafe.SliceData(tmphdrs)), C.size_t(tmphdrsCnt),
 		(C.uintptr_t)(cgoid))
 	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
 		if cgoid != 0 {

@@ -326,7 +270,7 @@ func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) err
 		p.handle.messageToC(m, &cmsgs[i])
 	}
 	r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
-		(*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
+		unsafe.SliceData(cmsgs), C.int(len(cmsgs)))
 	if r == -1 {
 		return newError(C.rd_kafka_last_error())
 	}
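The removed block in produce() existed for the three reasons its old comment spelled out: Kafka distinguishes a null Value/Key (NULL pointer, length 0) from an empty one (valid pointer, length 0), &slice[0] cannot be taken on a nil or empty slice, and cgo requires the unsafe.Pointer conversion to happen inside the C call expression. unsafe.SliceData covers all three at once: a nil msg.Value becomes a NULL pointer, a non-nil slice becomes a usable pointer, and the conversion still sits in the call itself. That is why the valIsNull/keyIsNull flags, the oneByte workaround, the dummy tmphdrs entry (SliceData of a nil tmphdrs is simply NULL, with tmphdrsCnt 0), and the matching do_produce parameters can all go. A cgo-free sketch of the null/empty distinction; passToC is a hypothetical stand-in for the C call, not an API from this repository:

package main

import (
	"fmt"
	"unsafe"
)

// passToC mimics how a (pointer, length) pair is interpreted on the C side:
// NULL with length 0 is a "null" payload, a non-NULL pointer with length 0
// is an "empty" payload.
func passToC(p unsafe.Pointer, n int) string {
	switch {
	case p == nil && n == 0:
		return "null"
	case n == 0:
		return "empty"
	default:
		return fmt.Sprintf("%d bytes", n)
	}
}

func main() {
	var nullValue []byte        // msg.Value == nil
	emptyValue := []byte{}      // zero-length but non-nil
	realValue := []byte("data") // ordinary payload

	fmt.Println(passToC(unsafe.Pointer(unsafe.SliceData(nullValue)), len(nullValue)))   // null
	fmt.Println(passToC(unsafe.Pointer(unsafe.SliceData(emptyValue)), len(emptyValue))) // empty
	fmt.Println(passToC(unsafe.Pointer(unsafe.SliceData(realValue)), len(realValue)))   // 4 bytes
}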
