-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
171 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,41 @@ | ||
# This workflow will build a golang project | ||
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go | ||
|
||
name: Go | ||
name: Go | ||
|
||
on: | ||
push: | ||
branches: [ "main" ] | ||
branches: ["main"] | ||
pull_request: | ||
branches: [ "main" ] | ||
branches: ["main"] | ||
|
||
jobs: | ||
|
||
build: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v4 | ||
- uses: actions/checkout@v4 | ||
|
||
- name: Set up Go | ||
uses: actions/setup-go@v5 | ||
with: | ||
go-version-file: "go.mod" | ||
|
||
- run: go version | ||
|
||
- name: Build | ||
run: go build -v ./... | ||
|
||
test: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v4 | ||
|
||
- name: Set up Go | ||
uses: actions/setup-go@v5 | ||
with: | ||
go-version-file: 'go.mod' | ||
|
||
- run: go version | ||
- name: Set up Go | ||
uses: actions/setup-go@v5 | ||
with: | ||
go-version-file: "go.mod" | ||
|
||
- name: Build | ||
run: go build -v ./... | ||
- run: go version | ||
|
||
- name: Test | ||
run: go test -v ./... | ||
- name: Test | ||
run: go test -v ./... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
# Screamer... (S)panner (C)hangest(REAM) read(ER) | ||
|
||
[![Test](https://github.com/anicoll/screamer/actions/workflows/go.yml/badge.svg)](https://github.com/anicoll/screamer/actions/workflows/go.yaml) | ||
[![Go Reference](https://pkg.go.dev/badge/github.com/anicoll/screamer.svg)](https://pkg.go.dev/github.com/anicoll/screamer) | ||
|
||
Cloud Spanner Change Streams Subscriber for Go | ||
|
||
### Sypnosis | ||
|
||
This library is an implementation to subscribe a change stream's records of Google Cloud Spanner in Go. | ||
It is heavily inspired by the SpannerIO connector of the [Apache Beam SDK](https://github.com/apache/beam) and is compatible with the PartitionMetadata data model. | ||
|
||
### Motivation | ||
|
||
To read a change streams, Google Cloud offers [Dataflow connector](https://cloud.google.com/spanner/docs/change-streams/use-dataflow) as a scalable and reliable solution, but in some cases the abstraction and capabilities of Dataflow pipelines can be too much (or is simply too expensive). | ||
This library aims to make reading change streams native for non beam/dataflow use cases. | ||
|
||
## Example Usage | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"os" | ||
"os/signal" | ||
"sync" | ||
|
||
"cloud.google.com/go/spanner" | ||
"github.com/anicoll/screamer" | ||
"github.com/anicoll/screamer/partitionstorage" | ||
) | ||
|
||
func main() { | ||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) | ||
defer stop() | ||
|
||
database := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "foo-project", "bar-instance", "baz-database") | ||
spannerClient, err := spanner.NewClient(ctx, database) | ||
if err != nil { | ||
panic(err) | ||
} | ||
defer spannerClient.Close() | ||
|
||
partitionMetadataTableName := "PartitionMetadata_FooStream" | ||
partitionStorage := partitionstorage.NewSpanner(spannerClient, partitionMetadataTableName) | ||
if err := partitionStorage.CreateTableIfNotExists(ctx); err != nil { | ||
panic(err) | ||
} | ||
|
||
changeStreamName := "FooStream" | ||
subscriber := spream.NewSubscriber(spannerClient, changeStreamName, partitionStorage) | ||
|
||
fmt.Fprintf(os.Stderr, "Reading the stream...\n") | ||
logger := &Logger{out: os.Stdout} | ||
if err := subscriber.Subscribe(ctx, logger); err != nil && !errors.Is(ctx.Err(), context.Canceled) { | ||
panic(err) | ||
} | ||
} | ||
|
||
type Logger struct { | ||
out io.Writer | ||
mu sync.Mutex | ||
} | ||
|
||
func (l *Logger) Consume(change *spream.DataChangeRecord) error { | ||
l.mu.Lock() | ||
defer l.mu.Unlock() | ||
return json.NewEncoder(l.out).Encode(change) | ||
} | ||
``` | ||
|
||
## CLI | ||
|
||
### Installation | ||
|
||
```console | ||
$ go install github.com/anicoll/screamer@latest | ||
``` | ||
|
||
### Usage | ||
|
||
``` | ||
NAME: | ||
screamer screamer | ||
USAGE: | ||
screamer screamer [command options] | ||
OPTIONS: | ||
--dsn value [$DSN] | ||
--stream value [$STREAM] | ||
--metadata-table value [$METADATA_TABLE] | ||
--start value (default: Start timestamp with RFC3339 format, default: current timestamp) [$START] | ||
--end value (default: End timestamp with RFC3339 format default: indefinite) [$END] | ||
--heartbeat-interval value (default: 10s) [$HEARTBEAT_INTERVAL] | ||
--partition-dsn value (default: Database dsn for use by the partition metadata table. If not provided, the main dsn will be used.) [$PARTITION_DSN] | ||
--help, -h show help | ||
``` | ||
|
||
### Example | ||
|
||
|
||
## Credits | ||
|
||
Heavily inspired by below projects. | ||
|
||
- spream (https://github.com/toga4/spream) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters