pubsub

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AttributeExtractor

type AttributeExtractor[T any] func(msg *T) map[string]string

AttributeExtractor is a function that extracts message attributes for Pub/Sub. These attributes are stored alongside the message and can be used for filtering.

type Client

type Client[T Message] interface {
	// Publisher returns the publisher for sending messages.
	Publisher() Publisher[T]

	// Consumer returns the consumer for receiving messages.
	Consumer() Consumer[T]

	// Close closes the client and all underlying connections.
	Close() error
}

Client defines the interface for a Pub/Sub client that provides both publishing and consuming capabilities.

type ClientConfig

type ClientConfig struct {
	ProjectID           string
	Region              string
	TopicName           string
	SubscriptionName    string
	DLQTopicName        string
	AckDeadline         time.Duration
	MinAckExtension     time.Duration
	MaxOutstandingMsgs  int
	MaxDeliveryAttempts int
	NumWorkers          int
}

ClientConfig holds configuration for the Pub/Sub client.

type Consumer

type Consumer[T any] interface {
	// Start begins receiving messages and processing them with the provided handler.
	// Blocks until the context is cancelled or an unrecoverable error occurs.
	Start(ctx context.Context, handler MessageHandler[T]) error

	// Stop gracefully shuts down the consumer.
	Stop()
}

Consumer defines the interface for consuming messages from a Pub/Sub subscription.

type Message

type Message interface {
	Marshal() ([]byte, error)
}

Message is the constraint for message types that can be published/consumed. Types must implement Marshal for serialization.

type MessageHandler

type MessageHandler[T any] func(ctx context.Context, msg *T) error

MessageHandler is a function that processes a message of type T. Returns nil on success, or an error if processing failed (will trigger nack/retry).

type MockClient

type MockClient[T Message] struct {
	MockPub *MockPublisher[T]
	MockCon *MockConsumer[T]
	Closed  bool
}

MockClient implements the Client interface for testing.

func NewMockClient

func NewMockClient[T Message]() *MockClient[T]

NewMockClient creates a new mock client for testing.

func (*MockClient[T]) Close

func (c *MockClient[T]) Close() error

func (*MockClient[T]) Consumer

func (c *MockClient[T]) Consumer() Consumer[T]

func (*MockClient[T]) Publisher

func (c *MockClient[T]) Publisher() Publisher[T]

type MockConsumer

type MockConsumer[T any] struct {
	StartCalled  bool
	StopCalled   bool
	StartError   error
	Handler      MessageHandler[T]
	StartContext context.Context
	// contains filtered or unexported fields
}

MockConsumer implements the Consumer interface for testing.

func (*MockConsumer[T]) SimulateMessage

func (c *MockConsumer[T]) SimulateMessage(ctx context.Context, msg *T) error

SimulateMessage allows tests to simulate receiving a message by invoking the handler. This must be called after Start() has been invoked.

func (*MockConsumer[T]) Start

func (c *MockConsumer[T]) Start(ctx context.Context, handler MessageHandler[T]) error

func (*MockConsumer[T]) Stop

func (c *MockConsumer[T]) Stop()

type MockPublisher

type MockPublisher[T Message] struct {
	PublishedMsgs []*T
	PublishError  error
	CloseCalled   bool
	// contains filtered or unexported fields
}

MockPublisher implements the Publisher interface for testing.

func (*MockPublisher[T]) Close

func (p *MockPublisher[T]) Close() error

func (*MockPublisher[T]) Publish

func (p *MockPublisher[T]) Publish(ctx context.Context, msg *T) error

type PubSubClient

type PubSubClient[T Message] struct {
	// contains filtered or unexported fields
}

PubSubClient implements the Client interface for GCP Pub/Sub.

func NewClient

func NewClient[T Message](
	ctx context.Context,
	cfg ClientConfig,
	attributeExtractor AttributeExtractor[T],
	unmarshaler Unmarshaler[T],
) (*PubSubClient[T], error)

NewClient creates a new Pub/Sub client with the specified configuration. It ensures the required topics and subscription exist, creating them if necessary.

The attributeExtractor function is called when publishing messages to extract key-value attributes that are stored with the message.

The unmarshaler function is called when consuming messages to deserialize the message data into the typed message struct.

func (*PubSubClient[T]) Close

func (c *PubSubClient[T]) Close() error

Close closes the client and all underlying connections.

func (*PubSubClient[T]) Consumer

func (c *PubSubClient[T]) Consumer() Consumer[T]

Consumer returns the consumer for receiving messages.

func (*PubSubClient[T]) Publisher

func (c *PubSubClient[T]) Publisher() Publisher[T]

Publisher returns the publisher for sending messages.

type Publisher

type Publisher[T Message] interface {
	// Publish sends a message to the Pub/Sub topic.
	// Blocks until the publish is confirmed or fails.
	Publish(ctx context.Context, msg *T) error

	// Close stops the publisher and flushes any pending messages.
	Close() error
}

Publisher defines the interface for publishing messages to a Pub/Sub topic.

type Unmarshaler

type Unmarshaler[T any] func(data []byte) (*T, error)

Unmarshaler is a function that deserializes message data into a typed message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL