Add Contexts where appropriate #124

Open
lukebakken opened this issue Oct 3, 2022 · 14 comments

@lukebakken
Contributor

See the following:

https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go

#121 (reply in thread)

cc @mgdotson

@lukebakken lukebakken added this to the 2.0.0 milestone Oct 3, 2022
@Zerpet
Contributor

Zerpet commented Oct 4, 2022

An example from the Go codebase on how to use a context in a function that makes an external call:

https://github.com/golang/go/blob/6d8ec893039a39f495c8139012e47754e4518b70/src/database/sql/ctxutil.go#L46-L61

@mrkagelui

would be great to have this!

@lukebakken
Contributor Author

@mrkagelui this project is open-source. We would welcome a contribution that adds this feature. @mgdotson also suggested he would be willing to work on it.

@t2y
Contributor

t2y commented May 15, 2023

The Consume method doesn't take context for cancellation.

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
  ...
}

I know the Channel has a Cancel method, and a developer can implement their own cancellation logic with it. However, Go's context is the standard way to cancel an operation, so it would be useful if passing a context to Consume could stop delivery without the caller implementing their own cancellation. What do you think?

func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
  ...
}

@t2y
Contributor

t2y commented May 15, 2023

I implemented #192 as a basis for discussion.

@lukebakken lukebakken self-assigned this May 15, 2023
@mgdotson

mgdotson commented Jun 6, 2023

I'm not sure I quite follow the implementation (and I haven't followed the calling code all the way through to see if it is a blocking operation waiting for server confirmation).

Wouldn't ch.call(req, res) be the call that needs the context, since it can take a long time or block? Once we have the message queue, we can time out those messages in a select statement and cancel the consumer manually.

If ch.call(...) is waiting on the server due to network delay or blocked status (I would have to test this, but it's late), the command won't return within the deadline of the provided context; it returns only once the server is unblocked. That means we'd still have to wrap Channel.Consume in a context-aware wrapper.

example wrapper:

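	// ch is the *amqp.Channel; msgs and err are written by the goroutine
	// and read only after finished is closed.
	var (
		msgs <-chan amqp.Delivery
		err  error
	)
	finished := make(chan struct{})
	go func() {
		// Consume may block while waiting for the server's reply.
		msgs, err = ch.Consume(
			queue,
			consumer,
			autoAck,
			exclusive,
			noLocal,
			noWait,
			args,
		)
		close(finished)
	}()

	select {
	case <-finished:
		return msgs, err
	case <-ctx.Done():
		// cleanup: once Consume eventually returns, cancel the consumer
		go func() {
			<-finished
			_ = ch.Cancel(consumer, true)
		}()
		return nil, fmt.Errorf("consume timed out: %w", ctx.Err())
	}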

In the case of ConsumeWithContext, the ch.call(...) would be wrapped in the go func and additional cleanup could be done in the ctx.Done() code - such as ch.cancel, etc. allowing the ConsumeWithContext to not "hang" on the calling function waiting for server confirmation.

If I've misunderstood Consume and it's not a potentially blocking call with noWait=false, please ignore.

@t2y
Contributor

t2y commented Jun 6, 2023

First of all, I also think ch.call(...) should take a context and should become cancellable in the future.

In your example, Consume() may still block with the current implementation, since ch.call(...) may block. That is, cancelling the context won't interrupt Consume() while it is blocked. I don't think #192 and your example are much different.

@Zerpet
Contributor

Zerpet commented Jun 6, 2023

I'm not sure about cancelling ch.call(...) because it may mess up the state machine in Channel.recv. The state machine should recover, but I'm not sure if it would recover when ch.call(...) is interrupted and ch.dispatch(message) is already called and trying to send the response back to the RPC. See:

func (ch *Channel) dispatch(msg message) {

amqp091-go/channel.go, lines 365 to 373 in 1a875e1:

	case *basicDeliver:
		ch.consumers.send(m.ConsumerTag, newDelivery(ch, m))
		// TODO log failed consumer and close channel, this can happen when
		// deliveries are in flight and a no-wait cancel has happened
	default:
		ch.rpc <- msg
	}
}

Even if case *basicDeliver: does not block (because it discards a message when the consumer is not found i.e. closed), it is not correct to do so, because RabbitMQ will be waiting for an ack for a very long time (until consumer timeout), and the message won't be redelivered to other consumers.

If we want to cancel ch.call(...), I think the only reliable way would be to close the Channel with noWait=true. Alternatively, I would feel slightly better if we checked context cancellation before calling ch.call(...).

@mgdotson

mgdotson commented Jun 6, 2023

@Zerpet Do you mean moving go func from #192 line 188 up to 182 or just a default context check such as:

select {
    case <-ctx.Done():
        return
    default:
}

ch.call(...)

A default check before the call is only slightly better but still leaves the caller in the state of waiting for the call to return and attempting to clean up state from outside if the context is valid prior to "call" but times out during the "call".

If moving context in #192 above "call", could there be a race with "ch.[Cc]ancel(consumer, false)" and a message pending that needs a nack?

Feels like the entire state engine needs to be reworked with contexts in mind which is a pretty big lift.

@Zerpet
Contributor

Zerpet commented Jun 7, 2023

I meant a default context check as in your snippet.

> Feels like the entire state engine needs to be reworked with contexts in mind which is a pretty big lift.

The more I think about this, the more I'm inclined to keep the state engine as-is, and implement a timeout mechanism that closes the AMQP channel. Anything else simply leaves the library in an unknown state. For example, Consume() might be waiting on ch.call(...) and give up, but the basic.consume frame may or may not have already arrived at the server. What should the library do? If we send a basic.cancel as part of the cleanup, we may cause a channel exception if the server did not register the consumer, closing the channel. If we don't send a basic.cancel and the server did register the consumer, it will have a "leaked" consumer that will never consume and will leave messages in flight, unacked, and unconsumable by other consumers. The conclusion is that the system is left in an unknown state, and it's not trivial to determine what to do next. At the same time, we don't want to start inferring the state (we may not be able to) by making subsequent calls to the server.

All in all, I think that closing the AMQP channel is the only reliable solution. It is intrusive, but it leaves the system in a known state, and it is clear to the caller what happened and what the resulting state is.

Zerpet added a commit that referenced this issue May 6, 2024
The context was not honoured in any of the *WithContext functions. This
is confusing, and arguably broken. However, we cannot immediately fix
the context-support situation due to #124 (comment)

This commit undeprecates the non-context variants of publish, and
documents that both variants are equivalent. The example now favours the
non-context variants.

Related to #195

Signed-off-by: Aitor Perez Cedres
lukebakken pushed a commit that referenced this issue May 7, 2024