Skip to content

Commit

Permalink
Make the status RPC unidirectional.
Browse files Browse the repository at this point in the history
Also marks the replay scheduler as a background task.
  • Loading branch information
AWoloszyn authored and pmuetschard committed Feb 2, 2019
1 parent 0b7fedd commit f8d8de7
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 247 deletions.
274 changes: 130 additions & 144 deletions cmd/gapit/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/google/gapid/core/app"
"github.com/google/gapid/core/app/crash"
"github.com/google/gapid/core/event/task"
"github.com/google/gapid/core/log"
"github.com/google/gapid/gapis/service"
Expand Down Expand Up @@ -75,6 +76,26 @@ func readableBytes(nBytes uint64) string {
}
}

func newTask(id, parent uint64, name string, background bool) *tsk {
return &tsk{
id: id,
parentID: parent,
name: name,
background: background,
children: map[uint64]*tsk{},
}
}

type tsk struct {
id uint64
parentID uint64
name string
background bool
progress int32
blocked bool
children map[uint64]*tsk
}

func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error {
client, err := getGapis(ctx, verb.Gapis, GapirFlags{})
if err != nil {
Expand All @@ -88,16 +109,6 @@ func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error {

statusMutex := sync.Mutex{}

type tsk struct {
id uint64
parentID uint64
name string
background bool
progress int32
blocked bool
children map[uint64]*tsk
}

ancestors := make(map[uint64][]uint64)
activeTasks := make(map[uint64]*tsk)
totalBlocked := 0
Expand Down Expand Up @@ -171,163 +182,138 @@ func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error {
})
defer stopPolling()

endStat, err := client.Status(ctx,
time.Duration(verb.MemoryUpdateInterval/2)*time.Millisecond,
time.Duration(verb.StatusUpdateInterval/2)*time.Millisecond,
func(tu *service.TaskUpdate) {
statusMutex.Lock()
defer statusMutex.Unlock()
ec := make(chan error)
ctx, cancel := context.WithCancel(ctx)
crash.Go(func() {
err := client.Status(ctx,
time.Duration(verb.MemoryUpdateInterval/2)*time.Millisecond,
time.Duration(verb.StatusUpdateInterval/2)*time.Millisecond,
func(tu *service.TaskUpdate) {
statusMutex.Lock()
defer statusMutex.Unlock()

if tu.Status == service.TaskStatus_STARTING {
// If this is a top-level task, add it to our list of top-level tasks.
if tu.Parent == 0 {
activeTasks[tu.Id] = &tsk{
tu.Id,
0,
tu.Name,
tu.Background,
0,
false,
make(map[uint64]*tsk),
}
} else {
if p, ok := ancestors[tu.Parent]; ok {
// If we can find this tasks parent, then add it in the tree
if a := findTask(activeTasks, append(p, tu.Parent)); a != nil {
a.children[tu.Id] = &tsk{
if tu.Status == service.TaskStatus_STARTING {
// If this is a top-level task, add it to our list of top-level tasks.
if tu.Parent == 0 {
activeTasks[tu.Id] = newTask(tu.Id, 0, tu.Name, tu.Background)
} else {
if p, ok := ancestors[tu.Parent]; ok {
// If we can find this tasks parent, then add it in the tree
if a := findTask(activeTasks, append(p, tu.Parent)); a != nil {
a.children[tu.Id] = newTask(tu.Id, tu.Parent, tu.Name, tu.Background || a.background)
ans := append([]uint64{}, ancestors[tu.Parent]...)
ancestors[tu.Id] = append(ans, tu.Parent)
} else {
// If we don't have the parent for whatever reason,
// treat this as a top-level task.
activeTasks[tu.Id] = newTask(
tu.Id,
0,
tu.Name,
tu.Background)
}
} else if a, ok := activeTasks[tu.Parent]; ok {
// If the parent of this task is a top level task,
// then add it there.
a.children[tu.Id] = newTask(
tu.Id,
tu.Parent,
tu.Name,
tu.Background || a.background,
0,
false,
make(map[uint64]*tsk),
}
tu.Background || a.background)
ans := append([]uint64{}, ancestors[tu.Parent]...)
ancestors[tu.Id] = append(ans, tu.Parent)
} else {
// If we don't have the parent for whatever reason,
// treat this as a top-level task.
activeTasks[tu.Id] = &tsk{
// Fallback to adding this as its own top-level task.
activeTasks[tu.Id] = newTask(
tu.Id,
0,
tu.Name,
tu.Background,
0,
false,
make(map[uint64]*tsk),
tu.Background)
}
}
} else if tu.Status == service.TaskStatus_FINISHED {
// Remove this from all parents.
// Make sure to fix up our "totalBlocked" if our
// blocked task finished.
loc := []uint64{}
if a, ok := ancestors[tu.Id]; ok {
loc = a
}
loc = append(loc, tu.Id)
forLineage(activeTasks, loc, func(t *tsk) {
if t.blocked {
if totalBlocked > 0 {
totalBlocked--
}
}
} else if a, ok := activeTasks[tu.Parent]; ok {
// If the parent of this task is a top level task,
// then add it there.
a.children[tu.Id] = &tsk{
tu.Id,
tu.Parent,
tu.Name,
tu.Background || a.background,
0,
false,
make(map[uint64]*tsk),
})
if len(loc) > 1 {
// Find the parent, and remove us
if t := findTask(activeTasks, loc[:len(loc)-1]); t != nil {
delete(t.children, tu.Id)
}
ans := append([]uint64{}, ancestors[tu.Parent]...)
ancestors[tu.Id] = append(ans, tu.Parent)
} else {
// Fallback to adding this as its own top-level task.
activeTasks[tu.Id] = &tsk{
tu.Id,
0,
tu.Name,
tu.Background,
0,
false,
make(map[uint64]*tsk),
}
delete(activeTasks, tu.Id)
}
}
} else if tu.Status == service.TaskStatus_FINISHED {
// Remove this from all parents.
// Make sure to fix up our "totalBlocked" if our
// blocked task finished.
loc := []uint64{}
if a, ok := ancestors[tu.Id]; ok {
loc = a
}
loc = append(loc, tu.Id)
forLineage(activeTasks, loc, func(t *tsk) {
if t.blocked {
} else if tu.Status == service.TaskStatus_PROGRESS {
// Simply update the progress for our task
loc := []uint64{}
if a, ok := ancestors[tu.Id]; ok {
loc = a
}
loc = append(loc, tu.Id)
if a := findTask(activeTasks, loc); a != nil {
a.progress = tu.CompletePercent
}
} else if tu.Status == service.TaskStatus_BLOCKED {
// If a task becomes blocked, then we should block
// it and all of its ancestors.
loc := []uint64{}
if a, ok := ancestors[tu.Id]; ok {
loc = a
}
loc = append(loc, tu.Id)
forLineage(activeTasks, loc, func(t *tsk) {
totalBlocked++
t.blocked = true
})
} else if tu.Status == service.TaskStatus_UNBLOCKED {
// If a task becomes unblocked, then we should unblock
// it and all of its ancestors.
loc := []uint64{}
if a, ok := ancestors[tu.Id]; ok {
loc = a
}
loc = append(loc, tu.Id)
forLineage(activeTasks, loc, func(t *tsk) {
if totalBlocked > 0 {
totalBlocked--
}
}
})
if len(loc) > 1 {
// Find the parent, and remove us
if t := findTask(activeTasks, loc[:len(loc)-1]); t != nil {
delete(t.children, tu.Id)
}
} else {
delete(activeTasks, tu.Id)
}
} else if tu.Status == service.TaskStatus_PROGRESS {
// Simply update the progress for our task
loc := []uint64{}
if a, ok := ancestors[tu.Id]; ok {
loc = a
}
loc = append(loc, tu.Id)
if a := findTask(activeTasks, loc); a != nil {
a.progress = tu.CompletePercent
t.blocked = false
})
} else if tu.Status == service.TaskStatus_EVENT {
fmt.Printf("EVENT--> %+v\n", tu.Event)
}
} else if tu.Status == service.TaskStatus_BLOCKED {
// If a task becomes blocked, then we should block
// it and all of its ancestors.
loc := []uint64{}
if a, ok := ancestors[tu.Id]; ok {
loc = a
}, func(tu *service.MemoryStatus) {
if tu.TotalHeap > maxMemoryUsage {
maxMemoryUsage = tu.TotalHeap
}
loc = append(loc, tu.Id)
forLineage(activeTasks, loc, func(t *tsk) {
totalBlocked++
t.blocked = true
})
} else if tu.Status == service.TaskStatus_UNBLOCKED {
// If a task becomes unblocked, then we should unblock
// it and all of its ancestors.
loc := []uint64{}
if a, ok := ancestors[tu.Id]; ok {
loc = a
}
loc = append(loc, tu.Id)
forLineage(activeTasks, loc, func(t *tsk) {
if totalBlocked > 0 {
totalBlocked--
}
t.blocked = false
})
} else if tu.Status == service.TaskStatus_EVENT {
fmt.Printf("EVENT--> %+v\n", tu.Event)
}
}, func(tu *service.MemoryStatus) {
if tu.TotalHeap > maxMemoryUsage {
maxMemoryUsage = tu.TotalHeap
}
currentMemoryUsage = tu.TotalHeap
})
if err != nil {
return log.Err(ctx, err, "Failed to connect to the GAPIS status stream")
}
defer endStat()
currentMemoryUsage = tu.TotalHeap
})
ec <- err
})

var wait sync.WaitGroup
wait.Add(1)
var sigChan chan os.Signal
sigChan = make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
go func() {
<-sigChan
wait.Done()
}()
wait.Wait()

select {
case <-sigChan:
cancel()
case err := <-ec:
if err != nil {
return log.Err(ctx, err, "Failed to connect to the GAPIS status stream")
}
}
return nil
}
54 changes: 20 additions & 34 deletions gapis/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,49 +182,35 @@ func (c *client) Profile(
}

func (c *client) Status(
ctx context.Context, snapshotInterval time.Duration, statusUpdateFrequency time.Duration, f func(*service.TaskUpdate), m func(*service.MemoryStatus)) (stop func() error, err error) {
ctx context.Context, snapshotInterval time.Duration, statusUpdateFrequency time.Duration, f func(*service.TaskUpdate), m func(*service.MemoryStatus)) error {

stream, err := c.client.Status(ctx)
if err != nil {
return nil, err
}

req := &service.ServerStatusRequest{Enable: true, MemorySnapshotInterval: float32(snapshotInterval.Seconds()), StatusUpdateFrequency: float32(statusUpdateFrequency.Seconds())}
req := &service.ServerStatusRequest{MemorySnapshotInterval: float32(snapshotInterval.Seconds()), StatusUpdateFrequency: float32(statusUpdateFrequency.Seconds())}

if err := stream.Send(req); err != nil {
return nil, err
stream, err := c.client.Status(ctx, req)
if err != nil {
return err
}

waitForEOF := task.Async(ctx, func(ctx context.Context) error {
for {
r, err := stream.Recv()
if err != nil {
if errors.Cause(err) == io.EOF {
return nil
}
return err
}
if _, ok := r.Res.(*service.ServerStatusResponse_Task); ok {
if f != nil {
f(r.GetTask())
}
} else if _, ok := r.Res.(*service.ServerStatusResponse_Memory); ok {
if m != nil {
m(r.GetMemory())
}
for {
r, err := stream.Recv()
if err != nil {
if errors.Cause(err) == io.EOF {
return nil
}
}
})

stop = func() error {
// Tell the server we want to stop profiling.
if err := stream.Send(&service.ServerStatusRequest{}); err != nil {
return err
}
return waitForEOF()
if _, ok := r.Res.(*service.ServerStatusResponse_Task); ok {
if f != nil {
f(r.GetTask())
}
} else if _, ok := r.Res.(*service.ServerStatusResponse_Memory); ok {
if m != nil {
m(r.GetMemory())
}
}
}

return stop, nil
return nil
}

func (c *client) GetPerformanceCounters(ctx context.Context) (string, error) {
Expand Down
Loading

0 comments on commit f8d8de7

Please sign in to comment.