Skip to content

Commit

Permalink
feat: pubsub retries
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 6, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 112c6a9 commit 5b15fbc
Showing 10 changed files with 182 additions and 42 deletions.
10 changes: 10 additions & 0 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
@@ -596,12 +596,22 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
Module: moduleSchema.Name,
Name: v.Name,
}
retryParams := schema.RetryParams{}
if retryMd, ok := slices.FindVariant[*schema.MetadataRetry](v.Metadata); ok {
retryParams, err = retryMd.RetryParams()
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not parse retry parameters for %q: %w", v.Name, err)
}
}
err := tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{
Key: model.NewSubscriberKey(moduleSchema.Name, s.Name, v.Name),
Module: moduleSchema.Name,
SubscriptionName: s.Name,
Deployment: deploymentKey,
Sink: sinkRef,
RetryAttempts: int32(retryParams.Count),
Backoff: retryParams.MinBackoff,
MaxBackoff: retryParams.MaxBackoff,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not insert subscriber: %w", translatePGError(err))
11 changes: 7 additions & 4 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
continue
}

sink, err := tx.db.GetRandomSubscriberSink(ctx, subscription.Key)
subscriber, err := tx.db.GetRandomSubscriber(ctx, subscription.Key)
if err != nil {
return 0, fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
}
@@ -88,9 +88,12 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
},
}
_, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: sink,
Origin: origin.String(),
Request: nextCursor.Payload,
Verb: subscriber.Sink,
Origin: origin.String(),
Request: nextCursor.Payload,
RemainingAttempts: subscriber.RetryAttempts,
Backoff: subscriber.Backoff,
MaxBackoff: subscriber.MaxBackoff,
})
if err != nil {
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
3 changes: 3 additions & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 19 additions & 4 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
@@ -646,7 +646,15 @@ UPDATE SET
RETURNING id;

-- name: InsertSubscriber :exec
INSERT INTO topic_subscribers (key, topic_subscriptions_id, deployment_id, sink)
INSERT INTO topic_subscribers (
key,
topic_subscriptions_id,
deployment_id,
sink,
retry_attempts,
backoff,
max_backoff
)
VALUES (
sqlc.arg('key')::subscriber_key,
(
@@ -657,7 +665,11 @@ VALUES (
AND topic_subscriptions.name = sqlc.arg('subscription_name')::TEXT
),
(SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key),
sqlc.arg('sink'));
sqlc.arg('sink'),
sqlc.arg('retry_attempts'),
sqlc.arg('backoff')::interval,
sqlc.arg('max_backoff')::interval
);

-- name: PublishEventForTopic :exec
INSERT INTO topic_events (
@@ -713,9 +725,12 @@ WHERE topics.key = sqlc.arg('topic')::topic_key
ORDER BY events.created_at, events.id
LIMIT 1;

-- name: GetRandomSubscriberSink :one
-- name: GetRandomSubscriber :one
SELECT
subscribers.sink as sink
subscribers.sink as sink,
subscribers.retry_attempts as retry_attempts,
subscribers.backoff as backoff,
subscribers.max_backoff as max_backoff
FROM topic_subscribers as subscribers
JOIN deployments ON subscribers.deployment_id = deployments.id
JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id
51 changes: 42 additions & 9 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 10 additions & 7 deletions backend/controller/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
@@ -416,15 +416,18 @@ CREATE DOMAIN subscriber_key AS TEXT;
--
-- A subscriber is a 1:1 mapping between a subscription and a sink.
CREATE TABLE topic_subscribers (
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
"key" subscriber_key UNIQUE NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
"key" subscriber_key UNIQUE NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
topic_subscriptions_id BIGINT NOT NULL REFERENCES topic_subscriptions(id),

deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
-- Name of the verb to call on the deployment.
sink schema_ref NOT NULL
sink schema_ref NOT NULL,

-- retry options
retry_attempts INT NOT NULL,
backoff INTERVAL NOT NULL,
max_backoff INTERVAL NOT NULL
);

CREATE INDEX topic_subscribers_subscription_idx ON topic_subscribers (topic_subscriptions_id);
33 changes: 17 additions & 16 deletions backend/schema/validate.go
Original file line number Diff line number Diff line change
@@ -550,24 +550,25 @@ func validateVerbMetadata(scopes Scopes, module *Module, n *Verb) (merr []error)
}
case *MetadataRetry:
// Only allow retries on FSM transitions for now
fsms := islices.Filter(module.Decls, func(d Decl) bool {
fsm, ok := d.(*FSM)
if !ok {
return false
}
starts := islices.Filter(fsm.Start, func(ref *Ref) bool {
return ref.Name == n.Name
})
if len(starts) > 0 {
return true
_, isPartOfFSM := islices.Find(module.Decls, func(d Decl) bool {
if d, ok := d.(*FSM); ok {
// check if this verb part of the FSM
if _, isStart := islices.Find(d.Start, func(ref *Ref) bool {
return ref.Name == n.Name
}); isStart {
return true
}
if _, isTransition := islices.Find(d.Transitions, func(t *FSMTransition) bool {
return t.To.Name == n.Name
}); isTransition {
return true
}
}
transitions := islices.Filter(fsm.Transitions, func(t *FSMTransition) bool {
return t.To.Name == n.Name
})
return len(transitions) > 0
return false
})
if len(fsms) == 0 {
merr = append(merr, errorf(md, "verb %s: retries can only be added to FSM transitions", n.Name))
_, isSubscriber := islices.FindVariant[*MetadataSubscriber](n.Metadata)
if !isPartOfFSM && !isSubscriber {
merr = append(merr, errorf(md, `verb %s: retries can only be added to subscribers or FSM transitions`, n.Name))
return
}

20 changes: 19 additions & 1 deletion go-runtime/ftl/testdata/go/mapper/go.mod
Original file line number Diff line number Diff line change
@@ -4,28 +4,46 @@ go 1.22.2

toolchain go1.22.3

require github.com/TBD54566975/ftl v1.1.0
require (
github.com/TBD54566975/ftl v1.1.0
github.com/alecthomas/assert/v2 v2.10.0
)

require (
connectrpc.com/connect v1.16.1 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.0 // indirect
github.com/BurntSushi/toml v1.4.0 // indirect
github.com/TBD54566975/scaffolder v1.0.0 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/repr v0.4.0 // indirect
github.com/alecthomas/types v0.16.0 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
github.com/amacneil/dbmate/v2 v2.16.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.27.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 // indirect
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect
github.com/swaggest/jsonschema-go v0.3.70 // indirect
github.com/swaggest/refl v1.3.0 // indirect
github.com/zalando/go-keyring v0.2.4 // indirect
Loading

0 comments on commit 5b15fbc

Please sign in to comment.