From 8bdf788ffe778dd23f1ce9c5539a986411bbfeef Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Mon, 14 Mar 2022 23:55:14 +0530 Subject: [PATCH 1/2] refactor: enable linter for init --- .golangci.yml | 1 - ext/datastore/bigquery/bigquery.go | 2 +- store/local/job_spec_adapter.go | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2353f81396..a137b07a7b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -27,7 +27,6 @@ linters: - gofumpt - godox - godot - - gochecknoinits - gci - forbidigo - errcheck diff --git a/ext/datastore/bigquery/bigquery.go b/ext/datastore/bigquery/bigquery.go index 34f721319a..1bbf7bd478 100644 --- a/ext/datastore/bigquery/bigquery.go +++ b/ext/datastore/bigquery/bigquery.go @@ -179,7 +179,7 @@ func (b *BigQuery) BackupResource(ctx context.Context, request models.BackupReso return backupTable(ctx, request, client) } -func init() { +func init() { //nolint:gochecknoinits if err := models.DatastoreRegistry.Add(This); err != nil { panic(err) } diff --git a/store/local/job_spec_adapter.go b/store/local/job_spec_adapter.go index 3633abf9b1..84f619868d 100644 --- a/store/local/job_spec_adapter.go +++ b/store/local/job_spec_adapter.go @@ -28,7 +28,7 @@ var ( ErrNotAMonthDuration = errors.New("invalid month string") ) -func init() { +func init() { //nolint:gochecknoinits _ = validator.SetValidationFunc("isCron", utils.CronIntervalValidator) } From 6b551e2f1185ffe6d0d69d4d098d1e9e6504fb37 Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Tue, 15 Mar 2022 00:05:06 +0530 Subject: [PATCH 2/2] refactor: autofix gofumpt errors --- .golangci.yml | 18 +++--- api/handler/v1beta1/adapter.go | 2 +- api/handler/v1beta1/backup.go | 2 +- api/handler/v1beta1/replay.go | 4 +- api/handler/v1beta1/runtime.go | 5 -- api/handler/v1beta1/runtime_test.go | 6 +- cmd/admin_build_instance.go | 2 +- cmd/backup_create.go | 2 +- cmd/backup_status.go | 2 +- cmd/commands.go | 6 +- cmd/config.go | 2 +- cmd/deploy.go | 4 +- cmd/job_create.go | 4 +- cmd/job_hook.go | 2 +- cmd/job_render.go | 2 +- cmd/job_validate.go | 2 +- cmd/replay.go | 2 +- cmd/replay_create.go | 10 ++-- cmd/replay_list.go | 6 +- cmd/replay_status.go | 4 +- cmd/resource.go | 8 +-- cmd/server/server.go | 6 +- cmd/version.go | 2 +- core/gossip/server.go | 10 ++-- core/tree/multi_root_tree.go | 6 +- datastore/service.go | 8 +-- datastore/service_test.go | 12 ++-- ext/datastore/bigquery/adapter.go | 1 + ext/datastore/bigquery/bigquery_test.go | 6 +- ext/datastore/bigquery/dataset.go | 4 +- ext/datastore/bigquery/dataset_spec.go | 7 +-- ext/datastore/bigquery/mock.go | 2 +- ext/datastore/bigquery/table.go | 6 +- ext/datastore/bigquery/table_spec.go | 3 +- ext/datastore/bigquery/table_test.go | 28 ++++----- ext/notify/slack/slack.go | 4 +- ext/scheduler/airflow/airflow.go | 2 +- ext/scheduler/airflow2/airflow.go | 8 +-- ext/scheduler/airflow2/compiler/compiler.go | 4 +- ext/scheduler/prime/planner.go | 2 +- ext/scheduler/prime/scheduler.go | 2 +- extension/extension.go | 2 +- extension/manifest.go | 2 +- job/dependency_resolver_test.go | 54 +++++++++-------- job/priority_resolver.go | 5 +- job/priority_resolver_test.go | 20 ++++--- job/replay.go | 6 +- job/replay_manager.go | 12 ++-- job/replay_manager_test.go | 1 - job/replay_syncer_test.go | 2 +- job/replay_validator.go | 16 +++-- job/replay_validator_test.go | 2 +- job/service.go | 26 ++++---- job/service_test.go | 58 +++++++++--------- main.go | 4 +- mock/datastore.go | 7 +++ mock/replay.go | 2 +- mock/secret.go | 4 +- models/datastore.go | 2 - models/job.go | 6 +- models/plugin.go | 7 ++- models/project.go | 12 ++-- models/scheduler.go | 4 +- plugin/gen.go | 6 +- plugin/plugin.go | 2 +- run/context.go | 10 ++-- run/go_engine_test.go | 1 - run/service_test.go | 2 +- service/errors.go | 2 +- service/namespace_service.go | 4 +- service/secret_service.go | 4 +- store/local/job_spec_repository.go | 12 ++-- store/local/job_spec_repository_test.go | 60 +++++++++---------- store/local/resource_spec_repository.go | 8 +-- store/local/resource_spec_repository_test.go | 30 +++++----- store/postgres/adapter.go | 10 ++-- store/postgres/backup_repository_test.go | 2 +- store/postgres/job_spec_repository.go | 2 +- store/postgres/job_spec_repository_test.go | 14 ++--- store/postgres/namespace_repository_test.go | 10 ++-- store/postgres/postgres.go | 4 +- store/postgres/project_repository_test.go | 10 ++-- store/postgres/replay_repository.go | 2 +- store/postgres/replay_repository_test.go | 2 +- .../postgres/resource_spec_repository_test.go | 10 ++-- store/postgres/secret_repository_test.go | 14 ++--- store/store.go | 2 - utils/file.go | 2 +- utils/proto.go | 4 +- utils/uuid.go | 3 +- utils/validator.go | 2 +- 91 files changed, 340 insertions(+), 366 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index a137b07a7b..ca5049b498 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -24,7 +24,6 @@ linters: - promlinter - interfacer - gosec - - gofumpt - godox - godot - gci @@ -57,18 +56,21 @@ linters: - gocyclo - dupl linters-settings: + gofumpt: + lang-version: "1.17" + extra-rules: true revive: ignore-generated-header: true severity: warning gomnd: ignored-numbers: # Why we have a big range of file permissions - - '0644' - - '0655' - - '0666' - - '0770' - - '0755' - - '0765' - - '0777' + - '0o644' + - '0o655' + - '0o666' + - '0o770' + - '0o755' + - '0o765' + - '0o777' ignored-functions: - 'survey.MinLength' - 'survey.MaxLength' diff --git a/api/handler/v1beta1/adapter.go b/api/handler/v1beta1/adapter.go index ced0330ffc..85eb72c6d9 100644 --- a/api/handler/v1beta1/adapter.go +++ b/api/handler/v1beta1/adapter.go @@ -221,7 +221,7 @@ func (adapt *Adapter) ToJobProto(spec models.JobSpec) (*pb.JobSpecification, err }) } - //prep external dependencies for proto + // prep external dependencies for proto for _, httpDep := range spec.ExternalDependencies.HTTPDependencies { conf.Dependencies = append(conf.Dependencies, &pb.JobDependency{ HttpDependency: &pb.HttpDependency{ diff --git a/api/handler/v1beta1/backup.go b/api/handler/v1beta1/backup.go index 72e7d028b8..30ee637eda 100644 --- a/api/handler/v1beta1/backup.go +++ b/api/handler/v1beta1/backup.go @@ -38,7 +38,7 @@ func (sv *RuntimeServiceServer) BackupDryRun(ctx context.Context, req *pb.Backup } jobSpecs = append(jobSpecs, downstreamSpecs...) - //should add config + // should add config backupRequest := models.BackupRequest{ ResourceName: req.ResourceName, Project: namespaceSpec.ProjectSpec, diff --git a/api/handler/v1beta1/replay.go b/api/handler/v1beta1/replay.go index 039f980215..de3633fbdb 100644 --- a/api/handler/v1beta1/replay.go +++ b/api/handler/v1beta1/replay.go @@ -131,8 +131,8 @@ func (sv *RuntimeServiceServer) ListReplays(ctx context.Context, req *pb.ListRep }, nil } -func (sv *RuntimeServiceServer) parseReplayRequest(ctx context.Context, projectName string, namespace string, - jobName string, startDate string, endDate string, forceFlag bool, allowedDownstreams []string) (models.ReplayRequest, error) { +func (sv *RuntimeServiceServer) parseReplayRequest(ctx context.Context, projectName, namespace string, + jobName, startDate, endDate string, forceFlag bool, allowedDownstreams []string) (models.ReplayRequest, error) { namespaceSpec, err := sv.namespaceService.Get(ctx, projectName, namespace) if err != nil { return models.ReplayRequest{}, mapToGRPCErr(sv.l, err, "unable to get namespace") diff --git a/api/handler/v1beta1/runtime.go b/api/handler/v1beta1/runtime.go index 5a384028a1..f7f51a71a2 100644 --- a/api/handler/v1beta1/runtime.go +++ b/api/handler/v1beta1/runtime.go @@ -39,18 +39,13 @@ type JobEventService interface { type ProtoAdapter interface { FromJobProto(*pb.JobSpecification) (models.JobSpec, error) ToJobProto(models.JobSpec) (*pb.JobSpecification, error) - FromProjectProto(*pb.ProjectSpecification) models.ProjectSpec ToProjectProto(models.ProjectSpec) *pb.ProjectSpecification - FromNamespaceProto(specification *pb.NamespaceSpecification) models.NamespaceSpec ToNamespaceProto(spec models.NamespaceSpec) *pb.NamespaceSpecification - ToInstanceProto(models.InstanceSpec) (*pb.InstanceSpec, error) - FromResourceProto(res *pb.ResourceSpecification, storeName string) (models.ResourceSpec, error) ToResourceProto(res models.ResourceSpec) (*pb.ResourceSpecification, error) - ToReplayExecutionTreeNode(res *tree.TreeNode) (*pb.ReplayExecutionTreeNode, error) ToReplayStatusTreeNode(res *tree.TreeNode) (*pb.ReplayStatusTreeNode, error) } diff --git a/api/handler/v1beta1/runtime_test.go b/api/handler/v1beta1/runtime_test.go index 8575d286f7..84618cfef5 100644 --- a/api/handler/v1beta1/runtime_test.go +++ b/api/handler/v1beta1/runtime_test.go @@ -157,7 +157,8 @@ func TestRuntimeServiceServer(t *testing.T) { }, FileMap: map[string]string{ "query.sql": "select * from 1", - }}, nil) + }, + }, nil) defer instanceService.AssertExpectations(t) runtimeServiceServer := v1.NewRuntimeServiceServer( @@ -223,7 +224,8 @@ func TestRuntimeServiceServer(t *testing.T) { }, FileMap: map[string]string{ "query.sql": "select * from 1", - }}, nil) + }, + }, nil) defer instanceService.AssertExpectations(t) runtimeServiceServer := v1.NewRuntimeServiceServer( diff --git a/cmd/admin_build_instance.go b/cmd/admin_build_instance.go index 711ba5a966..08da202ebe 100644 --- a/cmd/admin_build_instance.go +++ b/cmd/admin_build_instance.go @@ -109,7 +109,7 @@ func getInstanceBuildRequest(l log.Logger, jobName, inputDirectory, host, projec } // make sure output dir exists - if err := os.MkdirAll(inputDirectory, 0777); err != nil { + if err := os.MkdirAll(inputDirectory, 0o777); err != nil { return fmt.Errorf("failed to create directory at %s: %w", inputDirectory, err) } writeToFileFn := utils.WriteStringToFileIndexed() diff --git a/cmd/backup_create.go b/cmd/backup_create.go index 934c3c5e6a..4bf9aab460 100644 --- a/cmd/backup_create.go +++ b/cmd/backup_create.go @@ -78,7 +78,7 @@ func backupCreateCommand(l log.Logger, datastoreRepo models.DatastoreRepo, conf return err } if dryRun { - //if only dry run, exit now + // if only dry run, exit now return nil } diff --git a/cmd/backup_status.go b/cmd/backup_status.go index fe0c0e3cf0..e9402253d2 100644 --- a/cmd/backup_status.go +++ b/cmd/backup_status.go @@ -86,7 +86,7 @@ func printBackupDetailResponse(l log.Logger, backupDetailResponse *pb.GetBackupR table.SetBorder(false) ttl := backupDetailResponse.Spec.Config[models.ConfigTTL] - var expiry = backupDetailResponse.Spec.CreatedAt.AsTime() + expiry := backupDetailResponse.Spec.CreatedAt.AsTime() if ttl != "" { ttlDuration, err := time.ParseDuration(ttl) if err != nil { diff --git a/cmd/commands.go b/cmd/commands.go index 395c6a2aa7..d568130c5f 100644 --- a/cmd/commands.go +++ b/cmd/commands.go @@ -61,10 +61,10 @@ type JobSpecRepository interface { // default output of logging should go to stdout // interactive output like progress bars should go to stderr // unless the stdout/err is a tty, colors/progressbar should be disabled -func New(plainLog log.Logger, jsonLog log.Logger, conf config.Optimus, pluginRepo models.PluginRepository, dsRepo models.DatastoreRepo) *cli.Command { +func New(plainLog, jsonLog log.Logger, conf config.Optimus, pluginRepo models.PluginRepository, dsRepo models.DatastoreRepo) *cli.Command { disableColoredOut = !isTerminal(os.Stdout) - var cmd = &cli.Command{ + cmd := &cli.Command{ Use: "optimus [flags]", Long: heredoc.Doc(` Optimus is an easy-to-use, reliable, and performant workflow orchestrator for @@ -111,7 +111,7 @@ func New(plainLog log.Logger, jsonLog log.Logger, conf config.Optimus, pluginRep cmdx.SetHelp(cmd) cmd.PersistentFlags().BoolVar(&disableColoredOut, "no-color", disableColoredOut, "Disable colored output") - //init local specs + // init local specs var jobSpecRepo JobSpecRepository jobSpecFs := afero.NewBasePathFs(afero.NewOsFs(), conf.Namespace.Job.Path) if conf.Namespace.Job.Path != "" { diff --git a/cmd/config.go b/cmd/config.go index b64eae0899..9069cf26d5 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -135,7 +135,7 @@ func configInitCommand(l log.Logger, dsRepo models.DatastoreRepo) *cli.Command { if err != nil { return err } - if err := ioutil.WriteFile(fmt.Sprintf("%s.%s", config.FileName, config.FileExtension), confMarshaled, 0655); err != nil { + if err := ioutil.WriteFile(fmt.Sprintf("%s.%s", config.FileName, config.FileExtension), confMarshaled, 0o655); err != nil { return err } l.Info(coloredSuccess("Configuration initialised successfully")) diff --git a/cmd/deploy.go b/cmd/deploy.go index e429538bb9..a822f329ea 100644 --- a/cmd/deploy.go +++ b/cmd/deploy.go @@ -76,9 +76,9 @@ func deployCommand(l log.Logger, conf config.Optimus, jobSpecRepo JobSpecReposit } // postDeploymentRequest send a deployment request to service -func postDeploymentRequest(l log.Logger, projectName string, namespaceName string, jobSpecRepo JobSpecRepository, +func postDeploymentRequest(l log.Logger, projectName, namespaceName string, jobSpecRepo JobSpecRepository, conf config.Optimus, pluginRepo models.PluginRepository, datastoreRepo models.DatastoreRepo, datastoreSpecFs map[string]afero.Fs, - ignoreJobDeployment, ignoreResources bool, verbose bool) (err error) { + ignoreJobDeployment, ignoreResources, verbose bool) (err error) { dialTimeoutCtx, dialCancel := context.WithTimeout(context.Background(), OptimusDialTimeout) defer dialCancel() diff --git a/cmd/job_create.go b/cmd/job_create.go index d4c0b2e1ed..0adaa13c60 100644 --- a/cmd/job_create.go +++ b/cmd/job_create.go @@ -151,7 +151,7 @@ func createJobSurvey(jobSpecRepo JobSpecRepository, pluginRepo models.PluginRepo return local.Job{}, errors.New("no supported task plugin found") } - var qs = []*survey.Question{ + qs := []*survey.Question{ { Name: "name", Prompt: &survey.Input{ @@ -357,7 +357,7 @@ func getWindowParameters(winName string) local.JobTaskWindow { } } - //default + // default return local.JobTaskWindow{ Size: "24h", Offset: "0", diff --git a/cmd/job_hook.go b/cmd/job_hook.go index 5cc8f5d85a..d70eae4eec 100644 --- a/cmd/job_hook.go +++ b/cmd/job_hook.go @@ -52,7 +52,7 @@ func createHookSurvey(jobSpec models.JobSpec, pluginRepo models.PluginRepository return emptyJobSpec, errors.New("no supported hook plugin found") } - var qs = []*survey.Question{ + qs := []*survey.Question{ { Name: "hook", Prompt: &survey.Select{ diff --git a/cmd/job_render.go b/cmd/job_render.go index 03e0eb4877..0838193f4e 100644 --- a/cmd/job_render.go +++ b/cmd/job_render.go @@ -40,7 +40,7 @@ func jobRenderTemplateCommand(l log.Logger, jobSpecRepo JobSpecRepository) *cli. // create temporary directory renderedPath := filepath.Join(".", "render", jobSpec.Name) - _ = os.MkdirAll(renderedPath, 0770) + _ = os.MkdirAll(renderedPath, 0o770) l.Info(fmt.Sprintf("Rendering assets in %s", renderedPath)) now := time.Now() diff --git a/cmd/job_validate.go b/cmd/job_validate.go index 361b53b7e8..77a375dbc0 100644 --- a/cmd/job_validate.go +++ b/cmd/job_validate.go @@ -58,7 +58,7 @@ func jobValidateCommand(l log.Logger, pluginRepo models.PluginRepository, jobSpe return cmd } -func validateJobSpecificationRequest(l log.Logger, projectName string, namespace string, +func validateJobSpecificationRequest(l log.Logger, projectName, namespace string, pluginRepo models.PluginRepository, jobSpecs []models.JobSpec, host string, verbose bool) (err error) { adapt := v1handler.NewAdapter(pluginRepo, models.DatastoreRegistry) diff --git a/cmd/replay.go b/cmd/replay.go index 7980ce1c3d..bc3941397d 100644 --- a/cmd/replay.go +++ b/cmd/replay.go @@ -33,7 +33,7 @@ func taskRunBlockComparator(a, b interface{}) int { return strings.Compare(aAsserted.name, bAsserted.name) } -//formatRunsPerJobInstance returns a hashmap with Job -> Runs[] mapping +// formatRunsPerJobInstance returns a hashmap with Job -> Runs[] mapping func formatRunsPerJobInstance(instance *pb.ReplayExecutionTreeNode, taskReruns map[string]taskRunBlock, height int) { if _, ok := taskReruns[instance.JobName]; !ok { taskReruns[instance.JobName] = taskRunBlock{ diff --git a/cmd/replay_create.go b/cmd/replay_create.go index 266501dd09..b6e1b5a453 100644 --- a/cmd/replay_create.go +++ b/cmd/replay_create.go @@ -77,7 +77,7 @@ Date ranges are inclusive. return err } if dryRun { - //if only dry run, exit now + // if only dry run, exit now return nil } @@ -162,7 +162,7 @@ func printReplayDryRunResponse(l log.Logger, replayRequest *pb.ReplayDryRunReque taskRerunsMap := make(map[string]taskRunBlock) formatRunsPerJobInstance(replayDryRunResponse.ExecutionTree, taskRerunsMap, 0) - //sort run block + // sort run block taskRerunsSorted := set.NewTreeSetWith(taskRunBlockComparator) for _, block := range taskRerunsMap { taskRerunsSorted.Add(block) @@ -178,11 +178,11 @@ func printReplayDryRunResponse(l log.Logger, replayRequest *pb.ReplayDryRunReque } table.Render() - //print tree + // print tree l.Info(coloredNotice("\n> Dependency tree")) l.Info(fmt.Sprintf("%s", printExecutionTree(replayDryRunResponse.ExecutionTree, treeprint.New()))) - //ignored jobs + // ignored jobs if len(replayDryRunResponse.IgnoredJobs) > 0 { l.Info("> Ignored jobs") ignoredJobsCount := 0 @@ -190,7 +190,7 @@ func printReplayDryRunResponse(l log.Logger, replayRequest *pb.ReplayDryRunReque ignoredJobsCount++ l.Info(fmt.Sprintf("%d. %s", ignoredJobsCount, job)) } - //separator + // separator l.Info("") } } diff --git a/cmd/replay_list.go b/cmd/replay_list.go index c29bdf83bd..715615b8eb 100644 --- a/cmd/replay_list.go +++ b/cmd/replay_list.go @@ -82,9 +82,11 @@ func printReplayListResponse(l log.Logger, replayListResponse *pb.ListReplaysRes }) for _, replaySpec := range replayListResponse.ReplayList { - table.Append([]string{replaySpec.Id, replaySpec.JobName, replaySpec.StartDate.AsTime().Format(models.JobDatetimeLayout), + table.Append([]string{ + replaySpec.Id, replaySpec.JobName, replaySpec.StartDate.AsTime().Format(models.JobDatetimeLayout), replaySpec.EndDate.AsTime().Format(models.JobDatetimeLayout), replaySpec.Config[models.ConfigIgnoreDownstream], - replaySpec.CreatedAt.AsTime().Format(time.RFC3339), replaySpec.State}) + replaySpec.CreatedAt.AsTime().Format(time.RFC3339), replaySpec.State, + }) } table.Render() diff --git a/cmd/replay_status.go b/cmd/replay_status.go index a3f00bb3bc..8209f6b306 100644 --- a/cmd/replay_status.go +++ b/cmd/replay_status.go @@ -15,9 +15,7 @@ import ( ) func replayStatusCommand(l log.Logger, conf config.Optimus) *cli.Command { - var ( - projectName string - ) + var projectName string reCmd := &cli.Command{ Use: "status", diff --git a/cmd/resource.go b/cmd/resource.go index ff7ad5527a..427a33d8db 100644 --- a/cmd/resource.go +++ b/cmd/resource.go @@ -17,10 +17,8 @@ import ( cli "github.com/spf13/cobra" ) -var ( - validateResourceName = utils.ValidatorFactory.NewFromRegex(`^[a-zA-Z0-9][a-zA-Z0-9_\-\.]+$`, - `invalid name (can only contain characters A-Z (in either case), 0-9, "-", "_" or "." and must start with an alphanumeric character)`) -) +var validateResourceName = utils.ValidatorFactory.NewFromRegex(`^[a-zA-Z0-9][a-zA-Z0-9_\-\.]+$`, + `invalid name (can only contain characters A-Z (in either case), 0-9, "-", "_" or "." and must start with an alphanumeric character)`) func resourceCommand(l log.Logger, datastoreSpecsFs map[string]afero.Fs, datastoreRepo models.DatastoreRepo) *cli.Command { cmd := &cli.Command{ @@ -87,7 +85,7 @@ func createResourceSubCommand(l log.Logger, datastoreSpecFs map[string]afero.Fs, resourceDirectory := filepath.Join(rwd, newDirName) resourceNameDefault := strings.ReplaceAll(strings.ReplaceAll(resourceDirectory, "/", "."), "\\", ".") - var qs = []*survey.Question{ + qs := []*survey.Question{ { Name: "name", Prompt: &survey.Input{ diff --git a/cmd/server/server.go b/cmd/server/server.go index 67a28f4024..bb0bd25d9d 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -59,10 +59,8 @@ import ( "gorm.io/gorm" ) -var ( - // termChan listen for sigterm - termChan = make(chan os.Signal, 1) -) +// termChan listen for sigterm +var termChan = make(chan os.Signal, 1) const ( shutdownWait = 30 * time.Second diff --git a/cmd/version.go b/cmd/version.go index 0337068967..576d76f35c 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -64,7 +64,7 @@ func versionCommand(l log.Logger, host string, pluginRepo models.PluginRepositor } // getVersionRequest send a version request to service -func getVersionRequest(clientVer string, host string) (ver string, err error) { +func getVersionRequest(clientVer, host string) (ver string, err error) { dialTimeoutCtx, dialCancel := context.WithTimeout(context.Background(), OptimusDialTimeout) defer dialCancel() diff --git a/core/gossip/server.go b/core/gossip/server.go index fc11caac5d..e56d787dcf 100644 --- a/core/gossip/server.go +++ b/core/gossip/server.go @@ -27,9 +27,9 @@ const ( connectionTimeout = 10 * time.Second applyTimeout = 10 * time.Second - //leaderWaitDelay = 100 * time.Millisecond - //appliedWaitDelay = 100 * time.Millisecond - //raftLogCacheSize = 512 + // leaderWaitDelay = 100 * time.Millisecond + // appliedWaitDelay = 100 * time.Millisecond + // raftLogCacheSize = 512 ) type Server struct { @@ -180,7 +180,7 @@ func newSerfConfig(serfAddr, raftAddress, nodeID string, eventCh chan serf.Event // raft manages the leadership/follower state in cluster // minimum 3 nodes are required to work properly to have 1 node // fail-over resistant -func (s *Server) initRaft(_ context.Context, devMode bool, bootstrapCluster bool, schedulerConf config.SchedulerConfig, fsm raft.FSM) error { +func (s *Server) initRaft(_ context.Context, devMode, bootstrapCluster bool, schedulerConf config.SchedulerConfig, fsm raft.FSM) error { c := raft.DefaultConfig() c.LocalID = raft.ServerID(schedulerConf.NodeID) @@ -232,7 +232,7 @@ func (s *Server) initRaftStore(devMode bool, baseDir string) (raft.LogStore, raf } // prepare directory for data - if err := os.MkdirAll(baseDir, 0777); err != nil { + if err := os.MkdirAll(baseDir, 0o777); err != nil { return nil, nil, nil, err } diff --git a/core/tree/multi_root_tree.go b/core/tree/multi_root_tree.go index 6742ca6124..f423865d90 100644 --- a/core/tree/multi_root_tree.go +++ b/core/tree/multi_root_tree.go @@ -5,10 +5,8 @@ import ( "fmt" ) -var ( - // ErrCyclicDependencyEncountered is triggered a tree has a cyclic dependency - ErrCyclicDependencyEncountered = errors.New("a cycle dependency encountered in the tree") -) +// ErrCyclicDependencyEncountered is triggered a tree has a cyclic dependency +var ErrCyclicDependencyEncountered = errors.New("a cycle dependency encountered in the tree") // MultiRootTree - represents a data type which has multiple independent root nodes // all root nodes have their independent tree based on depdencies of TreeNode. diff --git a/datastore/service.go b/datastore/service.go index 7b425cf1c8..589b2068b4 100644 --- a/datastore/service.go +++ b/datastore/service.go @@ -21,7 +21,7 @@ const ( ConcurrentTicketPerSec = 5 ConcurrentLimit = 20 - //backupListWindow window interval to fetch recent backups + // backupListWindow window interval to fetch recent backups backupListWindow = -3 * 30 * 24 * time.Hour ) @@ -206,7 +206,7 @@ func (srv Service) BackupResourceDryRun(ctx context.Context, backupRequest model } } - //do backup in storer + // do backup in storer _, err = datastorer.BackupResource(ctx, models.BackupResourceRequest{ Resource: resourceSpec, BackupSpec: backupRequest, @@ -270,7 +270,7 @@ func (srv Service) BackupResource(ctx context.Context, backupRequest models.Back } } - //do backup in storer + // do backup in storer backupResp, err := datastorer.BackupResource(ctx, models.BackupResourceRequest{ Resource: resourceSpec, BackupSpec: backupRequest, @@ -295,7 +295,7 @@ func (srv Service) BackupResource(ctx context.Context, backupRequest models.Back } } - //save the backup + // save the backup backupRepo := srv.backupRepoFactory.New(backupRequest.Project, backupSpec.Resource.Datastore) if err := backupRepo.Save(ctx, backupSpec); err != nil { return models.BackupResult{}, err diff --git a/datastore/service_test.go b/datastore/service_test.go index 623c533620..e380338819 100644 --- a/datastore/service_test.go +++ b/datastore/service_test.go @@ -1445,7 +1445,7 @@ func TestService(t *testing.T) { Job: &jobDownstream, } - //root + // root jobRoot := models.JobSpec{ ID: uuid.Must(uuid.NewRandom()), Name: "job-1", @@ -1488,7 +1488,7 @@ func TestService(t *testing.T) { Spec: resultSpecRoot, } - //downstream + // downstream unitDownstream := models.GenerateDestinationRequest{ Config: models.PluginConfigs{}.FromJobSpec(jobDownstream.Task.Config), Assets: models.PluginAssets{}.FromJobSpec(jobDownstream.Assets), @@ -1599,7 +1599,7 @@ func TestService(t *testing.T) { Job: &jobDownstream, } - //root + // root jobRoot := models.JobSpec{ ID: uuid.Must(uuid.NewRandom()), Name: "job-1", @@ -1642,7 +1642,7 @@ func TestService(t *testing.T) { Spec: resultSpecRoot, } - //downstream + // downstream unitDownstream := models.GenerateDestinationRequest{ Config: models.PluginConfigs{}.FromJobSpec(jobDownstream.Task.Config), Assets: models.PluginAssets{}.FromJobSpec(jobDownstream.Assets), @@ -2321,7 +2321,7 @@ func TestService(t *testing.T) { Job: &jobDownstream, } - //root + // root jobRoot := models.JobSpec{ ID: uuid.Must(uuid.NewRandom()), Name: "job-1", @@ -2363,7 +2363,7 @@ func TestService(t *testing.T) { Spec: resultSpecRoot, } - //downstream + // downstream unitDownstream := models.GenerateDestinationRequest{ Config: models.PluginConfigs{}.FromJobSpec(jobDownstream.Task.Config), Assets: models.PluginAssets{}.FromJobSpec(jobDownstream.Assets), diff --git a/ext/datastore/bigquery/adapter.go b/ext/datastore/bigquery/adapter.go index 6d053a2f93..30cb4d3faa 100644 --- a/ext/datastore/bigquery/adapter.go +++ b/ext/datastore/bigquery/adapter.go @@ -84,6 +84,7 @@ func bqFieldModeTo(field BQField) (fieldMode, error) { } return fm, nil } + func bqGoogleSheetsOptionsTo(m map[string]interface{}) *bqapi.GoogleSheetsOptions { var skipLeadingRows int64 var sheetRange string diff --git a/ext/datastore/bigquery/bigquery_test.go b/ext/datastore/bigquery/bigquery_test.go index 36306b4ede..21e1314fcd 100644 --- a/ext/datastore/bigquery/bigquery_test.go +++ b/ext/datastore/bigquery/bigquery_test.go @@ -545,7 +545,7 @@ func TestBigquery(t *testing.T) { bQClientFactory.On("New", testingContext, secret).Return(bQClient, nil) - //duplicate table + // duplicate table bQClient.On("DatasetInProject", spec.Project, spec.Dataset).Return(bQDatasetHandle).Once() bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&datasetMetadata, nil) @@ -555,11 +555,11 @@ func TestBigquery(t *testing.T) { bQCopier.On("Run", testingContext).Return(bQJob, nil) bQJob.On("Wait", testingContext).Return(&bigquery.JobStatus{}, nil) - //update expiry + // update expiry bQTable.On("Metadata", testingContext).Return(tableMetadata, nil).Once() bQTable.On("Update", testingContext, toUpdate, eTag).Return(tableMetadata, nil) - //verify + // verify bQTable.On("Metadata", testingContext).Return(tableMetadata, nil).Once() bq := BigQuery{ ClientFac: bQClientFactory, diff --git a/ext/datastore/bigquery/dataset.go b/ext/datastore/bigquery/dataset.go index f4f9d13c0c..62fbd3f6b9 100644 --- a/ext/datastore/bigquery/dataset.go +++ b/ext/datastore/bigquery/dataset.go @@ -13,9 +13,7 @@ import ( "google.golang.org/api/googleapi" ) -var ( - datasetMutex sync.Mutex -) +var datasetMutex sync.Mutex func createDataset(ctx context.Context, spec models.ResourceSpec, client bqiface.Client, upsert bool) error { bqResource, ok := spec.Spec.(BQDataset) diff --git a/ext/datastore/bigquery/dataset_spec.go b/ext/datastore/bigquery/dataset_spec.go index e07827b701..3c50183af3 100644 --- a/ext/datastore/bigquery/dataset_spec.go +++ b/ext/datastore/bigquery/dataset_spec.go @@ -12,9 +12,7 @@ import ( "gopkg.in/yaml.v3" ) -var ( - datasetNameParseRegex = regexp.MustCompile(`^([\w-]+)\.(\w+)$`) -) +var datasetNameParseRegex = regexp.MustCompile(`^([\w-]+)\.(\w+)$`) const ( ExpectedDatasetNameSegments = 3 @@ -47,8 +45,7 @@ type BQDatasetMetadata struct { } // datasetSpecHandler helps serializing/deserializing datastore resource for dataset -type datasetSpecHandler struct { -} +type datasetSpecHandler struct{} func (s datasetSpecHandler) ToYaml(optResource models.ResourceSpec) ([]byte, error) { if optResource.Spec == nil { diff --git a/ext/datastore/bigquery/mock.go b/ext/datastore/bigquery/mock.go index 536fde14b9..ef62bd91bc 100644 --- a/ext/datastore/bigquery/mock.go +++ b/ext/datastore/bigquery/mock.go @@ -30,7 +30,7 @@ func (cli *BqClientMock) Dataset(dataset string) bqiface.Dataset { panic("not implemented") } -func (cli *BqClientMock) DatasetInProject(project string, dataset string) bqiface.Dataset { +func (cli *BqClientMock) DatasetInProject(project, dataset string) bqiface.Dataset { return cli.Called(project, dataset).Get(0).(bqiface.Dataset) } diff --git a/ext/datastore/bigquery/table.go b/ext/datastore/bigquery/table.go index e8e3602aed..9bec8ceece 100644 --- a/ext/datastore/bigquery/table.go +++ b/ext/datastore/bigquery/table.go @@ -15,9 +15,7 @@ import ( "google.golang.org/api/googleapi" ) -var ( - tableNameParseRegex = regexp.MustCompile(`^([\w-]+)\.(\w+)\.([\w-]+)$`) -) +var tableNameParseRegex = regexp.MustCompile(`^([\w-]+)\.(\w+)\.([\w-]+)$`) const ( errorReadTableSpec = "failed to read table spec for bigquery" @@ -198,7 +196,7 @@ func prepareBQResourceDst(bqResourceSrc BQTable, backupSpec models.BackupRequest } } -func duplicateTable(ctx context.Context, client bqiface.Client, bqResourceSrc BQTable, bqResourceDst BQTable) (bqiface.Table, error) { +func duplicateTable(ctx context.Context, client bqiface.Client, bqResourceSrc, bqResourceDst BQTable) (bqiface.Table, error) { // make sure dataset is present datasetDst := client.DatasetInProject(bqResourceDst.Project, bqResourceDst.Dataset) if err := ensureDataset(ctx, datasetDst, BQDataset{ diff --git a/ext/datastore/bigquery/table_spec.go b/ext/datastore/bigquery/table_spec.go index e9196cdf60..1171e2048e 100644 --- a/ext/datastore/bigquery/table_spec.go +++ b/ext/datastore/bigquery/table_spec.go @@ -123,8 +123,7 @@ type BQPartitioningRange struct { } // tableSpecHandler helps serializing/deserializing datastore resource for table -type tableSpecHandler struct { -} +type tableSpecHandler struct{} func (s tableSpecHandler) ToYaml(optResource models.ResourceSpec) ([]byte, error) { if optResource.Spec == nil { diff --git a/ext/datastore/bigquery/table_test.go b/ext/datastore/bigquery/table_test.go index 9e458e221b..0c35797c46 100644 --- a/ext/datastore/bigquery/table_test.go +++ b/ext/datastore/bigquery/table_test.go @@ -435,7 +435,7 @@ func TestTable(t *testing.T) { bQJob := new(BqJobMock) defer bQJob.AssertExpectations(t) - //duplicate table + // duplicate table bQClient.On("DatasetInProject", bQResource.Project, bQResource.Dataset).Return(bQDatasetHandle).Once() bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&datasetMetadata, nil) @@ -445,11 +445,11 @@ func TestTable(t *testing.T) { bQCopier.On("Run", testingContext).Return(bQJob, nil) bQJob.On("Wait", testingContext).Return(&bigquery.JobStatus{}, nil) - //update expiry + // update expiry bQTable.On("Metadata", testingContext).Return(tableMetadata, nil).Once() bQTable.On("Update", testingContext, toUpdate, eTag).Return(tableMetadata, nil) - //verify + // verify bQTable.On("Metadata", testingContext).Return(tableMetadata, nil).Once() resp, err := backupTable(testingContext, request, bQClient) @@ -489,7 +489,7 @@ func TestTable(t *testing.T) { errorMsg := "unable to get dataset metadata" - //duplicate table + // duplicate table bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&bqiface.DatasetMetadata{}, errors.New(errorMsg)) resp, err := backupTable(testingContext, request, bQClient) @@ -506,7 +506,7 @@ func TestTable(t *testing.T) { errorMsg := "unable to get dataset metadata" - //duplicate table + // duplicate table bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&datasetMetadata, nil).Once() bQClient.On("DatasetInProject", bQResource.Project, bQResource.Dataset).Return(bQDatasetHandle).Once() @@ -535,7 +535,7 @@ func TestTable(t *testing.T) { errorMsg := "unable to copy table" - //duplicate table + // duplicate table bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&datasetMetadata, nil).Once() bQClient.On("DatasetInProject", bQResource.Project, bQResource.Dataset).Return(bQDatasetHandle).Once() @@ -568,7 +568,7 @@ func TestTable(t *testing.T) { errorMsg := "unable to get status of copy table" - //duplicate table + // duplicate table bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&datasetMetadata, nil).Once() bQClient.On("DatasetInProject", bQResource.Project, bQResource.Dataset).Return(bQDatasetHandle).Once() @@ -602,7 +602,7 @@ func TestTable(t *testing.T) { errorMsg := "unable to get metadata of backup table" - //duplicate table + // duplicate table bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&datasetMetadata, nil).Once() bQClient.On("DatasetInProject", bQResource.Project, bQResource.Dataset).Return(bQDatasetHandle).Once() @@ -613,7 +613,7 @@ func TestTable(t *testing.T) { bQCopier.On("Run", testingContext).Return(bQJob, nil) bQJob.On("Wait", testingContext).Return(&bigquery.JobStatus{}, nil) - //update expiry + // update expiry bQTable.On("Metadata", testingContext).Return(tableMetadata, errors.New(errorMsg)).Once() resp, err := backupTable(testingContext, request, bQClient) @@ -639,7 +639,7 @@ func TestTable(t *testing.T) { errorMsg := "unable to update expiration of backup table" - //duplicate table + // duplicate table bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&datasetMetadata, nil).Once() bQClient.On("DatasetInProject", bQResource.Project, bQResource.Dataset).Return(bQDatasetHandle).Once() @@ -650,7 +650,7 @@ func TestTable(t *testing.T) { bQCopier.On("Run", testingContext).Return(bQJob, nil) bQJob.On("Wait", testingContext).Return(&bigquery.JobStatus{}, nil) - //update expiry + // update expiry bQTable.On("Metadata", testingContext).Return(tableMetadata, nil).Once() bQTable.On("Update", testingContext, toUpdate, eTag).Return(tableMetadata, errors.New(errorMsg)) @@ -677,7 +677,7 @@ func TestTable(t *testing.T) { errorMsg := "unable to ensure the backup table" - //duplicate table + // duplicate table bQClient.On("DatasetInProject", destinationTable.Project, destinationTable.Dataset).Return(bQDatasetHandle).Once() bQDatasetHandle.On("Metadata", testingContext).Return(&datasetMetadata, nil).Once() bQClient.On("DatasetInProject", bQResource.Project, bQResource.Dataset).Return(bQDatasetHandle).Once() @@ -688,11 +688,11 @@ func TestTable(t *testing.T) { bQCopier.On("Run", testingContext).Return(bQJob, nil) bQJob.On("Wait", testingContext).Return(&bigquery.JobStatus{}, nil) - //update expiry + // update expiry bQTable.On("Metadata", testingContext).Return(tableMetadata, nil).Once() bQTable.On("Update", testingContext, toUpdate, eTag).Return(tableMetadata, nil) - //verify + // verify bQTable.On("Metadata", testingContext).Return(tableMetadata, errors.New(errorMsg)).Once() resp, err := backupTable(testingContext, request, bQClient) diff --git a/ext/notify/slack/slack.go b/ext/notify/slack/slack.go index 887eb5ae6b..31f82b226e 100644 --- a/ext/notify/slack/slack.go +++ b/ext/notify/slack/slack.go @@ -159,7 +159,7 @@ func buildMessageBlocks(events []event) []api.Block { if slas, ok := evt.meta.Value["slas"]; ok { for slaIdx, sla := range slas.GetListValue().GetValues() { slaFields := sla.GetStructValue().GetFields() - var slaStr = "" + slaStr := "" if taskID, ok := slaFields["task_id"]; ok { slaStr += "\nTask: " + taskID.GetStringValue() } @@ -221,7 +221,7 @@ func buildMessageBlocks(events []event) []api.Block { var detailsElementsSlice []api.MixedElement if exception, ok := evt.meta.Value["exception"]; ok && exception.GetStringValue() != "" { optionText := api.NewTextBlockObject("plain_text", fmt.Sprintf("Exception:\n%s", exception.GetStringValue()), true, false) - detailsElementsSlice = append(detailsElementsSlice, optionText) //api.NewOptionBlockObject("", optionText, nil)) + detailsElementsSlice = append(detailsElementsSlice, optionText) // api.NewOptionBlockObject("", optionText, nil)) } if message, ok := evt.meta.Value["message"]; ok && message.GetStringValue() != "" { optionText := api.NewTextBlockObject("plain_text", fmt.Sprintf("Message:\n%s", message.GetStringValue()), true, false) diff --git a/ext/scheduler/airflow/airflow.go b/ext/scheduler/airflow/airflow.go index 32d7d2c5a1..e5f4b53128 100644 --- a/ext/scheduler/airflow/airflow.go +++ b/ext/scheduler/airflow/airflow.go @@ -303,7 +303,7 @@ func (s *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN return nil } -func (s *scheduler) GetJobRunStatus(ctx context.Context, projectSpec models.ProjectSpec, jobName string, startDate time.Time, endDate time.Time, +func (s *scheduler) GetJobRunStatus(ctx context.Context, projectSpec models.ProjectSpec, jobName string, startDate, endDate time.Time, batchSize int) ([]models.JobStatus, error) { allJobStatus, err := s.GetJobStatus(ctx, projectSpec, jobName) if err != nil { diff --git a/ext/scheduler/airflow2/airflow.go b/ext/scheduler/airflow2/airflow.go index e120a6bd84..9d625abc8c 100644 --- a/ext/scheduler/airflow2/airflow.go +++ b/ext/scheduler/airflow2/airflow.go @@ -34,9 +34,7 @@ var SharedLib []byte //go:embed resources/base_dag.py var resBaseDAG []byte -var ( - ErrEmptyJobName = errors.New("job name cannot be an empty string") -) +var ErrEmptyJobName = errors.New("job name cannot be an empty string") const ( baseLibFileName = "__lib.py" @@ -265,7 +263,7 @@ func (s *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN } schdHost = strings.Trim(schdHost, "/") - var jsonStr = []byte(fmt.Sprintf(`{"start_date":"%s", "end_date": "%s", "dry_run": false, "reset_dag_runs": true, "only_failed": false}`, + jsonStr := []byte(fmt.Sprintf(`{"start_date":"%s", "end_date": "%s", "dry_run": false, "reset_dag_runs": true, "only_failed": false}`, startDate.UTC().Format(airflowDateFormat), endDate.UTC().Format(airflowDateFormat))) postURL := fmt.Sprintf( @@ -315,7 +313,7 @@ func (s *scheduler) GetJobRunStatus(ctx context.Context, projectSpec models.Proj "execution_date_gte": "%s", "execution_date_lte": "%s" }`, pageOffset, batchSize, jobName, startDate.UTC().Format(airflowDateFormat), endDate.UTC().Format(airflowDateFormat)) - var jsonStr = []byte(dagRunBatchReq) + jsonStr := []byte(dagRunBatchReq) request, err := http.NewRequestWithContext(ctx, http.MethodPost, postURL, bytes.NewBuffer(jsonStr)) if err != nil { return nil, fmt.Errorf("failed to build http request for %s: %w", dagStatusBatchURL, err) diff --git a/ext/scheduler/airflow2/compiler/compiler.go b/ext/scheduler/airflow2/compiler/compiler.go index d104badf10..a204925c1b 100644 --- a/ext/scheduler/airflow2/compiler/compiler.go +++ b/ext/scheduler/airflow2/compiler/compiler.go @@ -12,9 +12,7 @@ import ( "github.com/odpf/optimus/models" ) -var ( - ErrEmptyTemplateFile = errors.New("empty template file for job") -) +var ErrEmptyTemplateFile = errors.New("empty template file for job") // Compiler converts generic job spec data to scheduler specific file that will // be consumed by the target scheduler diff --git a/ext/scheduler/prime/planner.go b/ext/scheduler/prime/planner.go index cabdc2b9e3..9dd9688d5d 100644 --- a/ext/scheduler/prime/planner.go +++ b/ext/scheduler/prime/planner.go @@ -163,7 +163,7 @@ func (p *Planner) getJobAllocations(ctx context.Context) (mostCapNodeID string, } } } - var mostCapSize = PeerPoolSize + mostCapSize := PeerPoolSize for nodeID, utilization := range peerUtilization { if utilization < mostCapSize { mostCapNodeID = nodeID diff --git a/ext/scheduler/prime/scheduler.go b/ext/scheduler/prime/scheduler.go index 9338f17bbb..e92910cf1e 100644 --- a/ext/scheduler/prime/scheduler.go +++ b/ext/scheduler/prime/scheduler.go @@ -71,7 +71,7 @@ func (s *Scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN return nil } -func (s *Scheduler) GetJobRunStatus(ctx context.Context, projectSpec models.ProjectSpec, jobName string, startDate time.Time, endDate time.Time, batchSize int) ([]models.JobStatus, error) { +func (s *Scheduler) GetJobRunStatus(ctx context.Context, projectSpec models.ProjectSpec, jobName string, startDate, endDate time.Time, batchSize int) ([]models.JobStatus, error) { panic("implement me") } diff --git a/extension/extension.go b/extension/extension.go index 5950f4e796..40be0a5c98 100644 --- a/extension/extension.go +++ b/extension/extension.go @@ -128,7 +128,7 @@ func (e *Extension) downloadAsset(url, destPath string) error { return e.getResponseError(resp) } - f, err := os.OpenFile(destPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0755) + f, err := os.OpenFile(destPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o755) if err != nil { return fmt.Errorf("error opening file: %v", err) } diff --git a/extension/manifest.go b/extension/manifest.go index 7c745e117e..abd8162f50 100644 --- a/extension/manifest.go +++ b/extension/manifest.go @@ -54,7 +54,7 @@ func FlushManifest(manifest *Manifest, dirPath string) error { return fmt.Errorf("error creating dir: %v", err) } manifestPath := path.Join(dirPath, manifestFileName) - f, err := os.OpenFile(manifestPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0755) + f, err := os.OpenFile(manifestPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o755) if err != nil { return fmt.Errorf("error opening file: %v", err) } diff --git a/job/dependency_resolver_test.go b/job/dependency_resolver_test.go index 8e362180b3..3600535e5a 100644 --- a/job/dependency_resolver_test.go +++ b/job/dependency_resolver_test.go @@ -47,7 +47,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -78,7 +78,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -151,7 +151,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -182,7 +182,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -230,7 +230,8 @@ func TestDependencyResolver(t *testing.T) { Dependencies: []string{ "project.dataset.tablex_destination", "project.dataset.table2_destination", - }}, nil) + }, + }, nil) execUnit1.On("GenerateDependencies", ctx, unitData2).Return(&models.GenerateDependenciesResponse{}, nil) // hook dependency @@ -268,7 +269,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -299,7 +300,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -379,7 +380,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test3", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -398,7 +399,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -417,7 +418,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -478,7 +479,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -497,7 +498,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -544,7 +545,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -584,7 +585,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -609,7 +610,8 @@ func TestDependencyResolver(t *testing.T) { unitData := models.GenerateDependenciesRequest{Config: models.PluginConfigs{}.FromJobSpec(jobSpec1.Task.Config), Assets: models.PluginAssets{}.FromJobSpec(jobSpec1.Assets), Project: projectSpec} execUnit.On("GenerateDependencies", context.Background(), unitData).Return(&models.GenerateDependenciesResponse{ - Dependencies: []string{"project.dataset.table3_destination"}}, nil) + Dependencies: []string{"project.dataset.table3_destination"}, + }, nil) resolver := job.NewDependencyResolver(projectJobSpecRepoFactory) _, err := resolver.Resolve(ctx, projectSpec, jobSpec1, nil) @@ -627,7 +629,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -646,7 +648,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -690,7 +692,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -709,7 +711,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -752,7 +754,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test3", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -771,7 +773,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -791,7 +793,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -862,7 +864,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test3", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -881,7 +883,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -903,7 +905,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -924,7 +926,7 @@ func TestDependencyResolver(t *testing.T) { Name: "test2-external", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ diff --git a/job/priority_resolver.go b/job/priority_resolver.go index e1aac07c7e..79224f15a6 100644 --- a/job/priority_resolver.go +++ b/job/priority_resolver.go @@ -46,8 +46,7 @@ type PriorityResolver interface { // dag2, dag6 will get weight of maxWeight-1 // dag3 will get maxWeight-2 // Note: it's crucial that dependencies of all Jobs are already resolved -type priorityResolver struct { -} +type priorityResolver struct{} // NewPriorityResolver create an instance of priorityResolver func NewPriorityResolver() *priorityResolver { @@ -116,7 +115,7 @@ func (a *priorityResolver) buildMultiRootDependencyTree(jobSpecs []models.JobSpe for _, childSpec := range jobSpecMap { childNode := a.findOrCreateDAGNode(tree, childSpec) for _, depDAG := range childSpec.Dependencies { - var missingParent = false + missingParent := false parentSpec, ok := jobSpecMap[depDAG.Job.Name] if !ok { if depDAG.Type == models.JobSpecDependencyTypeIntra { diff --git a/job/priority_resolver_test.go b/job/priority_resolver_test.go index 80900d09d1..7ee57ddacd 100644 --- a/job/priority_resolver_test.go +++ b/job/priority_resolver_test.go @@ -24,7 +24,7 @@ func getDependencyObject(specs map[string]models.JobSpec, dependencySpecs ...str return dependenciesMap } -func getMultiDependencyObject(specs map[string]models.JobSpec, dependencySpec1 string, dependencySpec2 string) map[string]models.JobSpecDependency { +func getMultiDependencyObject(specs map[string]models.JobSpec, dependencySpec1, dependencySpec2 string) map[string]models.JobSpecDependency { depSpec1 := specs[dependencySpec1] depSpec2 := specs[dependencySpec2] return map[string]models.JobSpecDependency{dependencySpec1: {Job: &depSpec1}, dependencySpec2: {Job: &depSpec2}} @@ -238,8 +238,10 @@ func TestPriorityWeightResolver(t *testing.T) { externalSpecName := "external-dag-dep" externalSpec := models.JobSpec{Name: externalSpecName, Dependencies: noDependency} deps2 := getDependencyObject(specs, spec1) - deps2[externalSpecName] = models.JobSpecDependency{Job: &externalSpec, Project: &models.ProjectSpec{Name: "external-project-name"}, - Type: models.JobSpecDependencyTypeInter} + deps2[externalSpecName] = models.JobSpecDependency{ + Job: &externalSpec, Project: &models.ProjectSpec{Name: "external-project-name"}, + Type: models.JobSpecDependencyTypeInter, + } specs[spec2] = models.JobSpec{Name: spec2, Dependencies: deps2} jobSpecs = append(jobSpecs, specs[spec2]) @@ -255,8 +257,10 @@ func TestPriorityWeightResolver(t *testing.T) { // for the spec2, we'll add external spec as dependency jobnameWithExternalDep := "job-with-1-external-dep" jobnameWithExternalDepDependencies := map[string]models.JobSpecDependency{ - externalSpecName: {Job: &externalSpec, Project: &models.ProjectSpec{Name: "external-project-name"}, - Type: models.JobSpecDependencyTypeInter}, + externalSpecName: { + Job: &externalSpec, Project: &models.ProjectSpec{Name: "external-project-name"}, + Type: models.JobSpecDependencyTypeInter, + }, } jobSpecs = append(jobSpecs, models.JobSpec{Name: jobnameWithExternalDep, Dependencies: jobnameWithExternalDepDependencies}) @@ -267,8 +271,10 @@ func TestPriorityWeightResolver(t *testing.T) { max := job.MaxPriorityWeight max1 := max - job.PriorityWeightGap*1 max2 := max - job.PriorityWeightGap*2 - expectedWeights := map[string]int{spec1: max, spec2: max1, spec3: max2, spec4: max, spec5: max1, - jobnameWithExternalDep: max1} + expectedWeights := map[string]int{ + spec1: max, spec2: max1, spec3: max2, spec4: max, spec5: max1, + jobnameWithExternalDep: max1, + } for _, jobSpec := range resolvedJobSpecs { assert.Equal(t, expectedWeights[jobSpec.Name], jobSpec.Task.Priority) diff --git a/job/replay.go b/job/replay.go index 3a99caba10..2de69f015c 100644 --- a/job/replay.go +++ b/job/replay.go @@ -126,7 +126,7 @@ func populateDownstreamRuns(parentNode *tree.TreeNode) (*tree.TreeNode, error) { // where parent task and current task has same scheduled interval taskFirstEffectedRun := taskSchedule.Next(parentRunDate.Add(-1 * time.Second)) - //make sure it is after current dag start date + // make sure it is after current dag start date if taskFirstEffectedRun.Before(childDag.Schedule.StartDate) { continue } @@ -150,7 +150,7 @@ func populateDownstreamRuns(parentNode *tree.TreeNode) (*tree.TreeNode, error) { // getRunsBetweenDates provides execution runs from start to end following a schedule interval // start and end both are inclusive -func getRunsBetweenDates(start time.Time, end time.Time, schedule string) ([]time.Time, error) { +func getRunsBetweenDates(start, end time.Time, schedule string) ([]time.Time, error) { var runs []time.Time // standard cron parser without descriptors @@ -218,7 +218,7 @@ func (srv *Service) prepareReplayStatusTree(ctx context.Context, replayRequest m return srv.populateDownstreamRunsWithStatus(ctx, replayRequest.Project, replaySpec.StartDate, replaySpec.EndDate, replaySpec.ExecutionTree) } -func (srv *Service) populateDownstreamRunsWithStatus(ctx context.Context, projectSpec models.ProjectSpec, startDate time.Time, endDate time.Time, parentNode *tree.TreeNode) (*tree.TreeNode, error) { +func (srv *Service) populateDownstreamRunsWithStatus(ctx context.Context, projectSpec models.ProjectSpec, startDate, endDate time.Time, parentNode *tree.TreeNode) (*tree.TreeNode, error) { for _, dependent := range parentNode.Dependents { runsWithStatus := set.NewTreeSetWith(TimeOfJobStatusComparator) jobStatusList, err := srv.replayManager.GetRunStatus(ctx, projectSpec, startDate, endDate, dependent.Data.(models.JobSpec).Name) diff --git a/job/replay_manager.go b/job/replay_manager.go index 8974eb91f1..2dee30bd4d 100644 --- a/job/replay_manager.go +++ b/job/replay_manager.go @@ -25,13 +25,13 @@ var ( ) const ( - //ReplayRunTimeout signifies type of replay failure caused by timeout + // ReplayRunTimeout signifies type of replay failure caused by timeout ReplayRunTimeout = "long running replay timeout" // TimestampLogFormat format of a timestamp will be used in logs TimestampLogFormat = "2006-01-02T15:04:05+00:00" // schedulerBatchSize number of run instances to be checked per request schedulerBatchSize = 100 - //replayListWindow window interval to fetch recent replays + // replayListWindow window interval to fetch recent replays replayListWindow = -3 * 30 * 24 * time.Hour ) @@ -201,17 +201,17 @@ func (m *Manager) GetRunStatus(ctx context.Context, projectSpec models.ProjectSp return m.scheduler.GetJobRunStatus(ctx, projectSpec, jobName, startDate, batchEndDate, schedulerBatchSize) } -//Close stops consuming any new request +// Close stops consuming any new request func (m *Manager) Close() error { if m.requestQ != nil { - //stop accepting any more requests + // stop accepting any more requests close(m.requestQ) } - //wait for request worker to finish + // wait for request worker to finish m.wg.Wait() if m.syncerScheduler != nil { - //wait for syncer to finish + // wait for syncer to finish <-m.syncerScheduler.Stop().Done() } return nil diff --git a/job/replay_manager_test.go b/job/replay_manager_test.go index be9d16c295..ebd1389b2f 100644 --- a/job/replay_manager_test.go +++ b/job/replay_manager_test.go @@ -207,7 +207,6 @@ func TestReplayManager(t *testing.T) { defer replayWorkerFact.AssertExpectations(t) replayManager := job.NewManager(log, replayWorkerFact, replaySpecRepoFac, uuidProvider, job.ReplayManagerConfig{ - NumWorkers: 1, WorkerTimeout: time.Second * 5, }, nil, replayValidator, nil) diff --git a/job/replay_syncer_test.go b/job/replay_syncer_test.go index 76ba1f6cdc..f51fdffc63 100644 --- a/job/replay_syncer_test.go +++ b/job/replay_syncer_test.go @@ -26,7 +26,7 @@ func TestReplaySyncer(t *testing.T) { startDate := time.Date(2020, time.Month(8), 22, 0, 0, 0, 0, time.UTC) endDate := time.Date(2020, time.Month(8), 23, 0, 0, 0, 0, time.UTC) batchEndDate := endDate.AddDate(0, 0, 1).Add(time.Second * -1) - dagStartTime := time.Date(2020, time.Month(4), 05, 0, 0, 0, 0, time.UTC) + dagStartTime := time.Date(2020, time.Month(4), 5, 0, 0, 0, 0, time.UTC) specs := make(map[string]models.JobSpec) spec1 := "dag1" diff --git a/job/replay_validator.go b/job/replay_validator.go index f56b3ca183..e39b374eec 100644 --- a/job/replay_validator.go +++ b/job/replay_validator.go @@ -10,10 +10,8 @@ import ( "github.com/odpf/optimus/store" ) -var ( - // ReplayStatusToValidate signifies list of status to be used when checking active replays - ReplayStatusToValidate = []string{models.ReplayStatusInProgress, models.ReplayStatusAccepted, models.ReplayStatusReplayed} -) +// ReplayStatusToValidate signifies list of status to be used when checking active replays +var ReplayStatusToValidate = []string{models.ReplayStatusInProgress, models.ReplayStatusAccepted, models.ReplayStatusReplayed} type Validator struct { scheduler models.SchedulerUnit @@ -28,12 +26,12 @@ func (v *Validator) Validate(ctx context.Context, replaySpecRepo store.ReplaySpe if !reqInput.Force { reqReplayNodes := replayTree.GetAllNodes() - //check if this dag have running instance in the batchScheduler + // check if this dag have running instance in the batchScheduler if err := v.validateRunningInstance(ctx, reqReplayNodes, reqInput); err != nil { return err } - //check another replay active for this dag + // check another replay active for this dag activeReplaySpecs, err := replaySpecRepo.GetByStatus(ctx, ReplayStatusToValidate) if err != nil { if err == store.ErrResourceNotFound { @@ -43,7 +41,7 @@ func (v *Validator) Validate(ctx context.Context, replaySpecRepo store.ReplaySpe } return validateReplayJobsConflict(activeReplaySpecs, reqReplayNodes) } - //check and cancel if found conflicted replays for same job ID + // check and cancel if found conflicted replays for same job ID return cancelConflictedReplays(ctx, replaySpecRepo, reqInput) } @@ -92,7 +90,7 @@ func validateReplayJobsConflict(activeReplaySpecs []models.ReplaySpec, reqReplay return nil } -func checkAnyConflictedDags(activeNodes []*tree.TreeNode, reqReplayNodes []*tree.TreeNode) error { +func checkAnyConflictedDags(activeNodes, reqReplayNodes []*tree.TreeNode) error { activeNodesMap := make(map[string]*tree.TreeNode) for _, activeNode := range activeNodes { activeNodesMap[activeNode.GetName()] = activeNode @@ -108,7 +106,7 @@ func checkAnyConflictedDags(activeNodes []*tree.TreeNode, reqReplayNodes []*tree return nil } -func checkAnyConflictedRuns(activeNode *tree.TreeNode, reqNode *tree.TreeNode) error { +func checkAnyConflictedRuns(activeNode, reqNode *tree.TreeNode) error { for _, reqNodeRun := range reqNode.Runs.Values() { if activeNode.Runs.Contains(reqNodeRun.(time.Time)) { return ErrConflictedJobRun diff --git a/job/replay_validator_test.go b/job/replay_validator_test.go index 592316f6ee..2471e2e140 100644 --- a/job/replay_validator_test.go +++ b/job/replay_validator_test.go @@ -20,7 +20,7 @@ func TestReplayValidator(t *testing.T) { t.Run("Validate", func(t *testing.T) { ctx := context.TODO() reqBatchSize := 100 - dagStartTime := time.Date(2020, time.Month(4), 05, 0, 0, 0, 0, time.UTC) + dagStartTime := time.Date(2020, time.Month(4), 5, 0, 0, 0, 0, time.UTC) startDate := time.Date(2020, time.Month(8), 22, 0, 0, 0, 0, time.UTC) endDate := time.Date(2020, time.Month(8), 26, 0, 0, 0, 0, time.UTC) batchEndDate := endDate.AddDate(0, 0, 1).Add(time.Second * -1) diff --git a/job/service.go b/job/service.go index 6757c38605..ec65057d6e 100644 --- a/job/service.go +++ b/job/service.go @@ -17,16 +17,14 @@ import ( ) const ( - //PersistJobPrefix is used to keep the job during sync even if they are not in source repo + // PersistJobPrefix is used to keep the job during sync even if they are not in source repo PersistJobPrefix string = "__" ConcurrentTicketPerSec = 40 ConcurrentLimit = 600 ) -var ( - errDependencyResolution = fmt.Errorf("dependency resolution") -) +var errDependencyResolution = fmt.Errorf("dependency resolution") type AssetCompiler func(jobSpec models.JobSpec, scheduledAt time.Time) (models.JobAssets, error) @@ -65,7 +63,7 @@ type ReplayManager interface { Replay(context.Context, models.ReplayRequest) (models.ReplayResult, error) GetReplay(context.Context, uuid.UUID) (models.ReplaySpec, error) GetReplayList(ctx context.Context, projectID uuid.UUID) ([]models.ReplaySpec, error) - GetRunStatus(ctx context.Context, projectSpec models.ProjectSpec, startDate time.Time, endDate time.Time, + GetRunStatus(ctx context.Context, projectSpec models.ProjectSpec, startDate, endDate time.Time, jobName string) ([]models.JobStatus, error) } @@ -387,7 +385,7 @@ func (srv *Service) GetDependencyResolvedSpecs(ctx context.Context, proj models. return func() (interface{}, error) { resolvedSpec, err := srv.dependencyResolver.Resolve(ctx, proj, currentSpec, progressObserver) if err != nil { - //wrappedErr := errors2.Wrap(, err.Error()) + // wrappedErr := errors2.Wrap(, err.Error()) return nil, fmt.Errorf("%s: %s/%s: %w", errDependencyResolution, jobsToNamespace[currentSpec.Name], currentSpec.Name, err) } return resolvedSpec, nil @@ -460,7 +458,7 @@ func (srv *Service) GetDownstream(ctx context.Context, projectSpec models.Projec var jobSpecs []models.JobSpec for _, node := range rootInstance.GetAllNodes() { - //ignore the root + // ignore the root if node.GetName() != rootInstance.GetName() { jobSpecs = append(jobSpecs, node.Data.(models.JobSpec)) } @@ -504,7 +502,7 @@ func (srv *Service) prepareNamespaceJobSpecMap(ctx context.Context, projectSpec func filterNode(parentNode *tree.TreeNode, dependents []*tree.TreeNode, allowedDownstream []string, jobNamespaceMap map[string]string) *tree.TreeNode { for _, dep := range dependents { - //if dep is not within allowed namespace, skip this dependency + // if dep is not within allowed namespace, skip this dependency isAuthorized := false for _, namespace := range allowedDownstream { if namespace == models.AllNamespace || namespace == jobNamespaceMap[dep.GetName()] { @@ -516,19 +514,19 @@ func filterNode(parentNode *tree.TreeNode, dependents []*tree.TreeNode, allowedD continue } - //if dep is within allowed namespace, add the node to parent + // if dep is within allowed namespace, add the node to parent depNode := tree.NewTreeNode(dep.Data) - //check for the dependent + // check for the dependent depNode = filterNode(depNode, dep.Dependents, allowedDownstream, jobNamespaceMap) - //add the complete node + // add the complete node parentNode.AddDependent(depNode) } return parentNode } -func listIgnoredJobs(rootInstance *tree.TreeNode, rootFilteredTree *tree.TreeNode) []string { +func listIgnoredJobs(rootInstance, rootFilteredTree *tree.TreeNode) []string { allowedNodesMap := make(map[string]*tree.TreeNode) for _, allowedNode := range rootFilteredTree.GetAllNodes() { allowedNodesMap[allowedNode.GetName()] = allowedNode @@ -557,7 +555,7 @@ func (srv *Service) notifyProgress(po progress.Observer, event progress.Event) { } // remove items present in from -func setSubtract(from []string, remove []string) []string { +func setSubtract(from, remove []string) []string { removeMap := make(map[string]bool) for _, item := range remove { removeMap[item] = true @@ -702,7 +700,7 @@ func populateDownstreamDAGs(dagTree *tree.MultiRootTree, jobSpec models.JobSpec, for _, childSpec := range jobSpecMap { childNode := findOrCreateDAGNode(dagTree, childSpec) for _, depDAG := range childSpec.Dependencies { - var isExternal = false + isExternal := false parentSpec, ok := jobSpecMap[depDAG.Job.Name] if !ok { if depDAG.Type == models.JobSpecDependencyTypeIntra { diff --git a/job/service_test.go b/job/service_test.go index b7e5d78bde..822baadb41 100644 --- a/job/service_test.go +++ b/job/service_test.go @@ -29,7 +29,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, } @@ -72,7 +72,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, } @@ -106,7 +106,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -131,7 +131,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -176,7 +176,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -188,7 +188,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -200,7 +200,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -258,7 +258,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -274,7 +274,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -286,7 +286,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -346,7 +346,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -399,7 +399,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -447,7 +447,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -459,7 +459,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -471,7 +471,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -538,7 +538,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -548,7 +548,7 @@ func TestService(t *testing.T) { Name: "test-2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -592,7 +592,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -604,7 +604,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -616,7 +616,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{ @@ -687,7 +687,7 @@ func TestService(t *testing.T) { Name: "test-1", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -697,7 +697,7 @@ func TestService(t *testing.T) { Name: "test-2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -710,7 +710,7 @@ func TestService(t *testing.T) { Name: "test-2", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -826,7 +826,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -838,7 +838,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -891,7 +891,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -901,7 +901,7 @@ func TestService(t *testing.T) { Name: "downstream-test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -913,7 +913,7 @@ func TestService(t *testing.T) { Name: "test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, @@ -923,7 +923,7 @@ func TestService(t *testing.T) { Name: "downstream-test", Owner: "optimus", Schedule: models.JobSpecSchedule{ - StartDate: time.Date(2020, 12, 02, 0, 0, 0, 0, time.UTC), + StartDate: time.Date(2020, 12, 2, 0, 0, 0, 0, time.UTC), Interval: "@daily", }, Task: models.JobSpecTask{}, diff --git a/main.go b/main.go index 415b18fab6..e1a33ed26a 100644 --- a/main.go +++ b/main.go @@ -22,9 +22,7 @@ import ( "github.com/odpf/salt/log" ) -var ( - errRequestFail = errors.New("🔥 unable to complete request successfully") -) +var errRequestFail = errors.New("🔥 unable to complete request successfully") type PlainFormatter struct{} diff --git a/mock/datastore.go b/mock/datastore.go index 96b38de66e..75bc6b2ba8 100644 --- a/mock/datastore.go +++ b/mock/datastore.go @@ -20,25 +20,32 @@ type Datastorer struct { func (d *Datastorer) Name() string { return d.Called().Get(0).(string) } + func (d *Datastorer) Description() string { return d.Called().Get(0).(string) } + func (d *Datastorer) Types() map[models.ResourceType]models.DatastoreTypeController { return d.Called().Get(0).(map[models.ResourceType]models.DatastoreTypeController) } + func (d *Datastorer) CreateResource(ctx context.Context, inp models.CreateResourceRequest) error { return d.Called(ctx, inp).Error(0) } + func (d *Datastorer) UpdateResource(ctx context.Context, inp models.UpdateResourceRequest) error { return d.Called(ctx, inp).Error(0) } + func (d *Datastorer) ReadResource(ctx context.Context, inp models.ReadResourceRequest) (models.ReadResourceResponse, error) { args := d.Called(ctx, inp) return args.Get(0).(models.ReadResourceResponse), args.Error(1) } + func (d *Datastorer) DeleteResource(ctx context.Context, inp models.DeleteResourceRequest) error { return d.Called(ctx, inp).Error(0) } + func (d *Datastorer) BackupResource(ctx context.Context, inp models.BackupResourceRequest) (models.BackupResourceResponse, error) { args := d.Called(ctx, models.BackupResourceRequest{ Resource: inp.Resource, diff --git a/mock/replay.go b/mock/replay.go index e488e215a3..ca6e6a0365 100644 --- a/mock/replay.go +++ b/mock/replay.go @@ -131,7 +131,7 @@ func NewReplayWorker() *ReplayWorker { } func (rm *ReplayWorker) Process(ctx context.Context, replayRequest models.ReplayRequest) error { - //mock processing time for concurrent replay call testing + // mock processing time for concurrent replay call testing args := rm.Called(ctx, replayRequest) <-rm.finish return args.Error(0) diff --git a/mock/secret.go b/mock/secret.go index 4ba141df88..4b632c0fc8 100644 --- a/mock/secret.go +++ b/mock/secret.go @@ -42,11 +42,11 @@ type SecretService struct { mock.Mock } -func (s *SecretService) Save(ctx context.Context, prjName string, nsName string, item models.ProjectSecretItem) error { +func (s *SecretService) Save(ctx context.Context, prjName, nsName string, item models.ProjectSecretItem) error { return s.Called(ctx, prjName, nsName, item).Error(0) } -func (s *SecretService) Update(ctx context.Context, prjName string, nsName string, item models.ProjectSecretItem) error { +func (s *SecretService) Update(ctx context.Context, prjName, nsName string, item models.ProjectSecretItem) error { return s.Called(ctx, prjName, nsName, item).Error(0) } diff --git a/models/datastore.go b/models/datastore.go index 854e5df1c5..658ccb8a98 100644 --- a/models/datastore.go +++ b/models/datastore.go @@ -83,7 +83,6 @@ type DatastoreTypeController interface { type DatastoreSpecAdapter interface { ToYaml(spec ResourceSpec) ([]byte, error) FromYaml([]byte) (ResourceSpec, error) - ToProtobuf(ResourceSpec) ([]byte, error) FromProtobuf([]byte) (ResourceSpec, error) } @@ -179,7 +178,6 @@ func (s *supportedDatastore) Add(newUnit Datastorer) error { type DatastoreService interface { // does not really fetch resource metadata, just the user provided spec GetAll(ctx context.Context, namespace NamespaceSpec, datastoreName string) ([]ResourceSpec, error) - CreateResource(ctx context.Context, namespace NamespaceSpec, resourceSpecs []ResourceSpec, obs progress.Observer) error UpdateResource(ctx context.Context, namespace NamespaceSpec, resourceSpecs []ResourceSpec, obs progress.Observer) error ReadResource(ctx context.Context, namespace NamespaceSpec, datastoreName, name string) (ResourceSpec, error) diff --git a/models/job.go b/models/job.go index 96fc03be6f..d9de2a4d07 100644 --- a/models/job.go +++ b/models/job.go @@ -58,7 +58,7 @@ type JobSpec struct { Assets JobAssets Hooks []JobSpecHook Metadata JobSpecMetadata - ExternalDependencies ExternalDependency //external dependencies for http + ExternalDependencies ExternalDependency // external dependencies for http } func (js JobSpec) GetName() string { @@ -202,7 +202,7 @@ func (w *JobSpecTaskWindow) getWindowDate(today time.Time, windowSize, windowOff floatingStart = floatingStart.AddDate(0, int(-sizeMonths), 0) } - //final start is computed + // final start is computed windowStart = floatingStart } @@ -342,7 +342,7 @@ type JobService interface { Replay(context.Context, ReplayRequest) (ReplayResult, error) // GetReplayStatus of a replay using its ID GetReplayStatus(context.Context, ReplayRequest) (ReplayState, error) - //GetReplayList of a project + // GetReplayList of a project GetReplayList(ctx context.Context, projectID uuid.UUID) ([]ReplaySpec, error) // GetByDestination fetches a Job by destination for a specific project GetByDestination(ctx context.Context, projectSpec ProjectSpec, destination string) (JobSpec, error) diff --git a/models/plugin.go b/models/plugin.go index 11ca203907..1e07272275 100644 --- a/models/plugin.go +++ b/models/plugin.go @@ -31,8 +31,10 @@ var ( PluginTypeHook = PluginType(InstanceTypeHook.String()) ) -type PluginType string -type PluginMod string +type ( + PluginType string + PluginMod string +) func (pm PluginMod) String() string { return string(pm) @@ -201,7 +203,6 @@ func (c PluginConfigs) FromJobSpec(jobSpecConfig JobSpecConfigs) PluginConfigs { taskPluginConfigs := PluginConfigs{} for _, c := range jobSpecConfig { taskPluginConfigs = append(taskPluginConfigs, PluginConfig{ - Name: c.Name, Value: c.Value, }) diff --git a/models/project.go b/models/project.go index d234cf190e..89e1feedf0 100644 --- a/models/project.go +++ b/models/project.go @@ -31,13 +31,11 @@ const ( KeyLength = 32 ) -var ( - // PluginSecretString generates plugin secret identifier using its type - // and name, e.g. task, bq2bq - PluginSecretString = func(pluginType InstanceType, pluginName string) string { - return strings.ToUpper(fmt.Sprintf("%s_%s", pluginType, pluginName)) - } -) +// PluginSecretString generates plugin secret identifier using its type +// and name, e.g. task, bq2bq +var PluginSecretString = func(pluginType InstanceType, pluginName string) string { + return strings.ToUpper(fmt.Sprintf("%s_%s", pluginType, pluginName)) +} type ProjectSpec struct { ID uuid.UUID diff --git a/models/scheduler.go b/models/scheduler.go index dfed8adf58..56d7aede43 100644 --- a/models/scheduler.go +++ b/models/scheduler.go @@ -23,7 +23,6 @@ var ( // SchedulerUnit is implemented by supported schedulers type SchedulerUnit interface { GetName() string - VerifyJob(ctx context.Context, namespace NamespaceSpec, job JobSpec) error ListJobs(ctx context.Context, namespace NamespaceSpec, opts SchedulerListOptions) ([]Job, error) DeployJobs(ctx context.Context, namespace NamespaceSpec, jobs []JobSpec, obs progress.Observer) error @@ -115,8 +114,7 @@ type ExecutorStopRequest struct { Signal string } -type ExecutorStartResponse struct { -} +type ExecutorStartResponse struct{} type ExecutorStats struct { Logs []byte diff --git a/plugin/gen.go b/plugin/gen.go index ca7b11b637..9010d70bee 100644 --- a/plugin/gen.go +++ b/plugin/gen.go @@ -88,7 +88,7 @@ func BuildHelper(l log.Logger, templateEngine models.TemplateEngine, configBytes return fmt.Errorf("failed to create output dir: %w", err) } - //prepare entrypoint string blob + // prepare entrypoint string blob entrypointLines := []string{} for _, line := range strings.Split(EntrypointTemplate, "\n") { if len(line) == 0 { @@ -113,7 +113,7 @@ func BuildHelper(l log.Logger, templateEngine models.TemplateEngine, configBytes return err } destPath = filepath.Join(taskPlugin.Path, "Dockerfile") - if err := ioutil.WriteFile(destPath, []byte(dockerFile), 0655); err != nil { + if err := ioutil.WriteFile(destPath, []byte(dockerFile), 0o655); err != nil { return err } @@ -186,7 +186,7 @@ func BuildHelper(l log.Logger, templateEngine models.TemplateEngine, configBytes return err } destPath := filepath.Join(hookPlugin.Path, "Dockerfile") - if err := ioutil.WriteFile(destPath, []byte(dockerFile), 0655); err != nil { + if err := ioutil.WriteFile(destPath, []byte(dockerFile), 0o655); err != nil { return err } diff --git a/plugin/plugin.go b/plugin/plugin.go index 7a2496791a..5930e03cd7 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -25,7 +25,7 @@ func Initialize(pluginLogger hclog.Logger) error { pluginLogger.Debug(fmt.Sprintf("discovering plugins(%d)...", len(discoveredPlugins))) // pluginMap is the map of plugins we can dispense. - var pluginMap = map[string]plugin.Plugin{ + pluginMap := map[string]plugin.Plugin{ models.PluginTypeBase: base.NewPluginClient(pluginLogger), models.ModTypeCLI.String(): cli.NewPluginClient(pluginLogger), models.ModTypeDependencyResolver.String(): dependencyresolver.NewPluginClient(pluginLogger), diff --git a/run/context.go b/run/context.go index 245c40b07a..ecfcb5d02a 100644 --- a/run/context.go +++ b/run/context.go @@ -22,11 +22,9 @@ const ( SecretsStringToMatch = ".secret." ) -var ( - // IgnoreTemplateRenderExtension used as extension on a file will skip template - // rendering of it - IgnoreTemplateRenderExtension = []string{".gtpl", ".j2", ".tmpl", ".tpl"} -) +// IgnoreTemplateRenderExtension used as extension on a file will skip template +// rendering of it +var IgnoreTemplateRenderExtension = []string{".gtpl", ".j2", ".tmpl", ".tpl"} // ContextManager fetches all config data for a given instanceSpec and compiles all // macros/templates. @@ -100,7 +98,7 @@ func (fm *ContextManager) createContextForTask(instanceConfig map[string]string) return contextForTask } -func (fm *ContextManager) createContextForHook(initialContext map[string]interface{}, taskConfigs map[string]string, taskSecretConfigs map[string]string) map[string]interface{} { +func (fm *ContextManager) createContextForHook(initialContext map[string]interface{}, taskConfigs, taskSecretConfigs map[string]string) map[string]interface{} { // Merge taskConfig and secret config for the context mergedTaskConfigs := utils.MergeMaps(taskConfigs, taskSecretConfigs) diff --git a/run/go_engine_test.go b/run/go_engine_test.go index ac88d729a9..563750e672 100644 --- a/run/go_engine_test.go +++ b/run/go_engine_test.go @@ -121,7 +121,6 @@ func TestGoEngine(t *testing.T) { comp := run.NewGoEngine() compiledExpr, err := comp.CompileFiles(testCase.Input, values) - if err != nil { t.Error(err) } diff --git a/run/service_test.go b/run/service_test.go index 2f0e02658d..483479af04 100644 --- a/run/service_test.go +++ b/run/service_test.go @@ -377,7 +377,7 @@ func TestService(t *testing.T) { } // Copy exported fields -func Copy(dst interface{}, src interface{}) error { +func Copy(dst, src interface{}) error { bytes, err := json.Marshal(src) if err != nil { return fmt.Errorf("failed to marshal src: %v", err) diff --git a/service/errors.go b/service/errors.go index ac85caeaee..933c428748 100644 --- a/service/errors.go +++ b/service/errors.go @@ -39,7 +39,7 @@ func NewError(entity string, errType ErrorType, msg string) *DomainError { } } -func FromError(err error, entity string, msg string) *DomainError { +func FromError(err error, entity, msg string) *DomainError { errType := ErrInternalError msgStr := "internal error" if errors.Is(err, store.ErrResourceNotFound) { diff --git a/service/namespace_service.go b/service/namespace_service.go index bb6b771f49..c1a79b6a57 100644 --- a/service/namespace_service.go +++ b/service/namespace_service.go @@ -30,7 +30,7 @@ func NewNamespaceService(projectService ProjectService, factory NamespaceRepoFac } } -func (s namespaceService) Get(ctx context.Context, projectName string, namespaceName string) (models.NamespaceSpec, error) { +func (s namespaceService) Get(ctx context.Context, projectName, namespaceName string) (models.NamespaceSpec, error) { if projectName == "" { return models.NamespaceSpec{}, NewError(models.ProjectEntity, ErrInvalidArgument, "project name cannot be empty") @@ -50,7 +50,7 @@ func (s namespaceService) Get(ctx context.Context, projectName string, namespace } // GetNamespaceOptionally is used for optionally getting namespace if name is present, otherwise get only project -func (s namespaceService) GetNamespaceOptionally(ctx context.Context, projectName string, namespaceName string) (models.ProjectSpec, models.NamespaceSpec, error) { +func (s namespaceService) GetNamespaceOptionally(ctx context.Context, projectName, namespaceName string) (models.ProjectSpec, models.NamespaceSpec, error) { projectSpec, err := s.projectService.Get(ctx, projectName) if err != nil { return models.ProjectSpec{}, models.NamespaceSpec{}, err diff --git a/service/secret_service.go b/service/secret_service.go index 9d5eb760c1..44938a5c60 100644 --- a/service/secret_service.go +++ b/service/secret_service.go @@ -29,7 +29,7 @@ func NewSecretService(projectService ProjectService, namespaceService NamespaceS } } -func (s secretService) Save(ctx context.Context, projectName string, namespaceName string, item models.ProjectSecretItem) error { +func (s secretService) Save(ctx context.Context, projectName, namespaceName string, item models.ProjectSecretItem) error { if item.Name == "" { return NewError(models.SecretEntity, ErrInvalidArgument, "secret name cannot be empty") } @@ -46,7 +46,7 @@ func (s secretService) Save(ctx context.Context, projectName string, namespaceNa return nil } -func (s secretService) Update(ctx context.Context, projectName string, namespaceName string, item models.ProjectSecretItem) error { +func (s secretService) Update(ctx context.Context, projectName, namespaceName string, item models.ProjectSecretItem) error { if item.Name == "" { return NewError(models.SecretEntity, ErrInvalidArgument, "secret name cannot be empty") } diff --git a/store/local/job_spec_repository.go b/store/local/job_spec_repository.go index ffcc5d7fad..99d4559ee5 100644 --- a/store/local/job_spec_repository.go +++ b/store/local/job_spec_repository.go @@ -22,9 +22,7 @@ const ( AssetFolderName = "assets" ) -var ( - specSuffixRegex = regexp.MustCompile(`\.(yaml|cfg|sql|txt)$`) -) +var specSuffixRegex = regexp.MustCompile(`\.(yaml|cfg|sql|txt)$`) type cacheItem struct { path string @@ -58,20 +56,20 @@ func (repo *jobRepository) SaveAt(job models.JobSpec, rootDir string) error { } // create necessary folders - if err = repo.fs.MkdirAll(repo.assetFolderPath(rootDir), os.FileMode(0765)|os.ModeDir); err != nil { + if err = repo.fs.MkdirAll(repo.assetFolderPath(rootDir), os.FileMode(0o765)|os.ModeDir); err != nil { return fmt.Errorf("repo.fs.MkdirAll: %s: %w", rootDir, err) } // save assets for assetName, assetValue := range config.Asset { - if err := afero.WriteFile(repo.fs, repo.assetFilePath(rootDir, assetName), []byte(assetValue), os.FileMode(0755)); err != nil { + if err := afero.WriteFile(repo.fs, repo.assetFilePath(rootDir, assetName), []byte(assetValue), os.FileMode(0o755)); err != nil { return fmt.Errorf("WriteFile.Asset: %s: %w", repo.assetFilePath(rootDir, assetName), err) } } config.Asset = nil // save job - fd, err := repo.fs.OpenFile(repo.jobFilePath(rootDir), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(0755)) + fd, err := repo.fs.OpenFile(repo.jobFilePath(rootDir), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(0o755)) if err != nil { return err } @@ -345,7 +343,7 @@ func (repo *jobRepository) assetFolderPath(name string) string { // assetFilePath generates the path to asset directory files // for a given job -func (repo *jobRepository) assetFilePath(job string, file string) string { +func (repo *jobRepository) assetFilePath(job, file string) string { return filepath.Join(repo.assetFolderPath(job), file) } diff --git a/store/local/job_spec_repository_test.go b/store/local/job_spec_repository_test.go index 9e30ebbafc..c80d8fabc1 100644 --- a/store/local/job_spec_repository_test.go +++ b/store/local/job_spec_repository_test.go @@ -245,10 +245,10 @@ func TestJobSpecRepository(t *testing.T) { t.Run("should open the file and parse its contents", func(t *testing.T) { // create test files and directories appFS := afero.NewMemMapFs() - appFS.MkdirAll(spec.Name, 0755) - afero.WriteFile(appFS, filepath.Join(spec.Name, local.JobSpecFileName), []byte(testJobContents), 0644) - appFS.MkdirAll(filepath.Join(spec.Name, local.AssetFolderName), 0755) - afero.WriteFile(appFS, filepath.Join(spec.Name, local.AssetFolderName, "query.sql"), []byte(jobConfig.Asset["query.sql"]), 0644) + appFS.MkdirAll(spec.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(spec.Name, local.JobSpecFileName), []byte(testJobContents), 0o644) + appFS.MkdirAll(filepath.Join(spec.Name, local.AssetFolderName), 0o755) + afero.WriteFile(appFS, filepath.Join(spec.Name, local.AssetFolderName, "query.sql"), []byte(jobConfig.Asset["query.sql"]), 0o644) repo := local.NewJobSpecRepository(appFS, adapter) returnedSpec, err := repo.GetByName(spec.Name) @@ -286,11 +286,11 @@ task: // ./spec/job.yaml // ./spec/asset/query.sql appFS := afero.NewMemMapFs() - afero.WriteFile(appFS, local.JobSpecParentName, []byte(thisYamlContent), 0644) - appFS.MkdirAll(spec.Name, 0755) - afero.WriteFile(appFS, filepath.Join(spec.Name, local.JobSpecFileName), []byte(testJobContentsLocal), 0644) - appFS.MkdirAll(filepath.Join(spec.Name, local.AssetFolderName), 0755) - afero.WriteFile(appFS, filepath.Join(spec.Name, local.AssetFolderName, "query.sql"), []byte(jobConfig.Asset["query.sql"]), 0644) + afero.WriteFile(appFS, local.JobSpecParentName, []byte(thisYamlContent), 0o644) + appFS.MkdirAll(spec.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(spec.Name, local.JobSpecFileName), []byte(testJobContentsLocal), 0o644) + appFS.MkdirAll(filepath.Join(spec.Name, local.AssetFolderName), 0o755) + afero.WriteFile(appFS, filepath.Join(spec.Name, local.AssetFolderName, "query.sql"), []byte(jobConfig.Asset["query.sql"]), 0o644) repo := local.NewJobSpecRepository(appFS, adapter) returnedSpec, err := repo.GetByName(spec.Name) @@ -337,15 +337,15 @@ task: // ./secret_jobs/spec/job.yaml // ./secret_jobs/spec/asset/query.sql appFS := afero.NewMemMapFs() - afero.WriteFile(appFS, local.JobSpecParentName, []byte(thisYamlContentRoot), 0644) - appFS.MkdirAll("secret_jobs", 0755) + afero.WriteFile(appFS, local.JobSpecParentName, []byte(thisYamlContentRoot), 0o644) + appFS.MkdirAll("secret_jobs", 0o755) - appFS.MkdirAll(filepath.Join("secret_jobs", spec.Name), 0755) - afero.WriteFile(appFS, filepath.Join("secret_jobs", local.JobSpecParentName), []byte(thisYamlContentSubFolder), 0644) + appFS.MkdirAll(filepath.Join("secret_jobs", spec.Name), 0o755) + afero.WriteFile(appFS, filepath.Join("secret_jobs", local.JobSpecParentName), []byte(thisYamlContentSubFolder), 0o644) - afero.WriteFile(appFS, filepath.Join("secret_jobs", spec.Name, local.JobSpecFileName), []byte(testJobContentsLocal), 0644) - appFS.MkdirAll(filepath.Join("secret_jobs", spec.Name, local.AssetFolderName), 0755) - afero.WriteFile(appFS, filepath.Join("secret_jobs", spec.Name, local.AssetFolderName, "query.sql"), []byte(jobConfig.Asset["query.sql"]), 0644) + afero.WriteFile(appFS, filepath.Join("secret_jobs", spec.Name, local.JobSpecFileName), []byte(testJobContentsLocal), 0o644) + appFS.MkdirAll(filepath.Join("secret_jobs", spec.Name, local.AssetFolderName), 0o755) + afero.WriteFile(appFS, filepath.Join("secret_jobs", spec.Name, local.AssetFolderName, "query.sql"), []byte(jobConfig.Asset["query.sql"]), 0o644) repo := local.NewJobSpecRepository(appFS, adapter) returnedSpec, err := repo.GetByName(spec.Name) @@ -361,10 +361,10 @@ task: t.Run("should use cache if file is requested more than once", func(t *testing.T) { // create test files and directories appFS := afero.NewMemMapFs() - appFS.MkdirAll(spec.Name, 0755) - afero.WriteFile(appFS, filepath.Join(spec.Name, local.JobSpecFileName), []byte(testJobContents), 0644) - appFS.MkdirAll(filepath.Join(spec.Name, local.AssetFolderName), 0755) - afero.WriteFile(appFS, filepath.Join(spec.Name, local.AssetFolderName, "query.sql"), []byte(jobConfig.Asset["query.sql"]), 0644) + appFS.MkdirAll(spec.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(spec.Name, local.JobSpecFileName), []byte(testJobContents), 0o644) + appFS.MkdirAll(filepath.Join(spec.Name, local.AssetFolderName), 0o755) + afero.WriteFile(appFS, filepath.Join(spec.Name, local.AssetFolderName, "query.sql"), []byte(jobConfig.Asset["query.sql"]), 0o644) repo := local.NewJobSpecRepository(appFS, adapter) returnedSpec, err := repo.GetByName(spec.Name) @@ -388,7 +388,7 @@ task: }) t.Run("should return ErrNoSuchSpec in case the job folder exist but no job file exist", func(t *testing.T) { appFS := afero.NewMemMapFs() - appFS.MkdirAll(spec.Name, 0755) + appFS.MkdirAll(spec.Name, 0o755) repo := local.NewJobSpecRepository(appFS, adapter) _, err := repo.GetByName(spec.Name) @@ -402,8 +402,8 @@ task: t.Run("should return error if yaml source is incorrect and failed to validate", func(t *testing.T) { // create test files and directories appFS := afero.NewMemMapFs() - appFS.MkdirAll(spec.Name, 0755) - afero.WriteFile(appFS, filepath.Join(spec.Name, local.JobSpecFileName), []byte("name:a"), 0644) + appFS.MkdirAll(spec.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(spec.Name, local.JobSpecFileName), []byte("name:a"), 0o644) repo := local.NewJobSpecRepository(appFS, adapter) _, err := repo.GetByName(spec.Name) @@ -503,9 +503,9 @@ hooks: []`, appFS := afero.NewMemMapFs() for idx, jobspec := range jobspecs { - appFS.MkdirAll(jobspec.Name, 0755) - afero.WriteFile(appFS, filepath.Join(jobspec.Name, local.JobSpecFileName), []byte(content[idx]), 0644) - appFS.MkdirAll(filepath.Join(jobspec.Name, local.AssetFolderName), 0755) + appFS.MkdirAll(jobspec.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(jobspec.Name, local.JobSpecFileName), []byte(content[idx]), 0o644) + appFS.MkdirAll(filepath.Join(jobspec.Name, local.AssetFolderName), 0o755) } repo := local.NewJobSpecRepository(appFS, adapter) @@ -524,7 +524,7 @@ hooks: []`, }) t.Run("should return ErrNoSpecsFound if the root directory has no files", func(t *testing.T) { appFS := afero.NewMemMapFs() - appFS.MkdirAll("test", 0755) + appFS.MkdirAll("test", 0o755) repo := local.NewJobSpecRepository(appFS, adapter) _, err := repo.GetAll() @@ -534,9 +534,9 @@ hooks: []`, appFS := afero.NewMemMapFs() for idx, jobspec := range jobspecs { - appFS.MkdirAll(jobspec.Name, 0755) - afero.WriteFile(appFS, filepath.Join(jobspec.Name, local.JobSpecFileName), []byte(content[idx]), 0644) - appFS.MkdirAll(filepath.Join(jobspec.Name, local.AssetFolderName), 0755) + appFS.MkdirAll(jobspec.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(jobspec.Name, local.JobSpecFileName), []byte(content[idx]), 0o644) + appFS.MkdirAll(filepath.Join(jobspec.Name, local.AssetFolderName), 0o755) } repo := local.NewJobSpecRepository(appFS, adapter) diff --git a/store/local/resource_spec_repository.go b/store/local/resource_spec_repository.go index a984add38e..a75b222043 100644 --- a/store/local/resource_spec_repository.go +++ b/store/local/resource_spec_repository.go @@ -55,19 +55,19 @@ func (repo *resourceRepository) SaveAt(resourceSpec models.ResourceSpec, rootDir } // create necessary folders - if err = repo.fs.MkdirAll(repo.assetFolderPath(rootDir), os.FileMode(0765)|os.ModeDir); err != nil { + if err = repo.fs.MkdirAll(repo.assetFolderPath(rootDir), os.FileMode(0o765)|os.ModeDir); err != nil { return fmt.Errorf("repo.fs.MkdirAll: %s: %w", rootDir, err) } // save assets for assetName, assetValue := range resourceSpec.Assets { - if err := afero.WriteFile(repo.fs, repo.assetFilePath(rootDir, assetName), []byte(assetValue), os.FileMode(0755)); err != nil { + if err := afero.WriteFile(repo.fs, repo.assetFilePath(rootDir, assetName), []byte(assetValue), os.FileMode(0o755)); err != nil { return fmt.Errorf("WriteFile.Asset: %s: %w", repo.assetFilePath(rootDir, assetName), err) } } // save resource - if afero.WriteFile(repo.fs, repo.resourceFilePath(rootDir), specBytes, os.FileMode(0755)); err != nil { + if afero.WriteFile(repo.fs, repo.resourceFilePath(rootDir), specBytes, os.FileMode(0o755)); err != nil { return err } @@ -330,7 +330,7 @@ func (repo *resourceRepository) assetFolderPath(name string) string { } // assetFilePath generates the path to asset directory files -func (repo *resourceRepository) assetFilePath(job string, file string) string { +func (repo *resourceRepository) assetFilePath(job, file string) string { return filepath.Join(repo.assetFolderPath(job), file) } diff --git a/store/local/resource_spec_repository_test.go b/store/local/resource_spec_repository_test.go index 8aef73ed68..29cbb42cea 100644 --- a/store/local/resource_spec_repository_test.go +++ b/store/local/resource_spec_repository_test.go @@ -101,9 +101,9 @@ func TestResourceSpecRepository(t *testing.T) { t.Run("should open the file ${ROOT}/${name}.yaml and parse its contents", func(t *testing.T) { // create test files and directories appFS := afero.NewMemMapFs() - appFS.MkdirAll(specTable.Name, 0755) - afero.WriteFile(appFS, filepath.Join(specTable.Name, local.ResourceSpecFileName), []byte(testResourceContents), 0644) - afero.WriteFile(appFS, filepath.Join(specTable.Name, "query.sql"), []byte(specTable.Assets["query.sql"]), 0644) + appFS.MkdirAll(specTable.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(specTable.Name, local.ResourceSpecFileName), []byte(testResourceContents), 0o644) + afero.WriteFile(appFS, filepath.Join(specTable.Name, "query.sql"), []byte(specTable.Assets["query.sql"]), 0o644) repo := local.NewResourceSpecRepository(appFS, datastorer) returnedSpec, err := repo.GetByName(ctx, specTable.Name) @@ -113,9 +113,9 @@ func TestResourceSpecRepository(t *testing.T) { t.Run("should use cache if file is requested more than once", func(t *testing.T) { // create test files and directories appFS := afero.NewMemMapFs() - appFS.MkdirAll(specTable.Name, 0755) - afero.WriteFile(appFS, filepath.Join(specTable.Name, local.ResourceSpecFileName), []byte(testResourceContents), 0644) - afero.WriteFile(appFS, filepath.Join(specTable.Name, "query.sql"), []byte(specTable.Assets["query.sql"]), 0644) + appFS.MkdirAll(specTable.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(specTable.Name, local.ResourceSpecFileName), []byte(testResourceContents), 0o644) + afero.WriteFile(appFS, filepath.Join(specTable.Name, "query.sql"), []byte(specTable.Assets["query.sql"]), 0o644) repo := local.NewResourceSpecRepository(appFS, datastorer) returnedSpec, err := repo.GetByName(ctx, specTable.Name) @@ -136,7 +136,7 @@ func TestResourceSpecRepository(t *testing.T) { }) t.Run("should return ErrNoSuchSpec in case the folder exist but no resource file exist", func(t *testing.T) { appFS := afero.NewMemMapFs() - appFS.MkdirAll(specTable.Name, 0755) + appFS.MkdirAll(specTable.Name, 0o755) repo := local.NewResourceSpecRepository(appFS, datastorer) _, err := repo.GetByName(ctx, specTable.Name) @@ -150,9 +150,9 @@ func TestResourceSpecRepository(t *testing.T) { t.Run("should return error if yaml source is incorrect and failed to validate", func(t *testing.T) { // create test files and directories appFS := afero.NewMemMapFs() - appFS.MkdirAll(specTable.Name, 0755) - afero.WriteFile(appFS, filepath.Join(specTable.Name, local.ResourceSpecFileName), []byte("name:a"), 0644) - afero.WriteFile(appFS, filepath.Join(specTable.Name, "query.sql"), []byte(specTable.Assets["query.sql"]), 0644) + appFS.MkdirAll(specTable.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(specTable.Name, local.ResourceSpecFileName), []byte("name:a"), 0o644) + afero.WriteFile(appFS, filepath.Join(specTable.Name, "query.sql"), []byte(specTable.Assets["query.sql"]), 0o644) repo := local.NewResourceSpecRepository(appFS, datastorer) _, err := repo.GetByName(ctx, specTable.Name) @@ -209,8 +209,8 @@ spec: // create test files and directories appFS := afero.NewMemMapFs() for idx, resSpec := range resSpecs { - appFS.MkdirAll(resSpec.Name, 0755) - afero.WriteFile(appFS, filepath.Join(resSpec.Name, local.ResourceSpecFileName), []byte(content[idx]), 0644) + appFS.MkdirAll(resSpec.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(resSpec.Name, local.ResourceSpecFileName), []byte(content[idx]), 0o644) } repo := local.NewResourceSpecRepository(appFS, datastorer) @@ -229,7 +229,7 @@ spec: }) t.Run("should return ErrNoResources if the root directory has no files", func(t *testing.T) { appFS := afero.NewMemMapFs() - appFS.MkdirAll("test", 0755) + appFS.MkdirAll("test", 0o755) repo := local.NewResourceSpecRepository(appFS, datastorer) _, err := repo.GetAll(ctx) @@ -239,8 +239,8 @@ spec: // create test files and directories appFS := afero.NewMemMapFs() for idx, resSpec := range resSpecs { - appFS.MkdirAll(resSpec.Name, 0755) - afero.WriteFile(appFS, filepath.Join(resSpec.Name, local.ResourceSpecFileName), []byte(content[idx]), 0644) + appFS.MkdirAll(resSpec.Name, 0o755) + afero.WriteFile(appFS, filepath.Join(resSpec.Name, local.ResourceSpecFileName), []byte(content[idx]), 0o644) } repo := local.NewResourceSpecRepository(appFS, datastorer) diff --git a/store/postgres/adapter.go b/store/postgres/adapter.go index d74630c5c0..28c21322ff 100644 --- a/store/postgres/adapter.go +++ b/store/postgres/adapter.go @@ -37,14 +37,14 @@ type Job struct { TaskName string TaskConfig datatypes.JSON - WindowSize *int64 //duration in nanos + WindowSize *int64 // duration in nanos WindowOffset *int64 WindowTruncateTo *string Assets datatypes.JSON Hooks datatypes.JSON Metadata datatypes.JSON - ExternalDependencies datatypes.JSON //store external dependencies + ExternalDependencies datatypes.JSON // store external dependencies CreatedAt time.Time `gorm:"not null" json:"created_at"` UpdatedAt time.Time `gorm:"not null" json:"updated_at"` @@ -166,7 +166,7 @@ func (adapt JobSpecAdapter) ToSpec(conf Job) (models.JobSpec, error) { return models.JobSpec{}, err } - //prep external dependencies + // prep external dependencies externalDependencies := models.ExternalDependency{} if conf.ExternalDependencies != nil { if err := json.Unmarshal(conf.ExternalDependencies, &externalDependencies); err != nil { @@ -179,7 +179,7 @@ func (adapt JobSpecAdapter) ToSpec(conf Job) (models.JobSpec, error) { return models.JobSpec{}, err } - //prep assets + // prep assets jobAssets := []models.JobSpecAsset{} assetsRaw := []JobAsset{} if err := json.Unmarshal(conf.Assets, &assetsRaw); err != nil { @@ -189,7 +189,7 @@ func (adapt JobSpecAdapter) ToSpec(conf Job) (models.JobSpec, error) { jobAssets = append(jobAssets, asset.ToSpec()) } - //prep hooks + // prep hooks jobHooks := []models.JobSpecHook{} hooksRaw := []JobHook{} if err := json.Unmarshal(conf.Hooks, &hooksRaw); err != nil { diff --git a/store/postgres/backup_repository_test.go b/store/postgres/backup_repository_test.go index 0bb8032293..5696135a30 100644 --- a/store/postgres/backup_repository_test.go +++ b/store/postgres/backup_repository_test.go @@ -94,7 +94,7 @@ func TestIntegrationBackupRepository(t *testing.T) { projectName := "project" destinationDataset := "optimus_backup" destinationTable := fmt.Sprintf("backup_playground_table_%s", backupUUID) - //urn := fmt.Sprintf("store://%s:%s.%s", projectName, destinationDataset, destinationTable) + // urn := fmt.Sprintf("store://%s:%s.%s", projectName, destinationDataset, destinationTable) backupResult := make(map[string]interface{}) backupResult["project"] = projectName diff --git a/store/postgres/job_spec_repository.go b/store/postgres/job_spec_repository.go index c7b2cb96e3..c4446b697f 100644 --- a/store/postgres/job_spec_repository.go +++ b/store/postgres/job_spec_repository.go @@ -80,7 +80,7 @@ func (repo *ProjectJobSpecRepository) GetAll(ctx context.Context) ([]models.JobS return specs, nil } -func (repo *ProjectJobSpecRepository) GetByNameForProject(ctx context.Context, projName string, jobName string) (models.JobSpec, models.ProjectSpec, error) { +func (repo *ProjectJobSpecRepository) GetByNameForProject(ctx context.Context, projName, jobName string) (models.JobSpec, models.ProjectSpec, error) { var r Job var p Project if err := repo.db.WithContext(ctx).Where("name = ?", projName).First(&p).Error; err != nil { diff --git a/store/postgres/job_spec_repository_test.go b/store/postgres/job_spec_repository_test.go index 03a47a221b..aecc6cc5fd 100644 --- a/store/postgres/job_spec_repository_test.go +++ b/store/postgres/job_spec_repository_test.go @@ -282,7 +282,7 @@ func TestIntegrationJobRepository(t *testing.T) { projectJobSpecRepo := NewProjectJobSpecRepository(db, projectSpec, adapter) repo := NewJobSpecRepository(db, namespaceSpec, projectJobSpecRepo, adapter) - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -292,7 +292,7 @@ func TestIntegrationJobRepository(t *testing.T) { taskSchema := checkModel.Task.Unit.Info() assert.Equal(t, gTask, taskSchema.Name) - //try for update + // try for update err = repo.Save(ctx, testModelB) assert.Nil(t, err) @@ -324,7 +324,7 @@ func TestIntegrationJobRepository(t *testing.T) { projectJobSpecRepo := NewProjectJobSpecRepository(db, projectSpec, adapter) repo := NewJobSpecRepository(db, namespaceSpec, projectJobSpecRepo, adapter) - //try for create + // try for create testModelA.Task.Unit = &models.Plugin{Base: execUnit1, DependencyMod: depMod1} err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -338,7 +338,7 @@ func TestIntegrationJobRepository(t *testing.T) { testModelA.Task.Window.Offset = time.Hour * 2 testModelA.Task.Window.Size = 0 - //try for update + // try for update testModelA.Task.Unit = &models.Plugin{Base: execUnit2, DependencyMod: depMod2} err = repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -360,7 +360,7 @@ func TestIntegrationJobRepository(t *testing.T) { projectJobSpecRepo := NewProjectJobSpecRepository(db, projectSpec, adapter) repo := NewJobSpecRepository(db, namespaceSpec, projectJobSpecRepo, adapter) - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -509,7 +509,7 @@ func TestIntegrationJobRepository(t *testing.T) { projectJobSpecRepo := NewProjectJobSpecRepository(db, projectSpec, adapter) repo := NewJobSpecRepository(db, namespaceSpec, projectJobSpecRepo, adapter) - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -522,7 +522,7 @@ func TestIntegrationJobRepository(t *testing.T) { assert.Equal(t, time.Duration(0), checkModel.Behavior.Retry.Delay) assert.Equal(t, true, checkModel.Behavior.Retry.ExponentialBackoff) - //try for update + // try for update testModelA.Behavior.CatchUp = false testModelA.Behavior.DependsOnPast = true err = repo.Save(ctx, testModelA) diff --git a/store/postgres/namespace_repository_test.go b/store/postgres/namespace_repository_test.go index f54ffe0099..48520b2e10 100644 --- a/store/postgres/namespace_repository_test.go +++ b/store/postgres/namespace_repository_test.go @@ -140,7 +140,7 @@ func TestIntegrationNamespaceRepository(t *testing.T) { repo := NewNamespaceRepository(db, projectSpec, hash) - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -148,7 +148,7 @@ func TestIntegrationNamespaceRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "g-optimus", checkModel.Name) - //try for update + // try for update err = repo.Save(ctx, testModelB) assert.Nil(t, err) @@ -165,7 +165,7 @@ func TestIntegrationNamespaceRepository(t *testing.T) { repo := NewNamespaceRepository(db, projectSpec, hash) - //try for create + // try for create testModelA.Config["bucket"] = "gs://some_folder" err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -174,7 +174,7 @@ func TestIntegrationNamespaceRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "t-optimus", checkModel.Name) - //try for update + // try for update testModelA.Config["bucket"] = "gs://another_folder" err = repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -192,7 +192,7 @@ func TestIntegrationNamespaceRepository(t *testing.T) { repo := NewNamespaceRepository(db, projectSpec, hash) - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) diff --git a/store/postgres/postgres.go b/store/postgres/postgres.go index bc9f2ec96a..56163ba9f4 100644 --- a/store/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -34,9 +34,7 @@ const ( tracingSpanKey = "otel:span" ) -var ( - tracer = otel.Tracer("optimus/store/postgres") -) +var tracer = otel.Tracer("optimus/store/postgres") // NewHTTPFSMigrator reads the migrations from httpfs and returns the migrate.Migrate func NewHTTPFSMigrator(DBConnURL string) (*migrate.Migrate, error) { diff --git a/store/postgres/project_repository_test.go b/store/postgres/project_repository_test.go index 579a235b1e..a207001d32 100644 --- a/store/postgres/project_repository_test.go +++ b/store/postgres/project_repository_test.go @@ -104,7 +104,7 @@ func TestIntegrationProjectRepository(t *testing.T) { repo := NewProjectRepository(db, hash) - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -112,7 +112,7 @@ func TestIntegrationProjectRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "g-optimus", checkModel.Name) - //try for update + // try for update err = repo.Save(ctx, testModelB) assert.Nil(t, err) @@ -129,7 +129,7 @@ func TestIntegrationProjectRepository(t *testing.T) { repo := NewProjectRepository(db, hash) - //try for create + // try for create testModelA.Config["bucket"] = "gs://some_folder" err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -138,7 +138,7 @@ func TestIntegrationProjectRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "t-optimus", checkModel.Name) - //try for update + // try for update testModelA.Config["bucket"] = "gs://another_folder" err = repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -156,7 +156,7 @@ func TestIntegrationProjectRepository(t *testing.T) { repo := NewProjectRepository(db, hash) - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) diff --git a/store/postgres/replay_repository.go b/store/postgres/replay_repository.go index 3f042b8c05..59a21d5afb 100644 --- a/store/postgres/replay_repository.go +++ b/store/postgres/replay_repository.go @@ -40,7 +40,7 @@ type ExecutionTree struct { } func fromTreeNode(treeNode *tree.TreeNode) *ExecutionTree { - //only store necessary job spec data in tree + // only store necessary job spec data in tree treeNodeJob := treeNode.Data.(models.JobSpec) jobSpec := Job{ ID: treeNodeJob.ID, diff --git a/store/postgres/replay_repository_test.go b/store/postgres/replay_repository_test.go index 1ccfbe1967..44ce28921c 100644 --- a/store/postgres/replay_repository_test.go +++ b/store/postgres/replay_repository_test.go @@ -20,7 +20,7 @@ import ( "gorm.io/gorm" ) -func treeIsEqual(treeNode *tree.TreeNode, treeNodeComparator *tree.TreeNode) bool { +func treeIsEqual(treeNode, treeNodeComparator *tree.TreeNode) bool { if treeNode.Data.GetName() != treeNodeComparator.Data.GetName() { return false } diff --git a/store/postgres/resource_spec_repository_test.go b/store/postgres/resource_spec_repository_test.go index 10624e3a2e..244d9d4943 100644 --- a/store/postgres/resource_spec_repository_test.go +++ b/store/postgres/resource_spec_repository_test.go @@ -157,7 +157,7 @@ func TestIntegrationResourceSpecRepository(t *testing.T) { dsTypeTableController.On("GenerateURN", testMock.Anything).Return(testModelA.URN, nil).Once() - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -165,7 +165,7 @@ func TestIntegrationResourceSpecRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "proj.datas.test", checkModel.Name) - //try for create + // try for create err = repo.Save(ctx, testModelB) assert.Nil(t, err) @@ -185,7 +185,7 @@ func TestIntegrationResourceSpecRepository(t *testing.T) { dsTypeTableController.On("GenerateURN", testMock.Anything).Return(testModelA.URN, nil).Twice() - //try for create + // try for create err := repo.Save(ctx, testModelA) assert.Nil(t, err) @@ -193,7 +193,7 @@ func TestIntegrationResourceSpecRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "proj.ttt.test2", checkModel.Name) - //try for update + // try for update testModelA.Version = 6 dsTypeTableAdapter.On("ToYaml", testModelA).Return([]byte("some binary data testModelA"), nil) dsTypeTableAdapter.On("FromYaml", []byte("some binary data testModelA")).Return(testModelA, nil) @@ -253,7 +253,7 @@ func TestIntegrationResourceSpecRepository(t *testing.T) { dsTypeTableController.On("GenerateURN", testMock.Anything).Return(testModelA.URN, nil).Twice() - //try for create + // try for create err := resourceSpecNamespace1.Save(ctx, testModelA) assert.Nil(t, err) diff --git a/store/postgres/secret_repository_test.go b/store/postgres/secret_repository_test.go index d692a1f74d..c1698613d9 100644 --- a/store/postgres/secret_repository_test.go +++ b/store/postgres/secret_repository_test.go @@ -164,7 +164,7 @@ func TestIntegrationSecretRepository(t *testing.T) { repo := NewSecretRepository(db, hash) - //try for create + // try for create err := repo.Save(ctx, projectSpec, namespaceSpec, testModelA) assert.Nil(t, err) @@ -172,7 +172,7 @@ func TestIntegrationSecretRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "g-optimus", checkModel.Name) - //try for update + // try for update err = repo.Save(ctx, projectSpec, namespaceSpec, testModelB) assert.Nil(t, err) @@ -189,7 +189,7 @@ func TestIntegrationSecretRepository(t *testing.T) { repo := NewSecretRepository(db, hash) - //try for create + // try for create testModelA.Value = "gs://some_folder" err := repo.Save(ctx, projectSpec, namespaceSpec, testModelA) assert.Nil(t, err) @@ -198,7 +198,7 @@ func TestIntegrationSecretRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "t-optimus", checkModel.Name) - //try for create the same secret + // try for create the same secret testModelA.Value = "gs://another_folder" err = repo.Save(ctx, projectSpec, namespaceSpec, testModelA) assert.Equal(t, "resource already exists", err.Error()) @@ -213,7 +213,7 @@ func TestIntegrationSecretRepository(t *testing.T) { repo := NewSecretRepository(db, hash) - //try for create + // try for create testModelA.Value = "gs://some_folder" err := repo.Save(ctx, projectSpec, namespaceSpec, testModelA) assert.Nil(t, err) @@ -222,7 +222,7 @@ func TestIntegrationSecretRepository(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "t-optimus", checkModel.Name) - //try for update + // try for update testModelA.Value = "gs://another_folder" err = repo.Update(ctx, projectSpec, namespaceSpec, testModelA) assert.Nil(t, err) @@ -239,7 +239,7 @@ func TestIntegrationSecretRepository(t *testing.T) { repo := NewSecretRepository(db, hash) - //try for update + // try for update err := repo.Update(ctx, projectSpec, namespaceSpec, testModelA) assert.Equal(t, "resource not found", err.Error()) }) diff --git a/store/store.go b/store/store.go index 552de7a470..5603c15cb2 100644 --- a/store/store.go +++ b/store/store.go @@ -66,12 +66,10 @@ type JobRunRepository interface { // Save updates the run in place if it can else insert new // Note: it doesn't insert the instances attached to job run in db Save(context.Context, models.NamespaceSpec, models.JobRun) error - GetByScheduledAt(ctx context.Context, jobID uuid.UUID, scheduledAt time.Time) (models.JobRun, models.NamespaceSpec, error) GetByID(context.Context, uuid.UUID) (models.JobRun, models.NamespaceSpec, error) UpdateStatus(context.Context, uuid.UUID, models.JobRunState) error GetByTrigger(ctx context.Context, trigger models.JobRunTrigger, state ...models.JobRunState) ([]models.JobRun, error) - AddInstance(ctx context.Context, namespace models.NamespaceSpec, run models.JobRun, spec models.InstanceSpec) error // Clear will not delete the record but will reset all the run details diff --git a/utils/file.go b/utils/file.go index c77ad113ce..568c5355c1 100644 --- a/utils/file.go +++ b/utils/file.go @@ -10,7 +10,7 @@ func WriteStringToFileIndexed() func(filePath, data string, writer io.Writer) er index := 0 return func(filePath, data string, writer io.Writer) error { if err := ioutil.WriteFile(filePath, - []byte(data), 0644); err != nil { + []byte(data), 0o644); err != nil { return fmt.Errorf("failed to write file at %s: %w", filePath, err) } index++ diff --git a/utils/proto.go b/utils/proto.go index 8ba3ac587a..4fd0d0fb4f 100644 --- a/utils/proto.go +++ b/utils/proto.go @@ -5,11 +5,11 @@ import ( ) // ToEnumProto converts models to Types defined in protobuf, task -> TYPE_TASK -func ToEnumProto(modelType string, enumName string) string { +func ToEnumProto(modelType, enumName string) string { return strings.ToUpper(enumName + "_" + modelType) } // FromEnumProto converts models to Types defined in protobuf, TYPE_TASK -> task -func FromEnumProto(typeProto string, enumName string) string { +func FromEnumProto(typeProto, enumName string) string { return strings.TrimPrefix(strings.ToLower(typeProto), strings.ToLower(enumName+"_")) } diff --git a/utils/uuid.go b/utils/uuid.go index 63cd76a009..a75b214643 100644 --- a/utils/uuid.go +++ b/utils/uuid.go @@ -6,8 +6,7 @@ type UUIDProvider interface { NewUUID() (uuid.UUID, error) } -type uuidProvider struct { -} +type uuidProvider struct{} func (*uuidProvider) NewUUID() (uuid.UUID, error) { return uuid.NewRandom() diff --git a/utils/validator.go b/utils/validator.go index d398e3c9fe..6d40d2568a 100644 --- a/utils/validator.go +++ b/utils/validator.go @@ -31,7 +31,7 @@ func CronIntervalValidator(val interface{}, param string) error { type VFactory struct{} func (f *VFactory) NewFromRegex(re, message string) survey.Validator { - var regex = regexp.MustCompile(re) + regex := regexp.MustCompile(re) return func(v interface{}) error { k := reflect.ValueOf(v).Kind() if k != reflect.String {