sdk

v0.1.2
Published: Apr 9, 2026 | License: Apache-2.0 | Imports: 66 | Imported by: 0

README

🧰 bubu-sdk-go — Official Go SDK for BubuStack

bubu-sdk-go is the public Go SDK for building BubuStack components:

  • Engrams for batch and streaming data processing
  • Impulses for long-running external event listeners

The SDK is the component boundary: Engrams and Impulses depend on it, not on bobrapet controller internals.

Prerequisites

  • Go 1.26.2 or newer (matching go.mod)
  • Docker or another OCI-compatible image builder
  • Access to a Kubernetes cluster supported by the current bobrapet release set

What the SDK Handles

  • Type-safe config, input, and secret binding
  • StepRun status patching and structured errors
  • Storage-ref hydration and large-payload offloading
  • Streaming transport lifecycle, control directives, and replay-safe acknowledgements
  • Trigger submission from Impulses via durable StoryTrigger requests
  • Cross-process effect deduplication via EffectClaim
  • Test harnesses and conformance suites for component authors

Execution Modes

Entry point            Use case                                                                 Kubernetes workload
sdk.StartBatch[C, I]   Finite tasks with clear start/end                                        Job
sdk.StartStreaming[C]  Continuous processing with gRPC bidirectional streaming                  Deployment
sdk.RunImpulse[C]      Long-running trigger services that submit durable StoryTrigger requests  Deployment

sdk.StartStory(...) remains the helper Impulses use to trigger workflows, but the latest contract no longer creates StoryRun objects directly. The SDK now:

  1. submits a StoryTrigger
  2. waits for controller resolution
  3. returns the resolved StoryRun

Quick Start

Create a minimal batch Engram:

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	sdk "github.com/bubustack/bubu-sdk-go"
	"github.com/bubustack/bubu-sdk-go/engram"
)

type Config struct {
	DefaultMessage string `mapstructure:"defaultMessage"`
}

type Inputs struct {
	Name string `mapstructure:"name"`
}

type HelloEngram struct {
	message string
}

func (e *HelloEngram) Init(ctx context.Context, cfg Config, secrets *engram.Secrets) error {
	e.message = cfg.DefaultMessage
	if e.message == "" {
		e.message = "Hello"
	}
	return nil
}

func (e *HelloEngram) Process(ctx context.Context, execCtx *engram.ExecutionContext, inputs Inputs) (*engram.Result, error) {
	if inputs.Name == "" {
		return nil, fmt.Errorf("name is required")
	}
	return engram.NewResultFrom(map[string]any{
		"message": fmt.Sprintf("%s, %s!", e.message, inputs.Name),
	}), nil
}

func main() {
	if err := sdk.StartBatch(context.Background(), &HelloEngram{}); err != nil {
		log.Printf("engram failed: %v", err)
		os.Exit(sdk.BatchExitCode(err))
	}
}

Build it with:

go build ./...

Then follow the website guides for image build, Engram / Story manifests, and deployment.

Trigger Helpers for Impulses

The trigger helpers are intended for Impulses and other trusted automation paths.

ctx = sdk.WithTriggerToken(ctx, "source-event-id-123")
run, err := sdk.StartStory(ctx, "my-story", map[string]any{
	"event": "opened",
})

Available helpers:

  • sdk.StartStory(ctx, storyName, inputs)
  • sdk.StartStoryInNamespace(ctx, storyName, namespace, inputs)
  • sdk.StartStoryWithToken(ctx, storyName, token, inputs)
  • sdk.StartStoryWithTokenInNamespace(ctx, storyName, namespace, token, inputs)
  • sdk.StopStory(ctx, storyRunName)
  • sdk.GetTargetStory()

Secrets

engram.Secrets is intentionally narrow. Prefer scoped accessors instead of dumping the full secret map.

apiKey, ok := secrets.Get("apiKey")
all := secrets.GetAll()              // returns a copy
names := secrets.Names()             // sorted key names
subset := secrets.Select("apiKey")   // bounded plaintext selection

Useful methods:

  • Get(key)
  • GetAll()
  • Names()
  • Select(keys...)
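
To make the "narrow accessor" idea concrete, here is a minimal stdlib-only sketch. It is not the SDK type: secretStore and newSecretStore are hypothetical names, and the map return type of Select is an assumption, since this page only describes it as a bounded plaintext selection.

```go
package main

import (
	"fmt"
	"sort"
)

// secretStore is a hypothetical stand-in for engram.Secrets; it is not the SDK type.
type secretStore struct {
	values map[string]string
}

// newSecretStore copies the input so later changes by the caller do not leak in.
func newSecretStore(values map[string]string) *secretStore {
	s := &secretStore{values: make(map[string]string, len(values))}
	for k, v := range values {
		s.values[k] = v
	}
	return s
}

// Get returns one secret and whether it was present.
func (s *secretStore) Get(key string) (string, bool) {
	v, ok := s.values[key]
	return v, ok
}

// GetAll returns a copy, so callers cannot mutate the store through it.
func (s *secretStore) GetAll() map[string]string {
	out := make(map[string]string, len(s.values))
	for k, v := range s.values {
		out[k] = v
	}
	return out
}

// Names returns the key names in sorted order, without any values.
func (s *secretStore) Names() []string {
	names := make([]string, 0, len(s.values))
	for k := range s.values {
		names = append(names, k)
	}
	sort.Strings(names)
	return names
}

// Select returns only the requested keys that exist (assumed return shape).
func (s *secretStore) Select(keys ...string) map[string]string {
	out := make(map[string]string, len(keys))
	for _, k := range keys {
		if v, ok := s.values[k]; ok {
			out[k] = v
		}
	}
	return out
}

func main() {
	s := newSecretStore(map[string]string{"dbPassword": "hunter2", "apiKey": "abc123"})
	all := s.GetAll()
	all["apiKey"] = "tampered" // only the copy changes
	v, _ := s.Get("apiKey")
	fmt.Println(v, s.Names(), len(s.Select("apiKey", "missing")))
}
```

Names and Select let a component log or forward only what it needs, which is the reason to prefer them over GetAll.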

Streaming Notes

  • Streaming Engrams receive engram.InboundMessage, not raw StreamMessage.
  • Call msg.Done() after successful processing or intentional drop.
  • Structured JSON streaming outputs should keep the canonical JSON in Payload and mirror the same bytes into Binary with MimeType: application/json.
  • Use raw Binary without Payload only for opaque media or non-JSON blobs.
  • Startup now requires connector readiness metadata before the SDK starts the Engram stream loop.
  • Startup capability negotiation uses strict latest-only startup.capabilities=required|none metadata.

See the full contract: https://bubustack.io/docs/streaming/streaming-contract

Signals and Effects

  • sdk.EmitSignal(...) records bounded progress/state data on the current StepRun.
  • sdk.RecordEffect(...) appends an effect record to StepRun.status.effects.
  • sdk.ExecuteEffectOnce(...) uses EffectClaim for cross-process reservation, renewal, recovery, and completion.

Testing

The SDK ships with:

  • testkit.BatchHarness
  • testkit.StreamHarness
  • conformance.BatchSuite
  • conformance.StreamSuite

Run the standard quality gates with:

make test
make lint

Environment Variables

The operator injects runtime configuration for components. Do not hard-code the env var set in downstream components; use the SDK and core/contracts as the source of truth.

Common groups:

  • Execution context
    • BUBU_STORY_NAME
    • BUBU_STORYRUN_ID
    • BUBU_STEP_NAME
    • BUBU_STEPRUN_NAME
    • BUBU_STEP_TIMEOUT
    • BUBU_MAX_INLINE_SIZE
  • Config and templating
    • BUBU_TRIGGER_DATA
    • BUBU_STEP_CONFIG
    • BUBU_TEMPLATE_CONTEXT
  • Transport
    • BUBU_TRANSPORT_BINDING
    • BUBU_GRPC_PORT
    • BUBU_GRPC_CHANNEL_BUFFER_SIZE
    • BUBU_GRPC_CHANNEL_SEND_TIMEOUT
    • BUBU_GRPC_MESSAGE_TIMEOUT
    • BUBU_GRPC_HANG_TIMEOUT
    • BUBU_GRPC_GRACEFUL_SHUTDOWN_TIMEOUT
    • BUBU_GRPC_RECONNECT_MAX_RETRIES
  • SDK observability
    • BUBU_SDK_METRICS_ENABLED
    • BUBU_SDK_TRACING_ENABLED
  • Kubernetes client
    • BUBU_K8S_USER_AGENT
    • BUBU_K8S_TIMEOUT
    • BUBU_K8S_OPERATION_TIMEOUT
    • BUBU_K8S_PATCH_MAX_RETRIES
  • Effects and signals
    • BUBU_EFFECT_MAX_DETAILS_BYTES
    • BUBU_SIGNAL_MAX_PAYLOAD_BYTES

Use the website docs for the curated reference.

Local Development

git clone https://github.com/bubustack/bubu-sdk-go.git
cd bubu-sdk-go
make test
make lint

License

Copyright 2025 BubuStack.

Licensed under the Apache License, Version 2.0.

Documentation

Overview

Package sdk provides the primary entry points for executing bobrapet components.

This package contains the runtime logic that bootstraps an Engram or Impulse, injects the necessary context from the environment, and manages its lifecycle. Developers typically interact with StartBatch, StartStreaming, or RunImpulse from their main.go file.

Entry Points

For batch engrams (Jobs):

sdk.StartBatch(ctx, myEngram)

For streaming engrams (Deployments with gRPC):

sdk.StartStreaming(ctx, myStreamingEngram)

For impulses (Deployments that trigger Stories):

sdk.RunImpulse(ctx, myImpulse)

Environment-Driven Configuration

The SDK is controlled entirely by environment variables injected by the bobrapet operator. SDK defaults are fallback values for local development. See docs/reference/config.md for the complete environment variable reference.

Concurrency and Cancellation

All entry points respect context cancellation. Batch engrams enforce a timeout via BUBU_STEP_TIMEOUT. Streaming engrams implement graceful shutdown on SIGTERM with configurable drain timeouts via BUBU_GRPC_GRACEFUL_SHUTDOWN_TIMEOUT.
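
The timeout behavior described above can be pictured with a small stdlib-only sketch. It is not the SDK's implementation: runWithTimeout, runSleepStep, isTimeout, and errStepTimeout are hypothetical names, and the limit is passed as a parameter rather than parsed from BUBU_STEP_TIMEOUT, whose exact format this page does not specify.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errStepTimeout is a hypothetical sentinel, not the SDK's ErrBatchTimeout.
var errStepTimeout = errors.New("step timed out")

// runWithTimeout runs fn under a deadline and reports a timeout distinctly
// from any other error that fn returns.
func runWithTimeout(parent context.Context, limit time.Duration, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, limit)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine can always finish
	go func() { done <- fn(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
			return fmt.Errorf("%w after %s", errStepTimeout, limit)
		}
		return ctx.Err() // the parent was canceled, for example on SIGTERM
	}
}

// runSleepStep is a demo helper: it runs a step that needs workMs
// milliseconds under a limit of limitMs milliseconds.
func runSleepStep(limitMs, workMs int) error {
	step := func(ctx context.Context) error {
		select {
		case <-time.After(time.Duration(workMs) * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return runWithTimeout(context.Background(), time.Duration(limitMs)*time.Millisecond, step)
}

// isTimeout reports whether err is, or wraps, the timeout sentinel.
func isTimeout(err error) bool { return errors.Is(err, errStepTimeout) }

func main() {
	fmt.Println(isTimeout(runSleepStep(20, 500))) // the step outlives its limit
	fmt.Println(runSleepStep(500, 1))             // the step finishes in time
}
```

Wrapping the sentinel with %w keeps the timeout detectable through errors.Is even after callers add their own context to the error.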

Error Handling

Entry points return errors for initialization and execution failures. Batch engrams additionally patch StepRun status with exit codes for operator retry policy classification (exit code 124 for timeouts, 1 for logic errors, 0 for success).

Index

Constants

const (
	// DefaultChannelBufferSize is the in-memory buffer used for Engram stream channels.
	DefaultChannelBufferSize = 16
	// DefaultMaxMessageSize caps gRPC message sizes when talking to connectors.
	DefaultMaxMessageSize = 10 * 1024 * 1024
)
const (
	// ReasonTimeout indicates the step failed because it exceeded its deadline.
	ReasonTimeout = "Timeout"
)

Variables

var ErrBatchTimeout = errors.New("bubu batch execution timed out")

ErrBatchTimeout is a sentinel used with errors.Is to detect batch timeouts.

var ErrEffectAlreadyRecorded = errors.New("effect already recorded")

ErrEffectAlreadyRecorded indicates that the requested effect key is already present in the current StepRun effect ledger.

var ErrEffectsUnavailable = errors.New("effect recording unavailable: not running inside a StepRun")

ErrEffectsUnavailable indicates that the current process cannot record effects (e.g., it is not running inside a StepRun workload). Callers may treat this as a soft failure and continue without recording the effect.

var ErrImpulseSessionExists = errors.New("impulse session already active")

ErrImpulseSessionExists indicates that a dispatcher session key is already active.

var ErrImpulseSessionNotFound = errors.New("impulse session not found")

ErrImpulseSessionNotFound indicates that a dispatcher session key has no active session.

var ErrLogsUnavailable = errors.New("log publishing unavailable: not running inside a StepRun or storage disabled")

ErrLogsUnavailable indicates that logs cannot be published (for example, when not running inside a StepRun or when storage is disabled). Callers may treat this as a soft failure.

var ErrSignalsUnavailable = errors.New("signal emission unavailable: not running inside a StepRun")

ErrSignalsUnavailable indicates that the current process cannot emit signals (e.g., it is not running inside a StepRun workload). Callers may treat this as a soft failure and continue without emitting metadata.

var ErrStoryRunNotFound = errors.New("storyrun not found")

ErrStoryRunNotFound indicates that a requested StoryRun could not be located.

Functions

func BatchExitCode added in v0.1.2

func BatchExitCode(err error) int

BatchExitCode returns the recommended container exit code for an error. Timeout errors map to 124 (GNU timeout), all other non-nil errors default to 1.
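
The mapping can be sketched without the SDK. The function below is a local illustration, not the SDK source: exitCodeFor, errTimeout, errWrappedTimeout, and errOther are hypothetical names, and returning 0 for a nil error is an assumption based on the exit codes listed under Error Handling.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout is a hypothetical sentinel standing in for a batch timeout.
var errTimeout = errors.New("batch execution timed out")

// Sample errors used by main; both names are illustrative only.
var (
	errWrappedTimeout = fmt.Errorf("process step: %w", errTimeout)
	errOther          = errors.New("name is required")
)

// exitCodeFor maps an error to a container exit code:
// 0 for success (assumed), 124 for timeouts, 1 for everything else.
func exitCodeFor(err error) int {
	switch {
	case err == nil:
		return 0
	case errors.Is(err, errTimeout):
		return 124 // same code GNU timeout uses
	default:
		return 1
	}
}

func main() {
	fmt.Println(exitCodeFor(nil), exitCodeFor(errWrappedTimeout), exitCodeFor(errOther))
	// prints "0 124 1"
}
```

Using errors.Is rather than a direct comparison means a timeout is still recognized after it has been wrapped with extra context.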

func DebugModeEnabled added in v0.1.2

func DebugModeEnabled() bool

DebugModeEnabled reports whether verbose logging should be forced regardless of logger level.

func EmitSequencedSignal added in v0.1.2

func EmitSequencedSignal(ctx context.Context, key string, seq uint64, value any) error

EmitSequencedSignal emits a signal wrapped with a sequence number and timestamp.

func EmitSignal added in v0.1.2

func EmitSignal(ctx context.Context, key string, value any) error

EmitSignal patches the current StepRun status with a small JSON payload so that controllers and CEL expressions can react to live metadata. When value is nil, the signal is cleared. The latest value stored in status.signals is always a compact summary so raw payloads are not persisted in StepRun status. Signal events are sequenced by default.

func EmitSignalWithSequence added in v0.1.2

func EmitSignalWithSequence(ctx context.Context, key string, seq *SignalSequence, value any) error

EmitSignalWithSequence emits a signal using the provided sequence generator.

func EmitTextSignal added in v0.1.2

func EmitTextSignal(ctx context.Context, key string, text string, opts TextSignalOptions) error

EmitTextSignal emits a metadata summary for text payloads. By default it keeps metadata-only behavior; callers can opt into bounded inline samples via SampleBytes or SampleExtras. The full payload should still be passed via outputs/storage references when downstream steps need the complete content.

func ExecuteEffectOnce added in v0.1.2

func ExecuteEffectOnce(ctx context.Context, key string, fn func(context.Context) (any, error)) (any, bool, error)

ExecuteEffectOnce runs fn only if the effect key has not been recorded yet. It records a successful effect with the returned details. When the effect already exists, it returns `already=true` with `ErrEffectAlreadyRecorded`.
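
The run-once contract can be illustrated with an in-process ledger. This is only a sketch: ledger, newLedger, executeOnce, and errAlreadyRecorded are hypothetical names, and the sketch guards a single process with a mutex, whereas the SDK is described as using EffectClaim for cross-process reservation. Returning the stored details on a repeat call is also an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyRecorded is a hypothetical stand-in for ErrEffectAlreadyRecorded.
var errAlreadyRecorded = errors.New("effect already recorded")

// ledger is an in-memory effect ledger keyed by effect key.
type ledger struct {
	mu      sync.Mutex
	effects map[string]any
}

func newLedger() *ledger { return &ledger{effects: make(map[string]any)} }

// executeOnce runs fn only when key is absent and records its result on success.
// The lock is held while fn runs, which keeps the sketch simple but serializes effects.
func (l *ledger) executeOnce(key string, fn func() (any, error)) (any, bool, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	if details, ok := l.effects[key]; ok {
		return details, true, errAlreadyRecorded
	}
	details, err := fn()
	if err != nil {
		return nil, false, err // failures are not recorded, so a retry runs fn again
	}
	l.effects[key] = details
	return details, false, nil
}

func main() {
	l := newLedger()
	calls := 0
	send := func() (any, error) { calls++; return "sent", nil }

	_, first, _ := l.executeOnce("email:42", send)
	_, second, err := l.executeOnce("email:42", send)
	fmt.Println(calls, first, second, errors.Is(err, errAlreadyRecorded))
	// prints "1 false true true"
}
```

Callers that only care about "do it at most once" can treat the already-recorded error as success.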

func ExecutionMode added in v0.1.2

func ExecutionMode() string

ExecutionMode returns the execution mode supplied by the operator (e.g. "batch", "job", "deployment"). Defaults to "batch" when not set so local runs behave sensibly.

func ExtractTraceContext added in v0.1.2

func ExtractTraceContext(ctx context.Context, msg *engram.StreamMessage) context.Context

ExtractTraceContext restores tracing context from a StreamMessage's metadata so Engrams can start child spans that are linked to upstream steps.

func HasEffect added in v0.1.2

func HasEffect(ctx context.Context, key string) (bool, error)

HasEffect returns true if the current StepRun already recorded an effect for the key.

func LoggerFromContext

func LoggerFromContext(ctx context.Context) *slog.Logger

LoggerFromContext retrieves a slog.Logger from the context, or returns a default JSON logger.

If no logger was previously stored via WithLogger, this function returns a new JSON logger writing to stdout with default settings. This ensures the SDK always has a valid logger without requiring explicit configuration for simple use cases.

Thread-safe and idempotent.

func NewStreamErrorMessage added in v0.1.2

func NewStreamErrorMessage(
	errObj runsv1alpha1.StructuredError,
	opts ...StreamMessageOption,
) (engram.StreamMessage, error)

NewStreamErrorMessage wraps a StructuredError into a StreamMessage payload with Kind "error".

func NewStreamMessage added in v0.1.2

func NewStreamMessage(kind string, opts ...StreamMessageOption) engram.StreamMessage

NewStreamMessage constructs an engram.StreamMessage pre-populated with the provided options. It trims the kind identifier and applies any options in order. Note that metadata-only messages are invalid at send time; at least one of audio/video/binary payloads, JSON payload, inputs, or transports must be populated for the message to be published.

func NewStructuredError added in v0.1.2

func NewStructuredError(typ runsv1alpha1.StructuredErrorType, message string, opts ...StructuredErrorOption) error

NewStructuredError returns an error that carries a StructuredError payload.

func ParseStreamErrorMessage added in v0.1.2

func ParseStreamErrorMessage(msg engram.StreamMessage) (runsv1alpha1.StructuredError, bool, error)

ParseStreamErrorMessage extracts StructuredError payloads from StreamMessage Kind "error".

func PublishLogFile added in v0.1.2

func PublishLogFile(ctx context.Context, path string, contentType string) error

PublishLogFile reads the provided file and publishes its contents as logs.

func PublishLogs added in v0.1.2

func PublishLogs(ctx context.Context, payload []byte) error

PublishLogs uploads log bytes to storage (when enabled) and patches StepRun.status.logs with a storage reference. If storage is disabled, it returns ErrLogsUnavailable.

func PublishLogsWithContentType added in v0.1.2

func PublishLogsWithContentType(ctx context.Context, payload []byte, contentType string) error

PublishLogsWithContentType is like PublishLogs but lets callers set a content type.

func RecordEffect added in v0.1.2

func RecordEffect(ctx context.Context, key, status string, details any) error

RecordEffect appends an effect record to the current StepRun status ledger. The effect sequence is assigned server-side when Seq is 0.

func ReplaySignals added in v0.1.2

func ReplaySignals(
	ctx context.Context,
	stepRunName,
	namespace string,
	sinceSeq uint64,
) ([]runsv1alpha1.SignalEvent, error)

ReplaySignals returns the signal events for a StepRun, optionally filtered by sequence. When stepRunName or namespace is empty, environment defaults are used.

func RunBatch

func RunBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error

RunBatch is the primary entry point for a BatchEngram. It provides a fully type-safe execution environment, handling all the boilerplate of context loading, data hydration, and status patching.

func RunImpulse

func RunImpulse[C any](ctx context.Context, i engram.Impulse[C]) error

RunImpulse is the type-safe entry point for impulses (Kubernetes Deployments that trigger Stories).

This function infers config type C from the impulse implementation, providing compile-time type safety. It orchestrates the complete lifecycle:

  1. Load execution context from environment (BUBU_STEP_CONFIG, BUBU_TRIGGER_DATA, etc.)
  2. Unmarshal config into type C
  3. Call impulse.Init with typed config and secrets
  4. Create pre-configured Kubernetes client with namespace resolution
  5. Call impulse.Run with client, transferring control to long-running process

The impulse's Run method should block until work completes or context is canceled. Typical use cases: webhook listeners, message queue consumers, schedulers, event watchers.

Respects context cancellation for graceful shutdown on SIGTERM. The impulse is responsible for handling shutdown signals within its Run implementation (e.g., draining in-flight requests).

Example:

type MyConfig struct {
    WebhookPort int    `mapstructure:"webhookPort"`
    SecretToken string `mapstructure:"secretToken"`
}

type MyImpulse struct { /* ... */ }

func (m *MyImpulse) Init(ctx context.Context, cfg MyConfig, secrets *engram.Secrets) error {
    // Setup webhook server, validate token, etc.
    return nil
}

func (m *MyImpulse) Run(ctx context.Context, client *k8s.Client) error {
    // Listen for webhooks, trigger stories via client.TriggerStory(...)
    <-ctx.Done()
    return ctx.Err()
}

func main() {
    if err := sdk.RunImpulse(context.Background(), &MyImpulse{}); err != nil {
        panic(err)
    }
}

func Start added in v0.1.2

func Start[C any, I any](ctx context.Context, e DualEngram[C, I]) error

Start launches the provided engram in either batch or streaming mode based on ExecutionMode. This lets engram entrypoints avoid direct environment inspection and stick to the SDK abstraction.

func StartBatch

func StartBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error

StartBatch is the type-safe entry point for batch engrams (Kubernetes Jobs).

This function infers both config type C and input type I from the engram implementation, providing full compile-time type safety. It orchestrates the complete lifecycle:

  1. Load execution context from environment (BUBU_STEP_CONFIG, BUBU_TRIGGER_DATA, etc.)
  2. Unmarshal config and inputs into types C and I
  3. Call engram.Init with typed config and secrets
  4. Hydrate inputs from storage if needed
  5. Call engram.Process with typed inputs and execution context
  6. Dehydrate outputs to storage if they exceed size limits
  7. Patch StepRun status with result, timing, and exit code

Enforces timeout via BUBU_STEP_TIMEOUT with context cancellation. On timeout, patches status with exit code 124 (retryable) and forcefully exits to prevent zombie Jobs. On logic errors, patches with exit code 1 (terminal). On success, patches with exit code 0.

Example:

type MyConfig struct { APIKey string `mapstructure:"apiKey"` }
type MyInputs struct { UserID string `mapstructure:"userId"` }

func main() {
    ctx := context.Background()
    if err := sdk.StartBatch(ctx, NewMyEngram()); err != nil {
        panic(err)  // Ensure non-zero exit for Job failure detection
    }
}

func StartStory

func StartStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)

StartStory triggers a new StoryRun for the named Story with the provided inputs. For cross-namespace executions, use StartStoryInNamespace to explicitly set the Story namespace.

This is the primary mechanism for programmatically initiating workflows, typically used from within an Impulse. The SDK automatically resolves the correct namespace from environment variables (BUBU_TARGET_STORY_NAMESPACE or fallbacks), creates a Kubernetes client, and submits the StoryRun resource.

Inputs are marshaled to JSON and stored in the StoryRun spec. The operator watches for new StoryRuns and orchestrates their execution.

Returns the created StoryRun on success, or an error if client creation or StoryRun creation fails. Respects context cancellation and deadlines.

Example:

sr, err := sdk.StartStory(ctx, "my-workflow", map[string]any{
    "userId": "12345",
    "action": "process",
})
if err != nil {
    return fmt.Errorf("failed to trigger story: %w", err)
}
log.Printf("Triggered StoryRun: %s", sr.Name)

func StartStoryInNamespace added in v0.1.2

func StartStoryInNamespace(
	ctx context.Context,
	storyName string,
	storyNamespace string,
	inputs map[string]any,
) (*runsv1alpha1.StoryRun, error)

StartStoryInNamespace is identical to StartStory but allows specifying the namespace of the referenced Story explicitly. Provide an empty namespace to use the default resolution (BUBU_TARGET_STORY_NAMESPACE or the pod namespace).

func StartStoryWithToken added in v0.1.2

func StartStoryWithToken(
	ctx context.Context,
	storyName string,
	token string,
	inputs map[string]any,
) (*runsv1alpha1.StoryRun, error)

StartStoryWithToken behaves like StartStory but accepts a per-call token for deterministic retries.

func StartStoryWithTokenInNamespace added in v0.1.2

func StartStoryWithTokenInNamespace(
	ctx context.Context,
	storyName string,
	storyNamespace string,
	token string,
	inputs map[string]any,
) (*runsv1alpha1.StoryRun, error)

StartStoryWithTokenInNamespace behaves like StartStoryInNamespace but accepts a per-call token for deterministic retries.

func StartStreamServer

func StartStreamServer[C any](ctx context.Context, e engram.StreamingEngram[C]) error

StartStreamServer boots a StreamingEngram using the new transport connector contract. The Engram must have BUBU_TRANSPORT_BINDING set; no other transport modes are supported.

func StartStreaming

func StartStreaming[C any](ctx context.Context, e engram.StreamingEngram[C]) error

StartStreaming is the type-safe entry point for streaming engrams (Kubernetes Deployments with gRPC).

This function infers config type C from the engram implementation, providing compile-time type safety. It orchestrates the complete lifecycle:

  1. Load execution context from environment (BUBU_STEP_CONFIG, etc.)
  2. Unmarshal config into type C
  3. Call engram.Init with typed config and secrets
  4. Start gRPC server on BUBU_GRPC_PORT (default 50051)
  5. Register engram.Stream as the bidirectional streaming handler
  6. Serve until context cancellation (SIGTERM) or error
  7. Gracefully drain active streams before shutdown

The gRPC server implements:

  • Transparent heartbeat sending/filtering to detect connection hangs
  • Backpressure handling with configurable timeouts
  • Graceful shutdown with BUBU_GRPC_GRACEFUL_SHUTDOWN_TIMEOUT drain phase
  • Optional TLS via BUBU_GRPC_TLS_CERT_FILE and BUBU_GRPC_TLS_KEY_FILE
  • Configurable message size limits via BUBU_GRPC_MAX_RECV_BYTES and BUBU_GRPC_MAX_SEND_BYTES

Example:

type MyConfig struct { BufferSize int `mapstructure:"bufferSize"` }

func main() {
    ctx := context.Background()
    if err := sdk.StartStreaming(ctx, NewMyStreamingEngram()); err != nil {
        panic(err)
    }
}

func StopStory added in v0.1.2

func StopStory(ctx context.Context, storyRunName string) error

StopStory cancels an in-flight StoryRun in the current namespace. Equivalent to StopStoryInNamespace with an empty namespace.

func StopStoryInNamespace added in v0.1.2

func StopStoryInNamespace(ctx context.Context, storyRunName, namespace string) error

StopStoryInNamespace cancels an in-flight StoryRun by marking it finished. If the StoryRun does not exist, ErrStoryRunNotFound is returned. Already-terminal StoryRuns are treated as a no-op. Active StoryRuns in phases the SDK will not force-finish return an invalid-transition error from the underlying k8s client.

func TriggerTokenFromContext added in v0.1.2

func TriggerTokenFromContext(ctx context.Context) string

TriggerTokenFromContext returns the trigger token stored in the context, if any.

func WithLogger

func WithLogger(ctx context.Context, logger *slog.Logger) context.Context

WithLogger stores a slog.Logger in the context for SDK use.

This allows you to inject a custom configured logger (e.g., with specific log levels, handlers, or structured attributes) that the SDK will use for all internal logging. If not provided, the SDK defaults to JSON logging to stdout.

Example:

logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
ctx := sdk.WithLogger(context.Background(), logger)
sdk.StartBatch(ctx, myEngram)

The logger is used for lifecycle events (init, shutdown), errors, and metrics. It does not intercept engram-specific logs; engrams should use their own loggers or retrieve the SDK logger via LoggerFromContext within their execution context.
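
The store-and-fallback pattern behind WithLogger and LoggerFromContext can be sketched with the standard library alone. The names below (loggerKey, withLogger, loggerFrom, demo) are hypothetical and this is not the SDK source; it only shows one common way to carry a *slog.Logger in a context with a JSON-to-stdout default.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
)

// loggerKey is an unexported key type, so no other package can collide with it.
type loggerKey struct{}

// withLogger returns a child context that carries logger.
func withLogger(ctx context.Context, logger *slog.Logger) context.Context {
	return context.WithValue(ctx, loggerKey{}, logger)
}

// loggerFrom returns the stored logger, or a default JSON logger on stdout.
func loggerFrom(ctx context.Context) *slog.Logger {
	if l, ok := ctx.Value(loggerKey{}).(*slog.Logger); ok && l != nil {
		return l
	}
	return slog.New(slog.NewJSONHandler(os.Stdout, nil))
}

// demo reports whether an injected logger is returned as-is and whether a
// bare context still yields a usable fallback.
func demo() (sameLogger, fallbackOK bool) {
	custom := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
	ctx := withLogger(context.Background(), custom)
	return loggerFrom(ctx) == custom, loggerFrom(context.Background()) != nil
}

func main() {
	same, fallback := demo()
	fmt.Println(same, fallback)
}
```

Because the fallback is built on demand, code that never configures a logger still gets structured output.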

func WithTriggerToken added in v0.1.2

func WithTriggerToken(ctx context.Context, token string) context.Context

WithTriggerToken attaches an idempotency token that StartStory passes through to the Kubernetes client. When provided, the SDK derives deterministic StoryRun names without relying on process-wide env vars. Nil contexts are accepted so callers can safely attach a token before choosing a base context.
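
A rough sketch of how a context-carried token can lead to deterministic names follows. Everything here is an assumption for illustration: withToken, tokenFrom, and runName are hypothetical, and the hash-based naming scheme is not taken from the SDK, whose actual derivation is not documented on this page.

```go
package main

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// tokenKey is an unexported context key type.
type tokenKey struct{}

// withToken attaches token to ctx; a nil ctx falls back to context.Background().
func withToken(ctx context.Context, token string) context.Context {
	if ctx == nil {
		ctx = context.Background()
	}
	return context.WithValue(ctx, tokenKey{}, token)
}

// tokenFrom returns the stored token, or "" when none is present.
func tokenFrom(ctx context.Context) string {
	if ctx == nil {
		return ""
	}
	token, _ := ctx.Value(tokenKey{}).(string)
	return token
}

// runName derives a stable name from the story and token (hypothetical scheme).
// The same inputs always produce the same name, so a retry maps to the same run.
func runName(story, token string) string {
	sum := sha256.Sum256([]byte(story + "\x00" + token))
	return story + "-" + hex.EncodeToString(sum[:])[:12]
}

func main() {
	ctx := withToken(context.Background(), "source-event-id-123")
	first := runName("my-story", tokenFrom(ctx))
	retry := runName("my-story", tokenFrom(ctx))
	fmt.Println(first == retry, first != runName("my-story", "another-event"))
}
```

Keeping the token in the context rather than in a process-wide variable lets concurrent handlers use different tokens safely.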

Types

type BatchTimeoutError added in v0.1.2

type BatchTimeoutError struct {
	// Timeout is the configured duration limit that was exceeded.
	Timeout time.Duration
	// Cause is the underlying timeout-related error, when one is available.
	Cause error
}

BatchTimeoutError conveys that a batch engram exceeded its configured timeout.

func (*BatchTimeoutError) Error added in v0.1.2

func (e *BatchTimeoutError) Error() string

Error implements the error interface.

func (*BatchTimeoutError) Is added in v0.1.2

func (e *BatchTimeoutError) Is(target error) bool

Is allows errors.Is(err, ErrBatchTimeout) to match *BatchTimeoutError values.

func (*BatchTimeoutError) Unwrap added in v0.1.2

func (e *BatchTimeoutError) Unwrap() error

Unwrap exposes the underlying cause for errors.Unwrap / errors.Is checks.
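
The combination of Is and Unwrap is a standard Go error pattern. The sketch below reproduces it with local types so it runs without the SDK; timeoutError, errBatchTimeout, sampleErr, and classify are hypothetical stand-ins, and the Error text is invented for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errBatchTimeout is a local sentinel mirroring the role of ErrBatchTimeout.
var errBatchTimeout = errors.New("batch execution timed out")

// timeoutError mirrors the documented fields of BatchTimeoutError.
type timeoutError struct {
	Timeout time.Duration
	Cause   error
}

func (e *timeoutError) Error() string {
	return fmt.Sprintf("batch timed out after %s", e.Timeout)
}

// Is lets errors.Is(err, errBatchTimeout) match any *timeoutError.
func (e *timeoutError) Is(target error) bool { return target == errBatchTimeout }

// Unwrap exposes the cause so errors.Is can also match it.
func (e *timeoutError) Unwrap() error { return e.Cause }

// sampleErr wraps a timeoutError the way a caller further up the stack might.
var sampleErr = fmt.Errorf("run step: %w", &timeoutError{
	Timeout: 30 * time.Second,
	Cause:   context.DeadlineExceeded,
})

// classify checks the sentinel, the underlying cause, and the typed value.
func classify(err error) (isTimeout, isDeadline bool, limit time.Duration) {
	var te *timeoutError
	if errors.As(err, &te) {
		limit = te.Timeout
	}
	return errors.Is(err, errBatchTimeout), errors.Is(err, context.DeadlineExceeded), limit
}

func main() {
	isTimeout, isDeadline, limit := classify(sampleErr)
	fmt.Println(isTimeout, isDeadline, limit) // prints "true true 30s"
}
```

The sentinel answers "was this a timeout?", while errors.As recovers the configured limit when a caller needs it for logging.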

type DualEngram added in v0.1.2

type DualEngram[C any, I any] interface {
	engram.BatchEngram[C, I]
	engram.StreamingEngram[C]
}

DualEngram is implemented by Engram types that support both batch and streaming modes.

type K8sClient

type K8sClient interface {
	// TriggerStory creates a new StoryRun for the named Story with the provided inputs.
	// The inputs map is marshaled to JSON and stored in the StoryRun's spec.inputs field.
	// Returns the created StoryRun on success, or an error if creation fails.
	// Respects context cancellation and deadlines.
	TriggerStory(
		ctx context.Context,
		storyName string,
		storyNamespace string,
		inputs map[string]any,
	) (*runsv1alpha1.StoryRun, error)

	// PatchStepRunStatus updates the status of the named StepRun with the provided patch data.
	// The implementation should use field-wise merging to avoid clobbering controller-managed
	// fields and implement retry-on-conflict logic to handle concurrent updates.
	// Respects context cancellation and deadlines.
	PatchStepRunStatus(ctx context.Context, stepRunName string, patchData runsv1alpha1.StepRunStatus) error
}

K8sClient defines the interface for Kubernetes operations required by the SDK.

This interface abstracts the SDK's dependency on Kubernetes, enabling mocking in tests and providing a stable contract. Implementations must be safe for concurrent use by multiple goroutines.

The SDK's default implementation (k8s.Client) provides:

  • Automatic namespace resolution from environment variables
  • Retry-on-conflict logic for status patches
  • Phase transition validation to prevent state corruption
  • OpenTelemetry metrics for operation latency and success/failure

Custom implementations should follow the same concurrency and idempotency guarantees.

type SequencedSignal added in v0.1.2

type SequencedSignal struct {
	// Seq is the monotonic sequence number assigned to the signal event.
	Seq uint64 `json:"seq"`
	// EmittedAt records when the signal was emitted in UTC.
	EmittedAt time.Time `json:"emittedAt,omitempty"`
	// Value carries the original signal payload associated with Seq.
	Value any `json:"value,omitempty"`
}

SequencedSignal wraps a signal payload with a monotonically increasing sequence.

type SignalEnvelope added in v0.1.2

type SignalEnvelope struct {
	// Meta summarizes the full payload without embedding it directly in StepRun status.
	Meta SignalMeta `json:"meta,omitempty"`
	// Sample carries an optional, already-sanitized sample payload when callers choose to include one.
	Sample any `json:"sample,omitempty"`
}

SignalEnvelope is the standard signal structure for metadata + optional samples.

type SignalMeta added in v0.1.2

type SignalMeta struct {
	// Format identifies the logical payload shape, such as "text" or "json".
	Format string `json:"format,omitempty"`
	// ContentType carries the MIME content type when one is known.
	ContentType string `json:"contentType,omitempty"`
	// SizeBytes reports the original payload size in bytes.
	SizeBytes int `json:"sizeBytes,omitempty"`
	// HashSHA256 optionally carries the lowercase hex SHA-256 digest of the original payload.
	HashSHA256 string `json:"hashSha256,omitempty"`
	// Attributes carries additional safe, non-secret metadata about the payload.
	Attributes map[string]string `json:"attributes,omitempty"`
}

SignalMeta describes a lightweight summary of a payload that is safe for signals.
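
As an illustration of a metadata-only summary, the sketch below fills local copies of the two structs from a text payload. The types signalMeta and signalEnvelope repeat the documented fields and JSON tags so the example runs without the SDK; summarizeText is a hypothetical helper, not an SDK function.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// signalMeta is a local copy of the documented SignalMeta fields.
type signalMeta struct {
	Format      string            `json:"format,omitempty"`
	ContentType string            `json:"contentType,omitempty"`
	SizeBytes   int               `json:"sizeBytes,omitempty"`
	HashSHA256  string            `json:"hashSha256,omitempty"`
	Attributes  map[string]string `json:"attributes,omitempty"`
}

// signalEnvelope is a local copy of the documented SignalEnvelope fields.
type signalEnvelope struct {
	Meta   signalMeta `json:"meta,omitempty"`
	Sample any        `json:"sample,omitempty"`
}

// summarizeText describes a text payload by size and digest, without embedding it.
func summarizeText(text string) signalEnvelope {
	sum := sha256.Sum256([]byte(text))
	return signalEnvelope{Meta: signalMeta{
		Format:      "text",
		ContentType: "text/plain",
		SizeBytes:   len(text),
		HashSHA256:  hex.EncodeToString(sum[:]), // lowercase hex digest
	}}
}

func main() {
	out, err := json.Marshal(summarizeText("hello"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // JSON with a meta object and no sample field
}
```

A consumer can compare the digest and size against the stored payload without the payload ever appearing in StepRun status.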

type SignalSequence added in v0.1.2

type SignalSequence struct {
	// contains filtered or unexported fields
}

SignalSequence is a process-local monotonic sequence generator for signals.

func NewSignalSequence added in v0.1.2

func NewSignalSequence(start uint64) *SignalSequence

NewSignalSequence constructs a SignalSequence starting at the provided value.

func (*SignalSequence) Next added in v0.1.2

func (s *SignalSequence) Next() uint64

Next increments and returns the next sequence value.
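
A process-local monotonic generator of this kind is commonly built on an atomic counter. The sketch below is one such construction, not the SDK's code: sequence and newSequence are hypothetical names, and both the goroutine safety and the choice that the first Next returns start+1 are assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sequence is a hypothetical monotonic counter backed by an atomic value.
type sequence struct {
	n atomic.Uint64
}

// newSequence starts the counter at start; the first Next returns start+1.
func newSequence(start uint64) *sequence {
	s := &sequence{}
	s.n.Store(start)
	return s
}

// Next atomically increments the counter and returns the new value.
func (s *sequence) Next() uint64 { return s.n.Add(1) }

func main() {
	seq := newSequence(0)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			seq.Next()
		}()
	}
	wg.Wait()

	fmt.Println(seq.Next()) // prints 101: none of the 100 increments was lost
}
```

Because the counter lives in process memory, values restart after a pod restart; a caller needing continuity would seed newSequence from the last persisted value.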

type StorageManager

type StorageManager interface {
	// Hydrate recursively scans a data structure for storage references and replaces
	// them with the actual content from the storage backend. Returns the hydrated
	// data on success, or an error if reading fails or a reference is invalid.
	// Respects context cancellation and enforces BUBU_STORAGE_TIMEOUT.
	Hydrate(ctx context.Context, data any) (any, error)

	// Dehydrate recursively checks the size of a data structure. If any part exceeds
	// the inline size limit (BUBU_MAX_INLINE_SIZE), it saves that part to the storage
	// backend and replaces it with a storage reference. Returns the dehydrated data
	// (potentially containing references) on success, or an error if writing fails.
	// Respects context cancellation and enforces BUBU_STORAGE_TIMEOUT.
	Dehydrate(ctx context.Context, data any, stepRunID string) (any, error)
}

StorageManager defines the interface for storage operations required by the SDK.

This interface provides transparent data offloading for large inputs and outputs, automatically handling marshaling, storage backend operations, and reference tracking. Implementations must be safe for concurrent use by multiple goroutines.

The SDK's default implementation (storage.Manager) provides:

  • Automatic size-based offloading (configurable via BUBU_MAX_INLINE_SIZE)
  • Recursive hydration/dehydration of nested structures
  • Support for S3 and file storage backends
  • Path traversal protection and validation
  • OpenTelemetry metrics for operation latency and data sizes

Storage references use one of the formats below:

  • {"$bubuStorageRef": "outputs/steprun-id/path.json"}
  • {"$bubuStorageRef": "outputs/steprun-id/path.json", "$bubuStoragePath":"result.text"}
  • {"$bubuConfigMapRef": "namespace/name:key"}
  • {"$bubuSecretRef": "namespace/name:key"}
  • {"$bubuConfigMapRef": {"name":"cfg","key":"payload","namespace":"ns","format":"json"}}

Supported formats for ConfigMap/Secret refs: auto (default), json, raw. When namespace is omitted, the SDK defaults to BUBU_POD_NAMESPACE/BUBU_STEPRUN_NAMESPACE.

type StoryDispatcher added in v0.1.2

type StoryDispatcher struct {
	// contains filtered or unexported fields
}

StoryDispatcher manages StoryRun lifecycles on behalf of an impulse, providing session tracking and idempotent stop semantics.

func NewStoryDispatcher added in v0.1.2

func NewStoryDispatcher(opts ...StoryDispatcherOption) *StoryDispatcher

NewStoryDispatcher creates a StoryDispatcher with optional configuration.

func (*StoryDispatcher) Forget added in v0.1.2

func (d *StoryDispatcher) Forget(key string)

Forget removes a session without attempting to stop the StoryRun.

func (*StoryDispatcher) HasSession added in v0.1.2

func (d *StoryDispatcher) HasSession(key string) bool

HasSession reports whether a session is currently tracked for the key.

func (*StoryDispatcher) Session added in v0.1.2

func (d *StoryDispatcher) Session(key string) (*StorySession, bool)

Session returns the session metadata for a key without mutating state.

func (*StoryDispatcher) Stop added in v0.1.2

func (d *StoryDispatcher) Stop(ctx context.Context, key string) (*StorySession, error)

Stop cancels the StoryRun associated with the session key. Returns the session metadata when successful.

func (*StoryDispatcher) Trigger added in v0.1.2

Trigger starts a StoryRun and optionally records a session keyed by req.Key.

If req.StoryName is empty, the target story is resolved from the Impulse's spec.storyRef via GetTargetStory(). This allows impulses to omit the story name in their trigger requests, relying on the operator-injected environment variables instead.

type StoryDispatcherOption added in v0.1.2

type StoryDispatcherOption func(*StoryDispatcher)

StoryDispatcherOption configures a StoryDispatcher.

func WithStoryRuntime added in v0.1.2

func WithStoryRuntime(
	start func(
		ctx context.Context,
		storyName string,
		storyNamespace string,
		inputs map[string]any,
	) (*runsv1alpha1.StoryRun, error),
	stop func(ctx context.Context, storyRunName, storyNamespace string) error,
) StoryDispatcherOption

WithStoryRuntime overrides the start/stop implementation used by the dispatcher. Intended primarily for tests.

type StorySession added in v0.1.2

type StorySession struct {
	// Key is the dispatcher-local session key used to look up or stop the StoryRun later.
	Key string
	// StoryRun is the created StoryRun resource name.
	StoryRun string
	// Namespace is the namespace that owns StoryRun.
	Namespace string
	// StoryName is the logical Story that produced StoryRun.
	StoryName string
	// StartedAt records when the dispatcher observed the StoryRun as started.
	StartedAt time.Time
	// Metadata carries optional impulse-owned attributes associated with the session.
	Metadata map[string]string
}

StorySession holds metadata about an active StoryRun started by an impulse.

type StoryTriggerRequest added in v0.1.2

type StoryTriggerRequest struct {
	// Key optionally reserves a dispatcher session slot for later Stop/Forget calls.
	Key string
	// TriggerToken enables idempotent StoryRun creation when the caller provides one.
	TriggerToken string
	// StoryName overrides the target story name; when empty the dispatcher resolves it from the Impulse environment.
	StoryName string
	// StoryNamespace overrides the target story namespace; when empty the dispatcher resolves it from the Impulse environment.
	StoryNamespace string
	// Inputs contains the structured trigger payload forwarded to the new StoryRun.
	Inputs map[string]any
	// Metadata carries caller-defined session metadata stored only in the local dispatcher session.
	Metadata map[string]string
}

StoryTriggerRequest defines the inputs required to trigger a story.

type StoryTriggerResult added in v0.1.2

type StoryTriggerResult struct {
	// StoryRun is the created Kubernetes StoryRun object returned by the runtime.
	StoryRun *runsv1alpha1.StoryRun
	// Session is the dispatcher-tracked session metadata when a session key was requested.
	Session *StorySession
}

StoryTriggerResult holds the StoryRun created by the dispatcher and the associated session, if one was requested.

type StreamMessageOption added in v0.1.2

type StreamMessageOption func(*engram.StreamMessage)

StreamMessageOption configures an engram.StreamMessage produced via NewStreamMessage.

func WithBinaryPayload added in v0.1.2

func WithBinaryPayload(mime string, payload []byte, timestamp time.Duration) StreamMessageOption

WithBinaryPayload attaches an arbitrary binary payload plus MIME type. The payload is copied.

func WithInputs added in v0.1.2

func WithInputs(inputs []byte) StreamMessageOption

WithInputs attaches CEL-evaluated inputs (already marshaled JSON).

func WithJSONData added in v0.1.2

func WithJSONData(v any) (StreamMessageOption, error)

WithJSONData marshals the provided value to JSON and attaches it as the payload. It returns an option alongside any marshaling error so callers can handle failures inline.

func WithJSONPayload added in v0.1.2

func WithJSONPayload(payload []byte) StreamMessageOption

WithJSONPayload attaches a JSON payload (already marshaled). The byte slice is copied.

func WithMessageID added in v0.1.2

func WithMessageID(id string) StreamMessageOption

WithMessageID sets the message identifier for correlation across steps.

func WithMetadata added in v0.1.2

func WithMetadata(metadata map[string]string) StreamMessageOption

WithMetadata merges the supplied metadata into the message, cloning the map to prevent callers from mutating shared state.

func WithStreamEnvelope added in v0.1.2

func WithStreamEnvelope(env *transportpb.StreamEnvelope) StreamMessageOption

WithStreamEnvelope attaches transport-layer stream sequencing metadata.

func WithTimestamp added in v0.1.2

func WithTimestamp(ts time.Time) StreamMessageOption

WithTimestamp overrides the message timestamp. Zero values are ignored.

func WithTransports added in v0.1.2

func WithTransports(descriptors []engram.TransportDescriptor) StreamMessageOption

WithTransports records the story's declared transports for downstream inspection.
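The option functions above follow the functional-options pattern, with defensive copies so that later mutation by the caller cannot affect the message. The sketch below shows that pattern on a local type; message, withID, withPayload, withMetadata, and newMessage are hypothetical names chosen for illustration and are not the SDK's engram.StreamMessage or NewStreamMessage.

```go
package main

import "fmt"

// message is a hypothetical, reduced stand-in for a stream message.
type message struct {
	ID       string
	Payload  []byte
	Metadata map[string]string
}

type messageOption func(*message)

// withID sets the correlation identifier.
func withID(id string) messageOption {
	return func(m *message) { m.ID = id }
}

// withPayload copies the slice so the caller can reuse its buffer afterwards.
func withPayload(payload []byte) messageOption {
	copied := append([]byte(nil), payload...)
	return func(m *message) { m.Payload = copied }
}

// withMetadata clones the map and merges it into any metadata already present.
func withMetadata(metadata map[string]string) messageOption {
	cloned := make(map[string]string, len(metadata))
	for k, v := range metadata {
		cloned[k] = v
	}
	return func(m *message) {
		if m.Metadata == nil {
			m.Metadata = make(map[string]string, len(cloned))
		}
		for k, v := range cloned {
			m.Metadata[k] = v
		}
	}
}

// newMessage applies the options in order; later options win on conflicts.
func newMessage(opts ...messageOption) message {
	var m message
	for _, opt := range opts {
		opt(&m)
	}
	return m
}

func main() {
	buf := []byte(`{"n":1}`)
	md := map[string]string{"source": "demo"}
	msg := newMessage(withID("msg-1"), withPayload(buf), withMetadata(md))

	// Mutating the caller's buffer and map does not change the message.
	buf[0] = 'X'
	md["source"] = "changed"
	fmt.Println(msg.ID, string(msg.Payload), msg.Metadata["source"])
}
```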

type StructuredErrorOption added in v0.1.2

type StructuredErrorOption func(*structuredError)

StructuredErrorOption mutates a structured error before it is returned.

func WithStructuredErrorCause added in v0.1.2

func WithStructuredErrorCause(cause error) StructuredErrorOption

WithStructuredErrorCause preserves the underlying error for wrapping/unwrapping.

func WithStructuredErrorCode added in v0.1.2

func WithStructuredErrorCode(code string) StructuredErrorOption

WithStructuredErrorCode sets a component-specific error code.

func WithStructuredErrorDetails added in v0.1.2

func WithStructuredErrorDetails(details map[string]any) StructuredErrorOption

WithStructuredErrorDetails attaches structured metadata for diagnostics.

func WithStructuredErrorExitClass added in v0.1.2

func WithStructuredErrorExitClass(exitClass enums.ExitClass) StructuredErrorOption

WithStructuredErrorExitClass sets the desired exit class ("retry", "terminal", etc.).

func WithStructuredErrorRetryable added in v0.1.2

func WithStructuredErrorRetryable(retryable bool) StructuredErrorOption

WithStructuredErrorRetryable annotates the error as retryable/terminal.

type StructuredErrorProvider added in v0.1.2

type StructuredErrorProvider interface {
	StructuredError() runsv1alpha1.StructuredError
}

StructuredErrorProvider allows errors to supply a versioned StructuredError payload.

type TargetStory added in v0.1.2

type TargetStory struct {
	// Name is the Story name from Impulse.spec.storyRef.name.
	Name string

	// Namespace is the Story namespace from Impulse.spec.storyRef.namespace.
	// Empty if the Story is in the same namespace as the Impulse.
	Namespace string
}

TargetStory holds the target story information resolved from the Impulse's storyRef. The operator supplies these values through environment variables.

func GetTargetStory added in v0.1.2

func GetTargetStory() (TargetStory, error)

GetTargetStory returns the target story configured via the Impulse's spec.storyRef. The operator injects these values as BUBU_TARGET_STORY_NAME and BUBU_TARGET_STORY_NAMESPACE environment variables when running an Impulse pod.

Returns an error if BUBU_TARGET_STORY_NAME is not set, as the Impulse CRD requires a storyRef to be specified.

Example:

target, err := sdk.GetTargetStory()
if err != nil {
    return fmt.Errorf("no target story configured: %w", err)
}
sr, err := sdk.StartStoryInNamespace(ctx, target.Name, target.Namespace, inputs)

func MustGetTargetStory added in v0.1.2

func MustGetTargetStory() TargetStory

MustGetTargetStory is like GetTargetStory but panics if the target story is not configured. Useful in main() where early failure is preferred over error handling.

type TextSignalOptions added in v0.1.2

type TextSignalOptions struct {
	// Format overrides the logical payload format label stored in SignalMeta.Format.
	Format string
	// ContentType sets SignalMeta.ContentType for the emitted signal.
	ContentType string
	// SampleBytes caps the inline sample size in bytes when callers choose to include a text sample.
	// When zero, EmitTextSignal preserves the default metadata-only behavior unless SampleExtras is set.
	SampleBytes int
	// IncludeHash enables SHA-256 hashing of the original text into SignalMeta.HashSHA256.
	IncludeHash bool
	// Attributes attaches additional safe metadata to SignalMeta.Attributes.
	Attributes map[string]string
	// SampleExtras adds additional sample metadata alongside the sampled text.
	SampleExtras map[string]any
}

TextSignalOptions controls how text payloads are summarized for signals.
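The SampleBytes and IncludeHash fields suggest a summary made of a size, an optional bounded sample, and an optional SHA-256 digest. A minimal sketch of that idea follows; textSummary, summarizeText, and truncateUTF8 are hypothetical helpers, and cutting the sample on a rune boundary is an assumption about sensible behavior rather than something the SDK documents.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"unicode/utf8"
)

// textSummary is a hypothetical summary of a text payload.
type textSummary struct {
	SizeBytes  int
	Sample     string
	HashSHA256 string
}

// summarizeText keeps metadata only by default, adds a sample when
// sampleBytes > 0, and adds a hex SHA-256 digest when includeHash is true.
func summarizeText(text string, sampleBytes int, includeHash bool) textSummary {
	s := textSummary{SizeBytes: len(text)}
	if sampleBytes > 0 {
		s.Sample = truncateUTF8(text, sampleBytes)
	}
	if includeHash {
		sum := sha256.Sum256([]byte(text))
		s.HashSHA256 = hex.EncodeToString(sum[:])
	}
	return s
}

// truncateUTF8 returns at most limit bytes of text without splitting a rune.
func truncateUTF8(text string, limit int) string {
	if len(text) <= limit {
		return text
	}
	cut := limit
	for cut > 0 && !utf8.RuneStart(text[cut]) {
		cut--
	}
	return text[:cut]
}

func main() {
	s := summarizeText("h\u00e9llo world", 2, true)
	fmt.Printf("size=%d sample=%q hashLen=%d\n", s.SizeBytes, s.Sample, len(s.HashSHA256))
}
```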

type TransportConnectorClient added in v0.1.2

type TransportConnectorClient struct {
	// contains filtered or unexported fields
}

TransportConnectorClient wraps the generic Engram↔connector gRPC contract. It dials the connector endpoint advertised via the TransportBinding env payload.

func DialTransportConnector added in v0.1.2

func DialTransportConnector(
	ctx context.Context,
	endpoint string,
	opts ...grpc.DialOption,
) (*TransportConnectorClient, error)

DialTransportConnector establishes a gRPC client connection to the generic transport connector. The endpoint may be a TCP host:port address or a Unix domain socket path prefixed with unix://.
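The two accepted endpoint shapes can be told apart before dialing, for example to validate configuration early. The helper below is a hypothetical sketch built on the standard library; splitEndpoint is not part of the SDK, and the SDK or gRPC may parse endpoints differently.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strings"
)

// splitEndpoint classifies an endpoint as a Unix socket path or a TCP address.
func splitEndpoint(endpoint string) (network, address string, err error) {
	if strings.HasPrefix(endpoint, "unix://") {
		path := strings.TrimPrefix(endpoint, "unix://")
		if path == "" {
			return "", "", errors.New("unix endpoint has an empty socket path")
		}
		return "unix", path, nil
	}
	// Anything else is expected to be host:port.
	if _, _, err := net.SplitHostPort(endpoint); err != nil {
		return "", "", fmt.Errorf("invalid tcp endpoint %q: %w", endpoint, err)
	}
	return "tcp", endpoint, nil
}

func main() {
	for _, ep := range []string{"unix:///var/run/connector.sock", "localhost:9000", "localhost"} {
		network, address, err := splitEndpoint(ep)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(network, address)
	}
}
```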

func (*TransportConnectorClient) Client added in v0.1.2

Client exposes the underlying generated gRPC client.

func (*TransportConnectorClient) Close added in v0.1.2

func (c *TransportConnectorClient) Close() error

Close tears down the connector connection.

Directories

Path Synopsis
package engram defines the core interfaces that developers implement to create components for the bobrapet ecosystem.
integration
Package k8s contains the SDK's controller-runtime client helpers for StoryRun, StepRun, and Impulse operations.
Package media provides helpers for offloading large streaming payloads to shared object storage while keeping small payloads inline.
pkg
env
Package env provides small helpers for parsing validated environment overrides used across the SDK runtime.
metrics
Package metrics provides OpenTelemetry-based observability for the SDK and allows developers to register custom metrics for their engrams and impulses.
