Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Send correct logs to Saturn logger #40

Merged
merged 3 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Config struct {
PoolLowWatermark int
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing.
MaxRetrievalAttempts int

// SaturnLoggerJWT is the JWT Auth token to use when submitting logs to the Saturn logging endpoint.
SaturnLoggerJWT string
}

const DefaultMaxRetries = 3
Expand All @@ -65,6 +68,10 @@ type Caboose struct {
}

func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) {
if config.SaturnLoggerJWT == "" || len(config.SaturnLoggerJWT) == 0 {
return nil, errors.New("JWT token required for Saturn Logger")
}
Copy link
Contributor

@lidel lidel Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ this breaks flow for local development OR requires everyone on the team to get their own token.
iiurc last time we talked about this @guanzo noted it is ok to simply disable sending logs to logger when JWT token is not set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


c := Caboose{
config: config,
pool: newPool(config),
Expand Down
8 changes: 5 additions & 3 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ func main1() int {
LoggingClient: http.DefaultClient,
LoggingInterval: 5 * time.Second,

DoValidation: true,
PoolRefresh: 5 * time.Minute,
SaturnClient: &saturnClient,
DoValidation: true,
PoolRefresh: 5 * time.Minute,
SaturnClient: &saturnClient,
SaturnLoggerJWT: "test",
})
if err != nil {
return err
Expand All @@ -84,5 +85,6 @@ func main1() int {
log.Println(err)
return 1
}
time.Sleep(5 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this?

Copy link
Contributor Author

@aarshkshah1992 aarshkshah1992 Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was for some debugging to ensure that we do eventually see the response of the logging endpoint before shutting down. Lemme remove this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aarshkshah1992 but.. did you push the removal? 🙃 I still see it :D

return 0
}
1 change: 1 addition & 0 deletions failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestCabooseFailures(t *testing.T) {
PoolWeightChangeDebounce: time.Duration(1),
PoolRefresh: time.Millisecond * 50,
MaxRetrievalAttempts: 2,
SaturnLoggerJWT: "rand",
})
if err != nil {
t.Fatal(err)
Expand Down
50 changes: 36 additions & 14 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package caboose
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
Expand All @@ -19,6 +20,7 @@ type logger struct {
client *http.Client
endpoint url.URL
done chan struct{}
jwt string
}

func newLogger(c *Config) *logger {
Expand All @@ -28,6 +30,7 @@ func newLogger(c *Config) *logger {
client: c.LoggingClient,
endpoint: c.LoggingEndpoint,
done: make(chan struct{}),
jwt: c.SaturnLoggerJWT,
}
go l.background()
return &l
Expand Down Expand Up @@ -59,7 +62,24 @@ func (l *logger) submit(logs []log) {
finalLogs := bytes.NewBuffer(nil)
enc := json.NewEncoder(finalLogs)
enc.Encode(logBatch{logs})
l.client.Post(l.endpoint.String(), "application/json", finalLogs)

req, err := http.NewRequest(http.MethodPost, l.endpoint.String(), finalLogs)
if err != nil {
goLogger.Errorw("failed to create http request to submit saturn logs", "err", err)
return
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", l.jwt))

resp, err := l.client.Do(req)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than having this a specific saturn logging needs a jwt bearer token - which is a pretty different level of abstraction than the rest of this library so far - i wonder if we can have this header set by the configured http client.

so e.g. we set the specific client/url at https://github.com/ipfs/bifrost-gateway/blob/main/blockstore.go#L111
we could do something like the final code block at https://developer20.com/add-header-to-every-request-in-go/ to configure the JWT token in the same place, which seems more local than splitting it between there and here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed this token from the config.

if err != nil {
goLogger.Errorw("failed to submit saturn logs", "err", err)
return
}
if resp.StatusCode != http.StatusOK {
goLogger.Errorw("saturn logging endpoint did not return 200", "status", resp.StatusCode)
}
}

func (l *logger) Close() {
Expand All @@ -71,17 +91,19 @@ type logBatch struct {
}

type log struct {
CacheHit bool `json:"cacheHit"`
URL string `json:"url"`
LocalTime time.Time `json:"localTime"`
NumBytesSent int `json:"numBytesSent"`
RequestDuration float64 `json:"requestDuration"` // in seconds
RequestID string `json:"requestId"`
HTTPStatusCode int `json:"httpStatusCode"`
HTTPProtocol string `json:"httpProtocol"`
TTFBMS int `json:"ttfbMs"`
ClientAddress string `json:"clientAddress"`
Range string `json:"range"`
Referrer string `json:"referrer"`
UserAgent string `json:"userAgent"`
CacheHit bool `json:"cacheHit"`
URL string `json:"url"`
StartTime time.Time `json:"startTime"`
NumBytesSent int `json:"numBytesSent"`
RequestDurationSec float64 `json:"requestDurationSec"` // in seconds
RequestID string `json:"requestId"`
HTTPStatusCode int `json:"httpStatusCode"`
HTTPProtocol string `json:"httpProtocol"`
TTFBMS int `json:"ttfbMs"`
Range string `json:"range"`
Referrer string `json:"referrer"`
UserAgent string `json:"userAgent"`
NodeId string `json:"nodeId"`
IfNetworkError string `json:"ifNetworkError"`
NodeIpAddress string `json:"nodeIpAddress"`
}
63 changes: 48 additions & 15 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,13 @@ func (p *pool) updateWeightUnlocked(node string, failure bool) (index int, membe

var saturnReqTmpl = "https://%s/ipfs/%s?format=raw"

var (
saturnNodeIdKey = "Saturn-Node-Id"
saturnTransferIdKey = "Saturn-Transfer-Id"
saturnCacheHitKey = "Saturn-Cache-Status"
saturnCacheHit = "HIT"
)

// doFetch attempts to fetch a block from a given Saturn endpoint. It sends the retrieval logs to the logging endpoint upon a successful or failed attempt.
func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, e error) {
requestId := uuid.NewString()
Expand All @@ -351,6 +358,13 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
proto := "unknown"
respReq := &http.Request{}
received := 0
reqUrl := ""
var respHeader http.Header
saturnNodeId := ""
saturnTransferId := ""
isCacheHit := false
networkError := ""

defer func() {
ttfbMs := fb.Sub(start).Milliseconds()
durationSecs := time.Since(start).Seconds()
Expand All @@ -363,28 +377,45 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
fetchSpeedMetric.Observe(float64(received) / durationSecs)
fetchSizeMetric.Observe(float64(received))
}

if respHeader != nil {
saturnNodeId = respHeader.Get(saturnNodeIdKey)
saturnTransferId = respHeader.Get(saturnTransferIdKey)

cacheHit := respHeader.Get(saturnCacheHitKey)
if cacheHit == saturnCacheHit {
isCacheHit = true
}

for k, v := range respHeader {
received = received + len(k) + len(v)
}
}

p.logger.queue <- log{
CacheHit: false,
URL: from,
LocalTime: start,
// TODO: does this include header sizes?
NumBytesSent: received,
RequestDuration: durationSecs,
RequestID: requestId,
HTTPStatusCode: code,
HTTPProtocol: proto,
TTFBMS: int(ttfbMs),
CacheHit: isCacheHit,
URL: reqUrl,
StartTime: start,
NumBytesSent: received,
RequestDurationSec: durationSecs,
RequestID: saturnTransferId,
HTTPStatusCode: code,
HTTPProtocol: proto,
TTFBMS: int(ttfbMs),
// my address
ClientAddress: "",
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from,
IfNetworkError: networkError,
}
}()

reqCtx, cancel := context.WithTimeout(ctx, DefaultSaturnRequestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf(saturnReqTmpl, from, c), nil)
reqUrl = fmt.Sprintf(saturnReqTmpl, from, c)
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
return nil, err
}
Expand All @@ -401,8 +432,10 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
resp, err := p.config.SaturnClient.Do(req)
fb = time.Now()
if err != nil {
networkError = err.Error()
return nil, fmt.Errorf("http request failed: %w", err)
}
respHeader = resp.Header
defer resp.Body.Close()

code = resp.StatusCode
Expand Down