sdk

package module
v0.1.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Oct 18, 2025 · License: Apache-2.0 · Imports: 36 · Imported by: 0

README

🧰 bubu-sdk-go — Official Go SDK for bobrapet

The official Go SDK for building type-safe, production-grade components for bobrapet, the Kubernetes‑native AI and data workflow orchestration engine.

🌟 Key Features

Use this SDK to build engrams (data processing tasks) and impulses (event listeners that trigger workflows). bobrapet runs them as Kubernetes Jobs and Deployments, and the SDK handles:

  • Type-safe configuration and inputs — Define your interfaces as Go structs, get compile-time safety.
  • Automatic large payload handling — Outputs exceeding size limits are transparently offloaded to S3/file storage.
  • Streaming pipelines — Build real-time data processing chains with gRPC bidirectional streaming.
  • Retries and observability — Exit codes inform retry policies; OpenTelemetry metrics/tracing hooks included (initialize an exporter in your app/infra).

🏗️ Architecture

High-level SDK architecture, execution modes, and operator integration are documented here:

🧭 When to use which mode
  • Batch (Jobs): finite tasks with a clear start and end; use StartBatch. See the batch flow in batch.go.
  • Streaming (Deployments with gRPC): continuous processing with backpressure and heartbeats; use StartStreaming. See stream.go.
  • Impulse (Deployments): long-running trigger that creates StoryRuns; use RunImpulse. See impulse.go.

🚀 Quick Start

Let's build a simple batch engram that greets users.

1. Create your Go module
mkdir hello-engram && cd hello-engram
go mod init github.com/yourusername/hello-engram
go get github.com/bubustack/bubu-sdk-go@latest
2. Write the code

Create a single main.go file with all the necessary components.

package main

import (
	"context"
	"fmt"
	"log"

	sdk "github.com/bubustack/bubu-sdk-go"
	"github.com/bubustack/bubu-sdk-go/engram"
)

// Config holds static configuration from the Engram resource 'with' block.
type Config struct {
	DefaultGreeting string `mapstructure:"defaultGreeting"`
}

// Inputs holds runtime data passed to this execution via StepRun inputs.
type Inputs struct {
	Name string `mapstructure:"name"`
}

// GreeterEngram implements the engram.BatchEngram[Config, Inputs] interface.
type GreeterEngram struct {
	greeting string
}

// NewGreeter creates a new GreeterEngram.
func NewGreeter() *GreeterEngram {
	return &GreeterEngram{}
}

// Init is called once when the engram starts.
func (g *GreeterEngram) Init(ctx context.Context, cfg Config, secrets *engram.Secrets) error {
	g.greeting = "Hello"
	if cfg.DefaultGreeting != "" {
		g.greeting = cfg.DefaultGreeting
	}
	return nil
}

// Process is the core logic. It receives typed inputs and returns a Result.
func (g *GreeterEngram) Process(ctx context.Context, ec *engram.ExecutionContext, inputs Inputs) (*engram.Result, error) {
	if inputs.Name == "" {
		return nil, fmt.Errorf("input 'name' is required")
	}
	message := fmt.Sprintf("%s, %s!", g.greeting, inputs.Name)
	return engram.NewResultFrom(map[string]any{
		"greeting": message,
	})
}

func main() {
	if err := sdk.StartBatch(context.Background(), NewGreeter()); err != nil {
		log.Fatalf("Engram failed: %v", err)
	}
}
3. Build the binary
go build -o hello-engram .
4. Containerize

Create a Dockerfile:

# Build stage: Alpine-based Go toolchain image
FROM golang:1.24-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
# Build a static binary
RUN CGO_ENABLED=0 go build -o /hello-engram .

# Final stage
FROM alpine:latest
# Add non-root user
RUN addgroup -S app && adduser -S app -G app
USER app
# Copy binary and certificates
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /hello-engram /hello-engram
ENTRYPOINT ["/hello-engram"]

Build and push the image to your container registry:

docker build -t myregistry.io/hello-engram:latest .
docker push myregistry.io/hello-engram:latest
5. Deploy to Kubernetes

Create an Engram resource (hello-engram.yaml):

apiVersion: bubustack.io/v1alpha1
kind: Engram
metadata:
  name: hello-engram
spec:
  image: myregistry.io/hello-engram:latest
  with:
    defaultGreeting: "Greetings"

Create a Story that uses the engram (greet-story.yaml):

apiVersion: bubustack.io/v1alpha1
kind: Story
metadata:
  name: greet-users
spec:
  steps:
    - name: greet
      engram: hello-engram
      with:
        name: "{{ .inputs.userName }}"

Apply the resources and trigger a StoryRun:

kubectl apply -f hello-engram.yaml -f greet-story.yaml

kubectl create -f - <<EOF
apiVersion: bubustack.io/v1alpha1
kind: StoryRun
metadata:
  generateName: greet-users-
spec:
  storyRef:
    name: greet-users
  inputs:
    userName: "Bob"
EOF

📚 Core concepts

Engrams

Engrams are the building blocks of workflows. They are stateless, single-purpose components that execute a specific task.

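| Type            | Use case                                      | Kubernetes workload      |
|-----------------|-----------------------------------------------|--------------------------|
| BatchEngram     | Data transformations, API calls, ETL tasks    | Job                      |
| StreamingEngram | Real-time data processing, filtering, routing | Deployment (gRPC server) |

Impulses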

Impulses are long-running services that listen for external events (webhooks, message queues, schedulers) and trigger StoryRuns.

| Component | Role                           | Kubernetes workload |
|-----------|--------------------------------|---------------------|
| Impulse   | Event listener → Story trigger | Deployment          |

🌟 Key features

Type-Safe and Generic

Define your configuration and inputs as native Go structs. The SDK handles deserialization and validation:

type Config struct {
    APIKey  string        `mapstructure:"apiKey"`
    Timeout time.Duration `mapstructure:"timeout"` // Supports duration parsing
}

Transparent Storage Offloading

Large outputs are automatically offloaded to S3 or file storage, keeping Kubernetes resources lean:

return engram.NewResultFrom(map[string]any{
    "largePayload": someLargeData, // Automatically offloaded if > BUBU_MAX_INLINE_SIZE
})

Streaming Pipelines

Build real-time data processing chains with metadata propagation for tracing:

func (s *Streamer) Stream(ctx context.Context, in <-chan engram.StreamMessage, out chan<- engram.StreamMessage) error {
    for msg := range in {
        processed := transform(msg.Payload) // transform: your own processing logic
        select {
        case out <- engram.StreamMessage{
            Metadata: msg.Metadata, // Preserve trace IDs
            Payload:  processed,
        }:
        case <-ctx.Done():
            return ctx.Err() // Stop promptly on shutdown instead of blocking on a slow consumer
        }
    }
    return nil
}

Retries and Exit Codes

The SDK patches StepRun status with exit codes that inform the operator's retry policy:

  • 0: Success
  • 1: Logic error (terminal, no retry)
  • 124: Timeout (retryable)

✅ Support matrix

| Component         | Version                                 |
|-------------------|-----------------------------------------|
| Go                | 1.24+                                   |
| Kubernetes        | 1.28+ (bobrapet operator compatibility) |
| bobrapet operator | v0.1.0+                                 |

⚙️ Environment variables

The SDK is controlled entirely by environment variables injected by the bobrapet operator. See docs/reference/config for the complete reference.

Key variables:

  • BUBU_STEP_TIMEOUT — Batch execution timeout (default: 30m)
  • BUBU_STORAGE_PROVIDER — Storage backend: s3, file, or unset (disabled)
  • BUBU_MAX_INLINE_SIZE — Offload threshold in bytes (default: 1024)
  • BUBU_GRPC_PORT — gRPC server port for streaming engrams (default: 50051)
  • BUBU_EXECUTION_MODE — Set by the operator's controllers: batch | streaming
  • BUBU_STORAGE_PATH — Required when BUBU_STORAGE_PROVIDER=file; base directory for file store
  • BUBU_MAX_RECURSION_DEPTH — Max traversal depth for hydrate/dehydrate (default: 10)
  • BUBU_STORAGE_TIMEOUT — Timeout for storage operations (default: 5m)

🛠️ Local Development

  1. Clone the repository:

    git clone https://github.com/bubustack/bubu-sdk-go.git
    cd bubu-sdk-go
    
  2. Run tests:

    make test
    
  3. Lint:

    make lint
    
  4. Run all checks:

    make all
    

📢 Support, Security, and Changelog

  • See SUPPORT.md for how to get help and report issues.
  • See SECURITY.md for vulnerability reporting and security posture.
  • See CHANGELOG.md for version history.

📄 License

Copyright 2025 BubuStack.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Documentation

Overview

Package sdk provides the primary entry points for executing bobrapet components.

This package contains the runtime logic that bootstraps an Engram or Impulse, injects the necessary context from the environment, and manages its lifecycle. Developers typically interact with StartBatch, StartStreaming, or RunImpulse from their main.go file.

Entry Points

For batch engrams (Jobs):

sdk.StartBatch(ctx, myEngram)

For streaming engrams (Deployments with gRPC):

sdk.StartStreaming(ctx, myStreamingEngram)

For impulses (Deployments that trigger Stories):

sdk.RunImpulse(ctx, myImpulse)

Environment-Driven Configuration

The SDK is controlled entirely by environment variables injected by the bobrapet operator. SDK defaults are fallback values for local development. See docs/reference/config.md for the complete environment variable reference.

Concurrency and Cancellation

All entry points respect context cancellation. Batch engrams enforce a timeout via BUBU_STEP_TIMEOUT. Streaming engrams implement graceful shutdown on SIGTERM with configurable drain timeouts via BUBU_GRPC_GRACEFUL_SHUTDOWN_TIMEOUT.

Error Handling

Entry points return errors for initialization and execution failures. Batch engrams additionally patch StepRun status with exit codes for operator retry policy classification (exit code 124 for timeouts, 1 for logic errors, 0 for success).

Index

Constants

const (
	// DefaultChannelBufferSize is the buffer size for gRPC streaming channels.
	//
	// A buffer of 16 provides reasonable throughput while limiting memory usage.
	// Override via BUBU_GRPC_CHANNEL_BUFFER_SIZE for workloads with different
	// latency/throughput profiles.
	DefaultChannelBufferSize = 16

	// DefaultGRPCPort is the default port for gRPC servers in streaming mode.
	//
	// Override via BUBU_GRPC_PORT. The operator typically sets this to 50051.
	DefaultGRPCPort = "50051"

	// DefaultMessageTimeout is the default timeout for individual message operations.
	//
	// Prevents indefinite hangs on network stalls. Override via BUBU_GRPC_MESSAGE_TIMEOUT.
	DefaultMessageTimeout = 30 * time.Second

	// DefaultMaxMessageSize is the default max message size for gRPC (10 MiB).
	//
	// Override via BUBU_GRPC_MAX_RECV_BYTES and BUBU_GRPC_MAX_SEND_BYTES.
	// Larger messages should use storage offloading instead of increasing this limit.
	DefaultMaxMessageSize = 10 * 1024 * 1024

	// Client buffer defaults (bounded)
	DefaultClientBufferMaxMessages = 100
	DefaultClientBufferMaxBytes    = 10 * 1024 * 1024 // 10 MiB
)
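The two client-buffer limits bound pending messages by count and by total size. A minimal, single-goroutine sketch of such a buffer, using the default values above, might look like this; boundedBuffer is hypothetical and is not safe for concurrent use without a mutex.

```go
package main

import "fmt"

// Values copied from the documented constants.
const (
	clientBufferMaxMessages = 100              // DefaultClientBufferMaxMessages
	clientBufferMaxBytes    = 10 * 1024 * 1024 // DefaultClientBufferMaxBytes (10 MiB)
)

// boundedBuffer holds pending messages, capped by both count and total bytes.
// Hypothetical type: it shows why two limits are useful, not how the SDK stores messages.
type boundedBuffer struct {
	maxMsgs, maxBytes int
	msgs              [][]byte
	bytes             int
}

func newBoundedBuffer(maxMsgs, maxBytes int) *boundedBuffer {
	return &boundedBuffer{maxMsgs: maxMsgs, maxBytes: maxBytes}
}

// Push appends msg only if both limits still hold afterwards; it reports whether msg was accepted.
func (b *boundedBuffer) Push(msg []byte) bool {
	if len(b.msgs)+1 > b.maxMsgs || b.bytes+len(msg) > b.maxBytes {
		return false
	}
	b.msgs = append(b.msgs, msg)
	b.bytes += len(msg)
	return true
}

// Pop removes and returns the oldest message, or nil if the buffer is empty.
func (b *boundedBuffer) Pop() []byte {
	if len(b.msgs) == 0 {
		return nil
	}
	msg := b.msgs[0]
	b.msgs = b.msgs[1:]
	b.bytes -= len(msg)
	return msg
}

func main() {
	b := newBoundedBuffer(clientBufferMaxMessages, clientBufferMaxBytes)
	fmt.Println(b.Push([]byte("hello")), len(b.msgs), b.bytes)
}
```

A count limit alone would let 100 very large messages exhaust memory, and a byte limit alone would allow an unbounded number of tiny ones; checking both keeps the buffer bounded either way.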

Variables

This section is empty.

Functions

func LoggerFromContext

func LoggerFromContext(ctx context.Context) *slog.Logger

LoggerFromContext retrieves a slog.Logger from the context, or returns a default JSON logger.

If no logger was previously stored via WithLogger, this function returns a new JSON logger writing to stdout with default settings. This ensures the SDK always has a valid logger without requiring explicit configuration for simple use cases.

Thread-safe and idempotent.

func RunBatch

func RunBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error

RunBatch is the primary entry point for a BatchEngram. It provides a fully type-safe execution environment, handling all the boilerplate of context loading, data hydration, and status patching.

func RunImpulse

func RunImpulse[C any](ctx context.Context, i engram.Impulse[C]) error

RunImpulse is the type-safe entry point for impulses (Kubernetes Deployments that trigger Stories).

This function infers config type C from the impulse implementation, providing compile-time type safety. It orchestrates the complete lifecycle:

  1. Load execution context from environment (BUBU_CONFIG, BUBU_IMPULSE_WITH, etc.)
  2. Merge BUBU_IMPULSE_WITH JSON into config if provided (for operator injection)
  3. Unmarshal config into type C
  4. Call impulse.Init with typed config and secrets
  5. Create pre-configured Kubernetes client with namespace resolution
  6. Call impulse.Run with client, transferring control to long-running process
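Step 2 can be pictured as a JSON overlay onto the loaded config before it is decoded into C. The sketch assumes a shallow merge in which keys from BUBU_IMPULSE_WITH win; mergeWith is a hypothetical helper, and the SDK's actual merge rules are not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergeWith overlays the top-level keys of a JSON object (as an operator might inject
// via BUBU_IMPULSE_WITH) onto a copy of base. Hypothetical helper: the shallow,
// overlay-wins strategy is an assumption, not documented SDK behavior.
func mergeWith(base map[string]any, withJSON string) (map[string]any, error) {
	out := make(map[string]any, len(base))
	for k, v := range base {
		out[k] = v
	}
	if withJSON == "" {
		return out, nil
	}
	var overlay map[string]any
	if err := json.Unmarshal([]byte(withJSON), &overlay); err != nil {
		return nil, fmt.Errorf("parse BUBU_IMPULSE_WITH: %w", err)
	}
	for k, v := range overlay {
		out[k] = v // overlay wins on key conflicts
	}
	return out, nil
}

func main() {
	base := map[string]any{"webhookPort": 8080.0, "secretToken": "from-config"}
	merged, err := mergeWith(base, `{"webhookPort": 9090}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(merged["webhookPort"], merged["secretToken"])
}
```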

The impulse's Run method should block until work completes or context is canceled. Typical use cases: webhook listeners, message queue consumers, schedulers, event watchers.

Respects context cancellation for graceful shutdown on SIGTERM. The impulse is responsible for handling shutdown signals within its Run implementation (e.g., draining in-flight requests).

Example:

type MyConfig struct {
    WebhookPort int    `mapstructure:"webhookPort"`
    SecretToken string `mapstructure:"secretToken"`
}

type MyImpulse struct { /* ... */ }

func (m *MyImpulse) Init(ctx context.Context, cfg MyConfig, secrets *engram.Secrets) error {
    // Setup webhook server, validate token, etc.
    return nil
}

func (m *MyImpulse) Run(ctx context.Context, client *k8s.Client) error {
    // Listen for webhooks, trigger stories via client.TriggerStory(...)
    <-ctx.Done()
    return ctx.Err()
}

func main() {
    if err := sdk.RunImpulse(context.Background(), &MyImpulse{}); err != nil {
        panic(err)
    }
}

func StartBatch

func StartBatch[C any, I any](ctx context.Context, e engram.BatchEngram[C, I]) error

StartBatch is the type-safe entry point for batch engrams (Kubernetes Jobs).

This function infers both config type C and input type I from the engram implementation, providing full compile-time type safety. It orchestrates the complete lifecycle:

  1. Load execution context from environment (BUBU_CONFIG, BUBU_INPUTS, etc.)
  2. Unmarshal config and inputs into types C and I
  3. Call engram.Init with typed config and secrets
  4. Hydrate inputs from storage if needed
  5. Call engram.Process with typed inputs and execution context
  6. Dehydrate outputs to storage if they exceed size limits
  7. Patch StepRun status with result, timing, and exit code

Enforces timeout via BUBU_STEP_TIMEOUT with context cancellation. On timeout, patches status with exit code 124 (retryable) and forcefully exits to prevent zombie Jobs. On logic errors, patches with exit code 1 (terminal). On success, patches with exit code 0.

Example:

type MyConfig struct { APIKey string `mapstructure:"apiKey"` }
type MyInputs struct { UserID string `mapstructure:"userId"` }

func main() {
    ctx := context.Background()
    if err := sdk.StartBatch(ctx, NewMyEngram()); err != nil {
        panic(err)  // Ensure non-zero exit for Job failure detection
    }
}

func StartStory

func StartStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)

StartStory triggers a new StoryRun for the named Story with the provided inputs.

This is the primary mechanism for programmatically initiating workflows, typically used from within an Impulse. The SDK automatically resolves the correct namespace from environment variables (BUBU_TARGET_STORY_NAMESPACE or fallbacks), creates a Kubernetes client, and submits the StoryRun resource.

Inputs are marshaled to JSON and stored in the StoryRun spec. The operator watches for new StoryRuns and orchestrates their execution.

Returns the created StoryRun on success, or an error if client creation or StoryRun creation fails. Respects context cancellation and deadlines.

Example:

sr, err := sdk.StartStory(ctx, "my-workflow", map[string]any{
    "userId": "12345",
    "action": "process",
})
if err != nil {
    return fmt.Errorf("failed to trigger story: %w", err)
}
log.Printf("Triggered StoryRun: %s", sr.Name)

func StartStreamServer

func StartStreamServer[C any](ctx context.Context, e engram.StreamingEngram[C]) error

StartStreamServer is the main entry point for a StreamingEngram. This function bootstraps a long-running service that can process data in real-time over gRPC.

This function orchestrates the lifecycle of a streaming service:

  1. It loads the execution context for configuration and secrets.
  2. It calls the StreamingEngram's `Init` method.
  3. It starts a gRPC server on the configured port.
  4. It registers the StreamingEngram's `Stream` method as the gRPC handler.
  5. It gracefully handles server shutdown on context cancellation.

Streaming Delivery Guarantees

The SDK provides reliable message delivery for direct engram-to-engram connections (peer-to-peer mode). In hub-and-spoke mode (primitives between streaming engrams), the Hub may drop messages if downstream engrams are not ready at the time of forwarding. For production use cases requiring guaranteed delivery:

  • Use peer-to-peer mode (avoid primitives between streaming engrams), OR
  • Implement application-level acknowledgment and retry in your engram logic, OR
  • Wait for Hub buffering support (tracked in bobravoz-grpc roadmap)

func StartStreaming

func StartStreaming[C any](ctx context.Context, e engram.StreamingEngram[C]) error

StartStreaming is the type-safe entry point for streaming engrams (Kubernetes Deployments with gRPC).

This function infers config type C from the engram implementation, providing compile-time type safety. It orchestrates the complete lifecycle:

  1. Load execution context from environment (BUBU_CONFIG, etc.)
  2. Unmarshal config into type C
  3. Call engram.Init with typed config and secrets
  4. Start gRPC server on BUBU_GRPC_PORT (default 50051)
  5. Register engram.Stream as the bidirectional streaming handler
  6. Serve until context cancellation (SIGTERM) or error
  7. Gracefully drain active streams before shutdown

The gRPC server implements:

  • Transparent heartbeat sending/filtering to detect connection hangs
  • Backpressure handling with configurable timeouts
  • Graceful shutdown with BUBU_GRPC_GRACEFUL_SHUTDOWN_TIMEOUT drain phase
  • Optional TLS via BUBU_GRPC_TLS_CERT_FILE and BUBU_GRPC_TLS_KEY_FILE
  • Configurable message size limits via BUBU_GRPC_MAX_RECV_BYTES and BUBU_GRPC_MAX_SEND_BYTES

Example:

type MyConfig struct { BufferSize int `mapstructure:"bufferSize"` }

func main() {
    ctx := context.Background()
    if err := sdk.StartStreaming(ctx, NewMyStreamingEngram()); err != nil {
        panic(err)
    }
}

func StreamTo

func StreamTo(
	ctx context.Context,
	target string,
	in <-chan []byte,
	out chan<- []byte,
) error

StreamTo connects to a downstream gRPC server and streams data to it (client side).

This function provides a simplified []byte channel API for backward compatibility. It wraps StreamToWithMetadata, converting []byte channels to StreamMessage channels with empty metadata and inputs fields.

For new code that requires tracing, correlation, or dynamic per-message configuration, use StreamToWithMetadata directly.

The function implements:

  • Automatic reconnection on transient failures (configurable via BUBU_GRPC_RECONNECT_MAX_RETRIES)
  • Exponential backoff with jitter (base/max configurable via env)
  • Transparent heartbeat sending/receiving to detect connection hangs
  • Optional TLS via BUBU_GRPC_CA_FILE or BUBU_GRPC_CLIENT_TLS=true
  • Backpressure handling with configurable timeouts

Blocks until the input channel is closed, context is canceled, or a permanent error occurs. Respects context cancellation for graceful shutdown.

Example:

in := make(chan []byte, 16)
out := make(chan []byte, 16)

go func() {
    defer close(in)
    in <- []byte(`{"key": "value"}`)
}()

go func() {
    for msg := range out {
        log.Printf("Received: %s", msg)
    }
}()

if err := sdk.StreamTo(ctx, "downstream-service:50051", in, out); err != nil {
    return fmt.Errorf("streaming failed: %w", err)
}

func StreamToWithMetadata

func StreamToWithMetadata(
	ctx context.Context,
	target string,
	in <-chan engram.StreamMessage,
	out chan<- engram.StreamMessage,
) error

StreamToWithMetadata connects to a downstream gRPC server with full metadata and inputs support (client side).

This function provides the full StreamMessage API, enabling:

  • Metadata propagation for tracing (StoryRunID, StepName, custom trace IDs)
  • Per-message dynamic configuration via the Inputs field (analogous to BUBU_INPUTS in batch mode)
  • End-to-end correlation across streaming pipeline steps

The SDK automatically injects Hub metadata (storyrun-name, storyrun-namespace, current-step-id) from the execution context if available, enabling interop with the bobravoz Hub.

The function implements:

  • Automatic reconnection on transient failures (Unavailable, ResourceExhausted, Aborted, DeadlineExceeded)
  • Exponential backoff with jitter (configurable via BUBU_GRPC_RECONNECT_BASE_BACKOFF and _MAX_BACKOFF)
  • Transparent heartbeat sending/filtering to detect connection hangs (BUBU_GRPC_HANG_TIMEOUT)
  • Optional TLS via BUBU_GRPC_CA_FILE (custom CA) or BUBU_GRPC_CLIENT_TLS=true (system roots)
  • Backpressure handling with timeouts (BUBU_GRPC_CHANNEL_SEND_TIMEOUT or BUBU_GRPC_MESSAGE_TIMEOUT)
  • Configurable message size limits (BUBU_GRPC_CLIENT_MAX_RECV_BYTES, BUBU_GRPC_CLIENT_MAX_SEND_BYTES)

Blocks until the input channel is closed, context is canceled, or a permanent error occurs. Respects context cancellation for graceful shutdown.

Example:

in := make(chan engram.StreamMessage, 16)
out := make(chan engram.StreamMessage, 16)

go func() {
    defer close(in)
    in <- engram.StreamMessage{
        Metadata: map[string]string{"trace-id": "abc123"},
        Payload:  []byte(`{"key": "value"}`),
        Inputs:   []byte(`{"configKey": "configValue"}`),
    }
}()

go func() {
    for msg := range out {
        log.Printf("Received: %s (trace: %s)", msg.Payload, msg.Metadata["trace-id"])
    }
}()

if err := sdk.StreamToWithMetadata(ctx, "downstream:50051", in, out); err != nil {
    return fmt.Errorf("streaming failed: %w", err)
}

func WithLogger

func WithLogger(ctx context.Context, logger *slog.Logger) context.Context

WithLogger stores a slog.Logger in the context for SDK use.

This allows you to inject a custom configured logger (e.g., with specific log levels, handlers, or structured attributes) that the SDK will use for all internal logging. If not provided, the SDK defaults to JSON logging to stdout.

Example:

logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
ctx := sdk.WithLogger(context.Background(), logger)
if err := sdk.StartBatch(ctx, myEngram); err != nil {
    log.Fatal(err)
}

The logger is used for lifecycle events (init, shutdown), errors, and metrics. It does not intercept engram-specific logs; engrams should use their own loggers or retrieve the SDK logger via LoggerFromContext within their execution context.

Types

type K8sClient

type K8sClient interface {
	// TriggerStory creates a new StoryRun for the named Story with the provided inputs.
	// The inputs map is marshaled to JSON and stored in the StoryRun's spec.inputs field.
	// Returns the created StoryRun on success, or an error if creation fails.
	// Respects context cancellation and deadlines.
	TriggerStory(ctx context.Context, storyName string, inputs map[string]any) (*runsv1alpha1.StoryRun, error)

	// PatchStepRunStatus updates the status of the named StepRun with the provided patch data.
	// The implementation should use field-wise merging to avoid clobbering controller-managed
	// fields and implement retry-on-conflict logic to handle concurrent updates.
	// Respects context cancellation and deadlines.
	PatchStepRunStatus(ctx context.Context, stepRunName string, patchData runsv1alpha1.StepRunStatus) error
}

K8sClient defines the interface for Kubernetes operations required by the SDK.

This interface abstracts the SDK's dependency on Kubernetes, enabling mocking in tests and providing a stable contract. Implementations must be safe for concurrent use by multiple goroutines.

The SDK's default implementation (k8s.Client) provides:

  • Automatic namespace resolution from environment variables
  • Retry-on-conflict logic for status patches
  • Phase transition validation to prevent state corruption
  • OpenTelemetry metrics for operation latency and success/failure

Custom implementations should follow the same concurrency and idempotency guarantees.

type StorageManager

type StorageManager interface {
	// Hydrate recursively scans a data structure for storage references and replaces
	// them with the actual content from the storage backend. Returns the hydrated
	// data on success, or an error if reading fails or a reference is invalid.
	// Respects context cancellation and enforces BUBU_STORAGE_TIMEOUT.
	Hydrate(ctx context.Context, data any) (any, error)

	// Dehydrate recursively checks the size of a data structure. If any part exceeds
	// the inline size limit (BUBU_MAX_INLINE_SIZE), it saves that part to the storage
	// backend and replaces it with a storage reference. Returns the dehydrated data
	// (potentially containing references) on success, or an error if writing fails.
	// Respects context cancellation and enforces BUBU_STORAGE_TIMEOUT.
	Dehydrate(ctx context.Context, data any, stepRunID string) (any, error)
}

StorageManager defines the interface for storage operations required by the SDK.

This interface provides transparent data offloading for large inputs and outputs, automatically handling marshaling, storage backend operations, and reference tracking. Implementations must be safe for concurrent use by multiple goroutines.

The SDK's default implementation (storage.Manager) provides:

  • Automatic size-based offloading (configurable via BUBU_MAX_INLINE_SIZE)
  • Recursive hydration/dehydration of nested structures
  • Support for S3 and file storage backends
  • Path traversal protection and validation
  • OpenTelemetry metrics for operation latency and data sizes

Storage references use the format {"$bubuStorageRef": "outputs/steprun-id/path.json"}.

Directories

| Path        | Synopsis |
|-------------|----------|
| engram      | Package engram defines the core interfaces that developers implement to create components for the bobrapet ecosystem. |
| pkg/metrics | Package metrics provides OpenTelemetry-based observability for the SDK and allows developers to register custom metrics for their engrams and impulses. |
