Skip to content

Commit

Permalink
Add support for running remote plugins
Browse files Browse the repository at this point in the history
This PR adds support for invoking of remote Wasm plugins locally. Remote
plugins Refs are resolved using the buf.lock file in the input's v2
workspace.
  • Loading branch information
emcfarlane committed Dec 10, 2024
1 parent 1dd3a77 commit d9991dc
Show file tree
Hide file tree
Showing 27 changed files with 739 additions and 146 deletions.
115 changes: 115 additions & 0 deletions private/buf/bufctl/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/bufbuild/buf/private/buf/bufwkt/bufwktstore"
"github.com/bufbuild/buf/private/buf/bufworkspace"
"github.com/bufbuild/buf/private/bufpkg/bufanalysis"
"github.com/bufbuild/buf/private/bufpkg/bufcheck"
"github.com/bufbuild/buf/private/bufpkg/bufconfig"
"github.com/bufbuild/buf/private/bufpkg/bufimage"
"github.com/bufbuild/buf/private/bufpkg/bufimage/bufimageutil"
Expand All @@ -48,6 +49,7 @@ import (
"github.com/bufbuild/buf/private/pkg/slicesext"
"github.com/bufbuild/buf/private/pkg/storage/storageos"
"github.com/bufbuild/buf/private/pkg/syserror"
"github.com/bufbuild/buf/private/pkg/wasm"
"github.com/bufbuild/protovalidate-go"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -128,6 +130,17 @@ type Controller interface {
defaultMessageEncoding buffetch.MessageEncoding,
options ...FunctionOption,
) error
// GetCheckRunnerProvider gets a CheckRunnerProvider for the given input.
//
// The returned RunnerProvider will be able to run lint and breaking checks
// using the PluginConfigs from the input. The input provided will resolve
// the PluginKeys from the related buf.lock file.
GetCheckRunnerProvider(
ctx context.Context,
input string,
wasmRuntime wasm.Runtime,
options ...FunctionOption,
) (bufcheck.RunnerProvider, error)
}

func NewController(
Expand Down Expand Up @@ -243,6 +256,7 @@ func newController(
graphProvider,
moduleDataProvider,
commitProvider,
pluginKeyProvider,
)
controller.workspaceDepManagerProvider = bufworkspace.NewWorkspaceDepManagerProvider(
logger,
Expand Down Expand Up @@ -706,6 +720,32 @@ func (c *controller) PutMessage(
return errors.Join(err, writeCloser.Close())
}

func (c *controller) GetCheckRunnerProvider(
ctx context.Context,
input string,
wasmRuntime wasm.Runtime,
options ...FunctionOption,
) (_ bufcheck.RunnerProvider, retErr error) {
defer c.handleFileAnnotationSetRetError(&retErr)
functionOptions := newFunctionOptions(c)
for _, option := range options {
option(functionOptions)
}
ref, err := c.buffetchRefParser.GetRef(ctx, input)
if err != nil {
return nil, err
}
pluginKeyProvider, err := c.getPluginKeyProviderForRef(ctx, ref, functionOptions)
if err != nil {
return nil, err
}
return bufcheck.NewLocalRunnerProvider(
wasmRuntime,
pluginKeyProvider,
c.pluginDataProvider,
), nil
}

func (c *controller) getImage(
ctx context.Context,
input string,
Expand Down Expand Up @@ -1159,6 +1199,81 @@ Declare %q in the deps key in your buf.yaml.`,
return nil
}

// getPluginKeyProviderForRef create a new PluginKeyProvider for the Ref.
//
// Remote plugins Refs are resolved to PluginKeys from the workspace buf.lock file.
// If the Ref is a MessageRef, we use the current directory buf.lock file.
func (c *controller) getPluginKeyProviderForRef(
ctx context.Context,
ref buffetch.Ref,
functionOptions *functionOptions,
) (_ bufplugin.PluginKeyProvider, retErr error) {
switch t := ref.(type) {
case buffetch.ProtoFileRef:
workspace, err := c.getWorkspaceForProtoFileRef(ctx, t, functionOptions)
if err != nil {
return nil, err
}
return bufplugin.NewStaticPluginKeyProvider(workspace.RemotePluginKeys())
case buffetch.SourceRef:
workspace, err := c.getWorkspaceForSourceRef(ctx, t, functionOptions)
if err != nil {
return nil, err
}
return bufplugin.NewStaticPluginKeyProvider(workspace.RemotePluginKeys())
case buffetch.ModuleRef:
workspace, err := c.getWorkspaceForModuleRef(ctx, t, functionOptions)
if err != nil {
return nil, err
}
return bufplugin.NewStaticPluginKeyProvider(workspace.RemotePluginKeys())
case buffetch.MessageRef:
bucket, err := c.storageosProvider.NewReadWriteBucket(
".",
storageos.ReadWriteBucketWithSymlinksIfSupported(),
)
if err != nil {
return nil, err
}
bufYAMLFile, err := bufconfig.GetBufYAMLFileForPrefixOrOverride(
ctx,
bucket,
".",
functionOptions.configOverride,
)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return nil, err
}
// We did not find a buf.yaml in our current directory,
// and there was no config override.
return bufplugin.NopPluginKeyProvider, nil
}
var pluginKeys []bufplugin.PluginKey
if bufYAMLFile.FileVersion() == bufconfig.FileVersionV2 {
bufLockFile, err := bufconfig.GetBufLockFileForPrefix(
ctx,
bucket,
// buf.lock files live next to the buf.yaml
".",
)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return nil, err
}
// We did not find a buf.lock in our current directory.
// Remote plugins are not available.
return bufplugin.NopPluginKeyProvider, nil
}
pluginKeys = bufLockFile.RemotePluginKeys()
}
return bufplugin.NewStaticPluginKeyProvider(pluginKeys)
default:
// This is a system error.
return nil, syserror.Newf("invalid Ref: %T", ref)
}
}

// handleFileAnnotationSetError will attempt to handle the error as a FileAnnotationSet, and if so, print
// the FileAnnotationSet to the writer with the given error format while returning ErrFileAnnotation.
//
Expand Down
8 changes: 4 additions & 4 deletions private/buf/buflsp/buflsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
"sync/atomic"

"github.com/bufbuild/buf/private/buf/bufctl"
"github.com/bufbuild/buf/private/bufpkg/bufcheck"
"github.com/bufbuild/buf/private/pkg/app/appext"
"github.com/bufbuild/buf/private/pkg/slogext"
"github.com/bufbuild/buf/private/pkg/storage"
"github.com/bufbuild/buf/private/pkg/storage/storageos"
"github.com/bufbuild/buf/private/pkg/wasm"
"go.lsp.dev/jsonrpc2"
"go.lsp.dev/protocol"
"go.uber.org/zap"
Expand All @@ -43,7 +43,7 @@ func Serve(
wktBucket storage.ReadBucket,
container appext.Container,
controller bufctl.Controller,
checkClient bufcheck.Client,
wasmRuntime wasm.Runtime,
stream jsonrpc2.Stream,
) (jsonrpc2.Conn, error) {
// The LSP protocol deals with absolute filesystem paths. This requires us to
Expand All @@ -68,7 +68,7 @@ func Serve(
container: container,
logger: container.Logger(),
controller: controller,
checkClient: checkClient,
wasmRuntime: wasmRuntime,
rootBucket: bucket,
wktBucket: wktBucket,
}
Expand Down Expand Up @@ -96,7 +96,7 @@ type lsp struct {

logger *slog.Logger
controller bufctl.Controller
checkClient bufcheck.Client
wasmRuntime wasm.Runtime
rootBucket storage.ReadBucket
fileManager *fileManager

Expand Down
39 changes: 34 additions & 5 deletions private/buf/buflsp/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type file struct {
version int32
hasText bool // Whether this file has ever had text read into it.

workspace bufworkspace.Workspace
module bufmodule.Module
workspace bufworkspace.Workspace
module bufmodule.Module
checkClient bufcheck.Client

againstStrategy againstStrategy
againstGitRef string
Expand Down Expand Up @@ -371,6 +372,24 @@ func (f *file) FindModule(ctx context.Context) {
return
}

// Get the check runner provider for this file. The client is scoped to
// the input Buf lock file, so we need to get the check runner provider
// for the workspace that contains this file.
checkRunnerProvider, err := f.lsp.controller.GetCheckRunnerProvider(ctx, f.uri.Filename(), f.lsp.wasmRuntime)
if err != nil {
f.lsp.logger.Warn("could not get check runner provider", slogext.ErrorAttr(err))
return
}
checkClient, err := bufcheck.NewClient(
f.lsp.logger,
checkRunnerProvider,
bufcheck.ClientWithStderr(f.lsp.container.Stderr()),
)
if err != nil {
f.lsp.logger.Warn("could not create check client", slogext.ErrorAttr(err))
return
}

// Figure out which module this file belongs to.
var module bufmodule.Module
for _, mod := range workspace.Modules() {
Expand Down Expand Up @@ -398,6 +417,7 @@ func (f *file) FindModule(ctx context.Context) {

f.workspace = workspace
f.module = module
f.checkClient = checkClient
}

// IndexImports finds URIs for all of the files imported by this file.
Expand Down Expand Up @@ -641,9 +661,14 @@ func (f *file) RunLints(ctx context.Context) bool {
f.lsp.logger.Warn(fmt.Sprintf("could not find image for %q", f.uri))
return false
}

if f.checkClient == nil {
f.lsp.logger.Warn(fmt.Sprintf("could not find check client for %q", f.uri))
return false
}
f.lsp.logger.Debug(fmt.Sprintf("running lint for %q in %v", f.uri, f.module.FullName()))
return f.appendLintErrors("buf lint", f.lsp.checkClient.Lint(

f.lsp.logger.Debug(fmt.Sprintf("running lint for %q in %v %d plugin configs", f.uri, f.module.FullName(), len(f.workspace.PluginConfigs())))
return f.appendLintErrors("buf lint", f.checkClient.Lint(
ctx,
f.workspace.GetLintConfigForOpaqueID(f.module.OpaqueID()),
f.image,
Expand All @@ -664,9 +689,13 @@ func (f *file) RunBreaking(ctx context.Context) bool {
f.lsp.logger.Warn(fmt.Sprintf("could not find --against image for %q", f.uri))
return false
}
if f.checkClient == nil {
f.lsp.logger.Warn(fmt.Sprintf("could not find check client for %q", f.uri))
return false
}

f.lsp.logger.Debug(fmt.Sprintf("running breaking for %q in %v", f.uri, f.module.FullName()))
return f.appendLintErrors("buf breaking", f.lsp.checkClient.Breaking(
return f.appendLintErrors("buf breaking", f.checkClient.Breaking(
ctx,
f.workspace.GetBreakingConfigForOpaqueID(f.module.OpaqueID()),
f.image,
Expand Down
7 changes: 6 additions & 1 deletion private/buf/bufmigrate/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/bufbuild/buf/private/bufpkg/bufconfig"
"github.com/bufbuild/buf/private/bufpkg/bufmodule"
"github.com/bufbuild/buf/private/bufpkg/bufparse"
"github.com/bufbuild/buf/private/bufpkg/bufplugin"
"github.com/bufbuild/buf/private/pkg/normalpath"
"github.com/bufbuild/buf/private/pkg/slicesext"
"github.com/bufbuild/buf/private/pkg/storage"
Expand Down Expand Up @@ -695,7 +696,11 @@ func equivalentCheckConfigInV2(
) (bufconfig.CheckConfig, error) {
// No need for custom lint/breaking plugins since there's no plugins to migrate from <=v1.
// TODO: If we ever need v3, then we will have to deal with this.
client, err := bufcheck.NewClient(logger, bufcheck.NewRunnerProvider(wasm.UnimplementedRuntime))
client, err := bufcheck.NewClient(logger, bufcheck.NewLocalRunnerProvider(
wasm.UnimplementedRuntime,
bufplugin.NopPluginKeyProvider,
bufplugin.NopPluginDataProvider,
))
if err != nil {
return nil, err
}
Expand Down
16 changes: 15 additions & 1 deletion private/buf/bufworkspace/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/bufbuild/buf/private/bufpkg/bufconfig"
"github.com/bufbuild/buf/private/bufpkg/bufmodule"
"github.com/bufbuild/buf/private/bufpkg/bufparse"
"github.com/bufbuild/buf/private/bufpkg/bufplugin"
"github.com/bufbuild/buf/private/pkg/slicesext"
)

Expand Down Expand Up @@ -72,8 +73,14 @@ type Workspace interface {
// detector ignoring these configs anyways.
GetBreakingConfigForOpaqueID(opaqueID string) bufconfig.BreakingConfig
// PluginConfigs gets the configured PluginConfigs of the Workspace.
//
// These come from buf.yaml files.
PluginConfigs() []bufconfig.PluginConfig
// ConfiguredDepModuleRefs returns the configured dependencies of the Workspace as ModuleRefs.
// RemotePluginKeys gets the remote PluginKeys of the Workspace.
//
// These come from buf.lock files.
RemotePluginKeys() []bufplugin.PluginKey
// ConfiguredDepModuleRefs returns the configured dependencies of the Workspace as Refs.
//
// These come from buf.yaml files.
//
Expand Down Expand Up @@ -105,6 +112,7 @@ type workspace struct {
opaqueIDToLintConfig map[string]bufconfig.LintConfig
opaqueIDToBreakingConfig map[string]bufconfig.BreakingConfig
pluginConfigs []bufconfig.PluginConfig
remotePluginKeys []bufplugin.PluginKey
configuredDepModuleRefs []bufparse.Ref

// If true, the workspace was created from v2 buf.yamls.
Expand All @@ -117,6 +125,7 @@ func newWorkspace(
opaqueIDToLintConfig map[string]bufconfig.LintConfig,
opaqueIDToBreakingConfig map[string]bufconfig.BreakingConfig,
pluginConfigs []bufconfig.PluginConfig,
remotePluginKeys []bufplugin.PluginKey,
configuredDepModuleRefs []bufparse.Ref,
isV2 bool,
) *workspace {
Expand All @@ -125,6 +134,7 @@ func newWorkspace(
opaqueIDToLintConfig: opaqueIDToLintConfig,
opaqueIDToBreakingConfig: opaqueIDToBreakingConfig,
pluginConfigs: pluginConfigs,
remotePluginKeys: remotePluginKeys,
configuredDepModuleRefs: configuredDepModuleRefs,
isV2: isV2,
}
Expand All @@ -142,6 +152,10 @@ func (w *workspace) PluginConfigs() []bufconfig.PluginConfig {
return slicesext.Copy(w.pluginConfigs)
}

func (w *workspace) RemotePluginKeys() []bufplugin.PluginKey {
return slicesext.Copy(w.remotePluginKeys)
}

func (w *workspace) ConfiguredDepModuleRefs() []bufparse.Ref {
return slicesext.Copy(w.configuredDepModuleRefs)
}
Expand Down
Loading

0 comments on commit d9991dc

Please sign in to comment.