Skip to content

Commit

Permalink
message pool for inbound messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Busbey committed Oct 3, 2016
1 parent 8f7e89d commit da1ca82
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 73 deletions.
56 changes: 29 additions & 27 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ type inSession struct{ loggedOn }

func (state inSession) String() string { return "In Session" }

func (state inSession) FixMsgIn(session *session, msg Message) sessionState {
func (state inSession) FixMsgIn(session *session, msg *Message) sessionState {
msgType, err := msg.Header.GetBytes(tagMsgType)
if err != nil {
return handleStateError(session, err)
}

switch enum.MsgType(msgType) {
case enum.MsgType_LOGON:
if err := session.handleLogon(msg); err != nil {
if err := session.initiateLogoutInReplyTo("", &msg); err != nil {
if err := session.handleLogon(*msg); err != nil {
if err := session.initiateLogoutInReplyTo("", msg); err != nil {
return handleStateError(session, err)
}
return logoutState{}
Expand All @@ -36,7 +36,7 @@ func (state inSession) FixMsgIn(session *session, msg Message) sessionState {
case enum.MsgType_TEST_REQUEST:
return state.handleTestRequest(session, msg)
default:
if err := session.verify(msg); err != nil {
if err := session.verify(*msg); err != nil {
return state.processReject(session, msg, err)
}
}
Expand Down Expand Up @@ -71,16 +71,16 @@ func (state inSession) Timeout(session *session, event internal.Event) (nextStat
return state
}

func (state inSession) handleLogout(session *session, msg Message) (nextState sessionState) {
if err := session.verifySelect(msg, false, false); err != nil {
func (state inSession) handleLogout(session *session, msg *Message) (nextState sessionState) {
if err := session.verifySelect(*msg, false, false); err != nil {
return state.processReject(session, msg, err)
}

if session.IsLoggedOn() {
session.log.OnEvent("Received logout request")
session.log.OnEvent("Sending logout response")

if err := session.sendLogoutInReplyTo("", &msg); err != nil {
if err := session.sendLogoutInReplyTo("", msg); err != nil {
session.logError(err)
}
} else {
Expand All @@ -100,8 +100,8 @@ func (state inSession) handleLogout(session *session, msg Message) (nextState se
return latentState{}
}

func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) {
if err := session.verify(msg); err != nil {
func (state inSession) handleTestRequest(session *session, msg *Message) (nextState sessionState) {
if err := session.verify(*msg); err != nil {
return state.processReject(session, msg, err)
}
var testReq FIXString
Expand All @@ -111,7 +111,7 @@ func (state inSession) handleTestRequest(session *session, msg Message) (nextSta
heartBt := NewMessage()
heartBt.Header.SetField(tagMsgType, FIXString("0"))
heartBt.Body.SetField(tagTestReqID, testReq)
if err := session.sendInReplyTo(heartBt, &msg); err != nil {
if err := session.sendInReplyTo(heartBt, msg); err != nil {
return handleStateError(session, err)
}
}
Expand All @@ -122,15 +122,15 @@ func (state inSession) handleTestRequest(session *session, msg Message) (nextSta
return state
}

func (state inSession) handleSequenceReset(session *session, msg Message) (nextState sessionState) {
func (state inSession) handleSequenceReset(session *session, msg *Message) (nextState sessionState) {
var gapFillFlag FIXBoolean
if msg.Body.Has(tagGapFillFlag) {
if err := msg.Body.GetField(tagGapFillFlag, &gapFillFlag); err != nil {
return state.processReject(session, msg, err)
}
}

if err := session.verifySelect(msg, bool(gapFillFlag), bool(gapFillFlag)); err != nil {
if err := session.verifySelect(*msg, bool(gapFillFlag), bool(gapFillFlag)); err != nil {
return state.processReject(session, msg, err)
}

Expand All @@ -146,16 +146,16 @@ func (state inSession) handleSequenceReset(session *session, msg Message) (nextS
}
case newSeqNo < expectedSeqNum:
//FIXME: to be compliant with legacy tests, do not include tag in reftagid? (11c_NewSeqNoLess)
if err := session.doReject(msg, valueIsIncorrectNoTag()); err != nil {
if err := session.doReject(*msg, valueIsIncorrectNoTag()); err != nil {
return handleStateError(session, err)
}
}
}
return state
}

func (state inSession) handleResendRequest(session *session, msg Message) (nextState sessionState) {
if err := session.verifyIgnoreSeqNumTooHighOrLow(msg); err != nil {
func (state inSession) handleResendRequest(session *session, msg *Message) (nextState sessionState) {
if err := session.verifyIgnoreSeqNumTooHighOrLow(*msg); err != nil {
return state.processReject(session, msg, err)
}

Expand Down Expand Up @@ -183,15 +183,15 @@ func (state inSession) handleResendRequest(session *session, msg Message) (nextS
endSeqNo = expectedSeqNum - 1
}

if err := state.resendMessages(session, int(beginSeqNo), endSeqNo, msg); err != nil {
if err := state.resendMessages(session, int(beginSeqNo), endSeqNo, *msg); err != nil {
return handleStateError(session, err)
}

if err := session.checkTargetTooLow(msg); err != nil {
if err := session.checkTargetTooLow(*msg); err != nil {
return state
}

if err := session.checkTargetTooHigh(msg); err != nil {
if err := session.checkTargetTooHigh(*msg); err != nil {
return state
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
return
}

func (state inSession) processReject(session *session, msg Message, rej MessageRejectError) sessionState {
func (state inSession) processReject(session *session, msg *Message, rej MessageRejectError) sessionState {
switch TypedError := rej.(type) {
case targetTooHigh:

Expand All @@ -266,10 +266,12 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
}

if nextState.messageStash == nil {
nextState.messageStash = make(map[int]Message)
nextState.messageStash = make(map[int]*Message)
}

nextState.messageStash[TypedError.ReceivedTarget] = msg
//do not reclaim stashed message
msg.keepMessage = true

return nextState

Expand All @@ -284,7 +286,7 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR

switch rej.RejectReason() {
case rejectReasonCompIDProblem, rejectReasonSendingTimeAccuracyProblem:
if err := session.doReject(msg, rej); err != nil {
if err := session.doReject(*msg, rej); err != nil {
return handleStateError(session, err)
}

Expand All @@ -293,7 +295,7 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
}
return logoutState{}
default:
if err := session.doReject(msg, rej); err != nil {
if err := session.doReject(*msg, rej); err != nil {
return handleStateError(session, err)
}

Expand All @@ -304,11 +306,11 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
}
}

func (state inSession) doTargetTooLow(session *session, msg Message, rej targetTooLow) (nextState sessionState) {
func (state inSession) doTargetTooLow(session *session, msg *Message, rej targetTooLow) (nextState sessionState) {
var posDupFlag FIXBoolean
if msg.Header.Has(tagPossDupFlag) {
if err := msg.Header.GetField(tagPossDupFlag, &posDupFlag); err != nil {
if rejErr := session.doReject(msg, err); rejErr != nil {
if rejErr := session.doReject(*msg, err); rejErr != nil {
return handleStateError(session, rejErr)
}
return state
Expand All @@ -323,15 +325,15 @@ func (state inSession) doTargetTooLow(session *session, msg Message, rej targetT
}

if !msg.Header.Has(tagOrigSendingTime) {
if err := session.doReject(msg, RequiredTagMissing(tagOrigSendingTime)); err != nil {
if err := session.doReject(*msg, RequiredTagMissing(tagOrigSendingTime)); err != nil {
return handleStateError(session, err)
}
return state
}

var origSendingTime FIXUTCTimestamp
if err := msg.Header.GetField(tagOrigSendingTime, &origSendingTime); err != nil {
if rejErr := session.doReject(msg, err); rejErr != nil {
if rejErr := session.doReject(*msg, err); rejErr != nil {
return handleStateError(session, rejErr)
}
return state
Expand All @@ -343,7 +345,7 @@ func (state inSession) doTargetTooLow(session *session, msg Message, rej targetT
}

if sendingTime.Before(origSendingTime.Time) {
if err := session.doReject(msg, sendingTimeAccuracyProblem()); err != nil {
if err := session.doReject(*msg, sendingTimeAccuracyProblem()); err != nil {
return handleStateError(session, err)
}

Expand Down
6 changes: 3 additions & 3 deletions in_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *InSessionTestSuite) TestLogoutResetOnLogout() {
s.session.ResetOnLogout = true

s.MockApp.On("ToApp").Return(nil)
s.Nil(s.queueForSend(s.NewOrderSingle()))
s.Nil(s.queueForSend(*s.NewOrderSingle()))
s.MockApp.AssertExpectations(s.T())

s.MockApp.On("FromAdmin").Return(nil)
Expand Down Expand Up @@ -243,7 +243,7 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestAllAdminThenApp() {
s.LastToAdminMessageSent()

s.MockApp.On("ToApp").Return(nil)
s.Require().Nil(s.session.send(s.NewOrderSingle()))
s.Require().Nil(s.session.send(*s.NewOrderSingle()))
s.LastToAppMessageSent()

s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 2)
Expand Down Expand Up @@ -280,7 +280,7 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() {
s.LastToAdminMessageSent()

s.MockApp.On("ToApp").Return(nil)
s.Require().Nil(s.session.send(s.NewOrderSingle()))
s.Require().Nil(s.session.send(*s.NewOrderSingle()))
s.LastToAppMessageSent()

s.session.Timeout(s.session, internal.NeedHeartbeat)
Expand Down
2 changes: 1 addition & 1 deletion latent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func (state latentState) String() string { return "Latent State" }
func (state latentState) IsLoggedOn() bool { return false }
func (state latentState) IsConnected() bool { return false }

func (state latentState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
func (state latentState) FixMsgIn(session *session, msg *Message) (nextState sessionState) {
session.log.OnEventf("Invalid Session State: Unexpected Msg %v while in Latent state", msg)
return state
}
Expand Down
6 changes: 3 additions & 3 deletions logon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type logonState struct{ connectedNotLoggedOn }

func (s logonState) String() string { return "Logon State" }

func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
func (s logonState) FixMsgIn(session *session, msg *Message) (nextState sessionState) {
msgType, err := msg.MsgType()
if err != nil {
return handleStateError(session, err)
Expand All @@ -20,13 +20,13 @@ func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionSt
return latentState{}
}

if err := session.handleLogon(msg); err != nil {
if err := session.handleLogon(*msg); err != nil {
switch err := err.(type) {
case RejectLogon:
session.log.OnEvent(err.Text)
logout := session.buildLogout(err.Text)

if err := session.dropAndSendInReplyTo(logout, false, &msg); err != nil {
if err := session.dropAndSendInReplyTo(logout, false, msg); err != nil {
session.logError(err)
}

Expand Down
2 changes: 1 addition & 1 deletion logout_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type logoutState struct{ connectedNotLoggedOn }

func (state logoutState) String() string { return "Logout State" }

func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
func (state logoutState) FixMsgIn(session *session, msg *Message) (nextState sessionState) {
nextState = inSession{}.FixMsgIn(session, msg)
if nextState, ok := nextState.(latentState); ok {
return nextState
Expand Down
2 changes: 1 addition & 1 deletion logout_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (s *LogoutStateTestSuite) TestFixMsgInLogoutResetOnLogout() {
s.session.ResetOnLogout = true

s.MockApp.On("ToApp").Return(nil)
s.Nil(s.queueForSend(s.NewOrderSingle()))
s.Nil(s.queueForSend(*s.NewOrderSingle()))
s.MockApp.AssertExpectations(s.T())

s.MockApp.On("FromAdmin").Return(nil)
Expand Down
3 changes: 3 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type Message struct {

//field bytes as they appear in the raw message
fields TagValues

//flag is true if this message should not be returned to pool after use
keepMessage bool
}

//ToMessage returns the message itself
Expand Down
26 changes: 26 additions & 0 deletions message_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package quickfix

type messagePool struct {
m []*Message
}

func (p *messagePool) New() *Message {
msg := NewMessage()
return &msg
}

func (p *messagePool) Get() (msg *Message) {
if len(p.m) > 0 {
msg, p.m = p.m[len(p.m)-1], p.m[:len(p.m)-1]
} else {
msg = p.New()
}

msg.keepMessage = false

return
}

func (p *messagePool) Put(msg *Message) {
p.m = append(p.m, msg)
}
2 changes: 1 addition & 1 deletion not_session_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type notSessionTime struct{ latentState }
func (notSessionTime) String() string { return "Not session time" }
func (notSessionTime) IsSessionTime() bool { return false }

func (state notSessionTime) FixMsgIn(session *session, msg Message) (nextState sessionState) {
func (state notSessionTime) FixMsgIn(session *session, msg *Message) (nextState sessionState) {
session.log.OnEventf("Invalid Session State: Unexpected Msg %v while in Latent state", msg)
return state
}
Expand Down
16 changes: 8 additions & 8 deletions quickfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (m *MessageFactory) SetNextSeqNum(next int) {
m.seqNum = next - 1
}

func (m *MessageFactory) buildMessage(msgType string) Message {
func (m *MessageFactory) buildMessage(msgType string) *Message {
m.seqNum++
msg := NewMessage()
msg.Header.
Expand All @@ -129,34 +129,34 @@ func (m *MessageFactory) buildMessage(msgType string) Message {
SetField(tagSendingTime, FIXUTCTimestamp{Time: time.Now()}).
SetField(tagMsgSeqNum, FIXInt(m.seqNum)).
SetField(tagMsgType, FIXString(msgType))
return msg
return &msg
}

func (m *MessageFactory) Logout() Message {
func (m *MessageFactory) Logout() *Message {
return m.buildMessage(string(enum.MsgType_LOGOUT))
}

func (m *MessageFactory) NewOrderSingle() Message {
func (m *MessageFactory) NewOrderSingle() *Message {
return m.buildMessage(string(enum.MsgType_ORDER_SINGLE))
}

func (m *MessageFactory) Heartbeat() Message {
func (m *MessageFactory) Heartbeat() *Message {
return m.buildMessage(string(enum.MsgType_HEARTBEAT))
}

func (m *MessageFactory) Logon() Message {
func (m *MessageFactory) Logon() *Message {
return m.buildMessage(string(enum.MsgType_LOGON))
}

func (m *MessageFactory) ResendRequest(beginSeqNo int) Message {
func (m *MessageFactory) ResendRequest(beginSeqNo int) *Message {
msg := m.buildMessage(string(enum.MsgType_RESEND_REQUEST))
msg.Body.SetField(tagBeginSeqNo, FIXInt(beginSeqNo))
msg.Body.SetField(tagEndSeqNo, FIXInt(0))

return msg
}

func (m *MessageFactory) SequenceReset(seqNo int) Message {
func (m *MessageFactory) SequenceReset(seqNo int) *Message {
msg := m.buildMessage(string(enum.MsgType_SEQUENCE_RESET))
msg.Body.SetField(tagNewSeqNo, FIXInt(seqNo))

Expand Down
Loading

0 comments on commit da1ca82

Please sign in to comment.