engine

package module
v0.0.0-...-2e8b532 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 28, 2025 License: MIT Imports: 27 Imported by: 0

README

Engine Package

The engine package is the public, façade-oriented entry point for embedding Ariadne's crawling and processing capabilities. Implementation details (pipeline orchestration, rate limiting primitives, resource coordination, asset rewriting, telemetry internals) now live exclusively under engine/internal/* and are not part of the supported API surface.

Current Architecture (Post Phase 5)

Public (importable) surface:

  • engine (facade: construction, lifecycle, snapshotting, telemetry policy, health evaluation, asset strategy enablement)
  • engine/config (static configuration structs & normalization helpers). Dynamic / layered configuration (former experimental configx) was removed (see ADR md/decisions/2025-09-configx-removal.md). Any future dynamic reload capability will arrive via a new proposal & facade method, not by re‑exposing internal layering primitives.
  • engine/models (data structures: Page, CrawlResult, errors)
  • engine/resources (resource manager configuration & high-level stats)

Removed (internalized) since C5:

  • engine/ratelimit (adaptive limiter implementation + interfaces). A reduced diagnostic view is now exposed via engine.LimiterSnapshot fields on the facade Snapshot(). Existing consumers should remove imports of engine/ratelimit; no direct replacement API is required.

Internal-only (subject to change without notice):

  • engine/internal/pipeline (multi-stage orchestration, retries, backpressure)
  • engine/internal/* (crawler, processor, downloader/assets, telemetry subsystem wiring, test utilities)

The former public engine/pipeline package has been fully removed. All orchestration now occurs behind the facade; direct pipeline construction and tests were migrated internally to preserve behavior and coverage.

Stability Policy

See API_STABILITY.md for detailed stability tiers. In summary:

  • Facade lifecycle (New, Start, Stop, Snapshot) is Stable.
  • Core worker sizing & rate/resource toggle fields in Config are Stable.
  • Resume, asset policy, metrics backend knobs are Experimental (shape may evolve).
  • Internal packages provide no compatibility guarantees.

Testing Strategy

Behavioral and stress tests for backpressure, graceful shutdown, metrics aggregation, rate limiting feedback, and asset strategy integration reside under engine/internal/pipeline/*_test.go to validate invariants while keeping implementation private. Facade integration tests (e.g. engine_integration_test.go, resume_integration_test.go) ensure public contract correctness.

Telemetry & Observability

The engine wires an adaptive tracer, metrics provider (Prometheus or OpenTelemetry), event bus, and health evaluator. Policy-driven thresholds (failure ratios, probe TTLs, resource backlog) are configurable via UpdateTelemetryPolicy and reflected in HealthSnapshot plus metrics gauges.

Rationale for Internalization

Eliminating the public pipeline entry:

  1. Prevents accidental tight coupling to orchestration internals.
  2. Enables iterative evolution (stage composition, concurrency control, retry semantics) without breaking downstream users.
  3. Simplifies API surface and documentation for the first tagged release (v0.1.0).

Regenerating API Report

Run make api-report to rebuild API_REPORT.md, which enumerates exported symbols by stability tier (the target uses the tools/apireport module). Internal packages (engine/internal/*) are excluded.


This README reflects the post-internalization architecture and will evolve ahead of the v0.1.0 tag.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AssetAction

type AssetAction struct {
	Ref  AssetRef
	Mode AssetMode
}

AssetAction couples a reference with a decided handling mode.

type AssetEvent

type AssetEvent struct {
	Type          string        // e.g. asset_download, asset_stage_error, asset_rewrite
	URL           string        // asset URL (where applicable)
	Stage         string        // discover|decide|execute|rewrite
	BytesIn       int           // pre-optimization bytes
	BytesOut      int           // post-optimization bytes
	Duration      time.Duration // operation latency (download/optimize)
	Error         string        // error message if any
	Optimizations []string      // optimization identifiers applied
}

AssetEvent represents a lifecycle occurrence for observability.

type AssetEventPublisher

type AssetEventPublisher interface{ Publish(AssetEvent) }

AssetEventPublisher publishes events (non-blocking behavior recommended).
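
The non-blocking recommendation can be met with a buffered channel and a select that falls through when the buffer is full. The following is a minimal sketch, not the package's own publisher: chanPublisher, newChanPublisher, and the drop counter are hypothetical names, and AssetEvent is copied locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// AssetEvent is a local copy of the documented struct, kept here so the
// sketch compiles without importing the engine package.
type AssetEvent struct {
	Type          string
	URL           string
	Stage         string
	BytesIn       int
	BytesOut      int
	Duration      time.Duration
	Error         string
	Optimizations []string
}

// AssetEventPublisher matches the documented single-method interface.
type AssetEventPublisher interface{ Publish(AssetEvent) }

// chanPublisher is a hypothetical publisher; it is not part of the package.
type chanPublisher struct {
	ch      chan AssetEvent
	dropped atomic.Int64
}

func newChanPublisher(size int) *chanPublisher {
	return &chanPublisher{ch: make(chan AssetEvent, size)}
}

// Publish never blocks: if the buffer is full, the event is dropped and counted.
func (p *chanPublisher) Publish(ev AssetEvent) {
	select {
	case p.ch <- ev:
	default:
		p.dropped.Add(1)
	}
}

func main() {
	p := newChanPublisher(1)
	var pub AssetEventPublisher = p
	pub.Publish(AssetEvent{Type: "asset_download", Stage: "execute"})
	pub.Publish(AssetEvent{Type: "asset_rewrite", Stage: "rewrite"}) // buffer full, dropped
	fmt.Println(len(p.ch), p.dropped.Load())
}
```

Dropping on overflow trades completeness for latency; a consumer that needs every event would have to size the buffer accordingly or choose a different policy.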

type AssetMetrics

type AssetMetrics struct {
	// contains filtered or unexported fields
}

AssetMetrics holds counters for asset processing lifecycle.

type AssetMetricsSnapshot

type AssetMetricsSnapshot struct {
	Discovered      int64
	Selected        int64
	Skipped         int64
	Downloaded      int64
	Failed          int64
	Inlined         int64
	Optimized       int64
	BytesIn         int64
	BytesOut        int64
	RewriteFailures int64
}

AssetMetricsSnapshot is an immutable view of the asset counters, intended for assertions / reporting.
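
For reporting, the BytesIn and BytesOut counters can be combined into a savings ratio. This is an illustrative sketch only: savedRatio is a hypothetical helper, and the struct is a local copy of the documented fields.

```go
package main

import "fmt"

// AssetMetricsSnapshot is a local copy of the documented struct.
type AssetMetricsSnapshot struct {
	Discovered      int64
	Selected        int64
	Skipped         int64
	Downloaded      int64
	Failed          int64
	Inlined         int64
	Optimized       int64
	BytesIn         int64
	BytesOut        int64
	RewriteFailures int64
}

// savedRatio (hypothetical helper) returns the fraction of input bytes
// removed by optimization, or 0 when no bytes were processed.
func savedRatio(s AssetMetricsSnapshot) float64 {
	if s.BytesIn <= 0 {
		return 0
	}
	return float64(s.BytesIn-s.BytesOut) / float64(s.BytesIn)
}

func main() {
	snap := AssetMetricsSnapshot{Downloaded: 4, Optimized: 3, BytesIn: 1000, BytesOut: 750}
	fmt.Printf("saved %.0f%% of asset bytes\n", savedRatio(snap)*100)
}
```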

type AssetMode

type AssetMode int

AssetMode describes the handling decision for an asset.

const (
	AssetModeDownload AssetMode = iota
	AssetModeSkip
	AssetModeInline
	AssetModeRewrite
)
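
Because the constants are declared with iota, their numeric values follow declaration order (0 through 3). When logging decisions, a small name helper is convenient; the sketch below uses a hypothetical modeName function, and the lowercase labels are assumptions rather than names the package defines.

```go
package main

import "fmt"

// Local copy of the documented type and constants.
type AssetMode int

const (
	AssetModeDownload AssetMode = iota
	AssetModeSkip
	AssetModeInline
	AssetModeRewrite
)

// modeName is a hypothetical helper; the package itself may not expose
// string names for modes.
func modeName(m AssetMode) string {
	switch m {
	case AssetModeDownload:
		return "download"
	case AssetModeSkip:
		return "skip"
	case AssetModeInline:
		return "inline"
	case AssetModeRewrite:
		return "rewrite"
	default:
		return fmt.Sprintf("AssetMode(%d)", int(m))
	}
}

func main() {
	for m := AssetModeDownload; m <= AssetModeRewrite; m++ {
		fmt.Println(int(m), modeName(m))
	}
}
```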

type AssetPolicy

type AssetPolicy struct {
	Enabled        bool
	MaxBytes       int64
	MaxPerPage     int
	InlineMaxBytes int64
	Optimize       bool
	RewritePrefix  string
	AllowTypes     []string
	BlockTypes     []string
	MaxConcurrent  int // Iteration 7: parallel Execute worker count (>=1). 0 => auto
}

AssetPolicy configures the asset subsystem when enabled. Iteration 1 surface; enforcement & validation logic comes in later iterations.

func (AssetPolicy) Validate

func (p AssetPolicy) Validate() error

Validate is a placeholder check: it ensures the rewrite prefix has leading & trailing slash semantics.
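
The exact rule Validate applies is not shown in this reference. One plausible reading of "leading & trailing slash semantics" is sketched below; validatePrefix is a hypothetical function, and treating an empty prefix as valid is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// validatePrefix is a hypothetical stand-in for the documented check.
// Assumption: an empty prefix means "use the default" and is accepted.
func validatePrefix(prefix string) error {
	if prefix == "" {
		return nil
	}
	if !strings.HasPrefix(prefix, "/") || !strings.HasSuffix(prefix, "/") {
		return fmt.Errorf("rewrite prefix %q must start and end with '/'", prefix)
	}
	return nil
}

func main() {
	for _, p := range []string{"/assets/", "assets", "/assets"} {
		fmt.Printf("%q -> %v\n", p, validatePrefix(p))
	}
}
```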

type AssetRef

type AssetRef struct {
	URL      string
	Type     string // e.g. img, script, stylesheet
	Attr     string // attribute name (src, href, data-src)
	Original string // original raw attribute value
}

AssetRef represents a discovered asset reference inside a page.

type AssetStrategy

type AssetStrategy interface {
	Discover(ctx context.Context, page *engmodels.Page) ([]AssetRef, error)
	Decide(ctx context.Context, refs []AssetRef, policy AssetPolicy) ([]AssetAction, error)
	Execute(ctx context.Context, actions []AssetAction, policy AssetPolicy) ([]MaterializedAsset, error)
	Rewrite(ctx context.Context, page *engmodels.Page, assets []MaterializedAsset, policy AssetPolicy) (*engmodels.Page, error)
	Name() string
}

AssetStrategy defines the pluggable asset handling pipeline lifecycle.
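
To make the Decide step concrete, here is a minimal sketch of how a custom strategy might map references to actions using AllowTypes, BlockTypes, and MaxPerPage. It is not the behavior of DefaultAssetStrategy, which this reference does not describe. The decide and contains functions are hypothetical, the types are trimmed local copies, and the context parameter and error return of the real method are omitted for brevity.

```go
package main

import "fmt"

// Trimmed local copies of the documented types.
type AssetMode int

const (
	AssetModeDownload AssetMode = iota
	AssetModeSkip
)

type AssetRef struct {
	URL  string
	Type string
}

type AssetAction struct {
	Ref  AssetRef
	Mode AssetMode
}

type AssetPolicy struct {
	MaxPerPage int
	AllowTypes []string
	BlockTypes []string
}

func contains(list []string, v string) bool {
	for _, item := range list {
		if item == v {
			return true
		}
	}
	return false
}

// decide is a hypothetical Decide body: blocked types, types missing from a
// non-empty allow list, and refs beyond MaxPerPage are skipped.
func decide(refs []AssetRef, p AssetPolicy) []AssetAction {
	actions := make([]AssetAction, 0, len(refs))
	selected := 0
	for _, r := range refs {
		mode := AssetModeDownload
		switch {
		case contains(p.BlockTypes, r.Type):
			mode = AssetModeSkip
		case len(p.AllowTypes) > 0 && !contains(p.AllowTypes, r.Type):
			mode = AssetModeSkip
		case p.MaxPerPage > 0 && selected >= p.MaxPerPage:
			mode = AssetModeSkip
		default:
			selected++
		}
		actions = append(actions, AssetAction{Ref: r, Mode: mode})
	}
	return actions
}

func main() {
	refs := []AssetRef{{"a.png", "img"}, {"b.js", "script"}, {"c.png", "img"}}
	for _, a := range decide(refs, AssetPolicy{BlockTypes: []string{"script"}, MaxPerPage: 1}) {
		fmt.Println(a.Ref.URL, a.Mode)
	}
}
```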

type Config

type Config struct {
	// DiscoveryWorkers controls the concurrency of the seed/url discovery stage.
	// Experimental: May be folded into a single unified worker setting.
	DiscoveryWorkers int
	// ExtractionWorkers controls HTML extraction / parsing concurrency.
	// Experimental.
	ExtractionWorkers int
	// ProcessingWorkers controls post-extraction processing (enrichment, classification).
	// Experimental.
	ProcessingWorkers int
	// OutputWorkers controls the number of workers writing results to sinks.
	// Experimental.
	OutputWorkers int
	// BufferSize tunes internal channel buffering between stages.
	// Experimental: Subject to removal if adaptive backpressure is introduced.
	BufferSize int

	// RetryBaseDelay is the initial backoff delay for transient fetch failures.
	// Experimental: Retry model may be replaced by policy struct.
	RetryBaseDelay time.Duration
	// RetryMaxDelay caps the exponential backoff delay.
	// Experimental.
	RetryMaxDelay time.Duration
	// RetryMaxAttempts caps the number of retry attempts for a single fetch.
	// Experimental.
	RetryMaxAttempts int

	// RateLimit configures adaptive per-domain rate limiting.
	// Experimental: Location may change (likely to move fully under ratelimit/).
	RateLimit models.RateLimitConfig

	// Resources configures in-memory caches and spill / checkpoint behavior.
	// Experimental: Will be narrowed to higher-level policy before v1.0; implementation hidden.
	Resources ResourcesConfig

	// Resume enables filtering of already-processed seeds based on a checkpoint file.
	// Experimental: Mechanism & file format may change.
	Resume bool
	// CheckpointPath overrides Resources.CheckpointPath when non-empty.
	// Experimental.
	CheckpointPath string

	// AssetPolicy defines behavior for asset handling (Phase 5D).
	// Experimental: Entire asset subsystem is under active iteration.
	AssetPolicy AssetPolicy

	// MetricsEnabled toggles metrics collection / instrumentation.
	// Experimental: May be replaced by a Telemetry struct.
	MetricsEnabled bool
	// PrometheusListenAddr optional HTTP listen address for a Prometheus scrape endpoint.
	// Experimental: CLI layer may become the canonical place to expose endpoints.
	PrometheusListenAddr string
	// MetricsBackend selects metrics implementation: "prom" (default), "otel", or "noop".
	// Experimental: Backend selection mechanism may change.
	MetricsBackend string
}

Config is the public configuration surface for the Engine facade. Experimental: Field set, names, and semantics may change before v1.0. Most fields are pass-through tuning knobs for underlying subsystems and will be reviewed for reduction / consolidation ahead of a stable baseline.
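
The three retry fields describe a capped exponential backoff. How the engine combines them internally is not documented here; the sketch below assumes simple doubling per attempt, and backoff is a hypothetical helper rather than an engine function.

```go
package main

import (
	"fmt"
	"time"
)

// backoff (hypothetical) doubles base once per prior attempt and caps the
// result at limit. Assumes base > 0 and limit > 0.
func backoff(base, limit time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		return limit
	}
	return d
}

func main() {
	// Stand-ins for RetryBaseDelay, RetryMaxDelay, and RetryMaxAttempts.
	base, limit, maxAttempts := 100*time.Millisecond, time.Second, 5
	for attempt := 0; attempt < maxAttempts; attempt++ {
		fmt.Println(attempt, backoff(base, limit, attempt))
	}
}
```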

func Defaults

func Defaults() Config

Defaults returns a Config with conservative starting values. Experimental: Returned default values may be tuned between minor versions pre-v1.

type DefaultAssetStrategy

type DefaultAssetStrategy struct {
	// contains filtered or unexported fields
}

DefaultAssetStrategy implements AssetStrategy with instrumentation hooks.

func (*DefaultAssetStrategy) Decide

func (s *DefaultAssetStrategy) Decide(ctx context.Context, refs []AssetRef, policy AssetPolicy) ([]AssetAction, error)

func (*DefaultAssetStrategy) Discover

func (s *DefaultAssetStrategy) Discover(ctx context.Context, page *engmodels.Page) ([]AssetRef, error)

Discover parses the HTML and extracts candidate asset references.

func (*DefaultAssetStrategy) Execute

func (s *DefaultAssetStrategy) Execute(ctx context.Context, actions []AssetAction, policy AssetPolicy) ([]MaterializedAsset, error)

func (*DefaultAssetStrategy) Name

func (s *DefaultAssetStrategy) Name() string

func (*DefaultAssetStrategy) Rewrite

func (s *DefaultAssetStrategy) Rewrite(ctx context.Context, page *engmodels.Page, assets []MaterializedAsset, policy AssetPolicy) (*engmodels.Page, error)

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine composes all subsystems behind a single facade. Stable: Core lifecycle methods (Start, Stop, Snapshot, Policy, UpdateTelemetryPolicy) are committed to backwards compatible behavior after v1.0; until then only additive changes should occur.

func New

func New(cfg Config, opts ...optionFn) (*Engine, error)

New constructs an Engine with the supplied configuration. Functional options were removed (previously ...Option) during Wave 3 pruning; callers now configure exclusively via the Config struct. The variadic parameter that remains in the signature has the unexported type optionFn.

func NewWithStrategies

func NewWithStrategies(cfg Config, strategies EngineStrategies, opts ...optionFn) (*Engine, error)

NewWithStrategies creates an engine with custom business logic strategies. Experimental: Construction path likely to change once strategy integration lands.

func (*Engine) AssetEvents

func (e *Engine) AssetEvents() []AssetEvent

AssetEvents returns a snapshot copy of collected asset events. Experimental: Event model & buffering policy may change or become streaming.

func (*Engine) AssetMetricsSnapshot

func (e *Engine) AssetMetricsSnapshot() AssetMetricsSnapshot

AssetMetricsSnapshot returns current aggregated counters (zero-value if strategy disabled). Experimental: Asset subsystem instrumentation may change drastically pre-v1.0.

func (*Engine) HealthSnapshot

func (e *Engine) HealthSnapshot(ctx context.Context) telemetryhealth.Snapshot

HealthSnapshot evaluates (or returns cached) subsystem health. Zero-value if disabled. Experimental: Health snapshot structure & evaluation cadence may change.

func (*Engine) MetricsHandler

func (e *Engine) MetricsHandler() http.Handler

MetricsHandler returns the HTTP handler for metrics exposition (Prometheus backend only). Returns nil if metrics disabled or backend does not provide an HTTP handler.

func (*Engine) Policy

func (e *Engine) Policy() TelemetryPolicy

func (*Engine) RegisterEventObserver

func (e *Engine) RegisterEventObserver(obs EventObserver)

RegisterEventObserver adds an observer invoked synchronously for each internal telemetry event. Safe for concurrent use. No-op if nil provided. Experimental: May gain filtering / async delivery options pre-v1.0.

func (*Engine) Snapshot

func (e *Engine) Snapshot() Snapshot

Snapshot returns a unified state view. Stable: See Snapshot field stability guarantees.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context, seeds []string) (<-chan *engmodels.CrawlResult, error)

Start begins processing of the provided seed URLs and returns a read-only results channel. Stable: Contract (non-nil channel on success, error on invalid state) will hold after v1.0.
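
A caller typically ranges over the returned channel until it is closed or the context is cancelled. The sketch below shows that pattern against a stand-in channel. Two assumptions are made: that the engine closes the results channel when the crawl finishes (this reference does not say so), and that a generic drain helper, which is hypothetical, is acceptable in place of code typed on *engmodels.CrawlResult.

```go
package main

import (
	"context"
	"fmt"
)

// drain (hypothetical) collects results until the channel is closed or the
// context is cancelled, whichever happens first.
func drain[T any](ctx context.Context, results <-chan T) ([]T, error) {
	var out []T
	for {
		select {
		case <-ctx.Done():
			return out, ctx.Err()
		case r, ok := <-results:
			if !ok {
				return out, nil
			}
			out = append(out, r)
		}
	}
}

// demo feeds a stand-in channel; a real caller would pass the channel
// returned by Start.
func demo() (int, error) {
	ch := make(chan string, 2)
	ch <- "https://example.com/a"
	ch <- "https://example.com/b"
	close(ch)
	got, err := drain[string](context.Background(), ch)
	return len(got), err
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```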

func (*Engine) Stop

func (e *Engine) Stop() error

Stop gracefully stops the engine and underlying components. Stable: Idempotent; safe to call multiple times after v1.0.

func (*Engine) UpdateTelemetryPolicy

func (e *Engine) UpdateTelemetryPolicy(p *TelemetryPolicy)

UpdateTelemetryPolicy atomically swaps the active policy. Nil input resets to defaults. Experimental: May relocate behind a dedicated telemetry subpackage pre-v1.0. Safe for concurrent use; probes pick up new thresholds on next evaluation cycle.
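
The described semantics (atomic swap, nil resets to defaults, safe for concurrent use) can be illustrated with atomic.Pointer from sync/atomic. This is a sketch of the general technique, not the engine's implementation: policy, its FailureRatio field, defaultPolicy, and policyHolder are all hypothetical stand-ins, since the fields of TelemetryPolicy are not shown in this reference.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// policy is a hypothetical stand-in for TelemetryPolicy.
type policy struct {
	FailureRatio float64
}

func defaultPolicy() policy { return policy{FailureRatio: 0.5} }

type policyHolder struct {
	current atomic.Pointer[policy]
}

func newPolicyHolder() *policyHolder {
	h := &policyHolder{}
	d := defaultPolicy()
	h.current.Store(&d)
	return h
}

// Update swaps the active policy in one atomic store; nil resets to defaults.
func (h *policyHolder) Update(p *policy) {
	next := defaultPolicy()
	if p != nil {
		next = *p // copy so later mutations by the caller are not observed
	}
	h.current.Store(&next)
}

// Policy returns a copy of the active policy, so readers never see a nil.
func (h *policyHolder) Policy() policy { return *h.current.Load() }

func main() {
	h := newPolicyHolder()
	h.Update(&policy{FailureRatio: 0.2})
	fmt.Println(h.Policy().FailureRatio)
	h.Update(nil)
	fmt.Println(h.Policy().FailureRatio)
}
```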

type EngineStrategies

type EngineStrategies struct {
	Fetcher     interface{} // Placeholder for crawler.Fetcher interface
	Processors  interface{} // Placeholder for []processor.Processor slice
	OutputSinks interface{} // Placeholder for []output.OutputSink slice
}

EngineStrategies defines business logic components for dependency injection. Experimental: Placeholder for future strategy extension wiring; not yet integrated.

type EventBusPolicy

type EventBusPolicy = inttelempolicy.EventBusPolicy

type EventObserver

type EventObserver func(ev TelemetryEvent)

EventObserver receives TelemetryEvent notifications. Experimental: May gain filtering or asynchronous delivery options.
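
Since observers registered via RegisterEventObserver are invoked synchronously, an observer should do little work and guard any shared state. A minimal sketch follows, assuming a hypothetical categoryCounter type and made-up category values; TelemetryEvent is a trimmed local copy of the documented struct.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TelemetryEvent is a trimmed local copy of the documented struct.
type TelemetryEvent struct {
	Time     time.Time
	Category string
	Type     string
	Severity string
	Labels   map[string]string
}

// EventObserver matches the documented function type.
type EventObserver func(ev TelemetryEvent)

// categoryCounter is a hypothetical observer that tallies events by category.
type categoryCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func newCategoryCounter() *categoryCounter {
	return &categoryCounter{counts: make(map[string]int)}
}

// Observe holds the lock only for a map increment, so it returns quickly.
func (c *categoryCounter) Observe(ev TelemetryEvent) {
	c.mu.Lock()
	c.counts[ev.Category]++
	c.mu.Unlock()
}

func (c *categoryCounter) Count(category string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[category]
}

func main() {
	c := newCategoryCounter()
	var obs EventObserver = c.Observe // a method value satisfies the func type
	obs(TelemetryEvent{Time: time.Now(), Category: "assets", Type: "asset_download"})
	obs(TelemetryEvent{Time: time.Now(), Category: "assets", Type: "asset_rewrite"})
	obs(TelemetryEvent{Time: time.Now(), Category: "health", Type: "probe"})
	fmt.Println(c.Count("assets"), c.Count("health"))
}
```

With the real package, the method value would be passed to RegisterEventObserver instead of being called directly.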

type Fetcher

type Fetcher interface {
	Fetch(ctx context.Context, url string) (*engmodels.Page, error)
}

Fetcher defines how pages are fetched. Experimental: May add richer metadata result.

type HealthPolicy

type HealthPolicy = inttelempolicy.HealthPolicy

type LimiterDomainState

type LimiterDomainState struct {
	Domain       string    `json:"domain"`
	FillRate     float64   `json:"fill_rate"`
	CircuitState string    `json:"circuit_state"`
	LastActivity time.Time `json:"last_activity"`
}

LimiterDomainState summarizes recent domain-level adaptive state. Experimental: May be removed or replaced with aggregated counters only.

type LimiterSnapshot

type LimiterSnapshot struct {
	TotalRequests    int64                `json:"total_requests"`
	Throttled        int64                `json:"throttled"`
	Denied           int64                `json:"denied"`
	OpenCircuits     int64                `json:"open_circuits"`
	HalfOpenCircuits int64                `json:"half_open_circuits"`
	Domains          []LimiterDomainState `json:"domains,omitempty"`
}

LimiterSnapshot is a public, reduced view of the internal adaptive rate limiter state. Experimental: Field set may shrink prior to v1.0; external consumers should treat as best-effort diagnostics (subject to consolidation under a future telemetry facade).
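
The struct tags determine the JSON shape a diagnostics endpoint would emit. The sketch below marshals a local copy of the two documented structs with encoding/json; encode is a hypothetical helper. Note that Domains carries omitempty, so a nil or empty slice is left out of the output.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the documented structs, including their JSON tags.
type LimiterDomainState struct {
	Domain       string    `json:"domain"`
	FillRate     float64   `json:"fill_rate"`
	CircuitState string    `json:"circuit_state"`
	LastActivity time.Time `json:"last_activity"`
}

type LimiterSnapshot struct {
	TotalRequests    int64                `json:"total_requests"`
	Throttled        int64                `json:"throttled"`
	Denied           int64                `json:"denied"`
	OpenCircuits     int64                `json:"open_circuits"`
	HalfOpenCircuits int64                `json:"half_open_circuits"`
	Domains          []LimiterDomainState `json:"domains,omitempty"`
}

// encode (hypothetical helper) renders a snapshot as a JSON string.
func encode(s LimiterSnapshot) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	out, err := encode(LimiterSnapshot{TotalRequests: 10, Throttled: 2})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```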

type MaterializedAsset

type MaterializedAsset struct {
	Ref           AssetRef
	Bytes         []byte
	Hash          string   // sha256
	Path          string   // stable relative path
	Size          int      // size after optimization (if any)
	Optimizations []string // applied optimization identifiers
}

MaterializedAsset represents an asset after execution (download / inline / optimization).

type OutputSink

type OutputSink interface {
	Name() string
	Write(ctx context.Context, page *engmodels.Page) error
	Flush(ctx context.Context) error
	Close(ctx context.Context) error
}

OutputSink consumes processed pages. Experimental: Flush/Close semantics may narrow; prefer facade-managed lifecycle.
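
An in-memory sink is a convenient way to see the four-method contract in one place, for instance in tests. The sketch below is illustrative: memorySink and writeAll are hypothetical, and Page is a stand-in for engmodels.Page whose URL field is an assumption, because the real struct is not shown in this reference.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Page is a stand-in for engmodels.Page; the URL field is an assumption.
type Page struct{ URL string }

// OutputSink mirrors the documented interface, using the stand-in Page.
type OutputSink interface {
	Name() string
	Write(ctx context.Context, page *Page) error
	Flush(ctx context.Context) error
	Close(ctx context.Context) error
}

// memorySink is a hypothetical sink that keeps pages in a slice.
type memorySink struct {
	mu     sync.Mutex
	pages  []*Page
	closed bool
}

var _ OutputSink = (*memorySink)(nil) // compile-time interface check

func (m *memorySink) Name() string { return "memory" }

func (m *memorySink) Write(ctx context.Context, page *Page) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return errors.New("memory sink: write after close")
	}
	m.pages = append(m.pages, page)
	return nil
}

// Flush has nothing to buffer here; it only reports context cancellation.
func (m *memorySink) Flush(ctx context.Context) error { return ctx.Err() }

func (m *memorySink) Close(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

// writeAll (hypothetical) writes one page per URL, then flushes.
func writeAll(sink OutputSink, urls ...string) error {
	ctx := context.Background()
	for _, u := range urls {
		if err := sink.Write(ctx, &Page{URL: u}); err != nil {
			return err
		}
	}
	return sink.Flush(ctx)
}

func main() {
	sink := &memorySink{}
	err := writeAll(sink, "https://example.com/a", "https://example.com/b")
	fmt.Println(sink.Name(), len(sink.pages), err)
}
```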

type Processor

type Processor interface {
	Process(ctx context.Context, page *engmodels.Page) (*engmodels.Page, error)
}

Processor transforms a fetched page into enriched content. Experimental: May gain streaming hooks.

type ResourceSnapshot

type ResourceSnapshot struct {
	CacheEntries     int `json:"cache_entries"`
	SpillFiles       int `json:"spill_files"`
	InFlight         int `json:"in_flight"`
	CheckpointQueued int `json:"checkpoint_queued"`
}

ResourceSnapshot summarizes resource manager internal counters. Experimental: Field set & naming may change pre-v1.0.

type ResourcesConfig

type ResourcesConfig struct {
	CacheCapacity      int
	MaxInFlight        int
	SpillDirectory     string
	CheckpointPath     string
	CheckpointInterval time.Duration
}

ResourcesConfig is the public facade configuration for resource management. Experimental: Shape and semantics may change before v1.0. Mirrors internal/resources.Config but kept separate to permit future reduction.

type ResumeSnapshot

type ResumeSnapshot struct {
	SeedsBefore int   `json:"seeds_before"`
	Skipped     int64 `json:"skipped"`
}

ResumeSnapshot contains resume filter statistics. Experimental: Mechanism & counters may change; only present when resume enabled.

type Snapshot

type Snapshot struct {
	StartedAt time.Time                    `json:"started_at"`
	Uptime    time.Duration                `json:"uptime"`
	Pipeline  *engpipeline.PipelineMetrics `json:"pipeline,omitempty"`
	Limiter   *LimiterSnapshot             `json:"limiter,omitempty"`
	Resources *ResourceSnapshot            `json:"resources,omitempty"`
	Resume    *ResumeSnapshot              `json:"resume,omitempty"`
}

Snapshot is a unified view of engine state. Stable: Field additions are allowed; existing fields retain semantics.

type TelemetryEvent

type TelemetryEvent struct {
	Time     time.Time              `json:"time"`
	Category string                 `json:"category"`
	Type     string                 `json:"type"`
	Severity string                 `json:"severity,omitempty"`
	TraceID  string                 `json:"trace_id,omitempty"`
	SpanID   string                 `json:"span_id,omitempty"`
	Labels   map[string]string      `json:"labels,omitempty"`
	Fields   map[string]interface{} `json:"fields,omitempty"`
}

TelemetryEvent is a reduced, stable event representation for external observers. Experimental: Field set may evolve (additive) pre-v1.0. Replaces direct access to internal event bus over time (Phase C6).

type TelemetryOptions

type TelemetryOptions struct {
	EnableMetrics   bool
	EnableTracing   bool
	EnableEvents    bool
	EnableHealth    bool
	MetricsBackend  string
	SamplingPercent float64
}

TelemetryOptions describes which telemetry subsystems are enabled plus tuning knobs. Experimental: Shape may change (e.g., embedded policy structs) before v1.0.

type TelemetryPolicy

type TelemetryPolicy = inttelempolicy.TelemetryPolicy

TelemetryPolicy re-exports the internal telemetry policy type (C6 step 2b), giving a stable facade surface while the implementation stays internal. Engine.Policy returns the current telemetry policy snapshot and never returns nil. Experimental: Policy struct shape & semantics may evolve pre-v1.0.

func DefaultTelemetryPolicy

func DefaultTelemetryPolicy() TelemetryPolicy

DefaultTelemetryPolicy returns the default normalized telemetry policy (wrapper around internal).

type TracingPolicy

type TracingPolicy = inttelempolicy.TracingPolicy

Directories

Path Synopsis
Package config provides higher-level composition helpers for engine component policies plus runtime configuration facilities.
internal
telemetry
