Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete processed events #94

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions cmd/event-service/di/inject_application.go

This file was deleted.

21 changes: 21 additions & 0 deletions cmd/event-service/di/inject_bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package di

import (
"time"

"github.com/google/wire"
"github.com/planetary-social/nos-event-service/service/domain/bloom"
)

var bloomSet = wire.NewSet(
provideBloomFilter,
)

func provideBloomFilter() *bloom.EventFilter {
config := bloom.FilterConfig{
ExpectedItems: 1_000_000, // Expect 1M events per rotation
FalsePositiveRate: 0.001, // 0.1% false positive rate
RotationInterval: time.Hour, // Rotate every hour
}
return bloom.NewEventFilter(config)
}
46 changes: 17 additions & 29 deletions cmd/event-service/di/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/downloader"
"github.com/planetary-social/nos-event-service/service/domain/relays"
"github.com/planetary-social/nos-event-service/service/ports/memorypubsub"
"github.com/planetary-social/nos-event-service/service/ports/sqlitepubsub"
)

func BuildService(context.Context, config.Config) (Service, func(), error) {
Expand All @@ -36,6 +38,7 @@ func BuildService(context.Context, config.Config) (Service, func(), error) {
domainSet,
externalPubsubSet,
extractConfigSet,
bloomSet,
)
return Service{}, nil, nil
}
Expand All @@ -55,42 +58,13 @@ func BuildTestAdapters(context.Context, testing.TB) (sqlite.TestedItems, func(),
}

type TestApplication struct {
GetEventsHandler *app.GetEventsHandler

EventRepository *mocks.EventRepository
}

func BuildTestApplication(context.Context, testing.TB) (TestApplication, error) {
wire.Build(
wire.Struct(new(TestApplication), "*"),

wire.Struct(new(app.Adapters), "*"),

mocks.NewTransactionProvider,
wire.Bind(new(app.TransactionProvider), new(*mocks.TransactionProvider)),

mocks.NewEventRepository,
wire.Bind(new(app.EventRepository), new(*mocks.EventRepository)),

mocks.NewRelayRepository,
wire.Bind(new(app.RelayRepository), new(*mocks.RelayRepository)),

mocks.NewContactRepository,
wire.Bind(new(app.ContactRepository), new(*mocks.ContactRepository)),

mocks.NewPublicKeysToMonitorRepository,
wire.Bind(new(app.PublicKeysToMonitorRepository), new(*mocks.PublicKeysToMonitorRepository)),

mocks.NewPublisher,
wire.Bind(new(app.Publisher), new(*mocks.Publisher)),

mocks.NewMetrics,
wire.Bind(new(app.Metrics), new(*mocks.Metrics)),

applicationSet,
loggingSet,

wire.Value(logging.LevelError),
)
return TestApplication{}, nil
}
Expand Down Expand Up @@ -171,3 +145,17 @@ var domainSet = wire.NewSet(
domain.NewContactsExtractor,
wire.Bind(new(app.ContactsExtractor), new(*domain.ContactsExtractor)),
)

var applicationSet = wire.NewSet(
wire.Struct(new(app.Application), "*"),

app.NewSaveReceivedEventHandler,
wire.Bind(new(memorypubsub.SaveReceivedEventHandler), new(*app.SaveReceivedEventHandler)),

app.NewProcessSavedEventHandler,
wire.Bind(new(sqlitepubsub.ProcessSavedEventHandler), new(*app.ProcessSavedEventHandler)),

app.NewUpdateMetricsHandler,
app.NewAddPublicKeyToMonitorHandler,
app.NewGetPublicKeyInfoHandler,
)
42 changes: 7 additions & 35 deletions cmd/event-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/ThreeDotsLabs/watermill v1.3.1
github.com/ThreeDotsLabs/watermill-googlecloud v1.0.13
github.com/bits-and-blooms/bloom/v3 v3.7.0
github.com/boreq/errors v0.1.0
github.com/boreq/rest v0.1.0
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -32,6 +33,7 @@ require (
cloud.google.com/go/longrunning v0.4.2 // indirect
cloud.google.com/go/pubsub v1.30.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/btcutil v1.1.3 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBA
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88=
github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bloom/v3 v3.7.0 h1:VfknkqV4xI+PsaDIsoHueyxVDZrfvMn56jeWUzvzdls=
github.com/bits-and-blooms/bloom/v3 v3.7.0/go.mod h1:VKlUSvp0lFIYqxJjzdnSsZEw4iHb1kOL2tfHTgyJBHg=
github.com/boreq/errors v0.1.0 h1:aJIXv9JnyR5KtxFpQ8/AiblH3nfYmr1e1yoTze/5A1k=
github.com/boreq/errors v0.1.0/go.mod h1:B3dsXzhYvfgUXp7ViU/moPYM4PojgQ9MiQ21uvY6qqQ=
github.com/boreq/rest v0.1.0 h1:bAx31Rp1KrXHkCOlzqAtLKdh74xbly2SHkv9k3vX3iA=
Expand Down Expand Up @@ -243,6 +247,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
Expand Down
35 changes: 35 additions & 0 deletions internal/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/nbd-wtf/go-nostr"
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/app"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/downloader"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -221,3 +222,37 @@ func RequireEqualEventSlices(tb testing.TB, a, b []domain.Event) {
require.Equal(tb, a[i].Raw(), b[i].Raw())
}
}

type MockTransactionProvider struct {
EventRepository app.EventRepository
RelayRepository app.RelayRepository
ContactRepository app.ContactRepository
}

func NewTransactionProvider(
eventRepo app.EventRepository,
relayRepo app.RelayRepository,
contactRepo app.ContactRepository,
) *MockTransactionProvider {
return &MockTransactionProvider{
EventRepository: eventRepo,
RelayRepository: relayRepo,
ContactRepository: contactRepo,
}
}

func (m *MockTransactionProvider) Transact(ctx context.Context, f func(context.Context, app.Adapters) error) error {
return f(ctx, app.Adapters{
Events: m.EventRepository,
Relays: m.RelayRepository,
Contacts: m.ContactRepository,
})
}

func (m *MockTransactionProvider) ReadOnly(ctx context.Context, f func(context.Context, app.Adapters) error) error {
return f(ctx, app.Adapters{
Events: m.EventRepository,
Relays: m.RelayRepository,
Contacts: m.ContactRepository,
})
}
48 changes: 48 additions & 0 deletions internal/logging/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,51 @@ func init() {
}
}
}

type nopLogger struct{}

func NewNopLogger() Logger {
return &nopLogger{}
}

func (n *nopLogger) Trace() Entry {
return &nopEntry{}
}

func (n *nopLogger) Debug() Entry {
return &nopEntry{}
}

func (n *nopLogger) Info() Entry {
return &nopEntry{}
}

func (n *nopLogger) Error() Entry {
return &nopEntry{}
}

func (n *nopLogger) WithError(err error) Logger {
return n
}

func (n *nopLogger) WithField(key string, value interface{}) Logger {
return n
}

func (n *nopLogger) New(component string) Logger {
return n
}

type nopEntry struct{}

func (n *nopEntry) WithError(err error) Entry {
return n
}

func (n *nopEntry) WithField(key string, value interface{}) Entry {
return n
}

func (n *nopEntry) Message(msg string) {
// Do nothing
}
5 changes: 3 additions & 2 deletions service/adapters/mocks/contact_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mocks
import (
"context"

"github.com/planetary-social/nos-event-service/service/app"
"github.com/planetary-social/nos-event-service/service/domain"
)

Expand All @@ -14,11 +15,11 @@ func NewContactRepository() *ContactRepository {
}

func (c ContactRepository) GetCurrentContactsEvent(ctx context.Context, author domain.PublicKey) (domain.Event, error) {
panic("implement me")
return domain.Event{}, app.ErrNoContactsEvent
}

func (c ContactRepository) SetContacts(ctx context.Context, event domain.Event, contacts []domain.PublicKey) error {
panic("implement me")
return nil
}

func (c ContactRepository) GetFollowees(ctx context.Context, publicKey domain.PublicKey) ([]domain.PublicKey, error) {
Expand Down
Loading
Loading