Documentation
Overview ¶
Package sdk provides the primary entry points for executing bobrapet components.
This package contains the runtime logic that bootstraps an Engram or Impulse, injects the necessary context from the environment, and manages its lifecycle. Developers typically interact with StartBatch, StartStreaming, or RunImpulse from their main.go file.
Entry Points ¶
For batch engrams (Jobs):
sdk.StartBatch(ctx, myEngram)
For streaming engrams (Deployments with gRPC):
sdk.StartStreaming(ctx, myStreamingEngram)
For impulses (Deployments that trigger Stories):
sdk.RunImpulse(ctx, myImpulse)
Environment-Driven Configuration ¶
The SDK is controlled entirely by environment variables injected by the bobrapet operator. SDK defaults are fallback values for local development. See docs/reference/config.md for the complete environment variable reference.
Concurrency and Cancellation ¶
All entry points respect context cancellation. Batch engrams enforce a timeout via BUBU_STEP_TIMEOUT. Streaming engrams implement graceful shutdown on SIGTERM with configurable drain timeouts via BUBU_GRPC_GRACEFUL_SHUTDOWN_TIMEOUT.
Error Handling ¶
Entry points return errors for initialization and execution failures. Batch engrams also patch the StepRun status with an exit code so the operator's retry policy can classify the outcome: 124 for timeouts, 1 for logic errors, and 0 for success.
Index ¶
- Constants
- func LoggerFromContext(ctx context.Context) *slog.Logger
- func RunBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error
- func RunImpulse[C any](ctx context.Context, i engram.Impulse[C]) error
- func StartBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error
- func StartStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)
- func StartStreamServer[C any](ctx context.Context, e engram.StreamingEngram[C]) error
- func StartStreaming[C any](ctx context.Context, e engram.StreamingEngram[C]) error
- func StreamTo(ctx context.Context, target string, in <-chan []byte, out chan<- []byte) error
- func StreamToWithMetadata(ctx context.Context, target string, in <-chan engram.StreamMessage, ...) error
- func WithLogger(ctx context.Context, logger *slog.Logger) context.Context
- type K8sClient
- type StorageManager
Constants ¶
const (
	// DefaultChannelBufferSize is the buffer size for gRPC streaming channels.
	//
	// A buffer of 16 provides reasonable throughput while limiting memory usage.
	// Override via BUBU_GRPC_CHANNEL_BUFFER_SIZE for workloads with different
	// latency/throughput profiles.
	DefaultChannelBufferSize = 16

	// DefaultGRPCPort is the default port for gRPC servers in streaming mode.
	//
	// Override via BUBU_GRPC_PORT. The operator typically sets this to 50051.
	DefaultGRPCPort = "50051"

	// DefaultMessageTimeout is the default timeout for individual message operations.
	//
	// Prevents indefinite hangs on network stalls. Override via BUBU_GRPC_MESSAGE_TIMEOUT.
	DefaultMessageTimeout = 30 * time.Second

	// DefaultMaxMessageSize is the default max message size for gRPC (10 MiB).
	//
	// Override via BUBU_GRPC_MAX_RECV_BYTES and BUBU_GRPC_MAX_SEND_BYTES.
	// Larger messages should use storage offloading instead of increasing this limit.
	DefaultMaxMessageSize = 10 * 1024 * 1024

	// Client buffer defaults (bounded)
	DefaultClientBufferMaxMessages = 100
	DefaultClientBufferMaxBytes    = 10 * 1024 * 1024 // 10 MiB
)
Functions ¶
func LoggerFromContext ¶
func LoggerFromContext(ctx context.Context) *slog.Logger
LoggerFromContext retrieves a slog.Logger from the context, or returns a default JSON logger.
If no logger was previously stored via WithLogger, this function returns a new JSON logger writing to stdout with default settings. This ensures the SDK always has a valid logger without requiring explicit configuration for simple use cases.
Thread-safe and idempotent.
func RunBatch ¶
func RunBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error
RunBatch is the primary entry point for a BatchEngram. It provides a fully type-safe execution environment, handling all the boilerplate of context loading, data hydration, and status patching.
func RunImpulse ¶
func RunImpulse[C any](ctx context.Context, i engram.Impulse[C]) error
RunImpulse is the type-safe entry point for impulses (Kubernetes Deployments that trigger Stories).
This function infers config type C from the impulse implementation, providing compile-time type safety. It orchestrates the complete lifecycle:
- Load execution context from environment (BUBU_CONFIG, BUBU_IMPULSE_WITH, etc.)
- Merge BUBU_IMPULSE_WITH JSON into config if provided (for operator injection)
- Unmarshal config into type C
- Call impulse.Init with typed config and secrets
- Create pre-configured Kubernetes client with namespace resolution
- Call impulse.Run with the client, transferring control to the long-running process
The impulse's Run method should block until work completes or context is canceled. Typical use cases: webhook listeners, message queue consumers, schedulers, event watchers.
Respects context cancellation for graceful shutdown on SIGTERM. The impulse is responsible for handling shutdown signals within its Run implementation (e.g., draining in-flight requests).
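The merge step (BUBU_CONFIG plus BUBU_IMPULSE_WITH) can be pictured as a JSON object merge followed by a typed decode. A minimal sketch, assuming keys from BUBU_IMPULSE_WITH override BUBU_CONFIG at the top level only; the SDK decodes via mapstructure tags, while mergeConfig here is a hypothetical helper that uses encoding/json tags to stay within the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MyConfig mirrors the impulse config in the example, with JSON tags.
type MyConfig struct {
	WebhookPort int    `json:"webhookPort"`
	SecretToken string `json:"secretToken"`
}

// mergeConfig overlays the top-level keys of with onto base and decodes the
// result into C. Either input may be empty.
func mergeConfig[C any](base, with []byte) (C, error) {
	var cfg C
	merged := map[string]json.RawMessage{}
	for _, src := range [][]byte{base, with} {
		if len(src) == 0 {
			continue
		}
		var m map[string]json.RawMessage
		if err := json.Unmarshal(src, &m); err != nil {
			return cfg, fmt.Errorf("decode config: %w", err)
		}
		for k, v := range m {
			merged[k] = v // later sources win
		}
	}
	raw, err := json.Marshal(merged)
	if err != nil {
		return cfg, err
	}
	err = json.Unmarshal(raw, &cfg)
	return cfg, err
}

func main() {
	cfg, err := mergeConfig[MyConfig](
		[]byte(`{"webhookPort": 8080, "secretToken": "from-config"}`),
		[]byte(`{"webhookPort": 9090}`),
	)
	fmt.Println(cfg, err)
}
```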
Example:
type MyConfig struct {
	WebhookPort int    `mapstructure:"webhookPort"`
	SecretToken string `mapstructure:"secretToken"`
}

type MyImpulse struct { /* ... */ }

func (m *MyImpulse) Init(ctx context.Context, cfg MyConfig, secrets *engram.Secrets) error {
	// Set up the webhook server, validate the token, etc.
	return nil
}

func (m *MyImpulse) Run(ctx context.Context, client *k8s.Client) error {
	// Listen for webhooks, trigger stories via client.TriggerStory(...)
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	if err := sdk.RunImpulse(context.Background(), &MyImpulse{}); err != nil {
		panic(err)
	}
}
func StartBatch ¶
func StartBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error
StartBatch is the type-safe entry point for batch engrams (Kubernetes Jobs).
This function infers both config type C and input type I from the engram implementation, providing full compile-time type safety. It orchestrates the complete lifecycle:
- Load execution context from environment (BUBU_CONFIG, BUBU_INPUTS, etc.)
- Unmarshal config and inputs into types C and I
- Call engram.Init with typed config and secrets
- Hydrate inputs from storage if needed
- Call engram.Process with typed inputs and execution context
- Dehydrate outputs to storage if they exceed size limits
- Patch StepRun status with result, timing, and exit code
Enforces timeout via BUBU_STEP_TIMEOUT with context cancellation. On timeout, patches status with exit code 124 (retryable) and forcefully exits to prevent zombie Jobs. On logic errors, patches with exit code 1 (terminal). On success, patches with exit code 0.
Example:
type MyConfig struct {
	APIKey string `mapstructure:"apiKey"`
}

type MyInputs struct {
	UserID string `mapstructure:"userId"`
}

func main() {
	ctx := context.Background()
	// NewMyEngram returns your engram; its Init and Process methods take
	// MyConfig and MyInputs, which is how StartBatch infers C and I.
	if err := sdk.StartBatch(ctx, NewMyEngram()); err != nil {
		panic(err) // Ensure a non-zero exit so the Job is marked failed
	}
}
func StartStory ¶
func StartStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)
StartStory triggers a new StoryRun for the named Story with the provided inputs.
This is the primary mechanism for programmatically initiating workflows, typically used from within an Impulse. The SDK automatically resolves the correct namespace from environment variables (BUBU_TARGET_STORY_NAMESPACE or fallbacks), creates a Kubernetes client, and submits the StoryRun resource.
Inputs are marshaled to JSON and stored in the StoryRun spec. The operator watches for new StoryRuns and orchestrates their execution.
Returns the created StoryRun on success, or an error if client creation or StoryRun creation fails. Respects context cancellation and deadlines.
Example:
sr, err := sdk.StartStory(ctx, "my-workflow", map[string]any{
	"userId": "12345",
	"action": "process",
})
if err != nil {
	return fmt.Errorf("failed to trigger story: %w", err)
}
log.Printf("Triggered StoryRun: %s", sr.Name)
func StartStreamServer ¶
func StartStreamServer[C any](ctx context.Context, e engram.StreamingEngram[C]) error
StartStreamServer is the main entry point for a StreamingEngram. This function bootstraps a long-running service that can process data in real-time over gRPC.
This function orchestrates the lifecycle of a streaming service:
- It loads the execution context for configuration and secrets.
- It calls the StreamingEngram's `Init` method.
- It starts a gRPC server on the configured port.
- It registers the StreamingEngram's `Stream` method as the gRPC handler.
- It gracefully handles server shutdown on context cancellation.
Streaming Delivery Guarantees ¶
The SDK provides reliable message delivery for direct engram-to-engram connections (peer-to-peer mode). In hub-and-spoke mode (when primitives sit between streaming engrams), the Hub may drop messages if a downstream engram is not ready when a message is forwarded. For production use cases that require guaranteed delivery:
- Use peer-to-peer mode (avoid primitives between streaming engrams), OR
- Implement application-level acknowledgment and retry in your engram logic, OR
- Wait for Hub buffering support (tracked in the bobravoz-grpc roadmap)
func StartStreaming ¶
func StartStreaming[C any](ctx context.Context, e engram.StreamingEngram[C]) error
StartStreaming is the type-safe entry point for streaming engrams (Kubernetes Deployments with gRPC).
This function infers config type C from the engram implementation, providing compile-time type safety. It orchestrates the complete lifecycle:
- Load execution context from environment (BUBU_CONFIG, etc.)
- Unmarshal config into type C
- Call engram.Init with typed config and secrets
- Start gRPC server on BUBU_GRPC_PORT (default 50051)
- Register engram.Stream as the bidirectional streaming handler
- Serve until context cancellation (SIGTERM) or error
- Gracefully drain active streams before shutdown
The gRPC server implements:
- Transparent heartbeat sending/filtering to detect connection hangs
- Backpressure handling with configurable timeouts
- Graceful shutdown with BUBU_GRPC_GRACEFUL_SHUTDOWN_TIMEOUT drain phase
- Optional TLS via BUBU_GRPC_TLS_CERT_FILE and BUBU_GRPC_TLS_KEY_FILE
- Configurable message size limits via BUBU_GRPC_MAX_RECV_BYTES and BUBU_GRPC_MAX_SEND_BYTES
Example:
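// MyConfig is the config type C; StartStreaming infers it from the engram
// returned by NewMyStreamingEngram.
type MyConfig struct {
	BufferSize int `mapstructure:"bufferSize"`
}

func main() {
	ctx := context.Background()
	if err := sdk.StartStreaming(ctx, NewMyStreamingEngram()); err != nil {
		panic(err)
	}
}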
func StreamTo ¶
func StreamTo(ctx context.Context, target string, in <-chan []byte, out chan<- []byte) error
StreamTo connects to a downstream gRPC server and streams data to it (client side).
This function provides a simplified []byte channel API for backward compatibility. It wraps StreamToWithMetadata, converting []byte channels to StreamMessage channels with empty metadata and inputs fields.
For new code that requires tracing, correlation, or dynamic per-message configuration, use StreamToWithMetadata directly.
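The []byte-to-StreamMessage conversion can be pictured as two small adapters. A minimal sketch, using a local streamMessage struct with the three fields named on this page (Metadata, Payload, Inputs); the field types, the adapter functions, and the choice to close out when input ends are assumptions, not the SDK's code. A production version would also select on ctx.Done() so the goroutines cannot block forever.

```go
package main

import "fmt"

// streamMessage is a hypothetical local stand-in for engram.StreamMessage.
type streamMessage struct {
	Metadata map[string]string
	Payload  []byte
	Inputs   []byte
}

// wrapBytes forwards each []byte as a message with empty metadata and inputs.
// The returned channel is closed once in is closed and drained.
func wrapBytes(in <-chan []byte) <-chan streamMessage {
	out := make(chan streamMessage)
	go func() {
		defer close(out)
		for b := range in {
			out <- streamMessage{Payload: b}
		}
	}()
	return out
}

// unwrapPayloads forwards only each message's payload and closes out when
// msgs is drained. Whether the SDK closes the caller's out channel is not
// specified on this page.
func unwrapPayloads(msgs <-chan streamMessage, out chan<- []byte) {
	defer close(out)
	for m := range msgs {
		out <- m.Payload
	}
}

func main() {
	in := make(chan []byte, 1)
	in <- []byte(`{"key":"value"}`)
	close(in)
	for m := range wrapBytes(in) {
		fmt.Printf("payload=%s metadata=%v\n", m.Payload, m.Metadata)
	}
}
```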
The function implements:
- Automatic reconnection on transient failures (configurable via BUBU_GRPC_RECONNECT_MAX_RETRIES)
- Exponential backoff with jitter (base/max configurable via env)
- Transparent heartbeat sending/receiving to detect connection hangs
- Optional TLS via BUBU_GRPC_CA_FILE or BUBU_GRPC_CLIENT_TLS=true
- Backpressure handling with configurable timeouts
Blocks until the input channel is closed, context is canceled, or a permanent error occurs. Respects context cancellation for graceful shutdown.
Example:
in := make(chan []byte, 16)
out := make(chan []byte, 16)
go func() {
	defer close(in)
	in <- []byte(`{"key": "value"}`)
}()
go func() {
	for msg := range out {
		log.Printf("Received: %s", msg)
	}
}()
if err := sdk.StreamTo(ctx, "downstream-service:50051", in, out); err != nil {
	return fmt.Errorf("streaming failed: %w", err)
}
func StreamToWithMetadata ¶
func StreamToWithMetadata(
	ctx context.Context,
	target string,
	in <-chan engram.StreamMessage,
	out chan<- engram.StreamMessage,
) error
StreamToWithMetadata connects to a downstream gRPC server with full metadata and inputs support (client side).
This function provides the full StreamMessage API, enabling:
- Metadata propagation for tracing (StoryRunID, StepName, custom trace IDs)
- Per-message dynamic configuration via the Inputs field (analogous to BUBU_INPUTS in batch mode)
- End-to-end correlation across streaming pipeline steps
The SDK automatically injects Hub metadata (storyrun-name, storyrun-namespace, current-step-id) from the execution context if available, enabling interop with the bobravoz Hub.
The function implements:
- Automatic reconnection on transient failures (Unavailable, ResourceExhausted, Aborted, DeadlineExceeded)
- Exponential backoff with jitter (configurable via BUBU_GRPC_RECONNECT_BASE_BACKOFF and _MAX_BACKOFF)
- Transparent heartbeat sending/filtering to detect connection hangs (BUBU_GRPC_HANG_TIMEOUT)
- Optional TLS via BUBU_GRPC_CA_FILE (custom CA) or BUBU_GRPC_CLIENT_TLS=true (system roots)
- Backpressure handling with timeouts (BUBU_GRPC_CHANNEL_SEND_TIMEOUT or BUBU_GRPC_MESSAGE_TIMEOUT)
- Configurable message size limits (BUBU_GRPC_CLIENT_MAX_RECV_BYTES, BUBU_GRPC_CLIENT_MAX_SEND_BYTES)
Blocks until the input channel is closed, context is canceled, or a permanent error occurs. Respects context cancellation for graceful shutdown.
Example:
in := make(chan engram.StreamMessage, 16)
out := make(chan engram.StreamMessage, 16)
go func() {
	defer close(in)
	in <- engram.StreamMessage{
		Metadata: map[string]string{"trace-id": "abc123"},
		Payload:  []byte(`{"key": "value"}`),
		Inputs:   []byte(`{"configKey": "configValue"}`),
	}
}()
go func() {
	for msg := range out {
		log.Printf("Received: %s (trace: %s)", msg.Payload, msg.Metadata["trace-id"])
	}
}()
if err := sdk.StreamToWithMetadata(ctx, "downstream:50051", in, out); err != nil {
	return fmt.Errorf("streaming failed: %w", err)
}
func WithLogger ¶
func WithLogger(ctx context.Context, logger *slog.Logger) context.Context
WithLogger stores a slog.Logger in the context for SDK use.
This allows you to inject a custom configured logger (e.g., with specific log levels, handlers, or structured attributes) that the SDK will use for all internal logging. If not provided, the SDK defaults to JSON logging to stdout.
Example:
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
ctx := sdk.WithLogger(context.Background(), logger)
if err := sdk.StartBatch(ctx, myEngram); err != nil {
	logger.Error("batch engram failed", "error", err)
	os.Exit(1)
}
The logger is used for lifecycle events (init, shutdown), errors, and metrics. It does not intercept engram-specific logs; engrams should use their own loggers or retrieve the SDK logger via LoggerFromContext within their execution context.
Types ¶
type K8sClient ¶
type K8sClient interface {
	// TriggerStory creates a new StoryRun for the named Story with the provided inputs.
	// The inputs map is marshaled to JSON and stored in the StoryRun's spec.inputs field.
	// Returns the created StoryRun on success, or an error if creation fails.
	// Respects context cancellation and deadlines.
	TriggerStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)

	// PatchStepRunStatus updates the status of the named StepRun with the provided patch data.
	// The implementation should use field-wise merging to avoid clobbering controller-managed
	// fields and implement retry-on-conflict logic to handle concurrent updates.
	// Respects context cancellation and deadlines.
	PatchStepRunStatus(ctx context.Context, stepRunName string, patchData runsv1alpha1.StepRunStatus) error
}
K8sClient defines the interface for Kubernetes operations required by the SDK.
This interface abstracts the SDK's dependency on Kubernetes, enabling mocking in tests and providing a stable contract. Implementations must be safe for concurrent use by multiple goroutines.
The SDK's default implementation (k8s.Client) provides:
- Automatic namespace resolution from environment variables
- Retry-on-conflict logic for status patches
- Phase transition validation to prevent state corruption
- OpenTelemetry metrics for operation latency and success/failure
Custom implementations should follow the same concurrency and idempotency guarantees.
type StorageManager ¶
type StorageManager interface {
	// Hydrate recursively scans a data structure for storage references and replaces
	// them with the actual content from the storage backend. Returns the hydrated
	// data on success, or an error if reading fails or a reference is invalid.
	// Respects context cancellation and enforces BUBU_STORAGE_TIMEOUT.
	Hydrate(ctx context.Context, data any) (any, error)

	// Dehydrate recursively checks the size of a data structure. If any part exceeds
	// the inline size limit (BUBU_MAX_INLINE_SIZE), it saves that part to the storage
	// backend and replaces it with a storage reference. Returns the dehydrated data
	// (potentially containing references) on success, or an error if writing fails.
	// Respects context cancellation and enforces BUBU_STORAGE_TIMEOUT.
	Dehydrate(ctx context.Context, data any, stepRunID string) (any, error)
}
StorageManager defines the interface for storage operations required by the SDK.
This interface provides transparent data offloading for large inputs and outputs, automatically handling marshaling, storage backend operations, and reference tracking. Implementations must be safe for concurrent use by multiple goroutines.
The SDK's default implementation (storage.Manager) provides:
- Automatic size-based offloading (configurable via BUBU_MAX_INLINE_SIZE)
- Recursive hydration/dehydration of nested structures
- Support for S3 and file storage backends
- Path traversal protection and validation
- OpenTelemetry metrics for operation latency and data sizes
Storage references use the format {"$bubuStorageRef": "outputs/steprun-id/path.json"}.
Directories ¶

| Path | Synopsis |
|---|---|
| engram | package engram defines the core interfaces that developers implement to create components for the bobrapet ecosystem. |
| pkg/metrics | Package metrics provides OpenTelemetry-based observability for the SDK and allows developers to register custom metrics for their engrams and impulses. |