Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: merge append interface #433

Merged
merged 3 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
package client

import (
// standard libraries
"context"
"sync"

// first-party libraries
"github.com/linkall-labs/vanus/observability/tracing"

// this project
eb "github.com/linkall-labs/vanus/client/internal/vanus/eventbus"
"github.com/linkall-labs/vanus/client/pkg/api"
"github.com/linkall-labs/vanus/client/pkg/eventbus"
"github.com/linkall-labs/vanus/observability/tracing"
)

type Client interface {
Expand Down
11 changes: 10 additions & 1 deletion client/examples/eventbus/append/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/linkall-labs/vanus/client"
"github.com/linkall-labs/vanus/client/pkg/option"
"github.com/linkall-labs/vanus/client/pkg/policy"
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
"github.com/linkall-labs/vanus/proto/pkg/codec"
)

func main() {
Expand All @@ -42,7 +44,14 @@ func main() {
event.SetType("example.type")
event.SetData(ce.ApplicationJSON, map[string]string{"hello": "world"})

eventID, err := w.AppendOne(ctx, &event, option.WithWritePolicy(policy.NewRoundRobinWritePolicy(bus)))
eventpb, err := codec.ToProto(&event)
if err != nil {
return
}
ceBatch := &cloudevents.CloudEventBatch{
Events: []*cloudevents.CloudEvent{eventpb},
}
eventID, err := w.Append(ctx, ceBatch, option.WithWritePolicy(policy.NewRoundRobinWritePolicy(bus)))
if err != nil {
log.Print(err.Error())
} else {
Expand Down
20 changes: 15 additions & 5 deletions client/internal/vanus/eventbus/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ package eventbus
import (
// standard libraries
"context"
"github.com/linkall-labs/vanus/pkg/cluster"

"github.com/linkall-labs/vanus/observability/tracing"
// third-party libraries
"go.opentelemetry.io/otel/trace"

"google.golang.org/grpc/credentials/insecure"

// first-party libraries
"github.com/linkall-labs/vanus/client/pkg/record"
"github.com/linkall-labs/vanus/observability/log"
"github.com/linkall-labs/vanus/observability/tracing"
"github.com/linkall-labs/vanus/pkg/cluster"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"

// this project
"github.com/linkall-labs/vanus/client/pkg/record"
)

func NewNameService(endpoints []string) *NameService {
Expand All @@ -51,8 +54,11 @@ func (ns *NameService) LookupWritableLogs(ctx context.Context, eventbus string)
}

resp, err := ns.client.GetEventBus(ctx, req)

if err != nil {
log.Error(context.Background(), "get eventbus failed", map[string]interface{}{
log.KeyError: err,
"eventbus": eventbus,
})
return nil, err
}
return toLogs(resp.GetLogs()), nil
Expand All @@ -68,6 +74,10 @@ func (ns *NameService) LookupReadableLogs(ctx context.Context, eventbus string)

resp, err := ns.client.GetEventBus(ctx, req)
if err != nil {
log.Error(context.Background(), "get eventbus failed", map[string]interface{}{
log.KeyError: err,
"eventbus": eventbus,
})
return nil, err
}

Expand Down
17 changes: 9 additions & 8 deletions client/internal/vanus/eventlog/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import (
"github.com/linkall-labs/vanus/observability/log"
"github.com/linkall-labs/vanus/observability/tracing"
"github.com/linkall-labs/vanus/pkg/cluster"
"github.com/linkall-labs/vanus/pkg/errors"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"

// this project.
"github.com/linkall-labs/vanus/client/pkg/record"
"github.com/linkall-labs/vanus/pkg/errors"
)

func NewNameService(endpoints []string) *NameService {
Expand All @@ -59,18 +59,14 @@ func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64)

resp, err := ns.client.GetAppendableSegment(ctx, req)
if err != nil {
log.Warning(ctx, "failed to GetAppendableSegment", map[string]interface{}{
"req": req,
"res": resp,
log.Error(context.Background(), "get appendable segment failed", map[string]interface{}{
log.KeyError: err,
"eventlog": logID,
"resp": resp.String(),
})
return nil, err
}

log.Debug(ctx, "GetAppendableSegment result", map[string]interface{}{
"req": req,
"res": resp,
})
segments := toSegments(resp.GetSegments())
if len(segments) == 0 {
return nil, errors.ErrNotWritable
Expand All @@ -91,6 +87,11 @@ func (ns *NameService) LookupReadableSegments(ctx context.Context, logID uint64)

resp, err := ns.client.ListSegment(ctx, req)
if err != nil {
log.Error(context.Background(), "list segment failed", map[string]interface{}{
log.KeyError: err,
"eventlog": logID,
"resp": resp.String(),
})
return nil, err
}

Expand Down
10 changes: 5 additions & 5 deletions client/internal/vanus/store/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
package store

import (
// standard libraries
"context"
"sync"

"github.com/linkall-labs/vanus/observability/tracing"
// third-party libraries
"go.opentelemetry.io/otel/trace"

// standard libraries.
"sync"

// this project.
// first-party libraries
"github.com/linkall-labs/vanus/observability/tracing"
"github.com/linkall-labs/vanus/pkg/errors"
)

Expand Down
75 changes: 14 additions & 61 deletions client/internal/vanus/store/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@ import (
"context"
"time"

"github.com/linkall-labs/vanus/client/pkg/codec"
// third-party libraries

"github.com/linkall-labs/vanus/observability/tracing"
"go.opentelemetry.io/otel/trace"

ce "github.com/cloudevents/sdk-go/v2"
"google.golang.org/grpc"

// first-party libraries
// third-party libraries
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
"github.com/linkall-labs/vanus/client/pkg/primitive"
"github.com/linkall-labs/vanus/observability/tracing"
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"

// this project
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc"
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare"
"github.com/linkall-labs/vanus/client/pkg/primitive"
)

func newBlockStore(endpoint string) (*BlockStore, error) {
Expand Down Expand Up @@ -67,38 +65,10 @@ func (s *BlockStore) Close() {
s.client.Close()
}

func (s *BlockStore) Append(ctx context.Context, block uint64, event *ce.Event) (int64, error) {
_ctx, span := s.tracer.Start(ctx, "Append")
defer span.End()

eventpb, err := codec.ToProto(event)
if err != nil {
return -1, err
}
req := &segpb.AppendToBlockRequest{
BlockId: block,
Events: &cepb.CloudEventBatch{
Events: []*cepb.CloudEvent{eventpb},
},
}

client, err := s.client.Get(_ctx)
if err != nil {
return -1, err
}

res, err := client.(segpb.SegmentServerClient).AppendToBlock(_ctx, req)
if err != nil {
return -1, err
}
// TODO(Y. F. Zhang): batch events
return res.GetOffsets()[0], nil
}

func (s *BlockStore) Read(
ctx context.Context, block uint64, offset int64, size int16, pollingTimeout uint32,
) ([]*ce.Event, error) {
ctx, span := s.tracer.Start(ctx, "Append")
) (*cloudevents.CloudEventBatch, error) {
ctx, span := s.tracer.Start(ctx, "Read")
defer span.End()

req := &segpb.ReadFromBlockRequest{
Expand All @@ -117,23 +87,7 @@ func (s *BlockStore) Read(
if err != nil {
return nil, err
}

if batch := resp.GetEvents(); batch != nil {
if eventpbs := batch.GetEvents(); len(eventpbs) > 0 {
events := make([]*ce.Event, 0, len(eventpbs))
for _, eventpb := range eventpbs {
event, err2 := codec.FromProto(eventpb)
if err2 != nil {
// TODO: return events or error?
return events, err2
}
events = append(events, event)
}
return events, nil
}
}

return []*ce.Event{}, err
return resp.GetEvents(), err
}

func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Time) (int64, error) {
Expand All @@ -157,24 +111,23 @@ func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Ti
return res.Offset, nil
}

func (s *BlockStore) AppendBatch(ctx context.Context, block uint64, event *cepb.CloudEventBatch) (int64, error) {
_ctx, span := s.tracer.Start(ctx, "AppendBatch")
func (s *BlockStore) Append(ctx context.Context, block uint64, events *cloudevents.CloudEventBatch) ([]int64, error) {
_ctx, span := s.tracer.Start(ctx, "Append")
defer span.End()

req := &segpb.AppendToBlockRequest{
BlockId: block,
Events: event,
Events: events,
}

client, err := s.client.Get(_ctx)
if err != nil {
return -1, err
return nil, err
}

res, err := client.(segpb.SegmentServerClient).AppendToBlock(_ctx, req)
if err != nil {
return -1, err
return nil, err
}
// TODO(Y. F. Zhang): batch events
return res.GetOffsets()[0], nil
return res.GetOffsets(), nil
}
52 changes: 48 additions & 4 deletions client/pkg/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package api

import (
"context"

ce "github.com/cloudevents/sdk-go/v2"
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
"github.com/linkall-labs/vanus/proto/pkg/codec"
)

type Eventbus interface {
Expand All @@ -31,13 +33,11 @@ type Eventbus interface {
}

type BusWriter interface {
AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error)
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (eid string, err error)
AppendBatch(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...WriteOption) (err error)
Append(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...WriteOption) (eids []string, err error)
}

type BusReader interface {
Read(ctx context.Context, opts ...ReadOption) ([]*ce.Event, int64, uint64, error)
Read(ctx context.Context, opts ...ReadOption) (events *cloudevents.CloudEventBatch, off int64, logid uint64, err error)
}

type Eventlog interface {
Expand All @@ -47,3 +47,47 @@ type Eventlog interface {
Length(ctx context.Context) (int64, error)
QueryOffsetByTime(ctx context.Context, timestamp int64) (int64, error)
}

func Append(ctx context.Context, w BusWriter, events []*ce.Event, opts ...WriteOption) (eids []string, err error) {
eventpbs := make([]*cloudevents.CloudEvent, len(events))
for idx := range events {
eventpb, err := codec.ToProto(events[idx])
if err != nil {
return nil, err
}
eventpbs[idx] = eventpb
}
return w.Append(ctx, &cloudevents.CloudEventBatch{
Events: eventpbs,
}, opts...)
}

func AppendOne(ctx context.Context, w BusWriter, event *ce.Event, opts ...WriteOption) (eid string, err error) {
eventpb, err := codec.ToProto(event)
if err != nil {
return "", err
}
eids, err := w.Append(ctx, &cloudevents.CloudEventBatch{
Events: []*cloudevents.CloudEvent{eventpb},
}, opts...)
if err != nil {
return "", err
}
return eids[0], nil
}

func Read(ctx context.Context, r BusReader, opts ...ReadOption) (events []*ce.Event, off int64, logid uint64, err error) {
batch, off, logid, err := r.Read(ctx, opts...)
if err != nil {
return nil, off, logid, err
}
es := make([]*ce.Event, len(batch.Events))
for idx := range batch.Events {
e, err := codec.FromProto(batch.Events[idx])
if err != nil {
return nil, 0, 0, err
}
es[idx] = e
}
return es, off, logid, nil
}
Loading