Skip to content

Commit

Permalink
Allow syncing via the connect experimental transport.
Browse files Browse the repository at this point in the history
  • Loading branch information
morgabra committed Jan 3, 2025
1 parent 7f58ddc commit 4b73549
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 4 deletions.
2 changes: 2 additions & 0 deletions internal/connector/connector_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"golang.org/x/net/http2/h2c"

"golang.org/x/net/http2"
Expand Down Expand Up @@ -58,6 +59,7 @@ func NewWrapperConnect(ctx context.Context, server interface{}, optfunc ...Optio
}

func (cw *wrapperConnect) Run(ctx context.Context, serverCfg *connectorwrapperV1.ServerConfig) error {
ctxzap.Extract(ctx).Warn("connect enabled!")
rl, err := ratelimit2.NewLimiter(ctx, cw.now, serverCfg.RateLimiterConfig)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion internal/connector/connector_connect_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func NewConnectorConnectShim(
ticketingEnabled bool,
) *connectorConnectShim {
s := &connectorConnectShim{
server: server,
server: server,
rateLimiter: rateLimiter,
}

if ticketingEnabled {
Expand Down
4 changes: 4 additions & 0 deletions pkg/cli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func MakeMainCommand(
opts = append(opts, connectorrunner.WithTempDir(v.GetString("c1z-temp-dir")))
}

if v.GetBool("use-connect-experiment") {
opts = append(opts, connectorrunner.WithConnectExperiment())
}

// NOTE(shackra): top-most in the execution flow for connectors
r, err := connectorrunner.NewConnectorRunner(runCtx, c, opts...)
if err != nil {
Expand Down
27 changes: 24 additions & 3 deletions pkg/connectorrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ type runnerConfig struct {
listTicketSchemasConfig *listTicketSchemasConfig
getTicketConfig *getTicketConfig
skipFullSync bool

useConnect bool
}

// WithRateLimiterConfig sets the RateLimiterConfig for a runner.
Expand All @@ -312,6 +314,14 @@ func WithRateLimiterConfig(cfg *ratelimitV1.RateLimiterConfig) Option {
}
}

// WithConnectExperiment uses the Connect transport for connector communication.
func WithConnectExperiment() Option {
return func(ctx context.Context, w *runnerConfig) error {
w.useConnect = true
return nil
}
}

// WithExternalLimiter configures the connector to use an external rate limiter.
// The `opts` map is injected into the environment in order for the service to be configured.
func WithExternalLimiter(address string, opts map[string]string) Option {
Expand Down Expand Up @@ -556,9 +566,20 @@ func NewConnectorRunner(ctx context.Context, c types.ConnectorServer, opts ...Op
wrapperOpts = append(wrapperOpts, connector.WithFullSyncDisabled())
}

cw, err := connector.NewWrapper(ctx, c, wrapperOpts...)
if err != nil {
return nil, err
var cw types.ClientWrapper
var err error
if cfg.useConnect {
// FIXME(morgabra): Make this a top level option/select a port from the OS in the wrapper if unset.
wrapperOpts = append(wrapperOpts, connector.WithConnectListenPort(8000))
cw, err = connector.NewWrapperConnect(ctx, c, wrapperOpts...)
if err != nil {
return nil, err
}
} else {
cw, err = connector.NewWrapper(ctx, c, wrapperOpts...)
if err != nil {
return nil, err
}
}

runner.cw = cw
Expand Down
2 changes: 2 additions & 0 deletions pkg/field/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
ticketTemplatePathField = StringField("ticket-template-path", WithHidden(true), WithDescription("A JSON file describing the ticket to create"), WithPersistent(true))
logLevelField = StringField("log-level", WithDefaultValue("info"), WithDescription("The log level: debug, info, warn, error"), WithPersistent(true))
skipFullSync = BoolField("skip-full-sync", WithDescription("This must be set to skip a full sync"), WithPersistent(true))
useConnectExperimental = BoolField("use-connect-experiment", WithDescription("This must be set to enable the experimental connect transport"), WithHidden(true), WithPersistent(true))
)

// DefaultFields list the default fields expected in every single connector.
Expand Down Expand Up @@ -62,6 +63,7 @@ var DefaultFields = []SchemaField{
ticketTemplatePathField,
logLevelField,
skipFullSync,
useConnectExperimental,
}

func IsFieldAmongDefaultList(f SchemaField) bool {
Expand Down

0 comments on commit 4b73549

Please sign in to comment.