filepubsub

package
v5.0.1
Warning

This package is not in the latest version of its module.

Published: Jun 9, 2026 | License: AGPL-3.0 | Imports: 25 | Imported by: 0

Documentation

Overview

Package filepubsub provides an AsyncQueue adapter that wraps the gocloud pubsub file driver, enabling sync endpoints to use filesystem-backed pubsub queues.

URLs

The queue registers the "fpub" scheme. The URL format is:

fpub:///path/to/spool?name=streamname&ackdeadline=1m

Query parameters:

  • name: Required stream name (used for directory naming)
  • ackdeadline: Optional ack deadline (defaults to 1m)
  • sendbatchsize: Optional send batch size for publisher (defaults to 10)
  • recvbatchsize: Optional receive batch size for subscriber (defaults to 1)

Example:

fpub:///shared/broker?name=syncro
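
As a rough illustration of how the documented parameters and defaults fit together, the following sketch parses an "fpub" URL using only the standard library. The queueConfig type and parseQueueURL function are hypothetical names introduced here; they are not part of filepubsub, and the package's own parser may validate input differently.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// queueConfig is a hypothetical holder for the documented query parameters;
// it is not a type exported by filepubsub.
type queueConfig struct {
	Dir           string
	Name          string
	AckDeadline   time.Duration
	SendBatchSize int
	RecvBatchSize int
}

// parseQueueURL is an illustrative helper, not the package's own parser.
// It applies the defaults listed in the documentation above.
func parseQueueURL(raw string) (queueConfig, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return queueConfig{}, err
	}
	if u.Scheme != "fpub" {
		return queueConfig{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	q := u.Query()
	cfg := queueConfig{
		Dir:           u.Path,
		Name:          q.Get("name"),
		AckDeadline:   time.Minute, // documented default: 1m
		SendBatchSize: 10,          // documented default: 10
		RecvBatchSize: 1,           // documented default: 1
	}
	if cfg.Name == "" {
		return queueConfig{}, errors.New("name is required")
	}
	if v := q.Get("ackdeadline"); v != "" {
		if cfg.AckDeadline, err = time.ParseDuration(v); err != nil {
			return queueConfig{}, fmt.Errorf("ackdeadline: %w", err)
		}
	}
	if v := q.Get("sendbatchsize"); v != "" {
		if cfg.SendBatchSize, err = strconv.Atoi(v); err != nil {
			return queueConfig{}, fmt.Errorf("sendbatchsize: %w", err)
		}
	}
	if v := q.Get("recvbatchsize"); v != "" {
		if cfg.RecvBatchSize, err = strconv.Atoi(v); err != nil {
			return queueConfig{}, fmt.Errorf("recvbatchsize: %w", err)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := parseQueueURL("fpub:///shared/broker?name=syncro")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Only name is mandatory in this sketch; every other field falls back to its documented default when the parameter is absent.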

Package filepubsub provides a filesystem-backed pubsub implementation using gocloud.dev/pubsub. It writes each message as a .pb file in a directory and relies on filesystem notifications (notify) rather than polling to detect new messages.

Use NewTopic to construct a *pubsub.Topic, and NewSubscription to construct a *pubsub.Subscription.

filepubsub provides at-least-once delivery semantics. Messages that are not acknowledged within the ack deadline will be redelivered.
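
Because at-least-once delivery means a message can arrive more than once, consumers usually need to be idempotent. The sketch below shows one common approach, an in-memory set of processed IDs. The deduper type and the use of string IDs are assumptions made for illustration; the documentation does not say which identifier filepubsub messages carry, and a real deployment would likely need persistent or bounded storage.

```go
package main

import (
	"fmt"
	"sync"
)

// deduper remembers message IDs that were already processed so that a
// redelivered message is handled only once. It is a hypothetical helper,
// not part of filepubsub.
type deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]struct{})}
}

// firstDelivery reports whether id has not been seen before, and records it.
func (d *deduper) firstDelivery(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := newDeduper()
	// "m1" appears twice, simulating a redelivery after a missed ack deadline.
	for _, id := range []string{"m1", "m2", "m1"} {
		if d.firstDelivery(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```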

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, filepubsub registers the "file" scheme. The URL's host+path is used as the directory path for message storage. Query parameters:

  • ackdeadline: The ack deadline for OpenSubscription, in a format accepted by time.ParseDuration. Defaults to 1m.

Examples:

  • file:///shared/broker/mytopic
  • file:///var/spool/pubsub/events?ackdeadline=30s
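
To make the host+path rule concrete, here is a minimal sketch of how such a URL breaks down with net/url. The splitFileURL helper is hypothetical, and plain concatenation of Host and Path is an assumption about what "host+path" means; the package may normalize the path differently.

```go
package main

import (
	"fmt"
	"net/url"
	"time"
)

// splitFileURL is a hypothetical helper that extracts the storage directory
// and ack deadline from a "file" URL. It is not filepubsub's own parser.
func splitFileURL(raw string) (string, time.Duration, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", 0, err
	}
	if u.Scheme != "file" {
		return "", 0, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	// Assumption: "host+path" is the literal concatenation of the two parts.
	dir := u.Host + u.Path
	ackDeadline := time.Minute // documented default: 1m
	if v := u.Query().Get("ackdeadline"); v != "" {
		ackDeadline, err = time.ParseDuration(v)
		if err != nil {
			return "", 0, fmt.Errorf("ackdeadline %q: %w", v, err)
		}
	}
	return dir, ackDeadline, nil
}

func main() {
	dir, ack, err := splitFileURL("file:///var/spool/pubsub/events?ackdeadline=30s")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(dir, ack)
}
```

With three slashes the host is empty, so the directory is simply the absolute path.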

As

filepubsub does not support any types for As.

Constants

const Scheme = "file"

Scheme is the URL scheme filepubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

var (
	ErrTopicNotExist      = errors.New("filepubsub: topic does not exist")
	ErrTopicClosed        = errors.New("filepubsub: topic is closed")
	ErrSubscriptionClosed = errors.New("filepubsub: subscription is closed")
	ErrInvalidPath        = errors.New("filepubsub: path is required")
	ErrInvalidParam       = errors.New("filepubsub: invalid query parameter")
	ErrInvalidAckDeadline = errors.New("filepubsub: invalid ackdeadline")
	ErrCreateDir          = errors.New("filepubsub: failed to create directory")
	ErrWriteFile          = errors.New("filepubsub: failed to write file")
	ErrMoveFile           = errors.New("filepubsub: failed to move file")
)

Sentinel errors for the filepubsub package.
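
Sentinel errors like these are typically matched with errors.Is rather than by comparing strings. The sketch below uses a local stand-in variable so that it compiles without the package; real code would reference the exported filepubsub variable instead. Whether filepubsub, or the gocloud pubsub layer above it, wraps these sentinels so that errors.Is can see them is an assumption here, not something the documentation states.

```go
package main

import (
	"errors"
	"fmt"
)

// errTopicClosed stands in for filepubsub.ErrTopicClosed so this sketch is
// self-contained. It is a local copy, not the package's variable.
var errTopicClosed = errors.New("filepubsub: topic is closed")

// send is a hypothetical operation that wraps the sentinel with context,
// the way a caller further up the stack might.
func send(closed bool) error {
	if closed {
		return fmt.Errorf("send message: %w", errTopicClosed)
	}
	return nil
}

// isTopicClosed reports whether err is, or wraps, the sentinel.
func isTopicClosed(err error) bool {
	return errors.Is(err, errTopicClosed)
}

func main() {
	if err := send(true); isTopicClosed(err) {
		fmt.Println("topic closed, stop publishing:", err)
	}
}
```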

Functions

func NewSubscription

func NewSubscription(pstopic *pubsub.Topic, ackDeadline time.Duration) (*pubsub.Subscription, error)

NewSubscription creates a new subscription for the given topic. It panics if the given topic did not come from filepubsub.

func NewSubscriptionWithOptions

func NewSubscriptionWithOptions(pstopic *pubsub.Topic, ackDeadline time.Duration, opts *SubscriptionOptions) (*pubsub.Subscription, error)

NewSubscriptionWithOptions is similar to NewSubscription, but supports SubscriptionOptions.

func NewTopic

func NewTopic(basePath string) (*pubsub.Topic, error)

NewTopic creates a new filesystem-backed topic.

func NewTopicWithOptions

func NewTopicWithOptions(basePath string, opts *TopicOptions) (*pubsub.Topic, error)

NewTopicWithOptions is similar to NewTopic, but supports TopicOptions.

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	// ReceiveBatcherOptions adds constraints to the default batching done for receives.
	ReceiveBatcherOptions batcher.Options
	// AckBatcherOptions adds constraints to the default batching done for acks.
	AckBatcherOptions batcher.Options
}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by filepubsub.

type TopicOptions

type TopicOptions struct {
	// BatcherOptions adds constraints to the default batching done for sends.
	BatcherOptions batcher.Options
}

TopicOptions contains configuration options for topics.

type URLOpener

type URLOpener struct {
	// contains filtered or unexported fields
}

URLOpener opens filepubsub URLs like "file:///path/to/topic".

The URL's host+path is used as the directory for message storage.

Query parameters:

  • ackdeadline: The ack deadline for OpenSubscription, in a format accepted by time.ParseDuration. Defaults to 1m.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.
