package webhook

Version: v0.3.0
Published: Jun 8, 2026 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package webhook is the outbound-webhook battery for GoFastr.

Apps publish events; subscribers are HTTP endpoints that receive signed POST requests. Failed deliveries retry with exponential backoff and are parked in a dead-letter list once the attempt budget is exhausted.

The core package (Manager, Store, SignWithTimestamp/VerifyTimestamped) is dependency-free beyond the standard library. The optional Bridge helpers in bridge.go pull in framework/event so internal Emit/EmitAsync calls can auto-fan out to subscribers; if you don't use the bridge, that dependency is dead code.

See docs/webhooks.md for the wiring guide.

Constants

const DefaultMaxResponseBodyBytes int64 = 64 << 10 // 64 KiB

DefaultMaxResponseBodyBytes is the per-attempt response-body cap when Options.MaxResponseBodyBytes is unset. A malicious receiver returning gigabytes of body would otherwise exhaust manager memory.

const DefaultTimestampTolerance = 5 * time.Minute

DefaultTimestampTolerance is the suggested replay window for VerifyTimestamped — wide enough to cover modest clock skew, narrow enough that a captured signature decays quickly.

const MaxPayloadBytes = 1 << 20 // 1 MiB

MaxPayloadBytes caps webhook payload size. Larger payloads are rejected at Publish time to prevent unbounded queue/memory growth.

const SignatureHeader = "X-GoFastr-Signature"

SignatureHeader is the name of the HTTP header that carries the payload signature.

Variables

This section is empty.

Functions

func Bridge

func Bridge(bus *event.EventBus, mgr *Manager, events ...string) (cancel func())

Bridge subscribes the Manager to every named event on the bus so that an Emit/EmitAsync automatically fans out to matching webhook subscribers.

The returned cancel function detaches every subscription at once — call it from your shutdown path before stopping the Manager so no in-flight Emit lands after the workers have exited.

If no events are passed, the bridge defaults to the three entity lifecycle events (entity.created / entity.updated / entity.deleted). Pass an explicit list when your app emits a custom taxonomy.

The event's Data field is marshalled to JSON for the webhook payload. Marshal and publish failures are silently dropped by default; install callbacks via WithBridgeMarshalError / WithBridgePublishError to observe them.

func BridgeWithOptions

func BridgeWithOptions(bus *event.EventBus, mgr *Manager, events []string, opts ...BridgeOption) (cancel func())

BridgeWithOptions is the option-aware variant of Bridge. The events list is an explicit parameter (pass nil or an empty slice for the lifecycle default), which leaves the trailing variadic free for BridgeOption values.

func SignWithTimestamp

func SignWithTimestamp(secret string, unixTimestamp int64, body []byte) string

SignWithTimestamp returns a `t=<unix>,v1=<hex-hmac>` signature header where the HMAC covers `<unix>.<body>`. Binding the timestamp into the signed material lets receivers reject replays via VerifyTimestamped.

Pass time.Now().Unix() for fresh signatures; tests may pin to a specific instant.
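The header layout described above can be sketched with the standard library. This is an illustration, not the package's implementation: the function name signSketch is hypothetical, and HMAC-SHA256 is an assumption, since the documentation does not name the hash.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strconv"
)

// signSketch builds a "t=<unix>,v1=<hex-hmac>" value whose MAC covers
// "<unix>.<body>". ASSUMPTION: the hash is SHA-256.
func signSketch(secret string, unixTimestamp int64, body []byte) string {
	ts := strconv.FormatInt(unixTimestamp, 10)
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(ts))
	mac.Write([]byte("."))
	mac.Write(body)
	return "t=" + ts + ",v1=" + hex.EncodeToString(mac.Sum(nil))
}

func main() {
	// A pinned timestamp keeps the output reproducible, as a test would.
	fmt.Println(signSketch("s3cret", 1700000000, []byte(`{"id":1}`)))
}
```

Because the timestamp is part of the signed material, changing only the `t=` field of a captured header invalidates the MAC.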

func VerifyTimestamped

func VerifyTimestamped(secret, header string, body []byte, tolerance time.Duration) bool

VerifyTimestamped accepts a header in the `t=<unix>,v1=<hex>` form and a tolerance window. Returns true iff:

  • the header parses cleanly
  • |now - ts| <= tolerance
  • the HMAC over <ts>.<body> equals v1

`now` is captured at call time so receivers don't need a clock arg.

tolerance MUST be positive. A non-positive tolerance is treated as a usage error and always rejects — otherwise a caller that accidentally passed 0 (zero value of a forgotten config field) would silently disable the replay check. Use DefaultTimestampTolerance for the suggested default.

An empty secret always rejects.

Types

type BridgeOption

type BridgeOption func(*bridgeOpts)

BridgeOption configures the auto-bridge from a framework event bus to a webhook Manager.

func WithBridgeMarshalError

func WithBridgeMarshalError(fn func(eventType string, err error)) BridgeOption

WithBridgeMarshalError installs a callback invoked when json.Marshal fails on an event payload. The default is a silent drop (so a buggy emitter cannot take the bus down); supply a callback to surface the failure to logs, metrics, or an alerting hook.

func WithBridgePublishError

func WithBridgePublishError(fn func(eventType string, err error)) BridgeOption

WithBridgePublishError installs a callback invoked when Manager.Publish itself returns an error (typically a store write failure). The default is a silent drop, for the same reason as WithBridgeMarshalError.

type Delivery

type Delivery struct {
	ID            string
	SubscriberID  string
	Event         string
	Payload       []byte
	Attempts      int
	Status        DeliveryStatus
	LastError     string
	NextAttemptAt time.Time
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

Delivery tracks the set of attempts to deliver a single event to one Subscriber.

type DeliveryStatus

type DeliveryStatus string

DeliveryStatus is the lifecycle state of a single delivery attempt.

const (
	StatusPending DeliveryStatus = "pending"
	StatusSuccess DeliveryStatus = "success"
	StatusFailed  DeliveryStatus = "failed"
	StatusDead    DeliveryStatus = "dead"
)

type LeasedStore

type LeasedStore interface {
	Store
	ClaimDueDeliveries(ctx context.Context, now time.Time, limit int, leasePeriod time.Duration) ([]Delivery, error)
}

LeasedStore is the optional Store upgrade for multi-instance deployments. Implementations atomically claim up to `limit` pending rows that are due at `now` and push their next_attempt_at to `now + leasePeriod`, so concurrent workers see those rows as not yet due and skip them.

If a worker crashes mid-attempt, its lease expires and another worker re-claims the row. Pick a leasePeriod larger than your worst-case handler latency.

The Manager probes for this interface at tick time; stores that don't implement it fall back to the plain DueDeliveries + best-effort behaviour, which is fine for single-instance setups.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns the worker goroutine and is the entry point for Publish and subscriber management.

func New

func New(s Store, opts Options) *Manager

New constructs a Manager with defaults applied.

func (*Manager) DeadDeliveries

func (m *Manager) DeadDeliveries(ctx context.Context, limit int) ([]Delivery, error)

DeadDeliveries lists terminally-failed deliveries when the store supports it.

func (*Manager) Publish

func (m *Manager) Publish(ctx context.Context, event string, payload []byte) (int, error)

Publish fans the event out to every active matching subscriber.

func (*Manager) Replay

func (m *Manager) Replay(ctx context.Context, id string) error

Replay re-queues a dead delivery when the store supports it. The worker (Start) picks it up on the next tick.

func (*Manager) Start

func (m *Manager) Start()

Start launches the worker goroutine. Call it once per Manager; subsequent calls are no-ops.

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

Stop signals the worker to exit, cancels any in-flight HTTP attempt via the worker's derived context, then waits for the worker to return. If ctx is done first, Stop returns ctx.Err(), but the cancellation of in-flight HTTP requests has already been signaled. Safe to call more than once.

func (*Manager) Subscribe

func (m *Manager) Subscribe(ctx context.Context, s Subscriber) (Subscriber, error)

Subscribe registers (or replaces) a Subscriber. The ID is assigned when empty. The URL must point at a public endpoint unless Options.AllowPrivateNetworks is set; otherwise the call returns an SSRF error.

Active defaults to true for the zero-value struct (the common case "register and start delivering"). To register a paused subscriber, set Paused=true and Active will be forced false.
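One way to read the defaulting rules above is sketched below. The subscriber type and normalize function are hypothetical and cover only the fields involved; the random hex ID format is an assumption, as the documentation only says that an ID is assigned when empty.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

type subscriber struct {
	ID     string
	Active bool
	Paused bool
}

// normalize assigns an ID when empty and derives Active from Paused, so the
// zero value registers as active and Paused=true forces Active to false.
func normalize(s subscriber) (subscriber, error) {
	if s.ID == "" {
		buf := make([]byte, 8)
		if _, err := rand.Read(buf); err != nil {
			return s, err
		}
		s.ID = hex.EncodeToString(buf) // ASSUMPTION: ID format is illustrative
	}
	s.Active = !s.Paused
	return s, nil
}

func main() {
	a, _ := normalize(subscriber{})
	b, _ := normalize(subscriber{ID: "ops", Paused: true})
	fmt.Println(a.Active, a.ID != "") // true true
	fmt.Println(b.Active, b.ID)       // false ops
}
```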

func (*Manager) Subscribers

func (m *Manager) Subscribers(ctx context.Context) ([]Subscriber, error)

Subscribers returns every registered Subscriber.

func (*Manager) Unsubscribe

func (m *Manager) Unsubscribe(ctx context.Context, id string) error

Unsubscribe removes a Subscriber. Missing IDs are not an error.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is the bundled in-process Store. It keeps subscribers and deliveries in maps protected by a single mutex. Suitable for single-instance apps and tests; nothing is persisted.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore creates an empty store.

func (*MemoryStore) AddDelivery

func (m *MemoryStore) AddDelivery(_ context.Context, d Delivery) error

func (*MemoryStore) AddSubscriber

func (m *MemoryStore) AddSubscriber(_ context.Context, s Subscriber) error

AddSubscriber stores s, replacing any existing record with the same ID.

func (*MemoryStore) ClaimDueDeliveries

func (m *MemoryStore) ClaimDueDeliveries(_ context.Context, now time.Time, limit int, leasePeriod time.Duration) ([]Delivery, error)

ClaimDueDeliveries reserves rows under the store's write lock and pushes their NextAttemptAt to now+leasePeriod so a concurrent claimer sees them as not-yet-due. Single-process by design — the memory store can't span instances, but exposing the same interface keeps Manager wiring uniform across store backends.

func (*MemoryStore) DeleteSubscriber

func (m *MemoryStore) DeleteSubscriber(_ context.Context, id string) error

func (*MemoryStore) DueDeliveries

func (m *MemoryStore) DueDeliveries(_ context.Context, now time.Time, limit int) ([]Delivery, error)

func (*MemoryStore) GetSubscriber

func (m *MemoryStore) GetSubscriber(_ context.Context, id string) (*Subscriber, error)

GetSubscriber returns (nil, nil) when the ID is unknown.

func (*MemoryStore) ListDeadDeliveries

func (m *MemoryStore) ListDeadDeliveries(_ context.Context, limit int) ([]Delivery, error)

ListDeadDeliveries implements ReplayableStore: terminally-failed (StatusDead) deliveries, newest-first.

func (*MemoryStore) ListDeliveries

func (m *MemoryStore) ListDeliveries(_ context.Context, subscriberID string, limit int) ([]Delivery, error)

func (*MemoryStore) ListSubscribers

func (m *MemoryStore) ListSubscribers(_ context.Context) ([]Subscriber, error)

func (*MemoryStore) ResetDelivery

func (m *MemoryStore) ResetDelivery(_ context.Context, id string) error

ResetDelivery implements ReplayableStore: returns a dead delivery to pending (attempts + error cleared, due now). Only StatusDead rows are touched, so resetting a non-dead/unknown delivery is a no-op.

func (*MemoryStore) UpdateDelivery

func (m *MemoryStore) UpdateDelivery(_ context.Context, d Delivery) error

type NoopSecretCodec

type NoopSecretCodec struct{}

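NoopSecretCodec is the pass-through codec: it stores secrets as-is. SQLStore does not apply it implicitly; NewSQLStore uses it only when WithSQLAllowPlaintext is passed. Use it only when the database is itself encrypted at rest or the threat model doesn't include a DB-snapshot attacker.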

func (NoopSecretCodec) Decode

func (NoopSecretCodec) Decode(e string) (string, error)

Decode returns the encoded value e unchanged.

func (NoopSecretCodec) Encode

func (NoopSecretCodec) Encode(p string) (string, error)

Encode returns the plaintext p unchanged.

type Options

type Options struct {
	MaxAttempts          int
	Backoff              []time.Duration
	HTTPClient           *http.Client
	PollInterval         time.Duration
	MaxResponseBodyBytes int64
	AllowPrivateNetworks bool
	SignatureTolerance   time.Duration
	LeasePeriod          time.Duration
}

Options configures the Manager.

MaxAttempts caps how many times a delivery will be retried before it becomes "dead". Default 6 (≈3h with the default backoff).

Backoff is the wait between attempts. Position [i-1] selects the wait after attempt i. The last value is reused for any further attempts. Default: 30s, 1m, 5m, 15m, 1h, 3h.

HTTPClient is the client used for outbound POSTs. Default has a 10s per-request timeout, no redirect following, and a dial-time SSRF guard (net.Dialer.Control re-checks the resolved IP at connect time to defeat DNS rebinding) unless AllowPrivateNetworks is set. A caller-supplied client is used as-is — it gets no dial-time guard.

PollInterval controls how often the worker looks for due deliveries. Default 1 second.

MaxResponseBodyBytes caps bytes read from each subscriber response. Default 64 KiB. Set < 0 to disable the cap (not recommended).

AllowPrivateNetworks opts out of the SSRF guard that rejects subscriber URLs targeting RFC1918, loopback, link-local, or cloud metadata endpoints. Default false, which is the required production posture. Tests and explicit dev wiring may set true.
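A dial-time guard of the kind described for the default HTTPClient can be sketched with net.Dialer.Control, which runs after DNS resolution and just before the socket connects. This is an illustrative sketch, not the package's guard: isPublic, guardControl, and newGuardedClient are hypothetical names, and the exact set of blocked ranges is an assumption based on the categories listed above.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"syscall"
	"time"
)

var errBlocked = errors.New("webhook sketch: destination is not a public address")

// isPublic rejects loopback, private (RFC 1918 and IPv6 ULA), link-local, and
// unspecified addresses. 169.254.169.254, a common cloud metadata address,
// falls in the link-local range.
func isPublic(ip net.IP) bool {
	return !(ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() ||
		ip.IsLinkLocalMulticast() || ip.IsUnspecified())
}

// guardControl sees the resolved IP, so a hostname that re-resolves to a
// private address between validation and connect is still refused.
func guardControl(network, address string, _ syscall.RawConn) error {
	host, _, err := net.SplitHostPort(address)
	if err != nil {
		return err
	}
	ip := net.ParseIP(host)
	if ip == nil || !isPublic(ip) {
		return errBlocked
	}
	return nil
}

// newGuardedClient wires the guard into a client with a 10s timeout that does
// not follow redirects.
func newGuardedClient() *http.Client {
	dialer := &net.Dialer{Timeout: 5 * time.Second, Control: guardControl}
	return &http.Client{
		Timeout:   10 * time.Second,
		Transport: &http.Transport{DialContext: dialer.DialContext},
		CheckRedirect: func(*http.Request, []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
}

func main() {
	_ = newGuardedClient()
	fmt.Println(guardControl("tcp", "127.0.0.1:8080", nil))     // blocked
	fmt.Println(guardControl("tcp", "169.254.169.254:80", nil)) // blocked
	fmt.Println(guardControl("tcp", "93.184.216.34:443", nil))  // <nil>
}
```

A caller-supplied client bypasses this kind of guard, which is why the documentation notes that it is used as-is.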

SignatureTolerance is the maximum drift between the signed timestamp and "now" that receivers should accept when verifying. The sender only embeds the timestamp into the signature; this field is purely documentation on the sending side, and receivers pass the value to VerifyTimestamped.

LeasePeriod controls how long a claimed delivery is hidden from other workers when the store implements LeasedStore. Must be longer than the worst-case handler latency; default 30s.

type ReplayableStore

type ReplayableStore interface {
	Store
	// ListDeadDeliveries returns up to limit terminally-failed (StatusDead)
	// deliveries, newest-first.
	ListDeadDeliveries(ctx context.Context, limit int) ([]Delivery, error)
	// ResetDelivery returns a dead delivery to pending so the worker retries
	// it (attempts + last error cleared, due immediately). Idempotent: it MUST
	// only touch StatusDead rows, so resetting a non-dead/unknown delivery is a
	// no-op — never resurrecting an in-flight or already-delivered one.
	ResetDelivery(ctx context.Context, id string) error
}

ReplayableStore is the optional Store capability for dead-letter inspection and replay. Both shipped stores (SQLStore, MemoryStore) implement it; Manager.DeadDeliveries / Manager.Replay probe for it so admin tooling can surface a replay action only when the backend supports it.

type SQLOption

type SQLOption func(*SQLStore)

SQLOption configures SQLStore.

func WithSQLAllowPlaintext

func WithSQLAllowPlaintext() SQLOption

WithSQLAllowPlaintext opts the SQLStore into plaintext secret storage using NoopSecretCodec. Without this (or an explicit WithSQLSecretCodec) NewSQLStore returns an error rather than silently storing subscriber secrets in cleartext.

func WithSQLDeliveriesTable

func WithSQLDeliveriesTable(name string) SQLOption

WithSQLDeliveriesTable overrides the default "webhook_deliveries" table name.

func WithSQLSecretCodec

func WithSQLSecretCodec(c SecretCodec) SQLOption

WithSQLSecretCodec installs a SecretCodec that encrypts subscriber secrets at write time and decrypts them at read time.

Use NewAESGCMSecretCodec to wrap a 16/24/32-byte key. Callers that genuinely want plaintext storage (DB encrypted at rest, threat model excludes a DB-snapshot attacker) must use WithSQLAllowPlaintext explicitly — there is no plaintext default.

func WithSQLSubscribersTable

func WithSQLSubscribersTable(name string) SQLOption

WithSQLSubscribersTable overrides the default "webhook_subscribers" table name.

type SQLStore

type SQLStore struct {
	// contains filtered or unexported fields
}

SQLStore is a SQL-backed Store. Subscribers and deliveries each live in a single table; allow lists and payloads are persisted in place rather than normalised, since fan-out cardinality is small and replay needs the body intact.

Dialect is detected at construction (sqlite vs postgres).

Schemas:

webhook_subscribers(
    id       TEXT PRIMARY KEY,
    url      TEXT NOT NULL,
    secret   TEXT NOT NULL,
    events   TEXT NOT NULL,   -- JSON array
    active   INTEGER NOT NULL,
    created  TIMESTAMP NOT NULL
)

webhook_deliveries(
    id              TEXT PRIMARY KEY,
    subscriber_id   TEXT NOT NULL,
    event           TEXT NOT NULL,
    payload         BLOB NOT NULL,
    attempts        INTEGER NOT NULL,
    status          TEXT NOT NULL,
    last_error      TEXT NOT NULL,
    next_attempt_at TIMESTAMP,
    created_at      TIMESTAMP NOT NULL,
    updated_at      TIMESTAMP NOT NULL
)

func NewSQLStore

func NewSQLStore(db *sql.DB, opts ...SQLOption) (*SQLStore, error)

NewSQLStore constructs a SQL-backed Store and ensures both tables exist.

func (*SQLStore) AddDelivery

func (s *SQLStore) AddDelivery(ctx context.Context, d Delivery) error

func (*SQLStore) AddSubscriber

func (s *SQLStore) AddSubscriber(ctx context.Context, sub Subscriber) error

func (*SQLStore) ClaimDueDeliveries

func (s *SQLStore) ClaimDueDeliveries(ctx context.Context, now time.Time, limit int, leasePeriod time.Duration) ([]Delivery, error)

ClaimDueDeliveries atomically reserves up to `limit` pending rows whose next_attempt_at <= now, pushing them to now+leasePeriod so concurrent workers skip them.

Postgres uses FOR UPDATE SKIP LOCKED inside a CTE so the inner SELECT only sees uncontested rows; SQLite serializes writers via BEGIN IMMEDIATE so the SELECT-then-UPDATE sequence inside one tx is safely exclusive.

func (*SQLStore) DeleteSubscriber

func (s *SQLStore) DeleteSubscriber(ctx context.Context, id string) error

func (*SQLStore) DueDeliveries

func (s *SQLStore) DueDeliveries(ctx context.Context, now time.Time, limit int) ([]Delivery, error)

func (*SQLStore) GetSubscriber

func (s *SQLStore) GetSubscriber(ctx context.Context, id string) (*Subscriber, error)

func (*SQLStore) ListDeadDeliveries

func (s *SQLStore) ListDeadDeliveries(ctx context.Context, limit int) ([]Delivery, error)

ListDeadDeliveries implements ReplayableStore: terminally-failed (StatusDead) deliveries, newest-first.

func (*SQLStore) ListDeliveries

func (s *SQLStore) ListDeliveries(ctx context.Context, subscriberID string, limit int) ([]Delivery, error)

func (*SQLStore) ListSubscribers

func (s *SQLStore) ListSubscribers(ctx context.Context) ([]Subscriber, error)

func (*SQLStore) ResetDelivery

func (s *SQLStore) ResetDelivery(ctx context.Context, id string) error

ResetDelivery implements ReplayableStore: returns a dead delivery to pending (attempts + error cleared, due now). The `AND status = 'dead'` clause makes it idempotent: resetting a non-dead/unknown delivery matches no row and is a no-op, so it can never resurrect an in-flight or delivered one.

func (*SQLStore) UpdateDelivery

func (s *SQLStore) UpdateDelivery(ctx context.Context, d Delivery) error

type SecretCodec

type SecretCodec interface {
	Encode(plaintext string) (string, error)
	Decode(encoded string) (string, error)
}

SecretCodec encodes and decodes subscriber secrets when they're persisted by a Store. Implementations must be safe for concurrent use.

Encode is called at write time (AddSubscriber); Decode is called at read time (GetSubscriber / ListSubscribers). The codec is purely a storage concern — the Manager only ever sees plaintext.

func NewAESGCMSecretCodec

func NewAESGCMSecretCodec(key []byte) (SecretCodec, error)

NewAESGCMSecretCodec constructs a SecretCodec backed by AES-GCM using the supplied key. Key length must be 16, 24, or 32 bytes (AES-128 / AES-192 / AES-256).

Treat the key as a critical secret. To rotate it, re-encrypt subscribers through a transitional codec (decode-from-old, encode-to-new). To keep this primitive small, the package does not bundle a key-ring abstraction.
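A codec of this shape, plus the decode-from-old, encode-to-new rotation step, can be sketched with the standard library. This is not the package's implementation: gcmCodec, newGCMCodec, and rotate are hypothetical names, and the stored layout (base64 of nonce followed by ciphertext) is an assumption.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/base64"
	"errors"
	"fmt"
)

type gcmCodec struct{ aead cipher.AEAD }

func newGCMCodec(key []byte) (*gcmCodec, error) {
	block, err := aes.NewCipher(key) // rejects keys that are not 16, 24, or 32 bytes
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	return &gcmCodec{aead: aead}, nil
}

// Encode seals plaintext under a fresh random nonce and prepends the nonce.
func (c *gcmCodec) Encode(plaintext string) (string, error) {
	nonce := make([]byte, c.aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return "", err
	}
	sealed := c.aead.Seal(nonce, nonce, []byte(plaintext), nil)
	return base64.StdEncoding.EncodeToString(sealed), nil
}

// Decode splits off the nonce and authenticates before returning plaintext.
func (c *gcmCodec) Decode(encoded string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", err
	}
	n := c.aead.NonceSize()
	if len(raw) < n {
		return "", errors.New("ciphertext too short")
	}
	plain, err := c.aead.Open(nil, raw[:n], raw[n:], nil)
	if err != nil {
		return "", err
	}
	return string(plain), nil
}

// rotate re-encrypts one stored value from the old key to the new key.
func rotate(oldC, newC *gcmCodec, stored string) (string, error) {
	plain, err := oldC.Decode(stored)
	if err != nil {
		return "", err
	}
	return newC.Encode(plain)
}

func main() {
	oldC, _ := newGCMCodec([]byte("0123456789abcdef"))                 // AES-128
	newC, _ := newGCMCodec([]byte("0123456789abcdef0123456789abcdef")) // AES-256
	stored, _ := oldC.Encode("whsec_example")
	moved, _ := rotate(oldC, newC, stored)
	plain, _ := newC.Decode(moved)
	fmt.Println(plain) // whsec_example
}
```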

type Store

type Store interface {
	AddSubscriber(ctx context.Context, s Subscriber) error
	GetSubscriber(ctx context.Context, id string) (*Subscriber, error)
	ListSubscribers(ctx context.Context) ([]Subscriber, error)
	DeleteSubscriber(ctx context.Context, id string) error

	AddDelivery(ctx context.Context, d Delivery) error
	UpdateDelivery(ctx context.Context, d Delivery) error
	ListDeliveries(ctx context.Context, subscriberID string, limit int) ([]Delivery, error)
	DueDeliveries(ctx context.Context, now time.Time, limit int) ([]Delivery, error)
}

Store is the persistence interface.

type Subscriber

type Subscriber struct {
	ID      string
	URL     string
	Secret  string
	Events  []string
	Active  bool
	Paused  bool // explicit opt-out; honored by Subscribe so callers can register paused
	Created time.Time
}

Subscriber is a registered HTTP endpoint that wants to receive signed POSTs for the listed events.

Events accepts simple glob patterns: `*` matches a single segment, `**` matches one-or-more segments. An entry of `*` alone (or `**` alone) matches every event.
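The pattern rules can be sketched as a recursive segment matcher. This is an illustration under stated assumptions: matchEvent and matchSegs are hypothetical names, and the segment separator is assumed to be "." based on the lifecycle event names such as entity.created.

```go
package main

import (
	"fmt"
	"strings"
)

// matchEvent reports whether event matches pattern. A bare "*" or "**"
// matches every event. ASSUMPTION: segments are separated by ".".
func matchEvent(pattern, event string) bool {
	if pattern == "*" || pattern == "**" {
		return true
	}
	return matchSegs(strings.Split(pattern, "."), strings.Split(event, "."))
}

func matchSegs(pat, ev []string) bool {
	if len(pat) == 0 {
		return len(ev) == 0
	}
	if pat[0] == "**" {
		// "**" consumes one or more segments; try each possible length.
		for n := 1; n <= len(ev); n++ {
			if matchSegs(pat[1:], ev[n:]) {
				return true
			}
		}
		return false
	}
	if len(ev) == 0 {
		return false
	}
	// "*" matches exactly one segment; anything else must match literally.
	if pat[0] != "*" && pat[0] != ev[0] {
		return false
	}
	return matchSegs(pat[1:], ev[1:])
}

func main() {
	fmt.Println(matchEvent("entity.*", "entity.created"))       // true
	fmt.Println(matchEvent("entity.*", "entity.user.created"))  // false
	fmt.Println(matchEvent("entity.**", "entity.user.created")) // true
	fmt.Println(matchEvent("*", "billing.invoice.paid"))        // true
}
```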
