Skip to content

Commit

Permalink
docs(pubsublite): extra examples and documentation clean up (#3663)
Browse files Browse the repository at this point in the history
tmdiep authored Feb 9, 2021
1 parent 28decb5 commit eebb48a
Showing 5 changed files with 88 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pubsublite/README.md
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ To publish messages to a topic:
const topic = "projects/project-id/locations/us-central1-b/topics/topic1"
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
log.Fatal(err)
log.Fatal(err)
}

// Publish "hello world".
45 changes: 31 additions & 14 deletions pubsublite/doc.go
Original file line number Diff line number Diff line change
@@ -29,17 +29,34 @@ https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
More information about Pub/Sub Lite is available at
https://cloud.google.com/pubsub/lite.
Note: This library is in ALPHA. Backwards-incompatible changes may be made
before stable v1.0.0 is released.
Introduction
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
Note: This library is in ALPHA. Backwards-incompatible changes may be made
before stable v1.0.0 is released.
The following imports are required for code snippets below:
import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/pscompat"
)
More complete examples can be found at
https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
Creating Topics
Messages are published to topics. Pub/Sub Lite topics may be created like so:
ctx := context.Background()
const topicPath = "projects/my-project/locations/us-central1-c/topics/my-topic"
topicConfig := pubsublite.TopicConfig{
Name: topicPath,
@@ -53,16 +70,15 @@ Messages are published to topics. Pub/Sub Lite topics may be created like so:
if err != nil {
// TODO: Handle error.
}
topic, err = adminClient.CreateTopic(ctx, topicConfig)
if err != nil {
if _, err = adminClient.CreateTopic(ctx, topicConfig); err != nil {
// TODO: Handle error.
}
See https://cloud.google.com/pubsub/lite/docs/topics for more information about
how Pub/Sub Lite topics are configured.
See https://cloud.google.com/pubsub/lite/docs/locations for the list of regions
and zones where Pub/Sub Lite is available.
See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones
where Pub/Sub Lite is available.
Publishing
@@ -100,8 +116,9 @@ service:
// TODO: Handle error.
}
Once you've finishing publishing, call Stop to flush all messages to the service
and close gRPC streams:
Once you've finishing publishing all messages, call Stop to flush all messages
to the service and close gRPC streams. The PublisherClient can no longer be used
after it has been stopped or has terminated due to a permanent service error.
publisher.Stop()
@@ -123,8 +140,7 @@ Pub/Sub Lite subscriptions may be created like so:
Topic: topicPath,
DeliveryRequirement: pubsublite.DeliverImmediately,
}
subscription, err = adminClient.CreateSubscription(ctx, subscriptionConfig)
if err != nil {
if _, err = adminClient.CreateSubscription(ctx, subscriptionConfig); err != nil {
// TODO: Handle error.
}
@@ -138,7 +154,9 @@ To receive messages for a subscription, first create a SubscriberClient:
subscriber, err := pscompat.NewSubscriberClient(ctx, subscriptionPath)
Messages are then consumed from a subscription via callback.
Messages are then consumed from a subscription via callback. The callback may be
invoked concurrently by multiple goroutines (one per partition that the
subscriber client is connected to).
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
@@ -149,9 +167,8 @@ Messages are then consumed from a subscription via callback.
// TODO: Handle error.
}
The callback may be invoked concurrently by multiple goroutines (one per
partition that the subscriber client is connected to). To terminate a call to
Receive, cancel its context:
Receive blocks until either the context is canceled or a fatal service error
occurs. To terminate a call to Receive, cancel its context:
cancel()
17 changes: 17 additions & 0 deletions pubsublite/example_test.go
Original file line number Diff line number Diff line change
@@ -22,8 +22,14 @@ import (
"google.golang.org/api/iterator"
)

// This example demonstrates how to create a new topic.
// See https://cloud.google.com/pubsub/lite/docs/topics for more information
// about how Pub/Sub Lite topics are configured.
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones
// where Pub/Sub Lite is available.
func ExampleAdminClient_CreateTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
@@ -47,6 +53,7 @@ func ExampleAdminClient_CreateTopic() {

func ExampleAdminClient_UpdateTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
@@ -67,6 +74,7 @@ func ExampleAdminClient_UpdateTopic() {

func ExampleAdminClient_DeleteTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
@@ -80,6 +88,7 @@ func ExampleAdminClient_DeleteTopic() {

func ExampleAdminClient_Topics() {
ctx := context.Background()
// NOTE: region must correspond to the zone below.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
@@ -101,6 +110,7 @@ func ExampleAdminClient_Topics() {

func ExampleAdminClient_TopicSubscriptions() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
@@ -121,8 +131,12 @@ func ExampleAdminClient_TopicSubscriptions() {
}
}

// This example demonstrates how to create a new subscription for a topic.
// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more
// information about how subscriptions are configured.
func ExampleAdminClient_CreateSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic and subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
@@ -143,6 +157,7 @@ func ExampleAdminClient_CreateSubscription() {

func ExampleAdminClient_UpdateSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
@@ -162,6 +177,7 @@ func ExampleAdminClient_UpdateSubscription() {

func ExampleAdminClient_DeleteSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
@@ -175,6 +191,7 @@ func ExampleAdminClient_DeleteSubscription() {

func ExampleAdminClient_Subscriptions() {
ctx := context.Background()
// NOTE: region must correspond to the zone below.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
6 changes: 3 additions & 3 deletions pubsublite/pscompat/doc.go
Original file line number Diff line number Diff line change
@@ -15,9 +15,9 @@
Package pscompat contains clients for publishing and subscribing using the
Pub/Sub Lite service.
The clients in this package are designed to compatible with the Cloud Pub/Sub
library: https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are
defined by the client, PublisherClient and SubscriberClient can be used as
This package is designed to compatible with the Cloud Pub/Sub library:
https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are defined by the
client application, PublisherClient and SubscriberClient can be used as
substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(),
respectively, from the pubsub package.
40 changes: 36 additions & 4 deletions pubsublite/pscompat/example_test.go
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ func ExamplePublisherClient_Publish() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
@@ -68,7 +68,7 @@ func ExamplePublisherClient_Publish_batchingSettings() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
@@ -92,13 +92,15 @@ func ExamplePublisherClient_Error() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
if err == pscompat.ErrPublisherStopped {
// Prints the fatal error that caused the publisher to terminate.
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
break
}
}
fmt.Printf("Published a message with a message ID: %s\n", id)
@@ -151,6 +153,36 @@ func ExampleSubscriberClient_Receive_maxOutstanding() {
// TODO: Handle error.
}

// Call cancel from callback, or another goroutine.
// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}

// This example shows how to manually assign which topic partitions a
// SubscriberClient should connect to. If not specified, the SubscriberClient
// will use Pub/Sub Lite's partition assignment service to automatically
// determine which partitions it should connect to.
func ExampleSubscriberClient_Receive_manualPartitionAssignment() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.DefaultReceiveSettings
// NOTE: The corresponding topic must have 2 or more partitions.
settings.Partitions = []int{0, 1}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
}
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
// TODO: Handle message.
// NOTE: May be called concurrently; synchronize access to shared memory.
m.Ack()
})
if err != nil {
// TODO: Handle error.
}

// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}

0 comments on commit eebb48a

Please sign in to comment.