Skip to content

Commit

Permalink
Implement progressive call invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
muzzammilshahid committed Sep 23, 2024
1 parent fd8f83f commit 28b494f
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions dealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@ type Registration struct {
InvocationPolicy string
}

type CallMap struct {
CallerID int64
CallID int64
}

type Dealer struct {
sessions map[int64]*SessionDetails
registrationsByProcedure map[string]*Registration
registrationsBySession map[int64]map[int64]*Registration
pendingCalls map[int64]*PendingInvocation
invocationIDbyCall map[CallMap]int64

idGen *SessionScopeIDGenerator
sync.Mutex
Expand All @@ -43,6 +49,7 @@ func NewDealer() *Dealer {
registrationsByProcedure: make(map[string]*Registration),
registrationsBySession: make(map[int64]map[int64]*Registration),
pendingCalls: make(map[int64]*PendingInvocation),
invocationIDbyCall: make(map[CallMap]int64),
idGen: &SessionScopeIDGenerator{},
}
}
Expand Down Expand Up @@ -113,21 +120,33 @@ func (d *Dealer) ReceiveMessage(sessionID int64, msg messages.Message) (*Message
break
}
receiveProgress, _ := call.Options()[OptionReceiveProgress].(bool)

invocationID := d.idGen.NextID()
d.pendingCalls[invocationID] = &PendingInvocation{
RequestID: call.RequestID(),
CallerID: sessionID,
CalleeID: callee,
ReceiveProgress: receiveProgress,
progress, _ := call.Options()[OptionProgress].(bool)

invocationID, ok := d.invocationIDbyCall[CallMap{CallerID: sessionID, CallID: call.RequestID()}]
if !ok || !progress {
invocationID = d.idGen.NextID()
d.pendingCalls[invocationID] = &PendingInvocation{
RequestID: call.RequestID(),
CallerID: sessionID,
CalleeID: callee,
ReceiveProgress: receiveProgress,
Progress: progress,
}
d.invocationIDbyCall[CallMap{CallerID: sessionID, CallID: call.RequestID()}] = invocationID
}

var invocation *messages.Invocation
if call.PayloadIsBinary() && d.sessions[callee].StaticSerializer() {
invocation = messages.NewInvocationBinary(invocationID, regs.ID, nil, call.Payload(),
call.PayloadSerializer())
} else {
details := map[string]any{OptionReceiveProgress: receiveProgress}
details := map[string]any{}
if receiveProgress {
details[OptionReceiveProgress] = receiveProgress
}
if progress {
details[OptionProgress] = progress
}
invocation = messages.NewInvocation(invocationID, regs.ID, details, call.Args(), call.KwArgs())
}

Expand Down

0 comments on commit 28b494f

Please sign in to comment.