Documentation
¶
Overview ¶
Package outbox implements the transactional outbox pattern for MIDAS.
The outbox is a durable staging table written in the same database transaction as the domain state change that produced the event. A separate dispatcher (not part of this package) reads unpublished rows and delivers them to downstream consumers, then marks them published. This decouples reliable event delivery from the evaluation hot path without introducing distributed transactions.
Audit log and outbox are distinct concerns:
- Audit events are hash-chained, append-only, and governance records.
- Outbox events are routing envelopes for downstream integration; they carry a JSON payload and are marked published once delivered.
Index ¶
- Variables
- func BuildDecisionCompletedEvent(...) (json.RawMessage, error)
- func BuildDecisionEscalatedEvent(envelopeID, requestSource, requestID, surfaceID, agentID, reasonCode string) (json.RawMessage, error)
- func BuildDecisionReviewResolvedEvent(envelopeID, requestSource, requestID, decision, reviewerID string) (json.RawMessage, error)
- func BuildGrantReinstatedEvent(grantID, agentID, profileID, reinstatedBy string) (json.RawMessage, error)
- func BuildGrantRevokedEvent(grantID, agentID, profileID, revokedBy, reason string) (json.RawMessage, error)
- func BuildGrantSuspendedEvent(grantID, agentID, profileID, suspendedBy, reason string) (json.RawMessage, error)
- func BuildProfileApprovedEvent(profileID, surfaceID, approvedBy string) (json.RawMessage, error)
- func BuildProfileDeprecatedEvent(profileID, surfaceID, deprecatedBy string) (json.RawMessage, error)
- func BuildSurfaceApprovedEvent(surfaceID, approvedBy string) (json.RawMessage, error)
- func BuildSurfaceDeprecatedEvent(surfaceID, deprecatedBy string) (json.RawMessage, error)
- type DecisionCompletedEvent
- type DecisionEscalatedEvent
- type DecisionReviewResolvedEvent
- type EventType
- type GrantReinstatedEvent
- type GrantRevokedEvent
- type GrantSuspendedEvent
- type MemoryRepository
- func (r *MemoryRepository) All(_ context.Context) []*OutboxEvent
- func (r *MemoryRepository) Append(_ context.Context, ev *OutboxEvent) error
- func (r *MemoryRepository) ClaimUnpublished(_ context.Context, limit int) ([]*OutboxEvent, error)
- func (r *MemoryRepository) ListUnpublished(_ context.Context) ([]*OutboxEvent, error)
- func (r *MemoryRepository) MarkPublished(_ context.Context, id string) error
- type OutboxEvent
- type ProfileApprovedEvent
- type ProfileDeprecatedEvent
- type Repository
- type SurfaceApprovedEvent
- type SurfaceDeprecatedEvent
Constants ¶
This section is empty.
Variables ¶
var ( // ErrEmptyEventType is returned when EventType is blank. ErrEmptyEventType = errors.New("outbox: event_type must not be empty") // ErrEmptyAggregateType is returned when AggregateType is blank. ErrEmptyAggregateType = errors.New("outbox: aggregate_type must not be empty") // ErrEmptyAggregateID is returned when AggregateID is blank. ErrEmptyAggregateID = errors.New("outbox: aggregate_id must not be empty") // ErrEmptyTopic is returned when Topic is blank. ErrEmptyTopic = errors.New("outbox: topic must not be empty") // ErrInvalidPayload is returned when the payload is not valid JSON. ErrInvalidPayload = errors.New("outbox: payload must be valid JSON") )
Sentinel errors returned by New.
Functions ¶
func BuildDecisionCompletedEvent ¶
func BuildDecisionCompletedEvent( envelopeID, requestSource, requestID, surfaceID, agentID, outcome, reasonCode string, ) (json.RawMessage, error)
BuildDecisionCompletedEvent constructs the payload for EventDecisionCompleted. All arguments are required; empty strings are accepted for fields that may be unavailable at construction time (e.g. surfaceID before surface resolution).
func BuildDecisionEscalatedEvent ¶
func BuildDecisionEscalatedEvent( envelopeID, requestSource, requestID, surfaceID, agentID, reasonCode string, ) (json.RawMessage, error)
BuildDecisionEscalatedEvent constructs the payload for EventDecisionEscalated.
func BuildDecisionReviewResolvedEvent ¶
func BuildDecisionReviewResolvedEvent( envelopeID, requestSource, requestID, decision, reviewerID string, ) (json.RawMessage, error)
BuildDecisionReviewResolvedEvent constructs the payload for EventDecisionReviewResolved.
func BuildGrantReinstatedEvent ¶
func BuildGrantReinstatedEvent(grantID, agentID, profileID, reinstatedBy string) (json.RawMessage, error)
BuildGrantReinstatedEvent constructs the payload for EventGrantReinstated.
func BuildGrantRevokedEvent ¶
func BuildGrantRevokedEvent(grantID, agentID, profileID, revokedBy, reason string) (json.RawMessage, error)
BuildGrantRevokedEvent constructs the payload for EventGrantRevoked.
func BuildGrantSuspendedEvent ¶
func BuildGrantSuspendedEvent(grantID, agentID, profileID, suspendedBy, reason string) (json.RawMessage, error)
BuildGrantSuspendedEvent constructs the payload for EventGrantSuspended.
func BuildProfileApprovedEvent ¶
func BuildProfileApprovedEvent(profileID, surfaceID, approvedBy string) (json.RawMessage, error)
BuildProfileApprovedEvent constructs the payload for EventProfileApproved.
func BuildProfileDeprecatedEvent ¶
func BuildProfileDeprecatedEvent(profileID, surfaceID, deprecatedBy string) (json.RawMessage, error)
BuildProfileDeprecatedEvent constructs the payload for EventProfileDeprecated.
func BuildSurfaceApprovedEvent ¶
func BuildSurfaceApprovedEvent(surfaceID, approvedBy string) (json.RawMessage, error)
BuildSurfaceApprovedEvent constructs the payload for EventSurfaceApproved.
func BuildSurfaceDeprecatedEvent ¶
func BuildSurfaceDeprecatedEvent(surfaceID, deprecatedBy string) (json.RawMessage, error)
BuildSurfaceDeprecatedEvent constructs the payload for EventSurfaceDeprecated.
Types ¶
type DecisionCompletedEvent ¶
type DecisionCompletedEvent struct {
EventVersion string `json:"event_version"`
EnvelopeID string `json:"envelope_id"`
RequestSource string `json:"request_source"`
RequestID string `json:"request_id"`
SurfaceID string `json:"surface_id"`
AgentID string `json:"agent_id"`
Outcome string `json:"outcome"`
ReasonCode string `json:"reason_code"`
Timestamp string `json:"timestamp"`
}
DecisionCompletedEvent is the payload for EventDecisionCompleted. Emitted when an evaluation closes with the Execute (accept) outcome.
type DecisionEscalatedEvent ¶
type DecisionEscalatedEvent struct {
EventVersion string `json:"event_version"`
EnvelopeID string `json:"envelope_id"`
RequestSource string `json:"request_source"`
RequestID string `json:"request_id"`
SurfaceID string `json:"surface_id"`
AgentID string `json:"agent_id"`
ReasonCode string `json:"reason_code"`
Timestamp string `json:"timestamp"`
}
DecisionEscalatedEvent is the payload for EventDecisionEscalated. Emitted when an evaluation produces an Escalate outcome and the envelope transitions to AWAITING_REVIEW.
type DecisionReviewResolvedEvent ¶
type DecisionReviewResolvedEvent struct {
EventVersion string `json:"event_version"`
EnvelopeID string `json:"envelope_id"`
RequestSource string `json:"request_source"`
RequestID string `json:"request_id"`
Decision string `json:"decision"`
ReviewerID string `json:"reviewer_id"`
Timestamp string `json:"timestamp"`
}
DecisionReviewResolvedEvent is the payload for EventDecisionReviewResolved. Emitted when a reviewer closes an escalated envelope. The Decision field distinguishes APPROVED from REJECTED resolutions.
type EventType ¶
type EventType string
EventType identifies the kind of domain event carried by an outbox row. Each value corresponds to a named integration event that downstream systems may subscribe to.
const ( // EventDecisionCompleted is emitted when an evaluation closes with the // Execute (accept) outcome. Downstream systems use this to trigger // post-decision workflows. This event is emitted only for the Execute // outcome; Reject and RequestClarification outcomes do not produce this // event because no downstream action is warranted for them. EventDecisionCompleted EventType = "decision.completed" // EventDecisionEscalated is emitted when an evaluation produces an // Escalate outcome and the envelope transitions to AWAITING_REVIEW. // This event is not emitted for Execute, Reject, or RequestClarification. EventDecisionEscalated EventType = "decision.escalated" // EventDecisionReviewResolved is emitted when a reviewer closes an // escalated envelope via ResolveEscalation. This event is emitted for // both APPROVED and REJECTED review decisions; the payload carries the // decision field to distinguish them. EventDecisionReviewResolved EventType = "decision.review_resolved" // EventSurfaceApproved is emitted when ApproveSurface successfully // transitions a surface from review to active. EventSurfaceApproved EventType = "surface.approved" // EventSurfaceDeprecated is emitted when DeprecateSurface successfully // transitions a surface from active to deprecated. EventSurfaceDeprecated EventType = "surface.deprecated" // EventProfileApproved is emitted when ApproveProfile successfully // transitions a profile from review to active. EventProfileApproved EventType = "profile.approved" // EventProfileDeprecated is emitted when DeprecateProfile successfully // transitions a profile from active to deprecated. EventProfileDeprecated EventType = "profile.deprecated" // EventGrantSuspended is emitted when SuspendGrant successfully // transitions a grant from active to suspended. EventGrantSuspended EventType = "grant.suspended" // EventGrantRevoked is emitted when RevokeGrant permanently revokes a grant. EventGrantRevoked EventType = "grant.revoked" // EventGrantReinstated is emitted when ReinstateGrant restores a suspended // grant to active. EventGrantReinstated EventType = "grant.reinstated" )
type GrantReinstatedEvent ¶
type GrantReinstatedEvent struct {
EventVersion string `json:"event_version"`
GrantID string `json:"grant_id"`
AgentID string `json:"agent_id"`
ProfileID string `json:"profile_id"`
ReinstatedBy string `json:"reinstated_by"`
Timestamp string `json:"timestamp"`
}
GrantReinstatedEvent is the payload for EventGrantReinstated. Emitted when ReinstateGrant restores a suspended grant to active.
type GrantRevokedEvent ¶
type GrantRevokedEvent struct {
EventVersion string `json:"event_version"`
GrantID string `json:"grant_id"`
AgentID string `json:"agent_id"`
ProfileID string `json:"profile_id"`
RevokedBy string `json:"revoked_by"`
Reason string `json:"reason,omitempty"`
Timestamp string `json:"timestamp"`
}
GrantRevokedEvent is the payload for EventGrantRevoked. Emitted when RevokeGrant permanently revokes a grant.
type GrantSuspendedEvent ¶
type GrantSuspendedEvent struct {
EventVersion string `json:"event_version"`
GrantID string `json:"grant_id"`
AgentID string `json:"agent_id"`
ProfileID string `json:"profile_id"`
SuspendedBy string `json:"suspended_by"`
Reason string `json:"reason,omitempty"`
Timestamp string `json:"timestamp"`
}
GrantSuspendedEvent is the payload for EventGrantSuspended. Emitted when SuspendGrant successfully transitions a grant from active to suspended.
type MemoryRepository ¶
type MemoryRepository struct {
// contains filtered or unexported fields
}
MemoryRepository is an in-process outbox repository backed by a mutex-guarded slice. It is suitable for unit tests and in-memory integration tests that do not require durable storage.
Because the memory store has no real transaction support, Append merely appends to the slice. Tests that need to verify rollback behaviour must use the Postgres repository with a real transaction.
func NewMemoryRepository ¶
func NewMemoryRepository() *MemoryRepository
NewMemoryRepository returns an empty MemoryRepository.
func (*MemoryRepository) All ¶
func (r *MemoryRepository) All(_ context.Context) []*OutboxEvent
All returns a snapshot of every event in the repository, published or not. Used in tests to assert the full set of appended events.
func (*MemoryRepository) Append ¶
func (r *MemoryRepository) Append(_ context.Context, ev *OutboxEvent) error
Append adds ev to the in-memory event list. ev must not be nil.
func (*MemoryRepository) ClaimUnpublished ¶
func (r *MemoryRepository) ClaimUnpublished(_ context.Context, limit int) ([]*OutboxEvent, error)
ClaimUnpublished returns up to limit events where PublishedAt is nil, in insertion order. The in-memory implementation does not use actual locking; it is suitable for unit tests only.
func (*MemoryRepository) ListUnpublished ¶
func (r *MemoryRepository) ListUnpublished(_ context.Context) ([]*OutboxEvent, error)
ListUnpublished returns all events where PublishedAt is nil, in insertion order.
func (*MemoryRepository) MarkPublished ¶
func (r *MemoryRepository) MarkPublished(_ context.Context, id string) error
MarkPublished sets PublishedAt on the event with the given ID. Returns an error if no event with that ID exists.
type OutboxEvent ¶
type OutboxEvent struct {
// ID is a UUID assigned at construction time.
ID string
// EventType identifies the kind of domain event (e.g. "decision.completed").
EventType EventType
// AggregateType is the resource kind that produced the event
// (e.g. "envelope", "surface").
AggregateType string
// AggregateID is the identifier of the aggregate instance (e.g. envelope ID,
// surface ID).
AggregateID string
// Topic is the logical destination for the event (e.g. "midas.decisions").
// Dispatcher implementations map this to a Kafka topic, SNS topic, etc.
Topic string
// EventKey is the optional partition/routing key for ordered delivery
// (e.g. request_source + ":" + request_id, or surface ID).
EventKey string
// Payload is the JSON-encoded event body delivered to consumers.
// Always a valid JSON value; never nil (normalised to {} on construction).
Payload json.RawMessage
// CreatedAt is set at construction time.
CreatedAt time.Time
// PublishedAt is nil until the dispatcher successfully delivers the event.
PublishedAt *time.Time
}
OutboxEvent is a single row in the outbox_events table.
Fields that influence routing (topic, event_key) are separate from the payload so that dispatcher implementations can route without deserialising the payload.
func New ¶
func New( eventType EventType, aggregateType string, aggregateID string, topic string, eventKey string, payload json.RawMessage, ) (*OutboxEvent, error)
New constructs an OutboxEvent with a new UUID and the current time.
Invariants enforced at construction:
- eventType must not be empty.
- aggregateType must not be empty.
- aggregateID must not be empty.
- topic must not be empty.
- nil payload is normalised to json.RawMessage(`{}`).
- payload must be valid JSON (checked after normalisation).
type ProfileApprovedEvent ¶
type ProfileApprovedEvent struct {
EventVersion string `json:"event_version"`
ProfileID string `json:"profile_id"`
SurfaceID string `json:"surface_id"`
ApprovedBy string `json:"approved_by"`
Timestamp string `json:"timestamp"`
}
ProfileApprovedEvent is the payload for EventProfileApproved. Emitted when ApproveProfile successfully transitions a profile from review to active.
type ProfileDeprecatedEvent ¶
type ProfileDeprecatedEvent struct {
EventVersion string `json:"event_version"`
ProfileID string `json:"profile_id"`
SurfaceID string `json:"surface_id"`
DeprecatedBy string `json:"deprecated_by"`
Timestamp string `json:"timestamp"`
}
ProfileDeprecatedEvent is the payload for EventProfileDeprecated. Emitted when DeprecateProfile successfully transitions a profile from active to deprecated.
type Repository ¶
type Repository interface {
// Append writes a single outbox event. The event must not be nil.
// Returns an error if persistence fails.
Append(ctx context.Context, ev *OutboxEvent) error
// ListUnpublished returns all rows where published_at IS NULL, ordered
// by created_at ascending. Dispatcher implementations call this to find
// events awaiting delivery.
ListUnpublished(ctx context.Context) ([]*OutboxEvent, error)
// ClaimUnpublished returns up to limit unpublished rows using
// SELECT FOR UPDATE SKIP LOCKED inside a short-lived transaction.
// Rows are ordered by created_at ASC, id ASC. The locking prevents
// concurrent dispatchers from processing the same rows simultaneously.
// Claimed rows remain unpublished in the database until MarkPublished
// is called for each one.
ClaimUnpublished(ctx context.Context, limit int) ([]*OutboxEvent, error)
// MarkPublished sets published_at to now for the given event ID.
// Returns an error if the event does not exist or the update fails.
MarkPublished(ctx context.Context, id string) error
}
Repository defines the persistence contract for outbox events.
All write methods must be called with a repository instance that is bound to the same database transaction as the domain state change. This is the invariant that makes the outbox durable: the event row and the domain row commit together or roll back together.
type SurfaceApprovedEvent ¶
type SurfaceApprovedEvent struct {
EventVersion string `json:"event_version"`
SurfaceID string `json:"surface_id"`
ApprovedBy string `json:"approved_by"`
Timestamp string `json:"timestamp"`
}
SurfaceApprovedEvent is the payload for EventSurfaceApproved. Emitted when ApproveSurface successfully transitions a surface from review to active.
type SurfaceDeprecatedEvent ¶
type SurfaceDeprecatedEvent struct {
EventVersion string `json:"event_version"`
SurfaceID string `json:"surface_id"`
DeprecatedBy string `json:"deprecated_by"`
Timestamp string `json:"timestamp"`
}
SurfaceDeprecatedEvent is the payload for EventSurfaceDeprecated. Emitted when DeprecateSurface successfully transitions a surface from active to deprecated.