Skip to content

Commit

Permalink
updates docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhang Balkundi committed Mar 26, 2024
1 parent e948772 commit 70faddc
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 48 deletions.
151 changes: 150 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,159 @@ Consumer Orchestration made easy
go install github.com/gojekfarm/ziggurat/v2/cmd/ziggurat@latest
```

### I already have an application and I just want to use Ziggurat Go without scaffolding a new app
#### Run the following command in your go application directory
```shell
go get github.com/gojekfarm/ziggurat/v2
```

### create a new app using the `new` command

```shell
ziggurat new <app_name>
go mod tidy -v #cleans up dependencies
go mod tidy -v # cleans up dependencies
```

### Features

- Ziggurat-Go enables you to orchestrate multiple message consumers by decoupling the consumer implementation from the
orchestration
- A small and simple API footprint
- Ziggurat Go currently supports only Kafka as a message consumer implementation
- Ziggurat Go includes a regex based router to support complex routing patterns
- Ziggurat Go provides a RabbitMQ middleware for retrying messages
- Ziggurat Go provides a RabbitMQ message consumer implementation to consume "retried" messages from RabbitMQ
- Ziggurat Go also includes a Prometheus middleware and exposes a Prometheus exporter server for instrumentation

### How to consume messages from Kafka

```go
package main

import (
"context"
"github.com/gojekfarm/ziggurat/v2"
"github.com/gojekfarm/ziggurat/v2/kafka"
"github.com/gojekfarm/ziggurat/v2/logger"
)

func main() {
var zig ziggurat.Ziggurat
router := ziggurat.NewRouter()

ctx := context.Background()
l := logger.NewLogger(logger.LevelInfo)

kcg := kafka.ConsumerGroup{
Logger: nil,
GroupConfig: kafka.ConsumerConfig{
BootstrapServers: "localhost:9092",
GroupID: "foo.id",
Topics: []string{"foo"},
},
}

router.HandlerFunc("foo.id/*", func(ctx context.Context, event *ziggurat.Event) error {
return nil
})

h := ziggurat.Use(router)

if runErr := zig.Run(ctx, h, &kcg); runErr != nil {
l.Error("error running consumers", runErr)
}

}
```
### Configuring the `Ziggurat` struct
```go
ziggurat.Ziggurat{
Logger StructuredLogger // a logger implementation of ziggurat.StructuredLogger
WaitTimeout time.Duration // wait timeout when consumers are shutdown
ErrorHandler func(err error) // a notifier for when one of the message consumers is shutdown abruptly
}
```
> [!NOTE]
> The zero value of `ziggurat.Ziggurat` is perfectly valid and can be used without any issues
### Ziggurat Run method
The `ziggurat.Run` method is used to start the consumer orchestration. It takes in a `context.Context` implementation, a `ziggurat.Handler` and a variable number of message consumer implementations.
```go
ctx := context.Background()
h := ziggurat.HandlerFunc(func(context.Context,*ziggurat.Event) error {...})
groupOne := kafka.ConsumerGroup{...}
groupTwo := kafka.ConsumerGroup{...}
if runErr := zig.Run(ctx, h, &groupOne,&groupTwo); runErr != nil {
logger.Error("error running consumers", runErr)
}
```

### Ziggurat Handler interface
The `ziggurat.Handler` is an interface for handling ziggurat events, an event is just something that happens in a finite timeframe. This event can come from
any source (kafka,redis,rabbitmq). The handler's job is to handle the event, i.e... the handler contains your application's business logic
```go
type Handler interface {
Handle(ctx context.Context, event *Event) error
}
type HandlerFunc func(ctx context.Context, event *Event) error // serves as an adapter for normal functions to be used as handlers
```
> Any function / struct which implements the above handler interface can be used in the ziggurat.Run method. The ziggurat.Router also implements the above interface.
### Ziggurat Event struct
The `ziggurat.Event` struct is a generic event struct that is passed to the handler. This is a pointer value and should not be modified by handlers as it is not thread safe. The struct can be cloned and modified.
#### Description
```go
ziggurat.Event{
Metadata map[string]any `json:"meta"` // metadata is a generic map for storing event related info
Value []byte `json:"value"` // a byte slice value which contains the actual message
Key []byte `json:"key"` // a byte slice value which contains the actual key
RoutingPath string `json:"routing_path"` // an arbitrary string set by the message consumer implementation
ProducerTimestamp time.Time `json:"producer_timestamp"` // the producer timestamp set by the message consumer implementation
ReceivedTimestamp time.Time `json:"received_timestamp"` // the timestamp at which the message was ingested by the system, this is also set by the message consumer implementation
EventType string `json:"event_type"` // the type of event, ex:= kafka,rabbitmq, this is also set by the message consumer implementation
}

```



## Kafka consumer configuration

```go
type ConsumerConfig struct {
BootstrapServers string // A required comma separated list of broker addresses
DebugLevel string // generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all
GroupID string // A required string
Topics []string // A required non-empty list of topics to consume from
AutoCommitInterval int // A commit Interval time in milliseconds
ConsumerCount int // Number of concurrent consumer instances to consume from Kafka
PollTimeout int // Kafka Poll timeout in milliseconds
RouteGroup string // An optional route group to use for routing purposes
AutoOffsetReset string // earliest or latest
PartitionAssignment string // refer partition.assignment.strategy https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
MaxPollIntervalMS int // Kafka Failure detection interval in milliseconds
}
```

## How to use the router

A router enables you to handle complex routing problems by defining handler functions for predefined regex paths.

### A practical example

I am consuming from the kafka topic `mobile-application-logs` which has 12 partitions. All the even partitions contain logs for android devices and all the odd partitions contain logs for iOS devices. I want to execute different logic for logs from different platforms.

The `ziggurat.Event` struct contains a field called `RoutingPath` this field is set by the `MessageConsumer` implementation, the Kafka implementation uses the following format
```text
<consumer_group_id>/<topic_name>/<partition>
ex: mobile_app_log_consumer/mobile-application-logs/1
```

```go
router := ziggurat.NewRouter()
// to execute logic for iOS logs I would use this
router.HandlerFunc("mobile_app_log_consumer/mobile-application-logs/(1|3|5|7|9|11)", func (ctx, *ziggurat.Event) error {....})
// to execute logic for Android logs I would use this
router.HandlerFunc("mobile_app_log_consumer/mobile-application-logs/(2|4|6|8|10|12)", func (ctx, *ziggurat.Event) error {....})
```
Based on how the routing path is set by the message consumer implementation, you can define your routing paths.
27 changes: 2 additions & 25 deletions example/sampleapp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@ package main
import (
"context"
"github.com/gojekfarm/ziggurat/v2"
"math/rand"
"time"

"github.com/gojekfarm/ziggurat/v2/kafka"
"github.com/gojekfarm/ziggurat/v2/logger"
"github.com/gojekfarm/ziggurat/v2/mw/rabbitmq"
)

func main() {
Expand All @@ -27,33 +23,14 @@ func main() {
},
}

ar := rabbitmq.AutoRetry([]rabbitmq.QueueConfig{
{
QueueKey: "plain_text_messages_retry",
DelayExpirationInMS: "500",
ConsumerPrefetchCount: 1,
ConsumerCount: 1,
RetryCount: 1,
},
}, rabbitmq.WithLogger(l),
rabbitmq.WithUsername("user"),
rabbitmq.WithPassword("bitnami"),
rabbitmq.WithConnectionTimeout(3*time.Second))

router.HandlerFunc("cpool/", func(ctx context.Context, event *ziggurat.Event) error {
if rand.Intn(1000)%2 == 0 {
l.Info("retrying")
err := ar.Retry(ctx, event, "plain_text_messages_retry")
l.Info("retrying finished")
return err
}
router.HandlerFunc("foo.id/*", func(ctx context.Context, event *ziggurat.Event) error {
return nil
})

h := ziggurat.Use(router)

if runErr := zig.Run(ctx, h, &kcg); runErr != nil {
l.Error("error running streams", runErr)
l.Error("error running consumers", runErr)
}

}
1 change: 0 additions & 1 deletion mw/rabbitmq/pubpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rabbitmq
import (
"context"
"fmt"

"github.com/makasim/amqpextra"
"github.com/makasim/amqpextra/logger"
"github.com/makasim/amqpextra/publisher"
Expand Down
47 changes: 26 additions & 21 deletions router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,42 @@ func Test_match(t *testing.T) {
tests := []struct {
want string
name string
path string
paths []string
input []routerEntry
}{
{
name: "should match the longest prefix without regex",
want: "localhost:9092",
path: "localhost:9092/foo_consumer",
input: []routerEntry{{pattern: "localhost:9092", handler: nil}},
want: "foo.id",
paths: []string{"foo.id/foo_consumer"},
input: []routerEntry{{pattern: "foo.id", handler: nil}},
},
{
name: "should match when topic regex is provided",
want: "localhost:9092/foo_consumer/.*-log",
path: "localhost:9092/foo_consumer/message-log/0",
input: []routerEntry{{pattern: "localhost:9092/foo_consumer/.*-log", handler: nil}},
want: "foo.id/foo_consumer/.*-log",
paths: []string{"foo.id/foo_consumer/message-log/0", "foo.id/foo_consumer/app-log/0"},
input: []routerEntry{{pattern: "foo.id/foo_consumer/.*-log", handler: nil}},
},
{
name: "should match exact partition when a certain partition is specified",
want: "",
path: "localhost:9092/foo_consumer/message-log/10",
input: []routerEntry{{pattern: "localhost:9092/foo_consumer/message-log/1$", handler: nil}},
paths: []string{"foo.id/foo_consumer/message-log/10"},
input: []routerEntry{{pattern: "foo.id/foo_consumer/message-log/1$", handler: nil}},
},
{
name: "should match the longest prefix with regex",
want: "localhost:9092/foo_consumer/.*-log/\\d+",
path: "localhost:9092/foo_consumer/message-log/5",
name: "should match the longest prefix with regex",
want: "foo.id/foo_consumer/.*-log/\\d+",
paths: []string{"foo.id/foo_consumer/message-log/5"},
input: []routerEntry{
{pattern: "localhost:9092/foo_consumer/.*-log/\\d+", handler: nil},
{pattern: "localhost:9092/foo_consumer/.*-log", handler: nil},
{pattern: "foo.id/foo_consumer/.*-log/\\d+", handler: nil},
{pattern: "foo.id/foo_consumer/.*-log", handler: nil},
},
},
{
name: "should not match similar consumer group names",
want: "",
path: "localhost:9092/foo_consumer/",
name: "should not match similar consumer group names",
want: "",
paths: []string{"foo.id/foo_consumer/"},
input: []routerEntry{
{pattern: "localhost:9092/foo/"},
{pattern: "foo.id/foo/"},
},
},
}
Expand All @@ -69,10 +69,15 @@ func Test_match(t *testing.T) {
var r Router
r.es = test.input
r.handlerEntry = esToMap(test.input)
_, m := r.match(test.path)
if m != test.want {
t.Errorf("%s test failed, expected %q got %q", test.name, test.want, m)
for _, path := range test.paths {
_, m := r.match(path)
t.Logf("match:%s\n", m)
if m != test.want {
t.Errorf("%s test failed, expected %q got %q", test.name, test.want, m)
return
}
}

})
}
}
Expand Down

0 comments on commit 70faddc

Please sign in to comment.