Skip to content

Commit

Permalink
zerome: retry until nothing is found (#10)
Browse files Browse the repository at this point in the history
* zerome: retry until nothing is found

Signed-off-by: Seena Fallah <[email protected]>

* ci: update golangci-lint to 1.59.1

Signed-off-by: Seena Fallah <[email protected]>

---------

Signed-off-by: Seena Fallah <[email protected]>
  • Loading branch information
clwluvw authored Jul 4, 2024
1 parent 994e673 commit 22cbe6f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ jobs:
- name: Golangci-lint
uses: golangci/golangci-lint-action@v4
with:
version: v1.59.0
version: v1.59.1
args: --timeout 3m
skip-pkg-cache: true
36 changes: 24 additions & 12 deletions internal/zerome/zerome.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,42 @@ func New(metrics []Metric) *Client {
}
}

func (c *Client) Run(ctx context.Context, wg *sync.WaitGroup) {
func (c *Client) Run(ctx context.Context, outerWG *sync.WaitGroup) {
for _, metric := range c.metrics {
go func(metric Metric, wg *sync.WaitGroup) {
defer wg.Done()
go func(metric Metric, outerWG *sync.WaitGroup) {
defer outerWG.Done()

var wg sync.WaitGroup

for {
select {
case <-ctx.Done():
wg.Wait()

return
case <-time.After(metric.Interval):
err := c.ZeroMe(ctx, metric)
if err != nil {
slog.ErrorContext(ctx, "Failed to zero metric", "metric", metric.Name, "error", err)
}
wg.Add(1)

go func() {
defer wg.Done()

err := c.ZeroMe(ctx, time.Now(), metric)
if err != nil {
slog.ErrorContext(ctx, "Failed to zero metric", "metric", metric.Name, "error", err)
}
}()
}
}
}(metric, wg)
}(metric, outerWG)
}
}

func (c *Client) ZeroMe(ctx context.Context, metric Metric) error {
func (c *Client) ZeroMe(ctx context.Context, nowTime time.Time, metric Metric) error {
// Query twice the interval to ensure that the metric has a missing data point in the past.
queryInterval := metric.Interval * 2 //nolint:gomnd,mnd

// Add query interval as a delay to cover exporter scrape failures.
ts := time.Now().Add(-queryInterval)
ts := nowTime.Add(-queryInterval)

vector, err := metric.querier.Query(ctx, ts, metric.Name, queryInterval, metric.UpLabels)
if err != nil {
Expand All @@ -67,9 +77,11 @@ func (c *Client) ZeroMe(ctx context.Context, metric Metric) error {
return err
}

slog.InfoContext(ctx, "Zeroed metric", "metric", metric.Name, "vector", vector)
for _, s := range vector {
slog.InfoContext(ctx, "Zeroed sample", "metric", metric.Name, "sample", s.String())
}

return nil
return c.ZeroMe(ctx, nowTime, metric) // Retry until nothing to zero
}

func (c *Client) zeroTimeSeries(metric Metric, vector model.Vector) []prompb.TimeSeries {
Expand Down
17 changes: 12 additions & 5 deletions internal/zerome/zerome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ func TestZeroMe(t *testing.T) {
// setup querier mock
mockQuerier := mock.NewMockAPI(ctrl)
querier.SetV1API(mockQuerier)
mockQuerier.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(queryResult, nil, nil)
mockQuerier.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_, _, _ any, _ ...any) (model.Vector, *v1.Warnings, error) {
r := queryResult
queryResult = model.Vector{} // return empty result on next call so it will exit the loop

return r, nil, nil
},
).Times(2)

// setup writer mock
{
Expand All @@ -91,7 +98,7 @@ func TestZeroMe(t *testing.T) {

client := New([]Metric{metric})

err := client.ZeroMe(context.Background(), metric)
err := client.ZeroMe(context.Background(), nowTime, metric)
require.NoError(t, err)
}

Expand Down Expand Up @@ -126,7 +133,7 @@ func TestZeroMe_QueryError(t *testing.T) {

client := New([]Metric{metric})

err := client.ZeroMe(context.Background(), metric)
err := client.ZeroMe(context.Background(), time.Now(), metric)
require.ErrorIs(t, err, queryErr)
}

Expand Down Expand Up @@ -160,7 +167,7 @@ func TestZeroMe_EmptyResult(t *testing.T) {

client := New([]Metric{metric})

err := client.ZeroMe(context.Background(), metric)
err := client.ZeroMe(context.Background(), time.Now(), metric)
require.NoError(t, err)
}

Expand Down Expand Up @@ -207,7 +214,7 @@ func TestZeroMe_WriteError(t *testing.T) {

client := New([]Metric{metric})

err := client.ZeroMe(context.Background(), metric)
err := client.ZeroMe(context.Background(), nowTime, metric)
require.ErrorIs(t, err, writeErr)
}

Expand Down

0 comments on commit 22cbe6f

Please sign in to comment.