Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leaks due to MessageHandles - #53 #63

Merged
merged 3 commits into from
Jan 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions memoryleaks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (c) IBM Corporation 2023
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*/
package main

import (
"fmt"
"runtime"
"testing"
"time"

"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
"github.com/stretchr/testify/assert"
)

/*
* Test for memory leak when there is no message to be received.
*
* This test is not included in the normal bucket as it sends an enormous number of
* messages, and requires human observation of the total process size to establish whether
* it passes or not, so can only be run under human supervision
*/
func DONT_RUNTestLeakOnEmptyGet(t *testing.T) {

// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
//cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
//assert.Nil(t, cfErr)

// Initialise the attributes of the CF in whatever way you like
cf := mqjms.ConnectionFactoryImpl{
QMName: "QM1",
Hostname: "localhost",
PortNumber: 1414,
ChannelName: "DEV.APP.SVRCONN",
UserName: "app",
Password: "passw0rd",
}

// Creates a connection to the queue manager, using defer to close it automatically
// at the end of the function (if it was created successfully)
context, ctxErr := cf.CreateContext()
assert.Nil(t, ctxErr)
if context != nil {
defer context.Close()
}

// Now send the message and get it back again, to check that it roundtripped.
queue := context.CreateQueue("DEV.QUEUE.1")

consumer, errCons := context.CreateConsumer(queue)
if consumer != nil {
defer consumer.Close()
}
assert.Nil(t, errCons)

for i := 1; i < 35000; i++ {

rcvMsg, errRvc := consumer.ReceiveNoWait()
assert.Nil(t, errRvc)
assert.Nil(t, rcvMsg)

if i%1000 == 0 {
fmt.Println("Messages:", i)
}

}

fmt.Println("Finished receive calls - waiting for cooldown.")
runtime.GC()

time.Sleep(30 * time.Second)

}

/*
* Test for memory leak when sending and receiving messages
*
* This test is not included in the normal bucket as it sends an enormous number of
* messages, and requires human observation of the total process size to establish whether
* it passes or not, so can only be run under human supervision
*/
func DONTRUN_TestLeakOnPutGet(t *testing.T) {

// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
//cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
//assert.Nil(t, cfErr)

// Initialise the attributes of the CF in whatever way you like
cf := mqjms.ConnectionFactoryImpl{
QMName: "QM1",
Hostname: "localhost",
PortNumber: 1414,
ChannelName: "DEV.APP.SVRCONN",
UserName: "app",
Password: "passw0rd",
}

// Creates a connection to the queue manager, using defer to close it automatically
// at the end of the function (if it was created successfully)
context, ctxErr := cf.CreateContext()
assert.Nil(t, ctxErr)
if context != nil {
defer context.Close()
}

// Now send the message and get it back again, to check that it roundtripped.
queue := context.CreateQueue("DEV.QUEUE.1")

consumer, errCons := context.CreateConsumer(queue)
if consumer != nil {
defer consumer.Close()
}
assert.Nil(t, errCons)

ttlMillis := 20000
producer := context.CreateProducer().SetTimeToLive(ttlMillis)

for i := 1; i < 25000; i++ {

// Create a TextMessage and check that we can populate it
msgBody := "Message " + fmt.Sprint(i)
txtMsg := context.CreateTextMessage()
txtMsg.SetText(msgBody)
txtMsg.SetIntProperty("MessageNumber", i)

errSend := producer.Send(queue, txtMsg)
assert.Nil(t, errSend)

rcvMsg, errRvc := consumer.ReceiveNoWait()
assert.Nil(t, errRvc)
assert.NotNil(t, rcvMsg)

// Check message body.
switch msg := rcvMsg.(type) {
case jms20subset.TextMessage:
assert.Equal(t, msgBody, *msg.GetText())
default:
assert.Fail(t, "Got something other than a text message")
}

// Check messageID
assert.Equal(t, txtMsg.GetJMSMessageID(), rcvMsg.GetJMSMessageID())

// Check int property
rcvMsgNum, propErr := rcvMsg.GetIntProperty("MessageNumber")
assert.Nil(t, propErr)
assert.Equal(t, i, rcvMsgNum)

if i%1000 == 0 {
fmt.Println("Messages:", i)
}

}

fmt.Println("Finished receive calls - waiting for cooldown.")
runtime.GC()

time.Sleep(30 * time.Second)

}
2 changes: 2 additions & 0 deletions mqjms/ConnectionFactoryImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package mqjms

import (
"strconv"
"sync"

"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
Expand Down Expand Up @@ -155,6 +156,7 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int, mq
// a new ContextImpl and return it to the caller.
ctx = ContextImpl{
qMgr: qMgr,
ctxLock: &sync.Mutex{},
sessionMode: sessionMode,
receiveBufferSize: cf.ReceiveBufferSize,
sendCheckCount: cf.SendCheckCount,
Expand Down
56 changes: 56 additions & 0 deletions mqjms/ConsumerImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ package mqjms

import (
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"

"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
Expand Down Expand Up @@ -57,6 +60,11 @@ func (consumer ConsumerImpl) Receive(waitMillis int32) (jms20subset.Message, jms
// of receive.
func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Message, jms20subset.JMSException) {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
consumer.ctx.ctxLock.Lock()
defer consumer.ctx.ctxLock.Unlock()

// Prepare objects to be used in receiving the message.
var msg jms20subset.Message
var jmsErr jms20subset.JMSException
Expand Down Expand Up @@ -99,6 +107,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess

if err == nil {

// Set a finalizer on the message handle to allow it to be deleted
// when it is no longer referenced by an active object, to reduce/prevent
// memory leaks.
setMessageHandlerFinalizer(thisMsgHandle, consumer.ctx.ctxLock)

// Message received successfully (without error).
// Determine on the basis of the format field what sort of message to create.

Expand All @@ -116,6 +129,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
MessageImpl: MessageImpl{
mqmd: getmqmd,
msgHandle: &thisMsgHandle,
ctxLock: consumer.ctx.ctxLock,
},
}

Expand All @@ -133,6 +147,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
MessageImpl: MessageImpl{
mqmd: getmqmd,
msgHandle: &thisMsgHandle,
ctxLock: consumer.ctx.ctxLock,
},
}
}
Expand All @@ -142,6 +157,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
// Error code was returned from MQ call.
mqret := err.(*ibmmq.MQReturn)

// Delete the message handle object in-line here now that it is no longer required,
// to avoid memory leak
dmho := ibmmq.NewMQDMHO()
gmo.MsgHandle.DltMH(dmho)

if mqret.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {

// This isn't a real error - it's the way that MQ indicates that there
Expand All @@ -164,6 +184,36 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
return msg, jmsErr
}

/*
* Set a finalizer on the message handle to allow it to be deleted
* when it is no longer referenced by an active object, to reduce/prevent
* memory leaks.
*/
func setMessageHandlerFinalizer(thisMsgHandle ibmmq.MQMessageHandle, ctxLock *sync.Mutex) {

runtime.SetFinalizer(&thisMsgHandle, func(msgHandle *ibmmq.MQMessageHandle) {
ctxLock.Lock()
defer ctxLock.Unlock()

dmho := ibmmq.NewMQDMHO()
err := msgHandle.DltMH(dmho)
if err != nil {

mqret := err.(*ibmmq.MQReturn)

if mqret.MQRC == ibmmq.MQRC_HCONN_ERROR {
// Expected if the connection is closed before the finalizer executes
// (at which point it should get tidied up automatically by the connection)
} else {
fmt.Println("DltMH finalizer", err)
}

}

})

}

// ReceiveStringBodyNoWait implements the IBM MQ logic necessary to receive a
// message from a Destination and return its body as a string.
//
Expand Down Expand Up @@ -356,6 +406,12 @@ func applySelector(selector string, getmqmd *ibmmq.MQMD, gmo *ibmmq.MQGMO) error
func (consumer ConsumerImpl) Close() {

if (ibmmq.MQObject{}) != consumer.qObject {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
consumer.ctx.ctxLock.Lock()
defer consumer.ctx.ctxLock.Unlock()

consumer.qObject.Close(0)
}

Expand Down
Loading