Skip to content

Commit

Permalink
[workflow] Optionally fetch all vreplication_logs for each stream
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Mason <[email protected]>
  • Loading branch information
ajm188 committed Jun 13, 2021
1 parent 80bce00 commit 24b84a4
Showing 1 changed file with 150 additions and 0 deletions.
150 changes: 150 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"

"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -465,6 +467,147 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
return nil, err
}

// Sort shard streams by stream_id ASC, to support an optimization
// in fetchStreamLogs below.
for _, shardStreams := range workflow.ShardStreams {
sort.Slice(shardStreams.Streams, func(i, j int) bool {
return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id
})
}
}
}

// wg tracks the background log fetchers started for each workflow so the
// caller can wait for all of them before building the response.
var wg sync.WaitGroup

// vrepLogQuery reads every column of _vt.vreplication_log that is copied
// into a stream log entry. Rows are ordered by (vrepl_id, id) ascending so
// they can be matched against streams sorted by id in a single forward pass.
var vrepLogQuery = strings.TrimSpace(`
SELECT
id,
vrepl_id,
type,
state,
message,
created_at,
updated_at,
count
FROM
_vt.vreplication_log
ORDER BY
vrepl_id ASC,
id ASC
`)

fetchStreamLogs := func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer wg.Done()

results, err := vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery)
if err != nil {
// Note that we do not return here. If there are any query results
// in the map (i.e. some tablets returned successfully), we will
// still try to read log rows from them on a best-effort basis. But,
// we will also pre-emptively record the top-level fetch error on
// every stream in every shard in the workflow. Further processing
// below may override the error message for certain streams.
for _, streams := range workflow.ShardStreams {
for _, stream := range streams.Streams {
stream.LogFetchError = err.Error()
}
}
}

for target, p3qr := range results {
qr := sqltypes.Proto3ToResult(p3qr)
shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString())

ss, ok := workflow.ShardStreams[shardStreamKey]
if !ok || ss == nil {
continue
}

streams := ss.Streams
streamIdx := 0
markErrors := func(err error) {
if streamIdx >= len(streams) {
return
}

streams[streamIdx].LogFetchError = err.Error()
}

for _, row := range qr.Rows {
id, err := evalengine.ToInt64(row[0])
if err != nil {
markErrors(err)
continue
}

streamID, err := evalengine.ToInt64(row[1])
if err != nil {
markErrors(err)
continue
}

typ := row[2].ToString()
state := row[3].ToString()
message := row[4].ToString()

createdAt, err := time.Parse("2006-01-02 15:04:05", row[5].ToString())
if err != nil {
markErrors(err)
continue
}

updatedAt, err := time.Parse("2006-01-02 15:04:05", row[6].ToString())
if err != nil {
markErrors(err)
continue
}

count, err := evalengine.ToInt64(row[7])
if err != nil {
markErrors(err)
continue
}

streamLog := &vtctldatapb.Workflow_Stream_Log{
Id: id,
StreamId: streamID,
Type: typ,
State: state,
CreatedAt: &vttime.Time{
Seconds: createdAt.Unix(),
},
UpdatedAt: &vttime.Time{
Seconds: updatedAt.Unix(),
},
Message: message,
Count: count,
}

// Earlier, in the main loop where we called scanWorkflow for
// each _vt.vreplication row, we also sorted each ShardStreams
// slice by ascending id, and our _vt.vreplication_log query
// ordered by (stream_id ASC, id ASC), so we can walk the
// streams in index order in O(n) amortized over all the rows
// for this tablet.
for streamIdx < len(streams) {
stream := streams[streamIdx]
if stream.Id < streamLog.StreamId {
streamIdx++
continue
}

if stream.Id > streamLog.StreamId {
log.Warningf("Found stream log for nonexistent stream: %+v", streamLog)
break
}

// stream.Id == streamLog.StreamId
stream.Logs = append(stream.Logs, streamLog)
break
}
}
}
}

Expand Down Expand Up @@ -508,9 +651,16 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows

workflow.MaxVReplicationLag = int64(maxVReplicationLag)

// Fetch logs for all streams associated with this workflow in the background.
wg.Add(1)
go fetchStreamLogs(ctx, workflow)

workflows = append(workflows, workflow)
}

// Wait for all the log fetchers to finish.
wg.Wait()

return &vtctldatapb.GetWorkflowsResponse{
Workflows: workflows,
}, nil
Expand Down

0 comments on commit 24b84a4

Please sign in to comment.