diff --git a/README.md b/README.md index 4f9ab31..3cd559c 100644 --- a/README.md +++ b/README.md @@ -79,9 +79,9 @@ func main() { ```go ziggurat.Ziggurat{ -Logger StructuredLogger // a logger implementation of ziggurat.StructuredLogger -WaitTimeout time.Duration // wait timeout when consumers are shutdown -ErrorHandler func(err error) // a notifier for when one of the message consumers is shutdown abruptly + Logger StructuredLogger // a logger implementation of ziggurat.StructuredLogger + WaitTimeout time.Duration // wait timeout when consumers are shutdown + ErrorHandler func(err error) // a notifier for when one of the message consumers is shutdown abruptly } ``` @@ -99,7 +99,7 @@ h := ziggurat.HandlerFunc(func (context.Context, *ziggurat.Event) error {...}) groupOne := kafka.ConsumerGroup{...} groupTwo := kafka.ConsumerGroup{...} if runErr := zig.Run(ctx, h, &groupOne, &groupTwo); runErr != nil { -logger.Error("error running consumers", runErr) + logger.Error("error running consumers", runErr) } ``` @@ -112,7 +112,7 @@ application's business logic ```go type Handler interface { -Handle(ctx context.Context, event *Event) error + Handle(ctx context.Context, event *Event) error } type HandlerFunc func (ctx context.Context, event *Event) error // serves as an adapter for normal functions to be used as handlers ``` @@ -129,13 +129,13 @@ not be modified by handlers as it is not thread safe. The struct can be cloned a ```go ziggurat.Event{ -Metadata map[string]any `json:"meta"` // metadata is a generic map for storing event related info -Value []byte `json:"value"` // a byte slice value which contains the actual message -Key []byte `json:"key"` // a byte slice value which contains the actual key -RoutingPath string `json:"routing_path"` // an arbitrary string set by the message consumer implementation -ProducerTimestamp time.Time `json:"producer_timestamp"` // the producer timestamp set by the message consumer implementation -ReceivedTimestamp time.Time `json:"received_timestamp"` // the timestamp at which the message was ingested by the system, this is also set by the message consumer implementation -EventType string `json:"event_type"` // the type of event, ex:= kafka,rabbitmq, this is also set by the message consumer implementation + Metadata map[string]any `json:"meta"` // metadata is a generic map for storing event related info + Value []byte `json:"value"` // a byte slice value which contains the actual message + Key []byte `json:"key"` // a byte slice value which contains the actual key + RoutingPath string `json:"routing_path"` // an arbitrary string set by the message consumer implementation + ProducerTimestamp time.Time `json:"producer_timestamp"` // the producer timestamp set by the message consumer implementation + ReceivedTimestamp time.Time `json:"received_timestamp"` // the timestamp at which the message was ingested by the system, this is also set by the message consumer implementation + EventType string `json:"event_type"` // the type of event, ex:= kafka,rabbitmq, this is also set by the message consumer implementation } ``` @@ -147,7 +147,7 @@ the ziggurat.Run method for orchestration. ```go type MessageConsumer interface { -Consume(ctx context.Context, handler Handler) error + Consume(ctx context.Context, handler Handler) error } ``` @@ -183,17 +183,17 @@ func (nc *NumberConsumer) Consume(ctx context.Context, h Handler) error { ```go type ConsumerConfig struct { -BootstrapServers string // A required comma separated list of broker addresses -DebugLevel string // generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all -GroupID string // A required string -Topics []string // A required non-empty list of topics to consume from -AutoCommitInterval int // A commit Interval time in milliseconds -ConsumerCount int // Number of concurrent consumer instances to consume from Kafka -PollTimeout int // Kafka Poll timeout in milliseconds -RouteGroup string // An optional route group to use for routing purposes -AutoOffsetReset string // earliest or latest -PartitionAssignment string // refer partition.assignment.strategy https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md -MaxPollIntervalMS int // Kafka Failure detection interval in milliseconds + BootstrapServers string // A required comma separated list of broker addresses + DebugLevel string // generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all + GroupID string // A required string + Topics []string // A required non-empty list of topics to consume from + AutoCommitInterval int // A commit Interval time in milliseconds + ConsumerCount int // Number of concurrent consumer instances to consume from Kafka + PollTimeout int // Kafka Poll timeout in milliseconds + RouteGroup string // An optional route group to use for routing purposes + AutoOffsetReset string // earliest or latest + PartitionAssignment string // refer partition.assignment.strategy https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md + MaxPollIntervalMS int // Kafka Failure detection interval in milliseconds } ```