Skip to content

Commit

Permalink
feat(pkger): extend pkger to provide sources of templates to each tem…
Browse files Browse the repository at this point in the history
…plate
jsteenb2 committed Jun 16, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 020c9e7 commit b977568
Showing 18 changed files with 463 additions and 230 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
### Features

1. [18387](https://github.com/influxdata/influxdb/pull/18387): Integrate query cancellation after queries have been submitted
1. [18515](https://github.com/influxdata/influxdb/pull/18515): Extend templates with the source file|url|reader.

## v2.0.0-beta.12 [2020-06-12]

5 changes: 3 additions & 2 deletions cmd/influx/pkg.go
Original file line number Diff line number Diff line change
@@ -214,11 +214,12 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn(cmd *cobra.Command, args []string) error
}

opts := []pkger.ApplyOptFn{
pkger.ApplyWithPkg(pkg),
pkger.ApplyWithEnvRefs(providedEnvRefs),
pkger.ApplyWithStackID(stackID),
}

dryRunImpact, err := svc.DryRun(context.Background(), influxOrgID, 0, pkg, opts...)
dryRunImpact, err := svc.DryRun(context.Background(), influxOrgID, 0, opts...)
if err != nil {
return err
}
@@ -254,7 +255,7 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn(cmd *cobra.Command, args []string) error

opts = append(opts, pkger.ApplyWithSecrets(providedSecrets))

impact, err := svc.Apply(context.Background(), influxOrgID, 0, pkg, opts...)
impact, err := svc.Apply(context.Background(), influxOrgID, 0, opts...)
if err != nil {
return err
}
12 changes: 6 additions & 6 deletions cmd/influx/pkg_test.go
Original file line number Diff line number Diff line change
@@ -708,8 +708,8 @@ func testPkgWritesToBuffer(newCmdFn func(w io.Writer) *cobra.Command, args pkgFi
type fakePkgSVC struct {
initStackFn func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error)
createFn func(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.PkgImpactSummary, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error)
}

var _ pkger.SVC = (*fakePkgSVC)(nil)
@@ -740,16 +740,16 @@ func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSe
panic("not implemented")
}

func (f *fakePkgSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
func (f *fakePkgSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
if f.dryRunFn != nil {
return f.dryRunFn(ctx, orgID, userID, pkg)
return f.dryRunFn(ctx, orgID, userID, opts...)
}
panic("not implemented")
}

func (f *fakePkgSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
func (f *fakePkgSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
if f.applyFn != nil {
return f.applyFn(ctx, orgID, userID, pkg, opts...)
return f.applyFn(ctx, orgID, userID, opts...)
}
panic("not implemented")
}
149 changes: 98 additions & 51 deletions cmd/influxd/launcher/pkger_test.go

Large diffs are not rendered by default.

30 changes: 28 additions & 2 deletions http/swagger.yml
Original file line number Diff line number Diff line change
@@ -4732,6 +4732,10 @@ paths:
type: string
description:
type: string
sources:
type: array
items:
type: string
urls:
type: array
items:
@@ -7647,11 +7651,29 @@ components:
stackID:
type: string
package:
$ref: "#/components/schemas/Pkg"
type: object
properties:
contentType:
type: string
sources:
type: array
items:
type: string
package:
$ref: "#/components/schemas/Pkg"
packages:
type: array
items:
$ref: "#/components/schemas/Pkg"
type: object
properties:
contentType:
type: string
sources:
type: array
items:
type: string
package:
$ref: "#/components/schemas/Pkg"
secrets:
type: object
additionalProperties:
@@ -7759,6 +7781,10 @@ components:
PkgSummary:
type: object
properties:
sources:
type: array
items:
type: string
stackID:
type: string
summary:
33 changes: 18 additions & 15 deletions pkger/http_remote_service.go
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ func (s *HTTPRemoteService) ExportStack(ctx context.Context, orgID, stackID infl
Get(RoutePrefix, "stacks", stackID.String(), "export").
QueryParams([2]string{"orgID", orgID.String()}).
Decode(func(resp *http.Response) error {
decodedPkg, err := Parse(EncodingJSON, FromReader(resp.Body))
decodedPkg, err := Parse(EncodingJSON, FromReader(resp.Body, ""))
if err != nil {
return err
}
@@ -139,7 +139,7 @@ func (s *HTTPRemoteService) CreatePkg(ctx context.Context, setters ...CreatePkgS
err := s.Client.
PostJSON(reqBody, RoutePrefix).
Decode(func(resp *http.Response) error {
pkg, err := Parse(EncodingJSON, FromReader(resp.Body))
pkg, err := Parse(EncodingJSON, FromReader(resp.Body, "export"))
newPkg = pkg
return err
}).
@@ -157,35 +157,37 @@ func (s *HTTPRemoteService) CreatePkg(ctx context.Context, setters ...CreatePkgS
// DryRun provides a dry run of the pkg application. The pkg will be marked verified
// for later calls to Apply. This func will be run on an Apply if it has not been run
// already.
func (s *HTTPRemoteService) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
return s.apply(ctx, orgID, pkg, true, opts...)
func (s *HTTPRemoteService) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
return s.apply(ctx, orgID, true, opts...)
}

// Apply will apply all the resources identified in the provided pkg. The entire pkg will be applied
// in its entirety. If a failure happens midway then the entire pkg will be rolled back to the state
// from before the pkg was applied.
func (s *HTTPRemoteService) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
return s.apply(ctx, orgID, pkg, false, opts...)
func (s *HTTPRemoteService) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
return s.apply(ctx, orgID, false, opts...)
}

func (s *HTTPRemoteService) apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg, dryRun bool, opts ...ApplyOptFn) (PkgImpactSummary, error) {
func (s *HTTPRemoteService) apply(ctx context.Context, orgID influxdb.ID, dryRun bool, opts ...ApplyOptFn) (PkgImpactSummary, error) {
opt := applyOptFromOptFns(opts...)

var rawPkg []byte
if pkg != nil {
var rawPkg ReqRawPkg
for _, pkg := range opt.Pkgs {
b, err := pkg.Encode(EncodingJSON)
if err != nil {
return PkgImpactSummary{}, err
}
rawPkg = b
rawPkg.Pkg = b
rawPkg.Sources = pkg.sources
rawPkg.ContentType = EncodingJSON.String()
}

reqBody := ReqApplyPkg{
OrgID: orgID.String(),
DryRun: dryRun,
EnvRefs: opt.EnvRefs,
Secrets: opt.MissingSecrets,
RawPkg: rawPkg,
OrgID: orgID.String(),
DryRun: dryRun,
EnvRefs: opt.EnvRefs,
Secrets: opt.MissingSecrets,
RawTemplate: rawPkg,
}
if opt.StackID != 0 {
stackID := opt.StackID.String()
@@ -202,6 +204,7 @@ func (s *HTTPRemoteService) apply(ctx context.Context, orgID influxdb.ID, pkg *P
}

impact := PkgImpactSummary{
Sources: resp.Sources,
Diff: resp.Diff,
Summary: resp.Summary,
}
108 changes: 82 additions & 26 deletions pkger/http_server.go
Original file line number Diff line number Diff line change
@@ -385,38 +385,50 @@ func (s *HTTPServer) createPkg(w http.ResponseWriter, r *http.Request) {
s.encResp(w, r, enc, http.StatusOK, resp)
}

// PkgRemote provides a package via a remote (i.e. a gist). If content type is not
// ReqPkgRemote provides a package via a remote (i.e. a gist). If content type is not
// provided then the service will do its best to discern the content type of the
// contents.
type PkgRemote struct {
URL string `json:"url"`
ContentType string `json:"contentType"`
type ReqPkgRemote struct {
URL string `json:"url" yaml:"url"`
ContentType string `json:"contentType" yaml:"contentType"`
}

// Encoding returns the encoding type that corresponds to the given content type.
func (p PkgRemote) Encoding() Encoding {
ct := strings.ToLower(p.ContentType)
urlBase := path.Ext(p.URL)
switch {
case ct == "jsonnet" || urlBase == ".jsonnet":
return EncodingJsonnet
case ct == "json" || urlBase == ".json":
return EncodingJSON
case ct == "yml" || ct == "yaml" || urlBase == ".yml" || urlBase == ".yaml":
return EncodingYAML
default:
return EncodingSource
func (p ReqPkgRemote) Encoding() Encoding {
return convertEncoding(p.ContentType, p.URL)
}

type ReqRawPkg struct {
ContentType string `json:"contentType" yaml:"contentType"`
Sources []string `json:"sources" yaml:"sources"`
Pkg json.RawMessage `json:"contents" yaml:"contents"`
}

func (p ReqRawPkg) Encoding() Encoding {
var source string
if len(p.Sources) > 0 {
source = p.Sources[0]
}
return convertEncoding(p.ContentType, source)
}

// ReqApplyPkg is the request body for a json or yaml body for the apply pkg endpoint.
type ReqApplyPkg struct {
DryRun bool `json:"dryRun" yaml:"dryRun"`
OrgID string `json:"orgID" yaml:"orgID"`
StackID *string `json:"stackID" yaml:"stackID"` // optional: non nil value signals stack should be used
Remotes []PkgRemote `json:"remotes" yaml:"remotes"`
DryRun bool `json:"dryRun" yaml:"dryRun"`
OrgID string `json:"orgID" yaml:"orgID"`
StackID *string `json:"stackID" yaml:"stackID"` // optional: non nil value signals stack should be used
Remotes []ReqPkgRemote `json:"remotes" yaml:"remotes"`

// TODO(jsteenb2): pkg references will all be replaced by template references
// these 2 exist alongside the templates for backwards compatibility
// until beta13 rolls out the door. This code should get axed when the next
// OSS release goes out.
RawPkgs []json.RawMessage `json:"packages" yaml:"packages"`
RawPkg json.RawMessage `json:"package" yaml:"package"`

RawTemplates []ReqRawPkg `json:"templates" yaml:"templates"`
RawTemplate ReqRawPkg `json:"template" yaml:"template"`

EnvRefs map[string]string `json:"envRefs"`
Secrets map[string]string `json:"secrets"`
}
@@ -442,11 +454,31 @@ func (r ReqApplyPkg) Pkgs(encoding Encoding) (*Pkg, error) {
if rawPkg == nil {
continue
}

pkg, err := Parse(encoding, FromReader(bytes.NewReader(rawPkg)), ValidSkipParseError())
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: fmt.Sprintf("pkg [%d] had an issue: %s", i, err.Error()),
Msg: fmt.Sprintf("pkg[%d] had an issue: %s", i, err.Error()),
}
}
rawPkgs = append(rawPkgs, pkg)
}

for i, rawTmpl := range append(r.RawTemplates, r.RawTemplate) {
if rawTmpl.Pkg == nil {
continue
}
enc := encoding
if sourceEncoding := rawTmpl.Encoding(); sourceEncoding != EncodingSource {
enc = sourceEncoding
}
pkg, err := Parse(enc, FromReader(bytes.NewReader(rawTmpl.Pkg), rawTmpl.Sources...), ValidSkipParseError())
if err != nil {
sources := formatSources(rawTmpl.Sources)
return nil, &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: fmt.Sprintf("pkg[%d] from source(s) %q had an issue: %s", i, sources, err.Error()),
}
}
rawPkgs = append(rawPkgs, pkg)
@@ -457,9 +489,10 @@ func (r ReqApplyPkg) Pkgs(encoding Encoding) (*Pkg, error) {

// RespApplyPkg is the response body for the apply pkg endpoint.
type RespApplyPkg struct {
StackID string `json:"stackID" yaml:"stackID"`
Diff Diff `json:"diff" yaml:"diff"`
Summary Summary `json:"summary" yaml:"summary"`
Sources []string `json:"sources" yaml:"sources"`
StackID string `json:"stackID" yaml:"stackID"`
Diff Diff `json:"diff" yaml:"diff"`
Summary Summary `json:"summary" yaml:"summary"`

Errors []ValidationErr `json:"errors,omitempty" yaml:"errors,omitempty"`
}
@@ -510,13 +543,15 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) {

applyOpts := []ApplyOptFn{
ApplyWithEnvRefs(reqBody.EnvRefs),
ApplyWithPkg(parsedPkg),
ApplyWithStackID(stackID),
}

if reqBody.DryRun {
impact, err := s.svc.DryRun(r.Context(), *orgID, userID, parsedPkg, applyOpts...)
impact, err := s.svc.DryRun(r.Context(), *orgID, userID, applyOpts...)
if IsParseErr(err) {
s.api.Respond(w, r, http.StatusUnprocessableEntity, RespApplyPkg{
Sources: append([]string{}, impact.Sources...), // guarantee non nil slice
StackID: impact.StackID.String(),
Diff: impact.Diff,
Summary: impact.Summary,
@@ -530,6 +565,7 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) {
}

s.api.Respond(w, r, http.StatusOK, RespApplyPkg{
Sources: append([]string{}, impact.Sources...), // guarantee non nil slice
StackID: impact.StackID.String(),
Diff: impact.Diff,
Summary: impact.Summary,
@@ -539,13 +575,14 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) {

applyOpts = append(applyOpts, ApplyWithSecrets(reqBody.Secrets))

impact, err := s.svc.Apply(r.Context(), *orgID, userID, parsedPkg, applyOpts...)
impact, err := s.svc.Apply(r.Context(), *orgID, userID, applyOpts...)
if err != nil && !IsParseErr(err) {
s.api.Err(w, r, err)
return
}

s.api.Respond(w, r, http.StatusCreated, RespApplyPkg{
Sources: append([]string{}, impact.Sources...), // guarantee non nil slice
StackID: impact.StackID.String(),
Diff: impact.Diff,
Summary: impact.Summary,
@@ -557,6 +594,10 @@ type encoder interface {
Encode(interface{}) error
}

func formatSources(sources []string) string {
return strings.Join(sources, "; ")
}

func decodeWithEncoding(r *http.Request, v interface{}) (Encoding, error) {
encoding := pkgEncoding(r.Header.Get("Content-Type"))

@@ -584,6 +625,21 @@ func pkgEncoding(contentType string) Encoding {
}
}

func convertEncoding(ct, rawURL string) Encoding {
ct = strings.ToLower(ct)
urlBase := path.Ext(rawURL)
switch {
case ct == "jsonnet" || urlBase == ".jsonnet":
return EncodingJsonnet
case ct == "json" || urlBase == ".json":
return EncodingJSON
case ct == "yml" || ct == "yaml" || urlBase == ".yml" || urlBase == ".yaml":
return EncodingYAML
default:
return EncodingSource
}
}

func newJSONEnc(w io.Writer) encoder {
enc := json.NewEncoder(w)
enc.SetIndent("", "\t")
137 changes: 93 additions & 44 deletions pkger/http_server_test.go
Original file line number Diff line number Diff line change
@@ -116,25 +116,25 @@ func TestPkgerHTTPServer(t *testing.T) {
name: "app json",
contentType: "application/json",
reqBody: pkger.ReqApplyPkg{
DryRun: true,
OrgID: influxdb.ID(9000).String(),
RawPkg: bucketPkgKinds(t, pkger.EncodingJSON),
DryRun: true,
OrgID: influxdb.ID(9000).String(),
RawTemplate: bucketPkgKinds(t, pkger.EncodingJSON),
},
},
{
name: "defaults json when no content type",
reqBody: pkger.ReqApplyPkg{
DryRun: true,
OrgID: influxdb.ID(9000).String(),
RawPkg: bucketPkgKinds(t, pkger.EncodingJSON),
DryRun: true,
OrgID: influxdb.ID(9000).String(),
RawTemplate: bucketPkgKinds(t, pkger.EncodingJSON),
},
},
{
name: "retrieves package from a URL",
reqBody: pkger.ReqApplyPkg{
DryRun: true,
OrgID: influxdb.ID(9000).String(),
Remotes: []pkger.PkgRemote{{
Remotes: []pkger.ReqPkgRemote{{
URL: newPkgURL(t, filesvr.URL, "testdata/remote_bucket.json"),
}},
},
@@ -143,17 +143,26 @@ func TestPkgerHTTPServer(t *testing.T) {
name: "app jsonnet",
contentType: "application/x-jsonnet",
reqBody: pkger.ReqApplyPkg{
DryRun: true,
OrgID: influxdb.ID(9000).String(),
RawPkg: bucketPkgKinds(t, pkger.EncodingJsonnet),
DryRun: true,
OrgID: influxdb.ID(9000).String(),
RawTemplate: bucketPkgKinds(t, pkger.EncodingJsonnet),
},
},
}

for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
var opt pkger.ApplyOpt
for _, o := range opts {
o(&opt)
}
pkg, err := pkger.Combine(opt.Pkgs)
if err != nil {
return pkger.PkgImpactSummary{}, err
}

if err := pkg.Validate(); err != nil {
return pkger.PkgImpactSummary{}, err
}
@@ -212,7 +221,16 @@ func TestPkgerHTTPServer(t *testing.T) {
for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
var opt pkger.ApplyOpt
for _, o := range opts {
o(&opt)
}
pkg, err := pkger.Combine(opt.Pkgs)
if err != nil {
return pkger.PkgImpactSummary{}, err
}

if err := pkg.Validate(); err != nil {
return pkger.PkgImpactSummary{}, err
}
@@ -256,7 +274,7 @@ func TestPkgerHTTPServer(t *testing.T) {
})

t.Run("with multiple pkgs", func(t *testing.T) {
newBktPkg := func(t *testing.T, bktName string) json.RawMessage {
newBktPkg := func(t *testing.T, bktName string) pkger.ReqRawPkg {
t.Helper()

pkgStr := fmt.Sprintf(`[
@@ -275,7 +293,11 @@ func TestPkgerHTTPServer(t *testing.T) {

pkgBytes, err := pkg.Encode(pkger.EncodingJSON)
require.NoError(t, err)
return pkgBytes
return pkger.ReqRawPkg{
ContentType: pkger.EncodingJSON.String(),
Sources: pkg.Sources(),
Pkg: pkgBytes,
}
}

tests := []struct {
@@ -288,11 +310,11 @@ func TestPkgerHTTPServer(t *testing.T) {
reqBody: pkger.ReqApplyPkg{
DryRun: true,
OrgID: influxdb.ID(9000).String(),
Remotes: []pkger.PkgRemote{{
Remotes: []pkger.ReqPkgRemote{{
ContentType: "json",
URL: newPkgURL(t, filesvr.URL, "testdata/remote_bucket.json"),
}},
RawPkgs: []json.RawMessage{
RawTemplates: []pkger.ReqRawPkg{
newBktPkg(t, "bkt1"),
newBktPkg(t, "bkt2"),
newBktPkg(t, "bkt3"),
@@ -303,10 +325,10 @@ func TestPkgerHTTPServer(t *testing.T) {
{
name: "retrieves packages from raw single and list",
reqBody: pkger.ReqApplyPkg{
DryRun: true,
OrgID: influxdb.ID(9000).String(),
RawPkg: newBktPkg(t, "bkt4"),
RawPkgs: []json.RawMessage{
DryRun: true,
OrgID: influxdb.ID(9000).String(),
RawTemplate: newBktPkg(t, "bkt4"),
RawTemplates: []pkger.ReqRawPkg{
newBktPkg(t, "bkt1"),
newBktPkg(t, "bkt2"),
newBktPkg(t, "bkt3"),
@@ -319,7 +341,16 @@ func TestPkgerHTTPServer(t *testing.T) {
for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
var opt pkger.ApplyOpt
for _, o := range opts {
o(&opt)
}
pkg, err := pkger.Combine(opt.Pkgs)
if err != nil {
return pkger.PkgImpactSummary{}, err
}

if err := pkg.Validate(); err != nil {
return pkger.PkgImpactSummary{}, err
}
@@ -373,20 +404,20 @@ func TestPkgerHTTPServer(t *testing.T) {
name: "invalid org id",
contentType: "application/json",
reqBody: pkger.ReqApplyPkg{
DryRun: true,
OrgID: "bad org id",
RawPkg: bucketPkgKinds(t, pkger.EncodingJSON),
DryRun: true,
OrgID: "bad org id",
RawTemplate: bucketPkgKinds(t, pkger.EncodingJSON),
},
expectedStatusCode: http.StatusBadRequest,
},
{
name: "invalid stack id",
contentType: "application/json",
reqBody: pkger.ReqApplyPkg{
DryRun: true,
OrgID: influxdb.ID(9000).String(),
StackID: strPtr("invalid stack id"),
RawPkg: bucketPkgKinds(t, pkger.EncodingJSON),
DryRun: true,
OrgID: influxdb.ID(9000).String(),
StackID: strPtr("invalid stack id"),
RawTemplate: bucketPkgKinds(t, pkger.EncodingJSON),
},
expectedStatusCode: http.StatusBadRequest,
},
@@ -395,7 +426,15 @@ func TestPkgerHTTPServer(t *testing.T) {
for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
var opt pkger.ApplyOpt
for _, o := range opts {
o(&opt)
}
pkg, err := pkger.Combine(opt.Pkgs)
if err != nil {
return pkger.PkgImpactSummary{}, err
}
return pkger.PkgImpactSummary{
Summary: pkg.Summary(),
}, nil
@@ -419,11 +458,17 @@ func TestPkgerHTTPServer(t *testing.T) {

t.Run("apply a pkg", func(t *testing.T) {
svc := &fakeSVC{
applyFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
applyFn: func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
var opt pkger.ApplyOpt
for _, o := range opts {
o(&opt)
}

pkg, err := pkger.Combine(opt.Pkgs)
if err != nil {
return pkger.PkgImpactSummary{}, err
}

sum := pkg.Summary()

var diff pkger.Diff
@@ -450,9 +495,9 @@ func TestPkgerHTTPServer(t *testing.T) {

testttp.
PostJSON(t, "/api/v2/packages/apply", pkger.ReqApplyPkg{
OrgID: influxdb.ID(9000).String(),
Secrets: map[string]string{"secret1": "val1"},
RawPkg: bucketPkgKinds(t, pkger.EncodingJSON),
OrgID: influxdb.ID(9000).String(),
Secrets: map[string]string{"secret1": "val1"},
RawTemplate: bucketPkgKinds(t, pkger.EncodingJSON),
}).
Do(svr).
ExpectStatus(http.StatusCreated).
@@ -760,7 +805,7 @@ func TestPkgerHTTPServer(t *testing.T) {
})
}

func bucketPkgKinds(t *testing.T, encoding pkger.Encoding) []byte {
func bucketPkgKinds(t *testing.T, encoding pkger.Encoding) pkger.ReqRawPkg {
t.Helper()

var pkgStr string
@@ -813,17 +858,21 @@ spec:

b, err := pkg.Encode(encoding)
require.NoError(t, err)
return b
return pkger.ReqRawPkg{
ContentType: encoding.String(),
Sources: pkg.Sources(),
Pkg: b,
}
}

func newReqApplyYMLBody(t *testing.T, orgID influxdb.ID, dryRun bool) *bytes.Buffer {
t.Helper()

var buf bytes.Buffer
err := yaml.NewEncoder(&buf).Encode(pkger.ReqApplyPkg{
DryRun: dryRun,
OrgID: orgID.String(),
RawPkg: bucketPkgKinds(t, pkger.EncodingYAML),
DryRun: dryRun,
OrgID: orgID.String(),
RawTemplate: bucketPkgKinds(t, pkger.EncodingYAML),
})
require.NoError(t, err)
return &buf
@@ -840,8 +889,8 @@ func decodeBody(t *testing.T, r io.Reader, v interface{}) {
type fakeSVC struct {
initStack func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error)
listStacksFn func(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error)
}

var _ pkger.SVC = (*fakeSVC)(nil)
@@ -872,19 +921,19 @@ func (f *fakeSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn
panic("not implemented")
}

func (f *fakeSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
func (f *fakeSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
if f.dryRunFn == nil {
panic("not implemented")
}

return f.dryRunFn(ctx, orgID, userID, pkg, opts...)
return f.dryRunFn(ctx, orgID, userID, opts...)
}

func (f *fakeSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
func (f *fakeSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...pkger.ApplyOptFn) (pkger.PkgImpactSummary, error) {
if f.applyFn == nil {
panic("not implemented")
}
return f.applyFn(ctx, orgID, userID, pkg, opts...)
return f.applyFn(ctx, orgID, userID, opts...)
}

func newMountedHandler(rh kithttp.ResourceHandler, userID influxdb.ID) chi.Router {
32 changes: 28 additions & 4 deletions pkger/parser.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"sort"
"strconv"
@@ -96,21 +97,38 @@ func Parse(encoding Encoding, readerFn ReaderFn, opts ...ValidateOptFn) (*Pkg, e
// FromFile reads a file from disk and provides a reader from it.
func FromFile(filePath string) ReaderFn {
return func() (io.Reader, string, error) {
u, err := url.Parse(filePath)
if err != nil {
return nil, filePath, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid filepath provided",
Err: err,
}
}
if u.Scheme == "" {
u.Scheme = "file"
}

// not using os.Open to avoid having to deal with closing the file in here
b, err := ioutil.ReadFile(filePath)
b, err := ioutil.ReadFile(u.Path)
if err != nil {
return nil, filePath, err
}
return bytes.NewBuffer(b), filePath, nil

return bytes.NewBuffer(b), u.String(), nil
}
}

// FromReader simply passes the reader along. Useful when consuming
// this from an HTTP request body. There are a number of other useful
// places for this functional input.
func FromReader(r io.Reader) ReaderFn {
func FromReader(r io.Reader, sources ...string) ReaderFn {
return func() (io.Reader, string, error) {
return r, "byte stream", nil
source := "byte stream"
if len(sources) > 0 {
source = formatSources(sources)
}
return r, source, nil
}
}

@@ -336,6 +354,12 @@ func (p *Pkg) Encode(encoding Encoding) ([]byte, error) {
return buf.Bytes(), nil
}

func (p *Pkg) Sources() []string {
// note: we prevent the internal field from being changed by enabling access
// to the sources via the exported method here.
return p.sources
}

// Summary returns a package Summary that describes all the resources and
// associations the pkg contains. It is very useful for informing users of
// the changes that will take place when this pkg would be applied.
10 changes: 9 additions & 1 deletion pkger/parser_test.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package pkger
import (
"errors"
"fmt"
"net/url"
"path/filepath"
"sort"
"strconv"
@@ -4255,7 +4256,14 @@ func nextField(t *testing.T, field string) (string, int) {

func validParsedPkgFromFile(t *testing.T, path string, encoding Encoding) *Pkg {
t.Helper()
return newParsedPkg(t, FromFile(path), encoding)

pkg := newParsedPkg(t, FromFile(path), encoding)
u := url.URL{
Scheme: "file",
Path: path,
}
require.Equal(t, []string{u.String()}, pkg.Sources())
return pkg
}

func newParsedPkg(t *testing.T, fn ReaderFn, encoding Encoding, opts ...ValidateOptFn) *Pkg {
67 changes: 40 additions & 27 deletions pkger/service.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ type (
OrgID influxdb.ID `json:"orgID"`
Name string `json:"name"`
Description string `json:"description"`
Sources []string `json:"sources"`
URLs []string `json:"urls"`
Resources []StackResource `json:"resources"`

@@ -67,8 +68,8 @@ type SVC interface {
ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error)

CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error)
DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error)
Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error)
DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error)
Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error)
}

// SVCMiddleware is a service middleware func.
@@ -873,6 +874,7 @@ func (s *Service) filterOrgResourceKinds(resourceKindFilters []Kind) []struct {

// PkgImpactSummary represents the impact the application of a pkg will have on the system.
type PkgImpactSummary struct {
Sources []string
StackID influxdb.ID
Diff Diff
Summary Summary
@@ -881,18 +883,11 @@ type PkgImpactSummary struct {
// DryRun provides a dry run of the pkg application. The pkg will be marked verified
// for later calls to Apply. This func will be run on an Apply if it has not been run
// already.
func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
opt := applyOptFromOptFns(opts...)

if opt.StackID != 0 {
remotePkgs, err := s.getStackRemotePackages(ctx, opt.StackID)
if err != nil {
return PkgImpactSummary{}, err
}
pkg, err = Combine(append(remotePkgs, pkg), ValidWithoutResources())
if err != nil {
return PkgImpactSummary{}, err
}
pkg, err := s.pkgFromApplyOpts(ctx, opt)
if err != nil {
return PkgImpactSummary{}, err
}

state, err := s.dryRun(ctx, orgID, pkg, opt)
@@ -901,6 +896,7 @@ func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pk
}

return PkgImpactSummary{
Sources: pkg.sources,
StackID: opt.StackID,
Diff: state.diff(),
Summary: newSummaryFromStatePkg(state, pkg),
@@ -1330,6 +1326,7 @@ func (s *Service) addStackState(ctx context.Context, stackID influxdb.ID, state

// ApplyOpt is an option for applying a package.
type ApplyOpt struct {
Pkgs []*Pkg
EnvRefs map[string]string
MissingSecrets map[string]string
StackID influxdb.ID
@@ -1345,6 +1342,13 @@ func ApplyWithEnvRefs(envRefs map[string]string) ApplyOptFn {
}
}

// ApplyWithPkg provides a pkg to the application/dry run.
func ApplyWithPkg(pkg *Pkg) ApplyOptFn {
return func(opt *ApplyOpt) {
opt.Pkgs = append(opt.Pkgs, pkg)
}
}

// ApplyWithSecrets provides secrets to the platform that the pkg will need.
func ApplyWithSecrets(secrets map[string]string) ApplyOptFn {
return func(o *ApplyOpt) {
@@ -1370,19 +1374,12 @@ func applyOptFromOptFns(opts ...ApplyOptFn) ApplyOpt {
// Apply will apply all the resources identified in the provided pkg. The entire pkg will be applied
// in its entirety. If a failure happens midway then the entire pkg will be rolled back to the state
// from before the pkg were applied.
func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (impact PkgImpactSummary, e error) {
func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (impact PkgImpactSummary, e error) {
opt := applyOptFromOptFns(opts...)

if opt.StackID != 0 {
remotePkgs, err := s.getStackRemotePackages(ctx, opt.StackID)
if err != nil {
return PkgImpactSummary{}, err
}

pkg, err = Combine(append(remotePkgs, pkg), ValidWithoutResources())
if err != nil {
return PkgImpactSummary{}, err
}
pkg, err := s.pkgFromApplyOpts(ctx, opt)
if err != nil {
return PkgImpactSummary{}, err
}

if err := pkg.Validate(ValidWithoutResources()); err != nil {
@@ -1413,7 +1410,9 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
if e != nil {
updateStackFn = s.updateStackAfterRollback
}
if err := updateStackFn(ctx, stackID, state); err != nil {

err := updateStackFn(ctx, stackID, state, pkg.Sources())
if err != nil {
s.log.Error("failed to update stack", zap.Error(err))
}
}(stackID)
@@ -1429,6 +1428,7 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
pkg.applySecrets(opt.MissingSecrets)

return PkgImpactSummary{
Sources: pkg.sources,
StackID: stackID,
Diff: state.diff(),
Summary: newSummaryFromStatePkg(state, pkg),
@@ -2867,6 +2867,18 @@ func (s *Service) rollbackLabelMappings(ctx context.Context, mappings []stateLab
return nil
}

func (s *Service) pkgFromApplyOpts(ctx context.Context, opt ApplyOpt) (*Pkg, error) {
if opt.StackID != 0 {
remotePkgs, err := s.getStackRemotePackages(ctx, opt.StackID)
if err != nil {
return nil, err
}
opt.Pkgs = append(opt.Pkgs, remotePkgs...)
}

return Combine(opt.Pkgs, ValidWithoutResources())
}

func (s *Service) getStackRemotePackages(ctx context.Context, stackID influxdb.ID) ([]*Pkg, error) {
stack, err := s.store.ReadStackByID(ctx, stackID)
if err != nil {
@@ -2908,7 +2920,7 @@ func (s *Service) getStackRemotePackages(ctx context.Context, stackID influxdb.I
return remotePkgs, nil
}

func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.ID, state *stateCoordinator) error {
func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.ID, state *stateCoordinator, sources []string) error {
stack, err := s.store.ReadStackByID(ctx, stackID)
if err != nil {
return err
@@ -3035,11 +3047,12 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.
}
stack.Resources = stackResources

stack.Sources = sources
stack.UpdatedAt = time.Now()
return s.store.UpdateStack(ctx, stack)
}

func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb.ID, state *stateCoordinator) error {
func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb.ID, state *stateCoordinator, sources []string) error {
stack, err := s.store.ReadStackByID(ctx, stackID)
if err != nil {
return err
8 changes: 4 additions & 4 deletions pkger/service_auth.go
Original file line number Diff line number Diff line change
@@ -64,10 +64,10 @@ func (s *authMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg
return s.next.CreatePkg(ctx, setters...)
}

func (s *authMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
return s.next.DryRun(ctx, orgID, userID, pkg, opts...)
func (s *authMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
return s.next.DryRun(ctx, orgID, userID, opts...)
}

func (s *authMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
return s.next.Apply(ctx, orgID, userID, pkg, opts...)
func (s *authMW) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
return s.next.Apply(ctx, orgID, userID, opts...)
}
8 changes: 4 additions & 4 deletions pkger/service_logging.go
Original file line number Diff line number Diff line change
@@ -113,7 +113,7 @@ func (s *loggingMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (p
return s.next.CreatePkg(ctx, setters...)
}

func (s *loggingMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (impact PkgImpactSummary, err error) {
func (s *loggingMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (impact PkgImpactSummary, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
@@ -138,10 +138,10 @@ func (s *loggingMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *
fields = append(fields, dur)
s.logger.Info("pkg dry run successful", fields...)
}(time.Now())
return s.next.DryRun(ctx, orgID, userID, pkg, opts...)
return s.next.DryRun(ctx, orgID, userID, opts...)
}

func (s *loggingMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (impact PkgImpactSummary, err error) {
func (s *loggingMW) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (impact PkgImpactSummary, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
@@ -163,7 +163,7 @@ func (s *loggingMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *P
fields = append(fields, dur)
s.logger.Info("pkg apply successful", fields...)
}(time.Now())
return s.next.Apply(ctx, orgID, userID, pkg, opts...)
return s.next.Apply(ctx, orgID, userID, opts...)
}

func (s *loggingMW) summaryLogFields(sum Summary) []zap.Field {
8 changes: 4 additions & 4 deletions pkger/service_metrics.go
Original file line number Diff line number Diff line change
@@ -56,14 +56,14 @@ func (s *mwMetrics) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*
return pkg, rec(err)
}

func (s *mwMetrics) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
func (s *mwMetrics) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
rec := s.rec.Record("dry_run")
impact, err := s.next.DryRun(ctx, orgID, userID, pkg, opts...)
impact, err := s.next.DryRun(ctx, orgID, userID, opts...)
return impact, rec(err)
}

func (s *mwMetrics) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
func (s *mwMetrics) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
rec := s.rec.Record("apply")
impact, err := s.next.Apply(ctx, orgID, userID, pkg, opts...)
impact, err := s.next.Apply(ctx, orgID, userID, opts...)
return impact, rec(err)
}
64 changes: 32 additions & 32 deletions pkger/service_test.go
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithBucketSVC(fakeBktSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

require.Len(t, impact.Diff.Buckets, 2)
@@ -130,7 +130,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithBucketSVC(fakeBktSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

require.Len(t, impact.Diff.Buckets, 2)
@@ -171,7 +171,7 @@ func TestService(t *testing.T) {

svc := newTestService(WithCheckSVC(fakeCheckSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

checks := impact.Diff.Checks
@@ -209,7 +209,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithLabelSVC(fakeLabelSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

require.Len(t, impact.Diff.Labels, 3)
@@ -250,7 +250,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithLabelSVC(fakeLabelSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

labels := impact.Diff.Labels
@@ -299,7 +299,7 @@ func TestService(t *testing.T) {

svc := newTestService(WithNotificationEndpointSVC(fakeEndpointSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

require.Len(t, impact.Diff.NotificationEndpoints, 5)
@@ -367,7 +367,7 @@ func TestService(t *testing.T) {

svc := newTestService(WithNotificationEndpointSVC(fakeEndpointSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

require.Len(t, impact.Diff.NotificationRules, 1)
@@ -403,7 +403,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithSecretSVC(fakeSecretSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

assert.Equal(t, []string{"routing-key"}, impact.Summary.MissingSecrets)
@@ -424,7 +424,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithVariableSVC(fakeVarSVC))

impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
impact, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, ApplyWithPkg(pkg))
require.NoError(t, err)

variables := impact.Diff.Variables
@@ -492,7 +492,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -544,7 +544,7 @@ func TestService(t *testing.T) {

svc := newTestService(WithBucketSVC(fakeBktSVC))

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -584,7 +584,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.GreaterOrEqual(t, fakeBktSVC.DeleteBucketCalls.Count(), 1)
@@ -605,7 +605,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -647,7 +647,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.GreaterOrEqual(t, fakeCheckSVC.DeleteCheckCalls.Count(), 1)
@@ -672,7 +672,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -705,7 +705,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelCalls.Count(), 1)
@@ -760,7 +760,7 @@ func TestService(t *testing.T) {

svc := newTestService(WithLabelSVC(fakeLabelSVC))

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -797,7 +797,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -839,7 +839,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.True(t, deletedDashs[1])
@@ -874,7 +874,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

assert.Equal(t, numExpected, fakeLabelSVC.CreateLabelMappingCalls.Count())
@@ -910,7 +910,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelMappingCalls.Count(), killCount)
@@ -1106,7 +1106,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -1155,7 +1155,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.GreaterOrEqual(t, fakeEndpointSVC.DeleteNotificationEndpointCalls.Count(), 3)
@@ -1184,7 +1184,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -1227,7 +1227,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.Equal(t, 1, fakeRuleStore.DeleteNotificationRuleCalls.Count())
@@ -1261,7 +1261,7 @@ func TestService(t *testing.T) {

svc := newTestService(WithTaskSVC(fakeTaskSVC))

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -1294,7 +1294,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.Equal(t, 1, fakeTaskSVC.DeleteTaskCalls.Count())
@@ -1315,7 +1315,7 @@ func TestService(t *testing.T) {

svc := newTestService(WithTelegrafSVC(fakeTeleSVC))

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -1348,7 +1348,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.Equal(t, 1, fakeTeleSVC.DeleteTelegrafConfigCalls.Count())
@@ -1369,7 +1369,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
@@ -1404,7 +1404,7 @@ func TestService(t *testing.T) {

orgID := influxdb.ID(9000)

_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.Error(t, err)

assert.GreaterOrEqual(t, fakeVarSVC.DeleteVariableCalls.Count(), 1)
@@ -1445,7 +1445,7 @@ func TestService(t *testing.T) {

svc := newTestService(WithVariableSVC(fakeVarSVC))

impact, err := svc.Apply(context.TODO(), orgID, 0, pkg)
impact, err := svc.Apply(context.TODO(), orgID, 0, ApplyWithPkg(pkg))
require.NoError(t, err)

sum := impact.Summary
8 changes: 4 additions & 4 deletions pkger/service_tracing.go
Original file line number Diff line number Diff line change
@@ -59,16 +59,16 @@ func (s *traceMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg
return s.next.CreatePkg(ctx, setters...)
}

func (s *traceMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
func (s *traceMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
span.LogKV("orgID", orgID.String(), "userID", userID.String())
defer span.Finish()
return s.next.DryRun(ctx, orgID, userID, pkg, opts...)
return s.next.DryRun(ctx, orgID, userID, opts...)
}

func (s *traceMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (PkgImpactSummary, error) {
func (s *traceMW) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
span.LogKV("orgID", orgID.String(), "userID", userID.String())
defer span.Finish()
return s.next.Apply(ctx, orgID, userID, pkg, opts...)
return s.next.Apply(ctx, orgID, userID, opts...)
}
3 changes: 3 additions & 0 deletions pkger/store.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ type (

Name string `json:"name"`
Description string `json:"description"`
Sources []string `json:"sources,omitempty"`
URLs []string `json:"urls,omitempty"`
Resources []entStackResource `json:"resources,omitempty"`

@@ -292,6 +293,7 @@ func convertStackToEnt(stack Stack) (kv.Entity, error) {
Description: stack.Description,
CreatedAt: stack.CreatedAt,
UpdatedAt: stack.UpdatedAt,
Sources: stack.Sources,
URLs: stack.URLs,
}

@@ -323,6 +325,7 @@ func convertStackEntToStack(ent *entStack) (Stack, error) {
stack := Stack{
Name: ent.Name,
Description: ent.Description,
Sources: ent.Sources,
URLs: ent.URLs,
CRUDLog: influxdb.CRUDLog{
CreatedAt: ent.CreatedAt,
10 changes: 6 additions & 4 deletions pkger/store_test.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,10 @@ func TestStoreKV(t *testing.T) {

stackStub := func(id, orgID influxdb.ID) pkger.Stack {
now := time.Time{}.Add(10 * 365 * 24 * time.Hour)
urls := []string{
"http://example.com",
"http://abc.gov",
}
return pkger.Stack{
ID: id,
OrgID: orgID,
@@ -26,10 +30,8 @@ func TestStoreKV(t *testing.T) {
CreatedAt: now,
UpdatedAt: now.Add(time.Hour),
},
URLs: []string{
"http://example.com",
"http://abc.gov",
},
Sources: urls,
URLs: urls,
Resources: []pkger.StackResource{
{
APIVersion: pkger.APIVersion,

0 comments on commit b977568

Please sign in to comment.