Skip to content

Commit

Permalink
Merge pull request #333 from batchcorp/blinktag/rename_batch
Browse files Browse the repository at this point in the history
Rename remaining batch.sh references
  • Loading branch information
blinktag authored Feb 6, 2023
2 parents 838b444 + c62128f commit 3d63c40
Show file tree
Hide file tree
Showing 51 changed files with 1,706 additions and 1,674 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.15.4' # The Go version to download (if necessary) and use.
go-version: '~1.16' # The Go version to download (if necessary) and use.
- name: Start up dependencies
run: docker-compose up -d
- name: Wait for dependencies to start up
Expand All @@ -49,6 +49,6 @@ jobs:
TEST_API_TOKEN: ${{ secrets.TEST_API_TOKEN }}
TEST_COLLECTION_TOKEN: ${{ secrets.TEST_COLLECTION_TOKEN }}
run: |
go run main.go server &
go run main.go server --auth-token "streamdal" &
sleep 5
go test --tags=functional
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ The tool enables you to:
* Decode protobuf/avro/thrift/JSON data in real-time
* Support for both Deep and Shallow protobuf envelope types
* Support for google.protobuf.Any fields
* Relay data to the [Batch platform](https://batch.sh)
* Ship change data capture events to [Batch platform](https://batch.sh)
* [Replay events into a message system on your local network](https://docs.batch.sh/what-are/what-are-destinations/plumber-as-a-destination)
* Relay data to the [Streamdal platform](https://streamdal.com)
* Ship change data capture events to [Streamdal platform](https://streamdal.com)
* [Replay events into a message system on your local network](https://docs.streamdal.com/what-are/what-are-destinations/plumber-as-a-destination)
* And _many_ other features (for a full list: `plumber -h`)

<sub>\[1] It's like `curl` for messaging systems.</sub>
Expand Down Expand Up @@ -159,7 +159,7 @@ $ plumber read kafka --help
## Hmm, what is this Batch thing?

We are distributed system enthusiasts that started a company called
[Batch](https://batch.sh).
[Streamdal](https://streamdal.com).

Our company focuses on solving data stream observability for complex systems
and workflows. Our goal is to allow _everyone_ to build asynchronous systems,
Expand All @@ -183,7 +183,7 @@ We consider ourselves "internet plumbers" of sort - so the name seemed to fit :)
* RabbitMQ Streams
* Google Cloud Platform PubSub
* MQTT
* Amazon Kinesis Streams **(NEW)**
* Amazon Kinesis Streams
* Amazon SQS
* Amazon SNS (Publishing)
* ActiveMQ (STOMP protocol)
Expand Down Expand Up @@ -223,14 +223,14 @@ in the Batch platform as a _replay destination_.
This mitigates the need make firewall changes to replay messages from a Batch
collection back to your message bus.

See https://docs.batch.sh/what-are/what-are-destinations/plumber-as-a-destination
See https://docs.streamdal.com/what-are/what-are-destinations/plumber-as-a-destination
for full documentation.

## High Performance & High Availability
`plumber` comes with a "server" mode which will cause plumber to operate as a
highly available cluster.

You can read more about "server mode" [here](https://docs.batch.sh/plumber/server-mode).
You can read more about "server mode" [here](https://docs.streamdal.com/plumber/server-mode).

Server mode examples can be found in [docs/server.md](https://github.com/batchcorp/plumber/blob/master/docs/server.md)

Expand Down
2 changes: 1 addition & 1 deletion backends/gcppubsub/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel

sub := g.client.Subscription(relayOpts.GcpPubsub.Args.SubscriptionId)

g.log.Infof("Relaying GCP pubsub messages from '%s' queue -> '%s'", sub.ID(), relayOpts.XBatchshGrpcAddress)
g.log.Infof("Relaying GCP pubsub messages from '%s' queue -> '%s'", sub.ID(), relayOpts.XStreamdalGrpcAddress)

for {
select {
Expand Down
8 changes: 4 additions & 4 deletions backends/batch/auth.go → backends/streamdal/auth.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package batch
package streamdal

import (
"bufio"
Expand Down Expand Up @@ -36,7 +36,7 @@ var (
)

// Login attempts to login to the Streamdal API using credentials supplied via stdin
func (b *Batch) Login() error {
func (b *Streamdal) Login() error {

// No credentials, or expired, ask for username/password
username, err := readUsername(os.Stdin)
Expand Down Expand Up @@ -71,7 +71,7 @@ func (b *Batch) Login() error {
}

// Logout logs a user out of the Streamdal API and clears saved credentials
func (b *Batch) Logout() error {
func (b *Streamdal) Logout() error {
// Perform APi logout
b.Post("/auth/logout", nil)

Expand All @@ -88,7 +88,7 @@ func (b *Batch) Logout() error {
}

// Authenticate makes an API call to the Streamdal API with the given account's credentials
func (b *Batch) Authenticate(username, password string) (*AuthResponse, error) {
func (b *Streamdal) Authenticate(username, password string) (*AuthResponse, error) {
res, code, err := b.Post("/v1/login", map[string]interface{}{
"email": username,
"password": password,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package batch
package streamdal

import (
"bytes"
Expand All @@ -22,7 +22,7 @@ func TestAuthenticate(t *testing.T) {
}
}`

b := BatchWithMockResponse(200, apiResponse)
b := StreamdalWithMockResponse(200, apiResponse)

output, err := b.Authenticate("[email protected]", "password123")
g.Expect(err).ToNot(HaveOccurred())
Expand Down
36 changes: 18 additions & 18 deletions backends/batch/collections.go → backends/streamdal/collections.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package batch
package streamdal

import (
"bufio"
Expand Down Expand Up @@ -60,7 +60,7 @@ var (
)

// ListCollections lists all of an account's collections
func (b *Batch) ListCollections() error {
func (b *Streamdal) ListCollections() error {
output, err := b.listCollections()
if err != nil {
return err
Expand All @@ -70,7 +70,7 @@ func (b *Batch) ListCollections() error {
return nil
}

func (b *Batch) listCollections() ([]CollectionOutput, error) {
func (b *Streamdal) listCollections() ([]CollectionOutput, error) {
res, _, err := b.Get("/v1/collection", nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -106,19 +106,19 @@ func (b *Batch) listCollections() ([]CollectionOutput, error) {
}

// SearchCollection queries a collection
func (b *Batch) SearchCollection() error {
return b.search(PageSize*int(b.Opts.Batch.Search.Page), PageSize, int(b.Opts.Batch.Search.Page))
func (b *Streamdal) SearchCollection() error {
return b.search(PageSize*int(b.Opts.Streamdal.Search.Page), PageSize, int(b.Opts.Streamdal.Search.Page))
}

// search recursively displays pages of (PageSize) results until no more are available
func (b *Batch) search(from, size, page int) error {
func (b *Streamdal) search(from, size, page int) error {
p := map[string]interface{}{
"query": b.Opts.Batch.Search.Query,
"query": b.Opts.Streamdal.Search.Query,
"from": from,
"size": size,
}

res, _, err := b.Post("/v1/collection/"+b.Opts.Batch.Search.CollectionId+"/search", p)
res, _, err := b.Post("/v1/collection/"+b.Opts.Streamdal.Search.CollectionId+"/search", p)
if err != nil {
return errors.Wrap(err, "unable to complete search request")
}
Expand All @@ -128,7 +128,7 @@ func (b *Batch) search(from, size, page int) error {
return errors.Wrap(err, "failed to search collection")
}

// Our JSON output should be human readable
// Our JSON output should be human-readable
m, err := json.MarshalIndent(results.Data, "", " ")
if err != nil {
return errors.Wrap(err, "could not display search results")
Expand Down Expand Up @@ -158,7 +158,7 @@ func (b *Batch) search(from, size, page int) error {
return nil
}

func (b *Batch) getDataLakeID() (string, error) {
func (b *Streamdal) getDataLakeID() (string, error) {
res, _, err := b.Get("/v1/datalake", nil)
if err != nil {
return "", err
Expand All @@ -176,7 +176,7 @@ func (b *Batch) getDataLakeID() (string, error) {
return lakes[0].ID, nil
}

func (b *Batch) CreateCollection() error {
func (b *Streamdal) CreateCollection() error {
// Get datalake ID
datalakeID, err := b.getDataLakeID()
if err != nil {
Expand All @@ -185,14 +185,14 @@ func (b *Batch) CreateCollection() error {

// Create collection
p := map[string]interface{}{
"schema_id": b.Opts.Batch.Create.Collection.SchemaId,
"name": b.Opts.Batch.Create.Collection.Name,
"notes": b.Opts.Batch.Create.Collection.Notes,
"schema_id": b.Opts.Streamdal.Create.Collection.SchemaId,
"name": b.Opts.Streamdal.Create.Collection.Name,
"notes": b.Opts.Streamdal.Create.Collection.Notes,
"datalake_id": datalakeID,
"envelope_type": b.Opts.Batch.Create.Collection.EnvelopeType,
"envelope_root_message": b.Opts.Batch.Create.Collection.EnvelopeRootMessage,
"payload_field_id": b.Opts.Batch.Create.Collection.PayloadFieldId,
"payload_root_message": b.Opts.Batch.Create.Collection.PayloadFieldMessage,
"envelope_type": b.Opts.Streamdal.Create.Collection.EnvelopeType,
"envelope_root_message": b.Opts.Streamdal.Create.Collection.EnvelopeRootMessage,
"payload_field_id": b.Opts.Streamdal.Create.Collection.PayloadFieldId,
"payload_root_message": b.Opts.Streamdal.Create.Collection.PayloadFieldMessage,
}

res, code, err := b.Post("/v1/collection", p)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package batch
package streamdal

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Batch", func() {
var _ = Describe("Streamdal", func() {
Context("ListCollections", func() {
It("returns an error when empty", func() {
b := BatchWithMockResponse(200, `[]`)
b := StreamdalWithMockResponse(200, `[]`)

output, err := b.listCollections()
Expect(err).To(Equal(errNoCollections))
Expect(len(output)).To(Equal(0))
})

It("returns error on a bad response", func() {
b := BatchWithMockResponse(200, `{}`)
b := StreamdalWithMockResponse(200, `{}`)

output, err := b.listCollections()
Expect(err).To(Equal(errCollectionsFailed))
Expand Down Expand Up @@ -87,7 +87,7 @@ var _ = Describe("Batch", func() {
}
]`

b := BatchWithMockResponse(200, apiResponse)
b := StreamdalWithMockResponse(200, apiResponse)

output, err := b.listCollections()
Expect(err).ToNot(HaveOccurred())
Expand All @@ -105,7 +105,7 @@ var _ = Describe("Batch", func() {
It("returns an error when no datalakes exist", func() {
apiResponse := `[]`

b := BatchWithMockResponse(200, apiResponse)
b := StreamdalWithMockResponse(200, apiResponse)

output, err := b.getDataLakeID()
Expect(err).To(HaveOccurred())
Expand All @@ -128,7 +128,7 @@ var _ = Describe("Batch", func() {
}
]`

b := BatchWithMockResponse(200, apiResponse)
b := StreamdalWithMockResponse(200, apiResponse)

output, err := b.getDataLakeID()
Expect(err).ToNot(HaveOccurred())
Expand Down
Loading

0 comments on commit 3d63c40

Please sign in to comment.