Skip to content

Commit

Permalink
Added memory tracking to the status monitor.
Browse files Browse the repository at this point in the history
This allows us to show the user how much memory they are using.
This should help notify users if they are getting close to an
OOM situation.
  • Loading branch information
AWoloszyn committed Jan 22, 2019
1 parent 176561a commit f100d3b
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 10 deletions.
1 change: 1 addition & 0 deletions cmd/gapit/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type (
// StatusFlags holds the command-line flags read by the status verb.
// NOTE(review): presumably embedded in statusVerb, which reads these
// fields as verb.Gapis / verb.*UpdateInterval — confirm.
StatusFlags struct {
// Gapis is handed to getGapis when the status verb connects.
Gapis GapisFlags
// StatusUpdateInterval is a task-status interval in milliseconds.
StatusUpdateInterval int `help:"Provides status updates at the given interval (in ms)"`
// MemoryUpdateInterval is a memory-update interval in milliseconds.
MemoryUpdateInterval int `help:"Provides memory updates at the given interval (in ms)"`
}

PackagesFlags struct {
Expand Down
29 changes: 27 additions & 2 deletions cmd/gapit/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ func clear() {
}
}

// readableBytes formats a byte count as a short human-readable string
// using binary (1024-based) units.
//
// Counts below 1024 are printed as a whole number of bytes, e.g. "512B".
// Larger counts are scaled to the largest unit for which the value is at
// least 1 and printed with three decimals, e.g. "1.500KB". Only the ten
// bits dropped by the final division feed the fractional part; any lower
// bits are discarded.
func readableBytes(nBytes uint64) string {
	// A uint64 needs at most six divisions by 1024, so indexing never runs
	// past "EB".
	suffixes := []string{"B", "KB", "MB", "GB", "TB", "PB", "EB"}
	i := 0
	nBytesRemainder := uint64(0)
	// Use >= so that exact unit boundaries are promoted: 1024 bytes is
	// "1.000KB" rather than "1024B", and 1<<20 is "1.000MB" rather than
	// "1024.000KB".
	for nBytes >= 1024 {
		nBytesRemainder = nBytes & 0x3FF
		nBytes >>= 10
		i++
	}
	if i == 0 {
		return fmt.Sprintf("%v%v", nBytes, suffixes[i])
	}
	return fmt.Sprintf("%.3f%v", float64(nBytes)+float64(nBytesRemainder)/1024.0, suffixes[i])
}

func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error {
client, err := getGapis(ctx, verb.Gapis, GapirFlags{})
if err != nil {
Expand All @@ -85,6 +101,8 @@ func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error {
ancestors := make(map[uint64][]uint64)
activeTasks := make(map[uint64]*tsk)
totalBlocked := 0
currentMemoryUsage := uint64(0)
maxMemoryUsage := uint64(0)

var findTask func(map[uint64]*tsk, []uint64) *tsk

Expand Down Expand Up @@ -142,6 +160,7 @@ func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error {
statusMutex.Lock()
defer statusMutex.Unlock()
clear()
fmt.Printf("Memory Usage: %v Max: %v\n", readableBytes(currentMemoryUsage), readableBytes(maxMemoryUsage))
fmt.Printf("Active Tasks: \n")
print(activeTasks, 1, false)
fmt.Printf("Background Tasks: \n")
Expand All @@ -152,7 +171,8 @@ func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error {
})
defer stopPolling()

endStat, err := client.Status(ctx, 0,
endStat, err := client.Status(ctx,
time.Duration(verb.MemoryUpdateInterval/2)*time.Millisecond,
time.Duration(verb.StatusUpdateInterval/2)*time.Millisecond,
func(tu *service.TaskUpdate) {
statusMutex.Lock()
Expand Down Expand Up @@ -288,7 +308,12 @@ func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error {
} else if tu.Status == service.TaskStatus_EVENT {
fmt.Printf("EVENT--> %+v\n", tu.Event)
}
}, nil)
}, func(tu *service.MemoryStatus) {
if tu.TotalHeap > maxMemoryUsage {
maxMemoryUsage = tu.TotalHeap
}
currentMemoryUsage = tu.TotalHeap
})
if err != nil {
return log.Err(ctx, err, "Failed to connect to the GAPIS status stream")
}
Expand Down
4 changes: 2 additions & 2 deletions gapis/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ func (c *client) Profile(
}

func (c *client) Status(
ctx context.Context, snapshotInterval uint32, statusUpdateFrequency time.Duration, f func(*service.TaskUpdate), m func(*service.MemoryStatus)) (stop func() error, err error) {
ctx context.Context, snapshotInterval time.Duration, statusUpdateFrequency time.Duration, f func(*service.TaskUpdate), m func(*service.MemoryStatus)) (stop func() error, err error) {

stream, err := c.client.Status(ctx)
if err != nil {
return nil, err
}

req := &service.ServerStatusRequest{Enable: true, MemorySnapshotInterval: snapshotInterval, StatusUpdateFrequency: float32(statusUpdateFrequency.Seconds())}
req := &service.ServerStatusRequest{Enable: true, MemorySnapshotInterval: float32(snapshotInterval.Seconds()), StatusUpdateFrequency: float32(statusUpdateFrequency.Seconds())}

if err := stream.Send(req); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion gapis/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (s *grpcServer) Status(stream service.Gapid_StatusServer) error {

// Start the profile.
stop, err = s.handler.Status(ctx,
req.MemorySnapshotInterval,
time.Duration(float32(time.Second)*req.MemorySnapshotInterval),
time.Duration(float32(time.Second)*req.StatusUpdateFrequency),
f,
m,
Expand Down
9 changes: 6 additions & 3 deletions gapis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,15 @@ func (l *statusListener) OnEvent(ctx context.Context, task *status.Task, event s
}
}

func (l *statusListener) OnMemorySnapshot(context.Context, runtime.MemStats) {
l.m(&service.MemoryStatus{})
// OnMemorySnapshot reports a memory snapshot through the listener's memory
// callback, publishing stats.Alloc as the TotalHeap value. The context is
// not used.
func (l *statusListener) OnMemorySnapshot(ctx context.Context, stats runtime.MemStats) {
	snapshot := &service.MemoryStatus{TotalHeap: stats.Alloc}
	l.m(snapshot)
}

func (s *server) Status(
ctx context.Context, snapshotInterval uint32,
ctx context.Context,
snapshotInterval time.Duration,
statusUpdateFrequency time.Duration,
f func(*service.TaskUpdate),
m func(*service.MemoryStatus)) (func() error, error) {
Expand Down
6 changes: 5 additions & 1 deletion gapis/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ type Service interface {
Profile(ctx context.Context, pprof, trace io.Writer, memorySnapshotInterval uint32) (stop func() error, err error)

// Status starts resolving status events. It calls f for every update and m for every memory update.
Status(ctx context.Context, snapshotInterval uint32, statusUpdateFrequency time.Duration, f func(*TaskUpdate), m func(*MemoryStatus)) (stop func() error, err error)
Status(ctx context.Context,
snapshotInterval time.Duration,
statusUpdateFrequency time.Duration,
f func(*TaskUpdate),
m func(*MemoryStatus)) (stop func() error, err error)

// GetPerformanceCounters returns the values of all global counters as
// a string.
Expand Down
3 changes: 2 additions & 1 deletion gapis/service/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,12 @@ message TaskUpdate {
}

// MemoryStatus is a single memory usage report; it is the value handed to
// the memory callback of the Status call.
message MemoryStatus {
// Heap usage as reported by the server, which fills this field from
// Go's runtime.MemStats.Alloc.
uint64 totalHeap = 1;
}

message ServerStatusRequest {
bool enable = 1;
uint32 memory_snapshot_interval = 2;
float memory_snapshot_interval = 2;
float status_update_frequency = 3;
}

Expand Down

0 comments on commit f100d3b

Please sign in to comment.