Versions in this module Expand all Collapse all v0 v0.1.2 Apr 9, 2026 Changes in this version + const ReasonTimeout + var ErrBatchTimeout = errors.New("bubu batch execution timed out") + var ErrEffectAlreadyRecorded = errors.New("effect already recorded") + var ErrEffectsUnavailable = errors.New("effect recording unavailable: not running inside a StepRun") + var ErrImpulseSessionExists = errors.New("impulse session already active") + var ErrImpulseSessionNotFound = errors.New("impulse session not found") + var ErrLogsUnavailable = errors.New("log publishing unavailable: not running inside a StepRun or storage disabled") + var ErrSignalsUnavailable = errors.New("signal emission unavailable: not running inside a StepRun") + var ErrStoryRunNotFound = errors.New("storyrun not found") + func BatchExitCode(err error) int + func DebugModeEnabled() bool + func EmitSequencedSignal(ctx context.Context, key string, seq uint64, value any) error + func EmitSignal(ctx context.Context, key string, value any) error + func EmitSignalWithSequence(ctx context.Context, key string, seq *SignalSequence, value any) error + func EmitTextSignal(ctx context.Context, key string, text string, opts TextSignalOptions) error + func ExecuteEffectOnce(ctx context.Context, key string, fn func(context.Context) (any, error)) (any, bool, error) + func ExecutionMode() string + func ExtractTraceContext(ctx context.Context, msg *engram.StreamMessage) context.Context + func HasEffect(ctx context.Context, key string) (bool, error) + func NewStreamErrorMessage(errObj runsv1alpha1.StructuredError, opts ...StreamMessageOption) (engram.StreamMessage, error) + func NewStreamMessage(kind string, opts ...StreamMessageOption) engram.StreamMessage + func NewStructuredError(typ runsv1alpha1.StructuredErrorType, message string, ...) error + func ParseStreamErrorMessage(msg engram.StreamMessage) (runsv1alpha1.StructuredError, bool, error) + func PublishLogFile(ctx context.Context, path string, contentType string) error + func PublishLogs(ctx context.Context, payload []byte) error + func PublishLogsWithContentType(ctx context.Context, payload []byte, contentType string) error + func RecordEffect(ctx context.Context, key, status string, details any) error + func ReplaySignals(ctx context.Context, stepRunName, namespace string, sinceSeq uint64) ([]runsv1alpha1.SignalEvent, error) + func StartStoryInNamespace(ctx context.Context, storyName string, storyNamespace string, ...) (*runsv1alpha1.StoryRun, error) + func StartStoryWithToken(ctx context.Context, storyName string, token string, inputs map[string]any) (*runsv1alpha1.StoryRun, error) + func StartStoryWithTokenInNamespace(ctx context.Context, storyName string, storyNamespace string, token string, ...) (*runsv1alpha1.StoryRun, error) + func Start[C any, I any](ctx context.Context, e DualEngram[C, I]) error + func StopStory(ctx context.Context, storyRunName string) error + func StopStoryInNamespace(ctx context.Context, storyRunName, namespace string) error + func TriggerTokenFromContext(ctx context.Context) string + func WithTriggerToken(ctx context.Context, token string) context.Context + type BatchTimeoutError struct + Cause error + Timeout time.Duration + func (e *BatchTimeoutError) Error() string + func (e *BatchTimeoutError) Is(target error) bool + func (e *BatchTimeoutError) Unwrap() error + type DualEngram interface + type SequencedSignal struct + EmittedAt time.Time + Seq uint64 + Value any + type SignalEnvelope struct + Meta SignalMeta + Sample any + type SignalMeta struct + Attributes map[string]string + ContentType string + Format string + HashSHA256 string + SizeBytes int + type SignalSequence struct + func NewSignalSequence(start uint64) *SignalSequence + func (s *SignalSequence) Next() uint64 + type StoryDispatcher struct + func NewStoryDispatcher(opts ...StoryDispatcherOption) *StoryDispatcher + func (d *StoryDispatcher) Forget(key string) + func (d *StoryDispatcher) HasSession(key string) bool + func (d *StoryDispatcher) Session(key string) (*StorySession, bool) + func (d *StoryDispatcher) Stop(ctx context.Context, key string) (*StorySession, error) + func (d *StoryDispatcher) Trigger(ctx context.Context, req StoryTriggerRequest) (*StoryTriggerResult, error) + type StoryDispatcherOption func(*StoryDispatcher) + func WithStoryRuntime(...) StoryDispatcherOption + type StorySession struct + Key string + Metadata map[string]string + Namespace string + StartedAt time.Time + StoryName string + StoryRun string + type StoryTriggerRequest struct + Inputs map[string]any + Key string + Metadata map[string]string + StoryName string + StoryNamespace string + TriggerToken string + type StoryTriggerResult struct + Session *StorySession + StoryRun *runsv1alpha1.StoryRun + type StreamMessageOption func(*engram.StreamMessage) + func WithBinaryPayload(mime string, payload []byte, timestamp time.Duration) StreamMessageOption + func WithInputs(inputs []byte) StreamMessageOption + func WithJSONData(v any) (StreamMessageOption, error) + func WithJSONPayload(payload []byte) StreamMessageOption + func WithMessageID(id string) StreamMessageOption + func WithMetadata(metadata map[string]string) StreamMessageOption + func WithStreamEnvelope(env *transportpb.StreamEnvelope) StreamMessageOption + func WithTimestamp(ts time.Time) StreamMessageOption + func WithTransports(descriptors []engram.TransportDescriptor) StreamMessageOption + type StructuredErrorOption func(*structuredError) + func WithStructuredErrorCause(cause error) StructuredErrorOption + func WithStructuredErrorCode(code string) StructuredErrorOption + func WithStructuredErrorDetails(details map[string]any) StructuredErrorOption + func WithStructuredErrorExitClass(exitClass enums.ExitClass) StructuredErrorOption + func WithStructuredErrorRetryable(retryable bool) StructuredErrorOption + type StructuredErrorProvider interface + StructuredError func() runsv1alpha1.StructuredError + type TargetStory struct + Name string + Namespace string + func GetTargetStory() (TargetStory, error) + func MustGetTargetStory() TargetStory + type TextSignalOptions struct + Attributes map[string]string + ContentType string + Format string + IncludeHash bool + SampleBytes int + SampleExtras map[string]any + type TransportConnectorClient struct + func DialTransportConnector(ctx context.Context, endpoint string, opts ...grpc.DialOption) (*TransportConnectorClient, error) + func (c *TransportConnectorClient) Client() transportpb.TransportConnectorServiceClient + func (c *TransportConnectorClient) Close() error v0.1.0 Oct 18, 2025 Changes in this version + const DefaultChannelBufferSize + const DefaultClientBufferMaxBytes + const DefaultClientBufferMaxMessages + const DefaultGRPCPort + const DefaultMaxMessageSize + const DefaultMessageTimeout + func LoggerFromContext(ctx context.Context) *slog.Logger + func RunBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error + func RunImpulse[C any](ctx context.Context, i engram.Impulse[C]) error + func StartBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error + func StartStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error) + func StartStreamServer[C any](ctx context.Context, e engram.StreamingEngram[C]) error + func StartStreaming[C any](ctx context.Context, e engram.StreamingEngram[C]) error + func StreamTo(ctx context.Context, target string, in <-chan []byte, out chan<- []byte) error + func StreamToWithMetadata(ctx context.Context, target string, in <-chan engram.StreamMessage, ...) error + func WithLogger(ctx context.Context, logger *slog.Logger) context.Context + type K8sClient interface + PatchStepRunStatus func(ctx context.Context, stepRunName string, patchData runsv1alpha1.StepRunStatus) error + TriggerStory func(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error) + type StorageManager interface + Dehydrate func(ctx context.Context, data any, stepRunID string) (any, error) + Hydrate func(ctx context.Context, data any) (any, error)