diff --git a/go.mod b/go.mod index 8372753..2b6f6fa 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/nbd-wtf/go-nostr -go 1.20 +go 1.21 require ( github.com/btcsuite/btcd/btcec/v2 v2.3.2 diff --git a/pool.go b/pool.go index b9435b8..4959cf5 100644 --- a/pool.go +++ b/pool.go @@ -102,6 +102,8 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[Timestamp]() ticker := time.NewTicker(seenAlreadyDropTick) + + var mu sync.Mutex eose := false pending := xsync.NewCounter() @@ -139,6 +141,13 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt goto reconnect } + go func() { + <-sub.EndOfStoredEvents + mu.Lock() + eose = true + mu.Unlock() + }() + // reset interval when we get a good subscription interval = 3 * time.Second @@ -164,9 +173,8 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt case events <- IncomingEvent{Event: evt, Relay: relay}: case <-ctx.Done(): } - case <-sub.EndOfStoredEvents: - eose = true case <-ticker.C: + mu.Lock() if eose { old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) seenAlready.Range(func(id string, value Timestamp) bool { @@ -176,6 +184,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt return true }) } + mu.Unlock() case reason := <-sub.ClosedReason: if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed { // relay is requesting auth. if we can we will perform auth and try again