Skip to content

Commit

Permalink
refactor(task): remove explicit org from run/log lookup
Browse files Browse the repository at this point in the history
Task ID is now a required value on run and log filters. It was
effectively required by all implementations before anyway, so now those
types reflect that requirement.

Organization ID was removed from those same fields. The TaskService
looks up the organization ID via the task in cases where we need it at a
lower layer.
  • Loading branch information
mark-rushakoff committed Feb 16, 2019
1 parent 79dfbb6 commit cbef811
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 206 deletions.
25 changes: 3 additions & 22 deletions cmd/influx/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ func taskDeleteF(cmd *cobra.Command, args []string) error {
type TaskLogFindFlags struct {
taskID string
runID string
orgID string
}

var taskLogFindFlags TaskLogFindFlags
Expand All @@ -392,7 +391,6 @@ func init() {

taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.taskID, "task-id", "", "", "task id (required)")
taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.runID, "run-id", "", "", "run id")
taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.orgID, "org-id", "", "", "organization id")
taskLogFindCmd.MarkFlagRequired("task-id")

logCmd.AddCommand(taskLogFindCmd)
Expand All @@ -409,7 +407,7 @@ func taskLogFindF(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
filter.Task = id
filter.Task = *id

if taskLogFindFlags.runID != "" {
id, err := platform.IDFromString(taskLogFindFlags.runID)
Expand All @@ -419,14 +417,6 @@ func taskLogFindF(cmd *cobra.Command, args []string) error {
filter.Run = id
}

if taskLogFindFlags.orgID != "" {
id, err := platform.IDFromString(taskLogFindFlags.orgID)
if err != nil {
return err
}
filter.Org = id
}

ctx := context.TODO()
logs, _, err := s.FindLogs(ctx, filter)
if err != nil {
Expand All @@ -451,7 +441,6 @@ func taskLogFindF(cmd *cobra.Command, args []string) error {
type TaskRunFindFlags struct {
runID string
taskID string
orgID string
afterTime string
beforeTime string
limit int
Expand All @@ -468,13 +457,11 @@ func init() {

taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.taskID, "task-id", "", "", "task id (required)")
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.runID, "run-id", "", "", "run id")
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.orgID, "org-id", "", "", "organization id")
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.afterTime, "after", "", "", "after time for filtering")
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.beforeTime, "before", "", "", "before time for filtering")
taskRunFindCmd.Flags().IntVarP(&taskRunFindFlags.limit, "limit", "", 0, "limit the results")

taskRunFindCmd.MarkFlagRequired("task-id")
taskRunFindCmd.MarkFlagRequired("org-id")

runCmd.AddCommand(taskRunFindCmd)
}
Expand All @@ -494,21 +481,15 @@ func taskRunFindF(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
filter.Task = taskID

orgID, err := platform.IDFromString(taskRunFindFlags.orgID)
if err != nil {
return err
}
filter.Org = orgID
filter.Task = *taskID

var runs []*platform.Run
if taskRunFindFlags.runID != "" {
id, err := platform.IDFromString(taskRunFindFlags.runID)
if err != nil {
return err
}
run, err := s.FindRunByID(context.Background(), *filter.Org, *id)
run, err := s.FindRunByID(context.Background(), filter.Task, *id)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/influxd/launcher/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b
}
time.Sleep(5 * time.Millisecond)

runs, _, err := be.TaskService().FindRuns(ctx, influxdb.RunFilter{Org: &org.ID, Task: &created.ID, Limit: 1})
runs, _, err := be.TaskService().FindRuns(ctx, influxdb.RunFilter{Task: created.ID, Limit: 1})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b
})

// now lets see a logs
logs, _, err := be.TaskService().FindLogs(ctx, influxdb.LogFilter{Org: &org.ID, Task: &created.ID, Run: &targetRun.ID})
logs, _, err := be.TaskService().FindLogs(ctx, influxdb.LogFilter{Task: created.ID, Run: &targetRun.ID})
if err != nil {
t.Fatal(err)
}
Expand Down
56 changes: 8 additions & 48 deletions http/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,24 +766,7 @@ func decodeGetLogsRequest(ctx context.Context, r *http.Request, orgs platform.Or
if err != nil {
return nil, err
}
req.filter.Task = taskID

qp := r.URL.Query()

if orgName := qp.Get("org"); orgName != "" {
o, err := orgs.FindOrganization(ctx, platform.OrganizationFilter{Name: &orgName})
if err != nil {
return nil, err
}

req.filter.Org = &o.ID
} else if oid := qp.Get("orgID"); oid != "" {
orgID, err := platform.IDFromString(oid)
if err != nil {
return nil, err
}
req.filter.Org = orgID
}
req.filter.Task = *taskID

if runID := params.ByName("rid"); runID != "" {
id, err := platform.IDFromString(runID)
Expand Down Expand Up @@ -823,7 +806,7 @@ func (h *TaskHandler) handleGetRuns(w http.ResponseWriter, r *http.Request) {
return
}

if err := encodeResponse(ctx, w, http.StatusOK, newRunsResponse(runs, *req.filter.Task)); err != nil {
if err := encodeResponse(ctx, w, http.StatusOK, newRunsResponse(runs, req.filter.Task)); err != nil {
logEncodingError(h.logger, r, err)
return
}
Expand All @@ -848,25 +831,10 @@ func decodeGetRunsRequest(ctx context.Context, r *http.Request, orgs platform.Or
if err != nil {
return nil, err
}
req.filter.Task = taskID
req.filter.Task = *taskID

qp := r.URL.Query()

if orgName := qp.Get("org"); orgName != "" {
o, err := orgs.FindOrganization(ctx, platform.OrganizationFilter{Name: &orgName})
if err != nil {
return nil, err
}

req.filter.Org = &o.ID
} else if orgID := qp.Get("orgID"); orgID != "" {
oid, err := platform.IDFromString(orgID)
if err != nil {
return nil, err
}
req.filter.Org = oid
}

if id := qp.Get("after"); id != "" {
afterID, err := platform.IDFromString(id)
if err != nil {
Expand Down Expand Up @@ -1401,26 +1369,21 @@ func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error {

// FindLogs returns logs for a run.
func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
if filter.Task == nil {
if !filter.Task.Valid() {
return nil, 0, errors.New("task ID required")
}

var urlPath string
if filter.Run == nil {
urlPath = path.Join(taskIDPath(*filter.Task), "logs")
urlPath = path.Join(taskIDPath(filter.Task), "logs")
} else {
urlPath = path.Join(taskIDRunIDPath(*filter.Task, *filter.Run), "logs")
urlPath = path.Join(taskIDRunIDPath(filter.Task, *filter.Run), "logs")
}

u, err := newURL(t.Addr, urlPath)
if err != nil {
return nil, 0, err
}
val := url.Values{}
if filter.Org != nil {
val.Set("orgID", filter.Org.String())
}
u.RawQuery = val.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
Expand Down Expand Up @@ -1450,19 +1413,16 @@ func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([

// FindRuns returns a list of runs that match a filter and the total count of returned runs.
func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
if filter.Task == nil {
if !filter.Task.Valid() {
return nil, 0, errors.New("task ID required")
}

u, err := newURL(t.Addr, taskIDRunsPath(*filter.Task))
u, err := newURL(t.Addr, taskIDRunsPath(filter.Task))
if err != nil {
return nil, 0, err
}

val := url.Values{}
if filter.Org != nil {
val.Set("orgID", filter.Org.String())
}
if filter.After != nil {
val.Set("after", filter.After.String())
}
Expand Down
8 changes: 4 additions & 4 deletions http/task_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func TestTaskHandler_handleGetRuns(t *testing.T) {
runs := []*platform.Run{
{
ID: platform.ID(2),
TaskID: *f.Task,
TaskID: f.Task,
Status: "success",
ScheduledFor: "2018-12-01T17:00:13Z",
StartedAt: "2018-12-01T17:00:03.155645Z",
Expand Down Expand Up @@ -617,7 +617,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
name: "get task logs",
svc: &mock.TaskService{
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
if *f.Task == taskID {
if f.Task == taskID {
return nil, 0, nil
}

Expand All @@ -633,7 +633,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
name: "get run logs",
svc: &mock.TaskService{
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
if *f.Task != taskID {
if f.Task != taskID {
return nil, 0, backend.ErrTaskNotFound
}
if *f.Run != runID {
Expand All @@ -652,7 +652,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
name: "get runs",
svc: &mock.TaskService{
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
if *f.Task != taskID {
if f.Task != taskID {
return nil, 0, backend.ErrTaskNotFound
}

Expand Down
15 changes: 9 additions & 6 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,20 @@ type TaskFilter struct {

// RunFilter represents a set of filters that restrict the returned results
type RunFilter struct {
Org *ID
Task *ID
// Task ID is required for listing runs.
Task ID

After *ID
Limit int
AfterTime string
BeforeTime string
}

// LogFilter represents a set of filters that restrict the returned results
// LogFilter represents a set of filters that restrict the returned log results.
type LogFilter struct {
Org *ID
Task *ID
Run *ID
// Task ID is required.
Task ID

// The optional Run ID limits logs to a single run.
Run *ID
}
41 changes: 27 additions & 14 deletions task/backend/inmem_logreaderwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@ import (
platform "github.com/influxdata/influxdb"
)

// orgtask is used as a key for storing runs by org and task ID.
// This is only relevant for the in-memory run store.
type orgtask struct {
o, t platform.ID
}

type runReaderWriter struct {
mu sync.RWMutex
byTaskID map[string][]*platform.Run
byRunID map[string]*platform.Run
mu sync.RWMutex
byOrgTask map[orgtask][]*platform.Run
byRunID map[string]*platform.Run
}

func NewInMemRunReaderWriter() *runReaderWriter {
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}}
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byOrgTask: map[orgtask][]*platform.Run{}}
}

func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error {
Expand Down Expand Up @@ -48,8 +54,8 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, wh
}
timeSetter(run)
r.byRunID[ridStr] = run
tidStr := rlb.Task.ID.String()
r.byTaskID[tidStr] = append(r.byTaskID[tidStr], run)
ot := orgtask{o: rlb.Task.Org, t: rlb.Task.ID}
r.byOrgTask[ot] = append(r.byOrgTask[ot], run)
return nil
}

Expand All @@ -75,15 +81,15 @@ func (r *runReaderWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when ti
return nil
}

func (r *runReaderWriter) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) {
func (r *runReaderWriter) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) {
r.mu.RLock()
defer r.mu.RUnlock()

if runFilter.Task == nil {
if !runFilter.Task.Valid() {
return nil, errors.New("task is required")
}

ex, ok := r.byTaskID[runFilter.Task.String()]
ex, ok := r.byOrgTask[orgtask{o: orgID, t: runFilter.Task}]
if !ok {
return nil, ErrRunNotFound
}
Expand Down Expand Up @@ -129,25 +135,32 @@ func (r *runReaderWriter) FindRunByID(ctx context.Context, orgID, runID platform
return &rtnRun, nil
}

func (r *runReaderWriter) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) {
func (r *runReaderWriter) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) {
r.mu.RLock()
defer r.mu.RUnlock()

if logFilter.Task == nil && logFilter.Run == nil {
return nil, errors.New("task or run is required")
if !logFilter.Task.Valid() {
return nil, errors.New("task ID required")
}

if logFilter.Run != nil {
run, ok := r.byRunID[logFilter.Run.String()]
if !ok {
return nil, ErrRunNotFound
}
// TODO(mr): validate that task ID matches, if task is also set. Needs test.
return []platform.Log{run.Log}, nil
}

logs := []platform.Log{}
for _, run := range r.byTaskID[logFilter.Task.String()] {
var logs []platform.Log
ot := orgtask{o: orgID, t: logFilter.Task}
for _, run := range r.byOrgTask[ot] {
logs = append(logs, run.Log)
}

if len(logs) == 0 {
return nil, errors.New("no matching runs found")
}

return logs, nil
}
Loading

0 comments on commit cbef811

Please sign in to comment.