Skip to content

Commit

Permalink
chore: huge refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
foxis committed Apr 25, 2024
1 parent b467e4e commit ab10ecf
Show file tree
Hide file tree
Showing 21 changed files with 714 additions and 407 deletions.
59 changes: 30 additions & 29 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
)

var (
_ Endpoint = (*Container)(nil)
_ CloseNotifier = (*Container)(nil)
_ Endpoint = (*Container)(nil)
)

// Container stores endpoints, collects all intents and interests and acts as an aggregate Endpoint.
type Container struct {
*Base
*BaseEndpoint

mu sync.Mutex
endpoints []Endpoint
Expand All @@ -26,7 +26,7 @@ type Container struct {

func NewContainer(name string, size int) *Container {
return &Container{
Base: NewBase(name, size),
BaseEndpoint: NewBase(name, size),
endpoints: make([]Endpoint, 0, 8),
intentRouters: make(map[string]*IntentRouter),
interestRouters: make(map[string]*InterestRouter),
Expand All @@ -42,13 +42,18 @@ func (t *Container) Close() error {
}
}
t.endpoints = nil
errarr = append(errarr, t.Base.Close())
errarr = append(errarr, t.BaseEndpoint.Close())
return errors.Join(errarr...)
}

func (t *Container) OnClose(f func()) Endpoint {
t.AddOnClose(f)
return t
}

// Init is used by the Router to initialize this endpoint.
func (t *Container) Init(ctx context.Context, logger *slog.Logger, add, remove func(interest Interest, t Endpoint) error) error {
if err := t.Base.Init(ctx, logger, add, remove); err != nil {
func (t *Container) Init(ctx context.Context, logger *slog.Logger, addIntent IntentCallback, addInterest InterestCallback) error {
if err := t.BaseEndpoint.Init(ctx, logger, addIntent, addInterest); err != nil {
return err
}

Expand All @@ -66,20 +71,18 @@ func (t *Container) Add(ep Endpoint) error {
return errors.ErrDuplicate
}
// TODO
err := ep.Init(t.Ctx, t.Log,
func(interest Interest, t Endpoint) error { return nil },
err := ep.Init(t.Ctx(), t.Log,
func(intent Intent, t Endpoint) error { return nil },
func(interest Interest, t Endpoint) error { return nil },
)
if err != nil {
return err
}
t.endpoints = append(t.endpoints, ep)
if notifier, ok := ep.(CloseNotifier); ok {
notifier.OnClose(func() {
t.Log.Info("Container OnClose", "name", ep.Name())
t.Remove(ep)
})
}
ep.OnClose(func() {
t.Log.Info("Container OnClose", "name", ep.Name())
t.Remove(ep)
})
return nil
}

Expand Down Expand Up @@ -152,16 +155,15 @@ func (t *Container) publish(route Route, opt ...PubOpt) (Intent, error) {
return ir.Wrap(), nil
}

ir, err := NewIntentRouter(t.Ctx, route,
func() error {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.intentRouters, route.ID())
return nil
},
ir, err := NewIntentRouter(t.Ctx(), route,
t.Size,
intents...,
)
ir.OnClose(func() {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.intentRouters, route.ID())
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -194,16 +196,15 @@ func (t *Container) subscribe(route Route, opt ...SubOpt) (Interest, error) {
return ir.Wrap(), nil
}

ir, err := NewInterestRouter(t.Ctx, route,
func() error {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.interestRouters, route.ID())
return nil
},
ir, err := NewInterestRouter(t.Ctx(), route,
t.Size,
interests...,
)
ir.OnClose(func() {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.interestRouters, route.ID())
})
if err != nil {
return nil, err
}
Expand Down
111 changes: 111 additions & 0 deletions container_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package dndm

import (
"context"
"log/slog"
"testing"
"time"

"github.com/stretchr/testify/assert"
mock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func Test_NewContainer(t *testing.T) {
container := NewContainer("test", 10)
require.NotNil(t, container)
assert.Equal(t, "test", container.Name())

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()

addCalled := make(chan struct{})
delCalled := make(chan struct{})
container.Init(ctx, slog.Default(),
func(interest Interest, t Endpoint) error {
close(addCalled)
return nil
},
func(interest Interest, t Endpoint) error {
close(delCalled)
return nil
},
)

container.OnClose(nil)
onCloseCalled := make(chan struct{})
container.OnClose(func() { close(onCloseCalled) })
container.Close()
}

func TestContainer_AddEndpoint(t *testing.T) {
container := NewContainer("test", 10)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
container.Init(ctx, slog.Default(),
func(interest Interest, t Endpoint) error {
return nil
},
func(interest Interest, t Endpoint) error {
return nil
},
)

endpoint := &MockEndpoint{}
endpoint.On("Init", container.Ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil)

err := container.Add(endpoint)
assert.NoError(t, err)
assert.Equal(t, 1, len(container.endpoints))
assert.Contains(t, container.endpoints, endpoint)

container.Close()
}

// func TestContainer_RemoveEndpoint(t *testing.T) {
// container := NewContainer("test", 10)
// endpoint := new(MockEndpoint)
// endpoint.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
// endpoint.On("Close").Return(nil)

// container.Add(endpoint)
// err := container.Remove(endpoint)
// assert.NoError(t, err)
// assert.Equal(t, 0, len(container.endpoints))
// }

// func TestContainer_Close(t *testing.T) {
// container := NewContainer("test", 10)
// endpoint1 := new(MockEndpoint)
// endpoint2 := new(MockEndpoint)

// endpoint1.On("Close").Return(nil)
// endpoint2.On("Close").Return(nil)

// container.Add(endpoint1)
// container.Add(endpoint2)

// err := container.Close()
// assert.NoError(t, err)
// }

// func TestContainer_Publish(t *testing.T) {
// container := NewContainer("test", 10)
// endpoint := new(MockEndpoint)
// endpoint.On("Publish", mock.Anything).Return(&emptypb.Empty{}, nil)

// intent, err := container.Publish(Route{route: "testRoute"}, nil)
// assert.NoError(t, err)
// assert.NotNil(t, intent)
// }

// func TestContainer_Subscribe(t *testing.T) {
// container := NewContainer("test", 10)
// endpoint := new(MockEndpoint)
// endpoint.On("Subscribe", mock.Anything).Return(&emptypb.Empty{}, nil)

// interest, err := container.Subscribe(Route{route: "testRoute"}, nil)
// assert.NoError(t, err)
// assert.NotNil(t, interest)
// }
49 changes: 18 additions & 31 deletions dndm.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dndm

import (
"context"
"log/slog"
"sync"

Expand All @@ -13,17 +12,10 @@ type CauseCloser interface {
CloseCause(e error) error
}

// CloseNotifier interface for objects that can notify about a closure
type CloseNotifier interface {
// OnClose will be called when the connection closes
OnClose(func())
}

type Router struct {
BaseCtx
mu sync.Mutex
log *slog.Logger
ctx context.Context
cancel context.CancelFunc
size int
endpoints []Endpoint
intentRouters map[string]*IntentRouter
Expand All @@ -36,10 +28,8 @@ func New(opts ...Option) (*Router, error) {
return nil, err
}

ctx, cancel := context.WithCancel(opt.ctx)
ret := &Router{
ctx: ctx,
cancel: cancel,
BaseCtx: NewBaseCtxWithCtx(opt.ctx),
log: opt.logger.With("module", "router"),
endpoints: make([]Endpoint, len(opt.endpoints)),
intentRouters: make(map[string]*IntentRouter),
Expand All @@ -49,7 +39,7 @@ func New(opts ...Option) (*Router, error) {

for i, t := range opt.endpoints {
log := opt.logger.With("endpoint", t.Name())
if err := t.Init(ret.ctx, log, ret.addInterest, ret.removeInterest); err != nil {
if err := t.Init(ret.ctx, log, func(intent Intent, ep Endpoint) error { return nil }, ret.addInterest); err != nil {
return nil, err
}
ret.endpoints[i] = t
Expand All @@ -70,15 +60,14 @@ func (d *Router) addInterest(interest Interest, t Endpoint) error {
}

ir, err := NewInterestRouter(d.ctx, route,
func() error {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.interestRouters, route.ID())
return nil
},
d.size,
interest,
)
ir.OnClose(func() {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.interestRouters, route.ID())
})
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -128,15 +117,14 @@ func (d *Router) Publish(path string, msg proto.Message, opt ...PubOpt) (Intent,
}

ir, err = NewIntentRouter(d.ctx, route,
func() error {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.intentRouters, route.ID())
return nil
},
d.size,
intents...,
)
ir.OnClose(func() {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.intentRouters, route.ID())
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -170,15 +158,14 @@ func (d *Router) Subscribe(path string, msg proto.Message, opt ...SubOpt) (Inter
}

ir, err = NewInterestRouter(d.ctx, route,
func() error {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.interestRouters, route.ID())
return nil
},
d.size,
interests...,
)
ir.OnClose(func() {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.interestRouters, route.ID())
})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit ab10ecf

Please sign in to comment.