Documentation ¶
Overview ¶
Package engram defines the core interfaces that developers implement to create components for the bobrapet ecosystem. These interfaces provide a structured, type-safe framework for building everything from simple, single-task jobs to complex, long-running event listeners.
Index ¶
- Constants
- Variables
- func IdempotencyKey(info StoryInfo) (string, error)
- type AudioFrame
- type BatchEngram
- type BinaryFrame
- type ControlDirective
- type ControlDirectiveHandler
- type Engram
- type ExecutionContext
- type Impulse
- type InboundMessage
- type Result
- type Secrets
- type StoryInfo
- type StreamMessage
- type StreamingEngram
- type TransportDescriptor
- type VideoFrame
Constants ¶
const StreamMessageKindData = "data"
StreamMessageKindData marks a StreamMessage as a normal application data packet.
const StreamMessageKindError = "error"
StreamMessageKindError marks a StreamMessage payload as a StructuredError envelope.
const StreamMessageKindHeartbeat = "heartbeat"
StreamMessageKindHeartbeat marks a StreamMessage as a transport liveness heartbeat.
const StreamMessageKindNoop = "noop"
StreamMessageKindNoop marks a StreamMessage as an intentionally empty no-op packet.
Variables ¶
ErrIdempotencyUnavailable indicates that the execution context lacks stable identifiers.
var ErrInvalidStreamMessage = errors.New("invalid stream message")
ErrInvalidStreamMessage reports that a StreamMessage carries an unsupported or ambiguous payload shape.
var ErrSecretExpansionFailed = errors.New("secret expansion failed")
ErrSecretExpansionFailed reports that env/file secret descriptor expansion failed and the caller should decide whether to proceed with partial secrets.
Functions ¶
func IdempotencyKey ¶ added in v0.1.2
func IdempotencyKey(info StoryInfo) (string, error)
IdempotencyKey derives a stable key from the StoryRun and StepRun identity.
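The documentation does not say how the key is derived. The sketch below is one plausible approach, not the SDK's implementation: it hashes the run identifiers with SHA-256 and fails when they are missing. The names `storyInfo`, `deriveKey`, and `errUnavailable` are hypothetical stand-ins for `StoryInfo`, `IdempotencyKey`, and `ErrIdempotencyUnavailable`.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
)

// storyInfo mirrors the identity fields of the documented StoryInfo type.
type storyInfo struct {
	StoryRunID       string
	StepRunID        string
	StepRunNamespace string
}

// errUnavailable is a hypothetical stand-in for ErrIdempotencyUnavailable.
var errUnavailable = errors.New("idempotency key unavailable")

// deriveKey is an assumed derivation: a SHA-256 digest over the namespace,
// StoryRun ID, and StepRun ID, separated by NUL bytes so that adjacent
// fields cannot run together. The real SDK may use a different scheme.
func deriveKey(info storyInfo) (string, error) {
	if info.StoryRunID == "" || info.StepRunID == "" {
		return "", errUnavailable
	}
	raw := info.StepRunNamespace + "\x00" + info.StoryRunID + "\x00" + info.StepRunID
	sum := sha256.Sum256([]byte(raw))
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	info := storyInfo{StoryRunID: "run-1", StepRunID: "step-1", StepRunNamespace: "default"}
	key, err := deriveKey(info)
	fmt.Println(key, err)

	// Missing identifiers produce an error instead of an unstable key.
	_, err = deriveKey(storyInfo{})
	fmt.Println(errors.Is(err, errUnavailable))
}
```

A key of this kind is typically passed to downstream APIs so that a retried step does not repeat a side effect.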
Types ¶
type AudioFrame ¶ added in v0.1.2
type AudioFrame struct {
	// PCM contains raw little-endian PCM audio bytes for the frame.
	PCM []byte
	// SampleRateHz is the sampling rate in hertz (for example, 16000).
	SampleRateHz int32
	// Channels is the number of audio channels in PCM (for example, 1 for mono).
	Channels int32
	// Codec optionally names the codec when the frame is encoded instead of raw PCM.
	Codec string
	// Timestamp is the media timeline position for this frame.
	Timestamp time.Duration
}
AudioFrame represents PCM audio delivered through the streaming SDK.
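As a worked example of how the PCM, SampleRateHz, and Channels fields relate, the sketch below computes how much audio a raw frame covers. It assumes 16-bit (2-byte) samples, which the documentation does not state. `audioFrame` and `frameDuration` are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// audioFrame mirrors the documented AudioFrame fields used in this sketch.
type audioFrame struct {
	PCM          []byte
	SampleRateHz int32
	Channels     int32
}

// frameDuration returns the playback length of a raw PCM frame.
// Assumption: samples are 16-bit, so each sample occupies 2 bytes per channel.
func frameDuration(f audioFrame) time.Duration {
	const bytesPerSample = 2
	if f.SampleRateHz <= 0 || f.Channels <= 0 {
		return 0
	}
	samplesPerChannel := int64(len(f.PCM)) / (bytesPerSample * int64(f.Channels))
	return time.Duration(samplesPerChannel) * time.Second / time.Duration(f.SampleRateHz)
}

func main() {
	// 640 bytes of mono audio hold 320 16-bit samples.
	f := audioFrame{PCM: make([]byte, 640), SampleRateHz: 16000, Channels: 1}
	fmt.Println(frameDuration(f)) // prints "20ms"
}
```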
type BatchEngram ¶
type BatchEngram[C any, I any] interface {
	Engram[C]
	// Process contains the core business logic of the Engram. It is called exactly
	// once per execution. The SDK provides the typed inputs `I`, and expects a
	// `Result` in return, which it uses to report the output and status of the
	// StepRun. An error should be returned for any failures.
	Process(ctx context.Context, execCtx *ExecutionContext, inputs I) (*Result, error)
}
BatchEngram is the interface for components that run as a single, finite task, typically executed as a Kubernetes Job. This is the most common type of Engram, used for data processing, API calls, and other script-like operations.
The generic parameters `C` (configuration) and `I` (inputs) allow for complete type safety from the YAML definition to your Go code.
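To illustrate the shape of a BatchEngram, here is a minimal sketch that compiles without the SDK. The lowercase types (`secrets`, `executionContext`, `result`, `engram`, `batchEngram`) are local stand-ins that copy the documented signatures. `greetConfig`, `greetInputs`, and `greeter` are hypothetical names invented for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Local stand-ins for the SDK types so the sketch is self-contained.
type secrets struct{}
type executionContext struct{}
type result struct{ Data any }

type engram[C any] interface {
	Init(ctx context.Context, config C, secrets *secrets) error
}

type batchEngram[C any, I any] interface {
	engram[C]
	Process(ctx context.Context, execCtx *executionContext, inputs I) (*result, error)
}

// greetConfig and greetInputs are hypothetical config and input types.
type greetConfig struct{ Greeting string }
type greetInputs struct{ Name string }

type greeter struct{ greeting string }

// Init validates the static configuration once, before Process runs.
func (g *greeter) Init(_ context.Context, cfg greetConfig, _ *secrets) error {
	if cfg.Greeting == "" {
		return errors.New("greeting must not be empty")
	}
	g.greeting = cfg.Greeting
	return nil
}

// Process holds the per-execution logic and returns the step output.
func (g *greeter) Process(_ context.Context, _ *executionContext, in greetInputs) (*result, error) {
	name := strings.TrimSpace(in.Name)
	if name == "" {
		return nil, errors.New("name is required")
	}
	return &result{Data: map[string]string{"message": g.greeting + ", " + name}}, nil
}

// Compile-time check that greeter satisfies the batch interface.
var _ batchEngram[greetConfig, greetInputs] = (*greeter)(nil)

// runGreeter drives Init and then Process, as the runtime is described as doing.
func runGreeter(greeting, name string) (any, error) {
	g := &greeter{}
	ctx := context.Background()
	if err := g.Init(ctx, greetConfig{Greeting: greeting}, &secrets{}); err != nil {
		return nil, err
	}
	res, err := g.Process(ctx, &executionContext{}, greetInputs{Name: name})
	if err != nil {
		return nil, err
	}
	return res.Data, nil
}

func main() {
	fmt.Println(runGreeter("hello", "Ada"))
}
```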
type BinaryFrame ¶ added in v0.1.2
type BinaryFrame struct {
	// Payload carries opaque binary bytes.
	Payload []byte
	// MimeType identifies the payload media type (for example, "application/octet-stream").
	MimeType string
	// Timestamp is an optional media timeline position for this frame.
	Timestamp time.Duration
}
BinaryFrame represents generic binary payloads exchanged over the streaming SDK.
type ControlDirective ¶ added in v0.1.2
type ControlDirective struct {
	// Type identifies the control instruction, such as "start", "stop", or "codec-select".
	Type string
	// Metadata carries optional directive-specific key/value hints from the connector.
	Metadata map[string]string
}
ControlDirective represents a control-plane instruction flowing over the transport connector. Typical directive types include "start", "stop", or "codec-select".
type ControlDirectiveHandler ¶ added in v0.1.2
type ControlDirectiveHandler interface {
	// HandleControlDirective processes an inbound directive. Returning a non-nil directive
	// sends a response back to the connector. Implementations may return nil to skip replies.
	HandleControlDirective(ctx context.Context, directive ControlDirective) (*ControlDirective, error)
}
ControlDirectiveHandler can be implemented by StreamingEngrams that want to react to transport control directives emitted by connectors.
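A handler of this kind might look like the sketch below, which toggles a running flag on "start" and "stop" and skips the reply for anything else. `controlDirective` is a local copy of the documented struct. `pausableEngram` and the "ack" reply type are hypothetical; the documentation does not define which reply types connectors expect.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// controlDirective mirrors the documented ControlDirective struct.
type controlDirective struct {
	Type     string
	Metadata map[string]string
}

// pausableEngram is a hypothetical engram that tracks whether it should run.
type pausableEngram struct {
	mu      sync.Mutex
	running bool
}

// HandleControlDirective follows the documented signature: a non-nil return
// value is a reply, and a nil return value skips the reply.
func (p *pausableEngram) HandleControlDirective(_ context.Context, d controlDirective) (*controlDirective, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	switch d.Type {
	case "start":
		p.running = true
	case "stop":
		p.running = false
	default:
		// Unknown directives are ignored without a reply.
		return nil, nil
	}
	// The "ack" type is an assumption made for this sketch.
	return &controlDirective{Type: "ack", Metadata: map[string]string{"for": d.Type}}, nil
}

// handle sends one directive of the given type to the engram.
func handle(p *pausableEngram, directiveType string) (*controlDirective, error) {
	return p.HandleControlDirective(context.Background(), controlDirective{Type: directiveType})
}

func main() {
	p := &pausableEngram{}
	reply, err := handle(p, "start")
	fmt.Println(reply.Type, reply.Metadata["for"], p.running, err)
}
```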
type Engram ¶
type Engram[C any] interface {
	// Init is called once at the start of an Engram's lifecycle. It is the ideal
	// place for setting up connections, validating configuration, or performing
	// any other one-time setup.
	Init(ctx context.Context, config C, secrets *Secrets) error
}
Engram is the foundational interface for all executable components in bobrapet. It establishes a common initialization contract.
The generic parameter `C` represents a developer-defined struct for static configuration. The SDK runtime will automatically unmarshal the `with` block from the Engram or Impulse resource's YAML definition into this struct and pass it to the `Init` method.
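The flow from a `with` block to `Init` can be sketched as follows. This is an illustration under assumptions, not the runtime's code: the `with` block is represented as JSON (the standard library has no YAML decoder), the `json` struct tags are a guess at how fields are mapped, and `httpConfig`, `fetcher`, `decodeWith`, and `initFromWith` are hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// secrets is a local stand-in for the SDK's Secrets type.
type secrets struct{}

// httpConfig is a hypothetical developer-defined configuration struct.
type httpConfig struct {
	URL            string `json:"url"`
	TimeoutSeconds int    `json:"timeoutSeconds"`
}

type fetcher struct {
	url     string
	timeout time.Duration
}

// Init validates the configuration and applies a default timeout.
func (f *fetcher) Init(_ context.Context, cfg httpConfig, _ *secrets) error {
	if cfg.URL == "" {
		return errors.New("url is required")
	}
	if cfg.TimeoutSeconds <= 0 {
		cfg.TimeoutSeconds = 30
	}
	f.url = cfg.URL
	f.timeout = time.Duration(cfg.TimeoutSeconds) * time.Second
	return nil
}

// decodeWith stands in for the runtime step that turns the `with` block
// into the typed config struct.
func decodeWith(raw []byte) (httpConfig, error) {
	var cfg httpConfig
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

// initFromWith decodes the block and passes the result to Init.
func initFromWith(raw string) (*fetcher, error) {
	cfg, err := decodeWith([]byte(raw))
	if err != nil {
		return nil, err
	}
	f := &fetcher{}
	if err := f.Init(context.Background(), cfg, &secrets{}); err != nil {
		return nil, err
	}
	return f, nil
}

func main() {
	f, err := initFromWith(`{"url":"https://example.com","timeoutSeconds":5}`)
	fmt.Println(f.url, f.timeout, err)
}
```

Validating in `Init` means a misconfigured step fails before any work is attempted.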
type ExecutionContext ¶
type ExecutionContext struct {
	// contains filtered or unexported fields
}
ExecutionContext provides metadata and utilities for a single execution of an Engram. It serves as a dependency injection container for services provided by the SDK runtime, such as logging, tracing, and information about the current Story. This context is passed to the `Process` method of a `BatchEngram`.
func NewExecutionContext ¶
func NewExecutionContext(logger *slog.Logger, tracer trace.Tracer, storyInfo StoryInfo) *ExecutionContext
NewExecutionContext is a constructor used internally by the SDK runtime.
func NewExecutionContextWithCELContext ¶ added in v0.1.2
func NewExecutionContextWithCELContext(
	logger *slog.Logger,
	tracer trace.Tracer,
	storyInfo StoryInfo,
	celContext map[string]any,
) *ExecutionContext
NewExecutionContextWithCELContext is a constructor that also attaches CEL context data.
func (*ExecutionContext) CELContext ¶ added in v0.1.2
func (e *ExecutionContext) CELContext() map[string]any
CELContext returns a defensive copy of the CEL context map provided by the controller (inputs + steps) so callers cannot mutate SDK-owned runtime state.
func (*ExecutionContext) IdempotencyKey ¶ added in v0.1.2
func (e *ExecutionContext) IdempotencyKey() (string, error)
IdempotencyKey returns a stable key for the current execution context.
func (*ExecutionContext) Logger ¶
func (e *ExecutionContext) Logger() *slog.Logger
Logger returns the slog.Logger configured for this execution context.
This logger should be used for all engram logging to ensure consistent formatting and integration with the SDK's observability stack.
func (*ExecutionContext) StoryInfo ¶
func (e *ExecutionContext) StoryInfo() StoryInfo
StoryInfo returns metadata about the currently executing Story and Step.
This includes the Story name, StoryRun ID, Step name, and StepRun ID, which are useful for logging, tracing, and correlation.
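One way to use these identifiers for correlation is to attach them to every log line. The sketch below uses only the standard `log/slog` package. `storyInfo` is a local copy of some StoryInfo fields, and `withStoryAttrs` and `logOnce` are hypothetical helpers; in an engram, the logger and metadata would come from `Logger()` and `StoryInfo()` on the execution context.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log/slog"
)

// storyInfo mirrors the documented StoryInfo fields used here.
type storyInfo struct {
	StoryName  string
	StoryRunID string
	StepName   string
	StepRunID  string
}

// withStoryAttrs returns a logger that stamps every record with the run identity.
func withStoryAttrs(logger *slog.Logger, info storyInfo) *slog.Logger {
	return logger.With(
		slog.String("story", info.StoryName),
		slog.String("storyRunID", info.StoryRunID),
		slog.String("step", info.StepName),
		slog.String("stepRunID", info.StepRunID),
	)
}

// logOnce writes one JSON record to a buffer and decodes it for inspection.
func logOnce(info storyInfo, msg string) (map[string]any, error) {
	var buf bytes.Buffer
	base := slog.New(slog.NewJSONHandler(&buf, nil))
	withStoryAttrs(base, info).Info(msg)

	var record map[string]any
	err := json.Unmarshal(buf.Bytes(), &record)
	return record, err
}

func main() {
	info := storyInfo{StoryName: "ingest", StoryRunID: "run-1", StepName: "parse", StepRunID: "sr-1"}
	record, err := logOnce(info, "processing")
	fmt.Println(record["msg"], record["stepRunID"], err)
}
```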
func (*ExecutionContext) Tracer ¶
func (e *ExecutionContext) Tracer() trace.Tracer
Tracer returns the OpenTelemetry tracer for this execution context.
Use this tracer to create spans for internal operations, enabling distributed tracing across the execution pipeline.
type Impulse ¶
type Impulse[C any] interface {
	Engram[C]
	// Run is the main entry point for the Impulse's long-running process. The SDK
	// calls this method after `Init`, providing a pre-configured Kubernetes client
	// for interacting with bobrapet resources (like creating StoryRuns). The method
	// should block until the Impulse's work is complete or the context is canceled.
	Run(ctx context.Context, client *k8s.Client) error
}
Impulse is the interface for long-running components that act as triggers for Stories. They typically listen for external events (e.g., via webhooks, message queues) and use the provided Kubernetes client to create new StoryRuns. An Impulse usually runs as a Kubernetes Deployment.
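The blocking contract of `Run` can be sketched as an event loop that exits when its source closes or the context is canceled. The methods of `*k8s.Client` are not shown on this page, so the sketch substitutes a hypothetical `storyTrigger` interface with an invented `TriggerStory` method. `queueImpulse` and `recordingTrigger` are also hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// storyTrigger is a hypothetical stand-in for the part of *k8s.Client that
// creates StoryRuns. The real client's API is not documented here.
type storyTrigger interface {
	TriggerStory(ctx context.Context, payload string) error
}

// recordingTrigger records payloads instead of talking to a cluster.
type recordingTrigger struct{ payloads []string }

func (r *recordingTrigger) TriggerStory(_ context.Context, payload string) error {
	r.payloads = append(r.payloads, payload)
	return nil
}

// queueImpulse turns each event from a channel into one story trigger.
type queueImpulse struct{ events <-chan string }

// Run blocks until the event source closes or the context is canceled.
func (q *queueImpulse) Run(ctx context.Context, client storyTrigger) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok := <-q.events:
			if !ok {
				return nil // source drained: work is complete
			}
			if err := client.TriggerStory(ctx, ev); err != nil {
				return fmt.Errorf("trigger story: %w", err)
			}
		}
	}
}

// runWithEvents feeds a fixed set of events and reports how many were triggered.
func runWithEvents(events ...string) (int, error) {
	ch := make(chan string, len(events))
	for _, ev := range events {
		ch <- ev
	}
	close(ch)
	rec := &recordingTrigger{}
	err := (&queueImpulse{events: ch}).Run(context.Background(), rec)
	return len(rec.payloads), err
}

// runCanceled reports whether Run returns context.Canceled on a canceled context.
func runCanceled() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err := (&queueImpulse{events: make(chan string)}).Run(ctx, &recordingTrigger{})
	return errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println(runWithEvents("order-created", "order-paid"))
	fmt.Println(runCanceled())
}
```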
type InboundMessage ¶ added in v0.1.2
type InboundMessage struct {
	StreamMessage
	// contains filtered or unexported fields
}
InboundMessage wraps a transport-delivered StreamMessage and lets the SDK runtime track when user processing is complete for delivery policies that require it. Best-effort traffic ignores completion signals.
func BindProcessingReceipt ¶ added in v0.1.2
func BindProcessingReceipt(msg InboundMessage, onProcessed func()) InboundMessage
BindProcessingReceipt attaches an SDK-managed completion hook to an inbound message. Runtime code uses this to defer transport acknowledgements until user processing explicitly completes. External callers normally do not need it.
func NewInboundMessage ¶ added in v0.1.2
func NewInboundMessage(msg StreamMessage) InboundMessage
NewInboundMessage wraps a StreamMessage for inbound streaming delivery.
func (InboundMessage) Done ¶ added in v0.1.2
func (m InboundMessage) Done()
Done notifies the runtime that processing of this inbound message completed successfully. Messages without an attached processing receipt ignore this call, so best-effort traffic does not require special handling.
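The relationship between a processing receipt and `Done` can be pictured with the sketch below. The unexported fields of InboundMessage are not documented, so the `onProcessed` field and the lowercase types and functions are hypothetical reconstructions of the described behavior.

```go
package main

import "fmt"

// streamMessage is a trimmed stand-in for StreamMessage.
type streamMessage struct {
	Kind    string
	Payload []byte
}

// inboundMessage embeds the message and carries an optional completion hook.
// The onProcessed field is an assumption about the unexported state.
type inboundMessage struct {
	streamMessage
	onProcessed func()
}

// newInbound wraps a message without a receipt (best-effort delivery).
func newInbound(msg streamMessage) inboundMessage {
	return inboundMessage{streamMessage: msg}
}

// bindReceipt returns a copy of msg that calls onProcessed from Done.
func bindReceipt(msg inboundMessage, onProcessed func()) inboundMessage {
	msg.onProcessed = onProcessed
	return msg
}

// Done fires the hook when one is attached and is a no-op otherwise.
func (m inboundMessage) Done() {
	if m.onProcessed != nil {
		m.onProcessed()
	}
}

func main() {
	acked := 0
	tracked := bindReceipt(newInbound(streamMessage{Kind: "data"}), func() { acked++ })
	tracked.Done()

	// Best-effort message: Done is safe to call and does nothing.
	newInbound(streamMessage{Kind: "data"}).Done()

	fmt.Println("acks:", acked) // prints "acks: 1"
}
```

Because `Done` is harmless on messages without a receipt, user code can call it unconditionally.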
type Result ¶
type Result struct {
	// Data is the output of the Engram. This can be any serializable type.
	// The SDK will automatically handle JSON marshaling and, if configured,
	// transparently offload the data to external storage if it exceeds the
	// size threshold.
	Data any
}
Result is the universal return type for a BatchEngram's Process method. It encapsulates the output data. The SDK uses this structure to determine the output of the step.
func NewResultFrom ¶ added in v0.1.2
NewResultFrom wraps the provided data in a Result. It keeps examples and callers working with a single helper so future metadata can be attached centrally.
type Secrets ¶
type Secrets struct {
	// contains filtered or unexported fields
}
Secrets provides sandboxed access to the secrets mapped to an Engram's StepRun. The SDK runtime populates this object from environment variables injected by the bobrapet controller, ensuring that Engrams only have access to the secrets explicitly declared in their corresponding Step definition in the Story.
func NewSecrets ¶
NewSecrets creates a new Secrets object, expanding descriptor-style env/file references. Callers must pass a non-nil context; cancellation stops in-flight I/O and returns the secrets collected so far to honor shutdown deadlines. Nil contexts are treated as API misuse and fail closed with an empty secret set.
func NewSecretsWithError ¶ added in v0.1.2
NewSecretsWithError expands descriptor-style env/file references and returns any expansion failure to callers that want to fail closed during SDK/runtime initialization.
func (*Secrets) Format ¶
Format implements fmt.Formatter to prevent accidental logging of secrets. It ensures that printing the Secrets struct (e.g., with %+v) does not leak values.
func (*Secrets) GetAll ¶
GetAll returns a copy of the secret keys. The values are redacted to prevent accidental logging of sensitive data.
func (*Secrets) LogValue ¶ added in v0.1.2
LogValue implements slog.LogValuer to prevent structured logs from serializing plaintext secret values or keys.
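The redaction technique behind `Format` and `LogValue` can be reproduced with the standard library alone. The sketch below is not the SDK's code. `safeSecrets`, `renderAll`, and `leaks` are hypothetical names, and the redaction strings are invented for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
	"strings"
)

// safeSecrets is a hypothetical secret container that resists accidental logging.
type safeSecrets struct {
	values map[string]string
}

// Format implements fmt.Formatter, so every verb (%v, %+v, %s, ...) prints
// a fixed placeholder instead of the map contents.
func (s *safeSecrets) Format(f fmt.State, _ rune) {
	n := 0
	if s != nil {
		n = len(s.values)
	}
	fmt.Fprintf(f, "[REDACTED: %d secrets]", n)
}

// LogValue implements slog.LogValuer, so structured handlers serialize a
// placeholder rather than keys or values.
func (s *safeSecrets) LogValue() slog.Value {
	return slog.StringValue("[REDACTED]")
}

// renderAll prints the secrets through fmt and slog and returns the combined output.
func renderAll(s *safeSecrets) string {
	var buf bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&buf, nil))
	logger.Info("init", "secrets", s)
	return fmt.Sprintf("%v %+v %s ", s, s, s) + buf.String()
}

// leaks reports whether the rendered output contains the given secret text.
func leaks(rendered, secret string) bool {
	return strings.Contains(rendered, secret)
}

func main() {
	s := &safeSecrets{values: map[string]string{"API_KEY": "hunter2"}}
	out := renderAll(s)
	fmt.Println(leaks(out, "hunter2"), leaks(out, "API_KEY"))
}
```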
type StoryInfo ¶
type StoryInfo struct {
	StoryName        string `json:"storyName"`
	StoryRunID       string `json:"storyRunID"`
	StepName         string `json:"stepName"`
	StepRunID        string `json:"stepRunID"`
	StepRunNamespace string `json:"stepRunNamespace"`
}
StoryInfo contains metadata about the currently executing Story and Step.
type StreamMessage ¶
type StreamMessage struct {
	// Kind declares the semantic intent of the packet (e.g., "data", "heartbeat").
	Kind string
	// MessageID is an optional caller-defined identifier that assists with deduplication.
	MessageID string
	// Timestamp captures when the packet was produced. Zero-value timestamps are omitted.
	Timestamp time.Time
	// Metadata contains tracing information (StoryRunID, StepName, etc.) from DataPacket.
	// This should be propagated through the streaming pipeline to maintain observability.
	Metadata map[string]string
	// Payload is the JSON-encoded data to be processed. Prefer Binary for new code paths.
	Payload []byte
	// Audio carries PCM audio frames when present.
	Audio *AudioFrame
	// Video carries encoded or raw video frames when present.
	Video *VideoFrame
	// Binary carries arbitrary non-audio/video frames when present.
	Binary *BinaryFrame
	// Inputs contains the evaluated step 'with:' configuration (CEL-resolved per packet).
	// This is analogous to BUBU_TRIGGER_DATA in batch mode - dynamic configuration that can
	// reference outputs from previous steps. The Hub evaluates this before forwarding.
	// Empty if the step has no 'with:' block or evaluation failed.
	Inputs []byte
	// Transports mirrors the Story's declared transports, allowing engrams to decide whether
	// to keep payloads on the hot path (e.g., LiveKit) or fall back to storage without
	// rereading pod environment.
	Transports []TransportDescriptor
	// Envelope carries optional stream sequencing metadata from the transport layer.
	// When set, it can be used for ordering and replay-aware processing.
	Envelope *transportpb.StreamEnvelope
}
StreamMessage represents a single message in a bidirectional stream with metadata. Metadata enables tracing and correlation across streaming pipeline steps.
func (StreamMessage) Validate ¶ added in v0.1.2
func (m StreamMessage) Validate() error
Validate reports whether the message uses a transport shape the SDK can encode without dropping or reinterpreting data.
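The exact validation rules are not listed on this page. The sketch below is a guess at what "unsupported or ambiguous payload shape" could mean: the kind must be one of the four documented constants, and at most one of Payload, Audio, Video, and Binary may be set. All lowercase names are hypothetical, and the real `Validate` may enforce different rules.

```go
package main

import (
	"errors"
	"fmt"
)

// Values copied from the documented StreamMessageKind* constants.
const (
	kindData      = "data"
	kindError     = "error"
	kindHeartbeat = "heartbeat"
	kindNoop      = "noop"
)

// errInvalid is a stand-in for ErrInvalidStreamMessage.
var errInvalid = errors.New("invalid stream message")

type audioFrame struct{ PCM []byte }
type videoFrame struct{ Payload []byte }
type binaryFrame struct{ Payload []byte }

// streamMessage keeps only the fields relevant to shape checking.
type streamMessage struct {
	Kind    string
	Payload []byte
	Audio   *audioFrame
	Video   *videoFrame
	Binary  *binaryFrame
}

// validate applies the assumed rules: a known kind and at most one carrier.
func (m streamMessage) validate() error {
	switch m.Kind {
	case kindData, kindError, kindHeartbeat, kindNoop:
	default:
		return fmt.Errorf("%w: unknown kind %q", errInvalid, m.Kind)
	}
	carriers := 0
	if len(m.Payload) > 0 {
		carriers++
	}
	if m.Audio != nil {
		carriers++
	}
	if m.Video != nil {
		carriers++
	}
	if m.Binary != nil {
		carriers++
	}
	if carriers > 1 {
		return fmt.Errorf("%w: %d payload carriers set, want at most one", errInvalid, carriers)
	}
	return nil
}

// isInvalid reports whether err wraps the sentinel error.
func isInvalid(err error) bool { return errors.Is(err, errInvalid) }

func main() {
	ok := streamMessage{Kind: kindData, Payload: []byte(`{"n":1}`)}
	ambiguous := streamMessage{Kind: kindData, Payload: []byte(`{}`), Binary: &binaryFrame{}}
	fmt.Println(ok.validate())
	fmt.Println(ambiguous.validate())
}
```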
type StreamingEngram ¶
type StreamingEngram[C any] interface {
	Engram[C]
	// Stream is the core method for handling bidirectional data flow with metadata.
	// The SDK provides inbound messages plus an outbound StreamMessage channel.
	// Metadata should be propagated to enable tracing across the streaming pipeline.
	// The method should process messages from `in`, call Done on messages it handled
	// successfully (or intentionally dropped), and write results to `out` until the
	// input channel is closed or the context is canceled.
	Stream(ctx context.Context, in <-chan InboundMessage, out chan<- StreamMessage) error
}
StreamingEngram is the interface for components that handle real-time, continuous data streams. They are typically used for tasks like transformations, filtering, or routing of data between other streaming systems. A StreamingEngram usually runs as a Kubernetes Deployment and communicates over gRPC.
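The contract described for `Stream` (read from `in`, propagate metadata, call `Done`, write to `out`, stop on close or cancellation) might be implemented as in the sketch below. The message types are trimmed local stand-ins. `upperEngram` and `runDemo` are hypothetical, and the uppercase transformation is only a placeholder for real processing.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
)

// Trimmed stand-ins for StreamMessage and InboundMessage.
type streamMessage struct {
	Kind     string
	Metadata map[string]string
	Payload  []byte
}

type inboundMessage struct {
	streamMessage
	onProcessed func() // hypothetical completion hook
}

func (m inboundMessage) Done() {
	if m.onProcessed != nil {
		m.onProcessed()
	}
}

// upperEngram is a hypothetical streaming engram that uppercases data payloads.
type upperEngram struct{}

func (upperEngram) Stream(ctx context.Context, in <-chan inboundMessage, out chan<- streamMessage) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-in:
			if !ok {
				return nil // input closed: stream is finished
			}
			if msg.Kind != "data" {
				msg.Done() // intentionally dropped, still acknowledged
				continue
			}
			reply := streamMessage{
				Kind:     "data",
				Metadata: msg.Metadata, // propagate tracing metadata
				Payload:  bytes.ToUpper(msg.Payload),
			}
			select {
			case out <- reply:
				msg.Done() // acknowledge only after the result is handed off
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// runDemo feeds one data message and one heartbeat through the engram.
func runDemo() (outputs []string, acks int, err error) {
	in := make(chan inboundMessage, 2)
	out := make(chan streamMessage, 2)
	ack := func() { acks++ }
	meta := map[string]string{"StepName": "shout"}
	in <- inboundMessage{streamMessage{Kind: "data", Metadata: meta, Payload: []byte("hello")}, ack}
	in <- inboundMessage{streamMessage{Kind: "heartbeat"}, ack}
	close(in)

	err = upperEngram{}.Stream(context.Background(), in, out)
	close(out)
	for m := range out {
		outputs = append(outputs, string(m.Payload))
	}
	return outputs, acks, err
}

func main() {
	fmt.Println(runDemo())
}
```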
type TransportDescriptor ¶ added in v0.1.2
type TransportDescriptor struct {
	// Name is the transport binding name referenced by the Story.
	Name string `json:"name"`
	// Kind identifies the transport driver kind, such as "livekit" or "storage".
	Kind string `json:"kind"`
	// Mode selects the runtime behavior for the transport binding.
	Mode string `json:"mode,omitempty"`
	// Config carries arbitrary transport-specific settings.
	Config map[string]any `json:"config,omitempty"`
}
TransportDescriptor describes a named transport binding declared on the Story. Config carries arbitrary transport-specific settings (e.g. livekit/storage blocks).
func (TransportDescriptor) Clone ¶ added in v0.1.2
func (t TransportDescriptor) Clone() TransportDescriptor
Clone returns a deep copy of the descriptor to avoid callers mutating shared state.
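A deep copy matters here because Config is a nested `map[string]any`, and a plain struct copy would still share the inner maps. The sketch below shows one way to copy such a value. It is not the SDK's implementation, and `transportDescriptor`, `deepCopy`, and `clone` are hypothetical names. It assumes Config holds only JSON-like values (maps, slices, and scalars).

```go
package main

import "fmt"

// transportDescriptor mirrors the documented TransportDescriptor fields.
type transportDescriptor struct {
	Name   string
	Kind   string
	Mode   string
	Config map[string]any
}

// deepCopy recursively copies JSON-like values. Scalars are returned as-is
// because they are immutable from the caller's point of view.
func deepCopy(v any) any {
	switch t := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(t))
		for k, val := range t {
			out[k] = deepCopy(val)
		}
		return out
	case []any:
		out := make([]any, len(t))
		for i, val := range t {
			out[i] = deepCopy(val)
		}
		return out
	default:
		return v
	}
}

// clone returns a descriptor whose Config shares no maps or slices with t.
func (t transportDescriptor) clone() transportDescriptor {
	if t.Config != nil {
		t.Config = deepCopy(t.Config).(map[string]any)
	}
	return t
}

func main() {
	orig := transportDescriptor{
		Name:   "media",
		Kind:   "livekit",
		Config: map[string]any{"livekit": map[string]any{"room": "a"}},
	}
	c := orig.clone()
	c.Config["livekit"].(map[string]any)["room"] = "b"

	// The original nested map is unaffected by the mutation of the clone.
	fmt.Println(orig.Config["livekit"].(map[string]any)["room"]) // prints "a"
}
```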
type VideoFrame ¶ added in v0.1.2
type VideoFrame struct {
	// Payload contains encoded bytes or raw pixel data for the frame.
	Payload []byte
	// Codec identifies the encoded video format (for example, "h264"), if applicable.
	Codec string
	// Width is the frame width in pixels.
	Width uint32
	// Height is the frame height in pixels.
	Height uint32
	// Timestamp is the media timeline position for this frame.
	Timestamp time.Duration
	// Raw reports whether Payload carries raw video pixels instead of encoded bytes.
	Raw bool
}
VideoFrame represents encoded or raw video delivered through the streaming SDK.