transport

package
v0.1.5 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 8, 2026 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package transport provides utilities for gRPC transport management.

This package contains helpers for Transport and TransportBinding resources, including capability aggregation, codec management, security configuration, and engram annotation patching.

Transport Capabilities

Aggregate capabilities from multiple transports:

caps := transport.AggregateCapabilities(transports)

Check for specific capabilities:

if transport.HasAudioCapability(caps, "opus") { ... }
if transport.HasVideoCapability(caps, "h264") { ... }

Codec Management

Parse and validate codec specifications:

codec, err := transport.ParseAudioCodec(spec)
err = transport.ValidateCodecs(codecs)

Security Configuration

Build TLS configuration for transports:

tlsConfig := transport.BuildTLSConfig(binding)

Engram Annotation Patching

The EngramAnnotationPatcher patches transport annotations on Engrams:

patcher := transport.NewEngramAnnotationPatcher(client, logger)
patched, err := patcher.PatchBindingRef(ctx, namespace, engramName, bindingValue, fetchEngram)
patched, err = patcher.PatchReadyStatus(ctx, namespace, engramName, ready, message, fetchEngram)

Subpackages

binding: TransportBinding naming and status utilities

name := binding.Name(storyRunName, stepID)
binding.SetReady(binding, true, "Transport ready")

bindinginfo: Binding information aggregation

capabilities: Capability aggregation and helpers

validation: Transport spec and codec validation

errs := validation.ValidateTransport(transport)


Constants

const MaxReadyMessageLength = 512

MaxReadyMessageLength is the maximum length for transport readiness messages.

Variables

This section is empty.

Functions

func AppendUniqueAudioCodec

func AppendUniqueAudioCodec(
	dst []transportv1alpha1.AudioCodec,
	codec transportv1alpha1.AudioCodec,
) []transportv1alpha1.AudioCodec

AppendUniqueAudioCodec normalizes the provided codec and appends it to dst only when no equivalent codec already exists in the slice.

func AppendUniqueBinary

func AppendUniqueBinary(dst []string, mime string) []string

AppendUniqueBinary normalizes the provided MIME string and appends it to dst only when an identical entry does not already exist.

func AppendUniqueCapability

func AppendUniqueCapability[T any](
	dst []T,
	value T,
	valid func(T) bool,
	canonical func(T) T,
	equals func(a, b T) bool,
) []T

AppendUniqueCapability normalizes the provided value, validates it, and appends it to dst only when no equivalent normalized entry already exists.
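The normalize, validate, and deduplicate steps described above can be sketched as follows. This is a minimal illustration, not the package's source: `appendUnique` is a hypothetical stand-in that mirrors the documented signature, and dropping invalid values silently is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// appendUnique is a hypothetical stand-in mirroring the documented signature.
// It normalizes value, validates the result, and appends it only when no
// equivalent normalized entry is already present in dst.
func appendUnique[T any](
	dst []T,
	value T,
	valid func(T) bool,
	canonical func(T) T,
	equals func(a, b T) bool,
) []T {
	normalized := canonical(value)
	if !valid(normalized) {
		return dst // assumption: invalid values are dropped silently
	}
	for _, existing := range dst {
		if equals(canonical(existing), normalized) {
			return dst // an equivalent entry already exists
		}
	}
	return append(dst, normalized)
}

func main() {
	// Same rule the page gives for CanonicalCapabilityName: trim, then lowercase.
	canonical := func(s string) string { return strings.ToLower(strings.TrimSpace(s)) }
	valid := func(s string) bool { return s != "" }
	equals := func(a, b string) bool { return a == b }

	var caps []string
	for _, c := range []string{" Opus ", "opus", "", "PCM16"} {
		caps = appendUnique(caps, c, valid, canonical, equals)
	}
	fmt.Println(caps) // [opus pcm16]
}
```

Passing the three callbacks keeps the deduplication loop independent of the element type, which is presumably why the audio, video, and binary helpers can share it.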

func AppendUniqueVideoCodec

func AppendUniqueVideoCodec(
	dst []transportv1alpha1.VideoCodec,
	codec transportv1alpha1.VideoCodec,
) []transportv1alpha1.VideoCodec

AppendUniqueVideoCodec normalizes the provided codec and appends it to dst only when no equivalent codec already exists in the slice.

func ApplyTransportSecurityEnv

func ApplyTransportSecurityEnv(envVars []corev1.EnvVar) []corev1.EnvVar

ApplyTransportSecurityEnv ensures pods advertise the effective transport security policy.

func BuildBindingInfo

func BuildBindingInfo(binding *transportv1alpha1.TransportBinding) *transportpb.BindingInfo

BuildBindingInfo converts a TransportBinding into the shared BindingInfo proto.

func CanUseDirectConnection

func CanUseDirectConnection(upstream, downstream *v1alpha1.Step) bool

CanUseDirectConnection returns true if the connection between two steps can bypass the hub and use a direct P2P gRPC connection.

func CanonicalCapabilityName

func CanonicalCapabilityName(name string) string

CanonicalCapabilityName trims leading/trailing whitespace and lowercases the capability string.

func CanonicalizeAudioCodec

func CanonicalizeAudioCodec(codec transportv1alpha1.AudioCodec) transportv1alpha1.AudioCodec

CanonicalizeAudioCodec trims and lowercases the codec name.

func CanonicalizeVideoCodec

func CanonicalizeVideoCodec(codec transportv1alpha1.VideoCodec) transportv1alpha1.VideoCodec

CanonicalizeVideoCodec trims and lowercases the codec name and profile.

func CloneAudioCodecs

func CloneAudioCodecs(src []transportv1alpha1.AudioCodec) []transportv1alpha1.AudioCodec

CloneAudioCodecs returns a deep copy of the provided audio codec slice so callers can mutate the result without affecting the original slice.

func CloneBinaryCapabilities

func CloneBinaryCapabilities(src []string) []string

CloneBinaryCapabilities returns a deep copy of the provided binary capability slice.

func CloneSlice

func CloneSlice[T any](src []T) []T

CloneSlice returns a shallow copy of the provided slice.
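To show what "shallow" means in practice, here is a small sketch. `cloneSlice` and the `codec` type are hypothetical stand-ins, and returning nil for a nil input is an assumption, not documented behavior.

```go
package main

import "fmt"

// cloneSlice is a hypothetical shallow copy: it duplicates the backing array
// but does not follow pointers stored in the elements.
func cloneSlice[T any](src []T) []T {
	if src == nil {
		return nil // assumption: nil in, nil out
	}
	out := make([]T, len(src))
	copy(out, src)
	return out
}

// codec is an illustrative type, not the package's AudioCodec.
type codec struct{ Name string }

func main() {
	values := []codec{{Name: "opus"}}
	copied := cloneSlice(values)
	copied[0].Name = "pcm16"
	fmt.Println(values[0].Name) // opus: struct elements were copied by value

	pointers := []*codec{{Name: "opus"}}
	shared := cloneSlice(pointers)
	shared[0].Name = "pcm16"
	fmt.Println(pointers[0].Name) // pcm16: both slices point at the same struct
}
```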

func CloneVideoCodecs

func CloneVideoCodecs(src []transportv1alpha1.VideoCodec) []transportv1alpha1.VideoCodec

CloneVideoCodecs returns a deep copy of the provided video codec slice.

func DecodeBindingInfo

func DecodeBindingInfo(value string) (*transportpb.BindingInfo, error)

DecodeBindingInfo extracts BindingInfo proto from the serialized binding env value.

func DeriveNegotiatedCapabilities

func DeriveNegotiatedCapabilities(
	binding *transportv1alpha1.TransportBinding,
	status *transportv1alpha1.TransportBindingStatus,
)

DeriveNegotiatedCapabilities ensures the TransportBinding status reflects the currently requested codecs. It only fills missing negotiated fields so that connectors can later override them with the actual selection.

func EffectiveServicePort

func EffectiveServicePort(resolved *config.ResolvedExecutionConfig, defaultPort int32) int32

EffectiveServicePort returns the first declared service port or falls back to defaultPort.

func EncodeBindingEnv

func EncodeBindingEnv(binding *transportv1alpha1.TransportBinding) (string, error)

EncodeBindingEnv serializes binding metadata into a JSON envelope that can be passed via env.

func ExplicitTLSSecretName

func ExplicitTLSSecretName(engram *bubuv1alpha1.Engram) string

ExplicitTLSSecretName returns the concrete secret explicitly referenced by spec.

func HasDeclaredCapabilities

func HasDeclaredCapabilities(spec *transportv1alpha1.TransportSpec) bool

HasDeclaredCapabilities reports whether the given transport spec declares at least one supported audio, video, or binary capability slice. Controllers and webhooks use this to enforce the shared "a transport must declare some capability" rule.

func LocalConnectorEndpoint

func LocalConnectorEndpoint(port int32) string

LocalConnectorEndpoint returns the loopback endpoint bindings should advertise when sidecars are injected.

func MarshalRuntimeTransportDescriptors

func MarshalRuntimeTransportDescriptors(
	ctx context.Context,
	reader client.Reader,
	story *bubuv1alpha1.Story,
) (string, error)

MarshalRuntimeTransportDescriptors serializes Story transports into the runtime-facing descriptor contract expected by engrams and transport-aware SDKs.

Behavior:

  • Returns an empty string when the Story declares no transports.
  • Resolves descriptor.Kind from the referenced Transport when possible, preferring provider, then driver, then the raw transportRef.
  • Carries effective mode information from Story status when available.
  • Includes transportRef and modeReason in descriptor.Config for downstream consumers.

func MarshalSettings

func MarshalSettings(value any) (*runtime.RawExtension, error)

MarshalSettings encodes a settings struct into a RawExtension, returning nil for empty values.

func MarshalStoryTransportStatuses

func MarshalStoryTransportStatuses(transports []bubuv1alpha1.StoryTransportStatus) (string, error)

MarshalStoryTransportStatuses serializes effective transport statuses (including controller-selected mode) to JSON.

func MarshalStoryTransports

func MarshalStoryTransports(transports []bubuv1alpha1.StoryTransport) (string, error)

MarshalStoryTransports serializes the Story transports slice to JSON.

func MergeSettings

func MergeSettings(base, overrides *runtime.RawExtension) ([]byte, error)

MergeSettings merges the default transport settings with overrides, returning a raw JSON blob.
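The page does not say whether the merge is shallow or deep. As a rough sketch under the assumption of a shallow, top-level merge, overlaying overrides onto defaults could look like this; `mergeJSON` is a hypothetical helper that works on raw bytes rather than runtime.RawExtension.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergeJSON overlays the top-level keys of overrides onto base.
// Assumption: shallow merge; the real function may treat nested objects differently.
func mergeJSON(base, overrides []byte) ([]byte, error) {
	merged := map[string]any{}
	for _, raw := range [][]byte{base, overrides} {
		if len(raw) == 0 {
			continue // treat a missing block as empty
		}
		// Unmarshal into a non-nil map keeps existing keys and overwrites matches.
		if err := json.Unmarshal(raw, &merged); err != nil {
			return nil, err
		}
	}
	return json.Marshal(merged)
}

func main() {
	base := []byte(`{"codec":"opus","routing":{"mode":"p2p"}}`)
	overrides := []byte(`{"routing":{"mode":"hub"}}`)

	out, err := mergeJSON(base, overrides)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"codec":"opus","routing":{"mode":"hub"}}
}
```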

func MergeSettingsWithStreaming

func MergeSettingsWithStreaming(base *runtime.RawExtension, streaming any) (*runtime.RawExtension, error)

MergeSettingsWithStreaming merges a raw settings block with structured streaming settings.

func PopulateBindingMedia

func PopulateBindingMedia(binding *transportv1alpha1.TransportBinding, transport *transportv1alpha1.Transport) error

PopulateBindingMedia ensures the binding's audio/video declarations are compatible with the Transport.

func ResolveBindingEnvValue

func ResolveBindingEnvValue(ctx context.Context, reader client.Reader, namespace, bindingName string) string

ResolveBindingEnvValue fetches a TransportBinding by name and returns its encoded env value. Returns the binding name as fallback if the binding cannot be fetched or encoded.

func ResolveStoryTransport

func ResolveStoryTransport(
	ctx context.Context,
	reader client.Reader,
	story *bubuv1alpha1.Story,
	transportName string,
) (*transportv1alpha1.Transport, *bubuv1alpha1.StoryTransport, error)

ResolveStoryTransport returns the named transport (or the first available one when name is empty) along with its declaration.

func ResolveTLSSecretName

func ResolveTLSSecretName(engram *bubuv1alpha1.Engram, defaultSecret string) string

ResolveTLSSecretName determines the TLS secret for an Engram, considering spec overrides and an operator-level default.

func ShouldForceHubRouting

func ShouldForceHubRouting(transportSettings map[string]any) bool

ShouldForceHubRouting checks if transport settings explicitly disable P2P routing. Transport settings can include:

	{ "routing": { "mode": "hub" } }

Returns true if routing should be forced to hub, false otherwise (allowing P2P).
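A check of this shape can be sketched with two type assertions on the settings map. `shouldForceHub` is a hypothetical stand-in, and the exact, case-sensitive comparison against "hub" is an assumption.

```go
package main

import "fmt"

// shouldForceHub reports whether settings contain {"routing": {"mode": "hub"}}.
// Missing keys or unexpected types fall through to false, which allows P2P.
func shouldForceHub(settings map[string]any) bool {
	routing, ok := settings["routing"].(map[string]any)
	if !ok {
		return false
	}
	mode, ok := routing["mode"].(string)
	return ok && mode == "hub" // assumption: exact, case-sensitive match
}

func main() {
	hub := map[string]any{"routing": map[string]any{"mode": "hub"}}
	p2p := map[string]any{"routing": map[string]any{"mode": "p2p"}}

	fmt.Println(shouldForceHub(hub)) // true
	fmt.Println(shouldForceHub(p2p)) // false
	fmt.Println(shouldForceHub(nil)) // false
}
```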

func StepNeedsHubRouting

func StepNeedsHubRouting(step *v1alpha1.Step) bool

StepNeedsHubRouting returns true if the step requires hub-based routing for per-packet template evaluation. Steps with runtime configuration must go through the hub; steps without runtime config can use direct P2P connections.

func TruncateMessage

func TruncateMessage(message string, maxRunes int) string

TruncateMessage trims whitespace and truncates to maxRunes.
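Because the limit is expressed in runes rather than bytes, multi-byte characters must not be split. A minimal sketch of that behavior follows; `truncateMessage` is a hypothetical stand-in, and returning an empty string for a non-positive limit is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// truncateMessage trims surrounding whitespace, then keeps at most maxRunes runes.
func truncateMessage(message string, maxRunes int) string {
	trimmed := strings.TrimSpace(message)
	if maxRunes <= 0 {
		return "" // assumption: a non-positive limit yields an empty string
	}
	runes := []rune(trimmed)
	if len(runes) <= maxRunes {
		return trimmed
	}
	return string(runes[:maxRunes])
}

func main() {
	fmt.Printf("%q\n", truncateMessage("  héllo wörld  ", 7)) // "héllo w"
	fmt.Printf("%q\n", truncateMessage(" ok ", 10))           // "ok"
}
```

Slicing the string by byte index instead would risk cutting "é" in half and producing invalid UTF-8.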

func ValidateCodecSupport

func ValidateCodecSupport(binding *transportv1alpha1.TransportBinding, transport *transportv1alpha1.Transport) error

ValidateCodecSupport ensures binding codecs exist in the transport capability list.
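The membership check described above can be illustrated with plain codec names. This sketch uses strings instead of the real TransportBinding and Transport types; `validateSupport`, the error text, and the trim-and-lowercase comparison are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// validateSupport returns an error for the first requested codec that the
// transport does not declare. Names are compared after trimming and lowercasing.
func validateSupport(requested, supported []string) error {
	known := make(map[string]struct{}, len(supported))
	for _, name := range supported {
		known[strings.ToLower(strings.TrimSpace(name))] = struct{}{}
	}
	for _, name := range requested {
		if _, ok := known[strings.ToLower(strings.TrimSpace(name))]; !ok {
			return fmt.Errorf("codec %q is not declared by the transport", name)
		}
	}
	return nil
}

func main() {
	supported := []string{"opus", "pcm16"}
	fmt.Println(validateSupport([]string{"Opus"}, supported)) // <nil>
	fmt.Println(validateSupport([]string{"aac"}, supported))  // codec "aac" is not declared by the transport
}
```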

Types

type Aggregation

type Aggregation struct {
	Audio         []transportv1alpha1.AudioCodec
	Video         []transportv1alpha1.VideoCodec
	Binary        []string
	ReadyBindings int
	TotalBindings int
	BindingErrors []string
	LastHeartbeat *metav1.Time
}

Aggregation captures the negotiated codec slices and binding health counters discovered while inspecting TransportBindings for a given Transport.

func AggregateBindings

func AggregateBindings(
	ctx context.Context,
	c client.Client,
	transportObj *transportv1alpha1.Transport,
	opts AggregationOptions,
) (*Aggregation, error)

AggregateBindings lists TransportBindings referencing the provided Transport, derives negotiated capability slices, enforces heartbeat freshness, and returns the aggregated results.

type AggregationOptions

type AggregationOptions struct {
	Logger           logr.Logger
	HeartbeatTimeout time.Duration
}

AggregationOptions configures how AggregateBindings inspects and patches TransportBindings.

type EngramAnnotationPatcher

type EngramAnnotationPatcher struct {
	// Client is the Kubernetes client for fetching and patching Engrams.
	Client client.Client
	// Logger is used for debug and error logging.
	Logger logr.Logger
}

EngramAnnotationPatcher provides methods to patch Engram transport annotations. This abstraction allows both bobrapet and bobravoz-grpc to share annotation logic while using their own Engram types.

func NewEngramAnnotationPatcher

func NewEngramAnnotationPatcher(c client.Client, log logr.Logger) *EngramAnnotationPatcher

NewEngramAnnotationPatcher creates a new patcher with the given client and logger.

func (*EngramAnnotationPatcher) PatchBindingRef

func (p *EngramAnnotationPatcher) PatchBindingRef(
	ctx context.Context,
	namespace, name string,
	bindingValue string,
	fetchEngram func(ctx context.Context, key types.NamespacedName) (client.Object, error),
) (bool, error)

PatchBindingRef patches the TransportBindingAnnotation on an Engram.

Behavior:

  • Sanitizes the binding value using coretransport.SanitizeBindingAnnotationValue.
  • Only patches if the annotation value has changed.
  • Returns false and a nil error if the Engram is not found (deleted before patch).

Arguments:

  • ctx context.Context: context for API calls.
  • namespace string: the Engram's namespace.
  • name string: the Engram's name.
  • bindingValue string: the raw binding env value to sanitize and store.
  • fetchEngram func(ctx context.Context, key types.NamespacedName) (client.Object, error): function to fetch the Engram object.

Returns:

  • bool: true if the annotation was patched, false if unchanged or not found.
  • error: non-nil on fetch/patch failure.
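The "only patch when the value changed" rule and the boolean result can be sketched on a plain annotation map. `setAnnotation` and the annotation key below are hypothetical; the real method works against the Kubernetes API through the supplied client and fetchEngram callback.

```go
package main

import "fmt"

// setAnnotation writes key=value and reports whether the map was modified.
// A false result corresponds to the "unchanged" case, where no patch is sent.
func setAnnotation(annotations map[string]string, key, value string) (map[string]string, bool) {
	if current, ok := annotations[key]; ok && current == value {
		return annotations, false
	}
	if annotations == nil {
		annotations = map[string]string{}
	}
	annotations[key] = value
	return annotations, true
}

func main() {
	const key = "example.com/transport-binding" // hypothetical annotation key

	annotations, changed := setAnnotation(nil, key, "binding-a")
	fmt.Println(changed) // true: annotation was missing

	annotations, changed = setAnnotation(annotations, key, "binding-a")
	fmt.Println(changed) // false: value is unchanged, so no patch is needed

	_, changed = setAnnotation(annotations, key, "binding-b")
	fmt.Println(changed) // true: value differs
}
```

Skipping the write when nothing changed avoids redundant API calls and spurious reconcile loops triggered by no-op updates.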

func (*EngramAnnotationPatcher) PatchReadyStatus

func (p *EngramAnnotationPatcher) PatchReadyStatus(
	ctx context.Context,
	namespace, name string,
	ready bool,
	message string,
	fetchEngram func(ctx context.Context, key types.NamespacedName) (client.Object, error),
) (bool, error)

PatchReadyStatus patches the TransportReadyAnnotation and TransportReadyMessageAnnotation.

Behavior:

  • Truncates message to MaxReadyMessageLength runes.
  • Only patches if ready flag or message has changed.
  • Returns false and a nil error if the Engram is not found (deleted before patch).

Arguments:

  • ctx context.Context: context for API calls.
  • namespace string: the Engram's namespace.
  • name string: the Engram's name.
  • ready bool: the transport readiness state.
  • message string: the status message (will be trimmed and truncated).
  • fetchEngram func(ctx context.Context, key types.NamespacedName) (client.Object, error): function to fetch the Engram object.

Returns:

  • bool: true if annotations were patched, false if unchanged or not found.
  • error: non-nil on fetch/patch failure.

type RoutingMode

type RoutingMode string

RoutingMode describes how data flows between steps.

const (
	// RoutingModeP2P indicates direct peer-to-peer connection between engrams
	RoutingModeP2P RoutingMode = "p2p"
	// RoutingModeHub indicates hub-mediated routing for template evaluation
	RoutingModeHub RoutingMode = "hub"
)

func RoutingModeOverride

func RoutingModeOverride(transportSettings map[string]any) (RoutingMode, bool, string)

RoutingModeOverride inspects transport settings for an explicit routing override. Returns the override mode, whether it is valid, and the raw string when present.

type RoutingResolver

type RoutingResolver struct {
	// contains filtered or unexported fields
}

RoutingResolver resolves routing endpoints for engrams based on Story topology.

func NewRoutingResolver

func NewRoutingResolver(
	cli client.Client,
	story *v1alpha1.Story,
	storyRun *runsv1alpha1.StoryRun,
) *RoutingResolver

NewRoutingResolver creates a resolver for the given Story and StoryRun.

func (*RoutingResolver) PopulateBindingRoutingStatus

func (r *RoutingResolver) PopulateBindingRoutingStatus(
	ctx context.Context,
	binding *transportv1alpha1.TransportBinding,
	hubEndpoint string,
) error

PopulateBindingRoutingStatus updates a TransportBinding's status with routing information.

func (*RoutingResolver) ResolveUpstreamEndpoint

func (r *RoutingResolver) ResolveUpstreamEndpoint(
	ctx context.Context,
	stepName string,
	hubEndpoint string,
) (endpoint string, isHub bool, err error)

ResolveUpstreamEndpoint determines the endpoint this step should connect to. Returns:

  • endpoint string: The gRPC endpoint to connect to (host:port)
  • isHub bool: True if connecting to hub, false if P2P to downstream engram
  • error: Any error during resolution

type StepRoutingInfo

type StepRoutingInfo struct {
	StepName         string
	DownstreamSteps  []string
	RoutingMode      RoutingMode
	UpstreamEndpoint string // Empty means use hub, otherwise the direct peer endpoint
}

StepRoutingInfo contains routing information for a step.

type TopologyAnalyzer

type TopologyAnalyzer struct {
	// contains filtered or unexported fields
}

TopologyAnalyzer analyzes the Story DAG to determine routing topology.

func NewTopologyAnalyzer

func NewTopologyAnalyzer(story *v1alpha1.Story) *TopologyAnalyzer

NewTopologyAnalyzer creates a new analyzer for the given story.

func (*TopologyAnalyzer) AnalyzeStepRouting

func (a *TopologyAnalyzer) AnalyzeStepRouting(stepName string) (*StepRoutingInfo, error)

AnalyzeStepRouting determines the routing mode and upstream endpoint for a step. Returns the routing info for the step.

func (*TopologyAnalyzer) CanUseP2PRouting

func (a *TopologyAnalyzer) CanUseP2PRouting(upstreamName, downstreamName string) bool

CanUseP2PRouting returns true if the connection from upstream to downstream can use P2P.

func (*TopologyAnalyzer) GetRoutingTopology

func (a *TopologyAnalyzer) GetRoutingTopology() (map[string]*StepRoutingInfo, error)

GetRoutingTopology returns routing information for all steps in the story.

func (*TopologyAnalyzer) HasP2PUpstream

func (a *TopologyAnalyzer) HasP2PUpstream(stepName string) bool

HasP2PUpstream reports whether any upstream step can connect directly to the given step using P2P routing. This is used to decide if a step should advertise a downstream host for direct connections.
