Skip to content

Commit

Permalink
updates interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhang Balkundi committed Mar 26, 2024
1 parent 70faddc commit ac8f71d
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 119 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v2.0.5] 2024-03-25
## [v2.0.6] 2024-03-25

# Removed

Expand All @@ -27,6 +27,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- ziggurat.RunAll has been removed
- ziggurat.StartFunc and StopFunc have been removed
- ziggurat.Event struct has been cleaned up
- ziggurat.Handler does not return an error anymore

## [v1.7.4] 2023-11-01

Expand Down
129 changes: 96 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ go install github.com/gojekfarm/ziggurat/v2/cmd/ziggurat@latest
```

### I already have an application and I just want to use Ziggurat Go without scaffolding a new app

#### Run the following command in your go application directory

```shell
go get github.com/gojekfarm/ziggurat/v2
```
Expand Down Expand Up @@ -72,85 +74,145 @@ func main() {

}
```

### Configuring the `Ziggurat` struct

```go
ziggurat.Ziggurat{
Logger StructuredLogger // a logger implementation of ziggurat.StructuredLogger
WaitTimeout time.Duration // wait timeout when consumers are shutdown
ErrorHandler func(err error) // a notifier for when one of the message consumers is shutdown abruptly
Logger StructuredLogger // a logger implementation of ziggurat.StructuredLogger
WaitTimeout time.Duration // wait timeout when consumers are shutdown
ErrorHandler func(err error) // a notifier for when one of the message consumers is shutdown abruptly
}
```

> [!NOTE]
> The zero value of `ziggurat.Ziggurat` is perfectly valid and can be used without any issues
### Ziggurat Run method
The `ziggurat.Run` method is used to start the consumer orchestration. It takes in a `context.Context` implementation, a `ziggurat.Handler` and a variable number of message consumer implementations.

The `ziggurat.Run` method is used to start the consumer orchestration. It takes in a `context.Context` implementation,
a `ziggurat.Handler` and a variable number of message consumer implementations.

```go
ctx := context.Background()
h := ziggurat.HandlerFunc(func(context.Context,*ziggurat.Event) error {...})
h := ziggurat.HandlerFunc(func (context.Context, *ziggurat.Event) error {...})
groupOne := kafka.ConsumerGroup{...}
groupTwo := kafka.ConsumerGroup{...}
if runErr := zig.Run(ctx, h, &groupOne,&groupTwo); runErr != nil {
logger.Error("error running consumers", runErr)
if runErr := zig.Run(ctx, h, &groupOne, &groupTwo); runErr != nil {
logger.Error("error running consumers", runErr)
}
```

### Ziggurat Handler interface
The `ziggurat.Handler` is an interface for handling ziggurat events, an event is just something that happens in a finite timeframe. This event can come from
any source (kafka,redis,rabbitmq). The handler's job is to handle the event, i.e... the handler contains your application's business logic

The `ziggurat.Handler` is an interface for handling ziggurat events, an event is just something that happens in a finite
timeframe. This event can come from
any source (kafka,redis,rabbitmq). The handler's job is to handle the event, i.e... the handler contains your
application's business logic

```go
type Handler interface {
Handle(ctx context.Context, event *Event) error
Handle(ctx context.Context, event *Event) error
}
type HandlerFunc func(ctx context.Context, event *Event) error // serves as an adapter for normal functions to be used as handlers
type HandlerFunc func (ctx context.Context, event *Event) error // serves as an adapter for normal functions to be used as handlers
```
> Any function / struct which implements the above handler interface can be used in the ziggurat.Run method. The ziggurat.Router also implements the above interface.

> Any function / struct which implements the above handler interface can be used in the ziggurat.Run method. The
> ziggurat.Router also implements the above interface.
### Ziggurat Event struct
The `ziggurat.Event` struct is a generic event struct that is passed to the handler. This is a pointer value and should not be modified by handlers as it is not thread safe. The struct can be cloned and modified.

The `ziggurat.Event` struct is a generic event struct that is passed to the handler. This is a pointer value and should
not be modified by handlers as it is not thread safe. The struct can be cloned and modified.

#### Description

```go
ziggurat.Event{
Metadata map[string]any `json:"meta"` // metadata is a generic map for storing event related info
Value []byte `json:"value"` // a byte slice value which contains the actual message
Key []byte `json:"key"` // a byte slice value which contains the actual key
RoutingPath string `json:"routing_path"` // an arbitrary string set by the message consumer implementation
ProducerTimestamp time.Time `json:"producer_timestamp"` // the producer timestamp set by the message consumer implementation
ReceivedTimestamp time.Time `json:"received_timestamp"` // the timestamp at which the message was ingested by the system, this is also set by the message consumer implementation
EventType string `json:"event_type"` // the type of event, ex:= kafka,rabbitmq, this is also set by the message consumer implementation
Metadata map[string]any `json:"meta"` // metadata is a generic map for storing event related info
Value []byte `json:"value"` // a byte slice value which contains the actual message
Key []byte `json:"key"` // a byte slice value which contains the actual key
RoutingPath string `json:"routing_path"` // an arbitrary string set by the message consumer implementation
ProducerTimestamp time.Time `json:"producer_timestamp"` // the producer timestamp set by the message consumer implementation
ReceivedTimestamp time.Time `json:"received_timestamp"` // the timestamp at which the message was ingested by the system, this is also set by the message consumer implementation
EventType string `json:"event_type"` // the type of event, ex:= kafka,rabbitmq, this is also set by the message consumer implementation
}

```

### ziggurat.MessageConsumer interface

The `ziggurat.MessageConsumer` interface is the interface used for implementing message consumers which can be passed to
the ziggurat.Run method for orchestration.

```go
type MessageConsumer interface {
Consume(ctx context.Context, handler Handler) error
}
```

The `kafka.ConsumerGroup` and `rabbitmq.AutoRetry` implement the above interface.

A sample implementation which consumes infinite numbers
```go
type NumberConsumer struct {
counter int
}

func (nc *NumberConsumer) Consume(ctx context.Context, h Handler) error {
var i int64
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
time.Sleep(1000 * time.Millisecond)
e := &Event{
Value: strconv.AppendInt(make([]byte, 8), i, 10),
Key: strconv.AppendInt(make([]byte, 8), i, 10),
RoutingPath: "numpath",
EventType: "numbergen",}
h.Handle(ctx, e)
}
}
}
```


## Kafka consumer configuration

```go
type ConsumerConfig struct {
BootstrapServers string // A required comma separated list of broker addresses
DebugLevel string // generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all
GroupID string // A required string
Topics []string // A required non-empty list of topics to consume from
AutoCommitInterval int // A commit Interval time in milliseconds
ConsumerCount int // Number of concurrent consumer instances to consume from Kafka
PollTimeout int // Kafka Poll timeout in milliseconds
RouteGroup string // An optional route group to use for routing purposes
AutoOffsetReset string // earliest or latest
PartitionAssignment string // refer partition.assignment.strategy https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
MaxPollIntervalMS int // Kafka Failure detection interval in milliseconds
BootstrapServers string // A required comma separated list of broker addresses
DebugLevel string // generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all
GroupID string // A required string
Topics []string // A required non-empty list of topics to consume from
AutoCommitInterval int // A commit Interval time in milliseconds
ConsumerCount int // Number of concurrent consumer instances to consume from Kafka
PollTimeout int // Kafka Poll timeout in milliseconds
RouteGroup string // An optional route group to use for routing purposes
AutoOffsetReset string // earliest or latest
PartitionAssignment string // refer partition.assignment.strategy https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
MaxPollIntervalMS int // Kafka Failure detection interval in milliseconds
}
```

> For low level details please check For full details please check
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

## How to use the router

A router enables you to handle complex routing problems by defining handler functions for predefined regex paths.

### A practical example

I am consuming from the kafka topic `mobile-application-logs` which has 12 partitions. All the even partitions contain logs for android devices and all the odd partitions contain logs for iOS devices. I want to execute different logic for logs from different platforms.
I am consuming from the kafka topic `mobile-application-logs` which has 12 partitions. All the even partitions contain
logs for android devices and all the odd partitions contain logs for iOS devices. I want to execute different logic for
logs from different platforms.

The `ziggurat.Event` struct contains a field called `RoutingPath` this field is set by the `MessageConsumer`
implementation, the Kafka implementation uses the following format

The `ziggurat.Event` struct contains a field called `RoutingPath` this field is set by the `MessageConsumer` implementation, the Kafka implementation uses the following format
```text
<consumer_group_id>/<topic_name>/<partition>
ex: mobile_app_log_consumer/mobile-application-logs/1
Expand All @@ -163,4 +225,5 @@ router.HandlerFunc("mobile_app_log_consumer/mobile-application-logs/(1|3|5|7|9|1
// to execute logic for Android logs I would use this
router.HandlerFunc("mobile_app_log_consumer/mobile-application-logs/(2|4|6|8|10|12)", func (ctx, *ziggurat.Event) error {....})
```

Based on how the routing path is set by the message consumer implementation, you can define your routing paths.
9 changes: 0 additions & 9 deletions error.go

This file was deleted.

4 changes: 2 additions & 2 deletions example/sampleapp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func main() {
},
}

router.HandlerFunc("foo.id/*", func(ctx context.Context, event *ziggurat.Event) error {
return nil
router.HandlerFunc("foo.id/*", func(ctx context.Context, event *ziggurat.Event) {

})

h := ziggurat.Use(router)
Expand Down
8 changes: 4 additions & 4 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import "context"
// HandlerFunc serves as an adapter to convert
// regular functions of the signature f(context.Context,ziggurat.Event)
// to implement the ziggurat.Handler interface
type HandlerFunc func(ctx context.Context, event *Event) error
type HandlerFunc func(ctx context.Context, event *Event)

func (h HandlerFunc) Handle(ctx context.Context, event *Event) error {
return h(ctx, event)
func (h HandlerFunc) Handle(ctx context.Context, event *Event) {
h(ctx, event)
}

// Handler is an interface which can be implemented
// to handle messages produced by a stream source
// the router package implements this interface
type Handler interface {
Handle(ctx context.Context, event *Event) error
Handle(ctx context.Context, event *Event)
}
3 changes: 1 addition & 2 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ func TestWorkerOrchestration(t *testing.T) {
var msgCount int32
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
err := cg.Consume(ctx, ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) error {
err := cg.Consume(ctx, ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) {
atomic.AddInt32(&msgCount, 1)
return nil
}))
if !errors.Is(err, ErrCleanShutdown) {
t.Error("expected nil error, got:", err.Error())
Expand Down
2 changes: 1 addition & 1 deletion kafka/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ func processMessage(ctx context.Context, msg *kafka.Message, h ziggurat.Handler,
ReceivedTimestamp: time.Now(),
EventType: EventType,
}
l.Error("kafka processing error", h.Handle(ctx, &event))
h.Handle(ctx, &event)

}
11 changes: 5 additions & 6 deletions kafka/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type mockHandler struct {
mock.Mock
}

func (mh *mockHandler) Handle(ctx context.Context, event *ziggurat.Event) error {
return mh.Called(ctx, event).Error(0)
func (mh *mockHandler) Handle(ctx context.Context, event *ziggurat.Event) {
return
}

func TestWorker(t *testing.T) {
Expand Down Expand Up @@ -76,8 +76,8 @@ func TestWorker(t *testing.T) {
t.Run("confluent consumer returns a fatal error", func(t *testing.T) {
mc := MockConsumer{}
w := worker{
handler: ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) error {
return nil
handler: ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) {

}),
logger: logger.NOOP,
consumer: &mc,
Expand All @@ -104,8 +104,7 @@ func TestWorker(t *testing.T) {
t.Run("worker kill", func(t *testing.T) {
mc := MockConsumer{}
w := worker{
handler: ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) error {
return nil
handler: ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) {
}),
logger: logger.NOOP,
consumer: &mc,
Expand Down
4 changes: 2 additions & 2 deletions mw/event/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func Logger(l ziggurat.StructuredLogger) func(handler ziggurat.Handler) ziggurat.Handler {
return func(handler ziggurat.Handler) ziggurat.Handler {
f := func(ctx context.Context, event *ziggurat.Event) error {
f := func(ctx context.Context, event *ziggurat.Event) {
kvs := map[string]interface{}{
"path": event.RoutingPath,
"producer-timestamp": event.ProducerTimestamp,
Expand All @@ -21,7 +21,7 @@ func Logger(l ziggurat.StructuredLogger) func(handler ziggurat.Handler) ziggurat
}

l.Info("processing message", kvs)
return handler.Handle(ctx, event)
handler.Handle(ctx, event)
}
return ziggurat.HandlerFunc(f)
}
Expand Down
8 changes: 2 additions & 6 deletions mw/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func Register() {

// PublishHandlerMetrics - middleware to update registered handler metrics
func PublishHandlerMetrics(next ziggurat.Handler) ziggurat.Handler {
f := func(ctx context.Context, event *ziggurat.Event) error {
f := func(ctx context.Context, event *ziggurat.Event) {
t1 := time.Now()
err := next.Handle(ctx, event)
next.Handle(ctx, event)

labels := prometheus.Labels{
RouteLabel: event.RoutingPath,
Expand All @@ -89,11 +89,7 @@ func PublishHandlerMetrics(next ziggurat.Handler) ziggurat.Handler {
HandlerDurationHistogram.With(labels).Observe(time.Since(t1).Seconds())

HandlerEventsCounter.With(labels).Inc()
if err != nil {
HandlerFailuresCounter.With(labels).Inc()
}

return err
}
return ziggurat.HandlerFunc(f)
}
6 changes: 2 additions & 4 deletions mw/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func startConsumer(ctx context.Context, d *amqpextra.Dialer, c QueueConfig, h zi
pfc = c.ConsumerPrefetchCount
}

ogl.Info("starting consumer", map[string]any{"name": c.QueueKey, "count": c.ConsumerCount})
qname := fmt.Sprintf("%s_%s_%s", c.QueueKey, QueueTypeInstant, "queue")
consumerName := fmt.Sprintf("%s_consumer", c.QueueKey)
cons, err := d.Consumer(
Expand All @@ -35,10 +36,7 @@ func startConsumer(ctx context.Context, d *amqpextra.Dialer, c QueueConfig, h zi
return msg.Reject(true)
}
ogl.Info("amqp processing message", map[string]interface{}{"consumer": consumerName})
err = h.Handle(ctx, &event)
if err != nil {
ogl.Error("amqp message processing error", err, event.Metadata)
}
h.Handle(ctx, &event)
return msg.Ack(false)
})),
)
Expand Down
Loading

0 comments on commit ac8f71d

Please sign in to comment.