Skip to content

Commit

Permalink
nits+type assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d committed Dec 2, 2024
1 parent c7a5ddf commit c1ca067
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 27 deletions.
16 changes: 16 additions & 0 deletions pkg/blockbuilder/builder/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package builder

import (
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// TestBuilder implements Worker interface for testing
type TestBuilder struct {
*Worker
}

func NewTestBuilder(builderID string, transport types.Transport) *TestBuilder {
return &TestBuilder{
Worker: NewWorker(builderID, transport),
}
}
12 changes: 8 additions & 4 deletions pkg/blockbuilder/builder/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,28 @@ import (
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

var (
_ types.Transport = unimplementedTransport{}
_ types.Transport = &MemoryTransport{}
)

// unimplementedTransport provides default implementations that panic
type unimplementedTransport struct{}

func (t *unimplementedTransport) SendGetJobRequest(_ context.Context, _ *types.GetJobRequest) (*types.GetJobResponse, error) {
func (t unimplementedTransport) SendGetJobRequest(_ context.Context, _ *types.GetJobRequest) (*types.GetJobResponse, error) {
panic("unimplemented")
}

func (t *unimplementedTransport) SendCompleteJob(_ context.Context, _ *types.CompleteJobRequest) error {
func (t unimplementedTransport) SendCompleteJob(_ context.Context, _ *types.CompleteJobRequest) error {
panic("unimplemented")
}

func (t *unimplementedTransport) SendSyncJob(_ context.Context, _ *types.SyncJobRequest) error {
func (t unimplementedTransport) SendSyncJob(_ context.Context, _ *types.SyncJobRequest) error {
panic("unimplemented")
}

// MemoryTransport implements Transport interface for in-memory communication
type MemoryTransport struct {
unimplementedTransport
scheduler types.Scheduler
}

Expand Down
25 changes: 15 additions & 10 deletions pkg/blockbuilder/builder/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,42 @@ import (
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

var (
_ types.Worker = unimplementedWorker{}
_ types.Worker = &Worker{}
)

// unimplementedWorker provides default implementations for the Worker interface.
type unimplementedWorker struct{}

func (u *unimplementedWorker) GetJob(_ context.Context) (*types.Job, bool, error) {
func (u unimplementedWorker) GetJob(_ context.Context) (*types.Job, bool, error) {
panic("unimplemented")
}

func (u *unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job) error {
func (u unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job) error {
panic("unimplemented")
}

func (u *unimplementedWorker) SyncJob(_ context.Context, _ *types.Job) error {
func (u unimplementedWorker) SyncJob(_ context.Context, _ *types.Job) error {
panic("unimplemented")
}

// WorkerImpl is the implementation of the Worker interface.
type WorkerImpl struct {
// Worker is the implementation of the Worker interface.
type Worker struct {
unimplementedWorker
transport types.Transport
builderID string
}

// NewWorker creates a new Worker instance.
func NewWorker(builderID string, transport types.Transport) *WorkerImpl {
return &WorkerImpl{
func NewWorker(builderID string, transport types.Transport) *Worker {
return &Worker{
transport: transport,
builderID: builderID,
}
}

func (w *WorkerImpl) GetJob(ctx context.Context) (*types.Job, bool, error) {
func (w *Worker) GetJob(ctx context.Context) (*types.Job, bool, error) {
resp, err := w.transport.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: w.builderID,
})
Expand All @@ -46,14 +51,14 @@ func (w *WorkerImpl) GetJob(ctx context.Context) (*types.Job, bool, error) {
return resp.Job, resp.OK, nil
}

func (w *WorkerImpl) CompleteJob(ctx context.Context, job *types.Job) error {
func (w *Worker) CompleteJob(ctx context.Context, job *types.Job) error {
return w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{
BuilderID: w.builderID,
Job: job,
})
}

func (w *WorkerImpl) SyncJob(ctx context.Context, job *types.Job) error {
func (w *Worker) SyncJob(ctx context.Context, job *types.Job) error {
return w.transport.SendSyncJob(ctx, &types.SyncJobRequest{
BuilderID: w.builderID,
Job: job,
Expand Down
26 changes: 15 additions & 11 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,39 @@ import (
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

var (
_ types.Scheduler = unimplementedScheduler{}
_ types.Scheduler = &QueueScheduler{}
)

// unimplementedScheduler provides default implementations that panic.
type unimplementedScheduler struct{}

func (s *unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) {
func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) {
panic("unimplemented")
}

func (s *unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error {
func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error {
panic("unimplemented")
}

func (s *unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error {
func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error {
panic("unimplemented")
}

// SchedulerImpl implements the Scheduler interface
type SchedulerImpl struct {
unimplementedScheduler
// QueueScheduler implements the Scheduler interface
type QueueScheduler struct {
queue *JobQueue
}

// NewScheduler creates a new scheduler instance
func NewScheduler(queue *JobQueue) *SchedulerImpl {
return &SchedulerImpl{
func NewScheduler(queue *JobQueue) *QueueScheduler {
return &QueueScheduler{
queue: queue,
}
}

func (s *SchedulerImpl) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) {
func (s *QueueScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
Expand All @@ -43,10 +47,10 @@ func (s *SchedulerImpl) HandleGetJob(ctx context.Context, builderID string) (*ty
}
}

func (s *SchedulerImpl) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error {
func (s *QueueScheduler) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error {
return s.queue.MarkComplete(job.ID, builderID)
}

func (s *SchedulerImpl) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error {
func (s *QueueScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error {
return s.queue.SyncJob(job.ID, builderID, job)
}
4 changes: 2 additions & 2 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

type testEnv struct {
queue *JobQueue
scheduler *SchedulerImpl
scheduler *QueueScheduler
transport *builder.MemoryTransport
builder *builder.WorkerImpl
builder *builder.Worker
}

func newTestEnv(builderID string) *testEnv {
Expand Down

0 comments on commit c1ca067

Please sign in to comment.