feat: Add CLI #1

Merged 3 commits on Jan 8, 2023
48 changes: 46 additions & 2 deletions README.md
@@ -7,14 +7,14 @@ Cloud Spanner Change Streams Subscriber for Go

### Synopsis

This library is an implementation to subscribe a change stream's records of Google Spanner in Go.
This library lets Go programs subscribe to the records of a Google Cloud Spanner change stream.
It is heavily inspired by the SpannerIO connector of the [Apache Beam SDK](https://github.com/apache/beam) and is compatible with the PartitionMetadata data model.

### Motivation

To read change streams, Google Cloud offers the [Dataflow connector](https://cloud.google.com/spanner/docs/change-streams/use-dataflow) as a scalable and reliable solution, but in some cases the abstraction and capabilities of Dataflow pipelines can be too much (or simply too expensive).
For more flexibility, you can use the change stream API directly, but it is somewhat complex.
This library aims to make reading change streams more flexible and casual to use.
This library aims to make reading change streams more flexible and casual, while keeping it easy to move to the Dataflow connector when needed.

## Example Usage

@@ -74,3 +74,47 @@ func (l *Logger) Consume(change *spream.DataChangeRecord) error {
	return json.NewEncoder(l.out).Encode(change)
}
```

## CLI

Use the CLI as a tool for tracking change streams or as a more detailed implementation example.

### Installation

```console
$ go install github.com/toga4/spream/cmd/spream@latest
```

### Usage

```
Usage: spream [OPTIONS...]

Options:
-d, --database (required) Database name of change stream with the form 'projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID'.
-s, --stream (required) Change stream name
-t, --metadata-table Table name for partition metadata (default: store partition metadata in memory, not Cloud Spanner)
--start Start timestamp with RFC3339 format (default: current timestamp)
--end End timestamp with RFC3339 format (default: indefinite)
--heartbeat-interval Heartbeat interval with time.Duration format (default: 10s)
--priority [high|medium|low] Request priority for Cloud Spanner (default: high)
--metadata-database Database name of partition metadata table (default: same as database option)
-h, --help Print this message
```

### Example

```
$ spream -d projects/my-project/instances/my-instance/databases/my-database -s SingerStream
Waiting changes...
{"commit_timestamp":"2023-01-08T05:47:57.998479Z","record_sequence":"00000000","server_transaction_id":"ODIzNDU0OTc2NzUxOTc0NTU1OQ==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"new_values":{"Name":"foo"}}],"mod_type":"INSERT","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false}
{"commit_timestamp":"2023-01-08T05:47:58.766575Z","record_sequence":"00000000","server_transaction_id":"MjQ3ODQzMDcxOTMwNjcyODg4Nw==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"new_values":{"Name":"bar"},"old_values":{"Name":"foo"}}],"mod_type":"UPDATE","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false}
{"commit_timestamp":"2023-01-08T05:47:59.117807Z","record_sequence":"00000000","server_transaction_id":"ODkwNDMzNDgxMDU2NzAwMDM2MA==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"old_values":{"Name":"bar"}}],"mod_type":"DELETE","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false}
```
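
Each output line is a standalone JSON object, so downstream tools can decode the stream line by line. The sketch below decodes a trimmed copy of the `UPDATE` line above. The `changeLine` struct and `decodeLine` function are hypothetical and cover only a subset of the fields; the field names are taken from the sample output, not from the library's type definitions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// changeLine is a hypothetical struct covering a subset of the fields
// that appear in the sample output.
type changeLine struct {
	CommitTimestamp time.Time `json:"commit_timestamp"`
	TableName       string    `json:"table_name"`
	ModType         string    `json:"mod_type"`
	Mods            []struct {
		Keys      map[string]interface{} `json:"keys"`
		NewValues map[string]interface{} `json:"new_values"`
		OldValues map[string]interface{} `json:"old_values"`
	} `json:"mods"`
}

// decodeLine parses one line of CLI output.
func decodeLine(line []byte) (changeLine, error) {
	var c changeLine
	err := json.Unmarshal(line, &c)
	return c, err
}

func main() {
	line := []byte(`{"commit_timestamp":"2023-01-08T05:47:58.766575Z","table_name":"Singers","mods":[{"keys":{"SingerId":"1"},"new_values":{"Name":"bar"},"old_values":{"Name":"foo"}}],"mod_type":"UPDATE"}`)
	c, err := decodeLine(line)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	m := c.Mods[0]
	fmt.Println(c.TableName, c.ModType, m.OldValues["Name"], "->", m.NewValues["Name"])
	// prints "Singers UPDATE foo -> bar"
}
```

In the sample output, `INSERT` lines carry no `old_values` and `DELETE` lines carry no `new_values`, so the corresponding maps stay `nil` after decoding.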

## Credits

Heavily inspired by the following projects.

- The SpannerIO connector of the Apache Beam SDK. (https://github.com/apache/beam)
- spanner-change-streams-tail (https://github.com/cloudspannerecosystem/spanner-change-streams-tail)
183 changes: 183 additions & 0 deletions cmd/spream/main.go
@@ -0,0 +1,183 @@
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"os"
	"os/signal"
	"sync"
	"time"

	"cloud.google.com/go/spanner"
	"cloud.google.com/go/spanner/apiv1/spannerpb"
	"github.com/toga4/spream"
	"github.com/toga4/spream/partitionstorage"
)

type flags struct {
	database          string
	streamName        string
	startTimestamp    time.Time
	endTimestamp      time.Time
	heartbeatInterval time.Duration
	priority          spannerpb.RequestOptions_Priority
	metadataTableName string
	metadataDatabase  string
}

const (
	priorityHigh   = "high"
	priorityMedium = "medium"
	priorityLow    = "low"
)

func parseFlags(cmd string, args []string) (*flags, error) {
	var flags flags

	fs := flag.NewFlagSet(cmd, flag.ExitOnError)
	fs.Usage = func() {
		fmt.Fprintf(os.Stderr, `Usage: %s [OPTIONS...]

Options:
-d, --database (required) Database name of change stream with the form 'projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID'.
-s, --stream (required) Change stream name
-t, --metadata-table Table name for partition metadata (default: store partition metadata in memory, not Cloud Spanner)
--start Start timestamp with RFC3339 format (default: current timestamp)
--end End timestamp with RFC3339 format (default: indefinite)
--heartbeat-interval Heartbeat interval with time.Duration format (default: 10s)
--priority [high|medium|low] Request priority for Cloud Spanner (default: high)
--metadata-database Database name of partition metadata table (default: same as database option)
-h, --help Print this message

`, cmd)
	}

	// Short and long names are bound to the same variable.
	fs.StringVar(&flags.database, "d", "", "")
	fs.StringVar(&flags.streamName, "s", "", "")
	fs.StringVar(&flags.metadataTableName, "t", "", "")

	fs.StringVar(&flags.database, "database", "", "")
	fs.StringVar(&flags.streamName, "stream", "", "")
	fs.StringVar(&flags.metadataTableName, "metadata-table", "", "")
	// flags.database is still empty at registration time, so the
	// "same as database" default is applied after parsing instead.
	fs.StringVar(&flags.metadataDatabase, "metadata-database", "", "")
	fs.DurationVar(&flags.heartbeatInterval, "heartbeat-interval", 10*time.Second, "")

	var start, end, priority string
	fs.StringVar(&start, "start", "", "")
	fs.StringVar(&end, "end", "", "")
	fs.StringVar(&priority, "priority", "", "")

	if err := fs.Parse(args); err != nil {
		return nil, err
	}

	if flags.database == "" || flags.streamName == "" {
		fs.Usage()
		return nil, errors.New("database and stream are required")
	}
	if flags.metadataDatabase == "" {
		flags.metadataDatabase = flags.database
	}

	if start != "" {
		t, err := time.Parse(time.RFC3339, start)
		if err != nil {
			fs.Usage()
			return nil, fmt.Errorf("invalid start timestamp: %v", err)
		}
		flags.startTimestamp = t
	}
	if end != "" {
		t, err := time.Parse(time.RFC3339, end)
		if err != nil {
			fs.Usage()
			return nil, fmt.Errorf("invalid end timestamp: %v", err)
		}
		// Fixed: this previously overwrote startTimestamp.
		flags.endTimestamp = t
	}
	if priority != "" {
		switch priority {
		case priorityHigh:
			flags.priority = spannerpb.RequestOptions_PRIORITY_HIGH
		case priorityMedium:
			flags.priority = spannerpb.RequestOptions_PRIORITY_MEDIUM
		case priorityLow:
			flags.priority = spannerpb.RequestOptions_PRIORITY_LOW
		default:
			fs.Usage()
			return nil, fmt.Errorf("invalid priority: %v", priority)
		}
	}

	return &flags, nil
}

type jsonOutputConsumer struct {
	out io.Writer
	mu  sync.Mutex
}

func (l *jsonOutputConsumer) Consume(change *spream.DataChangeRecord) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return json.NewEncoder(l.out).Encode(change)
}

func main() {
	flags, err := parseFlags(os.Args[0], os.Args[1:])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// os.Kill (SIGKILL) cannot be caught, so only os.Interrupt is registered.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	spannerClient, err := spanner.NewClient(ctx, flags.database)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer spannerClient.Close()

	// Keep partition metadata in memory unless a metadata table is given.
	var partitionStorage spream.PartitionStorage
	if flags.metadataTableName == "" {
		partitionStorage = partitionstorage.NewInmemory()
	} else {
		metadataSpannerClient, err := spanner.NewClient(ctx, flags.metadataDatabase)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		defer metadataSpannerClient.Close()
		ps := partitionstorage.NewSpanner(metadataSpannerClient, flags.metadataTableName)
		if err := ps.CreateTableIfNotExists(ctx); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		partitionStorage = ps
	}

	// Only pass options that were explicitly set.
	options := []spream.Option{}
	if !flags.startTimestamp.IsZero() {
		options = append(options, spream.WithStartTimestamp(flags.startTimestamp))
	}
	if !flags.endTimestamp.IsZero() {
		options = append(options, spream.WithEndTimestamp(flags.endTimestamp))
	}
	if flags.heartbeatInterval != 0 {
		options = append(options, spream.WithHeartbeatInterval(flags.heartbeatInterval))
	}
	if flags.priority != spannerpb.RequestOptions_PRIORITY_UNSPECIFIED {
		options = append(options, spream.WithSpannerRequestPriotiry(flags.priority))
	}

	subscriber := spream.NewSubscriber(spannerClient, flags.streamName, partitionStorage, options...)
	consumer := &jsonOutputConsumer{out: os.Stdout}

	fmt.Fprintln(os.Stderr, "Waiting changes...")
	// A cancellation caused by the interrupt signal is not reported as an error.
	if err := subscriber.Subscribe(ctx, consumer); err != nil && !errors.Is(ctx.Err(), context.Canceled) {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
12 changes: 6 additions & 6 deletions subscriber.go
@@ -46,7 +46,7 @@ type config struct {
	spannerRequestPriority spannerpb.RequestOptions_Priority
}

type option interface {
type Option interface {
	Apply(*config)
}

@@ -60,7 +60,7 @@ func (o withStartTimestamp) Apply(c *config) {
//
// The value must be within the retention period of the change stream and before the current time.
// Default value is current timestamp.
func WithStartTimestamp(startTimestamp time.Time) option {
func WithStartTimestamp(startTimestamp time.Time) Option {
	return withStartTimestamp(startTimestamp)
}

@@ -74,7 +74,7 @@ func (o withEndTimestamp) Apply(c *config) {
//
// The value must be within the retention period of the change stream and must be after the start timestamp.
// If not set, read latest changes until canceled.
func WithEndTimestamp(endTimestamp time.Time) option {
func WithEndTimestamp(endTimestamp time.Time) Option {
	return withEndTimestamp(endTimestamp)
}

@@ -87,7 +87,7 @@ func (o withHeartbeatInterval) Apply(c *config) {
// WithHeartbeatInterval sets the heartbeat interval for reading change streams.
//
// Default value is 10 seconds.
func WithHeartbeatInterval(heartbeatInterval time.Duration) option {
func WithHeartbeatInterval(heartbeatInterval time.Duration) Option {
	return withHeartbeatInterval(heartbeatInterval)
}

@@ -100,7 +100,7 @@ func (o withSpannerRequestPriotiry) Apply(c *config) {
// WithSpannerRequestPriotiry sets the request priority option for reading change streams.
//
// Default value is unspecified, equivalent to high.
func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) option {
func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option {
	return withSpannerRequestPriotiry(priority)
}

@@ -116,7 +116,7 @@ func NewSubscriber(
	client *spanner.Client,
	streamName string,
	partitionStorage PartitionStorage,
	options ...option,
	options ...Option,
) *Subscriber {
	c := &config{
		startTimestamp: nowFunc(),