diff --git a/CHANGELOG.md b/CHANGELOG.md index d4642f505c2..451449288b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ 1. [16312](https://github.com/influxdata/influxdb/pull/16312): Add support for notification rule pkger export functionality 1. [16320](https://github.com/influxdata/influxdb/pull/16320): Add support for tasks to pkger parser 1. [16322](https://github.com/influxdata/influxdb/pull/16322): Add support for tasks to pkger dry run functionality +1. [16323](https://github.com/influxdata/influxdb/pull/16323): Add support for tasks to pkger apply functionality ### Bug Fixes diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 716fbefd444..3ead3021480 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -838,8 +838,9 @@ func (m *Launcher) run(ctx context.Context) (err error) { b := m.apibackend authedOrgSVC := authorizer.NewOrgService(b.OrganizationService) authedURMSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService) + pkgerLogger := m.log.With(zap.String("service", "pkger")) pkgSVC = pkger.NewService( - pkger.WithLogger(m.log.With(zap.String("service", "pkger"))), + pkger.WithLogger(pkgerLogger), pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService)), pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedURMSVC, authedOrgSVC)), pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)), @@ -847,6 +848,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedURMSVC, authedOrgSVC)), pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedURMSVC, authedOrgSVC)), pkger.WithSecretSVC(authorizer.NewSecretService(b.SecretService)), + pkger.WithTaskSVC(authorizer.NewTaskService(pkgerLogger, b.TaskService)), pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)), pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)), ) diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index fafd77c717b..3c64a95b519 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -362,6 +362,10 @@ func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC { return &http.PkgerService{Client: tl.HTTPClient(tb)} } +func (tl *TestLauncher) TaskServiceKV() platform.TaskService { + return tl.kvService +} + func (tl *TestLauncher) TelegrafService(tb testing.TB) *http.TelegrafService { tb.Helper() return http.NewTelegrafService(tl.HTTPClient(tb)) diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 880a7f1f466..9d1a3827e9a 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -49,6 +49,7 @@ func TestLauncher_Pkger(t *testing.T) { }), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithNotificationRuleSVC(l.NotificationRuleService()), + pkger.WithTaskSVC(l.TaskServiceKV()), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -88,6 +89,12 @@ func TestLauncher_Pkger(t *testing.T) { require.NoError(t, err) assert.Empty(t, rules) + tasks, _, err := l.TaskServiceKV().FindTasks(ctx, influxdb.TaskFilter{ + OrganizationID: &l.Org.ID, + }) + require.NoError(t, err) + assert.Empty(t, tasks) + teles, _, err := l.TelegrafService(t).FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{ OrgID: &l.Org.ID, }) @@ -258,6 +265,12 @@ func TestLauncher_Pkger(t *testing.T) { assert.Equal(t, "http_none_auth_notification_endpoint", rule.EndpointName) assert.Equal(t, "http", rule.EndpointType) + require.Len(t, sum1.Tasks, 1) + task := sum1.Tasks[0] + assert.NotZero(t, task.ID) + assert.Equal(t, "task_1", task.Name) + assert.Equal(t, "desc_1", task.Description) + teles := sum1.TelegrafConfigs require.Len(t, teles, 1) assert.NotZero(t, teles[0].TelegrafConfig.ID) @@ -506,6 +519,7 @@ spec: pkger.WithDashboardSVC(l.DashboardService(t)), pkger.WithLabelSVC(l.LabelService(t)), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), + pkger.WithTaskSVC(l.TaskServiceKV()), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -714,11 +728,7 @@ spec: cron: 15 * * * * query: > from(bucket: "rucket_1") - |> range(start: v.timeRangeStart, stop: v.timeRangeStop) - |> filter(fn: (r) => r._measurement == "cpu") - |> filter(fn: (r) => r._field == "usage_idle") - |> aggregateWindow(every: 1m, fn: mean) - |> yield(name: "mean") + |> yield() associations: - kind: Label name: label_1 diff --git a/http/swagger.yml b/http/swagger.yml index 1693524d8ad..97238aaa927 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -7321,6 +7321,27 @@ components: type: array items: $ref: "#/components/schemas/PkgSummaryLabel" + tasks: + type: array + items: + type: object + properties: + id: + type: string + name: + type: string + cron: + type: string + description: + type: string + every: + type: string + offset: + type: string + query: + type: string + status: + type: string telegrafConfigs: type: array items: diff --git a/pkger/models.go b/pkger/models.go index 26224b6d87f..190d3a127d8 100644 --- a/pkger/models.go +++ b/pkger/models.go @@ -754,6 +754,7 @@ type SummaryLabelMapping struct { // SummaryTask provides a summary of a task. type SummaryTask struct { + ID SafeID `json:"id"` Name string `json:"name"` Cron string `json:"cron"` Description string `json:"description"` @@ -1808,6 +1809,8 @@ const ( ) type task struct { + id influxdb.ID + orgID influxdb.ID name string cron string description string @@ -1819,6 +1822,10 @@ type task struct { labels sortedLabels } +func (t *task) ID() influxdb.ID { + return t.id +} + func (t *task) Name() string { return t.name } @@ -1834,8 +1841,27 @@ func (t *task) Status() influxdb.Status { return influxdb.Status(t.status) } +func (t *task) flux() string { + taskOpts := []string{fmt.Sprintf("name: %q", t.name)} + if t.cron != "" { + taskOpts = append(taskOpts, fmt.Sprintf("cron: %q", t.cron)) + } + if t.every > 0 { + taskOpts = append(taskOpts, fmt.Sprintf("every: %s", t.every)) + } + if t.offset > 0 { + taskOpts = append(taskOpts, fmt.Sprintf("offset: %s", t.offset)) + } + // this is required by the API, super nasty. Will be super challenging for + // anyone outside org to figure out how to do this within an hour of looking + // at the API :sadpanda:. Would be ideal to let the API translate the arguments + // into this required form instead of forcing that complexity on the caller. + return fmt.Sprintf("option task = { %s }\n%s", strings.Join(taskOpts, ", "), t.query) +} + func (t *task) summarize() SummaryTask { return SummaryTask{ + ID: SafeID(t.ID()), Name: t.Name(), Cron: t.cron, Description: t.description, diff --git a/pkger/service.go b/pkger/service.go index 7e0576aa467..ff34ebf811d 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -34,6 +34,7 @@ type serviceOpt struct { endpointSVC influxdb.NotificationEndpointService ruleSVC influxdb.NotificationRuleStore secretSVC influxdb.SecretService + taskSVC influxdb.TaskService teleSVC influxdb.TelegrafConfigStore varSVC influxdb.VariableService @@ -99,6 +100,13 @@ func WithSecretSVC(secretSVC influxdb.SecretService) ServiceSetterFn { } } +// WithTelegrafSVC sets the telegraf service. +func WithTaskSVC(taskSVC influxdb.TaskService) ServiceSetterFn { + return func(opt *serviceOpt) { + opt.taskSVC = taskSVC + } +} + // WithTelegrafSVC sets the telegraf service. func WithTelegrafSVC(telegrafSVC influxdb.TelegrafConfigStore) ServiceSetterFn { return func(opt *serviceOpt) { @@ -125,6 +133,7 @@ type Service struct { endpointSVC influxdb.NotificationEndpointService ruleSVC influxdb.NotificationRuleStore secretSVC influxdb.SecretService + taskSVC influxdb.TaskService teleSVC influxdb.TelegrafConfigStore varSVC influxdb.VariableService @@ -152,6 +161,7 @@ func NewService(opts ...ServiceSetterFn) *Service { endpointSVC: opt.endpointSVC, ruleSVC: opt.ruleSVC, secretSVC: opt.secretSVC, + taskSVC: opt.taskSVC, teleSVC: opt.teleSVC, varSVC: opt.varSVC, applyReqLimit: opt.applyReqLimit, @@ -1076,6 +1086,7 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg s.applyChecks(pkg.checks()), s.applyDashboards(pkg.dashboards()), s.applyNotificationEndpoints(pkg.notificationEndpoints()), + s.applyTasks(pkg.tasks()), s.applyTelegrafs(pkg.telegrafs()), }, } @@ -1681,6 +1692,55 @@ func (s *Service) rollbackNotificationRules(rules []*notificationRule) error { return nil } +func (s *Service) applyTasks(tasks []*task) applier { + const resource = "tasks" + + mutex := new(doMutex) + rollbackTasks := make([]task, 0, len(tasks)) + + createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody { + var t task + mutex.Do(func() { + tasks[i].orgID = orgID + t = *tasks[i] + }) + + newTask, err := s.taskSVC.CreateTask(ctx, influxdb.TaskCreate{ + Type: influxdb.TaskSystemType, + Flux: t.flux(), + OwnerID: userID, + Description: t.description, + Status: string(t.Status()), + OrganizationID: t.orgID, + }) + if err != nil { + return &applyErrBody{name: t.Name(), msg: err.Error()} + } + + mutex.Do(func() { + tasks[i].id = newTask.ID + rollbackTasks = append(rollbackTasks, *tasks[i]) + }) + + return nil + } + + return applier{ + creater: creater{ + entries: len(tasks), + fn: createFn, + }, + rollbacker: rollbacker{ + resource: resource, + fn: func() error { + return s.deleteByIDs("task", len(rollbackTasks), s.taskSVC.DeleteTask, func(i int) influxdb.ID { + return rollbackTasks[i].ID() + }) + }, + }, + } +} + func (s *Service) applyTelegrafs(teles []*telegraf) applier { const resource = "telegrafs"