Skip to content

Commit

Permalink
Fix an error into channel object
Browse files Browse the repository at this point in the history
  • Loading branch information
Leshanu E committed Nov 25, 2023
1 parent f13e82c commit 7a97d24
Show file tree
Hide file tree
Showing 12 changed files with 26 additions and 25 deletions.
2 changes: 1 addition & 1 deletion _examples/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"syscall"
"time"

amqp "github.com/rabbitmq/amqp091-go"
amqp "github.com/killer-djon/rabbitmq-go"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion _examples/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package main
import (
"context"
"flag"
amqp "github.com/rabbitmq/amqp091-go"
amqp "github.com/killer-djon/rabbitmq-go"
"log"
"os"
"os/signal"
Expand Down
2 changes: 1 addition & 1 deletion _examples/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"os"
"time"

amqp "github.com/rabbitmq/amqp091-go"
amqp "github.com/killer-djon/rabbitmq-go"
)

var url = flag.String("url", "amqp:///", "AMQP url for both the publisher and subscriber")
Expand Down
22 changes: 11 additions & 11 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,27 +389,27 @@ func (ch *Channel) recvMethod(f frame) {
if msg, ok := frame.Method.(messageWithContent); ok {
ch.body = make([]byte, 0)
ch.message = msg
ch.transition((*Channel).recvHeader)
ch.transition(ch.recvHeader)
return
}

ch.dispatch(frame.Method) // termination state
ch.transition((*Channel).recvMethod)
ch.transition(ch.recvHeader)

This comment has been minimized.

Copy link
@lukebakken

lukebakken Dec 23, 2023

Contributor

This should be ch.recvMethod


case *headerFrame:
// drop
ch.transition((*Channel).recvMethod)
ch.transition(ch.recvHeader)

This comment has been minimized.

Copy link
@lukebakken

lukebakken Dec 23, 2023

Contributor

This should be ch.recvMethod


case *bodyFrame:
// drop
ch.transition((*Channel).recvMethod)
ch.transition(ch.recvHeader)

This comment has been minimized.

Copy link
@lukebakken

lukebakken Dec 23, 2023

Contributor

This should be ch.recvMethod


default:
panic("unexpected frame type")
}
}

func (ch *Channel) recvHeader(f frame) {
func (ch *Channel) recvHeader(_ *Channel, f frame) {
switch frame := f.(type) {
case *methodFrame:
// interrupt content and handle method
Expand All @@ -422,14 +422,14 @@ func (ch *Channel) recvHeader(f frame) {
if frame.Size == 0 {
ch.message.setContent(ch.header.Properties, ch.body)
ch.dispatch(ch.message) // termination state
ch.transition((*Channel).recvMethod)
ch.transition(ch.recvHeader)

This comment has been minimized.

Copy link
@lukebakken

lukebakken Dec 23, 2023

Contributor

This should be ch.recvMethod

return
}
ch.transition((*Channel).recvContent)
ch.transition(ch.recvHeader)

This comment has been minimized.

Copy link
@lukebakken

lukebakken Dec 23, 2023

Contributor

This should be ch.recvContent


case *bodyFrame:
// drop and reset
ch.transition((*Channel).recvMethod)
ch.transition(ch.recvHeader)

default:
panic("unexpected frame type")
Expand All @@ -446,7 +446,7 @@ func (ch *Channel) recvContent(f frame) {

case *headerFrame:
// drop and reset
ch.transition((*Channel).recvMethod)
ch.transition(ch.recvHeader)

This comment has been minimized.

Copy link
@lukebakken

lukebakken Dec 23, 2023

Contributor

This should be ch.recvMethod


case *bodyFrame:
if cap(ch.body) == 0 {
Expand All @@ -457,11 +457,11 @@ func (ch *Channel) recvContent(f frame) {
if uint64(len(ch.body)) >= ch.header.Size {
ch.message.setContent(ch.header.Properties, ch.body)
ch.dispatch(ch.message) // termination state
ch.transition((*Channel).recvMethod)
ch.transition(ch.recvHeader)

This comment has been minimized.

Copy link
@lukebakken

lukebakken Dec 23, 2023

Contributor

This should be ch.recvMethod

return
}

ch.transition((*Channel).recvContent)
ch.transition(ch.recvHeader)

This comment has been minimized.

Copy link
@lukebakken

lukebakken Dec 23, 2023

Contributor

This should be ch.recvContent


default:
panic("unexpected frame type")
Expand Down
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ func (c *Connection) openTune(config Config, auth Authentication) error {
}

// Edge case that may race with c.shutdown()
// https://github.com/rabbitmq/amqp091-go/issues/170
// https://github.com/killer-djon/rabbitmq-go/issues/170
c.m.Lock()

// When the server and client both use default 0, then the max channel is
Expand Down
4 changes: 2 additions & 2 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestChannelIsClosed(t *testing.T) {
}

// TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose tests the issue
// described in https://github.com/rabbitmq/amqp091-go/issues/69.
// described in https://github.com/killer-djon/rabbitmq-go/issues/69.
func TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose(t *testing.T) {
const routines = 10
c := integrationConnection(t, t.Name())
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestNewConnectionProperties_HasDefaultProperties(t *testing.T) {
}

// Connection and channels should be closeable when a memory alarm is active.
// https://github.com/rabbitmq/amqp091-go/issues/178
// https://github.com/killer-djon/rabbitmq-go/issues/178
func TestConnection_Close_WhenMemoryAlarmIsActive(t *testing.T) {
err := rabbitmqctl(t, "set_vm_memory_high_watermark", "0.0001")
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {

case out <- *queue[0]:
/*
* https://github.com/rabbitmq/amqp091-go/issues/179
* https://github.com/rabbitmq/amqp091-go/pull/180
* https://github.com/killer-djon/rabbitmq-go/issues/179
* https://github.com/killer-djon/rabbitmq-go/pull/180
*
* Comment from @lars-t-hansen:
*
Expand Down
2 changes: 1 addition & 1 deletion example_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"os"
"time"

amqp "github.com/rabbitmq/amqp091-go"
amqp "github.com/killer-djon/rabbitmq-go"
)

// This exports a Client object that wraps this library. It
Expand Down
2 changes: 1 addition & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"runtime"
"time"

amqp "github.com/rabbitmq/amqp091-go"
amqp "github.com/killer-djon/rabbitmq-go"
)

func ExampleConfig_timeout() {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
module github.com/rabbitmq/amqp091-go
//module github.com/rabbitmq/amqp091-go
module github.com/killer-djon/rabbitmq-go

go 1.16

Expand Down
4 changes: 2 additions & 2 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,7 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) {
}
}

// https://github.com/rabbitmq/amqp091-go/pull/44
// https://github.com/killer-djon/rabbitmq-go/pull/44
func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedIssue44")
ch, err := conn.Channel()
Expand Down Expand Up @@ -2120,7 +2120,7 @@ func assertConsumeBody(t *testing.T, messages <-chan Delivery, want []byte) (msg
return msg
}

// https://github.com/rabbitmq/amqp091-go/issues/11
// https://github.com/killer-djon/rabbitmq-go/issues/11
func TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11(t *testing.T) {
conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11")
ch, err := conn.Channel()
Expand Down
2 changes: 1 addition & 1 deletion reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"fmt"
"os"

amqp "github.com/rabbitmq/amqp091-go"
amqp "github.com/killer-djon/rabbitmq-go"
)

// Every connection should declare the topology they expect
Expand Down

1 comment on commit 7a97d24

@lukebakken
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit WILL RESULT IN FAILURES, be warned!

Please sign in to comment.