pbclient

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2026 License: MIT Imports: 14 Imported by: 0

README

PocketBase Client

Go client library for PocketBase with automatic admin auth, generic repository helpers, and a simple KV store built on collections.

Installation

go get github.com/eqr/pbclient

Quick Start

package main

import (
	"context"
	"log"

	"github.com/eqr/pbclient"
)

type Todo struct {
	ID    string `json:"id"`
	Title string `json:"title"`
	Done  bool   `json:"done"`
}

func main() {
	client, err := pbclient.NewClient("https://your-pocketbase-host")
	if err != nil {
		log.Fatal(err)
	}

	authed, err := client.AuthenticateSuperuser(pbclient.Credentials{
		Email:    "admin@example.com",
		Password: "super-secret",
	})
	if err != nil {
		log.Fatal(err)
	}

	repo := pbclient.NewRepository[Todo](authed, "todos")

	ctx := context.Background()

	created, err := repo.Create(ctx, Todo{Title: "try pbclient"})
	if err != nil {
		log.Fatal(err)
	}

	item, err := repo.Get(ctx, created.ID)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("got todo: %+v", item)
}

Client Options

  • WithHTTPClient(*http.Client): reuse your own transport (e.g., tracing, custom TLS).
  • WithTimeout(time.Duration): set HTTP timeout.
  • WithRetry(maxRetries, backoff): retry 429/network errors with exponential backoff.
  • WithLogger(*slog.Logger): structured logging for auth and retries.

Repository Usage

client, _ := pbclient.NewClient("https://your-pocketbase-host")
authed, _ := client.AuthenticateSuperuser(pbclient.Credentials{Email: "admin@example.com", Password: "super-secret"})
repo := pbclient.NewRepository[Todo](authed, "todos")

// List with filters, sorting, and field selection
todos, err := repo.List(ctx, pbclient.ListOptions{
	Page:   1,
	PerPage: 20,
	Filter: pbclient.And(pbclient.Eq("done", "false"), pbclient.Gt("priority", "3")),
	Sort:   "-created",
	Fields: []string{"id", "title", "done"},
})

// Update an item
updated, err := repo.Update(ctx, created.ID, Todo{Title: "updated title", Done: true})

KV Store Usage

kv := pbclient.NewTypedKVStore[map[string]any](authed, "kv_store", "myapp") // collection defaults to "kv_store"

_ = kv.Set(ctx, "feature_flag", map[string]any{"name": "beta", "enabled": true})

flag, _ := kv.Get(ctx, "feature_flag")

exists, _ := kv.Exists(ctx, "feature_flag")
keys, _ := kv.List(ctx, "feature_")
_ = kv.Delete(ctx, "feature_flag")

Consumer (Message Processing)

The Consumer provides offset-tracked message consumption from PocketBase collections, similar to Kafka consumers. It supports:

  • Offset tracking: Automatic tracking of processed messages using timestamps or record IDs
  • Consumer groups: Multiple consumers can share progress tracking
  • Filtering: Apply PocketBase filters to consume specific messages
  • At-least-once delivery: Messages are reprocessed if not committed
  • Graceful shutdown: Clean resource cleanup
Basic Consumer Usage
import (
	"github.com/eqr/pbclient"
	"github.com/eqr/pbclient/migrations"
)

// Run migration to create consumer_offsets collection
runner := migrations.NewRunner(authed)
runner.Register(migrations.NewConsumerOffsetsMigration())
runner.Run(ctx)

// Create consumer with timestamp-based offset tracking
consumer, err := pbclient.NewConsumer[Event](authed, "events", pbclient.ConsumerOptions{
	ConsumerGroup:  "event-processor",
	OffsetProvider: pbclient.NewTimestampOffsetProvider(),
	BatchSize:      100,
	PollInterval:   1 * time.Second,
})
defer consumer.Close()

// Start consuming
consumer.Start(ctx)

// Process messages
for {
	select {
	case msg := <-consumer.Messages():
		// Process the message
		fmt.Printf("Processing: %+v\n", msg.Data)
		
		// Commit to mark as processed
		if err := msg.Commit(ctx); err != nil {
			log.Printf("commit failed: %v", err)
		}
		
	case err := <-consumer.Errors():
		log.Printf("consumer error: %v", err)
	}
}
Offset Providers

Two built-in offset providers are available:

Timestamp-based (recommended for most cases):

OffsetProvider: pbclient.NewTimestampOffsetProvider()
  • Uses PocketBase's created timestamp field
  • Works with any collection without schema changes
  • Starting fresh begins from current time

ID-based:

OffsetProvider: pbclient.NewIDOffsetProvider()
  • Uses PocketBase's 15-character record IDs
  • Lexicographic ordering
  • Starting fresh processes all existing records
Consumer Options
  • ConsumerGroup (required): Unique identifier for this consumer group
  • OffsetProvider (required): Strategy for tracking offsets
  • Filter: PocketBase filter to apply (e.g., "type = 'user_action'")
  • BatchSize: Number of messages to fetch per poll (default: 100)
  • PollInterval: Time between polls (default: 1s)
  • BufferSize: Message channel buffer size (default: 100)
  • OffsetCollection: Collection name for storing offsets (default: "consumer_offsets")
Consumer Groups

WARNING: Consumer groups provide shared offset tracking, NOT load balancing like Kafka.

Multiple consumers with the same ConsumerGroup will:

  • Read from the same offset and see the same messages
  • Have potential race conditions when committing offsets
  • May process duplicate messages

This pattern is useful for:

  • High availability (one consumer can take over if another dies)
  • NOT for load balancing (use different consumer groups for independent processing)
// Consumer 1
consumer1, _ := pbclient.NewConsumer[Event](authed, "events", pbclient.ConsumerOptions{
	ConsumerGroup:  "shared-group",
	OffsetProvider: pbclient.NewTimestampOffsetProvider(),
})

// Consumer 2 - shares offset but will process duplicate messages
consumer2, _ := pbclient.NewConsumer[Event](authed, "events", pbclient.ConsumerOptions{
	ConsumerGroup:  "shared-group", // Same group name
	OffsetProvider: pbclient.NewTimestampOffsetProvider(),
})
Examples

See examples/consumer_example.go for complete working examples including:

  • Basic message processing
  • ID-based offset tracking
  • Filtered consumption
  • Shared offset tracking (with warnings about race conditions)

Filters

Helpers for PocketBase filter strings:

  • Eq/Neq/Gt/Gte/Lt/Lte
  • And/Or to combine conditions

Example: pbclient.And(pbclient.Eq("status", "active"), pbclient.Gt("score", "10"))

Error Handling

Common HTTP statuses map to sentinel errors (ErrBadRequest, ErrUnauthorized, ErrForbidden, ErrNotFound, ErrConflict, ErrValidation, ErrRateLimited, ErrServer). Other statuses return *HTTPError with status/message.

Thread Safety

Client is safe for concurrent use; token access is locked and retries respect context cancellation. Repository and KV helpers share the same client and rely on PocketBase for atomicity.

Embedded PocketBase Tests

Use github.com/eqr/pbclient/testpb to spin up embedded PocketBase in tests and get authenticated pbclient.AuthenticatedClient instances.

package myapp_test

import (
	"context"
	"net/http"
	"testing"

	"github.com/eqr/pbclient"
	"github.com/eqr/pbclient/testpb"
)

func TestRepositoryAgainstEmbeddedPocketBase(t *testing.T) {
	pb := testpb.Start(t)
	client := pb.SuperuserClient(t, "test@example.com")

	repo := pbclient.NewRepository[map[string]any](client, "demo2")
	page, err := repo.List(context.Background(), pbclient.ListOptions{PerPage: 1})
	if err != nil {
		t.Fatalf("List: %v", err)
	}
	if len(page.Items) == 0 {
		t.Fatalf("expected seeded records")
	}

	resp, err := client.Do(context.Background(), http.MethodGet, "/api/collections/demo2", nil)
	if err != nil || resp.StatusCode != http.StatusOK {
		t.Fatalf("collection request failed: err=%v status=%v", err, resp.StatusCode)
	}
	resp.Body.Close()
}

testpb.Start supports options:

  • testpb.WithDataDir(path) to override the PocketBase test data directory.
  • testpb.WithHookBinder(func(app *tests.TestApp) error { ... }) to register hooks before routes are served (github.com/pocketbase/pocketbase/tests).

testpb itself is not integration-tagged, so you can use it in regular tests.

Run tests with:

go test ./...

By default, testpb reuses PocketBase's bundled test data from either vendor/ or module cache. If test data is not present in either location, testpb.Start-based tests are skipped.

Migrations

The migrations package ships an HTTP-based runner. By default it auto-creates a pb_migrations collection with authenticated-only rules and records applied migration names/timestamps. Example:

runner := migrations.NewRunner(client) // auto-creates pb_migrations with auth-only access rules
_ = runner.Register(migrations.MyFirstMigration{})
_ = runner.Run(ctx)

pending, _ := runner.Pending(ctx)
_ = runner.Down(ctx, 1) // roll back latest

To disable auto-creation and require a pre-provisioned collection, pass migrations.WithAutoCreate(false); the runner will return ErrCollectionNotFound if the collection is missing.

License

MIT – see LICENSE for details.

Release Notes (v0.0.1)

  • Client with lazy admin auth, retries, and token refresh.
  • Generic repository with CRUD, filtering, pagination helpers.
  • KV store abstraction on top of collections.
  • Filter builders (Eq, Gt, And, etc.).
  • PocketBase-aware error mapping.
  • Extensive unit tests and an optional integration test.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrBadRequest   = errors.New("bad request")
	ErrUnauthorized = errors.New("unauthorized")
	ErrForbidden    = errors.New("forbidden")
	ErrNotFound     = errors.New("not found")
	ErrConflict     = errors.New("conflict")
	ErrValidation   = errors.New("validation failed")
	ErrRateLimited  = errors.New("rate limited")
	ErrServer       = errors.New("server error")
)

Sentinel errors returned for common HTTP statuses.

Functions

func And

func And(filters ...string) string

And joins filters with logical AND, skipping empty entries.

func Eq

func Eq(field, value string) string

Eq builds an equality filter: field='value'.

func Gt

func Gt(field, value string) string

Gt builds a greater-than filter: field>value.

func Gte

func Gte(field, value string) string

Gte builds a greater-than-or-equal filter: field>=value.

func Lt

func Lt(field, value string) string

Lt builds a less-than filter: field<value.

func Lte

func Lte(field, value string) string

Lte builds a less-than-or-equal filter: field<=value.

func Neq

func Neq(field, value string) string

Neq builds a not-equal filter: field!='value'.

func Or

func Or(filters ...string) string

Or joins filters with logical OR, skipping empty entries.

Types

type AuthenticatedClient added in v0.0.3

type AuthenticatedClient interface {
	Do(ctx context.Context, method, path string, body io.Reader) (*http.Response, error)
}

AuthenticatedClient provides authenticated HTTP access to PocketBase.

type Client

type Client interface {
	AuthenticateUser(creds Credentials) (AuthenticatedClient, error)
	AuthenticateSuperuser(creds Credentials) (AuthenticatedClient, error)
}

Client provides unauthenticated access to PocketBase and can create authenticated clients.

func NewClient

func NewClient(baseURL string, opts ...ClientOption) (Client, error)

NewClient constructs a PocketBase client.

type ClientOption

type ClientOption func(*client)

ClientOption configures optional Client settings.

func WithHTTPClient

func WithHTTPClient(hc *http.Client) ClientOption

WithHTTPClient overrides the default http.Client.

func WithLogger

func WithLogger(logger *slog.Logger) ClientOption

WithLogger attaches a logger used for debug information.

func WithRetry

func WithRetry(maxRetries int, backoff time.Duration) ClientOption

WithRetry sets the maximum number of retries for transient errors and the base backoff.

func WithTimeout

func WithTimeout(timeout time.Duration) ClientOption

WithTimeout sets the HTTP client timeout.

type Consumer added in v0.0.3

type Consumer[T any] struct {
	// contains filtered or unexported fields
}

Consumer manages offset-tracked consumption from a PocketBase collection.

func NewConsumer added in v0.0.3

func NewConsumer[T any](
	client AuthenticatedClient,
	collection string,
	opts ConsumerOptions,
) (*Consumer[T], error)

NewConsumer creates a consumer that reads from a collection with offset tracking.

func (*Consumer[T]) Close added in v0.0.3

func (c *Consumer[T]) Close() error

Close stops the consumer and releases resources.

func (*Consumer[T]) Errors added in v0.0.3

func (c *Consumer[T]) Errors() <-chan error

Errors returns the channel for receiving errors.

func (*Consumer[T]) Messages added in v0.0.3

func (c *Consumer[T]) Messages() <-chan Message[T]

Messages returns the channel for receiving messages.

func (*Consumer[T]) Start added in v0.0.3

func (c *Consumer[T]) Start(ctx context.Context) error

Start begins consuming messages. Call this before reading from Messages() channel.

type ConsumerOptions added in v0.0.3

type ConsumerOptions struct {
	// ConsumerGroup identifies this consumer (REQUIRED).
	// Multiple consumers with the same group name will share offset tracking.
	//
	// WARNING: Unlike Kafka, this does NOT provide load balancing.
	// Multiple consumers in the same group will:
	//   - See and process the SAME messages (duplicates)
	//   - Have race conditions on offset commits
	//   - Risk losing at-least-once delivery guarantees
	// Use shared groups only for high availability, not load distribution.
	ConsumerGroup string

	// OffsetProvider determines offset tracking strategy (REQUIRED).
	// Use NewTimestampOffsetProvider() or NewIDOffsetProvider().
	OffsetProvider OffsetProvider

	// Filter to apply when fetching messages (optional).
	Filter string

	// BatchSize for polling messages (default: 100).
	BatchSize int

	// PollInterval between checks for new messages (default: 1s).
	PollInterval time.Duration

	// BufferSize for message channel (default: 100).
	BufferSize int

	// ErrorBufferSize for error channel (default: 10).
	ErrorBufferSize int

	// OffsetCollection is the collection name for storing offsets (default: "consumer_offsets").
	OffsetCollection string
}

ConsumerOptions configures consumer behavior.

type Credentials added in v0.0.3

type Credentials struct {
	Email    string
	Password string
}

Credentials holds authentication credentials.

type HTTPError

type HTTPError struct {
	Status  int
	Message string
}

HTTPError captures the status and response message for non-2xx responses.

func (*HTTPError) Error

func (e *HTTPError) Error() string

type IDOffsetProvider added in v0.0.3

type IDOffsetProvider struct{}

IDOffsetProvider tracks offsets using record IDs with lexicographic ordering. PocketBase record IDs are 15-character alphanumeric strings.

func (*IDOffsetProvider) BuildFilter added in v0.0.3

func (i *IDOffsetProvider) BuildFilter(lastOffset string, userFilter string) string

func (*IDOffsetProvider) ExtractOffset added in v0.0.3

func (i *IDOffsetProvider) ExtractOffset(record map[string]any) (string, error)

func (*IDOffsetProvider) InitialOffset added in v0.0.3

func (i *IDOffsetProvider) InitialOffset() string

func (*IDOffsetProvider) SortField added in v0.0.3

func (i *IDOffsetProvider) SortField() string

type KVStore

type KVStore struct {
	// contains filtered or unexported fields
}

KVStore offers simple key-value helpers backed by PocketBase.

func NewKVStore

func NewKVStore(client AuthenticatedClient, collection string, appName string) KVStore

NewKVStore creates a key-value store backed by the provided collection. If collection is empty, a default "kv" collection is used. appName scopes keys when the backing collection includes an "appname" field.

func (KVStore) Delete

func (s KVStore) Delete(ctx context.Context, key string) error

Delete removes a key. It is idempotent and returns nil if the key does not exist.

func (KVStore) Exists

func (s KVStore) Exists(ctx context.Context, key string) (bool, error)

Exists returns true if a key exists.

func (KVStore) Get

func (s KVStore) Get(ctx context.Context, key string) (json.RawMessage, error)

Get fetches a value for the given key as raw JSON bytes. For collections using a text field, the returned bytes contain the decoded JSON string.

func (KVStore) List

func (s KVStore) List(ctx context.Context, prefix string) ([]string, error)

List returns all keys, optionally filtered by prefix.

func (KVStore) Set

func (s KVStore) Set(ctx context.Context, key string, value interface{}) error

Set inserts or overwrites a value for the given key.

type ListOptions

type ListOptions struct {
	Page    int
	PerPage int
	Filter  string
	Sort    string
	Fields  []string
}

ListOptions describes pagination and filtering options for list calls.

type ListResult

type ListResult[T any] struct {
	Items      []T
	Page       int
	PerPage    int
	TotalItems int
	TotalPages int
}

ListResult contains a page of items with pagination metadata.

type Message added in v0.0.3

type Message[T any] struct {
	Data T
	// contains filtered or unexported fields
}

Message wraps a record with commit capability.

func (*Message[T]) Commit added in v0.0.3

func (m *Message[T]) Commit(ctx context.Context) error

Commit marks this message as successfully processed by updating the consumer offset.

type OffsetProvider added in v0.0.3

type OffsetProvider interface {
	// BuildFilter creates a filter expression for records after the given offset.
	// lastOffset is the last processed offset (empty string if starting fresh).
	// userFilter is an optional additional filter to combine.
	BuildFilter(lastOffset string, userFilter string) string

	// ExtractOffset extracts the offset value from a record.
	// Record is represented as a map for generic handling.
	ExtractOffset(record map[string]any) (string, error)

	// InitialOffset returns the offset to use when starting fresh (no saved offset).
	InitialOffset() string

	// SortField returns the field name to sort by for this offset provider.
	// Example: "+created" or "+id"
	SortField() string
}

OffsetProvider defines how offsets are tracked and queried for message consumption.

func NewIDOffsetProvider added in v0.0.3

func NewIDOffsetProvider() OffsetProvider

NewIDOffsetProvider creates an ID-based offset provider.

func NewTimestampOffsetProvider added in v0.0.3

func NewTimestampOffsetProvider() OffsetProvider

NewTimestampOffsetProvider creates a timestamp-based offset provider.

type Repository

type Repository[T any] struct {
	// contains filtered or unexported fields
}

Repository exposes CRUD helpers for PocketBase collections.

func NewRepository

func NewRepository[T any](client AuthenticatedClient, collection string) *Repository[T]

NewRepository creates a repository bound to a PocketBase collection.

func (*Repository[T]) Create

func (r *Repository[T]) Create(ctx context.Context, record T) (*T, error)

Create inserts a new record.

func (*Repository[T]) Delete

func (r *Repository[T]) Delete(ctx context.Context, id string) error

Delete removes a record by ID.

func (*Repository[T]) Get

func (r *Repository[T]) Get(ctx context.Context, id string) (*T, error)

Get fetches a single record by ID.

func (*Repository[T]) List

func (r *Repository[T]) List(ctx context.Context, opts ListOptions) (*ListResult[T], error)

List returns a page of records using the provided options.

func (*Repository[T]) Update

func (r *Repository[T]) Update(ctx context.Context, id string, record T) (*T, error)

Update patches an existing record.

type TimestampOffsetProvider added in v0.0.3

type TimestampOffsetProvider struct{}

TimestampOffsetProvider tracks offsets using PocketBase's built-in created timestamp. This works with any collection without requiring schema changes.

func (*TimestampOffsetProvider) BuildFilter added in v0.0.3

func (t *TimestampOffsetProvider) BuildFilter(lastOffset string, userFilter string) string

func (*TimestampOffsetProvider) ExtractOffset added in v0.0.3

func (t *TimestampOffsetProvider) ExtractOffset(record map[string]any) (string, error)

func (*TimestampOffsetProvider) InitialOffset added in v0.0.3

func (t *TimestampOffsetProvider) InitialOffset() string

func (*TimestampOffsetProvider) SortField added in v0.0.3

func (t *TimestampOffsetProvider) SortField() string

type TypedKVStore added in v0.0.3

type TypedKVStore[T any] struct {
	// contains filtered or unexported fields
}

TypedKVStore provides typed helpers built on top of KVStore.

func NewTypedKVStore added in v0.0.3

func NewTypedKVStore[T any](client AuthenticatedClient, collection string, appName string) TypedKVStore[T]

NewTypedKVStore creates a typed KV store bound to a PocketBase collection.

func (TypedKVStore[T]) Delete added in v0.0.3

func (s TypedKVStore[T]) Delete(ctx context.Context, key string) error

Delete removes a key. It is idempotent and returns nil if the key does not exist.

func (TypedKVStore[T]) Exists added in v0.0.3

func (s TypedKVStore[T]) Exists(ctx context.Context, key string) (bool, error)

Exists returns true if a key exists.

func (TypedKVStore[T]) Get added in v0.0.3

func (s TypedKVStore[T]) Get(ctx context.Context, key string) (T, error)

Get fetches a value for the given key.

func (TypedKVStore[T]) List added in v0.0.3

func (s TypedKVStore[T]) List(ctx context.Context, prefix string) ([]string, error)

List returns all keys, optionally filtered by prefix.

func (TypedKVStore[T]) Set added in v0.0.3

func (s TypedKVStore[T]) Set(ctx context.Context, key string, value T) error

Set inserts or overwrites a value for the given key.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL