Skip to content

Commit d2642a7

Browse files
author
Chris Busbey
committed
using byte buffer pool for incoming messages
1 parent 9f09493 commit d2642a7

16 files changed

+114
-53
lines changed

acceptor.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package quickfix
22

33
import (
44
"bufio"
5+
"bytes"
56
"crypto/tls"
7+
"io"
68
"net"
9+
"runtime/debug"
710
"strconv"
811
"sync"
912

@@ -128,14 +131,14 @@ func (a *Acceptor) listenForConnections() {
128131
}
129132
}
130133

131-
func (a *Acceptor) invalidMessage(msg []byte, err error) {
132-
a.globalLog.OnEventf("Invalid Message: %s, %v", msg, err.Error())
134+
func (a *Acceptor) invalidMessage(msg *bytes.Buffer, err error) {
135+
a.globalLog.OnEventf("Invalid Message: %s, %v", msg.Bytes(), err.Error())
133136
}
134137

135138
func (a *Acceptor) handleConnection(netConn net.Conn) {
136139
defer func() {
137140
if err := recover(); err != nil {
138-
a.globalLog.OnEventf("Connection Terminated: %v", err)
141+
a.globalLog.OnEventf("Connection Terminated with Panic: %s", debug.Stack())
139142
}
140143

141144
if err := netConn.Close(); err != nil {
@@ -148,7 +151,11 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
148151

149152
msgBytes, err := parser.ReadMessage()
150153
if err != nil {
151-
a.globalLog.OnEvent(err.Error())
154+
if err == io.EOF {
155+
a.globalLog.OnEvent("Connection Terminated")
156+
} else {
157+
a.globalLog.OnEvent(err.Error())
158+
}
152159
return
153160
}
154161

connection_internal_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ func TestReadLoop(t *testing.T) {
5151
continue
5252
}
5353

54-
if string(msg.bytes) != test.expectedMsg {
55-
t.Errorf("Expected %v got %v", test.expectedMsg, string(msg.bytes))
54+
if msg.bytes.String() != test.expectedMsg {
55+
t.Errorf("Expected %v got %v", test.expectedMsg, msg.bytes.String())
5656
}
5757
}
5858
}

in_session.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package quickfix
22

33
import (
4+
"bytes"
45
"time"
56

67
"github.com/quickfixgo/quickfix/enum"
@@ -212,7 +213,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
212213
nextSeqNum := seqNum
213214
msg := NewMessage()
214215
for _, msgBytes := range msgs {
215-
_ = ParseMessage(&msg, msgBytes)
216+
_ = ParseMessage(&msg, bytes.NewBuffer(msgBytes))
216217
msgType, _ := msg.Header.GetBytes(tagMsgType)
217218
sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum)
218219

internal/buffer_pool.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package internal
2+
3+
import (
4+
"bytes"
5+
"sync"
6+
)
7+
8+
//BufferPool is an concurrently safe pool for byte buffers. Used to constructing inbound messages and writing outbound messages
9+
type BufferPool struct {
10+
b []*bytes.Buffer
11+
sync.Mutex
12+
}
13+
14+
//Get returns a buffer from the pool, or creates a new buffer if the pool is empty
15+
func (p *BufferPool) Get() (buf *bytes.Buffer) {
16+
p.Lock()
17+
if len(p.b) > 0 {
18+
buf, p.b = p.b[len(p.b)-1], p.b[:len(p.b)-1]
19+
} else {
20+
buf = new(bytes.Buffer)
21+
}
22+
23+
p.Unlock()
24+
25+
return
26+
}
27+
28+
//Put returns adds a buffer to the pool
29+
func (p *BufferPool) Put(buf *bytes.Buffer) {
30+
if buf == nil {
31+
panic("Nil Buffer inserted into pool")
32+
}
33+
34+
p.Lock()
35+
p.b = append(p.b, buf)
36+
p.Unlock()
37+
}

logon_state_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package quickfix
22

33
import (
4+
"bytes"
45
"testing"
56
"time"
67

@@ -282,7 +283,7 @@ func (s *LogonStateTestSuite) TestFixMsgInLogonSeqNumTooHigh() {
282283
msgBytesSent, ok := s.Receiver.LastMessage()
283284
s.Require().True(ok)
284285
sentMessage := NewMessage()
285-
err := ParseMessage(&sentMessage, msgBytesSent)
286+
err := ParseMessage(&sentMessage, bytes.NewBuffer(msgBytesSent))
286287
s.Require().Nil(err)
287288
s.MessageType(enum.MsgType_LOGON, sentMessage)
288289

message.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type Message struct {
8282
//ReceiveTime is the time that this message was read from the socket connection
8383
ReceiveTime time.Time
8484

85-
rawMessage []byte
85+
rawMessage *bytes.Buffer
8686

8787
//slice of Bytes corresponding to the message body
8888
bodyBytes []byte
@@ -114,15 +114,17 @@ func NewMessage() (m Message) {
114114
}
115115

116116
//ParseMessage constructs a Message from a byte slice wrapping a FIX message.
117-
func ParseMessage(msg *Message, rawMessage []byte) (err error) {
117+
func ParseMessage(msg *Message, rawMessage *bytes.Buffer) (err error) {
118118
msg.Header.Clear()
119119
msg.Body.Clear()
120120
msg.Trailer.Clear()
121121
msg.rawMessage = rawMessage
122122

123+
rawBytes := rawMessage.Bytes()
124+
123125
//allocate fields in one chunk
124126
fieldCount := 0
125-
for _, b := range rawMessage {
127+
for _, b := range rawBytes {
126128
if b == '\001' {
127129
fieldCount++
128130
}
@@ -137,23 +139,23 @@ func ParseMessage(msg *Message, rawMessage []byte) (err error) {
137139
fieldIndex := 0
138140

139141
//message must start with begin string, body length, msg type
140-
if rawMessage, err = extractSpecificField(&msg.fields[fieldIndex], tagBeginString, rawMessage); err != nil {
142+
if rawBytes, err = extractSpecificField(&msg.fields[fieldIndex], tagBeginString, rawBytes); err != nil {
141143
return
142144
}
143145

144146
msg.Header.tagLookup[msg.fields[fieldIndex].tag] = msg.fields[fieldIndex : fieldIndex+1]
145147
fieldIndex++
146148

147149
parsedFieldBytes := &msg.fields[fieldIndex]
148-
if rawMessage, err = extractSpecificField(parsedFieldBytes, tagBodyLength, rawMessage); err != nil {
150+
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagBodyLength, rawBytes); err != nil {
149151
return
150152
}
151153

152154
msg.Header.tagLookup[parsedFieldBytes.tag] = msg.fields[fieldIndex : fieldIndex+1]
153155
fieldIndex++
154156

155157
parsedFieldBytes = &msg.fields[fieldIndex]
156-
if rawMessage, err = extractSpecificField(parsedFieldBytes, tagMsgType, rawMessage); err != nil {
158+
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagMsgType, rawBytes); err != nil {
157159
return
158160
}
159161

@@ -164,7 +166,7 @@ func ParseMessage(msg *Message, rawMessage []byte) (err error) {
164166
foundBody := false
165167
for {
166168
parsedFieldBytes = &msg.fields[fieldIndex]
167-
rawMessage, err = extractField(parsedFieldBytes, rawMessage)
169+
rawBytes, err = extractField(parsedFieldBytes, rawBytes)
168170
if err != nil {
169171
return
170172
}
@@ -176,15 +178,15 @@ func ParseMessage(msg *Message, rawMessage []byte) (err error) {
176178
msg.Trailer.tagLookup[parsedFieldBytes.tag] = msg.fields[fieldIndex : fieldIndex+1]
177179
default:
178180
foundBody = true
179-
trailerBytes = rawMessage
181+
trailerBytes = rawBytes
180182
msg.Body.tagLookup[parsedFieldBytes.tag] = msg.fields[fieldIndex : fieldIndex+1]
181183
}
182184
if parsedFieldBytes.tag == tagCheckSum {
183185
break
184186
}
185187

186188
if !foundBody {
187-
msg.bodyBytes = rawMessage
189+
msg.bodyBytes = rawBytes
188190
}
189191

190192
fieldIndex++
@@ -296,7 +298,7 @@ func extractField(parsedFieldBytes *TagValue, buffer []byte) (remBytes []byte, e
296298

297299
func (m Message) String() string {
298300
if m.rawMessage != nil {
299-
return string(m.rawMessage)
301+
return m.rawMessage.String()
300302
}
301303

302304
return string(m.build())

message_router_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package quickfix
22

33
import (
4+
"bytes"
45
"fmt"
56
"testing"
67

@@ -38,7 +39,7 @@ func (suite *MessageRouterTestSuite) givenTheRoute(beginString, msgType string)
3839
}
3940

4041
func (suite *MessageRouterTestSuite) givenTheMessage(msgBytes []byte) {
41-
err := ParseMessage(&suite.msg, msgBytes)
42+
err := ParseMessage(&suite.msg, bytes.NewBuffer(msgBytes))
4243
suite.Nil(err)
4344

4445
var beginString FIXString

message_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
func BenchmarkParseMessage(b *testing.B) {
12-
rawMsg := []byte("8=FIX.4.29=10435=D34=249=TW52=20140515-19:49:56.65956=ISLD11=10021=140=154=155=TSLA60=00010101-00:00:00.00010=039")
12+
rawMsg := bytes.NewBufferString("8=FIX.4.29=10435=D34=249=TW52=20140515-19:49:56.65956=ISLD11=10021=140=154=155=TSLA60=00010101-00:00:00.00010=039")
1313

1414
var msg Message
1515
for i := 0; i < b.N; i++ {
@@ -31,12 +31,12 @@ func (s *MessageSuite) SetupTest() {
3131
}
3232

3333
func (s *MessageSuite) TestParseMessage() {
34-
rawMsg := []byte("8=FIX.4.29=10435=D34=249=TW52=20140515-19:49:56.65956=ISLD11=10021=140=154=155=TSLA60=00010101-00:00:00.00010=039")
34+
rawMsg := bytes.NewBufferString("8=FIX.4.29=10435=D34=249=TW52=20140515-19:49:56.65956=ISLD11=10021=140=154=155=TSLA60=00010101-00:00:00.00010=039")
3535

3636
err := ParseMessage(&s.msg, rawMsg)
3737
s.Nil(err)
3838

39-
s.True(bytes.Equal(rawMsg, s.msg.rawMessage), "Expected msg bytes to equal raw bytes")
39+
s.True(bytes.Equal(rawMsg.Bytes(), s.msg.rawMessage.Bytes()), "Expected msg bytes to equal raw bytes")
4040

4141
expectedBodyBytes := []byte("11=10021=140=154=155=TSLA60=00010101-00:00:00.000")
4242

@@ -55,7 +55,7 @@ func (s *MessageSuite) TestParseMessage() {
5555

5656
func (s *MessageSuite) TestParseOutOfOrder() {
5757
//allow fields out of order, save for validation
58-
rawMsg := []byte("8=FIX.4.09=8135=D11=id21=338=10040=154=155=MSFT34=249=TW52=20140521-22:07:0956=ISLD10=250")
58+
rawMsg := bytes.NewBufferString("8=FIX.4.09=8135=D11=id21=338=10040=154=155=MSFT34=249=TW52=20140521-22:07:0956=ISLD10=250")
5959
s.Nil(ParseMessage(&s.msg, rawMsg))
6060
}
6161

@@ -73,7 +73,7 @@ func (s *MessageSuite) TestBuild() {
7373
}
7474

7575
func (s *MessageSuite) TestReBuild() {
76-
rawMsg := []byte("8=FIX.4.29=10435=D34=249=TW52=20140515-19:49:56.65956=ISLD11=10021=140=154=155=TSLA60=00010101-00:00:00.00010=039")
76+
rawMsg := bytes.NewBufferString("8=FIX.4.29=10435=D34=249=TW52=20140515-19:49:56.65956=ISLD11=10021=140=154=155=TSLA60=00010101-00:00:00.00010=039")
7777

7878
s.Nil(ParseMessage(&s.msg, rawMsg))
7979

@@ -92,7 +92,7 @@ func (s *MessageSuite) TestReBuild() {
9292
}
9393

9494
func (s *MessageSuite) TestReverseRoute() {
95-
s.Nil(ParseMessage(&s.msg, []byte("8=FIX.4.29=17135=D34=249=TW50=KK52=20060102-15:04:0556=ISLD57=AP144=BB115=JCD116=CS128=MG129=CB142=JV143=RY145=BH11=ID21=338=10040=w54=155=INTC60=20060102-15:04:0510=123")))
95+
s.Nil(ParseMessage(&s.msg, bytes.NewBufferString("8=FIX.4.29=17135=D34=249=TW50=KK52=20060102-15:04:0556=ISLD57=AP144=BB115=JCD116=CS128=MG129=CB142=JV143=RY145=BH11=ID21=338=10040=w54=155=INTC60=20060102-15:04:0510=123")))
9696

9797
builder := s.msg.reverseRoute()
9898

@@ -123,15 +123,15 @@ func (s *MessageSuite) TestReverseRoute() {
123123
}
124124

125125
func (s *MessageSuite) TestReverseRouteIgnoreEmpty() {
126-
s.Nil(ParseMessage(&s.msg, []byte("8=FIX.4.09=12835=D34=249=TW52=20060102-15:04:0556=ISLD115=116=CS128=MG129=CB11=ID21=338=10040=w54=155=INTC60=20060102-15:04:0510=123")))
126+
s.Nil(ParseMessage(&s.msg, bytes.NewBufferString("8=FIX.4.09=12835=D34=249=TW52=20060102-15:04:0556=ISLD115=116=CS128=MG129=CB11=ID21=338=10040=w54=155=INTC60=20060102-15:04:0510=123")))
127127
builder := s.msg.reverseRoute()
128128

129129
s.False(builder.Header.Has(tagDeliverToCompID), "Should not reverse if empty")
130130
}
131131

132132
func (s *MessageSuite) TestReverseRouteFIX40() {
133133
//onbehalfof/deliverto location id not supported in fix 4.0
134-
s.Nil(ParseMessage(&s.msg, []byte("8=FIX.4.09=17135=D34=249=TW50=KK52=20060102-15:04:0556=ISLD57=AP144=BB115=JCD116=CS128=MG129=CB142=JV143=RY145=BH11=ID21=338=10040=w54=155=INTC60=20060102-15:04:0510=123")))
134+
s.Nil(ParseMessage(&s.msg, bytes.NewBufferString("8=FIX.4.09=17135=D34=249=TW50=KK52=20060102-15:04:0556=ISLD57=AP144=BB115=JCD116=CS128=MG129=CB142=JV143=RY145=BH11=ID21=338=10040=w54=155=INTC60=20060102-15:04:0510=123")))
135135

136136
builder := s.msg.reverseRoute()
137137

parser.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ import (
44
"bytes"
55
"io"
66
"time"
7+
8+
"github.com/quickfixgo/quickfix/internal"
79
)
810

911
const (
1012
defaultBufSize = 4096
1113
)
1214

15+
var bufferPool internal.BufferPool
16+
1317
type parser struct {
1418
buffer []byte
1519
reader io.Reader
@@ -97,25 +101,27 @@ func (p *parser) jumpLength() (int, error) {
97101
return offset + length, nil
98102
}
99103

100-
func (p *parser) ReadMessage() ([]byte, error) {
104+
func (p *parser) ReadMessage() (msgBytes *bytes.Buffer, err error) {
101105
start, err := p.findStart()
102106
if err != nil {
103-
return []byte{}, err
107+
return
104108
}
105109
p.buffer = p.buffer[start:]
106110

107111
index, err := p.jumpLength()
108112
if err != nil {
109-
return []byte{}, err
113+
return
110114
}
111115

112116
index, err = p.findEndAfterOffset(index)
113117
if err != nil {
114-
return []byte{}, err
118+
return
115119
}
116120

117-
msgBytes := p.buffer[:index:index]
121+
msgBytes = bufferPool.Get()
122+
msgBytes.Reset()
123+
msgBytes.Write(p.buffer[:index])
118124
p.buffer = p.buffer[index:]
119125

120-
return msgBytes, nil
126+
return
121127
}

0 commit comments

Comments
 (0)