Documentation
¶
Overview ¶
Package transport provides utilities for gRPC transport management.
This package contains helpers for Transport and TransportBinding resources, including capability aggregation, codec management, security configuration, and engram annotation patching.
Transport Capabilities ¶
Aggregate capabilities from multiple transports:
caps := transport.AggregateCapabilities(transports)
Check for specific capabilities:
if transport.HasAudioCapability(caps, "opus") { ... }
if transport.HasVideoCapability(caps, "h264") { ... }
Codec Management ¶
Parse and validate codec specifications:
codec, err := transport.ParseAudioCodec(spec) err := transport.ValidateCodecs(codecs)
Security Configuration ¶
Build TLS configuration for transports:
tlsConfig := transport.BuildTLSConfig(binding)
Engram Annotation Patching ¶
The EngramAnnotationPatcher patches transport annotations on Engrams:
patcher := transport.NewEngramAnnotationPatcher(client, logger, recorder) err := patcher.PatchBindingRef(ctx, namespace, engramName, bindingRef) err := patcher.PatchReadyStatus(ctx, namespace, engramName, ready, message)
Subpackages ¶
binding: TransportBinding naming and status utilities
name := binding.Name(storyRunName, stepID) binding.SetReady(binding, true, "Transport ready")
bindinginfo: Binding information aggregation
capabilities: Capability aggregation and helpers
validation: Transport spec and codec validation
errs := validation.ValidateTransport(transport)
Index ¶
- Constants
- func AppendUniqueAudioCodec(dst []transportv1alpha1.AudioCodec, codec transportv1alpha1.AudioCodec) []transportv1alpha1.AudioCodec
- func AppendUniqueBinary(dst []string, mime string) []string
- func AppendUniqueCapability[T any](dst []T, value T, valid func(T) bool, canonical func(T) T, ...) []T
- func AppendUniqueVideoCodec(dst []transportv1alpha1.VideoCodec, codec transportv1alpha1.VideoCodec) []transportv1alpha1.VideoCodec
- func ApplyTransportSecurityEnv(envVars []corev1.EnvVar) []corev1.EnvVar
- func BuildBindingInfo(binding *transportv1alpha1.TransportBinding) *transportpb.BindingInfo
- func CanUseDirectConnection(upstream, downstream *v1alpha1.Step) bool
- func CanonicalCapabilityName(name string) string
- func CanonicalizeAudioCodec(codec transportv1alpha1.AudioCodec) transportv1alpha1.AudioCodec
- func CanonicalizeVideoCodec(codec transportv1alpha1.VideoCodec) transportv1alpha1.VideoCodec
- func CloneAudioCodecs(src []transportv1alpha1.AudioCodec) []transportv1alpha1.AudioCodec
- func CloneBinaryCapabilities(src []string) []string
- func CloneSlice[T any](src []T) []T
- func CloneVideoCodecs(src []transportv1alpha1.VideoCodec) []transportv1alpha1.VideoCodec
- func DecodeBindingInfo(value string) (*transportpb.BindingInfo, error)
- func DeriveNegotiatedCapabilities(binding *transportv1alpha1.TransportBinding, ...)
- func EffectiveServicePort(resolved *config.ResolvedExecutionConfig, defaultPort int32) int32
- func EncodeBindingEnv(binding *transportv1alpha1.TransportBinding) (string, error)
- func ExplicitTLSSecretName(engram *bubuv1alpha1.Engram) string
- func HasDeclaredCapabilities(spec *transportv1alpha1.TransportSpec) bool
- func LocalConnectorEndpoint(port int32) string
- func MarshalRuntimeTransportDescriptors(ctx context.Context, reader client.Reader, story *bubuv1alpha1.Story) (string, error)
- func MarshalSettings(value any) (*runtime.RawExtension, error)
- func MarshalStoryTransportStatuses(transports []bubuv1alpha1.StoryTransportStatus) (string, error)
- func MarshalStoryTransports(transports []bubuv1alpha1.StoryTransport) (string, error)
- func MergeSettings(base, overrides *runtime.RawExtension) ([]byte, error)
- func MergeSettingsWithStreaming(base *runtime.RawExtension, streaming any) (*runtime.RawExtension, error)
- func PopulateBindingMedia(binding *transportv1alpha1.TransportBinding, ...) error
- func ResolveBindingEnvValue(ctx context.Context, reader client.Reader, namespace, bindingName string) string
- func ResolveStoryTransport(ctx context.Context, reader client.Reader, story *bubuv1alpha1.Story, ...) (*transportv1alpha1.Transport, *bubuv1alpha1.StoryTransport, error)
- func ResolveTLSSecretName(engram *bubuv1alpha1.Engram, defaultSecret string) string
- func ShouldForceHubRouting(transportSettings map[string]any) bool
- func StepNeedsHubRouting(step *v1alpha1.Step) bool
- func TruncateMessage(message string, maxRunes int) string
- func ValidateCodecSupport(binding *transportv1alpha1.TransportBinding, ...) error
- type Aggregation
- type AggregationOptions
- type EngramAnnotationPatcher
- type RoutingMode
- type RoutingResolver
- type StepRoutingInfo
- type TopologyAnalyzer
- func (a *TopologyAnalyzer) AnalyzeStepRouting(stepName string) (*StepRoutingInfo, error)
- func (a *TopologyAnalyzer) CanUseP2PRouting(upstreamName, downstreamName string) bool
- func (a *TopologyAnalyzer) GetRoutingTopology() (map[string]*StepRoutingInfo, error)
- func (a *TopologyAnalyzer) HasP2PUpstream(stepName string) bool
Constants ¶
const MaxReadyMessageLength = 512
MaxReadyMessageLength is the maximum length for transport readiness messages.
Variables ¶
This section is empty.
Functions ¶
func AppendUniqueAudioCodec ¶
func AppendUniqueAudioCodec( dst []transportv1alpha1.AudioCodec, codec transportv1alpha1.AudioCodec, ) []transportv1alpha1.AudioCodec
AppendUniqueAudioCodec normalizes the provided codec and appends it to dst only when no equivalent codec already exists in the slice.
func AppendUniqueBinary ¶
AppendUniqueBinary normalizes the provided MIME string and appends it to dst only when an identical entry does not already exist.
func AppendUniqueCapability ¶
func AppendUniqueCapability[T any]( dst []T, value T, valid func(T) bool, canonical func(T) T, equals func(a, b T) bool, ) []T
AppendUniqueCapability normalizes the provided value, validates it, and appends it to dst only when no equivalent normalized entry already exists.
func AppendUniqueVideoCodec ¶
func AppendUniqueVideoCodec( dst []transportv1alpha1.VideoCodec, codec transportv1alpha1.VideoCodec, ) []transportv1alpha1.VideoCodec
AppendUniqueVideoCodec normalizes the provided codec and appends it to dst only when no equivalent codec already exists in the slice.
func ApplyTransportSecurityEnv ¶
ApplyTransportSecurityEnv ensures pods advertise the effective transport security policy.
func BuildBindingInfo ¶
func BuildBindingInfo(binding *transportv1alpha1.TransportBinding) *transportpb.BindingInfo
BuildBindingInfo converts a TransportBinding into the shared BindingInfo proto.
func CanUseDirectConnection ¶
CanUseDirectConnection returns true if the connection between two steps can bypass the hub and use a direct P2P gRPC connection.
func CanonicalCapabilityName ¶
CanonicalCapabilityName trims leading/trailing whitespace and lowercases the capability string.
func CanonicalizeAudioCodec ¶
func CanonicalizeAudioCodec(codec transportv1alpha1.AudioCodec) transportv1alpha1.AudioCodec
CanonicalizeAudioCodec trims and lowercases the codec name.
func CanonicalizeVideoCodec ¶
func CanonicalizeVideoCodec(codec transportv1alpha1.VideoCodec) transportv1alpha1.VideoCodec
CanonicalizeVideoCodec trims and lowercases the codec name and profile.
func CloneAudioCodecs ¶
func CloneAudioCodecs(src []transportv1alpha1.AudioCodec) []transportv1alpha1.AudioCodec
CloneAudioCodecs returns a deep copy of the provided audio codec slice so callers can mutate the result without affecting the original slice.
func CloneBinaryCapabilities ¶
CloneBinaryCapabilities returns a deep copy of the provided binary capability slice.
func CloneSlice ¶
func CloneSlice[T any](src []T) []T
CloneSlice returns a shallow copy of the provided slice.
func CloneVideoCodecs ¶
func CloneVideoCodecs(src []transportv1alpha1.VideoCodec) []transportv1alpha1.VideoCodec
CloneVideoCodecs returns a deep copy of the provided video codec slice.
func DecodeBindingInfo ¶
func DecodeBindingInfo(value string) (*transportpb.BindingInfo, error)
DecodeBindingInfo extracts BindingInfo proto from the serialized binding env value.
func DeriveNegotiatedCapabilities ¶
func DeriveNegotiatedCapabilities( binding *transportv1alpha1.TransportBinding, status *transportv1alpha1.TransportBindingStatus, )
DeriveNegotiatedCapabilities ensures the TransportBinding status reflects the currently requested codecs. It only fills missing negotiated fields so that connectors can later override them with the actual selection.
func EffectiveServicePort ¶
func EffectiveServicePort(resolved *config.ResolvedExecutionConfig, defaultPort int32) int32
EffectiveServicePort returns the first declared service port or falls back to defaultPort.
func EncodeBindingEnv ¶
func EncodeBindingEnv(binding *transportv1alpha1.TransportBinding) (string, error)
EncodeBindingEnv serializes binding metadata into a JSON envelope that can be passed via env.
func ExplicitTLSSecretName ¶
func ExplicitTLSSecretName(engram *bubuv1alpha1.Engram) string
ExplicitTLSSecretName returns the concrete secret explicitly referenced by spec.
func HasDeclaredCapabilities ¶
func HasDeclaredCapabilities(spec *transportv1alpha1.TransportSpec) bool
HasDeclaredCapabilities reports whether the given transport spec declares at least one supported audio, video, or binary capability slice. Controllers and webhooks use this to enforce the shared "a transport must declare some capability" rule.
func LocalConnectorEndpoint ¶
LocalConnectorEndpoint returns the loopback endpoint bindings should advertise when sidecars are injected.
func MarshalRuntimeTransportDescriptors ¶
func MarshalRuntimeTransportDescriptors( ctx context.Context, reader client.Reader, story *bubuv1alpha1.Story, ) (string, error)
MarshalRuntimeTransportDescriptors serializes Story transports into the runtime-facing descriptor contract expected by engrams and transport-aware SDKs.
Behavior:
- Returns an empty string when the Story declares no transports.
- Resolves descriptor.Kind from the referenced Transport when possible, preferring provider, then driver, then the raw transportRef.
- Carries effective mode information from Story status when available.
- Includes transportRef and modeReason in descriptor.Config for downstream consumers.
func MarshalSettings ¶
func MarshalSettings(value any) (*runtime.RawExtension, error)
MarshalSettings encodes a settings struct into a RawExtension, returning nil for empty values.
func MarshalStoryTransportStatuses ¶
func MarshalStoryTransportStatuses(transports []bubuv1alpha1.StoryTransportStatus) (string, error)
MarshalStoryTransportStatuses serializes effective transport statuses (including controller-selected mode) to JSON.
func MarshalStoryTransports ¶
func MarshalStoryTransports(transports []bubuv1alpha1.StoryTransport) (string, error)
MarshalStoryTransports serializes the Story transports slice to JSON.
func MergeSettings ¶
func MergeSettings(base, overrides *runtime.RawExtension) ([]byte, error)
MergeSettings merges the default transport settings with overrides, returning a raw JSON blob.
func MergeSettingsWithStreaming ¶
func MergeSettingsWithStreaming(base *runtime.RawExtension, streaming any) (*runtime.RawExtension, error)
MergeSettingsWithStreaming merges a raw settings block with structured streaming settings.
func PopulateBindingMedia ¶
func PopulateBindingMedia(binding *transportv1alpha1.TransportBinding, transport *transportv1alpha1.Transport) error
PopulateBindingMedia ensures the binding's audio/video declarations are compatible with the Transport.
func ResolveBindingEnvValue ¶
func ResolveBindingEnvValue(ctx context.Context, reader client.Reader, namespace, bindingName string) string
ResolveBindingEnvValue fetches a TransportBinding by name and returns its encoded env value. Returns the binding name as fallback if the binding cannot be fetched or encoded.
func ResolveStoryTransport ¶
func ResolveStoryTransport( ctx context.Context, reader client.Reader, story *bubuv1alpha1.Story, transportName string, ) (*transportv1alpha1.Transport, *bubuv1alpha1.StoryTransport, error)
ResolveStoryTransport returns the named transport (or the first available one when name is empty) along with its declaration.
func ResolveTLSSecretName ¶
func ResolveTLSSecretName(engram *bubuv1alpha1.Engram, defaultSecret string) string
ResolveTLSSecretName determines the TLS secret for an Engram, considering spec overrides and an operator-level default.
func ShouldForceHubRouting ¶
ShouldForceHubRouting checks if transport settings explicitly disable P2P routing. Transport settings can include: { "routing": { "mode": "hub" } } Returns true if routing should be forced to hub, false otherwise (allowing P2P).
func StepNeedsHubRouting ¶
StepNeedsHubRouting returns true if the step requires hub-based routing for per-packet template evaluation. Steps with runtime configuration must go through the hub; steps without runtime config can use direct P2P connections.
func TruncateMessage ¶
TruncateMessage trims whitespace and truncates to maxRunes.
func ValidateCodecSupport ¶
func ValidateCodecSupport(binding *transportv1alpha1.TransportBinding, transport *transportv1alpha1.Transport) error
ValidateCodecSupport ensures binding codecs exist in the transport capability list.
Types ¶
type Aggregation ¶
type Aggregation struct {
Audio []transportv1alpha1.AudioCodec
Video []transportv1alpha1.VideoCodec
Binary []string
ReadyBindings int
TotalBindings int
BindingErrors []string
LastHeartbeat *metav1.Time
}
Aggregation captures the negotiated codec slices and binding health counters discovered while inspecting TransportBindings for a given Transport.
func AggregateBindings ¶
func AggregateBindings( ctx context.Context, c client.Client, transportObj *transportv1alpha1.Transport, opts AggregationOptions, ) (*Aggregation, error)
AggregateBindings lists TransportBindings referencing the provided Transport, derives negotiated capability slices, enforces heartbeat freshness, and returns the aggregated results.
type AggregationOptions ¶
AggregationOptions configures how AggregateBindings inspects and patches TransportBindings.
type EngramAnnotationPatcher ¶
type EngramAnnotationPatcher struct {
// Client is the Kubernetes client for fetching and patching Engrams.
Client client.Client
// Logger is used for debug and error logging.
Logger logr.Logger
}
EngramAnnotationPatcher provides methods to patch Engram transport annotations. This abstraction allows both bobrapet and bobravoz-grpc to share annotation logic while using their own Engram types.
func NewEngramAnnotationPatcher ¶
func NewEngramAnnotationPatcher(c client.Client, log logr.Logger) *EngramAnnotationPatcher
NewEngramAnnotationPatcher creates a new patcher with the given client and logger.
func (*EngramAnnotationPatcher) PatchBindingRef ¶
func (p *EngramAnnotationPatcher) PatchBindingRef( ctx context.Context, namespace, name string, bindingValue string, fetchEngram func(ctx context.Context, key types.NamespacedName) (client.Object, error), ) (bool, error)
PatchBindingRef patches the TransportBindingAnnotation on an Engram.
Behavior:
- Sanitizes the binding value using coretransport.SanitizeBindingAnnotationValue.
- Only patches if the annotation value has changed.
- Returns nil if the Engram is not found (deleted before patch).
Arguments:
- ctx context.Context: context for API calls.
- namespace string: the Engram's namespace.
- name string: the Engram's name.
- bindingValue string: the raw binding env value to sanitize and store.
- fetchEngram func(ctx context.Context, key types.NamespacedName) (client.Object, error): function to fetch the Engram object.
Returns:
- bool: true if the annotation was patched, false if unchanged or not found.
- error: non-nil on fetch/patch failure.
func (*EngramAnnotationPatcher) PatchReadyStatus ¶
func (p *EngramAnnotationPatcher) PatchReadyStatus( ctx context.Context, namespace, name string, ready bool, message string, fetchEngram func(ctx context.Context, key types.NamespacedName) (client.Object, error), ) (bool, error)
PatchReadyStatus patches the TransportReadyAnnotation and TransportReadyMessageAnnotation.
Behavior:
- Truncates message to MaxReadyMessageLength runes.
- Only patches if ready flag or message has changed.
- Returns nil if the Engram is not found (deleted before patch).
Arguments:
- ctx context.Context: context for API calls.
- namespace string: the Engram's namespace.
- name string: the Engram's name.
- ready bool: the transport readiness state.
- message string: the status message (will be trimmed and truncated).
- fetchEngram func(ctx context.Context, key types.NamespacedName) (client.Object, error): function to fetch the Engram object.
Returns:
- bool: true if annotations were patched, false if unchanged or not found.
- error: non-nil on fetch/patch failure.
type RoutingMode ¶
type RoutingMode string
RoutingMode describes how data flows between steps.
const ( // RoutingModeP2P indicates direct peer-to-peer connection between engrams RoutingModeP2P RoutingMode = "p2p" // RoutingModeHub indicates hub-mediated routing for template evaluation RoutingModeHub RoutingMode = "hub" )
func RoutingModeOverride ¶
func RoutingModeOverride(transportSettings map[string]any) (RoutingMode, bool, string)
RoutingModeOverride inspects transport settings for an explicit routing override. Returns the override mode, whether it is valid, and the raw string when present.
type RoutingResolver ¶
type RoutingResolver struct {
// contains filtered or unexported fields
}
RoutingResolver resolves routing endpoints for engrams based on Story topology.
func NewRoutingResolver ¶
func NewRoutingResolver( cli client.Client, story *v1alpha1.Story, storyRun *runsv1alpha1.StoryRun, ) *RoutingResolver
NewRoutingResolver creates a resolver for the given Story and StoryRun.
func (*RoutingResolver) PopulateBindingRoutingStatus ¶
func (r *RoutingResolver) PopulateBindingRoutingStatus( ctx context.Context, binding *transportv1alpha1.TransportBinding, hubEndpoint string, ) error
PopulateBindingRoutingStatus updates a TransportBinding's status with routing information.
func (*RoutingResolver) ResolveUpstreamEndpoint ¶
func (r *RoutingResolver) ResolveUpstreamEndpoint( ctx context.Context, stepName string, hubEndpoint string, ) (endpoint string, isHub bool, err error)
ResolveUpstreamEndpoint determines the endpoint this step should connect to. Returns:
- endpoint string: The gRPC endpoint to connect to (host:port)
- isHub bool: True if connecting to hub, false if P2P to downstream engram
- error: Any error during resolution
type StepRoutingInfo ¶
type StepRoutingInfo struct {
StepName string
DownstreamSteps []string
RoutingMode RoutingMode
UpstreamEndpoint string // Empty means use hub, otherwise the direct peer endpoint
}
StepRoutingInfo contains routing information for a step.
type TopologyAnalyzer ¶
type TopologyAnalyzer struct {
// contains filtered or unexported fields
}
TopologyAnalyzer analyzes Story DAG to determine routing topology.
func NewTopologyAnalyzer ¶
func NewTopologyAnalyzer(story *v1alpha1.Story) *TopologyAnalyzer
NewTopologyAnalyzer creates a new analyzer for the given story.
func (*TopologyAnalyzer) AnalyzeStepRouting ¶
func (a *TopologyAnalyzer) AnalyzeStepRouting(stepName string) (*StepRoutingInfo, error)
AnalyzeStepRouting determines the routing mode and upstream endpoint for a step. Returns the routing info for the step.
func (*TopologyAnalyzer) CanUseP2PRouting ¶
func (a *TopologyAnalyzer) CanUseP2PRouting(upstreamName, downstreamName string) bool
CanUseP2PRouting returns true if the connection from upstream to downstream can use P2P.
func (*TopologyAnalyzer) GetRoutingTopology ¶
func (a *TopologyAnalyzer) GetRoutingTopology() (map[string]*StepRoutingInfo, error)
GetRoutingTopology returns routing information for all steps in the story.
func (*TopologyAnalyzer) HasP2PUpstream ¶
func (a *TopologyAnalyzer) HasP2PUpstream(stepName string) bool
HasP2PUpstream reports whether any upstream step can connect directly to the given step using P2P routing. This is used to decide if a step should advertise a downstream host for direct connections.