package extractor

Version: v0.0.5
Warning

This package is not in the latest version of its module.

Published: May 29, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Constants

const (
	// EventStarted records that the runtime began extracting one transcript range.
	EventStarted = "memory.extractor.started"
	// EventCompleted records that extraction and inbox production finished.
	EventCompleted = "memory.extractor.completed"
	// EventFailed records extraction, inbox, decode, or controller handoff failures.
	EventFailed = "memory.extractor.failed"
	// EventCoalesced records bounded queue merging for one session.
	EventCoalesced = "memory.extractor.coalesced"
	// EventDropped records extractor work dropped before provider execution.
	EventDropped = "memory.extractor.dropped"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumeResult

type ConsumeResult struct {
	Files     int
	Proposed  int
	Failed    int
	Decisions []memcontract.Decision
	Failures  []string
}

ConsumeResult summarizes one inbox pass.

type ConsumerOption

type ConsumerOption func(*InboxConsumer)

ConsumerOption customizes inbox consumption.

func WithConsumerClock

func WithConsumerClock(now func() time.Time) ConsumerOption

WithConsumerClock injects deterministic time.

func WithConsumerEventSink

func WithConsumerEventSink(sink EventSink) ConsumerOption

WithConsumerEventSink records consumer telemetry.

func WithConsumerFailurePath

func WithConsumerFailurePath(path string) ConsumerOption

WithConsumerFailurePath overrides the default <root>/_system/extractor/failures directory.

func WithConsumerInboxPath

func WithConsumerInboxPath(path string) ConsumerOption

WithConsumerInboxPath overrides the default <root>/_inbox directory.

func WithConsumerLogger

func WithConsumerLogger(logger *slog.Logger) ConsumerOption

WithConsumerLogger sets the logger used for warning output.

type Event

type Event struct {
	Op          string
	Turn        memcontract.TurnRecord
	SessionID   string
	WorkspaceID string
	AgentID     string
	ActorKind   string
	DecisionID  string
	TargetID    string
	Metadata    map[string]string
	Error       string
	At          time.Time
}

Event is redaction-safe extractor telemetry persisted into memory_events.

func (Event) Normalize

func (e Event) Normalize(now func() time.Time) Event

Normalize returns a copy with canonical identity fields filled from the turn.

type EventSink

type EventSink interface {
	RecordExtractorEvent(context.Context, Event) error
}

EventSink persists extractor telemetry.

type InboxConsumer

type InboxConsumer struct {
	// contains filtered or unexported fields
}

InboxConsumer drains daemon-owned extractor inbox files through the write controller.

func NewInboxConsumer

func NewInboxConsumer(root string, sink ProposalSink, opts ...ConsumerOption) (*InboxConsumer, error)

NewInboxConsumer constructs a FIFO inbox consumer.

func (*InboxConsumer) ConsumeOnce

func (c *InboxConsumer) ConsumeOnce(ctx context.Context) (ConsumeResult, error)

ConsumeOnce processes all currently visible JSONL files in FIFO order.

type Option

type Option func(*Runtime)

Option customizes the extractor runtime.

func WithClock

func WithClock(now func() time.Time) Option

WithClock injects deterministic time for tests.

func WithCoalesceMax

func WithCoalesceMax(limit int) Option

WithCoalesceMax configures the hard queue merge limit.

func WithEventSink

func WithEventSink(sink EventSink) Option

WithEventSink records extractor telemetry.

func WithInboxPath

func WithInboxPath(path string) Option

WithInboxPath overrides the default <root>/_inbox directory.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger configures warning output.

func WithQueueCapacity added in v0.0.3

func WithQueueCapacity(limit int) Option

WithQueueCapacity bounds concurrent provider-backed extractor sessions.

func WithThrottleFlushWait added in v0.0.3

func WithThrottleFlushWait(wait time.Duration) Option

WithThrottleFlushWait overrides the idle flush wait for tests and tightly controlled runtimes.

func WithThrottleTurns added in v0.0.3

func WithThrottleTurns(limit int) Option

WithThrottleTurns coalesces same-session bursts before launching another extraction.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer writes extractor candidates into the daemon-owned inbox.

func NewProducer

func NewProducer(root string, now func() time.Time, opts ...ProducerOption) (*Producer, error)

NewProducer constructs an inbox producer rooted at the memory directory.

func (*Producer) Write

func (p *Producer) Write(
	ctx context.Context,
	turn memcontract.TurnRecord,
	candidates []memcontract.Candidate,
) (string, int, error)

Write persists one JSONL inbox file for a completed extractor turn.

type ProducerOption

type ProducerOption func(*Producer)

ProducerOption customizes inbox production.

func WithProducerInboxPath

func WithProducerInboxPath(path string) ProducerOption

WithProducerInboxPath overrides the default <root>/_inbox directory.

type ProposalSink

type ProposalSink interface {
	ProposeCandidate(context.Context, memcontract.Candidate) (memcontract.Decision, error)
}

ProposalSink is the controller-backed handoff used by the inbox consumer.

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime owns asynchronous transcript extraction and daemon inbox production.

func NewRuntime

func NewRuntime(
	ctx context.Context,
	root string,
	extractor memcontract.Extractor,
	opts ...Option,
) (*Runtime, error)

NewRuntime constructs a daemon-owned extractor runtime.

func (*Runtime) Close

func (r *Runtime) Close(ctx context.Context) error

Close rejects new work, joins workers, and flushes the extractor.

func (*Runtime) Drain

func (r *Runtime) Drain(ctx context.Context) error

Drain waits for the current queue to empty and asks the extractor to flush.

func (*Runtime) Enqueue

func (r *Runtime) Enqueue(ctx context.Context, turn memcontract.TurnRecord) error

Enqueue requests extraction for one transcript turn. Work is tracked per session as one in-flight item plus one queued item.
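The per-session admission policy can be modeled as a small state machine. The sketch below assumes that a turn arriving while a session already has both an in-flight and a queued item replaces the queued one (counted as coalesced), on the assumption that a later turn's range covers the earlier one. It does not model the bound set by WithCoalesceMax. turn, slot, admitter, enqueue, and done are hypothetical names.

```go
package main

import "fmt"

// turn is a hypothetical stand-in for memcontract.TurnRecord.
type turn struct {
	SessionID string
	Seq       int64
}

// slot tracks one session: at most one in-flight and one queued item.
type slot struct {
	inFlight  bool
	queued    *turn
	coalesced int // turns merged into the queued slot
}

type admitter struct {
	sessions map[string]*slot
}

func newAdmitter() *admitter { return &admitter{sessions: map[string]*slot{}} }

// enqueue returns "start", "queued", or "coalesced".
func (a *admitter) enqueue(t turn) string {
	s := a.sessions[t.SessionID]
	if s == nil {
		s = &slot{}
		a.sessions[t.SessionID] = s
	}
	switch {
	case !s.inFlight:
		s.inFlight = true
		return "start"
	case s.queued == nil:
		s.queued = &t
		return "queued"
	default:
		s.queued = &t // newest turn replaces the queued one
		s.coalesced++
		return "coalesced"
	}
}

// done finishes the in-flight item and promotes the queued turn, if any.
func (a *admitter) done(sessionID string) (next *turn) {
	s := a.sessions[sessionID]
	if s == nil {
		return nil
	}
	next, s.queued = s.queued, nil
	if next == nil {
		delete(a.sessions, sessionID) // session is idle again
		return nil
	}
	return next // promoted turn is now in flight
}

func main() {
	a := newAdmitter()
	for seq := int64(1); seq <= 4; seq++ {
		fmt.Println(seq, a.enqueue(turn{SessionID: "s1", Seq: seq}))
	}
	if next := a.done("s1"); next != nil {
		fmt.Println("next seq", next.Seq)
	}
}
```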

func (*Runtime) HandleSessionMessagePersisted

func (r *Runtime) HandleSessionMessagePersisted(
	ctx context.Context,
	payload hookspkg.SessionMessagePersistedPayload,
) error

HandleSessionMessagePersisted converts the durable-message hook into an extractor turn.

func (*Runtime) RecordToolWrite

func (r *Runtime) RecordToolWrite(sessionID string, turnSeq int64)

RecordToolWrite marks an explicit root-agent memory tool write for turn-level mutual exclusion.
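One possible reading of turn-level mutual exclusion, sketched below as an assumption: once the root agent writes memory explicitly during a turn, automatic extraction for that same (session, turn) pair is skipped so the two paths do not store the same memory twice. toolWrites, turnKey, and shouldSkip are hypothetical; only the RecordToolWrite signature matches the documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// turnKey identifies one turn within one session.
type turnKey struct {
	sessionID string
	turnSeq   int64
}

// toolWrites remembers which turns already had an explicit memory write.
type toolWrites struct {
	mu   sync.Mutex
	seen map[turnKey]struct{}
}

func (w *toolWrites) RecordToolWrite(sessionID string, turnSeq int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.seen == nil {
		w.seen = make(map[turnKey]struct{})
	}
	w.seen[turnKey{sessionID, turnSeq}] = struct{}{}
}

// shouldSkip reports whether extraction for this turn should be skipped.
func (w *toolWrites) shouldSkip(sessionID string, turnSeq int64) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	_, ok := w.seen[turnKey{sessionID, turnSeq}] // nil map reads are safe
	return ok
}

func main() {
	var w toolWrites
	w.RecordToolWrite("s1", 7)
	fmt.Println(w.shouldSkip("s1", 7), w.shouldSkip("s1", 8))
}
```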

func (*Runtime) Stats

func (r *Runtime) Stats() RuntimeStats

Stats returns current queue counters without blocking workers.

type RuntimeStats

type RuntimeStats struct {
	QueuedSessions         int
	InFlightSessions       int
	ActiveProviderSessions int
	SkippedTurns           int64
	DroppedTurns           int64
	CoalescedTurns         int64
	BackpressuredSessions  int64
	Closed                 bool
}

RuntimeStats exposes bounded operational state for daemon status APIs.
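Stats is documented as not blocking workers. A common way to achieve that for monotonic counters is sync/atomic, as in the sketch below. stats and counters are hypothetical and cover only part of RuntimeStats. Each Load is atomic on its own, so the snapshot is not a single consistent point-in-time view across fields.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// stats is a hypothetical subset of RuntimeStats.
type stats struct {
	SkippedTurns   int64
	DroppedTurns   int64
	CoalescedTurns int64
	Closed         bool
}

// counters are updated by workers with atomic operations, so reading them
// does not require the lock that guards the queue.
type counters struct {
	skipped   atomic.Int64
	dropped   atomic.Int64
	coalesced atomic.Int64
	closed    atomic.Bool
}

// snapshot loads each counter independently.
func (c *counters) snapshot() stats {
	return stats{
		SkippedTurns:   c.skipped.Load(),
		DroppedTurns:   c.dropped.Load(),
		CoalescedTurns: c.coalesced.Load(),
		Closed:         c.closed.Load(),
	}
}

func main() {
	var c counters
	c.coalesced.Add(2)
	c.dropped.Add(1)
	fmt.Printf("%+v\n", c.snapshot())
}
```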
