Documentation ¶
Overview ¶
materialize.go handles runtime template materialization for streaming packets.
Materialization flow:
- A packet's template references are detected (e.g. offloaded outputs).
- A materialize StepRun is created to resolve the template via a dedicated engram.
- The resolved result is unwrapped and routed to the target step.
The main entry points are routeToMaterialize (request path) and handleMaterializeResult (response path). Helper functions build the intermediate request structures and detect offloaded storage refs in step output expressions: detectOffloadedOutputRefs walks template expressions to find step output references that point to storage-backed data, and any such reference triggers the materialization detour.
Index ¶
- Constants
- Variables
- type HandoffCallback
- type MessageBuffer
- func (mb *MessageBuffer) Add(msg *transportpb.DataPacket) bool
- func (mb *MessageBuffer) ApplyDeadline(deadline time.Time)
- func (mb *MessageBuffer) ApplyLimits(limits bufferLimits)
- func (mb *MessageBuffer) ApplyRetryPolicy(policy *bubuv1alpha1.RetryPolicy, maxDelay time.Duration) bool
- func (mb *MessageBuffer) DropAll(reason string)
- func (mb *MessageBuffer) DroppedCount() int64
- func (mb *MessageBuffer) FlushWithSender(ctx context.Context, send func(*transportpb.DataPacket) error) (int, error)
- func (mb *MessageBuffer) FlushWithSenderAndPolicy(ctx context.Context, send func(*transportpb.DataPacket) error, ...) (int, error)
- func (mb *MessageBuffer) LastActive() time.Duration
- func (mb *MessageBuffer) RecordFlushResult(err error, base, max time.Duration)
- func (mb *MessageBuffer) ShouldRetry(now time.Time) bool
- func (mb *MessageBuffer) Size() int
- func (mb *MessageBuffer) Utilization() float64
- type Server
- type Stream
- type StreamManager
- func (sm *StreamManager) AddStream(ctx context.Context, storyRunName, storyRunNamespace, stepID string, ...) *streamEntry
- func (sm *StreamManager) ApplyFlow(storyRunName, storyRunNamespace, stepID string, flow *transportpb.FlowControl)
- func (sm *StreamManager) CutoverShadow(storyRunName, storyRunNamespace, stepID string) bool
- func (sm *StreamManager) HasShadow(storyRunName, storyRunNamespace, stepID string) bool
- func (sm *StreamManager) HasStream(storyRunName, storyRunNamespace, stepID string) bool
- func (sm *StreamManager) RemoveStream(storyRunName, storyRunNamespace, stepID string, entry *streamEntry)
- func (sm *StreamManager) SendHeartbeats(ctx context.Context) error
- func (sm *StreamManager) SendOrBuffer(ctx context.Context, storyRunName, storyRunNamespace, stepID string, ...) bool
- func (sm *StreamManager) SendOrBufferWithOptions(ctx context.Context, storyRunName, storyRunNamespace, stepID string, ...) bool
- func (sm *StreamManager) SendOrBufferWithPolicy(ctx context.Context, storyRunName, storyRunNamespace, stepID string, ...) bool
- func (sm *StreamManager) SendOrBufferWithPolicyAndLimits(ctx context.Context, storyRunName, storyRunNamespace, stepID string, ...) bool
- func (sm *StreamManager) SetOnHandoffChange(cb HandoffCallback)
- func (sm *StreamManager) StartEvictor(ctx context.Context)
- type StreamOptions
Constants ¶
const (
	DefaultHubTLSDir  = "/var/run/hub-tls"
	DefaultHubTLSCert = DefaultHubTLSDir + "/tls.crt"
	DefaultHubTLSKey  = DefaultHubTLSDir + "/tls.key"
	DefaultHubTLSCA   = DefaultHubTLSDir + "/ca.crt"
)
Variables ¶
var (
	// MaxBufferSize is the maximum number of messages to buffer per downstream engram
	MaxBufferSize = getMaxBufferSize()
	// MaxBufferBytes is the maximum total size of buffered messages in bytes
	MaxBufferBytes = getMaxBufferBytes()
)
var EvictionInterval time.Duration
EvictionInterval is the interval at which the hub evicts idle message buffers. Initialized in init() so that tests can override the environment before import.
Functions ¶
This section is empty.
Types ¶
type HandoffCallback ¶ added in v0.3.0
type HandoffCallback func(storyRunName, storyRunNamespace, stepID, phase, reason string)
HandoffCallback is invoked asynchronously when a stream's handoff phase changes.
type MessageBuffer ¶
type MessageBuffer struct {
// contains filtered or unexported fields
}
MessageBuffer holds buffered messages for a downstream engram that is not ready yet.
func NewMessageBuffer ¶
func NewMessageBuffer(storyRunName, stepName string) *MessageBuffer
NewMessageBuffer creates a new message buffer.
func NewMessageBufferWithLimits ¶ added in v0.3.0
func NewMessageBufferWithLimits(storyRunName, stepName string, limits bufferLimits) *MessageBuffer
NewMessageBufferWithLimits creates a new message buffer with explicit limits.
func (*MessageBuffer) Add ¶
func (mb *MessageBuffer) Add(msg *transportpb.DataPacket) bool
Add attempts to add a message to the buffer. It returns true if the message was added and false if the buffer is full.
func (*MessageBuffer) ApplyDeadline ¶ added in v0.3.0
func (mb *MessageBuffer) ApplyDeadline(deadline time.Time)
ApplyDeadline ensures the buffer has an expiration deadline; the earliest deadline wins.
func (*MessageBuffer) ApplyLimits ¶ added in v0.3.0
func (mb *MessageBuffer) ApplyLimits(limits bufferLimits)
ApplyLimits updates the buffer limits in-place.
func (*MessageBuffer) ApplyRetryPolicy ¶ added in v0.3.0
func (mb *MessageBuffer) ApplyRetryPolicy(policy *bubuv1alpha1.RetryPolicy, maxDelay time.Duration) bool
ApplyRetryPolicy sets retry behavior for this buffer. Returns false when retries are disabled.
func (*MessageBuffer) DropAll ¶ added in v0.3.0
func (mb *MessageBuffer) DropAll(reason string)
DropAll removes all buffered messages and records them as dropped.
func (*MessageBuffer) DroppedCount ¶
func (mb *MessageBuffer) DroppedCount() int64
DroppedCount returns the total number of dropped messages.
func (*MessageBuffer) FlushWithSender ¶
func (mb *MessageBuffer) FlushWithSender(ctx context.Context, send func(*transportpb.DataPacket) error) (int, error)
FlushWithSender flushes buffered messages using the provided sender function. The sender function should perform the actual send (e.g., stream.Send) and handle any necessary serialization outside this method.
func (*MessageBuffer) FlushWithSenderAndPolicy ¶ added in v0.3.0
func (mb *MessageBuffer) FlushWithSenderAndPolicy(ctx context.Context, send func(*transportpb.DataPacket) error, allow func(*transportpb.DataPacket) bool) (int, error)
FlushWithSenderAndPolicy flushes buffered messages using the provided sender function and an optional allow predicate to honor flow-control decisions. The lock is released during sends to prevent head-of-line blocking when a consumer is slow.
func (*MessageBuffer) LastActive ¶
func (mb *MessageBuffer) LastActive() time.Duration
LastActive returns the duration since the last buffer activity (add/flush), using monotonic time to avoid issues with system clock adjustments.
func (*MessageBuffer) RecordFlushResult ¶ added in v0.3.0
func (mb *MessageBuffer) RecordFlushResult(err error, base, max time.Duration)
RecordFlushResult updates retry tracking based on the last flush attempt.
func (*MessageBuffer) ShouldRetry ¶ added in v0.3.0
func (mb *MessageBuffer) ShouldRetry(now time.Time) bool
ShouldRetry returns true if the buffer has messages and the retry window has elapsed.
func (*MessageBuffer) Size ¶
func (mb *MessageBuffer) Size() int
Size returns the current number of buffered messages.
func (*MessageBuffer) Utilization ¶ added in v0.3.0
func (mb *MessageBuffer) Utilization() float64
Utilization returns the max of message-count and byte-capacity utilization (0..1).
type Server ¶
type Server struct {
transportpb.UnimplementedHubServiceServer
// contains filtered or unexported fields
}
Server is the gRPC hub server.
func NewServer ¶
func NewServer(ctx context.Context, k8sClient client.Client, templateCfg templating.Config, offloadedPolicy, materializeEngram string) (*Server, error)
NewServer creates a new hub server.
func (*Server) Close ¶ added in v0.3.0
func (s *Server) Close()
Close releases long-lived resources owned by the Server.
func (*Server) Process ¶
func (s *Server) Process(stream transportpb.HubService_ProcessServer) error
Process is the bidirectional streaming RPC for the hub.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a single client stream.
func (*Stream) Close ¶ added in v0.3.0
func (s *Stream) Close()
Close signals the send loop to exit and waits for it to drain.
func (*Stream) Send ¶
func (s *Stream) Send(ctx context.Context, req *transportpb.DataPacket) error
Send sends a packet to the stream.
func (*Stream) SendFlow ¶ added in v0.3.0
func (s *Stream) SendFlow(ctx context.Context, flow *transportpb.FlowControl) error
SendFlow sends a flow-control update to the stream.
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages all active client streams.
func NewStreamManager ¶
func NewStreamManager(store *storage.StorageManager) *StreamManager
NewStreamManager creates a new StreamManager.
func (*StreamManager) AddStream ¶
func (sm *StreamManager) AddStream(ctx context.Context, storyRunName, storyRunNamespace, stepID string, grpcStream transportpb.HubService_ProcessServer, connectorGen ...int32) *streamEntry
AddStream adds a new stream to the manager and returns its entry handle. When a positive connector generation higher than the current active stream arrives, it is stored as a shadow (green) stream for blue-green handoff instead of replacing the active one.
func (*StreamManager) ApplyFlow ¶ added in v0.3.0
func (sm *StreamManager) ApplyFlow(storyRunName, storyRunNamespace, stepID string, flow *transportpb.FlowControl)
ApplyFlow updates flow-control and ack state based on connector feedback.
func (*StreamManager) CutoverShadow ¶ added in v0.3.0
func (sm *StreamManager) CutoverShadow(storyRunName, storyRunNamespace, stepID string) bool
CutoverShadow atomically promotes the shadow (green) stream to active and signals the old (blue) stream to drain. Returns true if cutover succeeded.
func (*StreamManager) HasShadow ¶ added in v0.3.0
func (sm *StreamManager) HasShadow(storyRunName, storyRunNamespace, stepID string) bool
HasShadow reports whether a shadow stream exists for the given step key.
func (*StreamManager) HasStream ¶ added in v0.3.0
func (sm *StreamManager) HasStream(storyRunName, storyRunNamespace, stepID string) bool
HasStream reports whether a stream is currently registered for the StepRun key.
func (*StreamManager) RemoveStream ¶
func (sm *StreamManager) RemoveStream(storyRunName, storyRunNamespace, stepID string, entry *streamEntry)
RemoveStream removes a stream from the manager when the supplied entry still matches. It also handles shadow stream removal for blue-green handoff.
func (*StreamManager) SendHeartbeats ¶
func (sm *StreamManager) SendHeartbeats(ctx context.Context) error
SendHeartbeats sends a heartbeat to all active streams.
func (*StreamManager) SendOrBuffer ¶
func (sm *StreamManager) SendOrBuffer(ctx context.Context, storyRunName, storyRunNamespace, stepID string, packet *transportpb.DataPacket) bool
SendOrBuffer tries to send a packet to a stream, or buffers it if the stream is not yet available.
func (*StreamManager) SendOrBufferWithOptions ¶ added in v0.3.0
func (sm *StreamManager) SendOrBufferWithOptions(ctx context.Context, storyRunName, storyRunNamespace, stepID string, packet *transportpb.DataPacket, opts StreamOptions) bool
SendOrBufferWithOptions applies retry, buffering, flow-control, and delivery policies.
func (*StreamManager) SendOrBufferWithPolicy ¶ added in v0.3.0
func (sm *StreamManager) SendOrBufferWithPolicy(ctx context.Context, storyRunName, storyRunNamespace, stepID string, packet *transportpb.DataPacket, policy *bubuv1alpha1.RetryPolicy) bool
SendOrBufferWithPolicy applies retry policy when buffering or retrying.
func (*StreamManager) SendOrBufferWithPolicyAndLimits ¶ added in v0.3.0
func (sm *StreamManager) SendOrBufferWithPolicyAndLimits(ctx context.Context, storyRunName, storyRunNamespace, stepID string, packet *transportpb.DataPacket, policy *bubuv1alpha1.RetryPolicy, limits bufferLimits) bool
SendOrBufferWithPolicyAndLimits applies retry policy and buffer limits when buffering or retrying.
func (*StreamManager) SetOnHandoffChange ¶ added in v0.3.0
func (sm *StreamManager) SetOnHandoffChange(cb HandoffCallback)
SetOnHandoffChange registers a callback invoked whenever a stream's handoff phase transitions. Must be called before any streams are added.
func (*StreamManager) StartEvictor ¶
func (sm *StreamManager) StartEvictor(ctx context.Context)
StartEvictor starts a background goroutine to flush retryable buffers and evict old, unused ones.
type StreamOptions ¶ added in v0.3.0
type StreamOptions struct {
RetryPolicy *bubuv1alpha1.RetryPolicy
BufferLimits bufferLimits
Flow flowControlPolicy
Delivery deliveryPolicy
MaxInFlight int
// DrainTimeout caps how long buffered messages may live when waiting for reconnect/cutover.
DrainTimeout time.Duration
// DrainEnabled gates whether DrainTimeout is applied for stream-not-found/retry scenarios.
DrainEnabled bool
}
StreamOptions captures per-stream delivery and flow-control settings.