From 6eaf4dc5ff59cc843e76bf322c6634dd880b3444 Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Mon, 22 Jul 2024 10:49:51 -0400 Subject: [PATCH 1/2] refactor(*): separate functionality into files Signed-off-by: Brooks Townsend --- config.go | 60 ++++++++++++++++++++++++++++++++++++ provider.go | 54 +++++++++++++------------------- tracing.go | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 32 deletions(-) create mode 100644 config.go create mode 100644 tracing.go diff --git a/config.go b/config.go new file mode 100644 index 0000000..69cce93 --- /dev/null +++ b/config.go @@ -0,0 +1,60 @@ +package main + +import ( + "errors" + + "github.com/wasmCloud/provider-sdk-go" +) + +type CouchbaseConnectionArgs struct { + Username string + Password string + BucketName string + Host string + ConnectionString string + ScopeName string + CollectionName string +} + +// Construct Couchbase connection args from config and secrets +func validateCouchbaseConfig(config map[string]string, secrets map[string]provider.SecretValue) (CouchbaseConnectionArgs, error) { + connectionArgs := CouchbaseConnectionArgs{} + if username, ok := config["username"]; !ok || username == "" { + return connectionArgs, errors.New("username is required") + } else { + connectionArgs.Username = username + } + if bucketName, ok := config["bucket_name"]; !ok || bucketName == "" { + return connectionArgs, errors.New("bucket_name is required") + } else { + connectionArgs.BucketName = bucketName + } + if host, ok := config["host"]; !ok || host == "" { + return connectionArgs, errors.New("host is required") + } else { + connectionArgs.Host = host + } + if connectionString, ok := config["connection_string"]; !ok || connectionString == "" { + return connectionArgs, errors.New("connection_string is required") + } else { + connectionArgs.ConnectionString = connectionString + } + if scopeName, ok := config["scope_name"]; !ok || scopeName == "" { + return connectionArgs, errors.New("scope_name is required") + } else { + connectionArgs.ScopeName = scopeName + } + if collectionName, ok := config["collection_name"]; !ok || collectionName == "" { + return connectionArgs, errors.New("collection_name is required") + } else { + connectionArgs.CollectionName = collectionName + } + + password := secrets["password"].StringValue() + if password == "" { + return connectionArgs, errors.New("password is required") + } else { + connectionArgs.Password = password + } + return connectionArgs, nil +} diff --git a/provider.go b/provider.go index 11db381..fe82690 100644 --- a/provider.go +++ b/provider.go @@ -3,7 +3,6 @@ package main import ( "context" "errors" - "time" "github.com/couchbase/gocb/v2" sdk "github.com/wasmCloud/provider-sdk-go" @@ -45,6 +44,10 @@ func (h *Handler) Get(ctx context.Context, bucket string, key string) (*wrpc.Res defer span.End() h.Logger.Debug("received request to get value", "key", key) collection, err := h.getCollectionFromContext(ctx) + if err != nil { + h.Logger.Error("unable to get collection from context", "error", err) + return wrpc.Err[[]uint8](*errNoSuchStore), err + } res, err := collection.Get(key, &gocb.GetOptions{Transcoder: gocb.NewRawJSONTranscoder()}) if err != nil { h.Logger.Error("unable to get value in store", "key", key, "error", err) @@ -64,13 +67,13 @@ func (h *Handler) getCollectionFromContext(ctx context.Context) (*gocb.Collectio header, ok := wrpcnats.HeaderFromContext(ctx) if !ok { h.Logger.Warn("Received request from unknown origin") - return nil, errors.New("Error fetching header from wrpc context") + return nil, errors.New("error fetching header from wrpc context") } // Only allow requests from a linked component sourceId := header.Get("source-id") if h.linkedFrom[sourceId] == nil { h.Logger.Warn("Received request from unlinked source", "sourceId", sourceId) - return nil, errors.New("Received request from unlinked source") + return nil, errors.New("received request from unlinked source") } return h.clusterConnections[sourceId], nil } @@ -80,6 +83,10 @@ func (h *Handler) Set(ctx context.Context, bucket string, key string, value []ui defer span.End() h.Logger.Debug("received request to set value", "key", key) collection, err := h.getCollectionFromContext(ctx) + if err != nil { + h.Logger.Error("unable to get collection from context", "error", err) + return wrpc.Err[struct{}](*errNoSuchStore), err + } _, err = collection.Upsert(key, &value, &gocb.UpsertOptions{Transcoder: gocb.NewRawJSONTranscoder()}) if err != nil { h.Logger.Error("unable to store value", "key", key, "error", err) @@ -93,6 +100,10 @@ func (h *Handler) Delete(ctx context.Context, bucket string, key string) (*wrpc. defer span.End() h.Logger.Debug("received request to delete value", "key", key) collection, err := h.getCollectionFromContext(ctx) + if err != nil { + h.Logger.Error("unable to get collection from context", "error", err) + return wrpc.Err[struct{}](*errNoSuchStore), err + } _, err = collection.Remove(key, nil) if err != nil { h.Logger.Error("unable to remove value", "key", key, "error", err) @@ -106,6 +117,10 @@ func (h *Handler) Exists(ctx context.Context, bucket string, key string) (*wrpc. defer span.End() h.Logger.Debug("received request to check value existence", "key", key) collection, err := h.getCollectionFromContext(ctx) + if err != nil { + h.Logger.Error("unable to get collection from context", "error", err) + return wrpc.Err[bool](*errNoSuchStore), err + } res, err := collection.Exists(key, nil) if err != nil { h.Logger.Error("unable to check existence of value", "key", key, "error", err) @@ -125,6 +140,10 @@ func (h *Handler) Increment(ctx context.Context, bucket string, key string, delt defer span.End() h.Logger.Debug("received request to increment key by delta", "key", key, "delta", delta) collection, err := h.getCollectionFromContext(ctx) + if err != nil { + h.Logger.Error("unable to get collection from context", "error", err) + return wrpc.Err[uint64](*errNoSuchStore), err + } res, err := collection.Binary().Increment(key, &gocb.IncrementOptions{Initial: int64(delta), Delta: delta}) if err != nil { h.Logger.Error("unable to increment value at key", "key", key, "error", err) @@ -133,32 +152,3 @@ func (h *Handler) Increment(ctx context.Context, bucket string, key string, delt return wrpc.Ok[atomics.Error](res.Content()), nil } - -func (h *Handler) updateCouchbaseCluster(handler *Handler, sourceId string, config map[string]string) { - // Connect to the cluster - cluster, err := gocb.Connect(config["connectionString"], gocb.ClusterOptions{ - Username: config["username"], - Password: config["password"], - }) - if err != nil { - handler.Logger.Error("unable to connect to couchbase cluster", "error", err) - return - } - var collection *gocb.Collection - bucketName := config["bucketName"] - scopeName := config["scopeName"] - collectionName := config["collectionName"] - if collectionName != "" && scopeName != "" { - collection = cluster.Bucket(bucketName).Scope(scopeName).Collection(collectionName) - } else { - collection = cluster.Bucket(bucketName).DefaultCollection() - } - - bucket := cluster.Bucket(bucketName) - if err = bucket.WaitUntilReady(5*time.Second, nil); err != nil { - handler.Logger.Error("unable to connect to couchbase bucket", "error", err) - } - - // Store the connection - handler.clusterConnections[sourceId] = collection -} diff --git a/tracing.go b/tracing.go new file mode 100644 index 0000000..8a67f98 --- /dev/null +++ b/tracing.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "errors" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" +) + +func setupOTelSDK(ctx context.Context) (shutdown func(context.Context) error, err error) { + var shutdownFuncs []func(context.Context) error + + // shutdown calls cleanup functions registered via shutdownFuncs. + // The errors from the calls are joined. + // Each registered cleanup will be invoked once. + shutdown = func(ctx context.Context) error { + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + // handleErr calls shutdown for cleanup and makes sure that all errors are returned. + handleErr := func(inErr error) { + err = errors.Join(inErr, shutdown(ctx)) + } + + // Set up propagator. + prop := newPropagator() + otel.SetTextMapPropagator(prop) + + // Set up trace provider. + tracerProvider, err := newTraceProvider() + if err != nil { + handleErr(err) + return + } + shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) + otel.SetTracerProvider(tracerProvider) + traceProvider := otel.GetTracerProvider() + tracer = traceProvider.Tracer(TRACER_NAME) + + return +} + +func newExporter(ctx context.Context) (*otlptrace.Exporter, error) { + exporter, err := otlptrace.New( + ctx, + otlptracehttp.NewClient( + otlptracehttp.WithEndpoint("localhost:4318"), + + otlptracehttp.WithInsecure(), + ), + ) + return exporter, err +} + +func newTraceProvider() (*trace.TracerProvider, error) { + traceExporter, err := newExporter(context.Background()) + if err != nil { + return nil, err + } + + traceProvider := trace.NewTracerProvider( + trace.WithBatcher(traceExporter, + // Default is 5s. Set to 1s for demonstrative purposes. + trace.WithBatchTimeout(time.Second)), + trace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("couchbase-provider"), + ))) + return traceProvider, nil +} + +func newPropagator() propagation.TextMapPropagator { + return propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) +} From af010438ed3feb0d2e8fb9ae9530615e48820403 Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Mon, 22 Jul 2024 10:50:14 -0400 Subject: [PATCH 2/2] feat: support secrets for password Signed-off-by: Brooks Townsend --- README.md | 34 ++++++++---- config.go | 30 +++-------- docker-compose.yaml | 22 ++++---- go.mod | 6 +-- go.sum | 8 +-- main.go | 122 +++++++++----------------------------------- run.sh | 8 +-- wadm.yaml | 48 +++++++++-------- 8 files changed, 97 insertions(+), 181 deletions(-) diff --git a/README.md b/README.md index 81eb371..7c0d320 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ This provider uses the **RawJSONTranscoder** for Couchbase, storing any new keys Prerequisites: -- [wash 0.29](https://wasmcloud.com/docs/installation) or later +- [wash 0.30](https://wasmcloud.com/docs/installation) or later Build this capability provider with: @@ -18,19 +18,33 @@ wash build ## Run -Prerequisites: +### Prerequisites -- [wash 0.29](https://wasmcloud.com/docs/installation) or later +- [wash 0.30](https://wasmcloud.com/docs/installation) or later +- The [secrets-nats-kv](https://github.com/wasmCloud/wasmCloud/tree/main/crates/secrets-nats-kv) CLI installed (for now this requires a Rust toolchain) - A built couchbase capability provider, see [#build](#build) - Setup Couchbase server with the required configuration for testing using docker-compose.yaml in the repo. - ``` - docker-compose up -d - ``` - Alternatively, you can use [Quick Install](https://docs.couchbase.com/server/current/getting-started/do-a-quick-install.html) guide with a bucket named **test** created. - + +```bash +docker-compose up -d +``` + +Alternatively, you can use [Quick Install](https://docs.couchbase.com/server/current/getting-started/do-a-quick-install.html) guide with a bucket named **test** created. + +### Running ```shell -wash up -d +WASMCLOUD_SECRETS_TOPIC=wasmcloud.secrets \ + wash up -d + +# Generate encryption keys and run the backend +export ENCRYPTION_XKEY_SEED=$(wash keys gen curve -o json | jq -r '.seed') +export TRANSIT_XKEY_SEED=$(wash keys gen curve -o json | jq -r '.seed') +secrets-nats-kv run & +# Put the password in the NATS KV secrets backend +provider_key=$(wash inspect ./build/wasmcloud-provider-couchbase.par.gz -o json | jq -r '.service') +secrets-nats-kv put couchbase_password --string password +secrets-nats-kv add-mapping $provider_key --secret couchbase_password wash app deploy ./wadm.yaml ``` @@ -42,7 +56,7 @@ curl localhost:8080/couchbase ## Test -To test the WIT bindings, download [`wit-bindgen`][wit-bindgen] and run the following: +To test the WIT bindings, download [wit-bindgen](https://github.com/bytecodealliance/wit-bindgen) and run the following: ```console wit-deps && wit-bindgen rust --out-dir /tmp/wit wit/ diff --git a/config.go b/config.go index 69cce93..4dfb89b 100644 --- a/config.go +++ b/config.go @@ -10,7 +10,6 @@ type CouchbaseConnectionArgs struct { Username string Password string BucketName string - Host string ConnectionString string ScopeName string CollectionName string @@ -20,39 +19,24 @@ type CouchbaseConnectionArgs struct { func validateCouchbaseConfig(config map[string]string, secrets map[string]provider.SecretValue) (CouchbaseConnectionArgs, error) { connectionArgs := CouchbaseConnectionArgs{} if username, ok := config["username"]; !ok || username == "" { - return connectionArgs, errors.New("username is required") + return connectionArgs, errors.New("username config is required") } else { connectionArgs.Username = username } - if bucketName, ok := config["bucket_name"]; !ok || bucketName == "" { - return connectionArgs, errors.New("bucket_name is required") + if bucketName, ok := config["bucketName"]; !ok || bucketName == "" { + return connectionArgs, errors.New("bucketName config is required") } else { connectionArgs.BucketName = bucketName } - if host, ok := config["host"]; !ok || host == "" { - return connectionArgs, errors.New("host is required") - } else { - connectionArgs.Host = host - } - if connectionString, ok := config["connection_string"]; !ok || connectionString == "" { - return connectionArgs, errors.New("connection_string is required") + if connectionString, ok := config["connectionString"]; !ok || connectionString == "" { + return connectionArgs, errors.New("connectionString config is required") } else { connectionArgs.ConnectionString = connectionString } - if scopeName, ok := config["scope_name"]; !ok || scopeName == "" { - return connectionArgs, errors.New("scope_name is required") - } else { - connectionArgs.ScopeName = scopeName - } - if collectionName, ok := config["collection_name"]; !ok || collectionName == "" { - return connectionArgs, errors.New("collection_name is required") - } else { - connectionArgs.CollectionName = collectionName - } - password := secrets["password"].StringValue() + password := secrets["password"].String.Reveal() if password == "" { - return connectionArgs, errors.New("password is required") + return connectionArgs, errors.New("password secret is required") } else { connectionArgs.Password = password } diff --git a/docker-compose.yaml b/docker-compose.yaml index 94edfcf..89f6313 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,5 +1,3 @@ -version: '3.7' - services: couchbase: image: couchbase:latest @@ -18,26 +16,26 @@ services: - COUCHBASE_CONNECTION_STRING=localhost volumes: - couchbase-data:/opt/couchbase/var - + couchbase-init: image: couchbase:latest depends_on: - couchbase volumes: - ./init-couchbase.sh:/init-couchbase.sh - entrypoint: "/init-couchbase.sh" + entrypoint: '/init-couchbase.sh' jaeger-all-in-one: image: jaegertracing/all-in-one:1.58 restart: always ports: - - "16686:16686" - - "14268:14268" - - "14250:14250" - - "9411:9411" - - "4317:4317" - - "4318:4318" + - '16686:16686' + - '14268:14268' + - '14250:14250' + - '9411:9411' + - '4317:4317' + - '4318:4318' environment: COLLECTOR_ZIPKIN_HTTP_PORT: 9411 - COLLECTOR_OTLP_ENABLED: "true" + COLLECTOR_OTLP_ENABLED: 'true' volumes: - couchbase-data: \ No newline at end of file + couchbase-data: diff --git a/go.mod b/go.mod index ccfe484..728113f 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,13 @@ go 1.22.3 require ( github.com/couchbase/gocb/v2 v2.8.1 - github.com/wasmCloud/provider-sdk-go v0.0.0-20240626135506-00b1fb3dec9e + github.com/wasmCloud/provider-sdk-go v0.0.0-20240724131928-654ff747dffc github.com/wrpc/wrpc/go v0.0.0-20240619071643-b830439e40d6 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 ) require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/felixge/httpsnoop v1.0.4 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d // indirect @@ -31,12 +30,11 @@ require ( github.com/nats-io/nats.go v1.36.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 - go.opentelemetry.io/otel/trace v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.24.0 // indirect diff --git a/go.sum b/go.sum index 2808e2c..8ac8acc 100644 --- a/go.sum +++ b/go.sum @@ -25,8 +25,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= -github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -87,14 +85,12 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/wasmCloud/provider-sdk-go v0.0.0-20240626135506-00b1fb3dec9e h1:UIK1VDU1hwvGTd4S9pokw/IoY0ChIhYldWB44JnWuD8= -github.com/wasmCloud/provider-sdk-go v0.0.0-20240626135506-00b1fb3dec9e/go.mod h1:DZRTpRkX3YluIs+FKcx157XbQa1q4rFRDpxbamZZBc0= +github.com/wasmCloud/provider-sdk-go v0.0.0-20240724131928-654ff747dffc h1:EA7GYD0MvfzxziD2BW5kLuu12x8Y41B/ZSPpX6dOzm0= +github.com/wasmCloud/provider-sdk-go v0.0.0-20240724131928-654ff747dffc/go.mod h1:2dz9nrAHpuVaJo/pjjtn8Lw5q0GYM5oliisEpqVwAV4= github.com/wrpc/wrpc/go v0.0.0-20240619071643-b830439e40d6 h1:P2X0Tsw8o45TmBZbST5vRguBjK80e7QJ9TX/fh/8l2U= github.com/wrpc/wrpc/go v0.0.0-20240619071643-b830439e40d6/go.mod h1:xbr40Iv8kVVnQHnHzmdbtaxOvyRZLb7y3c90vrGGcxo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= diff --git a/main.go b/main.go index 08397f2..e52a8ca 100644 --- a/main.go +++ b/main.go @@ -14,14 +14,6 @@ import ( server "github.com/couchbase-examples/wasmcloud-provider-couchbase/bindings" "github.com/couchbase/gocb/v2" "github.com/wasmCloud/provider-sdk-go" - "go.opentelemetry.io/otel" - - "go.opentelemetry.io/otel/exporters/otlp/otlptrace" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" ) func main() { @@ -104,32 +96,43 @@ func run() error { return nil } +// Provider handler functions func handleNewTargetLink(handler *Handler, link provider.InterfaceLinkDefinition) error { handler.Logger.Info("Handling new target link", "link", link) handler.linkedFrom[link.SourceID] = link.TargetConfig - err := ValidateCouchbaseConfig(link.TargetConfig) + couchbaseConnectionArgs, err := validateCouchbaseConfig(link.TargetConfig, link.TargetSecrets) if err != nil { handler.Logger.Error("Invalid couchbase target config", "error", err) return err } - handler.updateCouchbaseCluster(handler, link.SourceID, link.TargetConfig) + handler.updateCouchbaseCluster(handler, link.SourceID, couchbaseConnectionArgs) return nil } -func ValidateCouchbaseConfig(config map[string]string) error { - if config["username"] == "" { - return errors.New("username is required") - } - if config["password"] == "" { - return errors.New("password is required") +func (h *Handler) updateCouchbaseCluster(handler *Handler, sourceId string, connectionArgs CouchbaseConnectionArgs) { + // Connect to the cluster + cluster, err := gocb.Connect(connectionArgs.ConnectionString, gocb.ClusterOptions{ + Username: connectionArgs.Username, + Password: connectionArgs.Password, + }) + if err != nil { + handler.Logger.Error("unable to connect to couchbase cluster", "error", err) + return } - if config["bucketName"] == "" { - return errors.New("bucket is required") + var collection *gocb.Collection + if connectionArgs.CollectionName != "" && connectionArgs.ScopeName != "" { + collection = cluster.Bucket(connectionArgs.BucketName).Scope(connectionArgs.ScopeName).Collection(connectionArgs.CollectionName) + } else { + collection = cluster.Bucket(connectionArgs.BucketName).DefaultCollection() } - if config["connectionString"] == "" { - return errors.New("connectionString is required") + + bucket := cluster.Bucket(connectionArgs.BucketName) + if err = bucket.WaitUntilReady(5*time.Second, nil); err != nil { + handler.Logger.Error("unable to connect to couchbase bucket", "error", err) } - return nil + + // Store the connection + handler.clusterConnections[sourceId] = collection } func handleDelTargetLink(handler *Handler, link provider.InterfaceLinkDefinition) error { @@ -139,7 +142,7 @@ func handleDelTargetLink(handler *Handler, link provider.InterfaceLinkDefinition } func handleHealthCheck(handler *Handler) string { - handler.Logger.Info("Handling health check") + handler.Logger.Debug("Handling health check") return "provider healthy" } @@ -148,78 +151,3 @@ func handleShutdown(handler *Handler) error { // clear(handler.linkedFrom) return nil } - -func setupOTelSDK(ctx context.Context) (shutdown func(context.Context) error, err error) { - var shutdownFuncs []func(context.Context) error - - // shutdown calls cleanup functions registered via shutdownFuncs. - // The errors from the calls are joined. - // Each registered cleanup will be invoked once. - shutdown = func(ctx context.Context) error { - var err error - for _, fn := range shutdownFuncs { - err = errors.Join(err, fn(ctx)) - } - shutdownFuncs = nil - return err - } - - // handleErr calls shutdown for cleanup and makes sure that all errors are returned. - handleErr := func(inErr error) { - err = errors.Join(inErr, shutdown(ctx)) - } - - // Set up propagator. - prop := newPropagator() - otel.SetTextMapPropagator(prop) - - // Set up trace provider. - tracerProvider, err := newTraceProvider() - if err != nil { - handleErr(err) - return - } - shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) - otel.SetTracerProvider(tracerProvider) - traceProvider := otel.GetTracerProvider() - tracer = traceProvider.Tracer(TRACER_NAME) - - return -} - -func newExporter(ctx context.Context) (*otlptrace.Exporter, error) { - - exporter, err := otlptrace.New( - context.Background(), - otlptracehttp.NewClient( - otlptracehttp.WithEndpoint("localhost:4318"), - - otlptracehttp.WithInsecure(), - ), - ) - return exporter, err -} - -func newTraceProvider() (*trace.TracerProvider, error) { - traceExporter, err := newExporter(context.Background()) - if err != nil { - return nil, err - } - - traceProvider := trace.NewTracerProvider( - trace.WithBatcher(traceExporter, - // Default is 5s. Set to 1s for demonstrative purposes. - trace.WithBatchTimeout(time.Second)), - trace.WithResource(resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String("couchbase-provider"), - ))) - return traceProvider, nil -} - -func newPropagator() propagation.TextMapPropagator { - return propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, - propagation.Baggage{}, - ) -} diff --git a/run.sh b/run.sh index c585283..aa6fd14 100755 --- a/run.sh +++ b/run.sh @@ -5,12 +5,6 @@ host_data=' "lattice_rpc_url": "0.0.0.0:4222", "lattice_rpc_prefix": "default", "provider_key": "couchbase", - "link_name": "default", - "config": { - "username": "Administrator", - "password": "password", - "bucketName": "test", - "connectionString": "localhost" - } + "link_name": "default" }' echo $host_data | base64 | go run ./ diff --git a/wadm.yaml b/wadm.yaml index cf37ade..fe63dd4 100644 --- a/wadm.yaml +++ b/wadm.yaml @@ -6,6 +6,11 @@ metadata: version: v0.0.1 description: 'Couchbase capability provider demonstration with wasmCloud' spec: + policies: + - name: nats-kv + type: policy.secret.wasmcloud.dev/v1alpha1 + properties: + backend: nats-kv components: - name: counter type: component @@ -14,39 +19,37 @@ spec: traits: - type: spreadscaler properties: - replicas: 1 + replicas: 100 # Link the component to the provider on wasi-keyvalue - type: link properties: - target: couchbase namespace: wasi package: keyvalue interfaces: [atomics, store] - target_config: - - name: provider-config - properties: - username: 'Administrator' - password: 'password' - bucketName: 'test' - connectionString: 'localhost' - scopeName: 'test' + target: + name: couchbase + secrets: + - name: couchbase_password + properties: + policy: nats-kv + key: couchbase_password + config: + - name: provider-config + properties: + username: 'Administrator' + bucketName: 'test' + connectionString: 'localhost' + - name: couchbase type: capability properties: image: file://./build/wasmcloud-provider-couchbase.par.gz id: couchbase - config: - - name: provider-config - properties: - username: 'Administrator' - password: 'password' - bucketName: 'test' - connectionString: 'localhost' # Add a capability provider that enables HTTP access - name: httpserver type: capability properties: - image: ghcr.io/wasmcloud/http-server:0.20.0 + image: ghcr.io/wasmcloud/http-server:0.21.0 traits: # Link the httpserver to the component, and configure the HTTP server # to listen on port 8080 for incoming requests @@ -56,7 +59,8 @@ spec: namespace: wasi package: http interfaces: [incoming-handler] - source_config: - - name: default-http - properties: - address: 127.0.0.1:8080 + source: + config: + - name: default-http + properties: + address: 127.0.0.1:8080