diff --git a/client/client.go b/client/client.go index 2f7fbb0a6..b63df4ae7 100644 --- a/client/client.go +++ b/client/client.go @@ -16,13 +16,17 @@ package client import ( + // standard libraries "context" "sync" + // first-party libraries + "github.com/linkall-labs/vanus/observability/tracing" + + // this project eb "github.com/linkall-labs/vanus/client/internal/vanus/eventbus" "github.com/linkall-labs/vanus/client/pkg/api" "github.com/linkall-labs/vanus/client/pkg/eventbus" - "github.com/linkall-labs/vanus/observability/tracing" ) type Client interface { diff --git a/client/examples/eventbus/append/main.go b/client/examples/eventbus/append/main.go index 38951802c..6a42636de 100644 --- a/client/examples/eventbus/append/main.go +++ b/client/examples/eventbus/append/main.go @@ -26,6 +26,8 @@ import ( "github.com/linkall-labs/vanus/client" "github.com/linkall-labs/vanus/client/pkg/option" "github.com/linkall-labs/vanus/client/pkg/policy" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + "github.com/linkall-labs/vanus/proto/pkg/codec" ) func main() { @@ -42,7 +44,14 @@ func main() { event.SetType("example.type") event.SetData(ce.ApplicationJSON, map[string]string{"hello": "world"}) - eventID, err := w.AppendOne(ctx, &event, option.WithWritePolicy(policy.NewRoundRobinWritePolicy(bus))) + eventpb, err := codec.ToProto(&event) + if err != nil { + return + } + ceBatch := &cloudevents.CloudEventBatch{ + Events: []*cloudevents.CloudEvent{eventpb}, + } + eventID, err := w.Append(ctx, ceBatch, option.WithWritePolicy(policy.NewRoundRobinWritePolicy(bus))) if err != nil { log.Print(err.Error()) } else { diff --git a/client/internal/vanus/eventbus/name_service.go b/client/internal/vanus/eventbus/name_service.go index c46fdf5ca..d0e994242 100644 --- a/client/internal/vanus/eventbus/name_service.go +++ b/client/internal/vanus/eventbus/name_service.go @@ -17,17 +17,20 @@ package eventbus import ( // standard libraries "context" - "github.com/linkall-labs/vanus/pkg/cluster" - "github.com/linkall-labs/vanus/observability/tracing" + // third-party libraries "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc/credentials/insecure" // first-party libraries - "github.com/linkall-labs/vanus/client/pkg/record" + "github.com/linkall-labs/vanus/observability/log" + "github.com/linkall-labs/vanus/observability/tracing" + "github.com/linkall-labs/vanus/pkg/cluster" ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" metapb "github.com/linkall-labs/vanus/proto/pkg/meta" + + // this project + "github.com/linkall-labs/vanus/client/pkg/record" ) func NewNameService(endpoints []string) *NameService { @@ -51,8 +54,11 @@ func (ns *NameService) LookupWritableLogs(ctx context.Context, eventbus string) } resp, err := ns.client.GetEventBus(ctx, req) - if err != nil { + log.Error(context.Background(), "get eventbus failed", map[string]interface{}{ + log.KeyError: err, + "eventbus": eventbus, + }) return nil, err } return toLogs(resp.GetLogs()), nil @@ -68,6 +74,10 @@ func (ns *NameService) LookupReadableLogs(ctx context.Context, eventbus string) resp, err := ns.client.GetEventBus(ctx, req) if err != nil { + log.Error(context.Background(), "get eventbus failed", map[string]interface{}{ + log.KeyError: err, + "eventbus": eventbus, + }) return nil, err } diff --git a/client/internal/vanus/eventlog/name_service.go b/client/internal/vanus/eventlog/name_service.go index b30abceb3..9f4518c7b 100644 --- a/client/internal/vanus/eventlog/name_service.go +++ b/client/internal/vanus/eventlog/name_service.go @@ -28,12 +28,12 @@ import ( "github.com/linkall-labs/vanus/observability/log" "github.com/linkall-labs/vanus/observability/tracing" "github.com/linkall-labs/vanus/pkg/cluster" + "github.com/linkall-labs/vanus/pkg/errors" ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" metapb "github.com/linkall-labs/vanus/proto/pkg/meta" // this project. "github.com/linkall-labs/vanus/client/pkg/record" - "github.com/linkall-labs/vanus/pkg/errors" ) func NewNameService(endpoints []string) *NameService { @@ -59,18 +59,14 @@ func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64) resp, err := ns.client.GetAppendableSegment(ctx, req) if err != nil { - log.Warning(ctx, "failed to GetAppendableSegment", map[string]interface{}{ - "req": req, - "res": resp, + log.Error(context.Background(), "get appendable segment failed", map[string]interface{}{ log.KeyError: err, + "eventlog": logID, + "resp": resp.String(), }) return nil, err } - log.Debug(ctx, "GetAppendableSegment result", map[string]interface{}{ - "req": req, - "res": resp, - }) segments := toSegments(resp.GetSegments()) if len(segments) == 0 { return nil, errors.ErrNotWritable @@ -91,6 +87,11 @@ func (ns *NameService) LookupReadableSegments(ctx context.Context, logID uint64) resp, err := ns.client.ListSegment(ctx, req) if err != nil { + log.Error(context.Background(), "list segment failed", map[string]interface{}{ + log.KeyError: err, + "eventlog": logID, + "resp": resp.String(), + }) return nil, err } diff --git a/client/internal/vanus/store/allocator.go b/client/internal/vanus/store/allocator.go index c468d848a..7ac47d101 100644 --- a/client/internal/vanus/store/allocator.go +++ b/client/internal/vanus/store/allocator.go @@ -15,15 +15,15 @@ package store import ( + // standard libraries "context" + "sync" - "github.com/linkall-labs/vanus/observability/tracing" + // third-party libraries "go.opentelemetry.io/otel/trace" - // standard libraries. - "sync" - - // this project. + // first-party libraries + "github.com/linkall-labs/vanus/observability/tracing" "github.com/linkall-labs/vanus/pkg/errors" ) diff --git a/client/internal/vanus/store/block_store.go b/client/internal/vanus/store/block_store.go index bb36fbeeb..3e91441ad 100644 --- a/client/internal/vanus/store/block_store.go +++ b/client/internal/vanus/store/block_store.go @@ -19,22 +19,20 @@ import ( "context" "time" - "github.com/linkall-labs/vanus/client/pkg/codec" + // third-party libraries - "github.com/linkall-labs/vanus/observability/tracing" "go.opentelemetry.io/otel/trace" - - ce "github.com/cloudevents/sdk-go/v2" "google.golang.org/grpc" // first-party libraries - // third-party libraries - cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + "github.com/linkall-labs/vanus/client/pkg/primitive" + "github.com/linkall-labs/vanus/observability/tracing" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" segpb "github.com/linkall-labs/vanus/proto/pkg/segment" + // this project "github.com/linkall-labs/vanus/client/internal/vanus/net/rpc" "github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare" - "github.com/linkall-labs/vanus/client/pkg/primitive" ) func newBlockStore(endpoint string) (*BlockStore, error) { @@ -67,38 +65,10 @@ func (s *BlockStore) Close() { s.client.Close() } -func (s *BlockStore) Append(ctx context.Context, block uint64, event *ce.Event) (int64, error) { - _ctx, span := s.tracer.Start(ctx, "Append") - defer span.End() - - eventpb, err := codec.ToProto(event) - if err != nil { - return -1, err - } - req := &segpb.AppendToBlockRequest{ - BlockId: block, - Events: &cepb.CloudEventBatch{ - Events: []*cepb.CloudEvent{eventpb}, - }, - } - - client, err := s.client.Get(_ctx) - if err != nil { - return -1, err - } - - res, err := client.(segpb.SegmentServerClient).AppendToBlock(_ctx, req) - if err != nil { - return -1, err - } - // TODO(Y. F. Zhang): batch events - return res.GetOffsets()[0], nil -} - func (s *BlockStore) Read( ctx context.Context, block uint64, offset int64, size int16, pollingTimeout uint32, -) ([]*ce.Event, error) { - ctx, span := s.tracer.Start(ctx, "Append") +) (*cloudevents.CloudEventBatch, error) { + ctx, span := s.tracer.Start(ctx, "Read") defer span.End() req := &segpb.ReadFromBlockRequest{ @@ -117,23 +87,7 @@ func (s *BlockStore) Read( if err != nil { return nil, err } - - if batch := resp.GetEvents(); batch != nil { - if eventpbs := batch.GetEvents(); len(eventpbs) > 0 { - events := make([]*ce.Event, 0, len(eventpbs)) - for _, eventpb := range eventpbs { - event, err2 := codec.FromProto(eventpb) - if err2 != nil { - // TODO: return events or error? - return events, err2 - } - events = append(events, event) - } - return events, nil - } - } - - return []*ce.Event{}, err + return resp.GetEvents(), err } func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Time) (int64, error) { @@ -157,24 +111,23 @@ func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Ti return res.Offset, nil } -func (s *BlockStore) AppendBatch(ctx context.Context, block uint64, event *cepb.CloudEventBatch) (int64, error) { - _ctx, span := s.tracer.Start(ctx, "AppendBatch") +func (s *BlockStore) Append(ctx context.Context, block uint64, events *cloudevents.CloudEventBatch) ([]int64, error) { + _ctx, span := s.tracer.Start(ctx, "Append") defer span.End() req := &segpb.AppendToBlockRequest{ BlockId: block, - Events: event, + Events: events, } client, err := s.client.Get(_ctx) if err != nil { - return -1, err + return nil, err } res, err := client.(segpb.SegmentServerClient).AppendToBlock(_ctx, req) if err != nil { - return -1, err + return nil, err } - // TODO(Y. F. Zhang): batch events - return res.GetOffsets()[0], nil + return res.GetOffsets(), nil } diff --git a/client/pkg/api/client.go b/client/pkg/api/client.go index f5b092c69..79d060b03 100644 --- a/client/pkg/api/client.go +++ b/client/pkg/api/client.go @@ -17,8 +17,10 @@ package api import ( "context" + ce "github.com/cloudevents/sdk-go/v2" "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + "github.com/linkall-labs/vanus/proto/pkg/codec" ) type Eventbus interface { @@ -31,13 +33,11 @@ type Eventbus interface { } type BusWriter interface { - AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error) - AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (eid string, err error) - AppendBatch(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...WriteOption) (err error) + Append(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...WriteOption) (eids []string, err error) } type BusReader interface { - Read(ctx context.Context, opts ...ReadOption) ([]*ce.Event, int64, uint64, error) + Read(ctx context.Context, opts ...ReadOption) (events *cloudevents.CloudEventBatch, off int64, logid uint64, err error) } type Eventlog interface { @@ -47,3 +47,47 @@ type Eventlog interface { Length(ctx context.Context) (int64, error) QueryOffsetByTime(ctx context.Context, timestamp int64) (int64, error) } + +func Append(ctx context.Context, w BusWriter, events []*ce.Event, opts ...WriteOption) (eids []string, err error) { + eventpbs := make([]*cloudevents.CloudEvent, len(events)) + for idx := range events { + eventpb, err := codec.ToProto(events[idx]) + if err != nil { + return nil, err + } + eventpbs[idx] = eventpb + } + return w.Append(ctx, &cloudevents.CloudEventBatch{ + Events: eventpbs, + }, opts...) +} + +func AppendOne(ctx context.Context, w BusWriter, event *ce.Event, opts ...WriteOption) (eid string, err error) { + eventpb, err := codec.ToProto(event) + if err != nil { + return "", err + } + eids, err := w.Append(ctx, &cloudevents.CloudEventBatch{ + Events: []*cloudevents.CloudEvent{eventpb}, + }, opts...) + if err != nil { + return "", err + } + return eids[0], nil +} + +func Read(ctx context.Context, r BusReader, opts ...ReadOption) (events []*ce.Event, off int64, logid uint64, err error) { + batch, off, logid, err := r.Read(ctx, opts...) + if err != nil { + return nil, off, logid, err + } + es := make([]*ce.Event, len(batch.Events)) + for idx := range batch.Events { + e, err := codec.FromProto(batch.Events[idx]) + if err != nil { + return nil, 0, 0, err + } + es[idx] = e + } + return es, off, logid, nil +} diff --git a/client/pkg/api/mock_client.go b/client/pkg/api/mock_client.go index ffb896001..10cbcf2cd 100644 --- a/client/pkg/api/mock_client.go +++ b/client/pkg/api/mock_client.go @@ -8,7 +8,6 @@ import ( context "context" reflect "reflect" - v2 "github.com/cloudevents/sdk-go/v2" gomock "github.com/golang/mock/gomock" cloudevents "github.com/linkall-labs/vanus/proto/pkg/cloudevents" ) @@ -147,63 +146,24 @@ func (m *MockBusWriter) EXPECT() *MockBusWriterMockRecorder { return m.recorder } -// AppendBatch mocks base method. -func (m *MockBusWriter) AppendBatch(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...WriteOption) error { +// Append mocks base method. +func (m *MockBusWriter) Append(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...WriteOption) ([]string, error) { m.ctrl.T.Helper() varargs := []interface{}{ctx, events} for _, a := range opts { varargs = append(varargs, a) } - ret := m.ctrl.Call(m, "AppendBatch", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// AppendBatch indicates an expected call of AppendBatch. -func (mr *MockBusWriterMockRecorder) AppendBatch(ctx, events interface{}, opts ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, events}, opts...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendBatch", reflect.TypeOf((*MockBusWriter)(nil).AppendBatch), varargs...) -} - -// AppendMany mocks base method. -func (m *MockBusWriter) AppendMany(ctx context.Context, events []*v2.Event, opts ...WriteOption) (string, error) { - m.ctrl.T.Helper() - varargs := []interface{}{ctx, events} - for _, a := range opts { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "AppendMany", varargs...) - ret0, _ := ret[0].(string) + ret := m.ctrl.Call(m, "Append", varargs...) + ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) return ret0, ret1 } -// AppendMany indicates an expected call of AppendMany. -func (mr *MockBusWriterMockRecorder) AppendMany(ctx, events interface{}, opts ...interface{}) *gomock.Call { +// Append indicates an expected call of Append. +func (mr *MockBusWriterMockRecorder) Append(ctx, events interface{}, opts ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{ctx, events}, opts...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendMany", reflect.TypeOf((*MockBusWriter)(nil).AppendMany), varargs...) -} - -// AppendOne mocks base method. -func (m *MockBusWriter) AppendOne(ctx context.Context, event *v2.Event, opts ...WriteOption) (string, error) { - m.ctrl.T.Helper() - varargs := []interface{}{ctx, event} - for _, a := range opts { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "AppendOne", varargs...) - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// AppendOne indicates an expected call of AppendOne. -func (mr *MockBusWriterMockRecorder) AppendOne(ctx, event interface{}, opts ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, event}, opts...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendOne", reflect.TypeOf((*MockBusWriter)(nil).AppendOne), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Append", reflect.TypeOf((*MockBusWriter)(nil).Append), varargs...) } // MockBusReader is a mock of BusReader interface. @@ -230,14 +190,14 @@ func (m *MockBusReader) EXPECT() *MockBusReaderMockRecorder { } // Read mocks base method. -func (m *MockBusReader) Read(ctx context.Context, opts ...ReadOption) ([]*v2.Event, int64, uint64, error) { +func (m *MockBusReader) Read(ctx context.Context, opts ...ReadOption) (*cloudevents.CloudEventBatch, int64, uint64, error) { m.ctrl.T.Helper() varargs := []interface{}{ctx} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Read", varargs...) - ret0, _ := ret[0].([]*v2.Event) + ret0, _ := ret[0].(*cloudevents.CloudEventBatch) ret1, _ := ret[1].(int64) ret2, _ := ret[2].(uint64) ret3, _ := ret[3].(error) diff --git a/client/pkg/eventbus/eventbus.go b/client/pkg/eventbus/eventbus.go index b42d28745..6bc755762 100644 --- a/client/pkg/eventbus/eventbus.go +++ b/client/pkg/eventbus/eventbus.go @@ -15,33 +15,31 @@ package eventbus import ( - // standard libraries. + // standard libraries "context" "encoding/base64" "encoding/binary" stderrors "errors" - "github.com/linkall-labs/vanus/proto/pkg/cloudevents" "io" "sync" - "github.com/linkall-labs/vanus/observability/tracing" - "go.opentelemetry.io/otel/trace" + // third-party libraries - // third-party libraries. - ce "github.com/cloudevents/sdk-go/v2" "github.com/scylladb/go-set/u64set" + "go.opentelemetry.io/otel/trace" // first-party libraries - - // this project. - "github.com/linkall-labs/vanus/client/pkg/api" - "github.com/linkall-labs/vanus/client/pkg/eventlog" - "github.com/linkall-labs/vanus/client/pkg/policy" - vlog "github.com/linkall-labs/vanus/observability/log" + "github.com/linkall-labs/vanus/observability/log" + "github.com/linkall-labs/vanus/observability/tracing" "github.com/linkall-labs/vanus/pkg/errors" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + // this project eb "github.com/linkall-labs/vanus/client/internal/vanus/eventbus" el "github.com/linkall-labs/vanus/client/internal/vanus/eventlog" + "github.com/linkall-labs/vanus/client/pkg/api" + "github.com/linkall-labs/vanus/client/pkg/eventlog" + "github.com/linkall-labs/vanus/client/pkg/policy" ) func NewEventbus(cfg *eb.Config) *eventbus { @@ -67,7 +65,7 @@ func NewEventbus(cfg *eb.Config) *eventbus { for { re, ok := <-ch if !ok { - vlog.Debug(context.Background(), "eventbus quits writable watcher", map[string]interface{}{ + log.Debug(context.Background(), "eventbus quits writable watcher", map[string]interface{}{ "eventbus": bus.cfg.Name, }) break @@ -89,7 +87,7 @@ func NewEventbus(cfg *eb.Config) *eventbus { for { re, ok := <-ch if !ok { - vlog.Debug(context.Background(), "eventbus quits readable watcher", map[string]interface{}{ + log.Debug(context.Background(), "eventbus quits readable watcher", map[string]interface{}{ "eventbus": bus.cfg.Name, }) break @@ -188,16 +186,16 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti if len(b.readableLogs) == 0 { b.refreshReadableLogs(ctx) } - if log, ok := b.readableLogs[logID]; ok { - return log, nil + if l, ok := b.readableLogs[logID]; ok { + return l, nil } return nil, errors.ErrResourceNotFound.WithMessage("eventlog not found") } else if op.Policy.AccessMode() == api.ReadWrite { if len(b.writableLogs) == 0 { b.refreshWritableLogs(ctx) } - if log, ok := b.writableLogs[logID]; ok { - return log, nil + if l, ok := b.writableLogs[logID]; ok { + return l, nil } return nil, errors.ErrResourceNotFound.WithMessage("eventlog not found") } else { @@ -311,8 +309,7 @@ func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResul Endpoints: b.cfg.Endpoints, ID: logID, } - log := eventlog.NewEventLog(cfg) - lws[logID] = log + lws[logID] = eventlog.NewEventLog(cfg) return true }) b.setWritableLogs(s, lws) @@ -404,8 +401,7 @@ func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResul Endpoints: b.cfg.Endpoints, ID: logID, } - log := eventlog.NewEventLog(cfg) - lws[logID] = log + lws[logID] = eventlog.NewEventLog(cfg) return true }) b.setReadableLogs(s, lws) @@ -446,33 +442,10 @@ type busWriter struct { tracer *tracing.Tracer } -func (w *busWriter) AppendBatch(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...api.WriteOption) (err error) { - _ctx, span := w.tracer.Start(ctx, "CloudEventBatch") - defer span.End() - - var writeOpts = w.opts - if len(opts) > 0 { - writeOpts = w.opts.Copy() - for _, opt := range opts { - opt(writeOpts) - } - } - - // 1. pick a writer of eventlog - lw, err := w.pickWritableLog(_ctx, writeOpts) - if err != nil { - return err - } - - // 2. append the event to the eventlog - _, err = lw.AppendMany(_ctx, events) - return err -} - var _ api.BusWriter = (*busWriter)(nil) -func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.WriteOption) (eid string, err error) { - _ctx, span := w.tracer.Start(ctx, "AppendOne") +func (w *busWriter) Append(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...api.WriteOption) (eids []string, err error) { + _ctx, span := w.tracer.Start(ctx, "Append") defer span.End() var writeOpts = w.opts @@ -486,27 +459,29 @@ func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api. // 1. pick a writer of eventlog lw, err := w.pickWritableLog(_ctx, writeOpts) if err != nil { - return "", err + log.Error(context.Background(), "pick writable log failed", map[string]interface{}{ + log.KeyError: err, + "eventbus": w.ebus.Name(), + }) + return nil, err } // 2. append the event to the eventlog - off, err := lw.Append(_ctx, event) + offsets, err := lw.Append(_ctx, events) if err != nil { - return "", err + log.Error(context.Background(), "logwriter append failed", map[string]interface{}{ + log.KeyError: err, + "eventbus": w.ebus.Name(), + "eventlog-id": lw.Log().ID(), + }) + return nil, err } - // 3. generate event ID - var buf [16]byte - binary.BigEndian.PutUint64(buf[0:8], lw.Log().ID()) - binary.BigEndian.PutUint64(buf[8:16], uint64(off)) - encoded := base64.StdEncoding.EncodeToString(buf[:]) - - return encoded, nil -} - -func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...api.WriteOption) (eid string, err error) { - // TODO(jiangkai): implement this method, by jiangkai, 2022.10.24 - return "", nil + eventIDs := make([]string, len(offsets)) + for idx := range offsets { + eventIDs[idx] = genEventID(lw.Log().ID(), offsets[idx]) + } + return eventIDs, nil } func (w *busWriter) Bus() api.Eventbus { @@ -517,17 +492,25 @@ func (w *busWriter) pickWritableLog(ctx context.Context, opts *api.WriteOptions) _ctx, span := w.tracer.Start(ctx, "pickWritableLog") defer span.End() - log, err := opts.Policy.NextLog(ctx) + l, err := opts.Policy.NextLog(ctx) if err != nil { return nil, err } - l := w.ebus.getWritableLog(_ctx, log.ID()) - if l == nil { + lw := w.ebus.getWritableLog(_ctx, l.ID()) + if lw == nil { return nil, stderrors.New("can not pick writable log") } - return l.Writer(), nil + return lw.Writer(), nil +} + +func genEventID(logID uint64, off int64) string { + var buf [16]byte + binary.BigEndian.PutUint64(buf[0:8], logID) + binary.BigEndian.PutUint64(buf[8:16], uint64(off)) + encoded := base64.StdEncoding.EncodeToString(buf[:]) + return encoded } type busReader struct { @@ -538,7 +521,7 @@ type busReader struct { var _ api.BusReader = (*busReader)(nil) -func (r *busReader) Read(ctx context.Context, opts ...api.ReadOption) ([]*ce.Event, int64, uint64, error) { +func (r *busReader) Read(ctx context.Context, opts ...api.ReadOption) (events *cloudevents.CloudEventBatch, off int64, logid uint64, err error) { _ctx, span := r.tracer.Start(ctx, "Read") defer span.End() @@ -553,19 +536,27 @@ func (r *busReader) Read(ctx context.Context, opts ...api.ReadOption) ([]*ce.Eve // 1. pick a reader of eventlog lr, err := r.pickReadableLog(_ctx, readOpts) if err != nil { - return []*ce.Event{}, 0, 0, err + log.Error(context.Background(), "pick readable log failed", map[string]interface{}{ + log.KeyError: err, + "eventbus": r.ebus.Name(), + }) + return nil, 0, 0, err } // TODO(jiangkai): refactor eventlog interface to avoid seek every time, by jiangkai, 2022.10.24 - off, err := lr.Seek(_ctx, readOpts.Policy.Offset(), io.SeekStart) + off, err = lr.Seek(_ctx, readOpts.Policy.Offset(), io.SeekStart) if err != nil { - return []*ce.Event{}, 0, 0, err + log.Error(context.Background(), "seek offset failed", map[string]interface{}{ + log.KeyError: err, + "eventbus": r.ebus.Name(), + }) + return nil, 0, 0, err } // 2. read the event to the eventlog - events, err := lr.Read(_ctx, int16(readOpts.BatchSize)) + events, err = lr.Read(_ctx, int16(readOpts.BatchSize)) if err != nil { - return []*ce.Event{}, 0, 0, err + return nil, 0, 0, err } return events, off, lr.Log().ID(), nil } @@ -578,11 +569,11 @@ func (r *busReader) pickReadableLog(ctx context.Context, opts *api.ReadOptions) _ctx, span := r.tracer.Start(ctx, "pickReadableLog") defer span.End() - log, err := opts.Policy.NextLog(ctx) + l, err := opts.Policy.NextLog(ctx) if err != nil { return nil, err } - lr := r.ebus.getReadableLog(_ctx, log.ID()) + lr := r.ebus.getReadableLog(_ctx, l.ID()) if lr == nil { return nil, stderrors.New("can not pick readable log") } diff --git a/client/pkg/eventbus/lookup.go b/client/pkg/eventbus/lookup.go index cd540436d..2c6c6503d 100644 --- a/client/pkg/eventbus/lookup.go +++ b/client/pkg/eventbus/lookup.go @@ -15,12 +15,14 @@ package eventbus import ( - // standard libraries. + // standard libraries "context" "time" - // this project. + // first-party libraries + "github.com/linkall-labs/vanus/observability/log" + // this project "github.com/linkall-labs/vanus/client/pkg/primitive" "github.com/linkall-labs/vanus/client/pkg/record" ) @@ -47,6 +49,11 @@ func WatchWritableLogs(bus *eventbus) *WritableLogsWatcher { ch := make(chan *WritableLogsResult, 1) w := primitive.NewWatcher(30*time.Second, func() { rs, err := bus.nameService.LookupWritableLogs(context.Background(), bus.cfg.Name) + log.Debug(context.Background(), "lookup writable logs", map[string]interface{}{ + log.KeyError: err, + "eventbus": bus.cfg.Name, + "logs": rs, + }) ch <- &WritableLogsResult{ Eventlogs: rs, Err: err, @@ -84,6 +91,11 @@ func WatchReadableLogs(bus *eventbus) *ReadableLogsWatcher { ch := make(chan *ReadableLogsResult, 1) w := primitive.NewWatcher(30*time.Second, func() { rs, err := bus.nameService.LookupReadableLogs(context.Background(), bus.cfg.Name) + log.Debug(context.Background(), "lookup readable logs", map[string]interface{}{ + log.KeyError: err, + "eventbus": bus.cfg.Name, + "logs": rs, + }) ch <- &ReadableLogsResult{ Eventlogs: rs, Err: err, diff --git a/client/pkg/eventlog/eventlog.go b/client/pkg/eventlog/eventlog.go index f96756e19..144bc962b 100644 --- a/client/pkg/eventlog/eventlog.go +++ b/client/pkg/eventlog/eventlog.go @@ -16,17 +16,15 @@ package eventlog import ( - // standard libraries. + // standard libraries "context" - "github.com/linkall-labs/vanus/proto/pkg/cloudevents" - // third-party libraries. - ce "github.com/cloudevents/sdk-go/v2" + // third-party libraries + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + segpb "github.com/linkall-labs/vanus/proto/pkg/segment" - // first-party libraries. + // first-party libraries "github.com/linkall-labs/vanus/client/pkg/api" - segpb "github.com/linkall-labs/vanus/proto/pkg/segment" - // this project. ) const ( @@ -50,8 +48,7 @@ type LogWriter interface { Close(ctx context.Context) - Append(ctx context.Context, event *ce.Event) (off int64, err error) - AppendMany(ctx context.Context, events *cloudevents.CloudEventBatch) (off int64, err error) + Append(ctx context.Context, events *cloudevents.CloudEventBatch) (offs []int64, err error) } type LogReader interface { @@ -60,7 +57,7 @@ type LogReader interface { Close(ctx context.Context) // TODO: async - Read(ctx context.Context, size int16) (events []*ce.Event, err error) + Read(ctx context.Context, size int16) (events *cloudevents.CloudEventBatch, err error) // Seek sets the offset for the next Read to offset, // interpreted according to whence. diff --git a/client/pkg/eventlog/eventlog_impl.go b/client/pkg/eventlog/eventlog_impl.go index c0acd5100..8a0ffe85c 100644 --- a/client/pkg/eventlog/eventlog_impl.go +++ b/client/pkg/eventlog/eventlog_impl.go @@ -15,25 +15,25 @@ package eventlog import ( - // standard libraries. + // standard libraries "context" - "github.com/linkall-labs/vanus/proto/pkg/cloudevents" "io" "sort" "sync" "time" - "github.com/linkall-labs/vanus/observability/tracing" + // third-party libraries "go.opentelemetry.io/otel/trace" - // third-party libraries. - ce "github.com/cloudevents/sdk-go/v2" + // first-party libraries + "github.com/linkall-labs/vanus/observability/log" + "github.com/linkall-labs/vanus/observability/tracing" + "github.com/linkall-labs/vanus/pkg/errors" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" - // this project. + // this project el "github.com/linkall-labs/vanus/client/internal/vanus/eventlog" "github.com/linkall-labs/vanus/client/pkg/record" - vlog "github.com/linkall-labs/vanus/observability/log" - "github.com/linkall-labs/vanus/pkg/errors" ) const ( @@ -43,60 +43,60 @@ const ( ) func NewEventLog(cfg *el.Config) Eventlog { - log := &eventlog{ + l := &eventlog{ cfg: cfg, nameService: el.NewNameService(cfg.Endpoints), tracer: tracing.NewTracer("pkg.eventlog.impl", trace.SpanKindClient), } - log.writableWatcher = WatchWritableSegment(log) - log.readableWatcher = WatchReadableSegments(log) + l.writableWatcher = WatchWritableSegment(l) + l.readableWatcher = WatchReadableSegments(l) go func() { - ch := log.writableWatcher.Chan() + ch := l.writableWatcher.Chan() for { r, ok := <-ch if !ok { - vlog.Debug(context.Background(), "eventlog quits writable watcher", map[string]interface{}{ - "eventlog": log.cfg.ID, + log.Debug(context.Background(), "eventlog quits writable watcher", map[string]interface{}{ + "eventlog": l.cfg.ID, }) break } - ctx, span := log.tracer.Start(context.Background(), "updateReadableSegmentsTask") + ctx, span := l.tracer.Start(context.Background(), "updateReadableSegmentsTask") if r != nil { - log.updateWritableSegment(ctx, r) + l.updateWritableSegment(ctx, r) } - log.writableWatcher.Wakeup() + l.writableWatcher.Wakeup() span.End() } }() - log.writableWatcher.Start() + l.writableWatcher.Start() go func() { - ch := log.readableWatcher.Chan() + ch := l.readableWatcher.Chan() for { rs, ok := <-ch if !ok { - vlog.Debug(context.Background(), "eventlog quits readable watcher", map[string]interface{}{ - "eventlog": log.cfg.ID, + log.Debug(context.Background(), "eventlog quits readable watcher", map[string]interface{}{ + "eventlog": l.cfg.ID, }) break } - ctx, span := log.tracer.Start(context.Background(), "updateReadableSegmentsTask") + ctx, span := l.tracer.Start(context.Background(), "updateReadableSegmentsTask") if rs != nil { - log.updateReadableSegments(ctx, rs) + l.updateReadableSegments(ctx, rs) } - log.readableWatcher.Wakeup() + l.readableWatcher.Wakeup() span.End() } }() - log.readableWatcher.Start() + l.readableWatcher.Start() - return log + return l } type eventlog struct { @@ -220,8 +220,8 @@ func (l *eventlog) updateWritableSegment(ctx context.Context, r *record.Segment) segment, err := newSegment(ctx, r, true) if err != nil { - vlog.Error(context.Background(), "new segment failed", map[string]interface{}{ - vlog.KeyError: err, + log.Error(context.Background(), "new segment failed", map[string]interface{}{ + log.KeyError: err, }) return } @@ -280,6 +280,9 @@ func (l *eventlog) updateReadableSegments(ctx context.Context, rs []*record.Segm } if err != nil { // FIXME: create or update segment failed + log.Debug(context.Background(), "update readable segment failed", map[string]interface{}{ + "segment": segment.id, + }) continue } segments = append(segments, segment) @@ -343,26 +346,26 @@ type logWriter struct { mu sync.RWMutex } -func (w *logWriter) AppendMany(ctx context.Context, events *cloudevents.CloudEventBatch) (off int64, err error) { +func (w *logWriter) Append(ctx context.Context, events *cloudevents.CloudEventBatch) (offs []int64, err error) { retryTimes := defaultRetryTimes for i := 1; i <= retryTimes; i++ { - offset, err := w.doAppendBatch(ctx, events) + offsets, err := w.doAppend(ctx, events) if err == nil { - return offset, nil + return offsets, nil } - vlog.Warning(ctx, "failed to Append", map[string]interface{}{ - vlog.KeyError: err, - "offset": offset, - }) - if errors.Is(err, errors.ErrSegmentFull) { - if i < retryTimes { - continue - } + if !errors.Is(err, errors.ErrSegmentFull) { + log.Error(ctx, "logwriter append failed", map[string]interface{}{ + log.KeyError: err, + }) + return nil, err } - return -1, err + log.Debug(ctx, "logwriter append failed cause segment full", map[string]interface{}{ + log.KeyError: err, + "offsets": offsets, + "retry_time": i, + }) } - - return -1, errors.ErrUnknown + return nil, errors.ErrUnknown } func (w *logWriter) Log() Eventlog { @@ -373,58 +376,19 @@ func (w *logWriter) Close(ctx context.Context) { // TODO: by jiangkai, 2022.10.19 } -func (w *logWriter) Append(ctx context.Context, event *ce.Event) (int64, error) { - // TODO: async for throughput - - retryTimes := defaultRetryTimes - for i := 1; i <= retryTimes; i++ { - offset, err := w.doAppend(ctx, event) - if err == nil { - return offset, nil - } - vlog.Warning(ctx, "failed to Append", map[string]interface{}{ - vlog.KeyError: err, - "offset": offset, - }) - if errors.Is(err, errors.ErrSegmentFull) { - if i < retryTimes { - continue - } - } - return -1, err - } - - return -1, errors.ErrUnknown -} - -func (w *logWriter) doAppend(ctx context.Context, event *ce.Event) (int64, error) { - segment, err := w.selectWritableSegment(ctx) - if err != nil { - return -1, err - } - offset, err := segment.Append(ctx, event) - if err != nil { - if errors.Is(err, errors.ErrSegmentFull) { - segment.SetNotWritable() - } - return -1, err - } - return offset, nil -} - -func (w *logWriter) doAppendBatch(ctx context.Context, event *cloudevents.CloudEventBatch) (int64, error) { +func (w *logWriter) doAppend(ctx context.Context, event *cloudevents.CloudEventBatch) ([]int64, error) { segment, err := w.selectWritableSegment(ctx) if err != nil { - return -1, err + return nil, err } - offset, err := segment.AppendBatch(ctx, event) + offsets, err := segment.Append(ctx, event) if err != nil { if errors.Is(err, errors.ErrSegmentFull) { segment.SetNotWritable() } - return -1, err + return nil, err } - return offset, nil + return offsets, nil } func (w *logWriter) selectWritableSegment(ctx context.Context) (*segment, error) { @@ -470,7 +434,7 @@ func (r *logReader) Close(ctx context.Context) { // TODO: by jiangkai, 2022.10.19 } -func (r *logReader) Read(ctx context.Context, size int16) ([]*ce.Event, error) { +func (r *logReader) Read(ctx context.Context, size int16) (*cloudevents.CloudEventBatch, error) { if r.cur == nil { segment, err := r.elog.selectReadableSegment(ctx, r.pos) if errors.Is(err, errors.ErrOffsetOnEnd) { @@ -494,7 +458,7 @@ func (r *logReader) Read(ctx context.Context, size int16) ([]*ce.Event, error) { return nil, err } - r.pos += int64(len(events)) + r.pos += int64(len(events.Events)) if r.pos == r.cur.EndOffset() { r.switchSegment(ctx) } diff --git a/client/pkg/eventlog/log_segment.go b/client/pkg/eventlog/log_segment.go index ac73e3be9..686f5b157 100644 --- a/client/pkg/eventlog/log_segment.go +++ b/client/pkg/eventlog/log_segment.go @@ -15,28 +15,25 @@ package eventlog import ( - // standard libraries. + // standard libraries "context" "encoding/binary" - "github.com/linkall-labs/vanus/proto/pkg/cloudevents" "math" "sync" "time" - "github.com/linkall-labs/vanus/observability/tracing" + // third-party libraries "go.opentelemetry.io/otel/trace" - - // third-party libraries. - ce "github.com/cloudevents/sdk-go/v2" "go.uber.org/atomic" - // first-party libraries. + // first-party libraries + "github.com/linkall-labs/vanus/observability/tracing" + "github.com/linkall-labs/vanus/pkg/errors" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" segpb "github.com/linkall-labs/vanus/proto/pkg/segment" - // this project. - + // this project "github.com/linkall-labs/vanus/client/pkg/record" - "github.com/linkall-labs/vanus/pkg/errors" ) func newSegment(ctx context.Context, r *record.Segment, towrite bool) (*segment, error) { @@ -157,37 +154,25 @@ func (s *segment) Update(ctx context.Context, r *record.Segment, towrite bool) e return nil } -func (s *segment) Append(ctx context.Context, event *ce.Event) (int64, error) { +func (s *segment) Append(ctx context.Context, event *cloudevents.CloudEventBatch) ([]int64, error) { _ctx, span := s.tracer.Start(ctx, "Append") defer span.End() b := s.preferSegmentBlock() if b == nil { - return -1, errors.ErrNotLeader + return nil, errors.ErrNotLeader } - off, err := b.Append(_ctx, event) + offs, err := b.Append(_ctx, event) if err != nil { - return -1, err - } - return off + s.startOffset, nil -} - -func (s *segment) AppendBatch(ctx context.Context, event *cloudevents.CloudEventBatch) (int64, error) { - _ctx, span := s.tracer.Start(ctx, "AppendBatch") - defer span.End() - - b := s.preferSegmentBlock() - if b == nil { - return -1, errors.ErrNotLeader + return nil, err } - off, err := b.AppendBatch(_ctx, event) - if err != nil { - return -1, err + for idx := range offs { + offs[idx] += s.startOffset } - return off + s.startOffset, nil + return offs, nil } -func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) { +func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeout uint32) (*cloudevents.CloudEventBatch, error) { if from < s.startOffset { return nil, errors.ErrOffsetUnderflow } @@ -212,20 +197,23 @@ func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeo return nil, err } - for _, e := range events { - v, ok := e.Extensions()[segpb.XVanusBlockOffset] + for _, e := range events.Events { + v, ok := e.Attributes[segpb.XVanusBlockOffset] if !ok { continue } - off, ok := v.(int32) + + _, ok = v.GetAttr().(*cloudevents.CloudEvent_CloudEventAttributeValue_CeInteger) if !ok { return events, errors.ErrCorruptedEvent } - offset := s.startOffset + int64(off) + offset := s.startOffset + int64(v.GetCeInteger()) buf := make([]byte, 8) binary.BigEndian.PutUint64(buf, uint64(offset)) - e.SetExtension(XVanusLogOffset, buf) - e.SetExtension(segpb.XVanusBlockOffset, nil) + e.Attributes[XVanusLogOffset] = &cloudevents.CloudEvent_CloudEventAttributeValue{ + Attr: &cloudevents.CloudEvent_CloudEventAttributeValue_CeBytes{CeBytes: buf}, + } + delete(e.Attributes, segpb.XVanusBlockOffset) } return events, err diff --git a/client/pkg/eventlog/lookup.go b/client/pkg/eventlog/lookup.go index c73c62129..7f96a4266 100644 --- a/client/pkg/eventlog/lookup.go +++ b/client/pkg/eventlog/lookup.go @@ -15,12 +15,11 @@ package eventlog import ( - // standard libraries. + // standard libraries "context" "time" - // this project. - + // this project "github.com/linkall-labs/vanus/client/pkg/primitive" "github.com/linkall-labs/vanus/client/pkg/record" ) @@ -42,10 +41,10 @@ func (w *WritableSegmentWatcher) Start() { go w.Watcher.Run() } -func WatchWritableSegment(log *eventlog) *WritableSegmentWatcher { +func WatchWritableSegment(l *eventlog) *WritableSegmentWatcher { ch := make(chan *record.Segment, 1) w := primitive.NewWatcher(defaultWatchInterval, func() { - r, err := log.nameService.LookupWritableSegment(context.Background(), log.cfg.ID) + r, err := l.nameService.LookupWritableSegment(context.Background(), l.cfg.ID) if err != nil { ch <- nil } else { @@ -74,10 +73,10 @@ func (w *ReadableSegmentsWatcher) Start() { go w.Watcher.Run() } -func WatchReadableSegments(log *eventlog) *ReadableSegmentsWatcher { +func WatchReadableSegments(l *eventlog) *ReadableSegmentsWatcher { ch := make(chan []*record.Segment, 1) w := primitive.NewWatcher(defaultWatchInterval, func() { - rs, err := log.nameService.LookupReadableSegments(context.Background(), log.cfg.ID) + rs, err := l.nameService.LookupReadableSegments(context.Background(), l.cfg.ID) if err != nil { ch <- nil } else { diff --git a/client/pkg/eventlog/segment_block.go b/client/pkg/eventlog/segment_block.go index cafd5c221..a842c260a 100644 --- a/client/pkg/eventlog/segment_block.go +++ b/client/pkg/eventlog/segment_block.go @@ -15,17 +15,17 @@ package eventlog import ( - // third-party libraries + // standard libraries "context" "time" - ce "github.com/cloudevents/sdk-go/v2" + // first-party libraries + "github.com/linkall-labs/vanus/pkg/errors" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" // this project "github.com/linkall-labs/vanus/client/internal/vanus/store" "github.com/linkall-labs/vanus/client/pkg/record" - "github.com/linkall-labs/vanus/pkg/errors" - "github.com/linkall-labs/vanus/proto/pkg/cloudevents" ) func newBlock(ctx context.Context, r *record.Block) (*block, error) { @@ -53,22 +53,20 @@ func (s *block) LookupOffset(ctx context.Context, t time.Time) (int64, error) { return s.store.LookupOffset(ctx, s.id, t) } -func (s *block) Append(ctx context.Context, event *ce.Event) (int64, error) { +func (s *block) Append(ctx context.Context, event *cloudevents.CloudEventBatch) ([]int64, error) { return s.store.Append(ctx, s.id, event) } -func (s *block) AppendBatch(ctx context.Context, event *cloudevents.CloudEventBatch) (int64, error) { - return s.store.AppendBatch(ctx, s.id, event) -} - -func (s *block) Read(ctx context.Context, offset int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) { +func (s *block) Read(ctx context.Context, offset int64, size int16, pollingTimeout uint32) (*cloudevents.CloudEventBatch, error) { if offset < 0 { return nil, errors.ErrOffsetUnderflow } if size > 0 { // doRead } else if size == 0 { - return make([]*ce.Event, 0), nil + return &cloudevents.CloudEventBatch{ + Events: []*cloudevents.CloudEvent{}, + }, nil } else if size < 0 { return nil, errors.ErrInvalidArgument } diff --git a/internal/gateway/gateway.go b/internal/gateway/gateway.go index 401fd9c8b..c31b7f8af 100644 --- a/internal/gateway/gateway.go +++ b/internal/gateway/gateway.go @@ -165,7 +165,7 @@ func (ga *ceGateway) receive(ctx context.Context, event v2.Event) (re *v2.Event, v, _ = ga.busWriter.LoadOrStore(ebName, ga.client.Eventbus(ctx, ebName).Writer()) } writer, _ := v.(api.BusWriter) - eventID, err := writer.AppendOne(_ctx, &event) + eventID, err := api.AppendOne(ctx, writer, &event) if err != nil { log.Warning(_ctx, "append to failed", map[string]interface{}{ log.KeyError: err, diff --git a/internal/gateway/gateway_test.go b/internal/gateway/gateway_test.go index 4d5d8b974..5d464be3e 100644 --- a/internal/gateway/gateway_test.go +++ b/internal/gateway/gateway_test.go @@ -148,7 +148,7 @@ func TestGateway_EventID(t *testing.T) { ctrl := NewController(t) defer ctrl.Finish() var ( - eventID = "AABBCC" + eventIDs = "AABBCC" busName = "test" controllers = []string{"127.0.0.1:2048"} port = 8087 @@ -159,7 +159,7 @@ func TestGateway_EventID(t *testing.T) { mockBusWriter := api.NewMockBusWriter(ctrl) mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("AABBCC", nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{"AABBCC"}, nil) cfg := Config{ Port: port, @@ -198,6 +198,6 @@ func TestGateway_EventID(t *testing.T) { err = resEvent.DataAs(&ed) So(err, ShouldBeNil) So(ed.BusName, ShouldEqual, busName) - So(ed.EventID, ShouldEqual, eventID) + So(ed.EventID, ShouldEqual, eventIDs) }) } diff --git a/internal/gateway/proxy/deadletter.go b/internal/gateway/proxy/deadletter.go index 861431fe9..e584e3f3d 100644 --- a/internal/gateway/proxy/deadletter.go +++ b/internal/gateway/proxy/deadletter.go @@ -20,7 +20,7 @@ import ( "fmt" v2 "github.com/cloudevents/sdk-go/v2" - "github.com/linkall-labs/vanus/client/pkg/codec" + "github.com/linkall-labs/vanus/client/pkg/api" "github.com/linkall-labs/vanus/client/pkg/eventlog" "github.com/linkall-labs/vanus/client/pkg/option" "github.com/linkall-labs/vanus/client/pkg/policy" @@ -28,6 +28,7 @@ import ( "github.com/linkall-labs/vanus/internal/primitive/vanus" "github.com/linkall-labs/vanus/pkg/errors" "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + "github.com/linkall-labs/vanus/proto/pkg/codec" ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" proxypb "github.com/linkall-labs/vanus/proto/pkg/proxy" "google.golang.org/protobuf/types/known/emptypb" @@ -88,7 +89,7 @@ func (cp *ControllerProxy) GetDeadLetterEvent(ctx context.Context, var events []*v2.Event loop: for { - _events, _, _, err := busReader.Read(ctx) + _events, _, _, err := api.Read(ctx, busReader) if err != nil { if errors.Is(err, errors.ErrOffsetOnEnd) { // read end @@ -170,7 +171,7 @@ func (cp *ControllerProxy) ResendDeadLetterEvent(ctx context.Context, var events []*cloudevents.CloudEvent loop: for { - _events, _, _, err := busReader.Read(ctx) + _events, _, _, err := api.Read(ctx, busReader) if err != nil { if errors.Is(err, errors.ErrOffsetOnEnd) { // read end diff --git a/internal/gateway/proxy/proxy.go b/internal/gateway/proxy/proxy.go index a262400c7..30a495bf6 100644 --- a/internal/gateway/proxy/proxy.go +++ b/internal/gateway/proxy/proxy.go @@ -214,7 +214,7 @@ func (cp *ControllerProxy) writeEvents(ctx context.Context, val, _ = cp.writerMap.LoadOrStore(eventbusName, cp.client.Eventbus(ctx, eventbusName).Writer()) } w, _ := val.(api.BusWriter) - err := w.AppendBatch(ctx, events) + _, err := w.Append(ctx, events) if err != nil { log.Warning(ctx, "append to failed", map[string]interface{}{ log.KeyError: err, @@ -713,12 +713,12 @@ func (cp *ControllerProxy) GetEvent(ctx context.Context, if err != nil { return nil, err } - - events, _, _, err := cp.client.Eventbus(ctx, req.GetEventbus()).Reader( + reader := cp.client.Eventbus(ctx, req.GetEventbus()).Reader( option.WithDisablePolling(), option.WithReadPolicy(policy.NewManuallyReadPolicy(ls[0], offset)), option.WithBatchSize(int(num)), - ).Read(ctx) + ) + events, _, _, err := api.Read(ctx, reader) if err != nil { return nil, err } @@ -797,10 +797,11 @@ func (cp *ControllerProxy) getByEventID(ctx context.Context, return nil, err } - events, _, _, err := cp.client.Eventbus(ctx, req.GetEventbus()).Reader( + reader := cp.client.Eventbus(ctx, req.GetEventbus()).Reader( option.WithReadPolicy(policy.NewManuallyReadPolicy(l, off)), option.WithDisablePolling(), - ).Read(ctx) + ) + events, _, _, err := api.Read(ctx, reader) if err != nil { return nil, err } diff --git a/internal/gateway/proxy/proxy_test.go b/internal/gateway/proxy/proxy_test.go index 1d2a983c8..9aec134ae 100644 --- a/internal/gateway/proxy/proxy_test.go +++ b/internal/gateway/proxy/proxy_test.go @@ -30,6 +30,8 @@ import ( "github.com/linkall-labs/vanus/internal/convert" "github.com/linkall-labs/vanus/internal/primitive" "github.com/linkall-labs/vanus/internal/primitive/vanus" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + "github.com/linkall-labs/vanus/proto/pkg/codec" ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" metapb "github.com/linkall-labs/vanus/proto/pkg/meta" proxypb "github.com/linkall-labs/vanus/proto/pkg/proxy" @@ -105,7 +107,11 @@ func TestControllerProxy_GetEvent(t *testing.T) { "string": "string", }) So(err, ShouldBeNil) - reader.EXPECT().Read(gomock.Any()).Times(2).Return([]*v2.Event{&e1}, int64(0), id, nil) + epb1, _ := codec.ToProto(&e1) + batchret := &cloudevents.CloudEventBatch{ + Events: []*cloudevents.CloudEvent{epb1}, + } + reader.EXPECT().Read(gomock.Any()).Times(2).Return(batchret, int64(0), id, nil) res, err := cp.GetEvent(stdCtx.Background(), &proxypb.GetEventRequest{ Eventbus: "ut1", Offset: -123, @@ -145,6 +151,7 @@ func TestControllerProxy_GetEvent(t *testing.T) { }) events := make([]*v2.Event, maximumNumberPerGetRequest) + eventpbs := make([]*cloudevents.CloudEvent, maximumNumberPerGetRequest) for idx := 0; idx < maximumNumberPerGetRequest; idx++ { e := v2.NewEvent() e.SetID(fmt.Sprintf("ut%d", idx)) @@ -159,9 +166,14 @@ func TestControllerProxy_GetEvent(t *testing.T) { }) So(err, ShouldBeNil) events[idx] = &e + epb, _ := codec.ToProto(&e) + eventpbs[idx] = epb } - reader.EXPECT().Read(gomock.Any()).Times(1).Return(events, int64(1234), id, nil) + ret := &cloudevents.CloudEventBatch{ + Events: eventpbs, + } + reader.EXPECT().Read(gomock.Any()).Times(1).Return(ret, int64(1234), id, nil) res, err := cp.GetEvent(stdCtx.Background(), &proxypb.GetEventRequest{ Eventbus: "ut1", Offset: 1234, @@ -208,8 +220,11 @@ func TestControllerProxy_GetEvent(t *testing.T) { "string": "string", }) So(err, ShouldBeNil) - - reader.EXPECT().Read(gomock.Any()).Times(1).Return([]*v2.Event{&e}, offset, id, nil) + epb1, _ := codec.ToProto(&e) + ret := &cloudevents.CloudEventBatch{ + Events: []*cloudevents.CloudEvent{epb1}, + } + reader.EXPECT().Read(gomock.Any()).Times(1).Return(ret, offset, id, nil) res, err := cp.GetEvent(stdCtx.Background(), &proxypb.GetEventRequest{ Eventbus: "ut1", EventId: base64.StdEncoding.EncodeToString(b), @@ -348,7 +363,11 @@ func TestControllerProxy_ValidateSubscription(t *testing.T) { eb.EXPECT().ListLog(gomock.Any()).Times(1).Return([]api.Eventlog{nil}, nil) rd := api.NewMockBusReader(ctrl) eb.EXPECT().Reader(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(rd) - rd.EXPECT().Read(gomock.Any()).Times(1).Return([]*v2.Event{&e}, int64(0), uint64(0), nil) + epb, _ := codec.ToProto(&e) + ret := &cloudevents.CloudEventBatch{ + Events: []*cloudevents.CloudEvent{epb}, + } + rd.EXPECT().Read(gomock.Any()).Times(1).Return(ret, int64(0), uint64(0), nil) // mock subscription pb := &metapb.Subscription{ diff --git a/internal/store/segment/server.go b/internal/store/segment/server.go index 90664aa89..0d285cce6 100644 --- a/internal/store/segment/server.go +++ b/internal/store/segment/server.go @@ -554,11 +554,9 @@ func (s *server) AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb. metrics.WriteTPSCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr(), metrics.LabelFailed).Add(float64(len(events))) metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr(), metrics.LabelFailed).Add(float64(size)) return nil, s.processAppendError(ctx, b, err) - } else { - metrics.WriteTPSCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr(), metrics.LabelSuccess).Add(float64(len(events))) - metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr(), metrics.LabelSuccess).Add(float64(size)) } - + metrics.WriteTPSCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr(), metrics.LabelSuccess).Add(float64(len(events))) + metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr(), metrics.LabelSuccess).Add(float64(size)) return seqs, nil } diff --git a/internal/timer/timingwheel/bucket.go b/internal/timer/timingwheel/bucket.go index c4903ada1..9c963b320 100644 --- a/internal/timer/timingwheel/bucket.go +++ b/internal/timer/timingwheel/bucket.go @@ -349,7 +349,7 @@ func (b *bucket) putEvent(ctx context.Context, tm *timingMsg) (err error) { if !b.isLeader() { return nil } - _, err = b.eventbusWriter.AppendOne(ctx, tm.getEvent()) + _, err = api.AppendOne(ctx, b.eventbusWriter, tm.getEvent()) if err != nil { log.Error(ctx, "append event to failed", map[string]interface{}{ log.KeyError: err, @@ -387,7 +387,7 @@ func (b *bucket) getEvent(ctx context.Context, number int16) (events []*ce.Event } readPolicy := option.WithReadPolicy(policy.NewManuallyReadPolicy(ls[0], b.offset)) - events, _, _, err = b.eventbusReader.Read(ctx, readPolicy, option.WithBatchSize(int(number))) + events, _, _, err = api.Read(ctx, b.eventbusReader, readPolicy, option.WithBatchSize(int(number))) if err != nil { if !errors.Is(err, errors.ErrOffsetOnEnd) && !stderr.Is(ctx.Err(), context.Canceled) { log.Error(ctx, "read failed", map[string]interface{}{ diff --git a/internal/timer/timingwheel/bucket_test.go b/internal/timer/timingwheel/bucket_test.go index 8ef2cbf0a..a7bf46499 100644 --- a/internal/timer/timingwheel/bucket_test.go +++ b/internal/timer/timingwheel/bucket_test.go @@ -28,6 +28,8 @@ import ( "github.com/linkall-labs/vanus/internal/kv" "github.com/linkall-labs/vanus/pkg/cluster" "github.com/linkall-labs/vanus/pkg/errors" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + "github.com/linkall-labs/vanus/proto/pkg/codec" . "github.com/smartystreets/goconvey/convey" ) @@ -135,8 +137,6 @@ func TestBucket_run(t *testing.T) { bucket.client = mockClient tw.client = mockClient tw.distributionStation.eventbusWriter = mockBusWriter - events := make([]*ce.Event, 1) - events[0] = event(0) for e := tw.twList.Front(); e != nil; e = e.Next() { for _, bucket := range e.Value.(*timingWheelElement).buckets { bucket.eventbusWriter = mockBusWriter @@ -147,9 +147,9 @@ func TestBucket_run(t *testing.T) { Convey("get event failed", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), stderr.New("test")) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), stderr.New("test")) go func() { - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) cancel() }() bucket.run(ctx) @@ -158,7 +158,7 @@ func TestBucket_run(t *testing.T) { Convey("get event on end", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), errors.ErrOffsetOnEnd) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), errors.ErrOffsetOnEnd) go func() { time.Sleep(100 * time.Millisecond) cancel() @@ -169,7 +169,7 @@ func TestBucket_run(t *testing.T) { Convey("bucket exit", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), errors.ErrOffsetOnEnd) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), errors.ErrOffsetOnEnd) go func() { time.Sleep(100 * time.Millisecond) close(bucket.exitC) @@ -180,8 +180,8 @@ func TestBucket_run(t *testing.T) { Convey("push failed", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", stderr.New("test")) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, stderr.New("test")) mockStoreCli.EXPECT().Set(Any(), Any(), Any()).AnyTimes().Return(nil) go func() { time.Sleep(100 * time.Millisecond) @@ -192,14 +192,13 @@ func TestBucket_run(t *testing.T) { }) Convey("flow failed", func() { - events[0] = event(1000) bucket.layer = 2 bucket.waitingForReady = bucket.waitingForFlow bucket.eventHandler = bucket.pushToPrevTimingWheel bucket.element = tw.twList.Front().Next() mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", stderr.New("test")) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(1000), int64(0), uint64(0), nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, stderr.New("test")) mockStoreCli.EXPECT().Set(Any(), Any(), Any()).AnyTimes().Return(nil) go func() { time.Sleep(100 * time.Millisecond) @@ -210,14 +209,13 @@ func TestBucket_run(t *testing.T) { }) Convey("flow success", func() { - events[0] = event(1000) bucket.layer = 2 bucket.waitingForReady = bucket.waitingForFlow bucket.eventHandler = bucket.pushToPrevTimingWheel bucket.element = tw.twList.Front().Next() mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", nil) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(1000), int64(0), uint64(0), nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, nil) mockStoreCli.EXPECT().Set(Any(), Any(), Any()).AnyTimes().Return(nil) go func() { time.Sleep(100 * time.Millisecond) @@ -229,8 +227,8 @@ func TestBucket_run(t *testing.T) { Convey("push success", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", nil) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(1000), int64(0), uint64(0), nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, nil) mockStoreCli.EXPECT().Set(Any(), Any(), Any()).AnyTimes().Return(nil) go func() { time.Sleep(100 * time.Millisecond) @@ -329,10 +327,8 @@ func TestBucket_getEvent(t *testing.T) { bucket.client = mockClient Convey("test bucket get event success", func() { - events := make([]*ce.Event, 1) - events[0] = event(10000) mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(10000), int64(0), uint64(0), nil) result, err := bucket.getEvent(ctx, 1) So(len(result), ShouldEqual, 0) So(err, ShouldNotBeNil) @@ -349,10 +345,8 @@ func TestBucket_getEvent(t *testing.T) { Convey("test bucket get event failed", func() { tw.SetLeader(true) - events := make([]*ce.Event, 1) - events[0] = event(10000) mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), stderr.New("test")) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(10000), int64(0), uint64(0), stderr.New("test")) _, err := bucket.getEvent(ctx, 1) So(err, ShouldNotBeNil) }) @@ -477,8 +471,6 @@ func TestBucket_hasOnEnd(t *testing.T) { mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) mockStoreCli := kv.NewMockClient(mockCtrl) bucket.kvStore = mockStoreCli - events := make([]*ce.Event, 1) - events[0] = event(0) Convey("test bucket has on end1", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, stderr.New("test")) @@ -489,7 +481,7 @@ func TestBucket_hasOnEnd(t *testing.T) { Convey("test bucket has on end2", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), nil) mockStoreCli.EXPECT().Get(Any(), Any()).Times(1).Return([]byte{}, stderr.New("test")) ret := bucket.hasOnEnd(ctx) So(ret, ShouldBeFalse) @@ -497,7 +489,7 @@ func TestBucket_hasOnEnd(t *testing.T) { Convey("test bucket has on end3", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), nil) mockStoreCli.EXPECT().Get(Any(), Any()).Times(1).Return([]byte{}, nil) ret := bucket.hasOnEnd(ctx) So(ret, ShouldBeTrue) @@ -559,3 +551,15 @@ func event(i int64) *ce.Event { e.SetExtension(xVanusEventbus, "quick-start") return &e } + +func batch(i int64) *cloudevents.CloudEventBatch { + e := ce.NewEvent() + t := time.Now().Add(time.Duration(i) * time.Millisecond).UTC().Format(time.RFC3339) + e.SetExtension(xVanusDeliveryTime, t) + e.SetExtension(xVanusEventbus, "quick-start") + events := make([]*cloudevents.CloudEvent, 1) + events[0], _ = codec.ToProto(&e) + return &cloudevents.CloudEventBatch{ + Events: events, + } +} diff --git a/internal/timer/timingwheel/timingwheel.go b/internal/timer/timingwheel/timingwheel.go index 94da5ee94..3f29ec5ff 100644 --- a/internal/timer/timingwheel/timingwheel.go +++ b/internal/timer/timingwheel/timingwheel.go @@ -582,7 +582,7 @@ func (tw *timingWheel) deliver(ctx context.Context, e *ce.Event) error { v, _ = tw.cache.LoadOrStore(ebName, tw.client.Eventbus(ctx, ebName).Writer()) } writer, _ := v.(api.BusWriter) - _, err = writer.AppendOne(ctx, e) + _, err = api.AppendOne(ctx, writer, e) if err != nil { if errors.Is(err, errors.ErrOffsetOnEnd) { log.Warning(ctx, "eventbus not found, discard this event", map[string]interface{}{ diff --git a/internal/timer/timingwheel/timingwheel_test.go b/internal/timer/timingwheel/timingwheel_test.go index e60a17d13..b448d50ac 100644 --- a/internal/timer/timingwheel/timingwheel_test.go +++ b/internal/timer/timingwheel/timingwheel_test.go @@ -62,7 +62,7 @@ func TestTimingWheel_Start(t *testing.T) { mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return([]*ce.Event{}, int64(0), uint64(0), nil) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), nil) for e := tw.twList.Front(); e != nil; { for _, bucket := range e.Value.(*timingWheelElement).buckets { bucket.eventbusReader = mockBusReader @@ -305,12 +305,10 @@ func TestTimingWheel_runReceivingStation(t *testing.T) { mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) - events := make([]*ce.Event, 1) - events[0] = event(0) Convey("test bucket run receiving station with get event failed", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), stderr.New("test")) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), stderr.New("test")) go func() { time.Sleep(100 * time.Millisecond) cancel() @@ -321,8 +319,8 @@ func TestTimingWheel_runReceivingStation(t *testing.T) { Convey("test timingwheel run receiving station with start failure", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", stderr.New("test")) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, stderr.New("test")) mockStoreCli.EXPECT().Set(Any(), Any(), Any()).AnyTimes().Return(nil) go func() { time.Sleep(100 * time.Millisecond) @@ -334,8 +332,8 @@ func TestTimingWheel_runReceivingStation(t *testing.T) { Convey("test timingwheel run receiving station with start success", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", nil) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, nil) mockStoreCli.EXPECT().Set(Any(), Any(), Any()).AnyTimes().Return(nil) go func() { time.Sleep(100 * time.Millisecond) @@ -408,12 +406,10 @@ func TestTimingWheel_runDistributionStation(t *testing.T) { tw.distributionStation.kvStore = mockStoreCli tw.distributionStation.timingwheel = tw tw.distributionStation.client = mockClient - events := make([]*ce.Event, 1) - events[0] = event(0) Convey("test timingwheel run distribution station with get event failed", func() { mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), stderr.New("test")) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), stderr.New("test")) go func() { time.Sleep(100 * time.Millisecond) cancel() @@ -423,8 +419,8 @@ func TestTimingWheel_runDistributionStation(t *testing.T) { }) Convey("test timingwheel run distribution station with deliver failed", func() { - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", errors.ErrNotWritable) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, errors.ErrNotWritable) mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockStoreCli.EXPECT().Set(Any(), Any(), Any()).AnyTimes().Return(nil) go func() { @@ -436,8 +432,8 @@ func TestTimingWheel_runDistributionStation(t *testing.T) { }) Convey("test timingwheel run distribution station with deliver success", func() { - mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(events, int64(0), uint64(0), nil) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", nil) + mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().Return(batch(0), int64(0), uint64(0), nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, nil) mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockStoreCli.EXPECT().Set(Any(), Any(), Any()).AnyTimes().Return(nil) go func() { @@ -472,19 +468,19 @@ func TestTimingWheel_deliver(t *testing.T) { }) Convey("test timingwheel deliver failure with eventbus not found", func() { - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", errors.ErrOffsetOnEnd) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, errors.ErrOffsetOnEnd) err := tw.deliver(ctx, e) So(err, ShouldBeNil) }) Convey("test timingwheel deliver failure with append failed", func() { - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", errors.ErrNotWritable) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, errors.ErrNotWritable) err := tw.deliver(ctx, e) So(err, ShouldNotBeNil) }) Convey("test timingwheel deliver success", func() { - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", nil) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, nil) err := tw.deliver(ctx, e) So(err, ShouldBeNil) }) @@ -526,7 +522,7 @@ func TestTimingWheelElement_push(t *testing.T) { Convey("push timing message to timingwheel failure", func() { tw.SetLeader(true) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", stderr.New("test")) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, stderr.New("test")) tm := newTimingMsg(ctx, event(1000)) twe := tw.twList.Front().Value.(*timingWheelElement) result := twe.push(ctx, tm) @@ -635,7 +631,7 @@ func TestTimingWheelElement_flow(t *testing.T) { Convey("flow timing message failure causes append failed", func() { tw.SetLeader(true) tm := newTimingMsg(ctx, event(1000)) - mockBusWriter.EXPECT().AppendOne(Any(), Any()).AnyTimes().Return("", stderr.New("test")) + mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{""}, stderr.New("test")) result := tw.twList.Front().Value.(*timingWheelElement).flow(ctx, tm) So(result, ShouldEqual, false) }) diff --git a/internal/trigger/client/grpc.go b/internal/trigger/client/grpc.go index b8cf2b1a4..7258e14d5 100644 --- a/internal/trigger/client/grpc.go +++ b/internal/trigger/client/grpc.go @@ -20,8 +20,8 @@ import ( "time" ce "github.com/cloudevents/sdk-go/v2" - "github.com/linkall-labs/vanus/client/pkg/codec" "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + "github.com/linkall-labs/vanus/proto/pkg/codec" "github.com/pkg/errors" stdGrpc "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" diff --git a/internal/trigger/reader/reader.go b/internal/trigger/reader/reader.go index 41c65e7f9..46eab380a 100644 --- a/internal/trigger/reader/reader.go +++ b/internal/trigger/reader/reader.go @@ -237,6 +237,6 @@ func (elReader *eventLogReader) putEvent(ctx context.Context, event info.EventRe func readEvents(ctx context.Context, lr api.BusReader) ([]*ce.Event, error) { timeout, cancel := context.WithTimeout(ctx, readEventTimeout) defer cancel() - events, _, _, err := lr.Read(timeout) + events, _, _, err := api.Read(timeout, lr) return events, err } diff --git a/internal/trigger/reader/reader_test.go b/internal/trigger/reader/reader_test.go index 54476d8dd..583e793a7 100644 --- a/internal/trigger/reader/reader_test.go +++ b/internal/trigger/reader/reader_test.go @@ -28,6 +28,8 @@ import ( "github.com/linkall-labs/vanus/client/pkg/api" "github.com/linkall-labs/vanus/client/pkg/eventlog" "github.com/linkall-labs/vanus/internal/trigger/info" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" + "github.com/linkall-labs/vanus/proto/pkg/codec" . "github.com/smartystreets/goconvey/convey" ) @@ -52,7 +54,7 @@ func TestReaderStart(t *testing.T) { mockEventlog.EXPECT().LatestOffset(Any()).AnyTimes().Return(offset, nil) mockEventlog.EXPECT().EarliestOffset(Any()).AnyTimes().Return(offset, nil) mockBusReader.EXPECT().Read(Any()).AnyTimes().DoAndReturn( - func(ctx context.Context, opts ...api.ReadOption) ([]*ce.Event, int64, uint64, error) { + func(ctx context.Context, opts ...api.ReadOption) (*cloudevents.CloudEventBatch, int64, uint64, error) { time.Sleep(time.Millisecond) e := ce.NewEvent() e.SetID(uuid.NewString()) @@ -60,7 +62,11 @@ func TestReaderStart(t *testing.T) { binary.BigEndian.PutUint64(buf, index) e.SetExtension(eventlog.XVanusLogOffset, buf) index++ - return []*ce.Event{&e}, int64(0), uint64(0), nil + epb, _ := codec.ToProto(&e) + return &cloudevents.CloudEventBatch{ + Events: []*cloudevents.CloudEvent{epb}, + }, int64(0), uint64(0), nil + // return []*ce.Event{&e}, int64(0), uint64(0), nil }) eventCh := make(chan info.EventRecord, 100) r := NewReader(Config{EventBusName: "test", BatchSize: 1}, eventCh).(*reader) diff --git a/internal/trigger/trigger/trigger.go b/internal/trigger/trigger/trigger.go index 97d9fc190..5cfea68cd 100644 --- a/internal/trigger/trigger/trigger.go +++ b/internal/trigger/trigger/trigger.go @@ -443,7 +443,7 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i for { writeAttempt++ startTime := time.Now() - _, err := t.timerEventWriter.AppendOne(ctx, e) + _, err := api.AppendOne(ctx, t.timerEventWriter, e) metrics.TriggerRetryEventAppendSecond.WithLabelValues(t.subscriptionIDStr). Observe(time.Since(startTime).Seconds()) if err != nil { @@ -478,7 +478,7 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso for { writeAttempt++ startTime := time.Now() - _, err := t.dlEventWriter.AppendOne(ctx, e) + _, err := api.AppendOne(ctx, t.dlEventWriter, e) metrics.TriggerDeadLetterEventAppendSecond.WithLabelValues(t.subscriptionIDStr). Observe(time.Since(startTime).Seconds()) if err != nil { diff --git a/internal/trigger/trigger/trigger_test.go b/internal/trigger/trigger/trigger_test.go index b5a5cce3c..604184e79 100644 --- a/internal/trigger/trigger/trigger_test.go +++ b/internal/trigger/trigger/trigger_test.go @@ -34,6 +34,7 @@ import ( "github.com/linkall-labs/vanus/internal/trigger/client" "github.com/linkall-labs/vanus/internal/trigger/info" "github.com/linkall-labs/vanus/internal/trigger/reader" + "github.com/linkall-labs/vanus/proto/pkg/cloudevents" . "github.com/smartystreets/goconvey/convey" ) @@ -124,13 +125,13 @@ func TestTriggerWriteFailEvent(t *testing.T) { tg.dlEventWriter = mockBusWriter tg.timerEventWriter = mockBusWriter var callCount int - mockBusWriter.EXPECT().AppendOne(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(context.Context, - *ce.Event, ...api.WriteOption) (string, error) { + mockBusWriter.EXPECT().Append(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(context.Context, + *cloudevents.CloudEventBatch, ...api.WriteOption) ([]string, error) { callCount++ if callCount%2 != 0 { - return "", fmt.Errorf("append error") + return []string{""}, fmt.Errorf("append error") } - return "", nil + return []string{""}, nil }) Convey("test no need retry,in dlq", func() { tg.writeFailEvent(ctx, e.Event, 400, fmt.Errorf("400 error")) diff --git a/client/pkg/codec/protobuf.go b/proto/pkg/codec/protobuf.go similarity index 100% rename from client/pkg/codec/protobuf.go rename to proto/pkg/codec/protobuf.go