Skip to content

Commit

Permalink
Get job by job_id in redis
Browse files Browse the repository at this point in the history
  Get the last 10MB of data if it exceeds 10MB

Signed-off-by: stonezdj <[email protected]>
  • Loading branch information
stonezdj committed Feb 21, 2023
1 parent 99b3711 commit c89ae08
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 2 deletions.
33 changes: 33 additions & 0 deletions api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4593,6 +4593,39 @@ paths:
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/jobservice/jobs/{job_id}/log:
get:
operationId: getJobLog
summary: Get job log by job id
description: Get job log by job id, it is only used by administrator
produces:
- text/plain
tags:
- jobservice
parameters:
- $ref: '#/parameters/requestId'
- name: job_id
in: path
required: true
type: string
description: The id of the job.
responses:
'200':
description: Get job log successfully.
headers:
Content-Type:
description: The content type of response body
type: string
schema:
type: string
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'404':
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/jobservice/queues:
get:
operationId: listJobQueues
Expand Down
5 changes: 5 additions & 0 deletions src/controller/jobmonitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type MonitorController interface {
PauseJobQueues(ctx context.Context, jobType string) error
// ResumeJobQueues resume the job queue by type
ResumeJobQueues(ctx context.Context, jobType string) error
JobLog(ctx context.Context, jobID string) ([]byte, error)
}

type monitorController struct {
Expand Down Expand Up @@ -366,3 +367,7 @@ func (w *monitorController) resumeQueue(ctx context.Context, jobType string) err
}
return nil
}

func (w *monitorController) JobLog(ctx context.Context, jobID string) ([]byte, error) {
return w.taskManager.GetLogByJobID(ctx, jobID)
}
3 changes: 3 additions & 0 deletions src/jobservice/logger/getter/Interface.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package getter

// LogSizeLimit limits the size of log returned
const LogSizeLimit = 10 * 1024 * 1024

// Interface defines operations of a log data getter
type Interface interface {
// Retrieve the log data of the specified log entry
Expand Down
7 changes: 6 additions & 1 deletion src/jobservice/logger/getter/db_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,10 @@ func (dbg *DBGetter) Retrieve(logID string) ([]byte, error) {
return nil, errs.NoObjectFoundError(fmt.Sprintf("log entity: %s", logID))
}

return []byte(jobLog.Content), nil
sz := len(jobLog.Content)
var buf []byte
if sz > LogSizeLimit {
buf = []byte(jobLog.Content[sz-LogSizeLimit:])
}
return buf, nil
}
37 changes: 36 additions & 1 deletion src/jobservice/logger/getter/file_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (fg *FileGetter) Retrieve(logID string) ([]byte, error) {
return nil, errs.NoObjectFoundError(logID)
}

return os.ReadFile(fPath)
return tailLogFile(fPath, LogSizeLimit)
}

func isValidLogID(id string) error {
Expand All @@ -54,3 +54,38 @@ func isValidLogID(id string) error {

return nil
}

func tailLogFile(filename string, limit int64) ([]byte, error) {
fInfo, err := os.Stat(filename)
if err != nil {
return nil, err
}
size := fInfo.Size()

sizeToRead := limit

if sizeToRead > size {
sizeToRead = size
}

fi, err := os.Open(filename)
if err != nil {
return nil, err
}
defer fi.Close()

pos := size - sizeToRead
if pos < 0 {
pos = 0
}
if pos != 0 {
_, err = fi.Seek(pos, 0)
if err != nil {
return nil, err
}
}

buf := make([]byte, sizeToRead)
_, err = fi.Read(buf)
return buf, err
}
29 changes: 29 additions & 0 deletions src/jobservice/logger/getter/file_getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,32 @@ func TestLogDataGetter(t *testing.T) {
t.Errorf("expect reading 5 bytes but got %d bytes", len(data))
}
}

func Test_tailLogFile(t *testing.T) {
type args struct {
filename string
mbs int64
}
tests := []struct {
name string
args args
want int
wantErr bool
}{
{"normal test", args{"testdata/normal.log", 1000}, len(`hello world`), false},
{"truncated test", args{"testdata/truncated.log", 1000}, 1000, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tailLogFile(tt.args.filename, tt.args.mbs)
if (err != nil) != tt.wantErr {
t.Errorf("tailLogFile() error = %v, wantErr %v", err, tt.wantErr)
return
}
// result should always less than the size limit
if len(got) > tt.want {
t.Errorf("tailLogFile() got = %v, want %v", len(got), tt.want)
}
})
}
}
179 changes: 179 additions & 0 deletions src/jobservice/logger/getter/testdata/truncated.log

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions src/pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Manager interface {
UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) (err error)
// Get the log of the specified task
GetLog(ctx context.Context, id int64) (log []byte, err error)
// GetLogByJobID get the log of specified job id
GetLogByJobID(ctx context.Context, jobID string) (log []byte, err error)
// Count counts total of tasks according to the query.
// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'
Count(ctx context.Context, query *q.Query) (int64, error)
Expand Down Expand Up @@ -259,3 +261,7 @@ func (m *manager) UpdateStatusInBatch(ctx context.Context, jobIDs []string, stat
func (m *manager) ExecutionIDsByVendorAndStatus(ctx context.Context, vendorType, status string) ([]int64, error) {
return m.dao.ExecutionIDsByVendorAndStatus(ctx, vendorType, status)
}

func (m *manager) GetLogByJobID(ctx context.Context, jobID string) (log []byte, err error) {
return m.jsClient.GetJobLog(jobID)
}
11 changes: 11 additions & 0 deletions src/server/v2.0/handler/jobservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,14 @@ func (j *jobServiceAPI) ActionPendingJobs(ctx context.Context, params jobservice
}
return jobservice.NewActionPendingJobsOK()
}

func (j *jobServiceAPI) GetJobLog(ctx context.Context, params jobservice.GetJobLogParams) middleware.Responder {
if err := j.RequireSystemAccess(ctx, rbac.ActionList, rbac.ResourceJobServiceMonitor); err != nil {
return j.SendError(ctx, err)
}
log, err := j.jobCtr.JobLog(ctx, params.JobID)
if err != nil {
return j.SendError(ctx, err)
}
return jobservice.NewGetJobLogOK().WithContentType("text/plain").WithPayload(string(log))
}

0 comments on commit c89ae08

Please sign in to comment.