Skip to content

Commit

Permalink
Added profiling to the relay and client.
Browse files Browse the repository at this point in the history
Signed-off-by: L Lakshmanan <[email protected]>
  • Loading branch information
Lakshman authored and Lakshman committed Jul 29, 2024
1 parent b1b9aa7 commit ea7b87d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 3 deletions.
53 changes: 52 additions & 1 deletion tools/invoker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -53,6 +54,8 @@ const TimeseriesDBAddr = "10.96.0.84:90"
var (
completed int64
latSlice LatencySlice
profSlice LatencySlice
funcDurEnable *bool
portFlag *int
grpcTimeout time.Duration
withTracing *bool
Expand All @@ -64,6 +67,8 @@ func main() {
rps := flag.Float64("rps", 1.0, "Target requests per second")
runDuration := flag.Int("time", 5, "Run the experiment for X seconds")
latencyOutputFile := flag.String("latf", "lat.csv", "CSV file for the latency measurements in microseconds")
funcDurationOutputFile := flag.String("durf", "dur.csv", "CSV file for the function duration measurements in microseconds")
funcDurEnable := flag.Bool("profile", false, "Enable function duration profiling")
portFlag = flag.Int("port", 80, "The port that functions listen to")
withTracing = flag.Bool("trace", false, "Enable tracing in the client")
zipkin := flag.String("zipkin", "http://localhost:9411/api/v2/spans", "zipkin url")
Expand Down Expand Up @@ -107,6 +112,9 @@ func main() {
realRPS := runExperiment(endpoints, *runDuration, *rps)

writeLatencies(realRPS, *latencyOutputFile)
if *funcDurEnable {
writeFunctionDurations(*funcDurationOutputFile)
}
}

func readEndpoints(path string) (endpoints []*endpoint.Endpoint, _ error) {
Expand Down Expand Up @@ -176,7 +184,7 @@ func SayHello(address, workflowID string) {
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
defer cancel()

_, err = c.SayHello(ctx, &pb.HelloRequest{
response, err := c.SayHello(ctx, &pb.HelloRequest{
Name: "Invoke relay",
VHiveMetadata: vhivemetadata.MakeVHiveMetadata(
workflowID,
Expand All @@ -187,6 +195,19 @@ func SayHello(address, workflowID string) {
if err != nil {
log.Warnf("Failed to invoke %v, err=%v", address, err)
} else {
if *funcDurEnable {
words := strings.Fields(response.Message)
lastWord := words[len(words)-1]
duration, err := strconv.ParseInt(lastWord, 10, 64)
if err == nil {
log.Debugf("Invoked %v in %v usec\n. Response: %v\n", address, duration, response.Message)
profSlice.Lock()
log.Debugf("In SayHello after lock\n")
profSlice.slice = append(profSlice.slice, duration)
log.Debugf("In SayHello before unlock after append\n")
profSlice.Unlock()
}
}
atomic.AddInt64(&completed, 1)
}
}
Expand Down Expand Up @@ -220,14 +241,18 @@ func startMeasurement(msg string) (string, time.Time) {
func getDuration(msg string, start time.Time) {
latency := time.Since(start)
log.Debugf("Invoked %v in %v usec\n", msg, latency.Microseconds())
log.Debugf("In getDuration\n")
addDurations([]time.Duration{latency})
log.Debugf("After addDurations\n")
}

func addDurations(ds []time.Duration) {
latSlice.Lock()
log.Debugf("In addDurations after lock\n")
for _, d := range ds {
latSlice.slice = append(latSlice.slice, d.Microseconds())
}
log.Debugf("In addDurations before unlock\n")
latSlice.Unlock()
}

Expand Down Expand Up @@ -256,3 +281,29 @@ func writeLatencies(rps float64, latencyOutputFile string) {
datawriter.Flush()
file.Close()
}

func writeFunctionDurations(funcDurationOutputFile string) {
profSlice.Lock()
defer profSlice.Unlock()

fileName := funcDurationOutputFile
log.Info("The measured function durations are saved in ", fileName)

file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)

if err != nil {
log.Fatal("Failed creating file: ", err)
}

datawriter := bufio.NewWriter(file)

for _, dur := range profSlice.slice {
_, err := datawriter.WriteString(strconv.FormatInt(dur, 10) + "\n")
if err != nil {
log.Fatal("Failed to write the URLs to a file ", err)
}
}

datawriter.Flush()
file.Close()
}
21 changes: 19 additions & 2 deletions tools/relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"fmt"
"net"
"os"
"strconv"
"time"

pb "github.com/vhive-serverless/vSwarm-proto/proto/helloworld"

Expand Down Expand Up @@ -61,6 +63,7 @@ var (
value = flag.String("value", "helloWorld", "String input to pass to benchmark")
functionMethod = flag.String("function-method", "default", "Which method of benchmark to invoke")
verbose = flag.Bool("verbose", false, "Enable verbose log printing")
profileFunction = flag.Bool("profile-function", false, "Enable function profiling")

// Client
grpcClient grpcClients.GrpcClient
Expand Down Expand Up @@ -155,8 +158,22 @@ func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloRe
// Create new packet
pkt := inputGenerator.Next()
log.Debugf("Send to func: %s\n", pkt)

startTime := time.Now()
reply, err := grpcClient.Request(ctx, pkt)
log.Debugf("Recv from func: %s\n", reply)
endTime := time.Now()
elapsedTime := int64(endTime.Sub(startTime).Microseconds())

var finalReply string

if *profileFunction {
log.Debugf("Invoked in %d usec\n. Recv from func: %s\n", elapsedTime, reply)
elapsedTimeStr := strconv.FormatInt(elapsedTime, 10)
finalReply = reply + "|" + elapsedTimeStr
} else {
log.Debugf("Recv from func: %s\n", reply)
finalReply = reply
}

return &pb.HelloReply{Message: reply}, err
return &pb.HelloReply{Message: finalReply}, err
}

0 comments on commit ea7b87d

Please sign in to comment.