-
Notifications
You must be signed in to change notification settings - Fork 0
/
sub_url_source.go
62 lines (54 loc) · 1.47 KB
/
sub_url_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main
import (
"context"
"github.com/ServiceWeaver/weaver"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/sourcegraph/conc/stream"
"raycat/internal/pkg/bytesEx"
"raycat/internal/pkg/fetcher"
"raycat/internal/pkg/tinypool"
"time"
)
// Compile-time check that *subURLSource satisfies subURLSourceProvider.
var _ subURLSourceProvider = (*subURLSource)(nil)
// urlSubLru caches fetched subscription payloads keyed by their source URL.
// It holds at most 20 entries, registers no eviction callback, and expires
// entries four hours after they are added.
var urlSubLru = expirable.NewLRU[string, []byte](20, nil, 4*time.Hour)

// urlBuf is the pool from which UpdateUrlSub takes its scratch buffer. It is
// built with tinypool.BufReset as the pool's reset hook.
// NOTE(review): presumably pools bytes.Buffer values that are reset on
// Get/Free — confirm against the tinypool package.
var urlBuf = tinypool.New(tinypool.BufReset)
// subURLSourceProvider is the component interface for collecting the content
// of URL-based subscriptions.
type subURLSourceProvider interface {
// UpdateUrlSub returns the combined content of the subscription URLs in
// urlSubs. fetchTimeoutSeconds is passed to the fetch client used for URLs
// whose content is not already cached.
UpdateUrlSub(ctx context.Context, urlSubs []string, fetchTimeoutSeconds int) ([]byte, error)
}
// subURLSource is the Service Weaver implementation of subURLSourceProvider.
// It carries no fields of its own; the cache and buffer pool it uses are
// package-level variables.
type subURLSource struct {
// Embedding weaver.Implements registers this struct as the implementation
// of the subURLSourceProvider component.
weaver.Implements[subURLSourceProvider]
}
// UpdateUrlSub concatenates the content of every subscription URL in urlSubs
// and returns the combined bytes.
//
// Each URL is first looked up in the package-level urlSubLru cache. On a miss
// the URL is fetched with a client built from fetchTimeoutSeconds, terminated
// with a newline when the payload lacks one, stored in the cache and appended
// to the output. A failed fetch is logged and the URL is removed from the
// cache; the remaining URLs are still processed, so the returned error is
// always nil.
//
// Content appears in the output in the same order as urlSubs. The returned
// slice is a private copy that does not alias the pooled scratch buffer. An
// empty urlSubs yields a nil slice.
func (s *subURLSource) UpdateUrlSub(ctx context.Context, urlSubs []string, fetchTimeoutSeconds int) ([]byte, error) {
	// stream.WithMaxGoroutines panics when given a limit below one, so an
	// empty request is answered without building a stream.
	if len(urlSubs) == 0 {
		return nil, nil
	}

	buf := urlBuf.Get()
	defer urlBuf.Free(buf)

	st := stream.New().WithMaxGoroutines(len(urlSubs))
	for _, sub := range urlSubs {
		sub := sub // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
		st.Go(func() stream.Callback {
			// Cache hits go through the stream as well: callbacks run one at
			// a time, in submission order, on the stream's callback
			// goroutine, so every write to buf is serialized there instead
			// of racing with a write made from the calling goroutine.
			if cached, found := urlSubLru.Get(sub); found {
				return func() {
					buf.Write(cached)
				}
			}

			client := fetcher.NewClient(fetchTimeoutSeconds)
			content, err := client.Fetch(sub)
			if err != nil {
				s.Logger(ctx).Error("failed to fetch url sub from source", "url", sub, "error", err)
				return func() {
					urlSubLru.Remove(sub)
				}
			}

			// Keep consecutive payloads on separate lines.
			if !bytesEx.IsLastByteNewline(content) {
				content = append(content, byte('\n'))
			}
			return func() {
				urlSubLru.Add(sub, content)
				buf.Write(content)
			}
		})
	}
	st.Wait()

	// buf goes back to the pool when this method returns, so returning
	// buf.Bytes() directly would hand the caller memory that a later user of
	// the pool may overwrite. Return a detached copy instead.
	return append([]byte(nil), buf.Bytes()...), nil
}