events

package
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package events implements a unified event-contract core: a typed Envelope, a runtime Kind registry, table-driven dispatch, a generic config-driven producer engine, and a dependency-free JSONPath subset.

The design is deliberately schema-additive: event kinds are registered at runtime (mirroring verifier_profiles) rather than enumerated in Go, so new event types can be added without touching this package.

Index

Constants

View Source
const (
	RollupGreen   = "GREEN"
	RollupFailing = "FAILING"
	RollupPending = "PENDING"
)

Rollup state constants. These are the only legal values of Rollup.State.

View Source
const (
	KindPROpened                = "event.pr.opened"
	KindPRCIGreen               = "event.pr.ci_green"
	KindPRCIFailed              = "event.pr.ci_failed"
	KindPRMerged                = "event.pr.merged"
	KindPRClosed                = "event.pr.closed"
	KindPRForceRebased          = "event.pr.force_rebased"
	KindPRReviewRequestedChange = "event.pr.review_requested_change"
	KindPRCommentPosted         = "event.pr.comment_posted"

	// PRNamespace is the namespace all PR kinds share.
	PRNamespace = "event.pr"
)

PR event kind names (pr-event-source D1). These are the first registered kinds on the event.pr namespace; each maps 1:1 onto a layered-pr-fanout poll-detector transition and the monitor-pr-review-comment-routing §4 events.

Variables

This section is empty.

Functions

func DeriveRollupState added in v0.4.0

func DeriveRollupState(checks []Check) string

DeriveRollupState applies the one shared derive rule (pr-event-source D3):

  • any check whose conclusion is FAILURE/TIMED_OUT/CANCELLED -> FAILING
  • else any check not yet COMPLETED -> PENDING
  • else -> GREEN

An empty check set is GREEN (nothing failing, nothing pending). The rule is case-insensitive on both Status and Conclusion.

func JSONPath

func JSONPath(root any, expr string) (any, error)

JSONPath evaluates a small JSONPath subset against a value decoded from JSON (map[string]any / []any / scalars). Supported syntax:

  • dotted traversal: ".a.b"
  • array index: ".items[0]"
  • one equality filter: ".checks[?(@.conclusion=='FAILURE')]"

A leading "$" is optional. The filter selects the first array element whose field equals the quoted literal. No other predicates are supported.

func PRKinds added in v0.4.0

func PRKinds() []string

PRKinds is the ordered set of canonical PR event kinds. It is the single source of truth for which kinds RegisterPRKinds installs.

func RegisterPRKinds added in v0.4.0

func RegisterPRKinds(r *Registry) error

RegisterPRKinds installs every event.pr.* kind on the registry with control-plane (reject-on-unknown) disposition (pr-event-source D1/R1). It also sets the namespace disposition to reject so a mistyped event.pr.* type fails loudly at emit time rather than silently routing to a soft handler.

Registration is idempotent: re-registering replaces with the same values.

Types

type AuthRoundTripper added in v0.4.0

type AuthRoundTripper struct {
	// ProxyBase is the da service injector base, e.g. "http://localhost:8765".
	// Empty selects the direct-load fallback.
	ProxyBase string
	// Loader resolves a credential for a host in the fallback path. Nil in the
	// fallback path means "no credential available" and requests pass through
	// unauthenticated.
	Loader CredentialLoader
	// Base is the underlying RoundTripper. Nil uses http.DefaultTransport.
	Base http.RoundTripper
}

AuthRoundTripper is the auth seam the producer receives (pr-event-source D5). When ProxyBase is set it rewrites outbound requests to the da service injector at localhost:<port>/proxy/<host>, which attaches the credential and owns refresh — the producer stays credential-unaware. When ProxyBase is empty it falls back to the direct-load path: it calls Loader for the request host and sets a bearer Authorization header inline.

func (*AuthRoundTripper) RoundTrip added in v0.4.0

func (a *AuthRoundTripper) RoundTrip(req *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper. It never mutates the caller's request; it clones before rewriting URL or headers.

type Check added in v0.4.0

type Check struct {
	Name       string `json:"name"`
	Status     string `json:"status"`
	Conclusion string `json:"conclusion"`
	Link       string `json:"link"`
}

Check is a single CI check on a PR.

type Comment added in v0.4.0

type Comment struct {
	Author    string `json:"author"`
	Body      string `json:"body"`
	Path      string `json:"path"`
	Line      int    `json:"line"`
	CreatedAt string `json:"created_at"`
}

Comment is a single PR review or issue comment (pr-event-source D3).

type CredentialLoader added in v0.4.0

type CredentialLoader func(host string) (token string, err error)

CredentialLoader resolves a per-host credential for the direct-load fallback path (pr-event-source D5). It is the seam the external-agent-sources cred-store loader plugs into; the producer never sees the resolution strategy. Returning an empty token with a nil error means "no credential for this host" — the request proceeds unauthenticated rather than failing.

type DefaultFetcher

type DefaultFetcher struct {
	// Client is used for http fetches; nil falls back to http.DefaultClient.
	Client *http.Client
}

DefaultFetcher implements Fetcher with real exec and http I/O. It is the production seam; tests substitute a fake.

func (DefaultFetcher) Fetch

func (f DefaultFetcher) Fetch(ctx context.Context, spec FetchSpec) ([]byte, error)

Fetch dispatches to exec or http based on the spec.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher routes Envelopes to Handlers using the Registry to resolve disposition. Routing is table-driven: there is no switch on the event type.

func NewDispatcher

func NewDispatcher(registry *Registry, logw io.Writer) *Dispatcher

NewDispatcher builds a Dispatcher over the given registry. logw receives the warning lines emitted for soft, unregistered types; pass io.Discard to mute.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(e Envelope) error

Dispatch validates and routes an Envelope. The resolution order is:

  • invalid envelope -> error
  • explicit per-type handler -> invoke it
  • reject disposition -> loud fail-fast error
  • soft disposition -> warn + generic handler (no-op if unset)

func (*Dispatcher) On

func (d *Dispatcher) On(typ string, h Handler) error

On registers a per-type handler. The type need not be a registered kind, but emit-time validation still applies the namespace disposition.

func (*Dispatcher) SetSoftHandler

func (d *Dispatcher) SetSoftHandler(h Handler)

SetSoftHandler installs the generic handler used for soft-disposition types that have no explicit per-type handler.

type Disposition

type Disposition int

Disposition controls what happens when an Envelope whose Type is not registered is encountered. It is resolved by the most specific matching namespace; an exact kind registration always wins.

const (
	// DispositionReject is the fail-fast default: unregistered types in a
	// reject namespace are loud errors at emit time.
	DispositionReject Disposition = iota
	// DispositionSoft routes unregistered types to a generic handler and logs
	// a warning instead of failing.
	DispositionSoft
)

func (Disposition) String

func (d Disposition) String() string

String renders a Disposition for diagnostics and error messages.

type Envelope

type Envelope struct {
	Type           string          `json:"type"`
	Source         string          `json:"source"`
	OccurredAt     time.Time       `json:"occurred_at"`
	IdempotencyKey string          `json:"idempotency_key"`
	Payload        json.RawMessage `json:"payload"`
}

Envelope is the canonical wire shape for every event flowing through the system. Payload is left as raw JSON so the core never needs to know the concrete shape of any particular kind.

func NewEnvelope

func NewEnvelope(typ, source, idempotencyKey string, occurredAt time.Time, payload json.RawMessage) (Envelope, error)

NewEnvelope constructs an Envelope and validates it. OccurredAt defaults to the current time when the caller passes the zero value.

func (Envelope) Namespace

func (e Envelope) Namespace() string

Namespace returns the dotted prefix of the event type up to (but excluding) the final segment — e.g. "event.pr.merged" -> "event.pr". When the type has no separating dot the whole type is treated as the namespace.

func (Envelope) Validate

func (e Envelope) Validate() error

Validate enforces the invariants required of every Envelope. It is cheap and has no side effects so it can be called freely at construction and emit time.

type FetchBlock added in v0.4.0

type FetchBlock struct {
	Argv    []string          `json:"argv,omitempty"`
	URL     string            `json:"url,omitempty"`
	Method  string            `json:"method,omitempty"`
	Headers map[string]string `json:"headers,omitempty"`
	Each    string            `json:"each,omitempty"`
	Map     map[string]string `json:"map,omitempty"`
}

FetchBlock is one named fetch in a pr_source config (the "list" or "comments" block). It maps directly onto the generic engine: Argv/URL drive the FetchSpec, Each selects the item list, and Map projects each item onto canonical fields.

type FetchSpec

type FetchSpec struct {
	Argv    []string          // exec: command and arguments
	URL     string            // http: request URL
	Method  string            // http: defaults to GET
	Headers map[string]string // http: request headers
}

FetchSpec describes how the producer obtains the raw JSON document for a cycle. Exactly one of Argv (exec) or URL (http) must be set.

type Fetcher

type Fetcher interface {
	Fetch(ctx context.Context, spec FetchSpec) ([]byte, error)
}

Fetcher is the seam isolating exec/http I/O so tests stay hermetic.

type Handler

type Handler interface {
	Handle(Envelope) error
}

Handler processes a single Envelope. Implementations must be safe to call from the Dispatcher and should treat the Envelope as read-only.

type HandlerFunc

type HandlerFunc func(Envelope) error

HandlerFunc adapts an ordinary function to the Handler interface.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(e Envelope) error

Handle calls the underlying function.

type Kind

type Kind struct {
	Name        string
	Disposition Disposition
	Producer    ProducerFactory
}

Kind is a registered event kind. Producer is optional and only set when a producer factory was registered for the kind's name.

type PR added in v0.4.0

type PR struct {
	Number    int    `json:"number"`
	Title     string `json:"title"`
	Branch    string `json:"branch"`
	BaseRef   string `json:"base_ref"`
	State     string `json:"state"`
	Mergeable string `json:"mergeable"`
	HeadSHA   string `json:"head_sha"`
	URL       string `json:"url"`
	Rollup    Rollup `json:"rollup"`
}

PR is the canonical, platform-independent pull-request payload (pr-event-source design D3). A producer maps any platform's JSON shape onto these fields via the pr_source field map, so consumers never branch on the source platform.

type PRProducer added in v0.4.0

type PRProducer struct {
	// contains filtered or unexported fields
}

PRProducer wraps the generic engine Producer with the PR-specific rollup derive step (pr-event-source D3). Keeping the derive here — not in the engine — honours D2 (no per-platform producer files in the generic engine): the engine stays config-only, while this reaction-side wrapper applies the one shared Rollup.State rule. A platform whose source has no checks simply maps no rollup field and the derive is a no-op.

func (*PRProducer) Cycle added in v0.4.0

func (p *PRProducer) Cycle(ctx context.Context) ([]Envelope, error)

Cycle runs one engine cycle, then rewrites each emitted envelope's payload so its rollup field is the canonical Rollup{State, Checks} with State derived by DeriveRollupState. Envelopes without a rollup field pass through unchanged so non-CI sources (e.g. a comments source) are unaffected.

type PRSourceConfig added in v0.4.0

type PRSourceConfig struct {
	// Producer selects the named producer: "gh" (the zero-config default),
	// "exec", "http", or any registered code producer.
	Producer string `json:"producer,omitempty"`
	// List fetches the open PR list and maps it onto canonical PR fields.
	List FetchBlock `json:"list,omitempty"`
	// Comments fetches one PR's comments (the "{number}" placeholder in Argv is
	// substituted per PR by the caller). Optional.
	Comments FetchBlock `json:"comments,omitempty"`
	// PollIntervalS is the producer poll cadence in seconds.
	PollIntervalS int `json:"poll_interval_s,omitempty"`
}

PRSourceConfig is the engine-facing form of the .agentsrc.json pr_source field (pr-event-source D4). It is producer-agnostic: a gh, exec, http, or GitLab source differs only by the Argv/URL and Map in its blocks — zero Go.

func DefaultGHPRSource added in v0.4.0

func DefaultGHPRSource() PRSourceConfig

DefaultGHPRSource returns the built-in `gh` pr_source config (pr-event-source D4 / proposal §3.1). It is the zero-config default: it shells out to the GitHub CLI and maps gh's JSON onto the canonical PR fields. A new platform overrides this block in .agentsrc.json — no Go change (pr-event-source done-criteria 1).

func (PRSourceConfig) NewListProducer added in v0.4.0

func (c PRSourceConfig) NewListProducer(typ, source string, fetcher Fetcher) (*PRProducer, error)

NewListProducer constructs the PR list producer for this source. A nil fetcher uses the default exec/http fetcher; tests inject a fake. When the list block's Each is the document-identity path, the fetcher is wrapped so the engine sees the array under a synthetic key. The returned *PRProducer wraps the generic engine and derives each PR's canonical Rollup.State on every cycle, so a default-source fetch emits event.pr.* payloads carrying a derived rollup state (pr-event-source D3) — DeriveRollupState is on this live path, not dead code.

func (PRSourceConfig) ProducerConfigForList added in v0.4.0

func (c PRSourceConfig) ProducerConfigForList(typ, source string) (ProducerConfig, error)

ProducerConfigForList builds the engine ProducerConfig that emits envelopes of the given typ from this source's List block. The source/Each/Map all flow from config, so this is the entire "PR support" producer wiring — no per-platform code (pr-event-source D2).

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a stateful event source: each Cycle fetches, maps, diffs against the previous snapshot, and returns Envelopes for changed items.

func NewProducer

func NewProducer(cfg ProducerConfig, fetcher Fetcher) (*Producer, error)

NewProducer builds a Producer. A nil fetcher defaults to the real exec/http fetcher; tests inject a fake.

func (*Producer) Cycle

func (p *Producer) Cycle(ctx context.Context) ([]Envelope, error)

Cycle runs one fetch/diff cycle and returns Envelopes for new or changed items. The first cycle treats every item as new.

type ProducerConfig

type ProducerConfig struct {
	Type   string            // event type emitted for each change
	Source string            // envelope source
	Fetch  FetchSpec         // how to obtain the document
	Each   string            // jsonpath to the list of items
	Map    map[string]string // canonical field -> jsonpath (relative to item)
	KeyBy  string            // canonical field used as the snapshot/idempotency key
}

ProducerConfig drives the generic producer engine. There are deliberately no per-platform fields: a GitHub PR producer and a metrics producer differ only by config.

type ProducerFactory

type ProducerFactory func() (*Producer, error)

ProducerFactory builds a Producer for a registered kind. It is defined here (rather than producer.go) because the registry owns the binding from name to factory; producer.go supplies the concrete engine.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is a runtime, thread-safe table of event kinds keyed by name plus a set of namespace-level dispositions. There is intentionally no Go enum of kind names — everything is registered at runtime.

func NewRegistry

func NewRegistry() *Registry

NewRegistry returns an empty Registry whose namespaces default to reject.

func (*Registry) DispositionFor

func (r *Registry) DispositionFor(typ string) Disposition

DispositionFor resolves the disposition for a type: an exact kind wins, then the longest matching namespace prefix, then the registry default.

func (*Registry) Lookup

func (r *Registry) Lookup(name string) (Kind, bool)

Lookup returns the registered kind for an exact name.

func (*Registry) Names

func (r *Registry) Names() []string

Names returns the registered kind names sorted for deterministic output.

func (*Registry) Register

func (r *Registry) Register(name string, disp Disposition) error

Register adds (or replaces) a kind by exact name with the given disposition.

func (*Registry) RegisterProducer

func (r *Registry) RegisterProducer(name string, factory ProducerFactory) error

RegisterProducer binds a producer factory to a kind name, creating the kind with the default disposition if it was not registered yet.

func (*Registry) SetNamespaceDisposition

func (r *Registry) SetNamespaceDisposition(namespace string, disp Disposition) error

SetNamespaceDisposition declares the disposition for a whole namespace (e.g. "event.metric"). Unregistered types in that namespace inherit it.

type Rollup added in v0.4.0

type Rollup struct {
	State  string  `json:"state"`
	Checks []Check `json:"checks"`
}

Rollup is the aggregate CI status of a PR. State is one of the RollupState constants and is always derived by DeriveRollupState — never set ad hoc.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL