Skip to content

Commit

Permalink
chore: move timeline client into internal
Browse files Browse the repository at this point in the history
Seperate the service implementation from the client API.
  • Loading branch information
stuartwdouglas committed Dec 24, 2024
1 parent b9dde97 commit 20ad3ab
Show file tree
Hide file tree
Showing 33 changed files with 133 additions and 137 deletions.
6 changes: 3 additions & 3 deletions backend/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
timelinepb "github.com/block/ftl/backend/protos/xyz/block/ftl/timeline/v1"
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/block/ftl/backend/timeline"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/common/schema"
frontend "github.com/block/ftl/frontend/console"
Expand All @@ -25,6 +24,7 @@ import (
"github.com/block/ftl/internal/routing"
"github.com/block/ftl/internal/rpc"
"github.com/block/ftl/internal/schema/schemaeventsource"
"github.com/block/ftl/internal/timelineclient"
)

type Config struct {
Expand All @@ -36,14 +36,14 @@ type Config struct {
type service struct {
schemaEventSource schemaeventsource.EventSource
controllerClient ftlv1connect.ControllerServiceClient
timelineClient *timeline.Client
timelineClient *timelineclient.Client
adminClient admin.Client
callClient routing.CallClient
}

var _ consolepbconnect.ConsoleServiceHandler = (*service)(nil)

func Start(ctx context.Context, config Config, eventSource schemaeventsource.EventSource, controllerClient ftlv1connect.ControllerServiceClient, timelineClient *timeline.Client, adminClient admin.Client, client routing.CallClient) error {
func Start(ctx context.Context, config Config, eventSource schemaeventsource.EventSource, controllerClient ftlv1connect.ControllerServiceClient, timelineClient *timelineclient.Client, adminClient admin.Client, client routing.CallClient) error {
logger := log.FromContext(ctx).Scope("console")
ctx = log.ContextWithLogger(ctx, logger)

Expand Down
14 changes: 7 additions & 7 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/block/ftl/backend/runner/pubsub"
"github.com/block/ftl/backend/timeline"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/sha256"
Expand All @@ -52,6 +51,7 @@ import (
"github.com/block/ftl/internal/rpc"
"github.com/block/ftl/internal/rpc/headers"
"github.com/block/ftl/internal/schema/schemaeventsource"
"github.com/block/ftl/internal/timelineclient"
)

// CommonConfig between the production controller and development server.
Expand Down Expand Up @@ -95,7 +95,7 @@ func Start(
config Config,
storage *artefacts.OCIArtefactService,
adminClient ftlv1connect.AdminServiceClient,
timelineClient *timeline.Client,
timelineClient *timelineclient.Client,
devel bool,
) error {
config.SetDefaults()
Expand Down Expand Up @@ -140,7 +140,7 @@ type Service struct {

tasks *scheduledtask.Scheduler
pubSub *pubsub.Service
timelineClient *timeline.Client
timelineClient *timelineclient.Client
storage *artefacts.OCIArtefactService

// Map from runnerKey.String() to client.
Expand All @@ -156,7 +156,7 @@ type Service struct {
func New(
ctx context.Context,
adminClient ftlv1connect.AdminServiceClient,
timelineClient *timeline.Client,
timelineClient *timelineclient.Client,
storage *artefacts.OCIArtefactService,
config Config,
devel bool,
Expand Down Expand Up @@ -418,7 +418,7 @@ func (s *Service) setDeploymentReplicas(ctx context.Context, key model.Deploymen
return fmt.Errorf("could not activate deployment: %w", err)
}
}
s.timelineClient.Publish(ctx, timeline.DeploymentUpdated{
s.timelineClient.Publish(ctx, timelineclient.DeploymentUpdated{
DeploymentKey: key,
MinReplicas: minReplicas,
PrevMinReplicas: deployment.MinReplicas,
Expand Down Expand Up @@ -481,7 +481,7 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
}
}

s.timelineClient.Publish(ctx, timeline.DeploymentCreated{
s.timelineClient.Publish(ctx, timelineclient.DeploymentCreated{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.Module,
Expand Down Expand Up @@ -842,7 +842,7 @@ func (s *Service) callWithRequest(
return nil, fmt.Errorf("deployment not found for module %q", module)
}

callEvent := &timeline.Call{
callEvent := &timelineclient.Call{
DeploymentKey: deployment,
RequestKey: requestKey,
ParentRequestKey: parentKey,
Expand Down
8 changes: 4 additions & 4 deletions backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (

"github.com/alecthomas/types/optional"

"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/timelineclient"
)

var _ log.Sink = (*deploymentLogsSink)(nil)

func newDeploymentLogsSink(ctx context.Context, timelineClient *timeline.Client) *deploymentLogsSink {
func newDeploymentLogsSink(ctx context.Context, timelineClient *timelineclient.Client) *deploymentLogsSink {
sink := &deploymentLogsSink{
logQueue: make(chan log.Entry, 10000),
}
Expand All @@ -40,7 +40,7 @@ func (d *deploymentLogsSink) Log(entry log.Entry) error {
return nil
}

func (d *deploymentLogsSink) processLogs(ctx context.Context, timelineClient *timeline.Client) {
func (d *deploymentLogsSink) processLogs(ctx context.Context, timelineClient *timelineclient.Client) {
for entry := range channels.IterContext(ctx, d.logQueue) {
var deployment model.DeploymentKey
depStr, ok := entry.Attributes["deployment"]
Expand All @@ -67,7 +67,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context, timelineClient *ti
errorStr = optional.Some(entry.Error.Error())
}

timelineClient.Publish(ctx, &timeline.Log{
timelineClient.Publish(ctx, &timelineclient.Log{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Expand Down
8 changes: 4 additions & 4 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/block/ftl/backend/cron/observability"
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/common/cron"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/slices"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/block/ftl/internal/routing"
"github.com/block/ftl/internal/rpc/headers"
"github.com/block/ftl/internal/schema/schemaeventsource"
"github.com/block/ftl/internal/timelineclient"
)

type cronJob struct {
Expand All @@ -46,7 +46,7 @@ func (c cronJob) String() string {
}

// Start the cron service. Blocks until the context is cancelled.
func Start(ctx context.Context, eventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timeline.Client) error {
func Start(ctx context.Context, eventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timelineclient.Client) error {
logger := log.FromContext(ctx).Scope("cron")
ctx = log.ContextWithLogger(ctx, logger)
// Map of cron jobs for each module.
Expand Down Expand Up @@ -136,11 +136,11 @@ func callCronJob(ctx context.Context, verbClient routing.CallClient, cronJob cro
}
}

func scheduleNext(ctx context.Context, cronQueue []cronJob, timelineClient *timeline.Client) (time.Duration, bool) {
func scheduleNext(ctx context.Context, cronQueue []cronJob, timelineClient *timelineclient.Client) (time.Duration, bool) {
if len(cronQueue) == 0 {
return 0, false
}
timelineClient.Publish(ctx, timeline.CronScheduled{
timelineClient.Publish(ctx, timelineclient.CronScheduled{
DeploymentKey: cronQueue[0].deployment,
Verb: schema.Ref{Module: cronQueue[0].module, Name: cronQueue[0].verb.Name},
ScheduledAt: cronQueue[0].next,
Expand Down
4 changes: 2 additions & 2 deletions backend/cron/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
"github.com/alecthomas/types/optional"

ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/timeline"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/routing"
"github.com/block/ftl/internal/schema/schemaeventsource"
"github.com/block/ftl/internal/timelineclient"
)

type verbClient struct {
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestCron(t *testing.T) {
timelineEndpoint, err := url.Parse("http://localhost:8080")
assert.NoError(t, err)

timelineClient := timeline.NewClient(ctx, timelineEndpoint)
timelineClient := timelineclient.NewClient(ctx, timelineEndpoint)
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
t.Cleanup(cancel)

Expand Down
6 changes: 3 additions & 3 deletions backend/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/alecthomas/types/optional"

ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/timeline"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/routing"
"github.com/block/ftl/internal/timelineclient"
)

// handleHTTP HTTP ingress routes.
Expand All @@ -37,7 +37,7 @@ func (s *service) handleHTTP(startTime time.Time, sch *schema.Schema, requestKey

verbRef := &schemapb.Ref{Module: route.module, Name: route.verb}

ingressEvent := timeline.Ingress{
ingressEvent := timelineclient.Ingress{
RequestKey: requestKey,
StartTime: startTime,
Verb: &schema.Ref{Name: route.verb, Module: route.module},
Expand Down Expand Up @@ -154,7 +154,7 @@ func (s *service) handleHTTP(startTime time.Time, sch *schema.Schema, requestKey

func (s *service) recordIngressErrorEvent(
ctx context.Context,
ingressEvent timeline.Ingress,
ingressEvent timelineclient.Ingress,
statusCode int,
errorMsg string,
) {
Expand Down
4 changes: 2 additions & 2 deletions backend/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
"github.com/alecthomas/types/optional"

ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/common/encoding"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/schema/schemaeventsource"
"github.com/block/ftl/internal/timelineclient"
)

func TestIngress(t *testing.T) {
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestIngress(t *testing.T) {
svc := &service{
view: syncView(ctx, schemaeventsource.NewUnattached()),
client: fv,
timelineClient: timeline.NewClient(ctx, timelineEndpoint),
timelineClient: timelineclient.NewClient(ctx, timelineEndpoint),
}
svc.handleHTTP(time.Now(), sch, reqKey, routes, rec, req, fv)
result := rec.Result()
Expand Down
6 changes: 3 additions & 3 deletions backend/ingress/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/alecthomas/types/optional"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/block/ftl/backend/timeline"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/cors"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/routing"
"github.com/block/ftl/internal/schema/schemaeventsource"
"github.com/block/ftl/internal/timelineclient"
)

type Config struct {
Expand All @@ -40,11 +40,11 @@ type service struct {
// Complete schema synchronised from the database.
view *atomic.Value[materialisedView]
client routing.CallClient
timelineClient *timeline.Client
timelineClient *timelineclient.Client
}

// Start the HTTP ingress service. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timeline.Client) error {
func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timelineclient.Client) error {
logger := log.FromContext(ctx).Scope("http-ingress")
ctx = log.ContextWithLogger(ctx, logger)
svc := &service{
Expand Down
8 changes: 4 additions & 4 deletions backend/runner/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
ftlleaseconnect "github.com/block/ftl/backend/protos/xyz/block/ftl/lease/v1/leasepbconnect"
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/rpc"
"github.com/block/ftl/internal/rpc/headers"
"github.com/block/ftl/internal/timelineclient"
)

var _ ftlv1connect.VerbServiceHandler = &Service{}
Expand All @@ -37,10 +37,10 @@ type Service struct {
controllerDeploymentService ftldeploymentconnect.DeploymentServiceClient
controllerLeaseService ftlleaseconnect.LeaseServiceClient
moduleVerbService *xsync.MapOf[string, moduleVerbService]
timelineClient *timeline.Client
timelineClient *timelineclient.Client
}

func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, leaseClient ftlleaseconnect.LeaseServiceClient, timelineClient *timeline.Client) *Service {
func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, leaseClient ftlleaseconnect.LeaseServiceClient, timelineClient *timelineclient.Client) *Service {
proxy := &Service{
controllerDeploymentService: controllerModuleService,
controllerLeaseService: leaseClient,
Expand Down Expand Up @@ -142,7 +142,7 @@ func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque
headers.SetRequestKey(req.Header(), requestKey)
}

callEvent := &timeline.Call{
callEvent := &timelineclient.Call{
DeploymentKey: verbService.deployment,
RequestKey: requestKey,
StartTime: start,
Expand Down
10 changes: 5 additions & 5 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (

"github.com/block/ftl/backend/controller/observability"
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/common/encoding"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/timelineclient"
)

type consumer struct {
Expand All @@ -35,11 +35,11 @@ type consumer struct {
deadLetterPublisher optional.Option[*publisher]

verbClient VerbClient
timelineClient *timeline.Client
timelineClient *timelineclient.Client
}

func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.MetadataSubscriber, deployment model.DeploymentKey,
deadLetterPublisher optional.Option[*publisher], verbClient VerbClient, timelineClient *timeline.Client) (*consumer, error) {
deadLetterPublisher optional.Option[*publisher], verbClient VerbClient, timelineClient *timelineclient.Client) (*consumer, error) {
if verb.Runtime == nil {
return nil, fmt.Errorf("subscription %s has no runtime", verb.Name)
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func (c *consumer) call(ctx context.Context, body []byte, partition, offset int)
Verb: schema.RefKey{Module: c.moduleName, Name: c.verb.Name}.ToProto(),
Body: body,
}
consumeEvent := timeline.PubSubConsume{
consumeEvent := timelineclient.PubSubConsume{
DeploymentKey: c.deployment,
RequestKey: optional.Some(requestKey.String()),
Time: time.Now(),
Expand All @@ -222,7 +222,7 @@ func (c *consumer) call(ctx context.Context, body []byte, partition, offset int)
}
defer c.timelineClient.Publish(ctx, consumeEvent)

callEvent := &timeline.Call{
callEvent := &timelineclient.Call{
DeploymentKey: c.deployment,
RequestKey: requestKey,
StartTime: start,
Expand Down
Loading

0 comments on commit 20ad3ab

Please sign in to comment.