outbox

package
v1.0.2 Latest
Published: Apr 2, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package outbox implements the transactional outbox pattern for MIDAS.

The outbox is a durable staging table written in the same database transaction as the domain state change that produced the event. A separate dispatcher (not part of this package) reads unpublished rows and delivers them to downstream consumers, then marks them published. This decouples reliable event delivery from the evaluation hot path without introducing distributed transactions.
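
The "same transaction" guarantee can be illustrated without a database. The sketch below is not MIDAS code: store, tx, recordDecision and errSimulated are hypothetical names, and the buffered tx is only a stand-in for a real database transaction. It shows the property the pattern relies on: the domain write and the outbox row either both land or both disappear.

```go
package main

import (
	"errors"
	"fmt"
)

// errSimulated stands in for any failure that aborts the transaction.
var errSimulated = errors.New("simulated constraint violation")

// store is a hypothetical in-memory stand-in for a database that holds
// both the domain table and the outbox table.
type store struct {
	decisions map[string]string // domain state: envelope ID -> outcome
	outbox    []string          // event types staged for the dispatcher
}

// tx buffers writes and applies them to the store only on commit.
type tx struct {
	decisions map[string]string
	outbox    []string
}

// commit applies every buffered write to the store in one step.
func (t *tx) commit(s *store) {
	for id, outcome := range t.decisions {
		s.decisions[id] = outcome
	}
	s.outbox = append(s.outbox, t.outbox...)
}

func newStore() *store { return &store{decisions: map[string]string{}} }

// recordDecision stages the domain change and its event in one transaction.
// A non-nil fail simulates an error before commit: the buffered writes are
// dropped, so neither the decision nor the event becomes visible.
func recordDecision(s *store, envelopeID, outcome string, fail error) error {
	t := &tx{decisions: map[string]string{}}
	t.decisions[envelopeID] = outcome
	t.outbox = append(t.outbox, "decision.outcome_recorded")
	if fail != nil {
		return fail // rollback
	}
	t.commit(s)
	return nil
}

func main() {
	s := newStore()
	_ = recordDecision(s, "env-1", "accept", nil)
	_ = recordDecision(s, "env-2", "reject", errSimulated)
	fmt.Println(len(s.decisions), len(s.outbox)) // prints "1 1"
}
```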

Audit log and outbox are distinct concerns:

  • Audit events are hash-chained, append-only governance records.
  • Outbox events are routing envelopes for downstream integration; they carry a JSON payload and are marked published once delivered.

Constants

This section is empty.

Variables

var (
	// ErrEmptyEventType is returned when EventType is blank.
	ErrEmptyEventType = errors.New("outbox: event_type must not be empty")

	// ErrEmptyAggregateType is returned when AggregateType is blank.
	ErrEmptyAggregateType = errors.New("outbox: aggregate_type must not be empty")

	// ErrEmptyAggregateID is returned when AggregateID is blank.
	ErrEmptyAggregateID = errors.New("outbox: aggregate_id must not be empty")

	// ErrEmptyTopic is returned when Topic is blank.
	ErrEmptyTopic = errors.New("outbox: topic must not be empty")

	// ErrInvalidPayload is returned when the payload is not valid JSON.
	ErrInvalidPayload = errors.New("outbox: payload must be valid JSON")
)

Sentinel errors returned by New.
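
Because these are sentinel values, callers would normally match them with errors.Is, which keeps working when the error has been wrapped with context. In the sketch below errEmptyTopic is a local copy of ErrEmptyTopic so the example compiles on its own; stageEvent and isCallerBug are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptyTopic mirrors ErrEmptyTopic from the listing above. It is
// redeclared here only so the example is self-contained.
var errEmptyTopic = errors.New("outbox: topic must not be empty")

// stageEvent is a hypothetical caller-side helper that adds context to the
// validation error with %w, which preserves the sentinel for errors.Is.
func stageEvent(topic string) error {
	if topic == "" {
		return fmt.Errorf("stage decision event: %w", errEmptyTopic)
	}
	return nil
}

// isCallerBug reports whether err traces back to the empty-topic sentinel,
// that is, to invalid input rather than a persistence failure.
func isCallerBug(err error) bool {
	return errors.Is(err, errEmptyTopic)
}

func main() {
	err := stageEvent("")
	fmt.Println(isCallerBug(err)) // prints "true"
	fmt.Println(err)              // prints "stage decision event: outbox: topic must not be empty"
}
```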

Functions

func BuildDecisionCompletedEvent

func BuildDecisionCompletedEvent(
	envelopeID, requestSource, requestID,
	surfaceID, agentID,
	outcome, reasonCode string,
) (json.RawMessage, error)

BuildDecisionCompletedEvent constructs the payload for EventDecisionCompleted. Every argument must be passed, but an empty string is accepted for a field that may be unavailable at construction time (e.g. surfaceID before surface resolution).
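
As a rough illustration of what such a builder produces, the sketch below marshals a local copy of the DecisionCompletedEvent field layout (listed under Types). buildDecisionCompleted is a hypothetical stand-in, not the package function: the version string "1", the RFC 3339 timestamp format, and the injected now parameter are all assumptions made so the output is deterministic.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// decisionCompletedEvent copies the field layout of DecisionCompletedEvent.
type decisionCompletedEvent struct {
	EventVersion  string `json:"event_version"`
	EnvelopeID    string `json:"envelope_id"`
	RequestSource string `json:"request_source"`
	RequestID     string `json:"request_id"`
	SurfaceID     string `json:"surface_id"`
	AgentID       string `json:"agent_id"`
	Outcome       string `json:"outcome"`
	ReasonCode    string `json:"reason_code"`
	Timestamp     string `json:"timestamp"`
}

// buildDecisionCompleted is a hypothetical stand-in for the real builder.
// Assumptions: event_version is "1" and timestamp is RFC 3339 in UTC.
func buildDecisionCompleted(
	envelopeID, requestSource, requestID,
	surfaceID, agentID,
	outcome, reasonCode string,
	now time.Time,
) (json.RawMessage, error) {
	b, err := json.Marshal(decisionCompletedEvent{
		EventVersion:  "1",
		EnvelopeID:    envelopeID,
		RequestSource: requestSource,
		RequestID:     requestID,
		SurfaceID:     surfaceID,
		AgentID:       agentID,
		Outcome:       outcome,
		ReasonCode:    reasonCode,
		Timestamp:     now.UTC().Format(time.RFC3339),
	})
	if err != nil {
		return nil, err
	}
	return b, nil
}

// samplePayload builds a payload with an empty surfaceID and a fixed clock.
func samplePayload() (json.RawMessage, error) {
	now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
	return buildDecisionCompleted("env-1", "api", "req-1", "", "agent-7", "accept", "WITHIN_AUTHORITY", now)
}

func main() {
	payload, err := samplePayload()
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	fmt.Println(string(payload))
}
```

Since surface_id carries no omitempty tag, an empty surfaceID is serialised as "surface_id":"" rather than being dropped from the payload.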

func BuildDecisionEnvelopeClosedEvent added in v1.0.2

func BuildDecisionEnvelopeClosedEvent(
	envelopeID, requestSource, requestID, finalOutcome string,
	closedAt time.Time,
	review *DecisionEnvelopeClosedReview,
) (json.RawMessage, error)

BuildDecisionEnvelopeClosedEvent constructs the full external event envelope for EventDecisionEnvelopeClosed. closedAt must be sourced from the envelope's ClosedAt field — the authoritative time the envelope transitioned to closed. review is nil for direct-close paths and non-nil for post-escalation-review closes.

func BuildDecisionEscalatedEvent

func BuildDecisionEscalatedEvent(
	envelopeID, requestSource, requestID,
	surfaceID, agentID,
	reasonCode string,
) (json.RawMessage, error)

BuildDecisionEscalatedEvent constructs the payload for EventDecisionEscalated.

func BuildDecisionOutcomeRecordedEvent added in v1.0.2

func BuildDecisionOutcomeRecordedEvent(
	envelopeID, requestSource, requestID,
	surfaceID, agentID,
	outcome, reasonCode string,
	occurredAt time.Time,
) (json.RawMessage, error)

BuildDecisionOutcomeRecordedEvent constructs the full external event envelope for EventDecisionOutcomeRecorded. occurredAt is the wall-clock time the evaluation outcome was produced; it is formatted as RFC3339Nano.

func BuildDecisionReviewResolvedEvent

func BuildDecisionReviewResolvedEvent(
	envelopeID, requestSource, requestID,
	decision, reviewerID string,
) (json.RawMessage, error)

BuildDecisionReviewResolvedEvent constructs the payload for EventDecisionReviewResolved.

func BuildGrantReinstatedEvent

func BuildGrantReinstatedEvent(grantID, agentID, profileID, reinstatedBy string) (json.RawMessage, error)

BuildGrantReinstatedEvent constructs the payload for EventGrantReinstated.

func BuildGrantRevokedEvent

func BuildGrantRevokedEvent(grantID, agentID, profileID, revokedBy, reason string) (json.RawMessage, error)

BuildGrantRevokedEvent constructs the payload for EventGrantRevoked.

func BuildGrantSuspendedEvent

func BuildGrantSuspendedEvent(grantID, agentID, profileID, suspendedBy, reason string) (json.RawMessage, error)

BuildGrantSuspendedEvent constructs the payload for EventGrantSuspended.

func BuildProfileApprovedEvent

func BuildProfileApprovedEvent(profileID, surfaceID, approvedBy string) (json.RawMessage, error)

BuildProfileApprovedEvent constructs the payload for EventProfileApproved.

func BuildProfileDeprecatedEvent

func BuildProfileDeprecatedEvent(profileID, surfaceID, deprecatedBy string) (json.RawMessage, error)

BuildProfileDeprecatedEvent constructs the payload for EventProfileDeprecated.

func BuildSurfaceApprovedEvent

func BuildSurfaceApprovedEvent(surfaceID, approvedBy string) (json.RawMessage, error)

BuildSurfaceApprovedEvent constructs the payload for EventSurfaceApproved.

func BuildSurfaceDeprecatedEvent

func BuildSurfaceDeprecatedEvent(surfaceID, deprecatedBy string) (json.RawMessage, error)

BuildSurfaceDeprecatedEvent constructs the payload for EventSurfaceDeprecated.

Types

type DecisionCompletedEvent

type DecisionCompletedEvent struct {
	EventVersion  string `json:"event_version"`
	EnvelopeID    string `json:"envelope_id"`
	RequestSource string `json:"request_source"`
	RequestID     string `json:"request_id"`
	SurfaceID     string `json:"surface_id"`
	AgentID       string `json:"agent_id"`
	Outcome       string `json:"outcome"`
	ReasonCode    string `json:"reason_code"`
	Timestamp     string `json:"timestamp"`
}

DecisionCompletedEvent is the payload for EventDecisionCompleted. Emitted when an evaluation closes with the Execute (accept) outcome.

type DecisionEnvelopeClosedPayload added in v1.0.2

type DecisionEnvelopeClosedPayload struct {
	RequestSource string                        `json:"request_source"`
	RequestID     string                        `json:"request_id"`
	FinalOutcome  string                        `json:"final_outcome"`
	ClosedAt      string                        `json:"closed_at"`
	Review        *DecisionEnvelopeClosedReview `json:"review,omitempty"`
}

DecisionEnvelopeClosedPayload is the payload for EventDecisionEnvelopeClosed. Emitted when a governance envelope reaches the closed terminal state. Review is present only for envelopes closed via escalation review.

type DecisionEnvelopeClosedReview added in v1.0.2

type DecisionEnvelopeClosedReview struct {
	Decision     string `json:"decision"`
	ReviewerID   string `json:"reviewer_id"`
	ReviewerKind string `json:"reviewer_kind"`
	Notes        string `json:"notes,omitempty"`
}

DecisionEnvelopeClosedReview carries the reviewer's decision for envelopes closed after escalation review. ReviewerKind has no omitempty because it is always known when a review object is present; Notes is explicitly optional.
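
The effect of the omitempty tags can be seen by marshalling both close paths. The sketch below uses local copies of the two struct layouts; the field values ("human", "user-9", the timestamp) are illustrative assumptions, not values taken from MIDAS.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented struct layouts.
type decisionEnvelopeClosedReview struct {
	Decision     string `json:"decision"`
	ReviewerID   string `json:"reviewer_id"`
	ReviewerKind string `json:"reviewer_kind"`
	Notes        string `json:"notes,omitempty"`
}

type decisionEnvelopeClosedPayload struct {
	RequestSource string                        `json:"request_source"`
	RequestID     string                        `json:"request_id"`
	FinalOutcome  string                        `json:"final_outcome"`
	ClosedAt      string                        `json:"closed_at"`
	Review        *decisionEnvelopeClosedReview `json:"review,omitempty"`
}

// encode marshals p and returns the JSON text, or the error text on failure.
func encode(p decisionEnvelopeClosedPayload) string {
	b, err := json.Marshal(p)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

// directClose has a nil Review, so the "review" key is omitted entirely.
func directClose() string {
	return encode(decisionEnvelopeClosedPayload{
		RequestSource: "api",
		RequestID:     "req-1",
		FinalOutcome:  "accept",
		ClosedAt:      "2026-04-02T12:00:00Z",
	})
}

// reviewedClose has a Review with empty Notes: "review" appears, "notes" does not.
func reviewedClose() string {
	return encode(decisionEnvelopeClosedPayload{
		RequestSource: "api",
		RequestID:     "req-2",
		FinalOutcome:  "accept",
		ClosedAt:      "2026-04-02T12:05:00Z",
		Review: &decisionEnvelopeClosedReview{
			Decision:     "APPROVED",
			ReviewerID:   "user-9",
			ReviewerKind: "human", // illustrative value
		},
	})
}

func main() {
	fmt.Println(directClose())
	fmt.Println(reviewedClose())
}
```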

type DecisionEscalatedEvent

type DecisionEscalatedEvent struct {
	EventVersion  string `json:"event_version"`
	EnvelopeID    string `json:"envelope_id"`
	RequestSource string `json:"request_source"`
	RequestID     string `json:"request_id"`
	SurfaceID     string `json:"surface_id"`
	AgentID       string `json:"agent_id"`
	ReasonCode    string `json:"reason_code"`
	Timestamp     string `json:"timestamp"`
}

DecisionEscalatedEvent is the payload for EventDecisionEscalated. Emitted when an evaluation produces an Escalate outcome and the envelope transitions to AWAITING_REVIEW.

type DecisionOutcomeRecordedPayload added in v1.0.2

type DecisionOutcomeRecordedPayload struct {
	RequestSource string `json:"request_source"`
	RequestID     string `json:"request_id"`
	SurfaceID     string `json:"surface_id"`
	AgentID       string `json:"agent_id"`
	Outcome       string `json:"outcome"`
	ReasonCode    string `json:"reason_code"`
}

DecisionOutcomeRecordedPayload is the payload for EventDecisionOutcomeRecorded. Emitted for all evaluation outcomes (accept, escalate, reject, request_clarification).

type DecisionReviewResolvedEvent

type DecisionReviewResolvedEvent struct {
	EventVersion  string `json:"event_version"`
	EnvelopeID    string `json:"envelope_id"`
	RequestSource string `json:"request_source"`
	RequestID     string `json:"request_id"`
	Decision      string `json:"decision"`
	ReviewerID    string `json:"reviewer_id"`
	Timestamp     string `json:"timestamp"`
}

DecisionReviewResolvedEvent is the payload for EventDecisionReviewResolved. Emitted when a reviewer closes an escalated envelope. The Decision field distinguishes APPROVED from REJECTED resolutions.

type EventType

type EventType string

EventType identifies the kind of domain event carried by an outbox row. Each value corresponds to a named integration event that downstream systems may subscribe to.

const (
	// EventDecisionCompleted is emitted when an evaluation closes with the
	// Execute (accept) outcome. Downstream systems use this to trigger
	// post-decision workflows. This event is emitted only for the Execute
	// outcome; Reject and RequestClarification outcomes do not produce this
	// event because no downstream action is warranted for them.
	EventDecisionCompleted EventType = "decision.completed"

	// EventDecisionEscalated is emitted when an evaluation produces an
	// Escalate outcome and the envelope transitions to AWAITING_REVIEW.
	// This event is not emitted for Execute, Reject, or RequestClarification.
	EventDecisionEscalated EventType = "decision.escalated"

	// EventDecisionReviewResolved is emitted when a reviewer closes an
	// escalated envelope via ResolveEscalation. This event is emitted for
	// both APPROVED and REJECTED review decisions; the payload carries the
	// decision field to distinguish them.
	EventDecisionReviewResolved EventType = "decision.review_resolved"

	// EventSurfaceApproved is emitted when ApproveSurface successfully
	// transitions a surface from review to active.
	EventSurfaceApproved EventType = "surface.approved"

	// EventSurfaceDeprecated is emitted when DeprecateSurface successfully
	// transitions a surface from active to deprecated.
	EventSurfaceDeprecated EventType = "surface.deprecated"

	// EventProfileApproved is emitted when ApproveProfile successfully
	// transitions a profile from review to active.
	EventProfileApproved EventType = "profile.approved"

	// EventProfileDeprecated is emitted when DeprecateProfile successfully
	// transitions a profile from active to deprecated.
	EventProfileDeprecated EventType = "profile.deprecated"

	// EventGrantSuspended is emitted when SuspendGrant successfully
	// transitions a grant from active to suspended.
	EventGrantSuspended EventType = "grant.suspended"

	// EventGrantRevoked is emitted when RevokeGrant permanently revokes a grant.
	EventGrantRevoked EventType = "grant.revoked"

	// EventGrantReinstated is emitted when ReinstateGrant restores a suspended
	// grant to active.
	EventGrantReinstated EventType = "grant.reinstated"

	// EventDecisionOutcomeRecorded is the external contract event emitted for
	// every evaluation outcome (accept, escalate, reject, request_clarification).
	// It carries the full external event envelope defined in docs/events.md.
	EventDecisionOutcomeRecorded EventType = "decision.outcome_recorded"

	// EventDecisionEnvelopeClosed is the external contract event emitted when a
	// governance envelope reaches the closed terminal state. It fires for all
	// close paths: direct close after accept/reject/request_clarification, and
	// deferred close after escalation review. It carries the full external event
	// envelope defined in docs/events.md.
	EventDecisionEnvelopeClosed EventType = "decision.envelope_closed"
)
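
The comments above imply a mapping from evaluation outcome to the event types produced at evaluation time. The sketch below restates that mapping as code; eventsForOutcome is a hypothetical helper, the order of the returned slice is an assumption, and decision.envelope_closed is left out because the comments do not say when it is staged relative to the evaluation.

```go
package main

import "fmt"

type eventType string

// Local copies of three of the documented constants.
const (
	eventDecisionCompleted       eventType = "decision.completed"
	eventDecisionEscalated       eventType = "decision.escalated"
	eventDecisionOutcomeRecorded eventType = "decision.outcome_recorded"
)

// eventsForOutcome lists the event types the comments associate with an
// evaluation outcome. decision.outcome_recorded is emitted for every
// outcome; decision.completed only for accept; decision.escalated only
// for escalate.
func eventsForOutcome(outcome string) []eventType {
	events := []eventType{eventDecisionOutcomeRecorded}
	switch outcome {
	case "accept":
		events = append(events, eventDecisionCompleted)
	case "escalate":
		events = append(events, eventDecisionEscalated)
	}
	return events
}

func main() {
	for _, o := range []string{"accept", "escalate", "reject", "request_clarification"} {
		fmt.Println(o, eventsForOutcome(o))
	}
}
```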

type ExternalEventEnvelope added in v1.0.2

type ExternalEventEnvelope struct {
	SchemaVersion string          `json:"schema_version"`
	EventID       string          `json:"event_id"`
	Type          string          `json:"type"`
	OccurredAt    string          `json:"occurred_at"`
	EnvelopeID    string          `json:"envelope_id"`
	Payload       json.RawMessage `json:"payload"`
}

ExternalEventEnvelope is the outer wrapper for all external MIDAS events. Payload holds the typed inner payload marshalled as raw JSON.
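
A minimal sketch of the wrapping step, using local copies of ExternalEventEnvelope and DecisionOutcomeRecordedPayload. wrapOutcomeRecorded is hypothetical: the schema version "1.0" and the caller-supplied event ID are assumptions, since the listing does not show how the package assigns them. The occurred_at formatting follows the RFC3339Nano behaviour documented for BuildDecisionOutcomeRecordedEvent.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type externalEventEnvelope struct {
	SchemaVersion string          `json:"schema_version"`
	EventID       string          `json:"event_id"`
	Type          string          `json:"type"`
	OccurredAt    string          `json:"occurred_at"`
	EnvelopeID    string          `json:"envelope_id"`
	Payload       json.RawMessage `json:"payload"`
}

type decisionOutcomeRecordedPayload struct {
	RequestSource string `json:"request_source"`
	RequestID     string `json:"request_id"`
	SurfaceID     string `json:"surface_id"`
	AgentID       string `json:"agent_id"`
	Outcome       string `json:"outcome"`
	ReasonCode    string `json:"reason_code"`
}

// wrapOutcomeRecorded marshals the inner payload first, then embeds the raw
// bytes in the outer envelope so the payload is nested as JSON, not as a
// quoted string. schema_version "1.0" is an assumed value.
func wrapOutcomeRecorded(eventID, envelopeID string, p decisionOutcomeRecordedPayload, occurredAt time.Time) (json.RawMessage, error) {
	inner, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	out, err := json.Marshal(externalEventEnvelope{
		SchemaVersion: "1.0",
		EventID:       eventID,
		Type:          "decision.outcome_recorded",
		OccurredAt:    occurredAt.Format(time.RFC3339Nano),
		EnvelopeID:    envelopeID,
		Payload:       inner,
	})
	if err != nil {
		return nil, err
	}
	return out, nil
}

// sampleEnvelope wraps an escalate outcome with a fixed timestamp.
func sampleEnvelope() (json.RawMessage, error) {
	at := time.Date(2026, 4, 2, 12, 0, 0, 123000000, time.UTC)
	return wrapOutcomeRecorded("evt-1", "env-1", decisionOutcomeRecordedPayload{
		RequestSource: "api", RequestID: "req-1", SurfaceID: "surf-1",
		AgentID: "agent-7", Outcome: "escalate", ReasonCode: "LIMIT_EXCEEDED",
	}, at)
}

func main() {
	out, err := sampleEnvelope()
	if err != nil {
		fmt.Println("wrap failed:", err)
		return
	}
	fmt.Println(string(out))
}
```

Note that time.RFC3339Nano trims trailing zeros from the fractional seconds, so consumers should not expect a fixed-width occurred_at.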

type GrantReinstatedEvent

type GrantReinstatedEvent struct {
	EventVersion string `json:"event_version"`
	GrantID      string `json:"grant_id"`
	AgentID      string `json:"agent_id"`
	ProfileID    string `json:"profile_id"`
	ReinstatedBy string `json:"reinstated_by"`
	Timestamp    string `json:"timestamp"`
}

GrantReinstatedEvent is the payload for EventGrantReinstated. Emitted when ReinstateGrant restores a suspended grant to active.

type GrantRevokedEvent

type GrantRevokedEvent struct {
	EventVersion string `json:"event_version"`
	GrantID      string `json:"grant_id"`
	AgentID      string `json:"agent_id"`
	ProfileID    string `json:"profile_id"`
	RevokedBy    string `json:"revoked_by"`
	Reason       string `json:"reason,omitempty"`
	Timestamp    string `json:"timestamp"`
}

GrantRevokedEvent is the payload for EventGrantRevoked. Emitted when RevokeGrant permanently revokes a grant.

type GrantSuspendedEvent

type GrantSuspendedEvent struct {
	EventVersion string `json:"event_version"`
	GrantID      string `json:"grant_id"`
	AgentID      string `json:"agent_id"`
	ProfileID    string `json:"profile_id"`
	SuspendedBy  string `json:"suspended_by"`
	Reason       string `json:"reason,omitempty"`
	Timestamp    string `json:"timestamp"`
}

GrantSuspendedEvent is the payload for EventGrantSuspended. Emitted when SuspendGrant successfully transitions a grant from active to suspended.

type MemoryRepository

type MemoryRepository struct {
	// contains filtered or unexported fields
}

MemoryRepository is an in-process outbox repository backed by a mutex-guarded slice. It is suitable for unit tests and in-memory integration tests that do not require durable storage.

Because the memory store has no real transaction support, Append merely appends to the slice. Tests that need to verify rollback behaviour must use the Postgres repository with a real transaction.

func NewMemoryRepository

func NewMemoryRepository() *MemoryRepository

NewMemoryRepository returns an empty MemoryRepository.

func (*MemoryRepository) All

All returns a snapshot of every event in the repository, published or not. Used in tests to assert the full set of appended events.

func (*MemoryRepository) Append

func (r *MemoryRepository) Append(_ context.Context, ev *OutboxEvent) error

Append adds ev to the in-memory event list. ev must not be nil.

func (*MemoryRepository) ClaimUnpublished

func (r *MemoryRepository) ClaimUnpublished(_ context.Context, limit int) ([]*OutboxEvent, error)

ClaimUnpublished returns up to limit events where PublishedAt is nil, in insertion order. The in-memory implementation does not use actual locking; it is suitable for unit tests only.

func (*MemoryRepository) ListUnpublished

func (r *MemoryRepository) ListUnpublished(_ context.Context) ([]*OutboxEvent, error)

ListUnpublished returns all events where PublishedAt is nil, in insertion order.

func (*MemoryRepository) MarkPublished

func (r *MemoryRepository) MarkPublished(_ context.Context, id string) error

MarkPublished sets PublishedAt on the event with the given ID. Returns an error if no event with that ID exists.
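
For readers who want a feel for the "mutex-guarded slice" design, here is a minimal sketch of the same idea. It is not the package's source: memoryRepo, event, and the unexported method names are hypothetical, the context parameter is dropped for brevity, and the event type is trimmed to the two fields the logic needs.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// event is a trimmed stand-in for OutboxEvent.
type event struct {
	ID          string
	PublishedAt *time.Time
}

// memoryRepo guards an append-only slice with a mutex.
type memoryRepo struct {
	mu     sync.Mutex
	events []*event
}

// add appends ev to the slice; nil events are rejected.
func (r *memoryRepo) add(ev *event) error {
	if ev == nil {
		return errors.New("event must not be nil")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, ev)
	return nil
}

// claimUnpublished returns up to limit events whose PublishedAt is nil, in
// insertion order. Nothing is locked per row, so two callers can receive
// the same events.
func (r *memoryRepo) claimUnpublished(limit int) []*event {
	r.mu.Lock()
	defer r.mu.Unlock()
	var out []*event
	for _, ev := range r.events {
		if len(out) >= limit {
			break
		}
		if ev.PublishedAt == nil {
			out = append(out, ev)
		}
	}
	return out
}

// markPublished stamps PublishedAt on the matching event, or returns an
// error when the ID is unknown.
func (r *memoryRepo) markPublished(id string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, ev := range r.events {
		if ev.ID == id {
			now := time.Now().UTC()
			ev.PublishedAt = &now
			return nil
		}
	}
	return fmt.Errorf("event %q not found", id)
}

func main() {
	r := &memoryRepo{}
	for _, id := range []string{"a", "b", "c"} {
		_ = r.add(&event{ID: id})
	}
	_ = r.markPublished("a")
	for _, ev := range r.claimUnpublished(2) {
		fmt.Println(ev.ID) // prints "b" then "c"
	}
}
```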

type OutboxEvent

type OutboxEvent struct {
	// ID is a UUID assigned at construction time.
	ID string

	// EventType identifies the kind of domain event (e.g. "decision.completed").
	EventType EventType

	// AggregateType is the resource kind that produced the event
	// (e.g. "envelope", "surface").
	AggregateType string

	// AggregateID is the identifier of the aggregate instance (e.g. envelope ID,
	// surface ID).
	AggregateID string

	// Topic is the logical destination for the event (e.g. "midas.decisions").
	// Dispatcher implementations map this to a Kafka topic, SNS topic, etc.
	Topic string

	// EventKey is the optional partition/routing key for ordered delivery
	// (e.g. request_source + ":" + request_id, or surface ID).
	EventKey string

	// Payload is the JSON-encoded event body delivered to consumers.
	// Always a valid JSON value; never nil (normalised to {} on construction).
	Payload json.RawMessage

	// CreatedAt is set at construction time.
	CreatedAt time.Time

	// PublishedAt is nil until the dispatcher successfully delivers the event.
	PublishedAt *time.Time
}

OutboxEvent is a single row in the outbox_events table.

Fields that influence routing (topic, event_key) are separate from the payload so that dispatcher implementations can route without deserialising the payload.
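
To make the routing point concrete, the sketch below picks a destination and a partition from Topic and EventKey alone, never looking inside Payload. Everything here is hypothetical: the destinations table, the FNV-1a hash, and the rule that an empty key maps to partition 0 are illustrative choices, not behaviour of any MIDAS dispatcher.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// routedEvent is a trimmed stand-in for OutboxEvent.
type routedEvent struct {
	Topic    string
	EventKey string
	Payload  []byte
}

// destinations maps logical topics to broker topics (illustrative values).
var destinations = map[string]string{
	"midas.decisions": "prod.midas.decisions.v1",
}

// route resolves the broker topic and a partition for ev. The same EventKey
// always yields the same partition, which is what gives per-key ordering.
// Payload is never parsed.
func route(ev routedEvent, partitions uint32) (string, uint32, error) {
	if partitions == 0 {
		return "", 0, errors.New("partitions must be positive")
	}
	dest, ok := destinations[ev.Topic]
	if !ok {
		return "", 0, fmt.Errorf("no destination for topic %q", ev.Topic)
	}
	if ev.EventKey == "" {
		return dest, 0, nil // assumed fallback for events without a key
	}
	h := fnv.New32a()
	h.Write([]byte(ev.EventKey))
	return dest, h.Sum32() % partitions, nil
}

func main() {
	ev := routedEvent{
		Topic:    "midas.decisions",
		EventKey: "api:req-1",
		Payload:  []byte(`{"outcome":"accept"}`),
	}
	dest, partition, err := route(ev, 8)
	if err != nil {
		fmt.Println("route failed:", err)
		return
	}
	fmt.Println(dest, partition)
}
```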

func New

func New(
	eventType EventType,
	aggregateType string,
	aggregateID string,
	topic string,
	eventKey string,
	payload json.RawMessage,
) (*OutboxEvent, error)

New constructs an OutboxEvent with a new UUID and the current time.

Invariants enforced at construction:

  • eventType must not be empty.
  • aggregateType must not be empty.
  • aggregateID must not be empty.
  • topic must not be empty.
  • nil payload is normalised to json.RawMessage(`{}`).
  • payload must be valid JSON (checked after normalisation).
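
The invariants above can be sketched as a single validation function. validate is hypothetical and only approximates what New is documented to enforce: the sentinels are local copies, the check order simply follows the list, and "empty" is taken to mean the zero-length string (whether whitespace-only values are rejected is not stated).

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors.
var (
	errEmptyEventType     = errors.New("outbox: event_type must not be empty")
	errEmptyAggregateType = errors.New("outbox: aggregate_type must not be empty")
	errEmptyAggregateID   = errors.New("outbox: aggregate_id must not be empty")
	errEmptyTopic         = errors.New("outbox: topic must not be empty")
	errInvalidPayload     = errors.New("outbox: payload must be valid JSON")
)

// validate applies the listed invariants and returns the normalised payload.
// A nil payload becomes {}; validity is checked after that normalisation,
// so a non-nil but empty payload is still rejected.
func validate(eventType, aggregateType, aggregateID, topic string, payload json.RawMessage) (json.RawMessage, error) {
	switch {
	case eventType == "":
		return nil, errEmptyEventType
	case aggregateType == "":
		return nil, errEmptyAggregateType
	case aggregateID == "":
		return nil, errEmptyAggregateID
	case topic == "":
		return nil, errEmptyTopic
	}
	if payload == nil {
		payload = json.RawMessage(`{}`)
	}
	if !json.Valid(payload) {
		return nil, errInvalidPayload
	}
	return payload, nil
}

func main() {
	p, err := validate("decision.completed", "envelope", "env-1", "midas.decisions", nil)
	fmt.Println(string(p), err) // prints "{} <nil>"

	_, err = validate("decision.completed", "envelope", "env-1", "midas.decisions", json.RawMessage(`{bad`))
	fmt.Println(err) // prints "outbox: payload must be valid JSON"
}
```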

type ProfileApprovedEvent

type ProfileApprovedEvent struct {
	EventVersion string `json:"event_version"`
	ProfileID    string `json:"profile_id"`
	SurfaceID    string `json:"surface_id"`
	ApprovedBy   string `json:"approved_by"`
	Timestamp    string `json:"timestamp"`
}

ProfileApprovedEvent is the payload for EventProfileApproved. Emitted when ApproveProfile successfully transitions a profile from review to active.

type ProfileDeprecatedEvent

type ProfileDeprecatedEvent struct {
	EventVersion string `json:"event_version"`
	ProfileID    string `json:"profile_id"`
	SurfaceID    string `json:"surface_id"`
	DeprecatedBy string `json:"deprecated_by"`
	Timestamp    string `json:"timestamp"`
}

ProfileDeprecatedEvent is the payload for EventProfileDeprecated. Emitted when DeprecateProfile successfully transitions a profile from active to deprecated.

type Repository

type Repository interface {
	// Append writes a single outbox event. The event must not be nil.
	// Returns an error if persistence fails.
	Append(ctx context.Context, ev *OutboxEvent) error

	// ListUnpublished returns all rows where published_at IS NULL, ordered
	// by created_at ascending. Dispatcher implementations call this to find
	// events awaiting delivery.
	ListUnpublished(ctx context.Context) ([]*OutboxEvent, error)

	// ClaimUnpublished returns up to limit unpublished rows using
	// SELECT FOR UPDATE SKIP LOCKED inside a short-lived transaction.
	// Rows are ordered by created_at ASC, id ASC. The locking prevents
	// concurrent dispatchers from processing the same rows simultaneously.
	// Claimed rows remain unpublished in the database until MarkPublished
	// is called for each one.
	ClaimUnpublished(ctx context.Context, limit int) ([]*OutboxEvent, error)

	// MarkPublished sets published_at to now for the given event ID.
	// Returns an error if the event does not exist or the update fails.
	MarkPublished(ctx context.Context, id string) error
}

Repository defines the persistence contract for outbox events.

All write methods must be called with a repository instance that is bound to the same database transaction as the domain state change. This is the invariant that makes the outbox durable: the event row and the domain row commit together or roll back together.
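
The dispatcher itself lives outside this package, but its core loop follows from the contract: claim a batch, deliver each event, and mark it published only after delivery succeeds. The sketch below shows one pass of such a loop. claimer, publishFunc, dispatchOnce and fakeRepo are hypothetical names; the two interface methods copy the signatures of ClaimUnpublished and MarkPublished, with a trimmed event type.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// outboxEvent is a trimmed stand-in for OutboxEvent.
type outboxEvent struct {
	ID    string
	Topic string
}

// claimer is the subset of Repository that a dispatcher needs.
type claimer interface {
	ClaimUnpublished(ctx context.Context, limit int) ([]*outboxEvent, error)
	MarkPublished(ctx context.Context, id string) error
}

// publishFunc delivers one event to the downstream system.
type publishFunc func(ctx context.Context, ev *outboxEvent) error

// dispatchOnce runs a single pass and returns how many events were
// delivered and marked. A failed delivery leaves the row unpublished so a
// later pass retries it. If MarkPublished fails after a successful
// delivery, the event will be delivered again: at-least-once semantics.
func dispatchOnce(ctx context.Context, repo claimer, publish publishFunc, batch int) (int, error) {
	events, err := repo.ClaimUnpublished(ctx, batch)
	if err != nil {
		return 0, fmt.Errorf("claim: %w", err)
	}
	delivered := 0
	for _, ev := range events {
		if err := publish(ctx, ev); err != nil {
			continue
		}
		if err := repo.MarkPublished(ctx, ev.ID); err != nil {
			return delivered, fmt.Errorf("mark %s published: %w", ev.ID, err)
		}
		delivered++
	}
	return delivered, nil
}

// fakeRepo is a minimal in-memory claimer used only by this example.
type fakeRepo struct {
	pending   []*outboxEvent
	published map[string]bool
}

func (f *fakeRepo) ClaimUnpublished(_ context.Context, limit int) ([]*outboxEvent, error) {
	var out []*outboxEvent
	for _, ev := range f.pending {
		if len(out) >= limit {
			break
		}
		if !f.published[ev.ID] {
			out = append(out, ev)
		}
	}
	return out, nil
}

func (f *fakeRepo) MarkPublished(_ context.Context, id string) error {
	f.published[id] = true
	return nil
}

// runDemo makes two passes: the first with a publisher that fails for e2,
// the second with one that always succeeds.
func runDemo() (first, second int) {
	ctx := context.Background()
	repo := &fakeRepo{
		pending:   []*outboxEvent{{ID: "e1"}, {ID: "e2"}, {ID: "e3"}},
		published: map[string]bool{},
	}
	flaky := func(_ context.Context, ev *outboxEvent) error {
		if ev.ID == "e2" {
			return errors.New("broker unavailable")
		}
		return nil
	}
	ok := func(context.Context, *outboxEvent) error { return nil }
	first, _ = dispatchOnce(ctx, repo, flaky, 10)
	second, _ = dispatchOnce(ctx, repo, ok, 10)
	return first, second
}

func main() {
	fmt.Println(runDemo()) // prints "2 1"
}
```

Because a crash between delivery and MarkPublished causes redelivery, consumers of a loop like this would need to tolerate duplicates, for example by deduplicating on the event ID.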

type SurfaceApprovedEvent

type SurfaceApprovedEvent struct {
	EventVersion string `json:"event_version"`
	SurfaceID    string `json:"surface_id"`
	ApprovedBy   string `json:"approved_by"`
	Timestamp    string `json:"timestamp"`
}

SurfaceApprovedEvent is the payload for EventSurfaceApproved. Emitted when ApproveSurface successfully transitions a surface from review to active.

type SurfaceDeprecatedEvent

type SurfaceDeprecatedEvent struct {
	EventVersion string `json:"event_version"`
	SurfaceID    string `json:"surface_id"`
	DeprecatedBy string `json:"deprecated_by"`
	Timestamp    string `json:"timestamp"`
}

SurfaceDeprecatedEvent is the payload for EventSurfaceDeprecated. Emitted when DeprecateSurface successfully transitions a surface from active to deprecated.
