Documentation ¶
Overview ¶
Package sdk provides the primary entry points for executing bobrapet components.
This package contains the runtime logic that bootstraps an Engram or Impulse, injects the necessary context from the environment, and manages its lifecycle. Developers typically interact with StartBatch, StartStreaming, or RunImpulse from their main.go file.
Entry Points ¶
For batch engrams (Jobs):
sdk.StartBatch(ctx, myEngram)
For streaming engrams (Deployments with gRPC):
sdk.StartStreaming(ctx, myStreamingEngram)
For impulses (Deployments that trigger Stories):
sdk.RunImpulse(ctx, myImpulse)
Environment-Driven Configuration ¶
The SDK is controlled entirely by environment variables injected by the bobrapet operator. SDK defaults are fallback values for local development. See docs/reference/config.md for the complete environment variable reference.
Concurrency and Cancellation ¶
All entry points respect context cancellation. Batch engrams enforce a timeout via BUBU_STEP_TIMEOUT. Streaming engrams implement graceful shutdown on SIGTERM with configurable drain timeouts via BUBU_SDK_GRACEFUL_SHUTDOWN_TIMEOUT.
Error Handling ¶
Entry points return errors for initialization and execution failures. Batch engrams additionally patch StepRun status with exit codes for operator retry policy classification (exit code 124 for timeouts, 1 for logic errors, 0 for success).
Index ¶
- Constants
- Variables
- func BatchExitCode(err error) int
- func DebugModeEnabled() bool
- func EmitSequencedSignal(ctx context.Context, key string, seq uint64, value any) error
- func EmitSignal(ctx context.Context, key string, value any) error
- func EmitSignalWithSequence(ctx context.Context, key string, seq *SignalSequence, value any) error
- func EmitTextSignal(ctx context.Context, key string, text string, opts TextSignalOptions) error
- func ExecuteEffectOnce(ctx context.Context, key string, fn func(context.Context) (any, error)) (any, bool, error)
- func ExecutionMode() string
- func ExtractTraceContext(ctx context.Context, msg *engram.StreamMessage) context.Context
- func HasEffect(ctx context.Context, key string) (bool, error)
- func LoggerFromContext(ctx context.Context) *slog.Logger
- func NewStreamErrorMessage(errObj runsv1alpha1.StructuredError, opts ...StreamMessageOption) (engram.StreamMessage, error)
- func NewStreamMessage(kind string, opts ...StreamMessageOption) engram.StreamMessage
- func NewStructuredError(typ runsv1alpha1.StructuredErrorType, message string, ...) error
- func ParseStreamErrorMessage(msg engram.StreamMessage) (runsv1alpha1.StructuredError, bool, error)
- func PublishLogFile(ctx context.Context, path string, contentType string) error
- func PublishLogs(ctx context.Context, payload []byte) error
- func PublishLogsWithContentType(ctx context.Context, payload []byte, contentType string) error
- func RecordEffect(ctx context.Context, key, status string, details any) error
- func ReplaySignals(ctx context.Context, stepRunName, namespace string, sinceSeq uint64) ([]runsv1alpha1.SignalEvent, error)
- func RunBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error
- func RunImpulse[C any](ctx context.Context, i engram.Impulse[C]) error
- func Start[C any, I any](ctx context.Context, e DualEngram[C, I]) error
- func StartBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error
- func StartStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)
- func StartStoryInNamespace(ctx context.Context, storyName string, storyNamespace string, ...) (*runsv1alpha1.StoryRun, error)
- func StartStoryWithToken(ctx context.Context, storyName string, token string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)
- func StartStoryWithTokenInNamespace(ctx context.Context, storyName string, storyNamespace string, token string, ...) (*runsv1alpha1.StoryRun, error)
- func StartStreamServer[C any](ctx context.Context, e engram.StreamingEngram[C]) error
- func StartStreaming[C any](ctx context.Context, e engram.StreamingEngram[C]) error
- func StopStory(ctx context.Context, storyRunName string) error
- func StopStoryInNamespace(ctx context.Context, storyRunName, namespace string) error
- func TriggerTokenFromContext(ctx context.Context) string
- func WithLogger(ctx context.Context, logger *slog.Logger) context.Context
- func WithTriggerToken(ctx context.Context, token string) context.Context
- type BatchTimeoutError
- type DualEngram
- type K8sClient
- type SequencedSignal
- type SignalEnvelope
- type SignalMeta
- type SignalSequence
- type StorageManager
- type StoryDispatcher
- func (d *StoryDispatcher) Forget(key string)
- func (d *StoryDispatcher) HasSession(key string) bool
- func (d *StoryDispatcher) Session(key string) (*StorySession, bool)
- func (d *StoryDispatcher) Stop(ctx context.Context, key string) (*StorySession, error)
- func (d *StoryDispatcher) Trigger(ctx context.Context, req StoryTriggerRequest) (*StoryTriggerResult, error)
- type StoryDispatcherOption
- type StorySession
- type StoryTriggerRequest
- type StoryTriggerResult
- type StreamMessageOption
- func WithBinaryPayload(mime string, payload []byte, timestamp time.Duration) StreamMessageOption
- func WithInputs(inputs []byte) StreamMessageOption
- func WithJSONData(v any) (StreamMessageOption, error)
- func WithJSONPayload(payload []byte) StreamMessageOption
- func WithMessageID(id string) StreamMessageOption
- func WithMetadata(metadata map[string]string) StreamMessageOption
- func WithStreamEnvelope(env *transportpb.StreamEnvelope) StreamMessageOption
- func WithTimestamp(ts time.Time) StreamMessageOption
- func WithTransports(descriptors []engram.TransportDescriptor) StreamMessageOption
- type StructuredErrorOption
- func WithStructuredErrorCause(cause error) StructuredErrorOption
- func WithStructuredErrorCode(code string) StructuredErrorOption
- func WithStructuredErrorDetails(details map[string]any) StructuredErrorOption
- func WithStructuredErrorExitClass(exitClass enums.ExitClass) StructuredErrorOption
- func WithStructuredErrorRetryable(retryable bool) StructuredErrorOption
- type StructuredErrorProvider
- type TargetStory
- type TextSignalOptions
- type TransportConnectorClient
Constants ¶
const (
	// DefaultChannelBufferSize is the in-memory buffer used for Engram stream channels.
	DefaultChannelBufferSize = 16
	// DefaultMaxMessageSize caps gRPC message sizes when talking to connectors.
	DefaultMaxMessageSize = 10 * 1024 * 1024
)
const (
	// ReasonTimeout indicates the step failed because it exceeded its deadline.
	ReasonTimeout = "Timeout"
)
Variables ¶
var ErrBatchTimeout = errors.New("bubu batch execution timed out")
ErrBatchTimeout is a sentinel used with errors.Is to detect batch timeouts.
var ErrEffectAlreadyRecorded = errors.New("effect already recorded")
ErrEffectAlreadyRecorded indicates that the requested effect key is already present in the current StepRun effect ledger.
ErrEffectsUnavailable indicates that the current process cannot record effects (e.g., it is not running inside a StepRun workload). Callers may treat this as a soft failure and continue without recording the effect.
var ErrImpulseSessionExists = errors.New("impulse session already active")
ErrImpulseSessionExists indicates that a dispatcher session key is already active.
var ErrImpulseSessionNotFound = errors.New("impulse session not found")
ErrImpulseSessionNotFound indicates that a dispatcher session key has no active session.
ErrLogsUnavailable indicates that logs cannot be published (for example, when not running inside a StepRun or when storage is disabled). Callers may treat this as a soft failure.
ErrSignalsUnavailable indicates that the current process cannot emit signals (e.g., it is not running inside a StepRun workload). Callers may treat this as a soft failure and continue without emitting metadata.
var ErrStoryRunNotFound = errors.New("storyrun not found")
ErrStoryRunNotFound indicates that a requested StoryRun could not be located.
Functions ¶
func BatchExitCode ¶ added in v0.1.2
BatchExitCode returns the recommended container exit code for an error. Timeout errors map to 124 (the GNU timeout convention); all other non-nil errors default to 1.
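As a rough illustration of the mapping described above, the sketch below re-creates it with local stand-ins. The names errTimeout, errLogic, and exitCode are hypothetical and exist only for this example; a real engram would call sdk.BatchExitCode and compare against sdk.ErrBatchTimeout. Returning 0 for a nil error is an assumption based on the success code listed under Error Handling.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout is a hypothetical stand-in for sdk.ErrBatchTimeout.
var errTimeout = errors.New("batch execution timed out")

// errLogic is a hypothetical non-timeout failure used for the demo.
var errLogic = errors.New("bad input")

// exitCode mirrors the documented mapping: 124 for timeouts and 1 for any
// other non-nil error. Returning 0 for nil is an assumption.
func exitCode(err error) int {
	switch {
	case err == nil:
		return 0
	case errors.Is(err, errTimeout):
		return 124
	default:
		return 1
	}
}

func main() {
	// Wrapping with %w keeps the sentinel reachable through errors.Is.
	wrapped := fmt.Errorf("process step: %w", errTimeout)
	fmt.Println(exitCode(nil), exitCode(wrapped), exitCode(errLogic))
	// A real entrypoint would typically end with os.Exit(exitCode(err)).
}
```

Keeping the mapping in one helper means the process exit code and the code patched into StepRun status cannot drift apart.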
func DebugModeEnabled ¶ added in v0.1.2
func DebugModeEnabled() bool
DebugModeEnabled reports whether verbose logging should be forced regardless of logger level.
func EmitSequencedSignal ¶ added in v0.1.2
EmitSequencedSignal emits a signal wrapped with a sequence number and timestamp.
func EmitSignal ¶ added in v0.1.2
EmitSignal patches the current StepRun status with a small JSON payload so that controllers and CEL expressions can react to live metadata. When value is nil, the signal is cleared. The latest value stored in status.signals is always a compact summary so raw payloads are not persisted in StepRun status. Signal events are sequenced by default.
func EmitSignalWithSequence ¶ added in v0.1.2
EmitSignalWithSequence emits a signal using the provided sequence generator.
func EmitTextSignal ¶ added in v0.1.2
EmitTextSignal emits a metadata summary for text payloads. By default it emits metadata only; callers can opt into bounded inline samples via SampleBytes or SampleExtras. The full payload should still be passed via outputs/storage references when downstream steps need the complete content.
func ExecuteEffectOnce ¶ added in v0.1.2
func ExecuteEffectOnce(ctx context.Context, key string, fn func(context.Context) (any, error)) (any, bool, error)
ExecuteEffectOnce runs fn only if the effect key has not been recorded yet. It records a successful effect with the returned details. When the effect already exists, it returns `already=true` with `ErrEffectAlreadyRecorded`.
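The run-once contract can be sketched with an in-memory ledger. Everything here (ledger, executeOnce, errAlreadyRecorded) is a hypothetical stand-in for illustration; the SDK records effects in StepRun status rather than in process memory, and returning the stored details on a repeat call is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyRecorded is a hypothetical stand-in for sdk.ErrEffectAlreadyRecorded.
var errAlreadyRecorded = errors.New("effect already recorded")

// ledger is an in-memory stand-in for the StepRun effect ledger.
type ledger struct {
	mu      sync.Mutex
	effects map[string]any
}

func newLedger() *ledger { return &ledger{effects: map[string]any{}} }

// executeOnce runs fn only when key has no recorded effect. On success it
// records the returned details. If the key is already present it skips fn and
// reports already=true together with errAlreadyRecorded.
// The lock is held while fn runs, which keeps the sketch simple.
func (l *ledger) executeOnce(key string, fn func() (any, error)) (any, bool, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if details, ok := l.effects[key]; ok {
		return details, true, errAlreadyRecorded
	}
	details, err := fn()
	if err != nil {
		return nil, false, err // failed effects are not recorded
	}
	l.effects[key] = details
	return details, false, nil
}

func main() {
	l := newLedger()
	calls := 0
	charge := func() (any, error) { calls++; return "receipt-1", nil }

	for i := 0; i < 2; i++ {
		_, already, err := l.executeOnce("charge-card", charge)
		if errors.Is(err, errAlreadyRecorded) {
			err = nil // a retry that finds the effect in place is not a failure
		}
		fmt.Println(already, err, calls)
	}
}
```

Callers that retry a step would usually treat the already-recorded error as success, as the loop above does.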
func ExecutionMode ¶ added in v0.1.2
func ExecutionMode() string
ExecutionMode returns the execution mode supplied by the operator (e.g. "batch", "job", "deployment"). Defaults to "batch" when not set so local runs behave sensibly.
func ExtractTraceContext ¶ added in v0.1.2
ExtractTraceContext restores tracing context from a StreamMessage's metadata so Engrams can start child spans that are linked to upstream steps.
func HasEffect ¶ added in v0.1.2
HasEffect returns true if the current StepRun already recorded an effect for the key.
func LoggerFromContext ¶
LoggerFromContext retrieves a slog.Logger from the context, or returns a default JSON logger.
If no logger was previously stored via WithLogger, this function returns a new JSON logger writing to stdout with default settings. This ensures the SDK always has a valid logger without requiring explicit configuration for simple use cases.
LoggerFromContext is safe for concurrent use and idempotent.
func NewStreamErrorMessage ¶ added in v0.1.2
func NewStreamErrorMessage(errObj runsv1alpha1.StructuredError, opts ...StreamMessageOption) (engram.StreamMessage, error)
NewStreamErrorMessage wraps a StructuredError into a StreamMessage payload with Kind "error".
func NewStreamMessage ¶ added in v0.1.2
func NewStreamMessage(kind string, opts ...StreamMessageOption) engram.StreamMessage
NewStreamMessage constructs an engram.StreamMessage pre-populated with the provided options. It trims the kind identifier and applies any options in order. Note that metadata-only messages are invalid at send time; at least one of audio/video/binary payloads, JSON payload, inputs, or transports must be populated for the message to be published.
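The option-based construction described above follows the functional options pattern. The sketch below shows that pattern with a simplified local message type; message, option, newMessage, and the with* helpers are hypothetical and are not the SDK's types. The sendable check only approximates the rule that metadata-only messages are rejected at send time.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a simplified, hypothetical stand-in for engram.StreamMessage.
type message struct {
	Kind     string
	ID       string
	Payload  []byte
	Metadata map[string]string
}

// option has the same shape as StreamMessageOption: a function that mutates the message.
type option func(*message)

func withID(id string) option { return func(m *message) { m.ID = id } }

// withPayload copies the slice so later caller mutations do not leak into the message.
func withPayload(p []byte) option {
	return func(m *message) { m.Payload = append([]byte(nil), p...) }
}

// withMetadata merges entries into a map owned by the message.
func withMetadata(md map[string]string) option {
	return func(m *message) {
		if m.Metadata == nil {
			m.Metadata = make(map[string]string, len(md))
		}
		for k, v := range md {
			m.Metadata[k] = v
		}
	}
}

// newMessage trims kind and applies options in order, so later options win.
func newMessage(kind string, opts ...option) message {
	m := message{Kind: strings.TrimSpace(kind)}
	for _, opt := range opts {
		if opt != nil {
			opt(&m)
		}
	}
	return m
}

// sendable reports whether the message carries more than metadata.
func sendable(m message) bool { return len(m.Payload) > 0 }

func main() {
	body := []byte(`{"text":"hello"}`)
	m := newMessage("  transcript ", withID("a"), withID("b"), withPayload(body))
	body[0] = 'X' // does not affect m.Payload because the option copied it
	fmt.Println(m.Kind, m.ID, string(m.Payload), sendable(m))
	fmt.Println(sendable(newMessage("note", withMetadata(map[string]string{"k": "v"}))))
}
```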
func NewStructuredError ¶ added in v0.1.2
func NewStructuredError(typ runsv1alpha1.StructuredErrorType, message string, opts ...StructuredErrorOption) error
NewStructuredError returns an error that carries a StructuredError payload.
func ParseStreamErrorMessage ¶ added in v0.1.2
func ParseStreamErrorMessage(msg engram.StreamMessage) (runsv1alpha1.StructuredError, bool, error)
ParseStreamErrorMessage extracts StructuredError payloads from StreamMessage Kind "error".
func PublishLogFile ¶ added in v0.1.2
PublishLogFile reads the provided file and publishes its contents as logs.
func PublishLogs ¶ added in v0.1.2
PublishLogs uploads log bytes to storage (when enabled) and patches StepRun.status.logs with a storage reference. If storage is disabled, it returns ErrLogsUnavailable.
func PublishLogsWithContentType ¶ added in v0.1.2
PublishLogsWithContentType is like PublishLogs but lets callers set a content type.
func RecordEffect ¶ added in v0.1.2
RecordEffect appends an effect record to the current StepRun status ledger. The effect sequence is assigned server-side when Seq is 0.
func ReplaySignals ¶ added in v0.1.2
func ReplaySignals(ctx context.Context, stepRunName, namespace string, sinceSeq uint64) ([]runsv1alpha1.SignalEvent, error)
ReplaySignals returns the signal events for a StepRun, optionally filtered by sequence. When stepRunName or namespace are empty, environment defaults are used.
func RunBatch ¶
RunBatch is the primary entry point for a BatchEngram. It provides a fully type-safe execution environment, handling all the boilerplate of context loading, data hydration, and status patching.
func RunImpulse ¶
RunImpulse is the type-safe entry point for impulses (Kubernetes Deployments that trigger Stories).
This function infers config type C from the impulse implementation, providing compile-time type safety. It orchestrates the complete lifecycle:
- Load execution context from environment (BUBU_STEP_CONFIG, BUBU_TRIGGER_DATA, etc.)
- Unmarshal config into type C
- Call impulse.Init with typed config and secrets
- Create pre-configured Kubernetes client with namespace resolution
- Call impulse.Run with client, transferring control to long-running process
The impulse's Run method should block until work completes or context is canceled. Typical use cases: webhook listeners, message queue consumers, schedulers, event watchers.
Respects context cancellation for graceful shutdown on SIGTERM. The impulse is responsible for handling shutdown signals within its Run implementation (e.g., draining in-flight requests).
Example:
type MyConfig struct {
	WebhookPort int    `mapstructure:"webhookPort"`
	SecretToken string `mapstructure:"secretToken"`
}
type MyImpulse struct { /* ... */ }
func (m *MyImpulse) Init(ctx context.Context, cfg MyConfig, secrets *engram.Secrets) error {
	// Setup webhook server, validate token, etc.
	return nil
}
func (m *MyImpulse) Run(ctx context.Context, client *k8s.Client) error {
	// Listen for webhooks, trigger stories via client.TriggerStory(...)
	<-ctx.Done()
	return ctx.Err()
}
func main() {
	if err := sdk.RunImpulse(context.Background(), &MyImpulse{}); err != nil {
		panic(err)
	}
}
func Start ¶ added in v0.1.2
Start launches the provided engram in either batch or streaming mode based on ExecutionMode. This lets engram entrypoints avoid direct environment inspection and stick to the SDK abstraction.
func StartBatch ¶
StartBatch is the type-safe entry point for batch engrams (Kubernetes Jobs).
This function infers both config type C and input type I from the engram implementation, providing full compile-time type safety. It orchestrates the complete lifecycle:
- Load execution context from environment (BUBU_STEP_CONFIG, BUBU_TRIGGER_DATA, etc.)
- Unmarshal config and inputs into types C and I
- Call engram.Init with typed config and secrets
- Hydrate inputs from storage if needed
- Call engram.Process with typed inputs and execution context
- Dehydrate outputs to storage if they exceed size limits
- Patch StepRun status with result, timing, and exit code
Enforces timeout via BUBU_STEP_TIMEOUT with context cancellation. On timeout, patches status with exit code 124 (retryable) and forcefully exits to prevent zombie Jobs. On logic errors, patches with exit code 1 (terminal). On success, patches with exit code 0.
Example:
type MyConfig struct { APIKey string `mapstructure:"apiKey"` }
type MyInputs struct { UserID string `mapstructure:"userId"` }
func main() {
	ctx := context.Background()
	if err := sdk.StartBatch(ctx, NewMyEngram()); err != nil {
		panic(err) // Ensure non-zero exit for Job failure detection
	}
}
func StartStory ¶
func StartStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)
StartStory triggers a new StoryRun for the named Story with the provided inputs. For cross-namespace executions, use StartStoryInNamespace to explicitly set the Story namespace.
This is the primary mechanism for programmatically initiating workflows, typically used from within an Impulse. The SDK automatically resolves the correct namespace from environment variables (BUBU_TARGET_STORY_NAMESPACE or fallbacks), creates a Kubernetes client, and submits the StoryRun resource.
Inputs are marshaled to JSON and stored in the StoryRun spec. The operator watches for new StoryRuns and orchestrates their execution.
Returns the created StoryRun on success, or an error if client creation or StoryRun creation fails. Respects context cancellation and deadlines.
Example:
sr, err := sdk.StartStory(ctx, "my-workflow", map[string]any{
	"userId": "12345",
	"action": "process",
})
if err != nil {
	return fmt.Errorf("failed to trigger story: %w", err)
}
log.Printf("Triggered StoryRun: %s", sr.Name)
func StartStoryInNamespace ¶ added in v0.1.2
func StartStoryInNamespace(ctx context.Context, storyName string, storyNamespace string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)
StartStoryInNamespace is identical to StartStory but allows specifying the namespace of the referenced Story explicitly. Provide an empty namespace to use the default resolution (BUBU_TARGET_STORY_NAMESPACE or the pod namespace).
func StartStoryWithToken ¶ added in v0.1.2
func StartStoryWithToken(ctx context.Context, storyName string, token string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)
StartStoryWithToken behaves like StartStory but accepts a per-call token for deterministic retries.
func StartStoryWithTokenInNamespace ¶ added in v0.1.2
func StartStoryWithTokenInNamespace(ctx context.Context, storyName string, storyNamespace string, token string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)
StartStoryWithTokenInNamespace behaves like StartStoryInNamespace but accepts a per-call token for deterministic retries.
func StartStreamServer ¶
StartStreamServer boots a StreamingEngram using the new transport connector contract. The Engram must have BUBU_TRANSPORT_BINDING set; no other transport modes are supported.
func StartStreaming ¶
StartStreaming is the type-safe entry point for streaming engrams (Kubernetes Deployments with gRPC).
This function infers config type C from the engram implementation, providing compile-time type safety. It orchestrates the complete lifecycle:
- Load execution context from environment (BUBU_STEP_CONFIG, etc.)
- Unmarshal config into type C
- Call engram.Init with typed config and secrets
- Start gRPC server on BUBU_GRPC_PORT (default 50051)
- Register engram.Stream as the bidirectional streaming handler
- Serve until context cancellation (SIGTERM) or error
- Gracefully drain active streams before shutdown
The gRPC server implements:
- Transparent heartbeat sending/filtering to detect connection hangs
- Backpressure handling with configurable timeouts
- Graceful shutdown with BUBU_GRPC_GRACEFUL_SHUTDOWN_TIMEOUT drain phase
- Optional TLS via BUBU_GRPC_TLS_CERT_FILE and BUBU_GRPC_TLS_KEY_FILE
- Configurable message size limits via BUBU_GRPC_MAX_RECV_BYTES and BUBU_GRPC_MAX_SEND_BYTES
Example:
type MyConfig struct { BufferSize int `mapstructure:"bufferSize"` }
func main() {
	ctx := context.Background()
	if err := sdk.StartStreaming(ctx, NewMyStreamingEngram()); err != nil {
		panic(err)
	}
}
func StopStory ¶ added in v0.1.2
StopStory cancels an in-flight StoryRun in the current namespace. Equivalent to StopStoryInNamespace with an empty namespace.
func StopStoryInNamespace ¶ added in v0.1.2
StopStoryInNamespace cancels an in-flight StoryRun by marking it finished. If the StoryRun does not exist, ErrStoryRunNotFound is returned. Already-terminal StoryRuns are treated as a no-op. Active StoryRuns in phases the SDK will not force-finish return an invalid-transition error from the underlying k8s client.
func TriggerTokenFromContext ¶ added in v0.1.2
TriggerTokenFromContext returns the trigger token stored in the context, if any.
func WithLogger ¶
WithLogger stores a slog.Logger in the context for SDK use.
This allows you to inject a custom configured logger (e.g., with specific log levels, handlers, or structured attributes) that the SDK will use for all internal logging. If not provided, the SDK defaults to JSON logging to stdout.
Example:
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
ctx := sdk.WithLogger(context.Background(), logger)
sdk.StartBatch(ctx, myEngram)
The logger is used for lifecycle events (init, shutdown), errors, and metrics. It does not intercept engram-specific logs; engrams should use their own loggers or retrieve the SDK logger via LoggerFromContext within their execution context.
func WithTriggerToken ¶ added in v0.1.2
WithTriggerToken attaches an idempotency token that StartStory passes through to the Kubernetes client. When provided, the SDK derives deterministic StoryRun names without relying on process-wide env vars. Nil contexts are accepted so callers can safely attach a token before choosing a base context.
Types ¶
type BatchTimeoutError ¶ added in v0.1.2
type BatchTimeoutError struct {
	// Timeout is the configured duration limit that was exceeded.
	Timeout time.Duration
	// Cause is the underlying timeout-related error, when one is available.
	Cause error
}
BatchTimeoutError conveys that a batch engram exceeded its configured timeout.
func (*BatchTimeoutError) Error ¶ added in v0.1.2
func (e *BatchTimeoutError) Error() string
Error implements the error interface.
func (*BatchTimeoutError) Is ¶ added in v0.1.2
func (e *BatchTimeoutError) Is(target error) bool
Is allows errors.Is(err, ErrBatchTimeout) to match *BatchTimeoutError values.
func (*BatchTimeoutError) Unwrap ¶ added in v0.1.2
func (e *BatchTimeoutError) Unwrap() error
Unwrap exposes the underlying cause for errors.Unwrap / errors.Is checks.
type DualEngram ¶ added in v0.1.2
type DualEngram[C any, I any] interface {
	engram.BatchEngram[C, I]
	engram.StreamingEngram[C]
}
DualEngram is implemented by Engram types that support both batch and streaming modes.
type K8sClient ¶
type K8sClient interface {
	// TriggerStory creates a new StoryRun for the named Story with the provided inputs.
	// The inputs map is marshaled to JSON and stored in the StoryRun's spec.inputs field.
	// Returns the created StoryRun on success, or an error if creation fails.
	// Respects context cancellation and deadlines.
	TriggerStory(
		ctx context.Context,
		storyName string,
		storyNamespace string,
		inputs map[string]any,
	) (*runsv1alpha1.StoryRun, error)
	// PatchStepRunStatus updates the status of the named StepRun with the provided patch data.
	// The implementation should use field-wise merging to avoid clobbering controller-managed
	// fields and implement retry-on-conflict logic to handle concurrent updates.
	// Respects context cancellation and deadlines.
	PatchStepRunStatus(ctx context.Context, stepRunName string, patchData runsv1alpha1.StepRunStatus) error
}
K8sClient defines the interface for Kubernetes operations required by the SDK.
This interface abstracts the SDK's dependency on Kubernetes, enabling mocking in tests and providing a stable contract. Implementations must be safe for concurrent use by multiple goroutines.
The SDK's default implementation (k8s.Client) provides:
- Automatic namespace resolution from environment variables
- Retry-on-conflict logic for status patches
- Phase transition validation to prevent state corruption
- OpenTelemetry metrics for operation latency and success/failure
Custom implementations should follow the same concurrency and idempotency guarantees.
type SequencedSignal ¶ added in v0.1.2
type SequencedSignal struct {
	// Seq is the monotonic sequence number assigned to the signal event.
	Seq uint64 `json:"seq"`
	// EmittedAt records when the signal was emitted in UTC.
	EmittedAt time.Time `json:"emittedAt,omitempty"`
	// Value carries the original signal payload associated with Seq.
	Value any `json:"value,omitempty"`
}
SequencedSignal wraps a signal payload with a monotonically increasing sequence.
type SignalEnvelope ¶ added in v0.1.2
type SignalEnvelope struct {
	// Meta summarizes the full payload without embedding it directly in StepRun status.
	Meta SignalMeta `json:"meta,omitempty"`
	// Sample carries an optional, already-sanitized sample payload when callers choose to include one.
	Sample any `json:"sample,omitempty"`
}
SignalEnvelope is the standard signal structure for metadata + optional samples.
type SignalMeta ¶ added in v0.1.2
type SignalMeta struct {
	// Format identifies the logical payload shape, such as "text" or "json".
	Format string `json:"format,omitempty"`
	// ContentType carries the MIME content type when one is known.
	ContentType string `json:"contentType,omitempty"`
	// SizeBytes reports the original payload size in bytes.
	SizeBytes int `json:"sizeBytes,omitempty"`
	// HashSHA256 optionally carries the lowercase hex SHA-256 digest of the original payload.
	HashSHA256 string `json:"hashSha256,omitempty"`
	// Attributes carries additional safe, non-secret metadata about the payload.
	Attributes map[string]string `json:"attributes,omitempty"`
}
SignalMeta describes a lightweight summary of a payload that is safe for signals.
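A summary of this kind can be derived from a text payload with the standard library alone. The sketch below copies a subset of the documented field names and JSON tags into local types; summarizeText and its sampleBytes parameter are hypothetical, and the way the SDK bounds samples is an assumption.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"unicode/utf8"
)

// signalMeta and signalEnvelope copy a subset of the documented fields and JSON tags.
type signalMeta struct {
	Format      string `json:"format,omitempty"`
	ContentType string `json:"contentType,omitempty"`
	SizeBytes   int    `json:"sizeBytes,omitempty"`
	HashSHA256  string `json:"hashSha256,omitempty"`
}

type signalEnvelope struct {
	Meta   signalMeta `json:"meta,omitempty"`
	Sample any        `json:"sample,omitempty"`
}

// summarizeText builds a metadata-only envelope for text. A sampleBytes value
// above zero opts into an inline sample of at most that many bytes.
func summarizeText(text string, sampleBytes int) signalEnvelope {
	sum := sha256.Sum256([]byte(text))
	env := signalEnvelope{Meta: signalMeta{
		Format:      "text",
		ContentType: "text/plain",
		SizeBytes:   len(text),
		HashSHA256:  hex.EncodeToString(sum[:]), // lowercase hex
	}}
	if sampleBytes > 0 {
		if sampleBytes > len(text) {
			sampleBytes = len(text)
		}
		// Back off to a rune boundary so the sample stays valid UTF-8.
		for sampleBytes > 0 && sampleBytes < len(text) && !utf8.RuneStart(text[sampleBytes]) {
			sampleBytes--
		}
		env.Sample = text[:sampleBytes]
	}
	return env
}

func main() {
	out, err := json.Marshal(summarizeText("hello world", 5))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The digest lets a downstream step confirm it fetched the same payload from storage without the payload ever appearing in StepRun status.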
type SignalSequence ¶ added in v0.1.2
type SignalSequence struct {
	// contains filtered or unexported fields
}
SignalSequence is a process-local monotonic sequence generator for signals.
func NewSignalSequence ¶ added in v0.1.2
func NewSignalSequence(start uint64) *SignalSequence
NewSignalSequence constructs a SignalSequence starting at the provided value.
func (*SignalSequence) Next ¶ added in v0.1.2
func (s *SignalSequence) Next() uint64
Next increments and returns the next sequence value.
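A process-local monotonic counter like this is often built on an atomic integer. The sketch below is one way to do it, not the SDK's implementation; sequence, newSequence, and next are hypothetical names, and it assumes the first call to next returns start+1.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sequence is a hypothetical stand-in for SignalSequence.
type sequence struct{ n atomic.Uint64 }

// newSequence returns a generator whose first next() yields start+1 (assumed).
func newSequence(start uint64) *sequence {
	s := &sequence{}
	s.n.Store(start)
	return s
}

// next atomically increments the counter and returns the new value, so
// concurrent callers never observe the same number.
func (s *sequence) next() uint64 { return s.n.Add(1) }

func main() {
	seq := newSequence(100)
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			seq.next()
		}()
	}
	wg.Wait()
	fmt.Println(seq.next()) // 151: fifty increments from 100, then one more
}
```

Being process-local, such a counter restarts when the pod restarts, which is one reason to seed it from a known starting value.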
type StorageManager ¶
type StorageManager interface {
	// Hydrate recursively scans a data structure for storage references and replaces
	// them with the actual content from the storage backend. Returns the hydrated
	// data on success, or an error if reading fails or a reference is invalid.
	// Respects context cancellation and enforces BUBU_STORAGE_TIMEOUT.
	Hydrate(ctx context.Context, data any) (any, error)
	// Dehydrate recursively checks the size of a data structure. If any part exceeds
	// the inline size limit (BUBU_MAX_INLINE_SIZE), it saves that part to the storage
	// backend and replaces it with a storage reference. Returns the dehydrated data
	// (potentially containing references) on success, or an error if writing fails.
	// Respects context cancellation and enforces BUBU_STORAGE_TIMEOUT.
	Dehydrate(ctx context.Context, data any, stepRunID string) (any, error)
}
StorageManager defines the interface for storage operations required by the SDK.
This interface provides transparent data offloading for large inputs and outputs, automatically handling marshaling, storage backend operations, and reference tracking. Implementations must be safe for concurrent use by multiple goroutines.
The SDK's default implementation (storage.Manager) provides:
- Automatic size-based offloading (configurable via BUBU_MAX_INLINE_SIZE)
- Recursive hydration/dehydration of nested structures
- Support for S3 and file storage backends
- Path traversal protection and validation
- OpenTelemetry metrics for operation latency and data sizes
Storage references use one of the formats below:
- {"$bubuStorageRef": "outputs/steprun-id/path.json"}
- {"$bubuStorageRef": "outputs/steprun-id/path.json", "$bubuStoragePath":"result.text"}
- {"$bubuConfigMapRef": "namespace/name:key"}
- {"$bubuSecretRef": "namespace/name:key"}
- {"$bubuConfigMapRef": {"name":"cfg","key":"payload","namespace":"ns","format":"json"}}
Supported formats for ConfigMap/Secret refs: auto (default), json, raw. When namespace is omitted, the SDK defaults to BUBU_POD_NAMESPACE/BUBU_STEPRUN_NAMESPACE.
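For the string form of ConfigMap and Secret references, a parser might look like the sketch below. It is illustrative only: objectRef and parseObjectRef are hypothetical, and accepting a bare "name:key" with no namespace prefix in the string form is an assumption drawn from the namespace-defaulting note above.

```go
package main

import (
	"fmt"
	"strings"
)

// objectRef is a hypothetical parsed form of a "namespace/name:key" reference.
type objectRef struct{ Namespace, Name, Key string }

// parseObjectRef splits "namespace/name:key". Namespace is left empty when the
// prefix is omitted, so a caller could fall back to an environment default.
func parseObjectRef(s string) (objectRef, error) {
	var ref objectRef
	rest := s
	if ns, after, ok := strings.Cut(s, "/"); ok {
		ref.Namespace, rest = ns, after
	}
	name, key, ok := strings.Cut(rest, ":")
	if !ok || name == "" || key == "" {
		return objectRef{}, fmt.Errorf("invalid reference %q: want [namespace/]name:key", s)
	}
	ref.Name, ref.Key = name, key
	return ref, nil
}

func main() {
	for _, s := range []string{"prod/app-config:payload", "app-config:payload", "broken"} {
		ref, err := parseObjectRef(s)
		fmt.Printf("%+v %v\n", ref, err)
	}
}
```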
type StoryDispatcher ¶ added in v0.1.2
type StoryDispatcher struct {
	// contains filtered or unexported fields
}
StoryDispatcher manages StoryRun lifecycles on behalf of an impulse, providing session tracking and idempotent stop semantics.
func NewStoryDispatcher ¶ added in v0.1.2
func NewStoryDispatcher(opts ...StoryDispatcherOption) *StoryDispatcher
NewStoryDispatcher creates a StoryDispatcher with optional configuration.
func (*StoryDispatcher) Forget ¶ added in v0.1.2
func (d *StoryDispatcher) Forget(key string)
Forget removes a session without attempting to stop the StoryRun.
func (*StoryDispatcher) HasSession ¶ added in v0.1.2
func (d *StoryDispatcher) HasSession(key string) bool
HasSession reports whether a session is currently tracked for the key.
func (*StoryDispatcher) Session ¶ added in v0.1.2
func (d *StoryDispatcher) Session(key string) (*StorySession, bool)
Session returns the session metadata for a key without mutating state.
func (*StoryDispatcher) Stop ¶ added in v0.1.2
func (d *StoryDispatcher) Stop(ctx context.Context, key string) (*StorySession, error)
Stop cancels the StoryRun associated with the session key. Returns the session metadata when successful.
func (*StoryDispatcher) Trigger ¶ added in v0.1.2
func (d *StoryDispatcher) Trigger(ctx context.Context, req StoryTriggerRequest) (*StoryTriggerResult, error)
Trigger starts a StoryRun and optionally records a session keyed by req.Key.
If req.StoryName is empty, the target story is resolved from the Impulse's spec.storyRef via GetTargetStory(). This allows impulses to omit the story name in their trigger requests, relying on the operator-injected environment variables instead.
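The session bookkeeping behind Trigger and Stop can be pictured with a small in-memory model. This sketch is an assumption about the general shape only: dispatcher, trigger, stopSession, and the fake runtime are hypothetical, and the real dispatcher's handling of duplicate keys and of sessions after Stop may differ.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical stand-ins for ErrImpulseSessionExists and ErrImpulseSessionNotFound.
var (
	errSessionExists   = errors.New("impulse session already active")
	errSessionNotFound = errors.New("impulse session not found")
)

// dispatcher is a simplified, hypothetical stand-in for StoryDispatcher.
type dispatcher struct {
	mu       sync.Mutex
	sessions map[string]string // session key -> StoryRun name
	start    func(story string) (string, error)
	stop     func(run string) error
}

// trigger starts a run and, when key is non-empty, records a session for it.
func (d *dispatcher) trigger(key, story string) (string, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if key != "" {
		if _, ok := d.sessions[key]; ok {
			return "", errSessionExists
		}
	}
	run, err := d.start(story)
	if err != nil {
		return "", err
	}
	if key != "" {
		d.sessions[key] = run
	}
	return run, nil
}

// stopSession stops the run tracked under key and then forgets the session.
func (d *dispatcher) stopSession(key string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	run, ok := d.sessions[key]
	if !ok {
		return errSessionNotFound
	}
	if err := d.stop(run); err != nil {
		return err
	}
	delete(d.sessions, key)
	return nil
}

// newFakeDispatcher wires in a fake runtime, similar in spirit to WithStoryRuntime.
func newFakeDispatcher() *dispatcher {
	n := 0
	return &dispatcher{
		sessions: map[string]string{},
		start: func(story string) (string, error) {
			n++
			return fmt.Sprintf("%s-run-%d", story, n), nil
		},
		stop: func(string) error { return nil },
	}
}

func main() {
	d := newFakeDispatcher()
	run, _ := d.trigger("call-1", "transcribe")
	_, err := d.trigger("call-1", "transcribe")
	fmt.Println(run, errors.Is(err, errSessionExists))
	fmt.Println(d.stopSession("call-1"), errors.Is(d.stopSession("call-1"), errSessionNotFound))
}
```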
type StoryDispatcherOption ¶ added in v0.1.2
type StoryDispatcherOption func(*StoryDispatcher)
StoryDispatcherOption configures a StoryDispatcher.
func WithStoryRuntime ¶ added in v0.1.2
func WithStoryRuntime(
	start func(
		ctx context.Context,
		storyName string,
		storyNamespace string,
		inputs map[string]any,
	) (*runsv1alpha1.StoryRun, error),
	stop func(ctx context.Context, storyRunName, storyNamespace string) error,
) StoryDispatcherOption
WithStoryRuntime overrides the start/stop implementation used by the dispatcher. Intended primarily for tests.
type StorySession ¶ added in v0.1.2
type StorySession struct {
	// Key is the dispatcher-local session key used to look up or stop the StoryRun later.
	Key string
	// StoryRun is the created StoryRun resource name.
	StoryRun string
	// Namespace is the namespace that owns StoryRun.
	Namespace string
	// StoryName is the logical Story that produced StoryRun.
	StoryName string
	// StartedAt records when the dispatcher observed the StoryRun as started.
	StartedAt time.Time
	// Metadata carries optional impulse-owned attributes associated with the session.
	Metadata map[string]string
}
StorySession holds metadata about an active StoryRun started by an impulse.
type StoryTriggerRequest ¶ added in v0.1.2
type StoryTriggerRequest struct {
	// Key optionally reserves a dispatcher session slot for later Stop/Forget calls.
	Key string
	// TriggerToken enables idempotent StoryRun creation when the caller provides one.
	TriggerToken string
	// StoryName overrides the target story name; when empty the dispatcher resolves it from the Impulse environment.
	StoryName string
	// StoryNamespace overrides the target story namespace; when empty the dispatcher resolves it from the Impulse environment.
	StoryNamespace string
	// Inputs contains the structured trigger payload forwarded to the new StoryRun.
	Inputs map[string]any
	// Metadata carries caller-defined session metadata stored only in the local dispatcher session.
	Metadata map[string]string
}
StoryTriggerRequest defines the inputs required to trigger a story.
type StoryTriggerResult ¶ added in v0.1.2
type StoryTriggerResult struct {
	// StoryRun is the created Kubernetes StoryRun object returned by the runtime.
	StoryRun *runsv1alpha1.StoryRun
	// Session is the dispatcher-tracked session metadata when a session key was requested.
	Session *StorySession
}
StoryTriggerResult carries the StoryRun created by the dispatcher and, when a session key was requested, the associated session.
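The relationship between an optional session key and the tracked session can be sketched with a small in-memory table. This is only an illustration under stated assumptions: `session` and `registry` are hypothetical types, not the dispatcher's real internals, and the mutex-guarded map is one plausible way to implement keyed lookup and forget.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// session is a hypothetical, trimmed-down stand-in for StorySession.
type session struct {
	Key       string
	StoryRun  string
	Namespace string
	StartedAt time.Time
	Metadata  map[string]string
}

// registry is a hypothetical in-memory session table guarded by a mutex.
type registry struct {
	mu       sync.Mutex
	sessions map[string]*session
}

func newRegistry() *registry {
	return &registry{sessions: make(map[string]*session)}
}

// track records a session only when a key was requested; an empty key yields
// nil, matching the "when a session key was requested" wording above.
func (r *registry) track(key, storyRun, namespace string, metadata map[string]string) *session {
	if key == "" {
		return nil
	}
	// Copy the metadata so later caller mutations do not leak into the session.
	meta := make(map[string]string, len(metadata))
	for k, v := range metadata {
		meta[k] = v
	}
	s := &session{
		Key:       key,
		StoryRun:  storyRun,
		Namespace: namespace,
		StartedAt: time.Now(),
		Metadata:  meta,
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.sessions[key] = s
	return s
}

// lookup returns the session stored under key, if any.
func (r *registry) lookup(key string) (*session, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.sessions[key]
	return s, ok
}

// forget drops the session stored under key; unknown keys are a no-op.
func (r *registry) forget(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.sessions, key)
}

func main() {
	reg := newRegistry()
	fmt.Println(reg.track("", "run-a", "default", nil) == nil)

	reg.track("call-42", "run-b", "default", map[string]string{"caller": "alice"})
	if s, ok := reg.lookup("call-42"); ok {
		fmt.Println(s.StoryRun, s.Metadata["caller"])
	}

	reg.forget("call-42")
	_, ok := reg.lookup("call-42")
	fmt.Println(ok)
}
```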
type StreamMessageOption ¶ added in v0.1.2
type StreamMessageOption func(*engram.StreamMessage)
StreamMessageOption configures an engram.StreamMessage produced via NewStreamMessage.
func WithBinaryPayload ¶ added in v0.1.2
func WithBinaryPayload(mime string, payload []byte, timestamp time.Duration) StreamMessageOption
WithBinaryPayload attaches an arbitrary binary payload plus MIME type. The payload is copied.
func WithInputs ¶ added in v0.1.2
func WithInputs(inputs []byte) StreamMessageOption
WithInputs attaches CEL-evaluated inputs (already marshaled JSON).
func WithJSONData ¶ added in v0.1.2
func WithJSONData(v any) (StreamMessageOption, error)
WithJSONData marshals the provided value to JSON and attaches it as the payload. It returns an option alongside any marshaling error so callers can handle failures inline.
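Because this constructor returns an error next to the option, it cannot be passed inline as a variadic argument; callers build the option first and check the error. The sketch below shows that calling pattern with hypothetical local types (`message`, `option`, `withJSONData`, `newMessage`) rather than the SDK's own, so treat it as an illustration of the shape only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical stand-in for engram.StreamMessage.
type message struct {
	Payload []byte
}

type option func(*message)

// withJSONData marshals v up front so the failure surfaces before the
// message is built, mirroring the (option, error) return shape.
func withJSONData(v any) (option, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, fmt.Errorf("marshal payload: %w", err)
	}
	return func(m *message) { m.Payload = data }, nil
}

// newMessage applies the options in order and skips nil entries.
func newMessage(opts ...option) *message {
	m := &message{}
	for _, opt := range opts {
		if opt != nil {
			opt(m)
		}
	}
	return m
}

func main() {
	opt, err := withJSONData(map[string]any{"text": "hello"})
	if err != nil {
		fmt.Println("cannot build message:", err)
		return
	}
	msg := newMessage(opt)
	fmt.Println(string(msg.Payload))

	// Channels cannot be encoded as JSON, so this call reports an error.
	_, err = withJSONData(make(chan int))
	fmt.Println(err != nil)
}
```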
func WithJSONPayload ¶ added in v0.1.2
func WithJSONPayload(payload []byte) StreamMessageOption
WithJSONPayload attaches a JSON payload (already marshaled). The byte slice is copied.
func WithMessageID ¶ added in v0.1.2
func WithMessageID(id string) StreamMessageOption
WithMessageID sets the message identifier for correlation across steps.
func WithMetadata ¶ added in v0.1.2
func WithMetadata(metadata map[string]string) StreamMessageOption
WithMetadata merges the supplied metadata into the message, cloning the map to prevent callers from mutating shared state.
func WithStreamEnvelope ¶ added in v0.1.2
func WithStreamEnvelope(env *transportpb.StreamEnvelope) StreamMessageOption
WithStreamEnvelope attaches transport-layer stream sequencing metadata.
func WithTimestamp ¶ added in v0.1.2
func WithTimestamp(ts time.Time) StreamMessageOption
WithTimestamp overrides the message timestamp. Zero values are ignored.
func WithTransports ¶ added in v0.1.2
func WithTransports(descriptors []engram.TransportDescriptor) StreamMessageOption
WithTransports records the story's declared transports for downstream inspection.
type StructuredErrorOption ¶ added in v0.1.2
type StructuredErrorOption func(*structuredError)
StructuredErrorOption mutates a structured error before it is returned.
func WithStructuredErrorCause ¶ added in v0.1.2
func WithStructuredErrorCause(cause error) StructuredErrorOption
WithStructuredErrorCause preserves the underlying error for wrapping/unwrapping.
func WithStructuredErrorCode ¶ added in v0.1.2
func WithStructuredErrorCode(code string) StructuredErrorOption
WithStructuredErrorCode sets a component-specific error code.
func WithStructuredErrorDetails ¶ added in v0.1.2
func WithStructuredErrorDetails(details map[string]any) StructuredErrorOption
WithStructuredErrorDetails attaches structured metadata for diagnostics.
func WithStructuredErrorExitClass ¶ added in v0.1.2
func WithStructuredErrorExitClass(exitClass enums.ExitClass) StructuredErrorOption
WithStructuredErrorExitClass sets the desired exit class ("retry", "terminal", etc.).
func WithStructuredErrorRetryable ¶ added in v0.1.2
func WithStructuredErrorRetryable(retryable bool) StructuredErrorOption
WithStructuredErrorRetryable marks the error as retryable (true) or terminal (false).
type StructuredErrorProvider ¶ added in v0.1.2
type StructuredErrorProvider interface {
	StructuredError() runsv1alpha1.StructuredError
}
StructuredErrorProvider allows errors to supply a versioned StructuredError payload.
type TargetStory ¶ added in v0.1.2
type TargetStory struct {
	// Name is the Story name from Impulse.spec.storyRef.name.
	Name string
	// Namespace is the Story namespace from Impulse.spec.storyRef.namespace.
	// Empty if the Story is in the same namespace as the Impulse.
	Namespace string
}
TargetStory holds the target story information resolved from the Impulse's storyRef. This is set by the operator via environment variables.
func GetTargetStory ¶ added in v0.1.2
func GetTargetStory() (TargetStory, error)
GetTargetStory returns the target story configured via the Impulse's spec.storyRef. The operator injects these values as BUBU_TARGET_STORY_NAME and BUBU_TARGET_STORY_NAMESPACE environment variables when running an Impulse pod.
Returns an error if BUBU_TARGET_STORY_NAME is not set, as the Impulse CRD requires a storyRef to be specified.
Example:

	target, err := sdk.GetTargetStory()
	if err != nil {
		return fmt.Errorf("no target story configured: %w", err)
	}
	sr, err := sdk.StartStoryInNamespace(ctx, target.Name, target.Namespace, inputs)
func MustGetTargetStory ¶ added in v0.1.2
func MustGetTargetStory() TargetStory
MustGetTargetStory is like GetTargetStory but panics if the target story is not configured. Useful in main() where early failure is preferred over error handling.
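The lookup described for GetTargetStory and its panicking variant can be sketched with the standard library alone. The two variable names come from the text above. The `targetStory` type and the `resolveTarget` and `mustResolveTarget` helpers are hypothetical, and injecting the lookup function is an assumption made so the logic can be exercised without touching the process environment.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

const (
	envStoryName      = "BUBU_TARGET_STORY_NAME"
	envStoryNamespace = "BUBU_TARGET_STORY_NAMESPACE"
)

// targetStory is a hypothetical stand-in for TargetStory.
type targetStory struct {
	Name      string
	Namespace string
}

// resolveTarget reads both variables through getenv. A missing name is an
// error; a missing namespace is left empty, meaning "same namespace as the
// Impulse" per the field comment above.
func resolveTarget(getenv func(string) string) (targetStory, error) {
	name := getenv(envStoryName)
	if name == "" {
		return targetStory{}, errors.New(envStoryName + " is not set")
	}
	return targetStory{Name: name, Namespace: getenv(envStoryNamespace)}, nil
}

// mustResolveTarget panics instead of returning the error, for use in main().
func mustResolveTarget(getenv func(string) string) targetStory {
	target, err := resolveTarget(getenv)
	if err != nil {
		panic(err)
	}
	return target
}

func main() {
	target, err := resolveTarget(os.Getenv)
	if err != nil {
		fmt.Println("no target story configured:", err)
		return
	}
	fmt.Printf("target story %q in namespace %q\n", target.Name, target.Namespace)
}
```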
type TextSignalOptions ¶ added in v0.1.2
type TextSignalOptions struct {
	// Format overrides the logical payload format label stored in SignalMeta.Format.
	Format string
	// ContentType sets SignalMeta.ContentType for the emitted signal.
	ContentType string
	// SampleBytes caps the inline sample size in bytes when callers choose to include a text sample.
	// When zero, EmitTextSignal preserves the default metadata-only behavior unless SampleExtras is set.
	SampleBytes int
	// IncludeHash enables SHA-256 hashing of the original text into SignalMeta.HashSHA256.
	IncludeHash bool
	// Attributes attaches additional safe metadata to SignalMeta.Attributes.
	Attributes map[string]string
	// SampleExtras adds additional sample metadata alongside the sampled text.
	SampleExtras map[string]any
}
TextSignalOptions controls how text payloads are summarized for signals.
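As a rough illustration of what "summarizing" a text payload can mean, the sketch below computes an optional SHA-256 digest of the full text and an optional sample capped at a byte budget. The `textSummary` type and the `summarize` and `truncateUTF8` helpers are hypothetical. Cutting the sample on a rune boundary is an assumption about sensible behavior, not a documented guarantee of EmitTextSignal, and SampleExtras handling is omitted.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"unicode/utf8"
)

// textSummary is a hypothetical summary of a text payload.
type textSummary struct {
	Bytes      int
	HashSHA256 string
	Sample     string
}

// truncateUTF8 returns at most max bytes of s without splitting a rune.
func truncateUTF8(s string, max int) string {
	if max <= 0 {
		return ""
	}
	if len(s) <= max {
		return s
	}
	// Back up until the cut point sits on the first byte of a rune.
	for max > 0 && !utf8.RuneStart(s[max]) {
		max--
	}
	return s[:max]
}

// summarize stays metadata-only unless a hash or a sample budget is requested.
func summarize(text string, sampleBytes int, includeHash bool) textSummary {
	summary := textSummary{Bytes: len(text)}
	if includeHash {
		sum := sha256.Sum256([]byte(text))
		summary.HashSHA256 = hex.EncodeToString(sum[:])
	}
	if sampleBytes > 0 {
		summary.Sample = truncateUTF8(text, sampleBytes)
	}
	return summary
}

func main() {
	full := summarize("héllo wörld", 2, true)
	fmt.Println(full.Bytes, full.Sample, len(full.HashSHA256))

	metaOnly := summarize("héllo wörld", 0, false)
	fmt.Println(metaOnly.Sample == "", metaOnly.HashSHA256 == "")
}
```

Hashing the original text rather than the sample lets a consumer verify the full payload even when only a prefix is carried inline.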
type TransportConnectorClient ¶ added in v0.1.2
type TransportConnectorClient struct {
	// contains filtered or unexported fields
}
TransportConnectorClient wraps the generic Engram↔connector gRPC contract. It dials the connector endpoint advertised via the TransportBinding env payload.
func DialTransportConnector ¶ added in v0.1.2
func DialTransportConnector(
	ctx context.Context,
	endpoint string,
	opts ...grpc.DialOption,
) (*TransportConnectorClient, error)
DialTransportConnector establishes a gRPC client connection to the generic transport connector. The endpoint may be a TCP host:port address or a Unix domain socket path prefixed with unix://.
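The two endpoint forms can be told apart before dialing. The sketch below classifies an endpoint string using only the standard library. `splitEndpoint` is a hypothetical helper, not part of the SDK, and the validation it performs is an assumption about what a caller might want to check up front.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strings"
)

const unixPrefix = "unix://"

// splitEndpoint reports the network ("unix" or "tcp") and the address to dial.
func splitEndpoint(endpoint string) (network, address string, err error) {
	if strings.HasPrefix(endpoint, unixPrefix) {
		path := strings.TrimPrefix(endpoint, unixPrefix)
		if path == "" {
			return "", "", errors.New("unix endpoint has an empty socket path")
		}
		return "unix", path, nil
	}
	// Anything else must parse as host:port to count as a TCP endpoint.
	if _, _, err := net.SplitHostPort(endpoint); err != nil {
		return "", "", fmt.Errorf("invalid tcp endpoint %q: %w", endpoint, err)
	}
	return "tcp", endpoint, nil
}

func main() {
	for _, endpoint := range []string{
		"unix:///var/run/connector.sock",
		"localhost:50051",
		"localhost",
	} {
		network, address, err := splitEndpoint(endpoint)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println(network, address)
	}
}
```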
func (*TransportConnectorClient) Client ¶ added in v0.1.2
func (c *TransportConnectorClient) Client() transportpb.TransportConnectorServiceClient
Client exposes the underlying generated gRPC client.
func (*TransportConnectorClient) Close ¶ added in v0.1.2
func (c *TransportConnectorClient) Close() error
Close tears down the connector connection.
Source Files ¶
- batch.go
- cel_templates.go
- config_hydration.go
- debug.go
- effects.go
- env_resolver.go
- errors.go
- impulse.go
- logs.go
- schema_validation.go
- sdk.go
- signal_payloads.go
- signal_replay.go
- signal_sequence.go
- signals.go
- story_dispatcher.go
- stream.go
- stream_chunks.go
- stream_context.go
- stream_error.go
- stream_message_builder.go
- stream_trace.go
- structured_error.go
- transport_binding.go
- transport_connector.go
- transport_envelope.go
- transport_helpers.go
Directories ¶
| Path | Synopsis |
|---|---|
| engram | package engram defines the core interfaces that developers implement to create components for the bobrapet ecosystem. |
| integration | |
| k8s | Package k8s contains the SDK's controller-runtime client helpers for StoryRun, StepRun, and Impulse operations. |
| media | Package media provides helpers for offloading large streaming payloads to shared object storage while keeping small payloads inline. |
| pkg | |
| env | Package env provides small helpers for parsing validated environment overrides used across the SDK runtime. |
| metrics | Package metrics provides OpenTelemetry-based observability for the SDK and allows developers to register custom metrics for their engrams and impulses. |