forgeext

package
v0.0.2
Published: Jun 21, 2026 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Constants

const Version = "0.1.0"

Version is the fabriq forge extension version.

Variables

This section is empty.

Functions

func LoadConfig

func LoadConfig(cm forge.ConfigManager, prefix string) fabriq.Config

LoadConfig builds a fabriq.Config from a forge ConfigManager. prefix is "" for the top-level key contract (cmd/fabriq serve) or "extensions.fabriq." for the first-class host-app convention. The function was relocated and parameterized from cmd/fabriq's loadFabriqConfig; its GetStringSlice handling of elasticsearch.addrs is preserved, because confy does not split a comma-separated env string into a Go slice.
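The prefix convention and the comma handling can be illustrated with a small standalone sketch. It does not use the real forge or confy APIs: the lookup type and the addrsFrom helper are hypothetical stand-ins, and splitting on commas is only one plausible way to recover a slice from an env string. Only the key name elasticsearch.addrs and the two prefixes come from the documentation above.

```go
package main

import (
	"fmt"
	"strings"
)

// lookup is a hypothetical stand-in for a config manager's string getter.
type lookup func(key string) string

// addrsFrom reads prefix+"elasticsearch.addrs" and splits a comma-separated
// value into a slice, trimming whitespace and dropping empty parts.
// An empty or missing value yields a nil slice.
func addrsFrom(get lookup, prefix string) []string {
	raw := get(prefix + "elasticsearch.addrs")
	var out []string
	for _, part := range strings.Split(raw, ",") {
		if p := strings.TrimSpace(part); p != "" {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	values := map[string]string{
		"elasticsearch.addrs":                   "http://es1:9200,http://es2:9200",
		"extensions.fabriq.elasticsearch.addrs": "http://es3:9200",
	}
	get := func(k string) string { return values[k] }

	fmt.Println(addrsFrom(get, ""))                   // top-level key contract
	fmt.Println(addrsFrom(get, "extensions.fabriq.")) // host-app convention
}
```

The same function serves both layouts because the prefix is simply concatenated in front of every key.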

Types

type Config

type Config struct {
	Fabriq            fabriq.Config
	RunWorker         bool
	ReconcileInterval time.Duration
	// BlobGCGrace protects freshly-created CAS entries and orphan bytes from
	// collection for this window. Zero falls back to 1h at run time.
	BlobGCGrace time.Duration
	// Embedder enables the embedding worker: each write to an entity with an
	// EmbedSpec is embedded + vector-upserted asynchronously. Nil = disabled.
	Embedder agent.Embedder
	// Summarizer enables the distillation worker: each write to an entity with
	// a DistillSpec is summarized into its digest tree asynchronously (debounced,
	// per-tenant single-flight). Nil = distillation disabled.
	Summarizer agent.Summarizer
	// Guard is the optional PII/guardrail seam for distillation (nil = identity).
	Guard agent.Guard
	// DistillFailOpenGuard flips the guard from fail-closed (default) to fail-open.
	DistillFailOpenGuard bool
	// DistillRecipeVersion salts the digest ContentHash; bump to rebuild the tree.
	DistillRecipeVersion string
	// DistillDebounce is the per-tenant coalescing window for L0+rollup sweeps.
	DistillDebounce time.Duration
}

Config is the fabriq forge extension's configuration: the data-fabric config plus worker knobs. Build it with options; the extension overlays values from the config manager under extensions.fabriq.* at Register (options win).
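The "options win" overlay can be sketched as follows. This is an illustration under assumptions, not the package's implementation: Config here is a trimmed local stand-in, the config manager is modeled as a second Config value, and a zero value is assumed to mean "not set by an option".

```go
package main

import (
	"fmt"
	"time"
)

// Config is a trimmed, hypothetical stand-in for forgeext.Config.
type Config struct {
	ReconcileInterval time.Duration
	BlobGCGrace       time.Duration
}

// Option mirrors the functional-option shape documented for this package.
type Option func(*Config)

// WithReconcileInterval is a local copy of the option of the same name.
func WithReconcileInterval(d time.Duration) Option {
	return func(c *Config) { c.ReconcileInterval = d }
}

// build applies the options to a zero Config, as a constructor would.
func build(opts ...Option) Config {
	var c Config
	for _, o := range opts {
		o(&c)
	}
	return c
}

// overlay fills only the fields the options left at zero, so any value set
// through an option takes precedence over the config manager's value.
func overlay(fromOpts, fromManager Config) Config {
	out := fromOpts
	if out.ReconcileInterval == 0 {
		out.ReconcileInterval = fromManager.ReconcileInterval
	}
	if out.BlobGCGrace == 0 {
		out.BlobGCGrace = fromManager.BlobGCGrace
	}
	return out
}

func main() {
	managed := Config{ReconcileInterval: time.Minute, BlobGCGrace: 2 * time.Hour}
	cfg := overlay(build(WithReconcileInterval(10*time.Second)), managed)
	fmt.Println(cfg.ReconcileInterval, cfg.BlobGCGrace) // 10s 2h0m0s
}
```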

type Extension

type Extension struct {
	forge.BaseExtension
	// contains filtered or unexported fields
}

Extension exposes the fabriq data fabric as a first-class Forge extension: the facade as a DI service (alias "fabriq"), automatic health checks, fabriq's migrations, and an opt-in background worker.

func New

func New(reg *registry.Registry, opts ...Option) *Extension

New creates a new Extension with the given registry and options.

func (*Extension) Dependencies

func (e *Extension) Dependencies() []string

func (*Extension) Description

func (e *Extension) Description() string

func (*Extension) Fabriq

func (e *Extension) Fabriq() *fabriq.Fabriq

Fabriq returns the opened facade (nil before Start).

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health implements forge.Extension. Pings the Postgres store.

func (*Extension) Migrate

func (e *Extension) Migrate(ctx context.Context) (*forge.MigrationResult, error)

Migrate runs fabriq's pending migrations forward (forge MigratableExtension).

It opens a fresh grove Orchestrator against the primary DSN, runs all pending migrations, and translates the grove MigrateResult into a forge.MigrationResult.

func (*Extension) MigrationStatus

func (e *Extension) MigrationStatus(ctx context.Context) ([]*forge.MigrationGroupInfo, error)

MigrationStatus returns the current state of fabriq's migrations grouped by migration group (forge MigratableExtension).

grove's Status method returns []*migrate.GroupStatus, each with Applied and Pending []*migrate.MigrationStatus slices. This is translated to the forge type hierarchy: one *forge.MigrationGroupInfo per grove GroupStatus, with *forge.MigrationInfo entries in Applied and Pending.
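The translation described above is a one-to-one mapping of groups and their Applied and Pending slices. The sketch below uses local stand-in types rather than the real grove and forge packages; only the Applied and Pending field names are taken from the documentation, and the Name and Version fields are assumptions made for illustration.

```go
package main

import "fmt"

// Hypothetical stand-ins for grove's migrate.MigrationStatus and
// migrate.GroupStatus.
type groveMigrationStatus struct{ Version, Name string }

type groveGroupStatus struct {
	Name    string
	Applied []*groveMigrationStatus
	Pending []*groveMigrationStatus
}

// Hypothetical stand-ins for forge.MigrationInfo and forge.MigrationGroupInfo.
type forgeMigrationInfo struct{ Version, Name string }

type forgeMigrationGroupInfo struct {
	Name    string
	Applied []*forgeMigrationInfo
	Pending []*forgeMigrationInfo
}

// toInfos converts one slice of migration statuses, preserving order.
func toInfos(in []*groveMigrationStatus) []*forgeMigrationInfo {
	out := make([]*forgeMigrationInfo, 0, len(in))
	for _, m := range in {
		out = append(out, &forgeMigrationInfo{Version: m.Version, Name: m.Name})
	}
	return out
}

// translate emits exactly one group info per group status.
func translate(groups []*groveGroupStatus) []*forgeMigrationGroupInfo {
	out := make([]*forgeMigrationGroupInfo, 0, len(groups))
	for _, g := range groups {
		out = append(out, &forgeMigrationGroupInfo{
			Name:    g.Name,
			Applied: toInfos(g.Applied),
			Pending: toInfos(g.Pending),
		})
	}
	return out
}

func main() {
	groups := []*groveGroupStatus{{
		Name:    "fabriq",
		Applied: []*groveMigrationStatus{{Version: "001", Name: "init"}},
		Pending: []*groveMigrationStatus{{Version: "002", Name: "add_index"}},
	}}
	for _, g := range translate(groups) {
		fmt.Printf("%s: %d applied, %d pending\n", g.Name, len(g.Applied), len(g.Pending))
	}
}
```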

func (*Extension) Name

func (e *Extension) Name() string

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register implements forge.Extension. It MUST call e.BaseExtension.Register first.

func (*Extension) Rollback

func (e *Extension) Rollback(ctx context.Context) (*forge.MigrationResult, error)

Rollback rolls back the last applied fabriq migration (forge MigratableExtension).

grove's Rollback rolls back exactly one migration (the most recently applied in the group), so RolledBack in the result is always 0 or 1.

func (*Extension) Run

func (e *Extension) Run(ctx context.Context) error

Run implements forge.RunnableExtension: supervise the leader-elected relay until shutdown. If RunWorker is false this is a no-op.
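One way to picture the Run contract is a loop that keeps a relay running until the context is cancelled and returns immediately when the worker is disabled. The sketch below is an assumption about what "supervise" could mean (restart the relay whenever it returns); the supervise and demo functions and the relay callback are hypothetical, and leader election is not modeled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// supervise restarts relay each time it returns, until ctx is cancelled.
// It returns the number of restarts. With enabled == false it is a no-op.
func supervise(ctx context.Context, enabled bool, relay func(context.Context) error) int {
	if !enabled {
		return 0
	}
	restarts := 0
	for {
		_ = relay(ctx) // a real supervisor would log the error and back off
		if ctx.Err() != nil {
			return restarts // shutdown requested: stop supervising
		}
		restarts++
	}
}

// demo runs a relay that fails twice and then observes shutdown.
func demo(enabled bool) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	relay := func(ctx context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("relay stopped unexpectedly")
		}
		cancel() // simulate shutdown arriving during the third run
		return ctx.Err()
	}
	return supervise(ctx, enabled, relay)
}

func main() {
	fmt.Println(demo(true))  // 2
	fmt.Println(demo(false)) // 0
}
```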

func (*Extension) Shutdown

func (e *Extension) Shutdown(ctx context.Context) error

Shutdown implements forge.RunnableExtension: SIGTERM drain. If RunWorker is false (or Run was never called), this is a no-op.

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start implements forge.Extension. Opens the fabriq facade.

func (*Extension) Stop

func (e *Extension) Stop(_ context.Context) error

Stop implements forge.Extension. Closes the fabriq facade.

func (*Extension) Stores

func (e *Extension) Stores() *fabriq.Stores

Stores returns the opened adapters (nil before Start). The gateway extension reads Stores().Redis to build the live-query cluster transport.

func (*Extension) Version

func (e *Extension) Version() string

type GatewayConfig

type GatewayConfig struct {
	// GatewayID is this gateway's cluster identity (deltas are routed back to it
	// over lq:delta:{id}). Defaults to "gw-<host>-<pid>".
	GatewayID string
	// BasePath is the SSE endpoint path; the WebSocket endpoint is BasePath+"/ws".
	// Defaults to "/api/v1/live".
	BasePath string
	// HeartbeatTTL is the membership liveness window of the Redis cluster
	// transport (default 6s). Shards must refresh within it.
	HeartbeatTTL time.Duration
	// SSE and WS tune the per-connection delivery loops (heartbeat, write
	// deadlines/watchdog).
	SSE gateway.SSEOptions
	WS  gateway.WSOptions
	// RouteOptions are forwarded verbatim to router.SSE/router.WebSocket, so the
	// host app attaches its own auth middleware and AsyncAPI/OpenAPI schemas
	// (e.g. pkgAuth.RequirePermission(...), forge.WithSSEMessage(...)). fabriq
	// stays auth-scheme-agnostic.
	RouteOptions []forge.RouteOption
}

GatewayConfig configures the standalone live-query gateway tier: the edge that terminates client SSE/WebSocket connections and routes them over the sharded control/delta protocol to the matcher shards.
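The defaults named in the field comments can be collected in one place. The following is a minimal sketch, assuming defaults are applied to zero-valued fields; GatewayConfig here is a trimmed local copy, and withDefaults and wsPath are hypothetical helpers, not functions exported by this package.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// GatewayConfig is a trimmed, hypothetical copy of the documented struct.
type GatewayConfig struct {
	GatewayID    string
	BasePath     string
	HeartbeatTTL time.Duration
}

// withDefaults fills zero-valued fields with the documented defaults:
// "gw-<host>-<pid>", "/api/v1/live" and 6s.
func withDefaults(c GatewayConfig) GatewayConfig {
	if c.GatewayID == "" {
		host, err := os.Hostname()
		if err != nil {
			host = "unknown" // assumption: fallback when the hostname is unavailable
		}
		c.GatewayID = fmt.Sprintf("gw-%s-%d", host, os.Getpid())
	}
	if c.BasePath == "" {
		c.BasePath = "/api/v1/live"
	}
	if c.HeartbeatTTL == 0 {
		c.HeartbeatTTL = 6 * time.Second
	}
	return c
}

// wsPath derives the WebSocket endpoint from the SSE endpoint path.
func wsPath(c GatewayConfig) string { return c.BasePath + "/ws" }

func main() {
	c := withDefaults(GatewayConfig{})
	fmt.Println(c.GatewayID)
	fmt.Println(c.BasePath, wsPath(c), c.HeartbeatTTL)
}
```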

type GatewayExtension

type GatewayExtension struct {
	forge.BaseExtension
	// contains filtered or unexported fields
}

GatewayExtension exposes the fabriq live-query gateway as a Forge extension: it builds a cluster.Gateway over the fabriq facade's Redis transport, runs its demux pump, and registers the SSE + WebSocket controllers. It depends on the "fabriq" extension (it reads its Stores().Redis), so it starts after it.

func NewGateway

func NewGateway(fab *Extension, opts ...GatewayOption) *GatewayExtension

NewGateway builds the gateway extension wired to a started fabriq Extension.

func (*GatewayExtension) Dependencies

func (g *GatewayExtension) Dependencies() []string

func (*GatewayExtension) Description

func (g *GatewayExtension) Description() string

func (*GatewayExtension) Name

func (g *GatewayExtension) Name() string

func (*GatewayExtension) Register

func (g *GatewayExtension) Register(app forge.App) error

Register registers the SSE and WebSocket controllers. Their handlers resolve the backend lazily (it is built in Start), the same lazy-DI pattern the fabriq facade uses — safe because requests only arrive after Start.
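The lazy-resolution pattern can be sketched without any forge types: the handler is created before the backend exists and looks the backend up on every request. Everything below (gateway, backend, handler, start) is a hypothetical stand-in, and the use of an atomic pointer is an assumption about how such a handoff might be made safe.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// backend stands in for the object that Start builds.
type backend struct{ name string }

// gateway holds the backend behind an atomic pointer so a handler created at
// registration time can read it safely once Start has stored it.
type gateway struct {
	be atomic.Pointer[backend]
}

// handler is created during registration, before the backend exists. It
// resolves the backend on each call instead of capturing it up front.
func (g *gateway) handler() func() (string, error) {
	return func() (string, error) {
		be := g.be.Load()
		if be == nil {
			return "", errors.New("gateway not started")
		}
		return "served by " + be.name, nil
	}
}

// start builds the backend and publishes it to already-registered handlers.
func (g *gateway) start() {
	g.be.Store(&backend{name: "cluster-gateway"})
}

func main() {
	g := &gateway{}
	h := g.handler() // registration happens first

	_, err := h()
	fmt.Println(err) // gateway not started

	g.start()
	out, _ := h()
	fmt.Println(out) // served by cluster-gateway
}
```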

func (*GatewayExtension) Start

func (g *GatewayExtension) Start(_ context.Context) error

Start builds the cluster.Gateway over the facade's Redis transport and runs its demux pump.

func (*GatewayExtension) Stop

func (g *GatewayExtension) Stop(ctx context.Context) error

Stop stops the gateway demux pump.

func (*GatewayExtension) Version

func (g *GatewayExtension) Version() string

type GatewayOption

type GatewayOption func(*GatewayConfig)

GatewayOption is a functional option for GatewayConfig.

func WithGatewayBasePath

func WithGatewayBasePath(p string) GatewayOption

WithGatewayBasePath sets the SSE endpoint path (WS is BasePath+"/ws").

func WithGatewayHeartbeatTTL

func WithGatewayHeartbeatTTL(d time.Duration) GatewayOption

WithGatewayHeartbeatTTL sets the cluster membership liveness window.

func WithGatewayID

func WithGatewayID(id string) GatewayOption

WithGatewayID sets the gateway's cluster identity.

func WithGatewayRouteOptions

func WithGatewayRouteOptions(opts ...forge.RouteOption) GatewayOption

WithGatewayRouteOptions appends route options forwarded to the SSE/WS routes (auth, AsyncAPI/OpenAPI documentation).

func WithGatewaySSEHeartbeat

func WithGatewaySSEHeartbeat(d time.Duration) GatewayOption

WithGatewaySSEHeartbeat sets the SSE keep-alive interval.

func WithGatewayWriteTimeout

func WithGatewayWriteTimeout(d time.Duration) GatewayOption

WithGatewayWriteTimeout bounds a single SSE/WS write before the connection is torn down (the client reconnects to a fresh snapshot).

type Option

type Option func(*Config)

Option is a functional option for Config.

func WithBlobGCGrace

func WithBlobGCGrace(d time.Duration) Option

WithBlobGCGrace sets the grace window before an unreferenced CAS entry or orphan byte becomes GC-eligible. Defaults to 1h when zero.
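The zero-value fallback and the grace check can be written down in a few lines. This is a sketch under assumptions: effectiveGrace and gcEligible are hypothetical helpers, and treating an entry as eligible once its age is at least the grace window is a guess at the boundary condition.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveGrace applies the documented fallback: zero means 1h.
func effectiveGrace(configured time.Duration) time.Duration {
	if configured == 0 {
		return time.Hour
	}
	return configured
}

// gcEligible reports whether an unreferenced entry of the given age has
// outlived the grace window (assumed inclusive at the boundary).
func gcEligible(age, configured time.Duration) bool {
	return age >= effectiveGrace(configured)
}

func main() {
	fmt.Println(effectiveGrace(0))                          // 1h0m0s
	fmt.Println(gcEligible(30*time.Minute, 0))              // false
	fmt.Println(gcEligible(30*time.Minute, 10*time.Minute)) // true
}
```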

func WithConfig

func WithConfig(c fabriq.Config) Option

WithConfig sets the underlying fabriq data-fabric configuration.

func WithCustomAppliers

func WithCustomAppliers(a ...projection.CustomApplier) Option

WithCustomAppliers appends consumer-supplied projection appliers to the fabriq config. They are unioned after the built-in declarative applier for their Target and MUST be pure (see projection.CustomApplier).

func WithDistillDebounce

func WithDistillDebounce(d time.Duration) Option

WithDistillDebounce sets the per-tenant coalescing window for L0+rollup sweeps.

func WithDistillFailOpenGuard

func WithDistillFailOpenGuard(v bool) Option

WithDistillFailOpenGuard flips the guard from fail-closed (default) to fail-open.
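The difference between the two modes only shows up when the guard itself fails. A minimal sketch, assuming the guard is a function from text to text with an error; guardFunc, applyGuard, redactGuard and brokenGuard are hypothetical names and do not reflect the real agent.Guard interface.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// guardFunc is a hypothetical stand-in for a PII/guardrail seam.
type guardFunc func(text string) (string, error)

// applyGuard runs the guard before text reaches the summarizer.
// A nil guard is the identity. When the guard errors, fail-closed (default)
// propagates the error and fail-open lets the original text through.
func applyGuard(g guardFunc, failOpen bool, text string) (string, error) {
	if g == nil {
		return text, nil
	}
	out, err := g(text)
	if err == nil {
		return out, nil
	}
	if failOpen {
		return text, nil
	}
	return "", err
}

// redactGuard is a toy guard that masks the word "secret".
func redactGuard(text string) (string, error) {
	return strings.ReplaceAll(text, "secret", "[redacted]"), nil
}

// brokenGuard simulates a guard whose backing service is unavailable.
func brokenGuard(string) (string, error) {
	return "", errors.New("guard unavailable")
}

func main() {
	fmt.Println(applyGuard(redactGuard, false, "my secret"))
	fmt.Println(applyGuard(brokenGuard, false, "my secret")) // fail-closed
	fmt.Println(applyGuard(brokenGuard, true, "my secret"))  // fail-open
}
```

Fail-open trades safety for availability: unguarded text is processed when the guard is down, which is why fail-closed is the default.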

func WithDistillRecipeVersion

func WithDistillRecipeVersion(v string) Option

WithDistillRecipeVersion salts the digest ContentHash; bump to rebuild the tree.
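Salting a content hash with a version string means the same input hashes differently after a bump, so every cached digest looks stale and is rebuilt. The sketch below illustrates the idea only; the contentHash helper, the choice of SHA-256 and the separator byte are all assumptions, since the actual ContentHash scheme is not documented here.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// contentHash mixes the recipe version into the hash of the content.
// A zero byte separates the two parts so ("a", "bc") and ("ab", "c")
// do not collide.
func contentHash(recipeVersion, content string) string {
	h := sha256.New()
	h.Write([]byte(recipeVersion))
	h.Write([]byte{0})
	h.Write([]byte(content))
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := contentHash("v1", "quarterly report text")
	b := contentHash("v2", "quarterly report text")
	fmt.Println(a == b) // false: bumping the version invalidates the digest
}
```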

func WithEmbedder

func WithEmbedder(e agent.Embedder) Option

WithEmbedder enables the embedding worker: each write to an entity with an EmbedSpec is embedded + vector-upserted asynchronously. Nil = disabled.

func WithGuard

func WithGuard(g agent.Guard) Option

WithGuard sets the optional PII/guardrail seam for distillation.

func WithReconcileInterval

func WithReconcileInterval(d time.Duration) Option

WithReconcileInterval sets the interval at which the background worker reconciles projection state.

func WithSummarizer

func WithSummarizer(s agent.Summarizer) Option

WithSummarizer enables the distillation worker: each write to an entity with a DistillSpec is summarized into its digest tree asynchronously. Nil = disabled.

func WithWorker

func WithWorker(on bool) Option

WithWorker enables or disables the background reconcile worker.

Directories

Path Synopsis
Package agentmcp exposes the fabriq agent toolkit over MCP (JSON-RPC 2.0).
Package shieldguard adapts github.com/xraph/shield to the agent.Guard seam.