Documentation ¶
Overview ¶
Package gcppubsubv2 provides a pubsub implementation that uses GCP PubSub. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription.
URLs ¶
For pubsub.OpenTopic and pubsub.OpenSubscription, gcppubsubv2 registers for the scheme "gcppubsubv2". The default URL opener creates a connection using default credentials from the environment, as described in https://cloud.google.com/docs/authentication/production. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.
The GCP Pub/Sub emulator is supported, as described in https://cloud.google.com/pubsub/docs/emulator. When the environment variable 'PUBSUB_EMULATOR_HOST' is set, the driver connects to the specified emulator host by default.
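As a rough illustration of the emulator switch described above, the sketch below picks an endpoint based on PUBSUB_EMULATOR_HOST. The helper pubsubTarget is hypothetical and is not part of the driver; "localhost:8085" is an assumed emulator address, and the fallback endpoint string is only a placeholder for the production service.

```go
package main

import (
	"fmt"
	"os"
)

// emulatorEnv is the environment variable named in the package documentation.
const emulatorEnv = "PUBSUB_EMULATOR_HOST"

// pubsubTarget returns emulatorHost when it is non-empty and fallback
// otherwise. It is a hypothetical helper for illustration, not driver code.
func pubsubTarget(emulatorHost, fallback string) string {
	if emulatorHost != "" {
		return emulatorHost
	}
	return fallback
}

func main() {
	// Assumed emulator address; set this before opening a topic or subscription.
	if err := os.Setenv(emulatorEnv, "localhost:8085"); err != nil {
		fmt.Println("setenv:", err)
		return
	}
	fmt.Println(pubsubTarget(os.Getenv(emulatorEnv), "pubsub.googleapis.com:443"))
}
```

In practice the variable is usually exported in the shell or test harness rather than set from inside the program.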
Message Delivery Semantics ¶
GCP Pub/Sub supports at-least-once semantics; applications must call Message.Ack after processing a message, or it will be redelivered. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
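Because delivery is at-least-once, a handler may see the same message more than once. The following is a minimal sketch of one common way to cope with that, assuming each message carries a stable string ID chosen by the application. The seenSet type and its methods are hypothetical names, not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// seenSet remembers message IDs that were already processed, so that a
// redelivered message can be acknowledged without repeating its side effects.
type seenSet struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newSeenSet() *seenSet {
	return &seenSet{ids: make(map[string]struct{})}
}

// firstTime records id and reports whether it had not been seen before.
func (s *seenSet) firstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.ids[id]; ok {
		return false
	}
	s.ids[id] = struct{}{}
	return true
}

func main() {
	seen := newSeenSet()
	// "m1" arrives twice, as can happen under at-least-once delivery.
	for _, id := range []string{"m1", "m2", "m1"} {
		if seen.firstTime(id) {
			fmt.Println("process", id)
		} else {
			fmt.Println("skip duplicate", id)
		}
		// In a real receive loop, Message.Ack would be called here in both cases.
	}
}
```

This in-memory set grows without bound and is lost on restart. A long-running service would likely need expiry or a persistent store instead.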
As ¶
gcppubsubv2 exposes the following types for As:
- Topic: *raw.Publisher
- Subscription: *raw.Subscriber
- Message.BeforeSend: *raw.Message
- Message.AfterSend: *string for the raw.PublishResult serverID corresponding to the message.
- Message: *raw.Message
- Error: *google.golang.org/grpc/status.Status
Example (OpenSubscriptionFromURL) ¶
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	// The blank import registers the "gcppubsubv2" URL scheme.
	_ "gocloud.dev/pubsub/gcppubsubv2"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/gcppubsubv2"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	subscription, err := pubsub.OpenSubscription(ctx,
		"gcppubsubv2://projects/my-project/subscriptions/my-subscription")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Example (OpenTopicFromURL) ¶
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	// The blank import registers the "gcppubsubv2" URL scheme.
	_ "gocloud.dev/pubsub/gcppubsubv2"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/gcppubsubv2"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	topic, err := pubsub.OpenTopic(ctx, "gcppubsubv2://projects/myproject/topics/mytopic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Index ¶
- Constants
- Variables
- func Client(ctx context.Context, projectID gcp.ProjectID, conn *grpc.ClientConn) (*raw.Client, error)
- func Dial(ctx context.Context, ts gcp.TokenSource) (*grpc.ClientConn, func(), error)
- func OpenSubscription(client *raw.Client, subscriptionName string, opts *SubscriptionOptions) *pubsub.Subscription
- func OpenSubscriptionByPath(client *raw.Client, subscriptionPath string, opts *SubscriptionOptions) (*pubsub.Subscription, error)
- func OpenTopic(client *raw.Client, topicName string, opts *TopicOptions) *pubsub.Topic
- func OpenTopicByPath(client *raw.Client, topicPath string, opts *TopicOptions) (*pubsub.Topic, error)
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Constants ¶
const Scheme = "gcppubsubv2"
Scheme is the URL scheme gcppubsubv2 registers its URLOpeners under on pubsub.DefaultMux.
Variables ¶
var Set = wire.NewSet(
	Dial,
	Client,
	wire.Struct(new(SubscriptionOptions)),
	wire.Struct(new(TopicOptions)),
	wire.Struct(new(URLOpener), "Conn", "TopicOptions", "SubscriptionOptions"),
)
Set holds Wire providers for this package.
Functions ¶
func Client ¶
func Client(ctx context.Context, projectID gcp.ProjectID, conn *grpc.ClientConn) (*raw.Client, error)
Client returns a *raw.Client that can be used in OpenTopic and/or OpenSubscription. conn is optional.
func Dial ¶
func Dial(ctx context.Context, ts gcp.TokenSource) (*grpc.ClientConn, func(), error)
Dial opens a gRPC connection to the GCP Pub/Sub API.
The second return value is a function that can be called to clean up the connection opened by Dial.
func OpenSubscription ¶
func OpenSubscription(client *raw.Client, subscriptionName string, opts *SubscriptionOptions) *pubsub.Subscription
OpenSubscription returns a *pubsub.Subscription backed by the existing GCP PubSub subscription named subscriptionName. See the package documentation for an example.
Example ¶
package main

import (
	"context"
	"log"

	"gocloud.dev/gcp"
	"gocloud.dev/pubsub/gcppubsubv2"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Your GCP credentials.
	// See https://cloud.google.com/docs/authentication/production
	// for more info on alternatives.
	creds, err := gcp.DefaultCredentials(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Open a gRPC connection to the GCP Pub/Sub API.
	// Alternatively, skip this and pass nil to Client below to use the default.
	conn, cleanup, err := gcppubsubv2.Dial(ctx, creds.TokenSource)
	if err != nil {
		log.Fatal(err)
	}
	defer cleanup()

	// Construct a Client using the connection.
	client, err := gcppubsubv2.Client(ctx, "myprojectID", conn)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Construct a *pubsub.Subscription.
	subscription := gcppubsubv2.OpenSubscription(client, "example-subscription", nil)
	defer subscription.Shutdown(ctx)
}
func OpenSubscriptionByPath ¶
func OpenSubscriptionByPath(client *raw.Client, subscriptionPath string, opts *SubscriptionOptions) (*pubsub.Subscription, error)
OpenSubscriptionByPath returns a *pubsub.Subscription backed by an existing GCP PubSub subscription. subscriptionPath must be of the form "projects/<projectID>/subscriptions/<subscription>". See the package documentation for an example.
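To make the required path form concrete, here is a small sketch that assembles a subscription path from its parts and checks its shape before it would be handed to OpenSubscriptionByPath. The helper subscriptionPath and the regular expression are assumptions made for illustration; the driver's own validation may be stricter or looser.

```go
package main

import (
	"fmt"
	"regexp"
)

// subscriptionPathRE is an assumed pattern for the documented form
// "projects/<projectID>/subscriptions/<subscription>".
var subscriptionPathRE = regexp.MustCompile(`^projects/[^/]+/subscriptions/[^/]+$`)

// subscriptionPath joins a project ID and a subscription name into a full
// path and rejects results that do not match the assumed pattern.
func subscriptionPath(projectID, name string) (string, error) {
	p := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, name)
	if !subscriptionPathRE.MatchString(p) {
		return "", fmt.Errorf("invalid subscription path %q", p)
	}
	return p, nil
}

func main() {
	p, err := subscriptionPath("my-project", "my-subscription")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(p)
}
```

Checking the shape locally gives an early, readable error for empty or slash-containing components.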
func OpenTopic ¶
func OpenTopic(client *raw.Client, topicName string, opts *TopicOptions) *pubsub.Topic
OpenTopic returns a *pubsub.Topic backed by an existing GCP PubSub topic. topicName is the last part of the full topic path, e.g., "foo" from "projects/<projectID>/topics/foo". See the package documentation for an example.
Example ¶
package main

import (
	"context"
	"log"

	"gocloud.dev/gcp"
	"gocloud.dev/pubsub/gcppubsubv2"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Your GCP credentials.
	// See https://cloud.google.com/docs/authentication/production
	// for more info on alternatives.
	creds, err := gcp.DefaultCredentials(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Open a gRPC connection to the GCP Pub/Sub API.
	// Alternatively, skip this and pass nil to Client below to use the default.
	conn, cleanup, err := gcppubsubv2.Dial(ctx, creds.TokenSource)
	if err != nil {
		log.Fatal(err)
	}
	defer cleanup()

	// Construct a Client using the connection.
	client, err := gcppubsubv2.Client(ctx, "myprojectid", conn)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Construct a *pubsub.Topic.
	topic := gcppubsubv2.OpenTopic(client, "example-topic", nil)
	defer topic.Shutdown(ctx)
}
func OpenTopicByPath ¶
func OpenTopicByPath(client *raw.Client, topicPath string, opts *TopicOptions) (*pubsub.Topic, error)
OpenTopicByPath returns a *pubsub.Topic backed by an existing GCP PubSub topic. topicPath must be of the form "projects/<projectID>/topics/<topic>". See the package documentation for an example.
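OpenTopic takes the short topic name while OpenTopicByPath takes the full path, so it can help to convert between the two. The sketch below splits a full path into its project ID and topic name. It assumes the plural "topics" segment used in this package's URL examples, and splitTopicPath is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTopicPath splits a path of the assumed form
// "projects/<projectID>/topics/<topic>" into its project ID and topic name.
func splitTopicPath(topicPath string) (projectID, topic string, err error) {
	parts := strings.Split(topicPath, "/")
	if len(parts) != 4 || parts[0] != "projects" || parts[2] != "topics" ||
		parts[1] == "" || parts[3] == "" {
		return "", "", fmt.Errorf("topic path %q is not of the form projects/<projectID>/topics/<topic>", topicPath)
	}
	return parts[1], parts[3], nil
}

func main() {
	projectID, topic, err := splitTopicPath("projects/myproject/topics/mytopic")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(projectID, topic)
}
```

The returned topic value is the "last part" that OpenTopic expects as topicName.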
Types ¶
type SubscriptionOptions ¶
type SubscriptionOptions struct {
	// MaxBatchSize caps the maximum batch size used when retrieving messages. It defaults to 1000.
	MaxBatchSize int

	// ReceiveBatcherOptions adds constraints to the default batching done for receives.
	ReceiveBatcherOptions batcher.Options

	// AckBatcherOptions adds constraints to the default batching done for acks.
	AckBatcherOptions batcher.Options
}
SubscriptionOptions contains configuration for subscriptions.
type TopicOptions ¶
type TopicOptions struct {
	// BatcherOptions adds constraints to the default batching done for sends.
	BatcherOptions batcher.Options
}
TopicOptions contains configuration for topics.
type URLOpener ¶
type URLOpener struct {
	// Conn must be set to a non-nil ClientConn authenticated with
	// Cloud Pub/Sub scope or equivalent.
	Conn *grpc.ClientConn

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions

	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}
URLOpener opens GCP Pub/Sub URLs like "gcppubsubv2://projects/myproject/topics/mytopic" for topics or "gcppubsubv2://projects/myproject/subscriptions/mysub" for subscriptions.
The shortened forms "gcppubsubv2://myproject/mytopic" for topics or "gcppubsubv2://myproject/mysub" for subscriptions are also supported.
The following query parameters are supported:
- max_recv_batch_size: sets SubscriptionOptions.MaxBatchSize.
- max_send_batch_size: sets TopicOptions.BatcherOptions.MaxBatchSize.
Currently their use is limited to subscribers.
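The URL forms and query parameters above can be assembled with net/url instead of string concatenation. This is a sketch under assumptions: subscriptionURL is a hypothetical helper, and treating a batch size of 0 as "omit the parameter" is a choice made here, not documented driver behavior.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// subscriptionURL builds a long-form subscription URL for the "gcppubsubv2"
// scheme. When maxRecvBatch is positive it is added as max_recv_batch_size;
// otherwise the query string is left empty.
func subscriptionURL(projectID, subscription string, maxRecvBatch int) string {
	u := url.URL{
		Scheme: "gcppubsubv2",
		Host:   "projects", // the first path element sits in the host position
		Path:   "/" + projectID + "/subscriptions/" + subscription,
	}
	if maxRecvBatch > 0 {
		q := url.Values{}
		q.Set("max_recv_batch_size", strconv.Itoa(maxRecvBatch))
		u.RawQuery = q.Encode()
	}
	return u.String()
}

func main() {
	fmt.Println(subscriptionURL("my-project", "my-subscription", 100))
}
```

The resulting string can be passed to pubsub.OpenSubscription once the driver package has been imported.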
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.