Documentation ¶
Index ¶
- Constants
- type ConsumeResult
- type ConsumerOption
- type Event
- type EventSink
- type InboxConsumer
- type Option
- type Producer
- type ProducerOption
- type ProposalSink
- type Runtime
- func (r *Runtime) Close(ctx context.Context) error
- func (r *Runtime) Drain(ctx context.Context) error
- func (r *Runtime) Enqueue(ctx context.Context, turn memcontract.TurnRecord) error
- func (r *Runtime) HandleSessionMessagePersisted(ctx context.Context, payload hookspkg.SessionMessagePersistedPayload) error
- func (r *Runtime) RecordToolWrite(sessionID string, turnSeq int64)
- func (r *Runtime) Stats() RuntimeStats
- type RuntimeStats
Constants ¶
const (
	// EventStarted records that the runtime began extracting one transcript range.
	EventStarted = "memory.extractor.started"
	// EventCompleted records that extraction and inbox production finished.
	EventCompleted = "memory.extractor.completed"
	// EventFailed records extraction, inbox, decode, or controller handoff failures.
	EventFailed = "memory.extractor.failed"
	// EventCoalesced records bounded queue merging for one session.
	EventCoalesced = "memory.extractor.coalesced"
	// EventDropped records a queued extraction range dropped by the hard coalescing cap.
	EventDropped = "memory.extractor.dropped"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumeResult ¶
type ConsumeResult struct {
	Files     int
	Proposed  int
	Failed    int
	Decisions []memcontract.Decision
	Failures  []string
}
ConsumeResult summarizes one inbox pass.
type ConsumerOption ¶
type ConsumerOption func(*InboxConsumer)
ConsumerOption customizes inbox consumption.
func WithConsumerClock ¶
func WithConsumerClock(now func() time.Time) ConsumerOption
WithConsumerClock injects the clock function the consumer uses, so tests can supply deterministic time.
func WithConsumerEventSink ¶
func WithConsumerEventSink(sink EventSink) ConsumerOption
WithConsumerEventSink records consumer telemetry.
func WithConsumerFailurePath ¶
func WithConsumerFailurePath(path string) ConsumerOption
WithConsumerFailurePath overrides the default <root>/_system/extractor/failures directory.
func WithConsumerInboxPath ¶
func WithConsumerInboxPath(path string) ConsumerOption
WithConsumerInboxPath overrides the default <root>/_inbox directory.
func WithConsumerLogger ¶
func WithConsumerLogger(logger *slog.Logger) ConsumerOption
WithConsumerLogger configures warning output.
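The ConsumerOption functions above follow the functional-options pattern. The following is a minimal sketch of how such options might be applied on top of the documented defaults. The consumer type, its fields, and the lower-case helper names are hypothetical stand-ins, since InboxConsumer's fields are unexported and not shown on this page.

```go
package main

import (
	"fmt"
	"path/filepath"
	"time"
)

// consumer is a hypothetical stand-in for InboxConsumer.
type consumer struct {
	inboxPath   string
	failurePath string
	now         func() time.Time
}

// consumerOption mirrors the documented shape func(*InboxConsumer).
type consumerOption func(*consumer)

// withInboxPath overrides the default inbox directory.
func withInboxPath(path string) consumerOption {
	return func(c *consumer) { c.inboxPath = path }
}

// withClock replaces time.Now with a caller-supplied clock.
func withClock(now func() time.Time) consumerOption {
	return func(c *consumer) { c.now = now }
}

// newConsumer sets the documented defaults first, then applies options in order.
func newConsumer(root string, opts ...consumerOption) *consumer {
	c := &consumer{
		inboxPath:   filepath.Join(root, "_inbox"),
		failurePath: filepath.Join(root, "_system", "extractor", "failures"),
		now:         time.Now,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fixed := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)
	c := newConsumer("/var/memory",
		withInboxPath("/tmp/inbox"),
		withClock(func() time.Time { return fixed }),
	)
	fmt.Println(c.inboxPath, c.failurePath, c.now().Format(time.RFC3339))
}
```

Because defaults are assigned before the options run, an option passed later in the list wins over an earlier one that sets the same field.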
type Event ¶
type Event struct {
	Op          string
	Turn        memcontract.TurnRecord
	SessionID   string
	WorkspaceID string
	AgentID     string
	ActorKind   string
	DecisionID  string
	TargetID    string
	Metadata    map[string]string
	Error       string
	At          time.Time
}
Event is redaction-safe extractor telemetry persisted into memory_events.
type InboxConsumer ¶
type InboxConsumer struct {
	// contains filtered or unexported fields
}
InboxConsumer drains daemon-owned extractor inbox files through the write controller.
func NewInboxConsumer ¶
func NewInboxConsumer(root string, sink ProposalSink, opts ...ConsumerOption) (*InboxConsumer, error)
NewInboxConsumer constructs a FIFO inbox consumer.
func (*InboxConsumer) ConsumeOnce ¶
func (c *InboxConsumer) ConsumeOnce(ctx context.Context) (ConsumeResult, error)
ConsumeOnce processes all currently visible JSONL files in FIFO order.
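How ConsumeOnce derives FIFO order is not stated on this page. The sketch below shows one plausible approach, under the assumption that inbox file names begin with a sortable prefix (such as a zero-padded sequence number) so that lexical order matches arrival order. The fifoOrder helper and the sample file names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// fifoOrder keeps only names ending in ".jsonl" and sorts them lexically.
// Assumption: names start with a sortable prefix, so lexical order equals
// arrival order.
func fifoOrder(names []string) []string {
	var out []string
	for _, name := range names {
		if strings.HasSuffix(name, ".jsonl") {
			out = append(out, name)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	visible := []string{
		"0003-sess-b.jsonl",
		"0001-sess-a.jsonl",
		"0002-sess-a.jsonl.tmp", // hypothetical in-progress file, skipped
		"0002-sess-a.jsonl",
	}
	for _, name := range fifoOrder(visible) {
		fmt.Println(name)
	}
}
```

Filtering on the suffix means a file that is still being written under a temporary name is not picked up in the current pass.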
type Option ¶
type Option func(*Runtime)
Option customizes the extractor runtime.
func WithCoalesceMax ¶
WithCoalesceMax configures the hard queue merge limit.
func WithEventSink ¶
WithEventSink records extractor telemetry.
func WithInboxPath ¶
WithInboxPath overrides the default <root>/_inbox directory.
type Producer ¶
type Producer struct {
	// contains filtered or unexported fields
}
Producer writes extractor candidates into the daemon-owned inbox.
func NewProducer ¶
NewProducer constructs an inbox producer rooted at the memory directory.
func (*Producer) Write ¶
func (p *Producer) Write(
	ctx context.Context,
	turn memcontract.TurnRecord,
	candidates []memcontract.Candidate,
) (string, int, error)
Write persists one JSONL inbox file for a completed extractor turn.
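As an illustration of the JSONL layout, the sketch below encodes one JSON object per line. The candidate type and its fields are hypothetical stand-ins for memcontract.Candidate, and encodeJSONL is not part of this package. The meaning of the string and int results of Write is not documented here; the sketch simply assumes a record count is useful to return.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// candidate is a hypothetical stand-in for memcontract.Candidate.
type candidate struct {
	Kind string `json:"kind"`
	Text string `json:"text"`
}

// encodeJSONL renders one JSON object per line and reports how many records
// were written. json.Encoder appends a newline after each encoded value.
func encodeJSONL(candidates []candidate) ([]byte, int, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for i, c := range candidates {
		if err := enc.Encode(c); err != nil {
			return nil, i, fmt.Errorf("encode candidate %d: %w", i, err)
		}
	}
	return buf.Bytes(), len(candidates), nil
}

func main() {
	data, n, err := encodeJSONL([]candidate{
		{Kind: "preference", Text: "prefers tabs"},
		{Kind: "fact", Text: "project uses Go"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d records\n%s", n, data)
}
```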
type ProducerOption ¶
type ProducerOption func(*Producer)
ProducerOption customizes inbox production.
func WithProducerInboxPath ¶
func WithProducerInboxPath(path string) ProducerOption
WithProducerInboxPath overrides the default <root>/_inbox directory.
type ProposalSink ¶
type ProposalSink interface {
	ProposeCandidate(context.Context, memcontract.Candidate) (memcontract.Decision, error)
}
ProposalSink is the controller-backed handoff used by the inbox consumer.
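For tests, a single-method interface like this is easy to fake. The sketch below uses hypothetical stand-ins for memcontract.Candidate and memcontract.Decision and tallies outcomes into a struct shaped like a subset of ConsumeResult. It assumes that Proposed counts successful handoffs and Failed counts errors, which this page does not confirm.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// candidate and decision are hypothetical stand-ins for
// memcontract.Candidate and memcontract.Decision.
type candidate struct{ Text string }
type decision struct{ Accepted bool }

// proposalSink mirrors the documented single-method interface.
type proposalSink interface {
	ProposeCandidate(context.Context, candidate) (decision, error)
}

// fakeSink accepts non-empty candidates and returns an error for empty ones.
type fakeSink struct{}

func (fakeSink) ProposeCandidate(_ context.Context, c candidate) (decision, error) {
	if c.Text == "" {
		return decision{}, errors.New("empty candidate")
	}
	return decision{Accepted: true}, nil
}

// result mirrors a subset of the ConsumeResult counters.
type result struct {
	Proposed  int
	Failed    int
	Decisions []decision
	Failures  []string
}

// handOff proposes each candidate in order and tallies the outcome.
func handOff(ctx context.Context, sink proposalSink, cs []candidate) result {
	var res result
	for _, c := range cs {
		d, err := sink.ProposeCandidate(ctx, c)
		if err != nil {
			res.Failed++
			res.Failures = append(res.Failures, err.Error())
			continue
		}
		res.Proposed++
		res.Decisions = append(res.Decisions, d)
	}
	return res
}

func main() {
	res := handOff(context.Background(), fakeSink{}, []candidate{{Text: "likes Go"}, {}})
	fmt.Printf("proposed=%d failed=%d failures=%v\n", res.Proposed, res.Failed, res.Failures)
}
```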
type Runtime ¶
type Runtime struct {
	// contains filtered or unexported fields
}
Runtime owns asynchronous transcript extraction and daemon inbox production.
func NewRuntime ¶
func NewRuntime(
	ctx context.Context,
	root string,
	extractor memcontract.Extractor,
	opts ...Option,
) (*Runtime, error)
NewRuntime constructs a daemon-owned extractor runtime.
func (*Runtime) Enqueue ¶
func (r *Runtime) Enqueue(ctx context.Context, turn memcontract.TurnRecord) error
Enqueue requests extraction for one transcript turn. Each session holds at most one in-flight extraction plus one queued item.
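The sketch below illustrates one way a "one in-flight plus one queued" policy with a hard merge cap could behave. Everything in it is an assumption for illustration: the scheduler type, the turnRange shape, the outcome strings, and the choice to discard the queued range and restart the slot from the incoming turn once the cap is reached. The real runtime may differ on each of these points.

```go
package main

import (
	"fmt"
	"sync"
)

// turnRange is a hypothetical transcript range, inclusive on both ends.
type turnRange struct{ from, to int64 }

type sessionState struct {
	inFlight bool
	queued   *turnRange
	merged   int // turns merged into the queued range so far
}

// scheduler keeps at most one running and one waiting range per session.
type scheduler struct {
	mu          sync.Mutex
	coalesceMax int // hypothetical hard cap on merges into the queued slot
	sessions    map[string]*sessionState
}

func newScheduler(coalesceMax int) *scheduler {
	return &scheduler{coalesceMax: coalesceMax, sessions: map[string]*sessionState{}}
}

// enqueue reports what happened: "started", "queued", "coalesced", or "dropped".
func (s *scheduler) enqueue(sessionID string, seq int64) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	st := s.sessions[sessionID]
	if st == nil {
		st = &sessionState{}
		s.sessions[sessionID] = st
	}
	switch {
	case !st.inFlight: // nothing running: start immediately
		st.inFlight = true
		return "started"
	case st.queued == nil: // one running, slot free: queue the turn
		st.queued = &turnRange{from: seq, to: seq}
		st.merged = 0
		return "queued"
	case st.merged >= s.coalesceMax: // cap hit: discard the queued range
		st.queued = &turnRange{from: seq, to: seq}
		st.merged = 0
		return "dropped"
	default: // extend the queued range to cover the new turn
		st.queued.to = seq
		st.merged++
		return "coalesced"
	}
}

// finish completes the in-flight range and promotes the queued one, if any.
func (s *scheduler) finish(sessionID string) (turnRange, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	st := s.sessions[sessionID]
	if st == nil || !st.inFlight {
		return turnRange{}, false
	}
	if st.queued == nil {
		st.inFlight = false
		return turnRange{}, false
	}
	next := *st.queued
	st.queued, st.merged = nil, 0
	return next, true // the promoted range is now in flight
}

func main() {
	s := newScheduler(2)
	for seq := int64(1); seq <= 6; seq++ {
		fmt.Println(seq, s.enqueue("sess-1", seq))
	}
	next, ok := s.finish("sess-1")
	fmt.Println(next, ok)
}
```

Bounding the queue to a single merged range keeps memory per session constant no matter how quickly turns arrive, at the cost of occasionally discarding work when the cap is hit.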
func (*Runtime) HandleSessionMessagePersisted ¶
func (r *Runtime) HandleSessionMessagePersisted(
	ctx context.Context,
	payload hookspkg.SessionMessagePersistedPayload,
) error
HandleSessionMessagePersisted converts the durable-message hook into an extractor turn.
func (*Runtime) RecordToolWrite ¶
func (r *Runtime) RecordToolWrite(sessionID string, turnSeq int64)
RecordToolWrite marks an explicit root-agent memory tool write for turn-level mutual exclusion.
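One way to picture turn-level mutual exclusion is a set keyed by session and turn sequence. The sketch below assumes that background extraction is skipped for any turn that already has an explicit tool write; this page does not spell out that behavior, and the toolWrites type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// turnKey identifies one turn within one session.
type turnKey struct {
	sessionID string
	turnSeq   int64
}

// toolWrites remembers turns in which the root agent wrote memory explicitly.
type toolWrites struct {
	mu   sync.Mutex
	seen map[turnKey]struct{}
}

func newToolWrites() *toolWrites {
	return &toolWrites{seen: map[turnKey]struct{}{}}
}

// record marks the turn as having an explicit tool write.
func (w *toolWrites) record(sessionID string, turnSeq int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.seen[turnKey{sessionID, turnSeq}] = struct{}{}
}

// shouldExtract reports whether background extraction may run for the turn.
// Assumption: a turn with an explicit tool write is skipped entirely.
func (w *toolWrites) shouldExtract(sessionID string, turnSeq int64) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	_, wrote := w.seen[turnKey{sessionID, turnSeq}]
	return !wrote
}

func main() {
	w := newToolWrites()
	w.record("sess-1", 7)
	fmt.Println(w.shouldExtract("sess-1", 7)) // turn with a tool write
	fmt.Println(w.shouldExtract("sess-1", 8)) // later turn, no tool write
}
```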
func (*Runtime) Stats ¶
func (r *Runtime) Stats() RuntimeStats
Stats returns current queue counters without blocking workers.
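A common way to read counters without blocking workers is to keep them in atomic integers and copy them into a plain struct on demand. The sketch below shows that technique. The stats fields are hypothetical, since the RuntimeStats definition is not shown on this page, and the real implementation may use a different mechanism.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stats is a hypothetical snapshot shape.
type stats struct {
	Enqueued  int64
	Completed int64
	Dropped   int64
}

// counters holds live values that workers update without taking a lock.
type counters struct {
	enqueued  atomic.Int64
	completed atomic.Int64
	dropped   atomic.Int64
}

// snapshot loads each counter independently. The fields are individually
// consistent but are not captured at a single instant.
func (c *counters) snapshot() stats {
	return stats{
		Enqueued:  c.enqueued.Load(),
		Completed: c.completed.Load(),
		Dropped:   c.dropped.Load(),
	}
}

func main() {
	var c counters
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				c.enqueued.Add(1)
				c.completed.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("%+v\n", c.snapshot())
}
```

The trade-off is that a reader may observe one counter slightly ahead of another while workers are active, which is usually acceptable for monitoring.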