Documentation ¶
Overview ¶
Package webhook is the outbound-webhook battery for GoFastr.
Apps publish events; subscribers are HTTP endpoints that receive signed POST requests. Failed deliveries retry with exponential backoff and are parked in a dead-letter list once the attempt budget is exhausted.
The core package (Manager, Store, SignWithTimestamp/VerifyTimestamped) is dependency-free beyond the standard library. The optional Bridge helpers in bridge.go pull in framework/event so internal Emit/EmitAsync calls can auto-fan out to subscribers; if you don't use the bridge, that dependency is dead code.
See docs/webhooks.md for the wiring guide.
Index ¶
- Constants
- func Bridge(bus *event.EventBus, mgr *Manager, events ...string) (cancel func())
- func BridgeWithOptions(bus *event.EventBus, mgr *Manager, events []string, opts ...BridgeOption) (cancel func())
- func SignWithTimestamp(secret string, unixTimestamp int64, body []byte) string
- func VerifyTimestamped(secret, header string, body []byte, tolerance time.Duration) bool
- type BridgeOption
- type Delivery
- type DeliveryStatus
- type LeasedStore
- type Manager
- func (m *Manager) DeadDeliveries(ctx context.Context, limit int) ([]Delivery, error)
- func (m *Manager) Publish(ctx context.Context, event string, payload []byte) (int, error)
- func (m *Manager) Replay(ctx context.Context, id string) error
- func (m *Manager) Start()
- func (m *Manager) Stop(ctx context.Context) error
- func (m *Manager) Subscribe(ctx context.Context, s Subscriber) (Subscriber, error)
- func (m *Manager) Subscribers(ctx context.Context) ([]Subscriber, error)
- func (m *Manager) Unsubscribe(ctx context.Context, id string) error
- type MemoryStore
- func (m *MemoryStore) AddDelivery(_ context.Context, d Delivery) error
- func (m *MemoryStore) AddSubscriber(_ context.Context, s Subscriber) error
- func (m *MemoryStore) ClaimDueDeliveries(_ context.Context, now time.Time, limit int, leasePeriod time.Duration) ([]Delivery, error)
- func (m *MemoryStore) DeleteSubscriber(_ context.Context, id string) error
- func (m *MemoryStore) DueDeliveries(_ context.Context, now time.Time, limit int) ([]Delivery, error)
- func (m *MemoryStore) GetSubscriber(_ context.Context, id string) (*Subscriber, error)
- func (m *MemoryStore) ListDeadDeliveries(_ context.Context, limit int) ([]Delivery, error)
- func (m *MemoryStore) ListDeliveries(_ context.Context, subscriberID string, limit int) ([]Delivery, error)
- func (m *MemoryStore) ListSubscribers(_ context.Context) ([]Subscriber, error)
- func (m *MemoryStore) ResetDelivery(_ context.Context, id string) error
- func (m *MemoryStore) UpdateDelivery(_ context.Context, d Delivery) error
- type NoopSecretCodec
- type Options
- type ReplayableStore
- type SQLOption
- type SQLStore
- func (s *SQLStore) AddDelivery(ctx context.Context, d Delivery) error
- func (s *SQLStore) AddSubscriber(ctx context.Context, sub Subscriber) error
- func (s *SQLStore) ClaimDueDeliveries(ctx context.Context, now time.Time, limit int, leasePeriod time.Duration) ([]Delivery, error)
- func (s *SQLStore) DeleteSubscriber(ctx context.Context, id string) error
- func (s *SQLStore) DueDeliveries(ctx context.Context, now time.Time, limit int) ([]Delivery, error)
- func (s *SQLStore) GetSubscriber(ctx context.Context, id string) (*Subscriber, error)
- func (s *SQLStore) ListDeadDeliveries(ctx context.Context, limit int) ([]Delivery, error)
- func (s *SQLStore) ListDeliveries(ctx context.Context, subscriberID string, limit int) ([]Delivery, error)
- func (s *SQLStore) ListSubscribers(ctx context.Context) ([]Subscriber, error)
- func (s *SQLStore) ResetDelivery(ctx context.Context, id string) error
- func (s *SQLStore) UpdateDelivery(ctx context.Context, d Delivery) error
- type SecretCodec
- type Store
- type Subscriber
Constants ¶
const DefaultMaxResponseBodyBytes int64 = 64 << 10 // 64 KiB
DefaultMaxResponseBodyBytes is the per-attempt response-body cap when Options.MaxResponseBodyBytes is unset. A malicious receiver returning gigabytes of body would otherwise exhaust manager memory.
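As an illustration of the cap described above, the following is a minimal sketch built on io.LimitReader. The helpers readCapped and readLen are hypothetical names for this example and are not part of the package; the zero-means-default and negative-means-uncapped handling follows the Options.MaxResponseBodyBytes description.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// defaultMaxResponseBodyBytes mirrors the documented 64 KiB default.
const defaultMaxResponseBodyBytes int64 = 64 << 10

// readCapped is a hypothetical helper: it reads at most limit bytes from r.
// A zero limit falls back to the default; a negative limit disables the cap.
func readCapped(r io.Reader, limit int64) ([]byte, error) {
	if limit == 0 {
		limit = defaultMaxResponseBodyBytes
	}
	if limit < 0 {
		return io.ReadAll(r)
	}
	return io.ReadAll(io.LimitReader(r, limit))
}

// readLen reports how many bytes of a size-byte body survive the cap.
func readLen(size int, limit int64) int {
	got, err := readCapped(strings.NewReader(strings.Repeat("x", size)), limit)
	if err != nil {
		return -1
	}
	return len(got)
}

func main() {
	// A 100 KiB response is truncated to the default cap.
	fmt.Println(readLen(100<<10, 0))
}
```

Bytes beyond the cap are simply never read, so an oversized response costs the reader at most limit bytes of memory.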
const DefaultTimestampTolerance = 5 * time.Minute
DefaultTimestampTolerance is the suggested replay window for VerifyTimestamped — wide enough to cover modest clock skew, narrow enough that a captured signature decays quickly.
const MaxPayloadBytes = 1 << 20 // 1 MiB
MaxPayloadBytes caps webhook payload size. Larger payloads are rejected at Publish time to prevent unbounded queue/memory growth.
const SignatureHeader = "X-GoFastr-Signature"
SignatureHeader is the HTTP header that carries the payload signature on outbound webhook requests.
Functions ¶
func Bridge ¶
func Bridge(bus *event.EventBus, mgr *Manager, events ...string) (cancel func())
Bridge subscribes the Manager to every named event on the bus so that an Emit/EmitAsync automatically fans out to matching webhook subscribers.
The returned cancel function detaches every subscription at once — call it from your shutdown path before stopping the Manager so no in-flight Emit lands after the workers have exited.
If events is empty, the bridge defaults to the three entity lifecycle events (entity.created, entity.updated, entity.deleted). Pass an explicit list when your app emits a custom taxonomy.
The event's Data field is marshalled to JSON for the webhook payload. Marshal and publish failures are silently dropped by default; install callbacks via WithBridgeMarshalError / WithBridgePublishError to observe them.
func BridgeWithOptions ¶
func BridgeWithOptions(bus *event.EventBus, mgr *Manager, events []string, opts ...BridgeOption) (cancel func())
BridgeWithOptions is the option-aware variant of Bridge. It takes the events list as an explicit slice (pass nil or an empty slice for the lifecycle default) so the trailing variadic stays available for BridgeOption values.
func SignWithTimestamp ¶
func SignWithTimestamp(secret string, unixTimestamp int64, body []byte) string
SignWithTimestamp returns a `t=<unix>,v1=<hex-hmac>` signature header where the HMAC covers `<unix>.<body>`. Binding the timestamp into the signed material lets receivers reject replays via VerifyTimestamped.
Pass time.Now().Unix() for fresh signatures; tests may pin to a specific instant.
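To make the header layout concrete, here is a minimal sketch of how such a signature could be produced. The function name signSketch is hypothetical, and HMAC-SHA256 is an assumption: the documentation above specifies the `t=<unix>,v1=<hex-hmac>` shape and the signed material `<unix>.<body>`, but does not name the hash.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strconv"
)

// signSketch builds a "t=<unix>,v1=<hex-hmac>" header whose HMAC covers
// "<unix>.<body>". SHA-256 is assumed here, not confirmed by the package docs.
func signSketch(secret string, unixTimestamp int64, body []byte) string {
	ts := strconv.FormatInt(unixTimestamp, 10)
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(ts))
	mac.Write([]byte("."))
	mac.Write(body)
	return "t=" + ts + ",v1=" + hex.EncodeToString(mac.Sum(nil))
}

func main() {
	// A pinned timestamp keeps the output reproducible, as a test would.
	fmt.Println(signSketch("s3cret", 1700000000, []byte(`{"id":1}`)))
}
```

Because the timestamp is part of the signed material, changing either the timestamp or the body yields a different v1 value.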
func VerifyTimestamped ¶
func VerifyTimestamped(secret, header string, body []byte, tolerance time.Duration) bool
VerifyTimestamped accepts a header in the `t=<unix>,v1=<hex>` form and a tolerance window. Returns true iff:
- the header parses cleanly
- |now - ts| <= tolerance
- the HMAC over <ts>.<body> equals v1
`now` is captured at call time so receivers don't need a clock arg.
tolerance MUST be positive. A non-positive tolerance is treated as a usage error and always rejects — otherwise a caller that accidentally passed 0 (zero value of a forgotten config field) would silently disable the replay check. Use DefaultTimestampTolerance for the suggested default.
An empty secret always rejects.
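The rules above can be sketched on the receiver side roughly as follows. This is an illustration under stated assumptions, not the package's implementation: verifySketch, sign, and computeMAC are hypothetical names, HMAC-SHA256 is assumed, and the current time is passed in as Unix seconds so the example is deterministic (the documented function captures the clock itself).

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strconv"
	"strings"
	"time"
)

// suggestedTolerance mirrors the documented 5-minute suggestion.
const suggestedTolerance = 5 * time.Minute

// computeMAC signs "<ts>.<body>"; SHA-256 is an assumption.
func computeMAC(secret string, ts int64, body []byte) []byte {
	m := hmac.New(sha256.New, []byte(secret))
	m.Write([]byte(strconv.FormatInt(ts, 10)))
	m.Write([]byte("."))
	m.Write(body)
	return m.Sum(nil)
}

// sign produces the "t=<unix>,v1=<hex>" header for the demo.
func sign(secret string, ts int64, body []byte) string {
	return "t=" + strconv.FormatInt(ts, 10) + ",v1=" + hex.EncodeToString(computeMAC(secret, ts, body))
}

// verifySketch applies the documented checks in order: usage errors, header
// parse, timestamp window, then a constant-time HMAC comparison.
func verifySketch(secret, header string, body []byte, tolerance time.Duration, nowUnix int64) bool {
	if secret == "" || tolerance <= 0 {
		return false // empty secret or non-positive tolerance always rejects
	}
	tsPart, sigPart, ok := strings.Cut(header, ",")
	if !ok || !strings.HasPrefix(tsPart, "t=") || !strings.HasPrefix(sigPart, "v1=") {
		return false
	}
	ts, err := strconv.ParseInt(strings.TrimPrefix(tsPart, "t="), 10, 64)
	if err != nil {
		return false
	}
	got, err := hex.DecodeString(strings.TrimPrefix(sigPart, "v1="))
	if err != nil {
		return false
	}
	tolSec := int64(tolerance / time.Second)
	if ts < nowUnix-tolSec || ts > nowUnix+tolSec {
		return false // outside the replay window
	}
	return hmac.Equal(got, computeMAC(secret, ts, body))
}

func main() {
	body := []byte(`{"id":1}`)
	h := sign("s3cret", 1700000000, body)
	fmt.Println(verifySketch("s3cret", h, body, suggestedTolerance, 1700000100)) // within the window
	fmt.Println(verifySketch("s3cret", h, body, suggestedTolerance, 1700000400)) // too old
}
```

hmac.Equal compares in constant time, which avoids leaking how many leading bytes of a forged signature were correct.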
Types ¶
type BridgeOption ¶
type BridgeOption func(*bridgeOpts)
BridgeOption configures the auto-bridge from a framework event bus to a webhook Manager.
func WithBridgeMarshalError ¶
func WithBridgeMarshalError(fn func(eventType string, err error)) BridgeOption
WithBridgeMarshalError installs a callback invoked when json.Marshal fails on an event payload. The default is a silent drop (so a buggy emitter cannot take the bus down); supply a callback to surface the failure to logs, metrics, or an alerting hook.
func WithBridgePublishError ¶
func WithBridgePublishError(fn func(eventType string, err error)) BridgeOption
WithBridgePublishError installs a callback invoked when Manager.Publish itself returns an error (typically a store write failure). Same default-silent rationale as marshal error.
type Delivery ¶
type Delivery struct {
ID string
SubscriberID string
Event string
Payload []byte
Attempts int
Status DeliveryStatus
LastError string
NextAttemptAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
Delivery is one attempt-set against a Subscriber for a single event.
type DeliveryStatus ¶
type DeliveryStatus string
DeliveryStatus is the lifecycle state of a single delivery attempt.
const (
StatusPending DeliveryStatus = "pending"
StatusSuccess DeliveryStatus = "success"
StatusFailed DeliveryStatus = "failed"
StatusDead DeliveryStatus = "dead"
)
type LeasedStore ¶
type LeasedStore interface {
Store
ClaimDueDeliveries(ctx context.Context, now time.Time, limit int, leasePeriod time.Duration) ([]Delivery, error)
}
LeasedStore is the optional Store upgrade for multi-instance deployments. Implementations atomically claim up to `limit` pending rows that are due at `now` and push their next_attempt_at to `now + leasePeriod`, so concurrent workers see those rows as not yet due and skip them.
If the worker crashes mid-attempt, the lease expires and another worker re-claims. Pick a leasePeriod larger than your worst-case handler latency.
The Manager probes for this interface at tick time; stores that don't implement it fall back to the plain DueDeliveries + best-effort behaviour, which is fine for single-instance setups.
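The lease mechanics described above can be sketched with an in-memory stand-in. Everything here (delivery, leaseStore, claimDue, demo) is a hypothetical simplification for illustration and is not the package's MemoryStore; it only shows why a second claimer at the same instant sees nothing, and why the rows become claimable again once the lease expires.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// delivery is a trimmed-down stand-in for the package's Delivery type.
type delivery struct {
	ID            string
	Status        string
	NextAttemptAt time.Time
}

type leaseStore struct {
	mu   sync.Mutex
	rows map[string]*delivery
}

// claimDue reserves up to limit pending rows that are due at now and pushes
// their NextAttemptAt to now+lease, all under one lock so the claim is atomic.
func (s *leaseStore) claimDue(now time.Time, limit int, lease time.Duration) []delivery {
	s.mu.Lock()
	defer s.mu.Unlock()
	var due []*delivery
	for _, d := range s.rows {
		if d.Status == "pending" && !d.NextAttemptAt.After(now) {
			due = append(due, d)
		}
	}
	// Oldest first, with the ID as a tie-break for a stable order.
	sort.Slice(due, func(i, j int) bool {
		if due[i].NextAttemptAt.Equal(due[j].NextAttemptAt) {
			return due[i].ID < due[j].ID
		}
		return due[i].NextAttemptAt.Before(due[j].NextAttemptAt)
	})
	if len(due) > limit {
		due = due[:limit]
	}
	out := make([]delivery, 0, len(due))
	for _, d := range due {
		d.NextAttemptAt = now.Add(lease) // hide the row from other claimers
		out = append(out, *d)
	}
	return out
}

// demo claims twice at the same instant, then once after the lease expires.
func demo() (first, second, afterExpiry int) {
	now := time.Unix(1700000000, 0)
	s := &leaseStore{rows: map[string]*delivery{
		"a": {ID: "a", Status: "pending", NextAttemptAt: now.Add(-time.Second)},
		"b": {ID: "b", Status: "pending", NextAttemptAt: now},
		"c": {ID: "c", Status: "dead", NextAttemptAt: now.Add(-time.Minute)},
	}}
	lease := 30 * time.Second
	first = len(s.claimDue(now, 10, lease))
	second = len(s.claimDue(now, 10, lease))
	afterExpiry = len(s.claimDue(now.Add(lease), 10, lease))
	return first, second, afterExpiry
}

func main() {
	fmt.Println(demo())
}
```

In this sketch a worker that finishes an attempt would overwrite NextAttemptAt or Status through an update; a worker that crashes leaves the row pending, so it is picked up again after the lease.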
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager owns the worker goroutine and is the entry point for Publish and subscriber management.
func (*Manager) DeadDeliveries ¶
func (m *Manager) DeadDeliveries(ctx context.Context, limit int) ([]Delivery, error)
DeadDeliveries lists terminally-failed deliveries when the store supports it.
func (*Manager) Replay ¶
func (m *Manager) Replay(ctx context.Context, id string) error
Replay re-queues a dead delivery when the store supports it. The worker (Start) picks it up on the next tick.
func (*Manager) Start ¶
func (m *Manager) Start()
Start launches the worker goroutine. Safe to call once per Manager; subsequent calls are no-ops.
func (*Manager) Stop ¶
func (m *Manager) Stop(ctx context.Context) error
Stop signals the worker to exit, cancels any in-flight HTTP attempt via the worker's derived context, then waits for the worker to return. If ctx fires first, Stop returns ctx.Err(), although cancellation of in-flight HTTP requests has already been signaled by then. Safe to call more than once.
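The start-once and cancel-then-wait behaviour can be sketched with a small stand-in type. This is not the Manager's code: worker, newWorker, and demo are hypothetical names, and the mutex-plus-flag approach is just one way to get the documented semantics.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for the Manager's polling goroutine.
type worker struct {
	mu      sync.Mutex
	started bool
	ctx     context.Context
	cancel  context.CancelFunc
	done    chan struct{}
}

func newWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	return &worker{ctx: ctx, cancel: cancel, done: make(chan struct{})}
}

// Start launches the loop once; later calls are no-ops.
func (w *worker) Start(poll time.Duration, tick func(context.Context)) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.started {
		return
	}
	w.started = true
	go func() {
		defer close(w.done)
		t := time.NewTicker(poll)
		defer t.Stop()
		for {
			select {
			case <-w.ctx.Done():
				return
			case <-t.C:
				tick(w.ctx) // in-flight work observes cancellation through ctx
			}
		}
	}()
}

// Stop cancels the worker context first, then waits for the loop to exit or
// for the caller's ctx to fire, whichever comes first.
func (w *worker) Stop(ctx context.Context) error {
	w.cancel() // cancel functions are safe to call repeatedly
	w.mu.Lock()
	started := w.started
	w.mu.Unlock()
	if !started {
		return nil
	}
	select {
	case <-w.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo starts a worker, waits for one tick, and stops it twice.
func demo() (first, second error) {
	w := newWorker()
	seen := make(chan struct{}, 1)
	w.Start(time.Millisecond, func(context.Context) {
		select {
		case seen <- struct{}{}:
		default:
		}
	})
	<-seen
	first = w.Stop(context.Background())
	second = w.Stop(context.Background())
	return first, second
}

func main() {
	fmt.Println(demo())
}
```

Cancelling before waiting is what makes the ctx.Err() path safe: even when the caller gives up on waiting, the in-flight work has already been told to stop.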
func (*Manager) Subscribe ¶
func (m *Manager) Subscribe(ctx context.Context, s Subscriber) (Subscriber, error)
Subscribe registers (or replaces) a Subscriber. The ID is assigned when empty. The URL must point at a public endpoint unless Options.AllowPrivateNetworks is set; otherwise the call returns an SSRF error.
Active defaults to true for the zero-value struct (the common case "register and start delivering"). To register a paused subscriber, set Paused=true and Active will be forced false.
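For readers wondering what the SSRF check amounts to, here is a minimal sketch of an address classifier plus a net.Dialer.Control hook that re-checks the resolved IP at connect time. The names errBlocked, isBlocked, and guardControl are hypothetical, and the exact address classes the package rejects may differ from this list.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"syscall"
)

var errBlocked = errors.New("webhook sketch: destination is not a public address")

// isBlocked reports whether ip is loopback, private (RFC 1918 or IPv6 ULA),
// link-local (which covers the 169.254.169.254 metadata address), or unspecified.
func isBlocked(ip net.IP) bool {
	return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() ||
		ip.IsLinkLocalMulticast() || ip.IsUnspecified()
}

// guardControl has the net.Dialer.Control signature. It runs after DNS
// resolution, so address is the literal IP:port about to be connected to.
func guardControl(network, address string, _ syscall.RawConn) error {
	host, _, err := net.SplitHostPort(address)
	if err != nil {
		return err
	}
	ip := net.ParseIP(host)
	if ip == nil || isBlocked(ip) {
		return errBlocked
	}
	return nil
}

func main() {
	// A dialer wired this way checks every connection attempt.
	dialer := &net.Dialer{Control: guardControl}
	_ = dialer
	for _, addr := range []string{"127.0.0.1:80", "10.1.2.3:443", "169.254.169.254:80", "93.184.216.34:443"} {
		fmt.Println(addr, guardControl("tcp", addr, nil))
	}
}
```

Checking at dial time rather than only when the URL is registered matters because a hostname can resolve to a public address during validation and to a private one later.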
func (*Manager) Subscribers ¶
func (m *Manager) Subscribers(ctx context.Context) ([]Subscriber, error)
Subscribers returns every registered Subscriber.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is the bundled in-process Store. It keeps subscribers and deliveries in maps protected by a single mutex. Suitable for single-instance apps and tests; nothing is persistent.
func (*MemoryStore) AddDelivery ¶
func (m *MemoryStore) AddDelivery(_ context.Context, d Delivery) error
func (*MemoryStore) AddSubscriber ¶
func (m *MemoryStore) AddSubscriber(_ context.Context, s Subscriber) error
AddSubscriber stores s, replacing any existing record with the same ID.
func (*MemoryStore) ClaimDueDeliveries ¶
func (m *MemoryStore) ClaimDueDeliveries(_ context.Context, now time.Time, limit int, leasePeriod time.Duration) ([]Delivery, error)
ClaimDueDeliveries reserves rows under the store's write lock and pushes their NextAttemptAt to now+leasePeriod so a concurrent claimer sees them as not-yet-due. Single-process by design — the memory store can't span instances, but exposing the same interface keeps Manager wiring uniform across store backends.
func (*MemoryStore) DeleteSubscriber ¶
func (m *MemoryStore) DeleteSubscriber(_ context.Context, id string) error
func (*MemoryStore) DueDeliveries ¶
func (m *MemoryStore) DueDeliveries(_ context.Context, now time.Time, limit int) ([]Delivery, error)
func (*MemoryStore) GetSubscriber ¶
func (m *MemoryStore) GetSubscriber(_ context.Context, id string) (*Subscriber, error)
GetSubscriber returns (nil, nil) when the ID is unknown.
func (*MemoryStore) ListDeadDeliveries ¶
func (m *MemoryStore) ListDeadDeliveries(_ context.Context, limit int) ([]Delivery, error)
ListDeadDeliveries implements ReplayableStore: terminally-failed (StatusDead) deliveries, newest-first.
func (*MemoryStore) ListDeliveries ¶
func (m *MemoryStore) ListDeliveries(_ context.Context, subscriberID string, limit int) ([]Delivery, error)
func (*MemoryStore) ListSubscribers ¶
func (m *MemoryStore) ListSubscribers(_ context.Context) ([]Subscriber, error)
func (*MemoryStore) ResetDelivery ¶
func (m *MemoryStore) ResetDelivery(_ context.Context, id string) error
ResetDelivery implements ReplayableStore: returns a dead delivery to pending (attempts + error cleared, due now). Only StatusDead rows are touched, so resetting a non-dead/unknown delivery is a no-op.
func (*MemoryStore) UpdateDelivery ¶
func (m *MemoryStore) UpdateDelivery(_ context.Context, d Delivery) error
type NoopSecretCodec ¶
type NoopSecretCodec struct{}
NoopSecretCodec is the pass-through codec: it stores secrets as-is. SQLStore never selects it implicitly; opt in with WithSQLAllowPlaintext. Use it only when the database is itself encrypted at rest or the threat model doesn't include a DB-snapshot attacker.
type Options ¶
type Options struct {
MaxAttempts int
Backoff []time.Duration
HTTPClient *http.Client
PollInterval time.Duration
MaxResponseBodyBytes int64
AllowPrivateNetworks bool
SignatureTolerance time.Duration
LeasePeriod time.Duration
}
Options configures the Manager.
MaxAttempts caps how many times a delivery will be retried before it becomes "dead". Default 6 (≈3h with the default backoff).
Backoff is the wait between attempts. Position [i-1] selects the wait after attempt i. The last value is reused for any further attempts. Default: 30s, 1m, 5m, 15m, 1h, 3h.
HTTPClient is the client used for outbound POSTs. Default has a 10s per-request timeout, no redirect following, and a dial-time SSRF guard (net.Dialer.Control re-checks the resolved IP at connect time to defeat DNS rebinding) unless AllowPrivateNetworks is set. A caller-supplied client is used as-is — it gets no dial-time guard.
PollInterval controls how often the worker looks for due deliveries. Default 1 second.
MaxResponseBodyBytes caps bytes read from each subscriber response. Default 64 KiB. Set < 0 to disable the cap (not recommended).
AllowPrivateNetworks opts out of the SSRF guard that rejects subscriber URLs targeting RFC1918, loopback, link-local, or cloud metadata endpoints. Default false, which is the required production posture. Tests and explicit dev wiring may set true.
SignatureTolerance is the maximum drift between the signed timestamp and "now" that receivers should accept when verifying. The sender embeds the timestamp in the signature and does not enforce this value itself; the field is documentation only, and receivers pass the same duration to VerifyTimestamped.
LeasePeriod controls how long a claimed delivery is hidden from other workers when the store implements LeasedStore. Must be longer than the worst-case handler latency; default 30s.
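The Backoff indexing rule (position [i-1] selects the wait after attempt i, with the last value reused) can be written out as a short sketch. The names waitAfter and defaultBackoff are hypothetical; the durations are the documented defaults.

```go
package main

import (
	"fmt"
	"time"
)

// defaultBackoff lists the documented default waits.
var defaultBackoff = []time.Duration{
	30 * time.Second, time.Minute, 5 * time.Minute,
	15 * time.Minute, time.Hour, 3 * time.Hour,
}

// waitAfter returns the wait that follows the given 1-based attempt.
// Attempts past the end of the slice reuse the last value.
func waitAfter(attempt int, backoff []time.Duration) time.Duration {
	if len(backoff) == 0 || attempt < 1 {
		return 0
	}
	i := attempt - 1
	if i >= len(backoff) {
		i = len(backoff) - 1
	}
	return backoff[i]
}

func main() {
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("after attempt %d: wait %s\n", attempt, waitAfter(attempt, defaultBackoff))
	}
}
```

Clamping the index rather than failing means a MaxAttempts larger than len(Backoff) still produces a well-defined schedule.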
type ReplayableStore ¶
type ReplayableStore interface {
Store
// ListDeadDeliveries returns up to limit terminally-failed (StatusDead)
// deliveries, newest-first.
ListDeadDeliveries(ctx context.Context, limit int) ([]Delivery, error)
// ResetDelivery returns a dead delivery to pending so the worker retries
// it (attempts + last error cleared, due immediately). Idempotent: it MUST
// only touch StatusDead rows, so resetting a non-dead/unknown delivery is a
// no-op — never resurrecting an in-flight or already-delivered one.
ResetDelivery(ctx context.Context, id string) error
}
ReplayableStore is the optional Store capability for dead-letter inspection and replay. Both shipped stores (SQLStore, MemoryStore) implement it; Manager.DeadDeliveries / Manager.Replay probe for it so admin tooling can surface a replay action only when the backend supports it.
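The ResetDelivery contract (touch only StatusDead rows, clear attempts and the last error, make the row due immediately) can be sketched with an in-memory stand-in. The types and the reset and demo functions are hypothetical simplifications, not the shipped stores.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// delivery is a trimmed-down stand-in for the package's Delivery type.
type delivery struct {
	ID            string
	Status        string
	Attempts      int
	LastError     string
	NextAttemptAt time.Time
}

type replayStore struct {
	mu   sync.Mutex
	rows map[string]*delivery
}

// reset returns a dead delivery to pending. Any other row is left alone.
func (s *replayStore) reset(id string, now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	d, ok := s.rows[id]
	if !ok || d.Status != "dead" {
		return // unknown, pending, or delivered: leave untouched
	}
	d.Status = "pending"
	d.Attempts = 0
	d.LastError = ""
	d.NextAttemptAt = now
}

// demo resets a dead row twice, a delivered row, and an unknown ID.
func demo() (deadNow, successNow string, attempts int) {
	now := time.Unix(1700000000, 0)
	s := &replayStore{rows: map[string]*delivery{
		"d1": {ID: "d1", Status: "dead", Attempts: 6, LastError: "HTTP 500"},
		"d2": {ID: "d2", Status: "success", Attempts: 1},
	}}
	s.reset("d1", now)
	s.reset("d1", now) // no-op: the row is no longer dead
	s.reset("d2", now)
	s.reset("missing", now)
	return s.rows["d1"].Status, s.rows["d2"].Status, s.rows["d1"].Attempts
}

func main() {
	fmt.Println(demo())
}
```

The status guard is what makes a double-clicked replay button harmless in admin tooling.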
type SQLOption ¶
type SQLOption func(*SQLStore)
SQLOption configures SQLStore.
func WithSQLAllowPlaintext ¶
func WithSQLAllowPlaintext() SQLOption
WithSQLAllowPlaintext opts the SQLStore into plaintext secret storage using NoopSecretCodec. Without this (or an explicit WithSQLSecretCodec) NewSQLStore returns an error rather than silently storing subscriber secrets in cleartext.
func WithSQLDeliveriesTable ¶
WithSQLDeliveriesTable overrides the default "webhook_deliveries" table name.
func WithSQLSecretCodec ¶
func WithSQLSecretCodec(c SecretCodec) SQLOption
WithSQLSecretCodec installs a SecretCodec that encrypts subscriber secrets at write time and decrypts them at read time.
Use NewAESGCMSecretCodec to wrap a 16/24/32-byte key. Callers that genuinely want plaintext storage (DB encrypted at rest, threat model excludes a DB-snapshot attacker) must use WithSQLAllowPlaintext explicitly — there is no plaintext default.
func WithSQLSubscribersTable ¶
WithSQLSubscribersTable overrides the default "webhook_subscribers" table name.
type SQLStore ¶
type SQLStore struct {
// contains filtered or unexported fields
}
SQLStore is a SQL-backed Store. Subscribers and deliveries each live in a single table; allow lists and payloads are persisted in place rather than normalised, since fan-out cardinality is small and replay needs the body intact.
Dialect is detected at construction (sqlite vs postgres).
Schemas:
webhook_subscribers(
id TEXT PRIMARY KEY,
url TEXT NOT NULL,
secret TEXT NOT NULL,
events TEXT NOT NULL, -- JSON array
active INTEGER NOT NULL,
created TIMESTAMP NOT NULL
)
webhook_deliveries(
id TEXT PRIMARY KEY,
subscriber_id TEXT NOT NULL,
event TEXT NOT NULL,
payload BLOB NOT NULL,
attempts INTEGER NOT NULL,
status TEXT NOT NULL,
last_error TEXT NOT NULL,
next_attempt_at TIMESTAMP,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
)
func NewSQLStore ¶
NewSQLStore constructs a SQL-backed Store and ensures both tables exist.
func (*SQLStore) AddDelivery ¶
func (s *SQLStore) AddDelivery(ctx context.Context, d Delivery) error
func (*SQLStore) AddSubscriber ¶
func (s *SQLStore) AddSubscriber(ctx context.Context, sub Subscriber) error
func (*SQLStore) ClaimDueDeliveries ¶
func (s *SQLStore) ClaimDueDeliveries(ctx context.Context, now time.Time, limit int, leasePeriod time.Duration) ([]Delivery, error)
ClaimDueDeliveries atomically reserves up to `limit` pending rows whose next_attempt_at <= now, pushing them to now+leasePeriod so concurrent workers skip them.
Postgres uses FOR UPDATE SKIP LOCKED inside a CTE so the inner SELECT only sees uncontested rows; SQLite serializes writers via BEGIN IMMEDIATE so the SELECT-then-UPDATE sequence inside one tx is safely exclusive.
func (*SQLStore) DeleteSubscriber ¶
func (s *SQLStore) DeleteSubscriber(ctx context.Context, id string) error
func (*SQLStore) DueDeliveries ¶
func (s *SQLStore) DueDeliveries(ctx context.Context, now time.Time, limit int) ([]Delivery, error)
func (*SQLStore) GetSubscriber ¶
func (s *SQLStore) GetSubscriber(ctx context.Context, id string) (*Subscriber, error)
func (*SQLStore) ListDeadDeliveries ¶
func (s *SQLStore) ListDeadDeliveries(ctx context.Context, limit int) ([]Delivery, error)
ListDeadDeliveries implements ReplayableStore: terminally-failed (StatusDead) deliveries, newest-first.
func (*SQLStore) ListDeliveries ¶
func (s *SQLStore) ListDeliveries(ctx context.Context, subscriberID string, limit int) ([]Delivery, error)
func (*SQLStore) ListSubscribers ¶
func (s *SQLStore) ListSubscribers(ctx context.Context) ([]Subscriber, error)
func (*SQLStore) ResetDelivery ¶
func (s *SQLStore) ResetDelivery(ctx context.Context, id string) error
ResetDelivery implements ReplayableStore: returns a dead delivery to pending (attempts + error cleared, due now). The `AND status = dead` clause makes it idempotent — resetting a non-dead/unknown delivery matches no row and is a no-op, so it can never resurrect an in-flight or delivered one.
type SecretCodec ¶
type SecretCodec interface {
Encode(plaintext string) (string, error)
Decode(encoded string) (string, error)
}
SecretCodec encodes and decodes subscriber secrets when they're persisted by a Store. Implementations must be safe for concurrent use.
Encode is called at write time (AddSubscriber); Decode is called at read time (GetSubscriber / ListSubscribers). The codec is purely a storage concern — the Manager only ever sees plaintext.
func NewAESGCMSecretCodec ¶
func NewAESGCMSecretCodec(key []byte) (SecretCodec, error)
NewAESGCMSecretCodec constructs a SecretCodec backed by AES-GCM using the supplied key. Key length must be 16, 24, or 32 bytes (AES-128 / AES-192 / AES-256).
Treat the key as a critical secret: rotate it by re-encrypting subscribers through a transitional codec (decode-from-old, encode-to-new); the package does not bundle a key-ring abstraction to keep this primitive small.
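As a rough picture of what an AES-GCM codec with this interface might look like, consider the sketch below. It is not the package's implementation: gcmCodec and newGCMCodec are hypothetical names, and the storage format (random nonce prepended to the ciphertext, then base64) is an assumption chosen for the example.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/base64"
	"errors"
	"fmt"
)

// gcmCodec is a hypothetical AES-GCM codec with Encode/Decode methods
// matching the SecretCodec interface shape.
type gcmCodec struct{ aead cipher.AEAD }

func newGCMCodec(key []byte) (*gcmCodec, error) {
	block, err := aes.NewCipher(key) // rejects keys that are not 16, 24, or 32 bytes
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	return &gcmCodec{aead: aead}, nil
}

// Encode seals plaintext under a fresh random nonce and returns
// base64(nonce || ciphertext). This layout is an assumption of the sketch.
func (c *gcmCodec) Encode(plaintext string) (string, error) {
	nonce := make([]byte, c.aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return "", err
	}
	sealed := c.aead.Seal(nonce, nonce, []byte(plaintext), nil)
	return base64.StdEncoding.EncodeToString(sealed), nil
}

// Decode reverses Encode and fails if the data was tampered with.
func (c *gcmCodec) Decode(encoded string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", err
	}
	n := c.aead.NonceSize()
	if len(raw) < n {
		return "", errors.New("codec sketch: ciphertext too short")
	}
	plain, err := c.aead.Open(nil, raw[:n], raw[n:], nil)
	if err != nil {
		return "", err
	}
	return string(plain), nil
}

func main() {
	codec, err := newGCMCodec(make([]byte, 32)) // all-zero key, for demonstration only
	if err != nil {
		panic(err)
	}
	enc, _ := codec.Encode("whsec_example")
	dec, _ := codec.Decode(enc)
	fmt.Println(dec)
}
```

A transitional codec for key rotation, as described above, could wrap two such values: Decode tries the old key, Encode always uses the new one.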
type Store ¶
type Store interface {
AddSubscriber(ctx context.Context, s Subscriber) error
GetSubscriber(ctx context.Context, id string) (*Subscriber, error)
ListSubscribers(ctx context.Context) ([]Subscriber, error)
DeleteSubscriber(ctx context.Context, id string) error
AddDelivery(ctx context.Context, d Delivery) error
UpdateDelivery(ctx context.Context, d Delivery) error
ListDeliveries(ctx context.Context, subscriberID string, limit int) ([]Delivery, error)
DueDeliveries(ctx context.Context, now time.Time, limit int) ([]Delivery, error)
}
Store is the persistence interface.
type Subscriber ¶
type Subscriber struct {
ID string
URL string
Secret string
Events []string
Active bool
Paused bool // explicit opt-out; honored by Subscribe so callers can register paused
Created time.Time
}
Subscriber is a registered HTTP endpoint that wants to receive signed POSTs for the listed events.
Events accept simple glob patterns: `*` matches a single segment, `**` matches one-or-more segments. An entry of `*` alone (or `**` alone) matches every event.
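The glob rules can be illustrated with a small matcher. This is a sketch under assumptions: the functions matchEvent, matchSegs, and matchesAny are hypothetical, and segments are assumed to be separated by dots, as in entity.created; the package's real matcher may differ in edge cases.

```go
package main

import (
	"fmt"
	"strings"
)

// matchEvent reports whether a single pattern matches an event name.
// Segments are assumed to be dot-separated.
func matchEvent(pattern, event string) bool {
	if pattern == "*" || pattern == "**" {
		return true // a bare wildcard matches every event
	}
	return matchSegs(strings.Split(pattern, "."), strings.Split(event, "."))
}

// matchSegs matches pattern segments against event segments recursively.
func matchSegs(p, e []string) bool {
	if len(p) == 0 {
		return len(e) == 0
	}
	switch p[0] {
	case "**":
		// Consume one or more event segments, then match the rest.
		for n := 1; n <= len(e); n++ {
			if matchSegs(p[1:], e[n:]) {
				return true
			}
		}
		return false
	case "*":
		// Consume exactly one segment.
		return len(e) > 0 && matchSegs(p[1:], e[1:])
	default:
		return len(e) > 0 && p[0] == e[0] && matchSegs(p[1:], e[1:])
	}
}

// matchesAny reports whether any pattern in a subscriber's list matches.
func matchesAny(patterns []string, event string) bool {
	for _, p := range patterns {
		if matchEvent(p, event) {
			return true
		}
	}
	return false
}

func main() {
	patterns := []string{"entity.*", "billing.**"}
	for _, ev := range []string{"entity.created", "entity.user.created", "billing.invoice.paid"} {
		fmt.Println(ev, matchesAny(patterns, ev))
	}
}
```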