diff --git a/cmd/influx/task.go b/cmd/influx/task.go index 2043e80b55f..438346a79f5 100644 --- a/cmd/influx/task.go +++ b/cmd/influx/task.go @@ -378,7 +378,6 @@ func taskDeleteF(cmd *cobra.Command, args []string) error { type TaskLogFindFlags struct { taskID string runID string - orgID string } var taskLogFindFlags TaskLogFindFlags @@ -392,7 +391,6 @@ func init() { taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.taskID, "task-id", "", "", "task id (required)") taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.runID, "run-id", "", "", "run id") - taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.orgID, "org-id", "", "", "organization id") taskLogFindCmd.MarkFlagRequired("task-id") logCmd.AddCommand(taskLogFindCmd) @@ -409,7 +407,7 @@ func taskLogFindF(cmd *cobra.Command, args []string) error { if err != nil { return err } - filter.Task = id + filter.Task = *id if taskLogFindFlags.runID != "" { id, err := platform.IDFromString(taskLogFindFlags.runID) @@ -419,14 +417,6 @@ func taskLogFindF(cmd *cobra.Command, args []string) error { filter.Run = id } - if taskLogFindFlags.orgID != "" { - id, err := platform.IDFromString(taskLogFindFlags.orgID) - if err != nil { - return err - } - filter.Org = id - } - ctx := context.TODO() logs, _, err := s.FindLogs(ctx, filter) if err != nil { @@ -451,7 +441,6 @@ func taskLogFindF(cmd *cobra.Command, args []string) error { type TaskRunFindFlags struct { runID string taskID string - orgID string afterTime string beforeTime string limit int @@ -468,13 +457,11 @@ func init() { taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.taskID, "task-id", "", "", "task id (required)") taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.runID, "run-id", "", "", "run id") - taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.orgID, "org-id", "", "", "organization id") taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.afterTime, "after", "", "", "after time for filtering") taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.beforeTime, "before", "", "", "before time for filtering") taskRunFindCmd.Flags().IntVarP(&taskRunFindFlags.limit, "limit", "", 0, "limit the results") taskRunFindCmd.MarkFlagRequired("task-id") - taskRunFindCmd.MarkFlagRequired("org-id") runCmd.AddCommand(taskRunFindCmd) } @@ -494,13 +481,7 @@ func taskRunFindF(cmd *cobra.Command, args []string) error { if err != nil { return err } - filter.Task = taskID - - orgID, err := platform.IDFromString(taskRunFindFlags.orgID) - if err != nil { - return err - } - filter.Org = orgID + filter.Task = *taskID var runs []*platform.Run if taskRunFindFlags.runID != "" { @@ -508,7 +489,7 @@ func taskRunFindF(cmd *cobra.Command, args []string) error { if err != nil { return err } - run, err := s.FindRunByID(context.Background(), *filter.Org, *id) + run, err := s.FindRunByID(context.Background(), filter.Task, *id) if err != nil { return err } diff --git a/cmd/influxd/launcher/tasks_test.go b/cmd/influxd/launcher/tasks_test.go index 6e061acf40e..608f8277b69 100644 --- a/cmd/influxd/launcher/tasks_test.go +++ b/cmd/influxd/launcher/tasks_test.go @@ -122,7 +122,7 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b } time.Sleep(5 * time.Millisecond) - runs, _, err := be.TaskService().FindRuns(ctx, influxdb.RunFilter{Org: &org.ID, Task: &created.ID, Limit: 1}) + runs, _, err := be.TaskService().FindRuns(ctx, influxdb.RunFilter{Task: created.ID, Limit: 1}) if err != nil { t.Fatal(err) } @@ -219,7 +219,7 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b }) // now lets see a logs - logs, _, err := be.TaskService().FindLogs(ctx, influxdb.LogFilter{Org: &org.ID, Task: &created.ID, Run: &targetRun.ID}) + logs, _, err := be.TaskService().FindLogs(ctx, influxdb.LogFilter{Task: created.ID, Run: &targetRun.ID}) if err != nil { t.Fatal(err) } diff --git a/http/task_service.go b/http/task_service.go index 579781a97e9..4f06045c4dc 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -766,24 +766,7 @@ func decodeGetLogsRequest(ctx context.Context, r *http.Request, orgs platform.Or if err != nil { return nil, err } - req.filter.Task = taskID - - qp := r.URL.Query() - - if orgName := qp.Get("org"); orgName != "" { - o, err := orgs.FindOrganization(ctx, platform.OrganizationFilter{Name: &orgName}) - if err != nil { - return nil, err - } - - req.filter.Org = &o.ID - } else if oid := qp.Get("orgID"); oid != "" { - orgID, err := platform.IDFromString(oid) - if err != nil { - return nil, err - } - req.filter.Org = orgID - } + req.filter.Task = *taskID if runID := params.ByName("rid"); runID != "" { id, err := platform.IDFromString(runID) @@ -823,7 +806,7 @@ func (h *TaskHandler) handleGetRuns(w http.ResponseWriter, r *http.Request) { return } - if err := encodeResponse(ctx, w, http.StatusOK, newRunsResponse(runs, *req.filter.Task)); err != nil { + if err := encodeResponse(ctx, w, http.StatusOK, newRunsResponse(runs, req.filter.Task)); err != nil { logEncodingError(h.logger, r, err) return } @@ -848,25 +831,10 @@ func decodeGetRunsRequest(ctx context.Context, r *http.Request, orgs platform.Or if err != nil { return nil, err } - req.filter.Task = taskID + req.filter.Task = *taskID qp := r.URL.Query() - if orgName := qp.Get("org"); orgName != "" { - o, err := orgs.FindOrganization(ctx, platform.OrganizationFilter{Name: &orgName}) - if err != nil { - return nil, err - } - - req.filter.Org = &o.ID - } else if orgID := qp.Get("orgID"); orgID != "" { - oid, err := platform.IDFromString(orgID) - if err != nil { - return nil, err - } - req.filter.Org = oid - } - if id := qp.Get("after"); id != "" { afterID, err := platform.IDFromString(id) if err != nil { @@ -1401,26 +1369,21 @@ func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error { // FindLogs returns logs for a run. func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) { - if filter.Task == nil { + if !filter.Task.Valid() { return nil, 0, errors.New("task ID required") } var urlPath string if filter.Run == nil { - urlPath = path.Join(taskIDPath(*filter.Task), "logs") + urlPath = path.Join(taskIDPath(filter.Task), "logs") } else { - urlPath = path.Join(taskIDRunIDPath(*filter.Task, *filter.Run), "logs") + urlPath = path.Join(taskIDRunIDPath(filter.Task, *filter.Run), "logs") } u, err := newURL(t.Addr, urlPath) if err != nil { return nil, 0, err } - val := url.Values{} - if filter.Org != nil { - val.Set("orgID", filter.Org.String()) - } - u.RawQuery = val.Encode() req, err := http.NewRequest("GET", u.String(), nil) if err != nil { @@ -1450,19 +1413,16 @@ func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([ // FindRuns returns a list of runs that match a filter and the total count of returned runs. func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) { - if filter.Task == nil { + if !filter.Task.Valid() { return nil, 0, errors.New("task ID required") } - u, err := newURL(t.Addr, taskIDRunsPath(*filter.Task)) + u, err := newURL(t.Addr, taskIDRunsPath(filter.Task)) if err != nil { return nil, 0, err } val := url.Values{} - if filter.Org != nil { - val.Set("orgID", filter.Org.String()) - } if filter.After != nil { val.Set("after", filter.After.String()) } diff --git a/http/task_service_test.go b/http/task_service_test.go index 55f28e2b626..285dfdd855c 100644 --- a/http/task_service_test.go +++ b/http/task_service_test.go @@ -440,7 +440,7 @@ func TestTaskHandler_handleGetRuns(t *testing.T) { runs := []*platform.Run{ { ID: platform.ID(2), - TaskID: *f.Task, + TaskID: f.Task, Status: "success", ScheduledFor: "2018-12-01T17:00:13Z", StartedAt: "2018-12-01T17:00:03.155645Z", @@ -617,7 +617,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { name: "get task logs", svc: &mock.TaskService{ FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) { - if *f.Task == taskID { + if f.Task == taskID { return nil, 0, nil } @@ -633,7 +633,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { name: "get run logs", svc: &mock.TaskService{ FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) { - if *f.Task != taskID { + if f.Task != taskID { return nil, 0, backend.ErrTaskNotFound } if *f.Run != runID { @@ -652,7 +652,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { name: "get runs", svc: &mock.TaskService{ FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) { - if *f.Task != taskID { + if f.Task != taskID { return nil, 0, backend.ErrTaskNotFound } diff --git a/task.go b/task.go index 1c8cb30e434..6c4441d841b 100644 --- a/task.go +++ b/task.go @@ -251,17 +251,20 @@ type TaskFilter struct { // RunFilter represents a set of filters that restrict the returned results type RunFilter struct { - Org *ID - Task *ID + // Task ID is required for listing runs. + Task ID + After *ID Limit int AfterTime string BeforeTime string } -// LogFilter represents a set of filters that restrict the returned results +// LogFilter represents a set of filters that restrict the returned log results. type LogFilter struct { - Org *ID - Task *ID - Run *ID + // Task ID is required. + Task ID + + // The optional Run ID limits logs to a single run. + Run *ID } diff --git a/task/backend/inmem_logreaderwriter.go b/task/backend/inmem_logreaderwriter.go index f9120a49575..b2f172b4261 100644 --- a/task/backend/inmem_logreaderwriter.go +++ b/task/backend/inmem_logreaderwriter.go @@ -10,14 +10,20 @@ import ( platform "github.com/influxdata/influxdb" ) +// orgtask is used as a key for storing runs by org and task ID. +// This is only relevant for the in-memory run store. +type orgtask struct { + o, t platform.ID +} + type runReaderWriter struct { - mu sync.RWMutex - byTaskID map[string][]*platform.Run - byRunID map[string]*platform.Run + mu sync.RWMutex + byOrgTask map[orgtask][]*platform.Run + byRunID map[string]*platform.Run } func NewInMemRunReaderWriter() *runReaderWriter { - return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}} + return &runReaderWriter{byRunID: map[string]*platform.Run{}, byOrgTask: map[orgtask][]*platform.Run{}} } func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error { @@ -48,8 +54,8 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, wh } timeSetter(run) r.byRunID[ridStr] = run - tidStr := rlb.Task.ID.String() - r.byTaskID[tidStr] = append(r.byTaskID[tidStr], run) + ot := orgtask{o: rlb.Task.Org, t: rlb.Task.ID} + r.byOrgTask[ot] = append(r.byOrgTask[ot], run) return nil } @@ -75,15 +81,15 @@ func (r *runReaderWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when ti return nil } -func (r *runReaderWriter) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) { +func (r *runReaderWriter) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) { r.mu.RLock() defer r.mu.RUnlock() - if runFilter.Task == nil { + if !runFilter.Task.Valid() { return nil, errors.New("task is required") } - ex, ok := r.byTaskID[runFilter.Task.String()] + ex, ok := r.byOrgTask[orgtask{o: orgID, t: runFilter.Task}] if !ok { return nil, ErrRunNotFound } @@ -129,12 +135,12 @@ func (r *runReaderWriter) FindRunByID(ctx context.Context, orgID, runID platform return &rtnRun, nil } -func (r *runReaderWriter) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) { +func (r *runReaderWriter) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) { r.mu.RLock() defer r.mu.RUnlock() - if logFilter.Task == nil && logFilter.Run == nil { - return nil, errors.New("task or run is required") + if !logFilter.Task.Valid() { + return nil, errors.New("task ID required") } if logFilter.Run != nil { @@ -142,12 +148,19 @@ func (r *runReaderWriter) ListLogs(ctx context.Context, logFilter platform.LogFi if !ok { return nil, ErrRunNotFound } + // TODO(mr): validate that task ID matches, if task is also set. Needs test. return []platform.Log{run.Log}, nil } - logs := []platform.Log{} - for _, run := range r.byTaskID[logFilter.Task.String()] { + var logs []platform.Log + ot := orgtask{o: orgID, t: logFilter.Task} + for _, run := range r.byOrgTask[ot] { logs = append(logs, run.Log) } + + if len(logs) == 0 { + return nil, errors.New("no matching runs found") + } + return logs, nil } diff --git a/task/backend/query_logreader.go b/task/backend/query_logreader.go index abb6555d9a5..33303663483 100644 --- a/task/backend/query_logreader.go +++ b/task/backend/query_logreader.go @@ -22,18 +22,17 @@ type QueryLogReader struct { queryService query.QueryService } +var _ LogReader = (*QueryLogReader)(nil) + func NewQueryLogReader(qs query.QueryService) *QueryLogReader { return &QueryLogReader{ queryService: qs, } } -func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) { - if logFilter.Org == nil { - return nil, errors.New("org required") - } - if logFilter.Task == nil && logFilter.Run == nil { - return nil, errors.New("task or run is required") +func (qlr *QueryLogReader) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) { + if !logFilter.Task.Valid() { + return nil, errors.New("task ID required to list logs") } filterPart := "" @@ -58,7 +57,7 @@ func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogF if auth.Kind() != "authorization" { return nil, platform.ErrAuthorizerNotSupported } - request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: *logFilter.Org, Compiler: lang.FluxCompiler{Query: listScript}} + request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}} ittr, err := qlr.queryService.Query(ctx, request) if err != nil { @@ -78,6 +77,10 @@ func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogF } runs := re.Runs() + if len(runs) == 0 { + return nil, errors.New("no matching runs found") + } + logs := make([]platform.Log, len(runs)) for i, r := range runs { logs[i] = r.Log @@ -85,13 +88,10 @@ func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogF return logs, nil } -func (qlr *QueryLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) { - if runFilter.Task == nil { +func (qlr *QueryLogReader) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) { + if !runFilter.Task.Valid() { return nil, errors.New("task required") } - if runFilter.Org == nil { - return nil, errors.New("org required") - } limit := "|> limit(n: 100)\n" if runFilter.Limit > 0 { @@ -129,14 +129,23 @@ from(bucketID: "000000000000000a") if auth.Kind() != "authorization" { return nil, platform.ErrAuthorizerNotSupported } - request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: *runFilter.Org, Compiler: lang.FluxCompiler{Query: listScript}} + request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}} ittr, err := qlr.queryService.Query(ctx, request) if err != nil { return nil, err } - return queryIttrToRuns(ittr) + runs, err := queryIttrToRuns(ittr) + if err != nil { + return nil, err + } + + if len(runs) == 0 { + return nil, errors.New("no matching runs found") + } + + return runs, nil } func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) { diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index 093c124d875..02c5f3b3547 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -29,8 +29,10 @@ func TestScheduler_Cancelation(t *testing.T) { o.Start(context.Background()) defer o.Stop() + const orgID = 2 task := &backend.StoreTask{ - ID: platform.ID(1), + ID: platform.ID(1), + Org: orgID, } meta := &backend.StoreTaskMeta{ MaxConcurrency: 1, @@ -41,7 +43,7 @@ func TestScheduler_Cancelation(t *testing.T) { if err := o.ClaimTask(task, meta); err != nil { t.Fatal(err) } - runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) + runs, err := rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID}) if err != nil { t.Fatal(err) } @@ -49,7 +51,7 @@ func TestScheduler_Cancelation(t *testing.T) { t.Fatal(err) } time.Sleep(10 * time.Millisecond) // we have to do this because the storage system we are using for the logs is eventually consistent. - runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) + runs, err = rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID}) if err != nil { t.Fatal(err) } @@ -58,7 +60,7 @@ func TestScheduler_Cancelation(t *testing.T) { } // check to make sure it is really canceling, and that the status doesn't get changed to something else after it would have finished time.Sleep(500 * time.Millisecond) - runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) + runs, err = rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID}) if err != nil { t.Fatal(err) } @@ -395,7 +397,7 @@ func TestScheduler_Queue(t *testing.T) { } // pollForRunStatus tries a few times to find runs matching supplied conditions, before failing. -func pollForRunStatus(t *testing.T, r backend.LogReader, taskID platform.ID, expCount, expIndex int, expStatus string) { +func pollForRunStatus(t *testing.T, r backend.LogReader, taskID, orgID platform.ID, expCount, expIndex int, expStatus string) { t.Helper() var runs []*platform.Run @@ -407,7 +409,7 @@ func pollForRunStatus(t *testing.T, r backend.LogReader, taskID platform.ID, exp time.Sleep(10 * time.Millisecond) } - runs, err = r.ListRuns(context.Background(), platform.RunFilter{Task: &taskID}) + runs, err = r.ListRuns(context.Background(), orgID, platform.RunFilter{Task: taskID}) if err != nil { t.Fatal(err) } @@ -443,7 +445,8 @@ func TestScheduler_RunLog(t *testing.T) { // Claim a task that starts later. task := &backend.StoreTask{ - ID: platform.ID(1), + ID: platform.ID(1), + Org: 2, } meta := &backend.StoreTaskMeta{ MaxConcurrency: 99, @@ -456,7 +459,7 @@ func TestScheduler_RunLog(t *testing.T) { t.Fatal(err) } - if _, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err != backend.ErrRunNotFound { + if _, err := rl.ListRuns(context.Background(), task.Org, platform.RunFilter{Task: task.ID}); err != backend.ErrRunNotFound { t.Fatal(err) } @@ -466,7 +469,7 @@ func TestScheduler_RunLog(t *testing.T) { t.Fatal(err) } - runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) + runs, err := rl.ListRuns(context.Background(), task.Org, platform.RunFilter{Task: task.ID}) if err != nil { t.Fatal(err) } @@ -484,7 +487,7 @@ func TestScheduler_RunLog(t *testing.T) { t.Fatal(err) } - pollForRunStatus(t, rl, task.ID, 1, 0, backend.RunSuccess.String()) + pollForRunStatus(t, rl, task.ID, task.Org, 1, 0, backend.RunSuccess.String()) // Create a new run, but fail this time. s.Tick(7) @@ -493,7 +496,7 @@ func TestScheduler_RunLog(t *testing.T) { t.Fatal(err) } - pollForRunStatus(t, rl, task.ID, 2, 1, backend.RunStarted.String()) + pollForRunStatus(t, rl, task.ID, task.Org, 2, 1, backend.RunStarted.String()) // Finish with failure to create the run. promises[0].Finish(nil, errors.New("forced failure")) @@ -501,7 +504,7 @@ func TestScheduler_RunLog(t *testing.T) { t.Fatal(err) } - pollForRunStatus(t, rl, task.ID, 2, 1, backend.RunFail.String()) + pollForRunStatus(t, rl, task.ID, task.Org, 2, 1, backend.RunFail.String()) // Create a new run that starts but fails. s.Tick(8) @@ -510,12 +513,12 @@ func TestScheduler_RunLog(t *testing.T) { t.Fatal(err) } - pollForRunStatus(t, rl, task.ID, 3, 2, backend.RunStarted.String()) + pollForRunStatus(t, rl, task.ID, task.Org, 3, 2, backend.RunStarted.String()) promises[0].Finish(mock.NewRunResult(errors.New("started but failed to finish properly"), false), nil) if _, err := e.PollForNumberRunning(task.ID, 0); err != nil { t.Fatal(err) } - pollForRunStatus(t, rl, task.ID, 3, 2, backend.RunFail.String()) + pollForRunStatus(t, rl, task.ID, task.Org, 3, 2, backend.RunFail.String()) // One more run, but cancel this time. s.Tick(9) @@ -524,7 +527,7 @@ func TestScheduler_RunLog(t *testing.T) { t.Fatal(err) } - pollForRunStatus(t, rl, task.ID, 4, 3, backend.RunStarted.String()) + pollForRunStatus(t, rl, task.ID, task.Org, 4, 3, backend.RunStarted.String()) // Finish with failure. promises[0].Cancel() @@ -532,7 +535,7 @@ func TestScheduler_RunLog(t *testing.T) { t.Fatal(err) } - pollForRunStatus(t, rl, task.ID, 4, 3, backend.RunCanceled.String()) + pollForRunStatus(t, rl, task.ID, task.Org, 4, 3, backend.RunCanceled.String()) } func TestScheduler_RunFailureCleanup(t *testing.T) { diff --git a/task/backend/store.go b/task/backend/store.go index 7d1f71441ab..e898a86727c 100644 --- a/task/backend/store.go +++ b/task/backend/store.go @@ -329,6 +329,8 @@ type LogWriter interface { // This is useful for test, but not much else. type NopLogWriter struct{} +var _ LogWriter = NopLogWriter{} + func (NopLogWriter) UpdateRunState(context.Context, RunLogBase, time.Time, RunStatus) error { return nil } @@ -340,21 +342,25 @@ func (NopLogWriter) AddRunLog(context.Context, RunLogBase, time.Time, string) er // LogReader reads log information and log data from a store. type LogReader interface { // ListRuns returns a list of runs belonging to a task. - ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) + // orgID is necessary to look in the correct system bucket. + ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) // FindRunByID finds a run given a orgID and runID. // orgID is necessary to look in the correct system bucket. FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) // ListLogs lists logs for a task or a specified run of a task. - ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) + // orgID is necessary to look in the correct system bucket. + ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) } -// NopLogWriter is a LogWriter that doesn't do anything when its methods are called. +// NopLogReader is a LogReader that doesn't do anything when its methods are called. // This is useful for test, but not much else. type NopLogReader struct{} -func (NopLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) { +var _ LogReader = NopLogReader{} + +func (NopLogReader) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) { return nil, nil } @@ -362,7 +368,7 @@ func (NopLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) ( return nil, nil } -func (NopLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) { +func (NopLogReader) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) { return nil, nil } diff --git a/task/backend/storetest/logstoretest.go b/task/backend/storetest/logstoretest.go index 388f7b40cd6..91a9ba12db6 100644 --- a/task/backend/storetest/logstoretest.go +++ b/task/backend/storetest/logstoretest.go @@ -173,8 +173,11 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization()) - if _, err := reader.ListRuns(ctx, platform.RunFilter{Task: &task.ID}); err == nil { - t.Fatal("failed to error on bad id") + if _, err := reader.ListRuns(ctx, task.ID, platform.RunFilter{Task: task.ID}); err == nil { + t.Fatal("failed to error on bad org id") + } + if _, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{Task: task.Org}); err == nil { + t.Fatal("failed to error on bad task id") } now := time.Now().UTC() @@ -201,13 +204,15 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) } } - if _, err := reader.ListRuns(ctx, platform.RunFilter{}); err == nil { - t.Fatal("failed to error without any filter") + if _, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{}); err == nil { + t.Fatal("failed to error with invalid task ID") + } + if _, err := reader.ListRuns(ctx, 0, platform.RunFilter{Task: task.ID}); err == nil { + t.Fatal("failed to error with invalid org ID") } - listRuns, err := reader.ListRuns(ctx, platform.RunFilter{ - Task: &task.ID, - Org: &task.Org, + listRuns, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{ + Task: task.ID, Limit: 2 * nRuns, }) if err != nil { @@ -219,9 +224,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) } const afterIDIdx = 20 - listRuns, err = reader.ListRuns(ctx, platform.RunFilter{ - Task: &task.ID, - Org: &task.Org, + listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{ + Task: task.ID, After: &runs[afterIDIdx].ID, Limit: 2 * nRuns, }) @@ -233,9 +237,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs)-(afterIDIdx+1)) } - listRuns, err = reader.ListRuns(ctx, platform.RunFilter{ - Task: &task.ID, - Org: &task.Org, + listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{ + Task: task.ID, Limit: 30, }) if err != nil { @@ -248,9 +251,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) const afterTimeIdx = 34 scheduledFor, _ := time.Parse(time.RFC3339, runs[afterTimeIdx].ScheduledFor) - listRuns, err = reader.ListRuns(ctx, platform.RunFilter{ - Task: &task.ID, - Org: &task.Org, + listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{ + Task: task.ID, AfterTime: scheduledFor.Format(time.RFC3339), Limit: 2 * nRuns, }) @@ -264,9 +266,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) const beforeTimeIdx = 34 scheduledFor, _ = time.Parse(time.RFC3339, runs[beforeTimeIdx].ScheduledFor) - listRuns, err = reader.ListRuns(ctx, platform.RunFilter{ - Task: &task.ID, - Org: &task.Org, + listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{ + Task: task.ID, BeforeTime: scheduledFor.Add(time.Millisecond).Format(time.RFC3339), }) if err != nil { @@ -348,11 +349,11 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization()) - if _, err := reader.ListLogs(ctx, platform.LogFilter{}); err == nil { - t.Fatal("failed to error with no filter") + if _, err := reader.ListLogs(ctx, task.Org, platform.LogFilter{}); err == nil { + t.Fatal("failed to error with missing task ID") } - if _, err := reader.ListLogs(ctx, platform.LogFilter{Run: &task.ID}); err == nil { - t.Fatal("failed to error with a non-run-ID") + if _, err := reader.ListLogs(ctx, 9999999, platform.LogFilter{Task: task.ID}); err == nil { + t.Fatal("failed to error with an invalid org ID") } now := time.Now().UTC() @@ -381,7 +382,7 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) } const targetRun = 4 - logs, err := reader.ListLogs(ctx, platform.LogFilter{Run: &runs[targetRun].ID, Org: &task.Org}) + logs, err := reader.ListLogs(ctx, task.Org, platform.LogFilter{Task: task.ID, Run: &runs[targetRun].ID}) if err != nil { t.Fatal(err) } @@ -394,7 +395,7 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) t.Fatalf("expected: %q, got: %q", fmtTimelog+": log4", string(logs[0])) } - logs, err = reader.ListLogs(ctx, platform.LogFilter{Task: &task.ID, Org: &task.Org}) + logs, err = reader.ListLogs(ctx, task.Org, platform.LogFilter{Task: task.ID}) if err != nil { t.Fatal(err) } diff --git a/task/platform_adapter.go b/task/platform_adapter.go index f4cacf68aff..c11518f3b28 100644 --- a/task/platform_adapter.go +++ b/task/platform_adapter.go @@ -230,7 +230,12 @@ func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error { } func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) { - logs, err := p.r.ListLogs(ctx, filter) + task, err := p.s.FindTaskByID(ctx, filter.Task) + if err != nil { + return nil, 0, err + } + + logs, err := p.r.ListLogs(ctx, task.Org, filter) logPointers := make([]*platform.Log, len(logs)) for i := range logs { logPointers[i] = &logs[i] @@ -239,7 +244,12 @@ func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*p } func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) { - runs, err := p.r.ListRuns(ctx, filter) + task, err := p.s.FindTaskByID(ctx, filter.Task) + if err != nil { + return nil, 0, err + } + + runs, err := p.r.ListRuns(ctx, task.Org, filter) return runs, len(runs), err } diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index cc9225db611..0ceed1ac0cf 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -462,7 +462,7 @@ func testTaskRuns(t *testing.T, sys *System) { } // Limit 1 should only return the earlier run. - runs, _, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &cr.OrgID, Task: &task.ID, Limit: 1}) + runs, _, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Task: task.ID, Limit: 1}) if err != nil { t.Fatal(err) } @@ -483,7 +483,7 @@ func testTaskRuns(t *testing.T, sys *System) { } // Unspecified limit returns both runs. - runs, _, err = sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &cr.OrgID, Task: &task.ID}) + runs, _, err = sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Task: task.ID}) if err != nil { t.Fatal(err) } @@ -745,9 +745,8 @@ func testTaskRuns(t *testing.T, sys *System) { } // Ensure it is returned when filtering logs by run ID. - logs, err := sys.LR.ListLogs(sys.Ctx, platform.LogFilter{ - Org: &cr.OrgID, - Task: &task.ID, + logs, err := sys.LR.ListLogs(sys.Ctx, cr.OrgID, platform.LogFilter{ + Task: task.ID, Run: &rc1.Created.RunID, }) if err != nil { @@ -767,9 +766,8 @@ func testTaskRuns(t *testing.T, sys *System) { } // Ensure both returned when filtering logs by task ID. - logs, err = sys.LR.ListLogs(sys.Ctx, platform.LogFilter{ - Org: &cr.OrgID, - Task: &task.ID, + logs, err = sys.LR.ListLogs(sys.Ctx, cr.OrgID, platform.LogFilter{ + Task: task.ID, }) if err != nil { t.Fatal(err) diff --git a/task/validator.go b/task/validator.go index f1f5416b38d..9808b5f02b9 100644 --- a/task/validator.go +++ b/task/validator.go @@ -35,6 +35,7 @@ func NewValidator(ts platform.TaskService, bs platform.BucketService) platform.T preAuth: query.NewPreAuthorizer(bs), } } + func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) { task, err := ts.TaskService.FindTaskByID(ctx, id) if err != nil { @@ -128,33 +129,29 @@ func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID) } func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) { - if filter.Org != nil { - perm, err := platform.NewPermission(platform.ReadAction, platform.TasksResourceType, *filter.Org) - if err != nil { - return nil, -1, err - } - - if err := validatePermission(ctx, *perm); err != nil { - return nil, -1, err - } - + // Look up the task first, through the validator, to ensure we have permission to view the task. + if _, err := ts.FindTaskByID(ctx, filter.Task); err != nil { + return nil, -1, err } - // TODO(lyon): If the user no longer has permission to the organization we might fail or filter here? + // If we can find the task, we can read its logs. return ts.TaskService.FindLogs(ctx, filter) } func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) { - if filter.Org != nil { - perm, err := platform.NewPermission(platform.ReadAction, platform.TasksResourceType, *filter.Org) - if err != nil { - return nil, -1, err - } + // Look up the task first, through the validator, to ensure we have permission to view the task. + task, err := ts.FindTaskByID(ctx, filter.Task) + if err != nil { + return nil, -1, err + } - if err := validatePermission(ctx, *perm); err != nil { - return nil, -1, err - } + perm, err := platform.NewPermission(platform.ReadAction, platform.TasksResourceType, task.OrganizationID) + if err != nil { + return nil, -1, err + } + if err := validatePermission(ctx, *perm); err != nil { + return nil, -1, err } // TODO(lyon): If the user no longer has permission to the organization we might fail or filter here? diff --git a/task/validator_test.go b/task/validator_test.go index 4ca1368f7c9..1aa72a2bedc 100644 --- a/task/validator_test.go +++ b/task/validator_test.go @@ -323,7 +323,7 @@ from(bucket:"cows") |> range(start:-5m) |> to(bucket:"cows", org:"thing")` auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.OrgsResourceType, OrgID: &taskID}}}}, check: func(ctx context.Context, svc influxdb.TaskService) error { _, _, err := svc.FindLogs(ctx, influxdb.LogFilter{ - Org: &orgID, + Task: taskID, }) if err == nil { return errors.New("returned no error with a invalid auth") @@ -336,25 +336,17 @@ from(bucket:"cows") |> range(start:-5m) |> to(bucket:"cows", org:"thing")` auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.TasksResourceType, OrgID: &orgID}}}}, check: func(ctx context.Context, svc influxdb.TaskService) error { _, _, err := svc.FindLogs(ctx, influxdb.LogFilter{ - Org: &orgID, + Task: taskID, }) return err }, }, - { - name: "FindLogs without org", - auth: &influxdb.Authorization{Status: "active"}, - check: func(ctx context.Context, svc influxdb.TaskService) error { - _, _, err := svc.FindLogs(ctx, influxdb.LogFilter{}) - return err - }, - }, { name: "FindRuns with bad auth", auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.OrgsResourceType, OrgID: &taskID}}}}, check: func(ctx context.Context, svc influxdb.TaskService) error { _, _, err := svc.FindRuns(ctx, influxdb.RunFilter{ - Org: &orgID, + Task: taskID, }) if err == nil { return errors.New("returned no error with a invalid auth") @@ -367,19 +359,11 @@ from(bucket:"cows") |> range(start:-5m) |> to(bucket:"cows", org:"thing")` auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.TasksResourceType, OrgID: &orgID}}}}, check: func(ctx context.Context, svc influxdb.TaskService) error { _, _, err := svc.FindRuns(ctx, influxdb.RunFilter{ - Org: &orgID, + Task: taskID, }) return err }, }, - { - name: "FindRuns without org", - auth: &influxdb.Authorization{Status: "active"}, - check: func(ctx context.Context, svc influxdb.TaskService) error { - _, _, err := svc.FindRuns(ctx, influxdb.RunFilter{}) - return err - }, - }, { name: "FindRunByID missing auth", auth: &influxdb.Authorization{Permissions: []influxdb.Permission{}}, @@ -469,5 +453,4 @@ from(bucket:"cows") |> range(start:-5m) |> to(bucket:"cows", org:"thing")` } }) } - }