Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement capnproto replication #7659

Merged
merged 19 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .bingo/Variables.mk
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ $(BINGO): $(BINGO_DIR)/bingo.mod
@echo "(re)installing $(GOBIN)/bingo-v0.9.0"
@cd $(BINGO_DIR) && GOWORK=off $(GO) build -mod=mod -modfile=bingo.mod -o=$(GOBIN)/bingo-v0.9.0 "github.com/bwplotka/bingo"

CAPNPC_GO := $(GOBIN)/capnpc-go-v3.0.1-alpha.2.0.20240830165715-46ccd63a72af
$(CAPNPC_GO): $(BINGO_DIR)/capnpc-go.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
@echo "(re)installing $(GOBIN)/capnpc-go-v3.0.1-alpha.2.0.20240830165715-46ccd63a72af"
@cd $(BINGO_DIR) && GOWORK=off $(GO) build -mod=mod -modfile=capnpc-go.mod -o=$(GOBIN)/capnpc-go-v3.0.1-alpha.2.0.20240830165715-46ccd63a72af "capnproto.org/go/capnp/v3/capnpc-go"

FAILLINT := $(GOBIN)/faillint-v1.13.0
$(FAILLINT): $(BINGO_DIR)/faillint.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
Expand Down
5 changes: 5 additions & 0 deletions .bingo/capnpc-go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT

go 1.23.1

require capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af // capnpc-go
6 changes: 6 additions & 0 deletions .bingo/capnpc-go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af h1:A5wxH0ZidOtYYUGjhtBaRuB87M73bGfc06uWB8sHpg0=
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
2 changes: 2 additions & 0 deletions .bingo/variables.env
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ ALERTMANAGER="${GOBIN}/alertmanager-v0.27.0"

BINGO="${GOBIN}/bingo-v0.9.0"

CAPNPC_GO="${GOBIN}/capnpc-go-v3.0.1-alpha.2.0.20240830165715-46ccd63a72af"

FAILLINT="${GOBIN}/faillint-v1.13.0"

GOIMPORTS="${GOBIN}/goimports-v0.23.0"
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7658](https://github.com/thanos-io/thanos/pull/7658) Store: Fix panic because too small buffer in pool.
- [#7643](https://github.com/thanos-io/thanos/pull/7643) Receive: fix thanos_receive_write_{timeseries,samples} stats
- [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic
- [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain __name__=="something", do not add <labelName> != "" to fetch less postings.
- [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain **name**=="something", do not add <labelname> != "" to fetch less postings.
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries
- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.

Expand All @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7429](https://github.com/thanos-io/thanos/pull/7429): Reloader: introduce `TolerateEnvVarExpansionErrors` to allow suppressing errors when expanding environment variables in the configuration file. When set, this will ensure that the reloader won't consider the operation to fail when an unset environment variable is encountered. Note that all unset environment variables are left as is, whereas all set environment variables are expanded as usual.
- [#7560](https://github.com/thanos-io/thanos/pull/7560) Query: Added the possibility of filtering rules by rule_name, rule_group or file to HTTP api.
- [#7652](https://github.com/thanos-io/thanos/pull/7652) Store: Implement metadata API limit in stores.
- [#7659](https://github.com/thanos-io/thanos/pull/7659) Receive: Add support for replication using [Cap'n Proto](https://capnproto.org/). This protocol has a lower CPU and memory footprint, which leads to a reduction in resource usage in Receivers. Before enabling it, make sure that all receivers are updated to a version which supports this replication method.

### Changed

Expand Down
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ proto: ## Generates Go files from Thanos proto files.
proto: check-git $(GOIMPORTS) $(PROTOC) $(PROTOC_GEN_GOGOFAST)
@GOIMPORTS_BIN="$(GOIMPORTS)" PROTOC_BIN="$(PROTOC)" PROTOC_GEN_GOGOFAST_BIN="$(PROTOC_GEN_GOGOFAST)" PROTOC_VERSION="$(PROTOC_VERSION)" scripts/genproto.sh

.PHONY: capnp
capnp: ## Generates Go files from Thanos capnproto files.
capnp: check-git
capnp compile -I $(shell go list -m -f '{{.Dir}}' capnproto.org/go/capnp/v3)/std -ogo pkg/receive/writecapnp/write_request.capnp
@$(GOIMPORTS) -w pkg/receive/writecapnp/write_request.capnp.go
go run ./scripts/copyright

.PHONY: tarballs-release
tarballs-release: ## Build tarballs.
tarballs-release: $(PROMU)
Expand Down
54 changes: 43 additions & 11 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package main

import (
"context"
"fmt"
"net"
"os"
"path"
"strings"
Expand Down Expand Up @@ -271,6 +273,7 @@ func runReceive(
Limiter: limiter,

AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol),
})

grpcProbe := prober.NewGRPC()
Expand Down Expand Up @@ -465,6 +468,26 @@ func runReceive(
}
}

{
capNProtoWriter := receive.NewCapNProtoWriter(logger, dbs, &receive.CapNProtoWriterOptions{
TooFarInFutureTimeWindow: int64(time.Duration(*conf.tsdbTooFarInFutureTimeWindow)),
})
handler := receive.NewCapNProtoHandler(logger, capNProtoWriter)
listener, err := net.Listen("tcp", conf.replicationAddr)
if err != nil {
return err
}
server := receive.NewCapNProtoServer(listener, handler, logger)
g.Add(func() error {
return server.ListenAndServe()
}, func(err error) {
server.Shutdown()
if err := listener.Close(); err != nil {
level.Warn(logger).Log("msg", "Cap'n Proto server did not shut down gracefully", "err", err.Error())
}
})
}

level.Info(logger).Log("msg", "starting receiver")
return nil
}
Expand Down Expand Up @@ -795,6 +818,7 @@ type receiveConfig struct {

grpcConfig grpcConfig

replicationAddr string
rwAddress string
rwServerCert string
rwServerKey string
Expand All @@ -816,17 +840,18 @@ type receiveConfig struct {
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
tenantHeader string
tenantField string
tenantLabelName string
defaultTenantID string
replicaHeader string
replicationFactor uint64
forwardTimeout *model.Duration
maxBackoff *model.Duration
compression string
refreshInterval *model.Duration
endpoint string
tenantHeader string
tenantField string
tenantLabelName string
defaultTenantID string
replicaHeader string
replicationFactor uint64
forwardTimeout *model.Duration
maxBackoff *model.Duration
compression string
replicationProtocol string

tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
Expand Down Expand Up @@ -929,6 +954,13 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor)

replicationProtocols := []string{string(receive.ProtobufReplication), string(receive.CapNProtoReplication)}
cmd.Flag("receive.replication-protocol", "The protocol to use for replicating remote-write requests. One of "+strings.Join(replicationProtocols, ", ")).
Default(string(receive.ProtobufReplication)).
EnumVar(&rc.replicationProtocol, replicationProtocols...)

cmd.Flag("receive.capnproto-address", "Address for the Cap'n Proto server.").Default(fmt.Sprintf("0.0.0.0:%s", receive.DefaultCapNProtoPort)).StringVar(&rc.replicationAddr)

rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden())
Expand Down
27 changes: 26 additions & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ If you are using the `hashmod` algorithm and wish to migrate to `ketama`, the si

This algorithm uses a `hashmod` function over all labels to decide which receiver is responsible for a given timeseries. This is the default algorithm due to historical reasons. However, its usage for new Receive installations is discouraged since adding new Receiver nodes leads to series churn and memory usage spikes.

### Replication protocols

By default, Receivers replicate data using Protobuf over gRPC. Deserializing protobuf-encoded messages can be resource-intensive and cause significant GC pressure. Alternatively, you can use [Cap'N Proto](https://capnproto.org/) for replication encoding and as the RPC framework.

In order to enable this mode, you can use the `receive.replication-protocol=capnproto` option on the receiver. Thanos will try to infer the Cap'N Proto address of each peer in the hashring using the existing gRPC address. You can also explicitly set the Cap'N Proto as follows:

```json
[
{
"endpoints": [
{"address": "node-1:10901", "capnproto_address": "node-1:19391"},
{"address": "node-2:10901", "capnproto_address": "node-2:19391"},
{"address": "node-3:10901", "capnproto_address": "node-3:19391"}
]
}
]
```

### Hashring management and autoscaling in Kubernetes

The [Thanos Receive Controller](https://github.com/observatorium/thanos-receive-controller) project aims to automate hashring management when running Thanos in Kubernetes. In combination with the Ketama hashring algorithm, this controller can also be used to keep hashrings up to date when Receivers are scaled automatically using an HPA or [Keda](https://keda.sh/).
Expand Down Expand Up @@ -312,7 +330,8 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need

The following formula is used for calculating quorum:

```go mdox-exec="sed -n '999,1008p' pkg/receive/handler.go"
```go mdox-exec="sed -n '1012,1022p' pkg/receive/handler.go"
// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
func (h *Handler) writeQuorum() int {
// NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes
// would need to succeed all the time. Another way to think about it is when migrating
Expand Down Expand Up @@ -392,6 +411,8 @@ Flags:
Path to YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--receive.capnproto-address="0.0.0.0:19391"
Address for the Cap'n Proto server.
--receive.default-tenant-id="default-tenant"
Default tenant ID to use when none is provided
via a header.
Expand Down Expand Up @@ -438,6 +459,10 @@ Flags:
--receive.replication-factor=1
How many times to replicate incoming write
requests.
--receive.replication-protocol=protobuf
The protocol to use for replicating
remote-write requests. One of protobuf,
capnproto
--receive.split-tenant-label-name=""
Label name through which the request will
be split into multiple tenants. This takes
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ require (
)

require (
capnproto.org/go/capnp/v3 v3.0.1-alpha.1
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/hashicorp/golang-lru/v2 v2.0.7
Expand All @@ -132,6 +133,7 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/cilium/ebpf v0.11.0 // indirect
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 // indirect
github.com/containerd/cgroups/v3 v3.0.3 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/go-licenser v0.3.1 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
capnproto.org/go/capnp/v3 v3.0.1-alpha.1 h1:hYEclwXEKsnu+PdHASdx3nLP0fC9kZnR+x1CEvMp9ck=
capnproto.org/go/capnp/v3 v3.0.1-alpha.1/go.mod h1:B+ZjwFmHwTYv201x6CdIo7MmDC/TROJDa00kbjTnv1s=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
Expand Down Expand Up @@ -1496,6 +1498,8 @@ github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0=
github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0=
github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0=
github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down Expand Up @@ -2094,6 +2098,8 @@ github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI=
github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
Expand Down Expand Up @@ -2249,6 +2255,10 @@ github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 h1:xPaP58g
github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
Expand Down
108 changes: 108 additions & 0 deletions pkg/receive/capnp_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package receive

import (
"context"
"net"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/receive/writecapnp"
"github.com/thanos-io/thanos/pkg/runutil"
)

type CapNProtoServer struct {
listener net.Listener
server writecapnp.Writer
logger log.Logger
}

func NewCapNProtoServer(listener net.Listener, handler *CapNProtoHandler, logger log.Logger) *CapNProtoServer {
return &CapNProtoServer{
listener: listener,
server: writecapnp.Writer_ServerToClient(handler),
logger: logger,
}
}

func (c *CapNProtoServer) ListenAndServe() error {
for {
conn, err := c.listener.Accept()
if err != nil {
return err
}

go func() {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
defer runutil.CloseWithLogOnErr(c.logger, conn, "receive capnp conn")
rpcConn := rpc.NewConn(rpc.NewPackedStreamTransport(conn), &rpc.Options{
// The BootstrapClient is the RPC interface that will be made available
// to the remote endpoint by default.
BootstrapClient: capnp.Client(c.server).AddRef(),
})
<-rpcConn.Done()
}()
}
}

func (c *CapNProtoServer) Shutdown() {
c.server.Release()
}

type CapNProtoHandler struct {
writer *CapNProtoWriter
logger log.Logger
}

func NewCapNProtoHandler(logger log.Logger, writer *CapNProtoWriter) *CapNProtoHandler {
return &CapNProtoHandler{logger: logger, writer: writer}
}

func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_write) error {
call.Go()
wr, err := call.Args().Wr()
if err != nil {
return err
}
t, err := wr.Tenant()
if err != nil {
return err
}
req, err := writecapnp.NewRequest(wr)
if err != nil {
return err
}
defer req.Close()

var errs writeErrors
errs.Add(c.writer.Write(ctx, t, req))
if err := errs.ErrOrNil(); err != nil {
level.Debug(c.logger).Log("msg", "failed to handle request", "err", err)
result, allocErr := call.AllocResults()
if allocErr != nil {
return allocErr
}

switch errors.Cause(err) {
case nil:
return nil
case errNotReady:
result.SetError(writecapnp.WriteError_unavailable)
case errUnavailable:
result.SetError(writecapnp.WriteError_unavailable)
case errConflict:
result.SetError(writecapnp.WriteError_alreadyExists)
case errBadReplica:
result.SetError(writecapnp.WriteError_invalidArgument)
default:
result.SetError(writecapnp.WriteError_internal)
}
}

return nil
}
Loading
Loading