Commit
feat(kafka): Add Ingestion from Kafka in Ingesters (#14192)
Co-authored-by: Joao Marcal <[email protected]>
Co-authored-by: benclive <[email protected]>
3 people authored Sep 24, 2024
1 parent de24945 · commit b6e9945
Showing 21 changed files with 782 additions and 168 deletions.
17 changes: 17 additions & 0 deletions docs/sources/shared/configuration.md
@@ -801,6 +801,12 @@ kafka_config:
# CLI flag: -kafka.consumer-group
[consumer_group: <string> | default = ""]

# How frequently a consumer should commit the consumed offset to Kafka. The
# last committed offset is used at startup to continue consumption from where
# it left off.
# CLI flag: -kafka.consumer-group-offset-commit-interval
[consumer_group_offset_commit_interval: <duration> | default = 1s]

# How long to retry a failed request to get the last produced offset.
# CLI flag: -kafka.last-produced-offset-retry-timeout
[last_produced_offset_retry_timeout: <duration> | default = 10s]
@@ -809,6 +815,17 @@ kafka_config:
# CLI flag: -kafka.auto-create-topic-enabled
[auto_create_topic_enabled: <boolean> | default = true]

# When auto-creation of Kafka topics is enabled and this value is positive,
# Kafka's num.partitions configuration option is set on the Kafka brokers to
# this value when the Loki component that uses Kafka starts. This
# configuration option specifies the default number of partitions that the
# Kafka broker uses for auto-created topics. Note that this is a cluster-wide
# setting that applies to any auto-created topic. If setting num.partitions
# fails, Loki proceeds anyway, but auto-created topics could have an
# incorrect number of partitions.
# CLI flag: -kafka.auto-create-topic-default-partitions
[auto_create_topic_default_partitions: <int> | default = 1000]

# The maximum size of the Kafka record data that the producer should
# generate. An incoming write request larger than this size is split into
# multiple Kafka records. We strongly recommend not changing this setting
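For orientation, here is a minimal sketch of how the new options fit into a kafka_config block. The keys come from the documentation above; the values are illustrative, not recommendations from this commit:

kafka_config:
  consumer_group: "loki-ingest"
  consumer_group_offset_commit_interval: 1s
  auto_create_topic_enabled: true
  auto_create_topic_default_partitions: 1000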
104 changes: 104 additions & 0 deletions pkg/ingester/downscale.go
@@ -0,0 +1,104 @@
package ingester

import (
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"

"github.com/grafana/loki/v3/pkg/util"
)

// PreparePartitionDownscaleHandler prepares the ingester's partition downscaling. The partition owned by the
// ingester will switch to INACTIVE state (read-only).
//
// The following methods are supported:
//
// - GET
// Returns the timestamp when the partition switched to INACTIVE state, or 0 if the partition is not in INACTIVE state.
//
// - POST
// Switches the partition to INACTIVE state (if it is not already), and returns the timestamp when the switch to
// INACTIVE state happened.
//
// - DELETE
// Sets the partition back from INACTIVE to ACTIVE state, and returns 0, signalling that the partition is no longer in INACTIVE state.
func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) {
logger := log.With(i.logger, "partition", i.ingestPartitionID)

// Don't allow callers to change the shutdown configuration while we're in the middle
// of starting or shutting down.
if i.State() != services.Running {
w.WriteHeader(http.StatusServiceUnavailable)
return
}

if !i.cfg.KafkaIngestion.Enabled {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

switch r.Method {
case http.MethodPost:
// It's not allowed to prepare the downscale while in PENDING state. Why? Because if the downscale
// is later cancelled, we don't know whether it was requested in PENDING or ACTIVE state, so we
// wouldn't know which state to revert to. Given that a partition is expected to stay in PENDING state
// only for a short period, we simply don't allow this case.
state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context())
if err != nil {
level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

if state == ring.PartitionPending {
level.Warn(logger).Log("msg", "received a request to prepare partition for shutdown, but the request can't be satisfied because the partition is in PENDING state")
w.WriteHeader(http.StatusConflict)
return
}

if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionInactive); err != nil {
level.Error(logger).Log("msg", "failed to change partition state to inactive", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

case http.MethodDelete:
state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context())
if err != nil {
level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

// If the partition is INACTIVE, make it ACTIVE. We ignore the other states, ACTIVE and especially PENDING.
if state == ring.PartitionInactive {
// We don't switch it back to PENDING state if there are not enough owners, because we want to guarantee consistency
// in the read path. If the partition is within the lookback period, we need to guarantee that the partition will be queried.
// Moving back to PENDING would cost us that consistency, because PENDING partitions are not queried by design.
// We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than
// a "lookback period" ago, but since we delete inactive partitions with no owners that moved to INACTIVE longer
// than a "lookback period" ago, this looks like an edge case not worth addressing.
if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil {
level.Error(logger).Log("msg", "failed to change partition state to active", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}

state, stateTimestamp, err := i.partitionRingLifecycler.GetPartitionState(r.Context())
if err != nil {
level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

if state == ring.PartitionInactive {
util.WriteJSONResponse(w, map[string]any{"timestamp": stateTimestamp.Unix()})
} else {
util.WriteJSONResponse(w, map[string]any{"timestamp": 0})
}
}
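The handler above is meant to be driven by an operator or an autoscaler. Below is a minimal client sketch in Go; the URL path is an assumption for illustration (check where your Loki build registers PreparePartitionDownscaleHandler), while the JSON shape matches the handler's util.WriteJSONResponse calls:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed route; the actual registration path may differ.
	url := "http://ingester-zone-a-0:3100/ingester/prepare-partition-downscale"

	// POST switches the partition to INACTIVE (read-only) and returns the
	// switch timestamp, e.g. {"timestamp": 1727164800}.
	req, _ := http.NewRequest(http.MethodPost, url, nil)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(resp.Status, string(body))

	// DELETE moves an INACTIVE partition back to ACTIVE; the handler then
	// responds with {"timestamp": 0}.
	req, _ = http.NewRequest(http.MethodDelete, url, nil)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	body, _ = io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(resp.Status, string(body))
}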
106 changes: 94 additions & 12 deletions pkg/ingester/ingester.go
@@ -18,6 +18,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
@@ -37,6 +38,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafka/partitionring"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
@@ -225,6 +227,7 @@ type Interface interface {
GetOrCreateInstance(instanceID string) (*instance, error)
ShutdownHandler(w http.ResponseWriter, r *http.Request)
PrepareShutdown(w http.ResponseWriter, r *http.Request)
PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request)
}

// Ingester builds chunks for incoming log streams.
@@ -290,6 +293,10 @@ type Ingester struct {
// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
readRing ring.ReadRing
recalculateOwnedStreams *recalculateOwnedStreams

ingestPartitionID int32
partitionRingLifecycler *ring.PartitionInstanceLifecycler
partitionReader *partition.Reader
}

// New makes a new Ingester.
@@ -353,6 +360,34 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.lifecyclerWatcher = services.NewFailureWatcher()
i.lifecyclerWatcher.WatchService(i.lifecycler)

if i.cfg.KafkaIngestion.Enabled {
i.ingestPartitionID, err = partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID)
if err != nil {
return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
}
partitionRingKV := cfg.KafkaIngestion.PartitionRingConfig.KVStore.Mock
if partitionRingKV == nil {
partitionRingKV, err = kv.NewClient(cfg.KafkaIngestion.PartitionRingConfig.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger)
if err != nil {
return nil, fmt.Errorf("creating KV store for ingester partition ring: %w", err)
}
}
i.partitionRingLifecycler = ring.NewPartitionInstanceLifecycler(
i.cfg.KafkaIngestion.PartitionRingConfig.ToLifecyclerConfig(i.ingestPartitionID, cfg.LifecyclerConfig.ID),
PartitionRingName,
PartitionRingKey,
partitionRingKV,
logger,
prometheus.WrapRegistererWithPrefix("loki_", registerer))

i.partitionReader, err = partition.NewReader(cfg.KafkaIngestion.KafkaConfig, i.ingestPartitionID, cfg.LifecyclerConfig.ID, NewKafkaConsumerFactory(i, logger, registerer), logger, registerer)
if err != nil {
return nil, err
}
i.lifecyclerWatcher.WatchService(i.partitionRingLifecycler)
i.lifecyclerWatcher.WatchService(i.partitionReader)
}

// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
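New() derives the partition ID from the ingester's ring ID via partitionring.ExtractIngesterPartitionID, so an ingester named ingester-zone-a-5 owns partition 5. A toy re-implementation of that convention, to make the mapping concrete; this is an illustrative sketch, not the library code:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// extractPartitionID mirrors the convention ExtractIngesterPartitionID relies
// on: the partition ID is the trailing "-<n>" sequence number of the ingester
// ring ID.
func extractPartitionID(ingesterID string) (int32, error) {
	idx := strings.LastIndex(ingesterID, "-")
	if idx < 0 {
		return 0, fmt.Errorf("no sequence number in ingester ID %q", ingesterID)
	}
	n, err := strconv.Atoi(ingesterID[idx+1:])
	if err != nil {
		return 0, fmt.Errorf("parsing sequence number in %q: %w", ingesterID, err)
	}
	return int32(n), nil
}

func main() {
	id, _ := extractPartitionID("ingester-zone-a-5")
	fmt.Println(id) // 5
}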
@@ -461,7 +496,15 @@ func (i *Ingester) setupAutoForget() {
}()
}

func (i *Ingester) starting(ctx context.Context) error {
func (i *Ingester) starting(ctx context.Context) (err error) {
defer func() {
if err != nil {
// if starting() fails for any reason (e.g., context canceled),
// the lifecycler must be stopped.
_ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
}
}()

if i.cfg.WAL.Enabled {
start := time.Now()

@@ -546,17 +589,6 @@ func (i *Ingester) starting(ctx context.Context) error {

i.InitFlushQueues()

// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
err := i.lifecycler.StartAsync(context.Background())
if err != nil {
return err
}

err = i.lifecycler.AwaitRunning(ctx)
if err != nil {
return err
}

shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename)
shutdownMarker, err := shutdownMarkerExists(shutdownMarkerPath)
if err != nil {
@@ -568,6 +600,26 @@ func (i *Ingester) starting(ctx context.Context) error {
i.setPrepareShutdown()
}

// When Kafka ingestion is enabled, we have to make sure that the reader catches up replaying the partition
// BEFORE the ingester ring lifecycler is started, because once the ingester ring lifecycler starts
// it will switch the ingester state in the ring to ACTIVE.
if i.partitionReader != nil {
if err := services.StartAndAwaitRunning(ctx, i.partitionReader); err != nil {
return fmt.Errorf("failed to start partition reader: %w", err)
}
}

// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
err = i.lifecycler.StartAsync(context.Background())
if err != nil {
return err
}

err = i.lifecycler.AwaitRunning(ctx)
if err != nil {
return err
}

err = i.recalculateOwnedStreams.StartAsync(ctx)
if err != nil {
return fmt.Errorf("can not start recalculate owned streams service: %w", err)
Expand All @@ -578,6 +630,11 @@ func (i *Ingester) starting(ctx context.Context) error {
return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
}

if i.partitionRingLifecycler != nil {
if err := services.StartAndAwaitRunning(ctx, i.partitionRingLifecycler); err != nil {
return fmt.Errorf("failed to start partition ring lifecycler: %w", err)
}
}
// start our loop
i.loopDone.Add(1)
go i.loop()
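The ordering in starting() is the crux of this hunk: the partition reader must finish replaying the Kafka partition before the ring lifecycler advertises the ingester as ACTIVE. A toy sketch of the same ordering with dskit services, using dummy services in place of the real reader and lifecycler (illustrative only, not Loki code):

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/services"
)

func main() {
	// Stand-in for the partition reader: its starting function blocks
	// until the (pretend) replay has caught up.
	reader := services.NewIdleService(func(ctx context.Context) error {
		fmt.Println("replaying Kafka partition until caught up")
		return nil
	}, nil)
	// Stand-in for the ring lifecycler that marks the ingester ACTIVE.
	lifecycler := services.NewIdleService(func(ctx context.Context) error {
		fmt.Println("joining the ring as ACTIVE")
		return nil
	}, nil)

	ctx := context.Background()
	// Reader first: StartAndAwaitRunning returns only once the replay is
	// done, so the ingester never turns ACTIVE before catching up.
	if err := services.StartAndAwaitRunning(ctx, reader); err != nil {
		panic(err)
	}
	if err := services.StartAndAwaitRunning(ctx, lifecycler); err != nil {
		panic(err)
	}
}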
@@ -610,6 +667,19 @@ func (i *Ingester) running(ctx context.Context) error {
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()

if i.partitionReader != nil {
if err := services.StopAndAwaitTerminated(context.Background(), i.partitionReader); err != nil {
level.Warn(i.logger).Log("msg", "failed to stop partition reader", "err", err)
}
}

if i.partitionRingLifecycler != nil {
if err := services.StopAndAwaitTerminated(context.Background(), i.partitionRingLifecycler); err != nil {
level.Warn(i.logger).Log("msg", "failed to stop partition ring lifecycler", "err", err)
}
}

var errs util.MultiError
errs.Add(i.wal.Stop())

@@ -766,6 +836,18 @@ func (i *Ingester) setPrepareShutdown() {
i.lifecycler.SetUnregisterOnShutdown(true)
i.terminateOnShutdown = true
i.metrics.shutdownMarker.Set(1)

if i.partitionRingLifecycler != nil {
// When the prepare-shutdown endpoint is called, there are two changes in the partition ring behavior:
//
// 1. If setPrepareShutdown() is called at startup, because a shutdown marker was found on disk,
// the ingester shouldn't create the partition if it doesn't exist, because we expect the ingester to
// be scaled down shortly after.
// 2. When the ingester shuts down, we'll have to remove the ingester from the partition owners,
// because we expect the ingester to be scaled down.
i.partitionRingLifecycler.SetCreatePartitionOnStartup(false)
i.partitionRingLifecycler.SetRemoveOwnerOnShutdown(true)
}
}

func (i *Ingester) unsetPrepareShutdown() {