hub

package
v0.3.2
Published: Apr 12, 2026 License: Apache-2.0 Imports: 73 Imported by: 0

Documentation

Overview

materialize.go handles runtime template materialization for streaming packets.

Materialization flow:

  1. A packet's template references are detected (e.g. offloaded outputs).
  2. A materialize StepRun is created to resolve the template via a dedicated engram.
  3. The resolved result is unwrapped and routed to the target step.

The main entry points are routeToMaterialize (request path) and handleMaterializeResult (response path). Helper functions build the intermediate request structures. detectOffloadedOutputRefs walks template expressions to find step output references that point to storage-backed data; any such reference triggers the materialization detour.
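The detection step can be illustrated with a minimal, standalone sketch. It is not the package's implementation: the expression form `steps.<name>.output`, the `offloadedRefs` helper, and the `offloaded` lookup map are all assumptions made for illustration, since the real template syntax and the signature of detectOffloadedOutputRefs are not shown here.

```go
package main

import (
	"fmt"
	"regexp"
)

// stepOutputRef matches a HYPOTHETICAL expression form "steps.<name>.output".
// The template syntax actually used by the hub is not documented on this page.
var stepOutputRef = regexp.MustCompile(`steps\.([A-Za-z0-9_-]+)\.output`)

// offloadedRefs returns the names of steps referenced by expr whose outputs
// are marked as offloaded to storage. Each name is reported once, in order of
// first appearance.
func offloadedRefs(expr string, offloaded map[string]bool) []string {
	var refs []string
	seen := map[string]bool{}
	for _, m := range stepOutputRef.FindAllStringSubmatch(expr, -1) {
		name := m[1]
		if offloaded[name] && !seen[name] {
			seen[name] = true
			refs = append(refs, name)
		}
	}
	return refs
}

func main() {
	offloaded := map[string]bool{"fetch": true} // assumed bookkeeping
	expr := "{{ steps.fetch.output.body }} and {{ steps.parse.output.title }}"
	refs := offloadedRefs(expr, offloaded)
	// In this sketch, a non-empty result is what would trigger the detour.
	fmt.Println(refs, len(refs) > 0)
}
```

In this sketch a packet whose expressions yield no offloaded references would skip the detour and be routed directly.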

Constants

const (
	DefaultHubTLSDir  = "/var/run/hub-tls"
	DefaultHubTLSCert = DefaultHubTLSDir + "/tls.crt"
	DefaultHubTLSKey  = DefaultHubTLSDir + "/tls.key"
	DefaultHubTLSCA   = DefaultHubTLSDir + "/ca.crt"
)

Variables

var (
	// MaxBufferSize is the maximum number of messages to buffer per downstream engram
	MaxBufferSize = getMaxBufferSize()
	// MaxBufferBytes is the maximum total size of buffered messages in bytes
	MaxBufferBytes = getMaxBufferBytes()
)
var EvictionInterval time.Duration

EvictionInterval is the interval at which the hub evicts idle message buffers. Initialized in init() so that tests can override the environment before import.

Functions

This section is empty.

Types

type HandoffCallback added in v0.3.0

type HandoffCallback func(storyRunName, storyRunNamespace, stepID, phase, reason string)

HandoffCallback is invoked asynchronously when a stream's handoff phase changes.

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer holds buffered messages for a downstream engram that is not yet ready.

func NewMessageBuffer

func NewMessageBuffer(storyRunName, stepName string) *MessageBuffer

NewMessageBuffer creates a new message buffer.

func NewMessageBufferWithLimits added in v0.3.0

func NewMessageBufferWithLimits(storyRunName, stepName string, limits bufferLimits) *MessageBuffer

NewMessageBufferWithLimits creates a new message buffer with explicit limits.

func (*MessageBuffer) Add

func (mb *MessageBuffer) Add(msg *transportpb.DataPacket) bool

Add attempts to add a message to the buffer. It returns true if the message was added and false if the buffer is full.
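A bounded buffer with this add-or-reject contract might look like the sketch below. The `boundedBuffer` type and its fields are hypothetical, and treating "full" as either the message-count limit or the byte limit being reached is an assumption based on the MaxBufferSize and MaxBufferBytes variables; MessageBuffer's internals are unexported.

```go
package main

import (
	"fmt"
	"sync"
)

// boundedBuffer is a HYPOTHETICAL stand-in for MessageBuffer that stores raw
// byte slices instead of *transportpb.DataPacket values.
type boundedBuffer struct {
	mu       sync.Mutex
	msgs     [][]byte
	bytes    int
	maxMsgs  int
	maxBytes int
}

// Add appends msg unless doing so would exceed the count or byte limit.
// It returns true if the message was stored.
func (b *boundedBuffer) Add(msg []byte) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.msgs) >= b.maxMsgs || b.bytes+len(msg) > b.maxBytes {
		return false
	}
	b.msgs = append(b.msgs, msg)
	b.bytes += len(msg)
	return true
}

// Size returns the number of buffered messages.
func (b *boundedBuffer) Size() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.msgs)
}

func main() {
	b := &boundedBuffer{maxMsgs: 2, maxBytes: 10}
	fmt.Println(b.Add([]byte("abcd")), b.Add([]byte("abcdefg")), b.Size())
}
```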

func (*MessageBuffer) ApplyDeadline added in v0.3.0

func (mb *MessageBuffer) ApplyDeadline(deadline time.Time)

ApplyDeadline ensures the buffer has an expiration deadline; the earliest deadline wins.

func (*MessageBuffer) ApplyLimits added in v0.3.0

func (mb *MessageBuffer) ApplyLimits(limits bufferLimits)

ApplyLimits updates the buffer limits in-place.

func (*MessageBuffer) ApplyRetryPolicy added in v0.3.0

func (mb *MessageBuffer) ApplyRetryPolicy(policy *bubuv1alpha1.RetryPolicy, maxDelay time.Duration) bool

ApplyRetryPolicy sets retry behavior for this buffer. Returns false when retries are disabled.

func (*MessageBuffer) DropAll added in v0.3.0

func (mb *MessageBuffer) DropAll(reason string)

DropAll removes all buffered messages and records them as dropped.

func (*MessageBuffer) DroppedCount

func (mb *MessageBuffer) DroppedCount() int64

DroppedCount returns the total number of dropped messages.

func (*MessageBuffer) FlushWithSender

func (mb *MessageBuffer) FlushWithSender(ctx context.Context, send func(*transportpb.DataPacket) error) (int, error)

FlushWithSender flushes buffered messages using the provided sender function. The sender function should perform the actual send (e.g., stream.Send) and handle any necessary serialization outside this method.

func (*MessageBuffer) FlushWithSenderAndPolicy added in v0.3.0

func (mb *MessageBuffer) FlushWithSenderAndPolicy(ctx context.Context, send func(*transportpb.DataPacket) error, allow func(*transportpb.DataPacket) bool) (int, error)

FlushWithSenderAndPolicy flushes buffered messages using the provided sender function and an optional allow predicate to honor flow-control decisions. The lock is released during sends to prevent head-of-line blocking when a consumer is slow.

func (*MessageBuffer) LastActive

func (mb *MessageBuffer) LastActive() time.Duration

LastActive returns the time elapsed since the last buffer activity (add or flush). It uses monotonic time, so system clock adjustments do not affect the result.

func (*MessageBuffer) RecordFlushResult added in v0.3.0

func (mb *MessageBuffer) RecordFlushResult(err error, base, max time.Duration)

RecordFlushResult updates retry tracking based on the last flush attempt.

func (*MessageBuffer) ShouldRetry added in v0.3.0

func (mb *MessageBuffer) ShouldRetry(now time.Time) bool

ShouldRetry returns true if the buffer has messages and the retry window has elapsed.

func (*MessageBuffer) Size

func (mb *MessageBuffer) Size() int

Size returns the current number of buffered messages.

func (*MessageBuffer) Utilization added in v0.3.0

func (mb *MessageBuffer) Utilization() float64

Utilization returns the max of message-count and byte-capacity utilization (0..1).
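As a worked illustration of "max of message-count and byte-capacity utilization", the hypothetical `utilization` function below computes both ratios and returns the larger. Treating a non-positive limit as contributing zero and clamping each ratio to 1 are assumptions.

```go
package main

import (
	"fmt"
	"math"
)

// utilization returns the larger of msgs/maxMsgs and bytes/maxBytes, each
// clamped to the range 0..1. A non-positive limit contributes 0 (assumption).
func utilization(msgs, maxMsgs, bytes, maxBytes int) float64 {
	ratio := func(used, limit int) float64 {
		if limit <= 0 {
			return 0
		}
		r := float64(used) / float64(limit)
		if r > 1 {
			return 1
		}
		return r
	}
	return math.Max(ratio(msgs, maxMsgs), ratio(bytes, maxBytes))
}

func main() {
	// 5 of 10 messages (0.5) but 750 of 1000 bytes (0.75): bytes dominate.
	fmt.Println(utilization(5, 10, 750, 1000))
}
```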

type Server

type Server struct {
	transportpb.UnimplementedHubServiceServer
	// contains filtered or unexported fields
}

Server is the gRPC hub server.

func NewServer

func NewServer(ctx context.Context, k8sClient client.Client, templateCfg templating.Config, offloadedPolicy, materializeEngram string) (*Server, error)

NewServer creates a new hub server.

func (*Server) Close added in v0.3.0

func (s *Server) Close()

Close releases long-lived resources owned by the Server.

func (*Server) Process

func (s *Server) Process(stream transportpb.HubService_ProcessServer) error

Process is the bidirectional streaming RPC for the hub.

func (*Server) Start

func (s *Server) Start(ctx context.Context, port int) error

Start starts the gRPC server. The caller is responsible for calling Close() after Start() returns to release resources.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a single client stream.

func (*Stream) Close added in v0.3.0

func (s *Stream) Close()

Close signals the send loop to exit and waits for it to drain.

func (*Stream) Send

func (s *Stream) Send(ctx context.Context, req *transportpb.DataPacket) error

Send sends a packet to the stream.

func (*Stream) SendFlow added in v0.3.0

func (s *Stream) SendFlow(ctx context.Context, flow *transportpb.FlowControl) error

SendFlow sends a flow-control update to the stream.

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages all active client streams.

func NewStreamManager

func NewStreamManager(store *storage.StorageManager) *StreamManager

NewStreamManager creates a new StreamManager.

func (*StreamManager) AddStream

func (sm *StreamManager) AddStream(ctx context.Context, storyRunName, storyRunNamespace, stepID string, grpcStream transportpb.HubService_ProcessServer, connectorGen ...int32) *streamEntry

AddStream adds a new stream to the manager and returns its entry handle. When a positive connector generation higher than the current active stream arrives, it is stored as a shadow (green) stream for blue-green handoff instead of replacing the active one.

func (*StreamManager) ApplyFlow added in v0.3.0

func (sm *StreamManager) ApplyFlow(storyRunName, storyRunNamespace, stepID string, flow *transportpb.FlowControl)

ApplyFlow updates flow-control and ack state based on connector feedback.

func (*StreamManager) CutoverShadow added in v0.3.0

func (sm *StreamManager) CutoverShadow(storyRunName, storyRunNamespace, stepID string) bool

CutoverShadow atomically promotes the shadow (green) stream to active and signals the old (blue) stream to drain. Returns true if cutover succeeded.
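The blue-green behavior described for AddStream and CutoverShadow can be sketched with a single slot that holds an active and a shadow stream. The `slot` and `stream` types are hypothetical, and representing "signal to drain" as a boolean flag is a simplification; the real manager keys entries by StoryRun name, namespace, and step ID.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a HYPOTHETICAL stand-in for a registered client stream.
type stream struct {
	gen      int32
	draining bool
}

// slot holds the active (blue) and shadow (green) streams for one step key.
type slot struct {
	mu     sync.Mutex
	active *stream
	shadow *stream
}

// add registers a stream. A positive generation higher than the active
// stream's generation is parked as the shadow; anything else becomes active.
func (s *slot) add(gen int32) *stream {
	s.mu.Lock()
	defer s.mu.Unlock()
	st := &stream{gen: gen}
	if s.active != nil && gen > 0 && gen > s.active.gen {
		s.shadow = st
		return st
	}
	s.active = st
	return st
}

// cutover promotes the shadow to active under one lock acquisition and marks
// the previous active stream as draining. It reports whether a shadow existed.
func (s *slot) cutover() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shadow == nil {
		return false
	}
	if s.active != nil {
		s.active.draining = true
	}
	s.active, s.shadow = s.shadow, nil
	return true
}

func main() {
	s := &slot{}
	blue := s.add(1)
	s.add(2) // parked as shadow
	fmt.Println(s.cutover(), s.active.gen, blue.draining)
}
```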

func (*StreamManager) HasShadow added in v0.3.0

func (sm *StreamManager) HasShadow(storyRunName, storyRunNamespace, stepID string) bool

HasShadow reports whether a shadow stream exists for the given step key.

func (*StreamManager) HasStream added in v0.3.0

func (sm *StreamManager) HasStream(storyRunName, storyRunNamespace, stepID string) bool

HasStream reports whether a stream is currently registered for the StepRun key.

func (*StreamManager) RemoveStream

func (sm *StreamManager) RemoveStream(storyRunName, storyRunNamespace, stepID string, entry *streamEntry)

RemoveStream removes a stream from the manager when the supplied entry still matches. It also handles shadow stream removal for blue-green handoff.

func (*StreamManager) SendHeartbeats

func (sm *StreamManager) SendHeartbeats(ctx context.Context) error

SendHeartbeats sends a heartbeat to all active streams.

func (*StreamManager) SendOrBuffer

func (sm *StreamManager) SendOrBuffer(ctx context.Context, storyRunName, storyRunNamespace, stepID string, packet *transportpb.DataPacket) bool

SendOrBuffer tries to send a packet to a stream, or buffers it if the stream is not yet available.

func (*StreamManager) SendOrBufferWithOptions added in v0.3.0

func (sm *StreamManager) SendOrBufferWithOptions(ctx context.Context, storyRunName, storyRunNamespace, stepID string, packet *transportpb.DataPacket, opts StreamOptions) bool

SendOrBufferWithOptions applies retry, buffering, flow-control, and delivery policies.

func (*StreamManager) SendOrBufferWithPolicy added in v0.3.0

func (sm *StreamManager) SendOrBufferWithPolicy(ctx context.Context, storyRunName, storyRunNamespace, stepID string, packet *transportpb.DataPacket, policy *bubuv1alpha1.RetryPolicy) bool

SendOrBufferWithPolicy applies retry policy when buffering or retrying.

func (*StreamManager) SendOrBufferWithPolicyAndLimits added in v0.3.0

func (sm *StreamManager) SendOrBufferWithPolicyAndLimits(ctx context.Context, storyRunName, storyRunNamespace, stepID string, packet *transportpb.DataPacket, policy *bubuv1alpha1.RetryPolicy, limits bufferLimits) bool

SendOrBufferWithPolicyAndLimits applies retry policy and buffer limits when buffering or retrying.

func (*StreamManager) SetOnHandoffChange added in v0.3.0

func (sm *StreamManager) SetOnHandoffChange(cb HandoffCallback)

SetOnHandoffChange registers a callback invoked whenever a stream's handoff phase transitions. Must be called before any streams are added.

func (*StreamManager) StartEvictor

func (sm *StreamManager) StartEvictor(ctx context.Context)

StartEvictor starts a background goroutine to flush retryable buffers and evict old, unused ones.

type StreamOptions added in v0.3.0

type StreamOptions struct {
	RetryPolicy  *bubuv1alpha1.RetryPolicy
	BufferLimits bufferLimits
	Flow         flowControlPolicy
	Delivery     deliveryPolicy
	MaxInFlight  int
	// DrainTimeout caps how long buffered messages may live when waiting for reconnect/cutover.
	DrainTimeout time.Duration
	// DrainEnabled gates whether DrainTimeout is applied for stream-not-found/retry scenarios.
	DrainEnabled bool
}

StreamOptions captures per-stream delivery and flow-control settings.
