Skip to content

Commit

Permalink
Fix MessageHandle memory leak on receive - ibm-messaging#53
Browse files Browse the repository at this point in the history
  • Loading branch information
matrober-uk committed Jan 21, 2023
1 parent af0d578 commit 18c12c2
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 0 deletions.
66 changes: 66 additions & 0 deletions memoryleaks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) IBM Corporation 2023
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*/
package main

import (
"fmt"
"runtime"
"testing"
"time"

"github.com/ibm-messaging/mq-golang-jms20/mqjms"
"github.com/stretchr/testify/assert"
)

/*
* Test for memory leak when there is no message to be received.
*/
func DONTRUN_TestLeakOnEmptyGet(t *testing.T) {

// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
assert.Nil(t, cfErr)

// Creates a connection to the queue manager, using defer to close it automatically
// at the end of the function (if it was created successfully)
context, ctxErr := cf.CreateContext()
assert.Nil(t, ctxErr)
if context != nil {
defer context.Close()
}

// Now send the message and get it back again, to check that it roundtripped.
queue := context.CreateQueue("DEV.QUEUE.1")

consumer, errCons := context.CreateConsumer(queue)
if consumer != nil {
defer consumer.Close()
}
assert.Nil(t, errCons)

for i := 1; i < 35000; i++ {

//time.Sleep(100 * time.Millisecond)
rcvMsg, errRvc := consumer.ReceiveNoWait()
assert.Nil(t, errRvc)
assert.Nil(t, rcvMsg)

if i%1000 == 0 {
fmt.Println("Messages:", i)
}

}

fmt.Println("Finished receive calls - waiting for cooldown.")
runtime.GC()

time.Sleep(30 * time.Second)

}
2 changes: 2 additions & 0 deletions mqjms/ConnectionFactoryImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package mqjms

import (
"strconv"
"sync"

"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
Expand Down Expand Up @@ -155,6 +156,7 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int, mq
// a new ContextImpl and return it to the caller.
ctx = ContextImpl{
qMgr: qMgr,
ctxLock: &sync.Mutex{},
sessionMode: sessionMode,
receiveBufferSize: cf.ReceiveBufferSize,
sendCheckCount: cf.SendCheckCount,
Expand Down
34 changes: 34 additions & 0 deletions mqjms/ConsumerImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ package mqjms

import (
"errors"
"fmt"
"runtime"
"strconv"
"strings"

Expand Down Expand Up @@ -57,6 +59,11 @@ func (consumer ConsumerImpl) Receive(waitMillis int32) (jms20subset.Message, jms
// of receive.
func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Message, jms20subset.JMSException) {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
consumer.ctx.ctxLock.Lock()
defer consumer.ctx.ctxLock.Unlock()

// Prepare objects to be used in receiving the message.
var msg jms20subset.Message
var jmsErr jms20subset.JMSException
Expand Down Expand Up @@ -99,6 +106,28 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess

if err == nil {

// Set a finalizer on the message handle to allow it to be deleted
// when it is no longer referenced by an active object, to reduce/prevent
// memory leaks.
runtime.SetFinalizer(&thisMsgHandle, func(msgHandle *ibmmq.MQMessageHandle) {
consumer.ctx.ctxLock.Lock()
dmho := ibmmq.NewMQDMHO()
err := msgHandle.DltMH(dmho)
if err != nil {

mqret := err.(*ibmmq.MQReturn)

if mqret.MQRC == ibmmq.MQRC_HCONN_ERROR {
// Expected if the connection is closed before the finalizer executes
// (at which point it should get tidied up automatically by the connection)
} else {
fmt.Println("DltMH finalizer", err)
}

}
consumer.ctx.ctxLock.Unlock()
})

// Message received successfully (without error).
// Determine on the basis of the format field what sort of message to create.

Expand Down Expand Up @@ -142,6 +171,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
// Error code was returned from MQ call.
mqret := err.(*ibmmq.MQReturn)

// Delete the message handle object in-line here now that it is no longer required,
// to avoid memory leak
dmho := ibmmq.NewMQDMHO()
gmo.MsgHandle.DltMH(dmho)

if mqret.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {

// This isn't a real error - it's the way that MQ indicates that there
Expand Down
48 changes: 48 additions & 0 deletions mqjms/ContextImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package mqjms
import (
"fmt"
"strconv"
"sync"

"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
Expand All @@ -21,6 +22,7 @@ import (
// connection to an IBM MQ queue manager.
type ContextImpl struct {
qMgr ibmmq.MQQueueManager
ctxLock *sync.Mutex // Mutex to synchronize MQRC calls to the queue manager
sessionMode int
receiveBufferSize int
sendCheckCount int
Expand Down Expand Up @@ -65,6 +67,11 @@ func (ctx ContextImpl) CreateConsumer(dest jms20subset.Destination) (jms20subset
// receive messages that match the specified selector from the given Destination.
func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination, selector string) (jms20subset.JMSConsumer, jms20subset.JMSException) {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

// First validate the selector string format (we don't make use of it at
// runtime until the receive is called)
if selector != "" {
Expand Down Expand Up @@ -118,6 +125,11 @@ func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination,
// an application can look at messages without removing them.
func (ctx ContextImpl) CreateBrowser(dest jms20subset.Destination) (jms20subset.QueueBrowser, jms20subset.JMSException) {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

// Set up the necessary objects to open the queue
mqod := ibmmq.NewMQOD()
var openOptions int32
Expand Down Expand Up @@ -165,6 +177,11 @@ func (ctx ContextImpl) CreateBrowser(dest jms20subset.Destination) (jms20subset.
// CreateTextMessage is a JMS standard mechanism for creating a TextMessage.
func (ctx ContextImpl) CreateTextMessage() jms20subset.TextMessage {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

var bodyStr *string
thisMsgHandle := createMsgHandle(ctx.qMgr)

Expand Down Expand Up @@ -198,6 +215,11 @@ func createMsgHandle(qMgr ibmmq.MQQueueManager) ibmmq.MQMessageHandle {
// and initialise it with the chosen text string.
func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextMessage {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

thisMsgHandle := createMsgHandle(ctx.qMgr)

msg := &TextMessageImpl{
Expand All @@ -213,6 +235,11 @@ func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextM
// CreateBytesMessage is a JMS standard mechanism for creating a BytesMessage.
func (ctx ContextImpl) CreateBytesMessage() jms20subset.BytesMessage {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

var thisBodyBytes *[]byte
thisMsgHandle := createMsgHandle(ctx.qMgr)

Expand All @@ -227,6 +254,11 @@ func (ctx ContextImpl) CreateBytesMessage() jms20subset.BytesMessage {
// CreateBytesMessageWithBytes is a JMS standard mechanism for creating a BytesMessage.
func (ctx ContextImpl) CreateBytesMessageWithBytes(bytes []byte) jms20subset.BytesMessage {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

thisMsgHandle := createMsgHandle(ctx.qMgr)

return &BytesMessageImpl{
Expand All @@ -240,6 +272,11 @@ func (ctx ContextImpl) CreateBytesMessageWithBytes(bytes []byte) jms20subset.Byt
// Commit confirms all messages that were sent under this transaction.
func (ctx ContextImpl) Commit() jms20subset.JMSException {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

var retErr jms20subset.JMSException

if (ibmmq.MQQueueManager{}) != ctx.qMgr {
Expand Down Expand Up @@ -294,6 +331,11 @@ func (ctx ContextImpl) Commit() jms20subset.JMSException {
// Rollback releases all messages that were sent under this transaction.
func (ctx ContextImpl) Rollback() jms20subset.JMSException {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

var retErr jms20subset.JMSException

if (ibmmq.MQQueueManager{}) != ctx.qMgr {
Expand Down Expand Up @@ -321,6 +363,12 @@ func (ctx ContextImpl) Close() {
ctx.Rollback()

if (ibmmq.MQQueueManager{}) != ctx.qMgr {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
ctx.ctxLock.Lock()
defer ctx.ctxLock.Unlock()

ctx.qMgr.Disc()
}

Expand Down
5 changes: 5 additions & 0 deletions mqjms/ProducerImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func (producer ProducerImpl) SendBytes(dest jms20subset.Destination, body []byte
// that are defined on this JMSProducer.
func (producer ProducerImpl) Send(dest jms20subset.Destination, msg jms20subset.Message) jms20subset.JMSException {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
producer.ctx.ctxLock.Lock()
defer producer.ctx.ctxLock.Unlock()

// Set up the basic objects we need to send the message.
mqod := ibmmq.NewMQOD()
putmqmd := ibmmq.NewMQMD()
Expand Down

0 comments on commit 18c12c2

Please sign in to comment.