Documentation
¶
Index ¶
Constants ¶
// Package-level constants: pod annotation keys and values, status condition
// types, reconciliation intervals, defaults, finalizers, and the annotation
// keys used for W3C trace-context propagation.
const (
	// Annotation keys for pod queue metadata
	AnnotationMessageID = "queue.valkey.mid"
	AnnotationFile      = "queue.file"
	AnnotationAttempts  = "queue.attempts"

	// Annotation values
	AnnotationValueTrue = "true"

	// Condition types
	ConditionTypeProgressing = "Progressing"
	ConditionTypeSucceeded   = "Succeeded"
	ConditionTypeDegraded    = "Degraded"

	// Reconciliation intervals
	StatusUpdateInterval = 30 * time.Second

	// DefaultVideoInputPath is where the claimer stores downloaded artifacts when not overridden.
	DefaultVideoInputPath = "/ws/input.mp4"

	// DefaultValkeyNSSecretName is the default name for per-namespace Valkey credentials secrets.
	DefaultValkeyNSSecretName = "valkey-ns-credentials"

	// FinalizerValkeyCredentials ensures Valkey ACL users are cleaned up on deletion.
	FinalizerValkeyCredentials = "filter.plainsight.ai/valkey-credentials"

	// FinalizerStreamingCleanup ensures streaming resources (Deployment, Services) are cleaned up on deletion.
	FinalizerStreamingCleanup = "filter.plainsight.ai/streaming-cleanup"

	// TraceparentAnnotation is the PipelineInstance annotation key carrying the
	// W3C `traceparent` header that the upstream span context wrote during the
	// API → controller → filter handoff. The controller copies it into the
	// TRACEPARENT env var on each filter container so openfilter's OTel SDK
	// continues the trace. Owned cross-repo with plainsight-deployment-agent
	// (PLAT-851), which writes the same key.
	TraceparentAnnotation = "traces.opentelemetry.io/traceparent"

	// TracestateAnnotation is the PipelineInstance annotation key carrying the
	// W3C `tracestate` header. Propagated as TRACESTATE env var alongside
	// TRACEPARENT so vendor-specific trace context survives the controller hop.
	TracestateAnnotation = "traces.opentelemetry.io/tracestate"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type PipelineInstanceReconciler ¶ added in v0.2.0
// PipelineInstanceReconciler reconciles a PipelineInstance object.
//
// Its fields carry the API client, the Valkey connection settings, and the
// values injected into the pods/containers it manages (claimer image, GPU
// paths and node selectors, telemetry exporter settings).
type PipelineInstanceReconciler struct {
	// Embedded client.Client; its methods are promoted onto the reconciler.
	client.Client

	// Scheme is the *runtime.Scheme carried by the reconciler.
	// NOTE(review): how it is used is not visible in this excerpt.
	Scheme *runtime.Scheme

	// ValkeyClient performs the Valkey operations declared by ValkeyClientInterface.
	ValkeyClient ValkeyClientInterface

	// ValkeyAddr is presumably the address of the Valkey server — TODO confirm
	// the expected format (host:port?) against the code that consumes it.
	ValkeyAddr string

	// Name of the per-namespace Valkey credentials secret (default: valkey-ns-credentials)
	ValkeyNSSecretName string

	// Image for the claimer init container
	ClaimerImage string

	// Node selector labels applied to pods that request nvidia.com/gpu resources; nil disables the feature
	GPUNodeSelectorLabels map[string]string

	// Value injected as LD_LIBRARY_PATH for GPU containers; empty string disables injection
	GPULibraryPath string

	// Value injected as OPENFILTER_APPEND_PATH for GPU containers; empty string disables injection
	GPUBinPath string

	// TelemetryExporterType and TelemetryExporterOTLPEndpoint are injected into
	// filter containers as TELEMETRY_EXPORTER_TYPE and TELEMETRY_EXPORTER_OTLP_ENDPOINT
	// so openfilter's OTel client ships spans and metrics to the configured collector.
	// Both empty disables injection and openfilter falls back to its silent exporter.
	TelemetryExporterType         string
	TelemetryExporterOTLPEndpoint string
}
PipelineInstanceReconciler reconciles a PipelineInstance object.
func (*PipelineInstanceReconciler) Reconcile ¶ added in v0.2.0
func (r *PipelineInstanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
Reconcile is part of the main Kubernetes reconciliation loop, which aims to move the current state of the cluster closer to the desired state.
The reconciler branches on Pipeline mode (Batch or Stream) and delegates to mode-specific reconciliation functions defined in:
- pipelineinstance_controller_batch.go: Batch mode reconciliation
- pipelineinstance_controller_streaming.go: Streaming mode reconciliation
func (*PipelineInstanceReconciler) SetupWithManager ¶ added in v0.2.0
func (r *PipelineInstanceReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type PipelineReconciler ¶
PipelineReconciler reconciles a Pipeline object.
func (*PipelineReconciler) Reconcile ¶
Reconcile is part of the main Kubernetes reconciliation loop, which aims to move the current state of the cluster closer to the desired state. TODO(user): Modify the Reconcile function to compare the state specified by the Pipeline object against the actual cluster state, and then perform operations to make the cluster state reflect the state specified by the user.
For more details, see Reconcile and its Result here: https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/reconcile
func (*PipelineReconciler) SetupWithManager ¶
func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ValkeyClientInterface ¶
// ValkeyClientInterface defines the interface for Valkey operations.
//
// Every method takes a context.Context as its first argument and reports
// failure through a trailing error.
//
// NOTE(review): the group headings below are inferred from method names and
// signatures only; no implementation is visible here — confirm the exact
// semantics against the concrete client before relying on them.
type ValkeyClientInterface interface {
	// Stream / consumer-group setup and size queries. The three Get* counters
	// each return a single int64.
	CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error
	GetStreamLength(ctx context.Context, streamKey string) (int64, error)
	GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)
	GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)

	// Pending-entry inspection. GetPendingForConsumer returns strings
	// (presumably message IDs — TODO confirm); GetPendingEntryDetails returns
	// queue.PendingEntry values and accepts a minIdleTime filter whose unit is
	// not visible here (TODO confirm).
	GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)
	GetPendingEntryDetails(ctx context.Context, streamKey, groupName string, minIdleTime int64, count int64) ([]queue.PendingEntry, error)

	// Acknowledge, enqueue, and DLQ operations. EnqueueFileWithAttempts returns
	// a string alongside the error (presumably the new message ID — TODO confirm).
	AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
	EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)
	AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, reason string) error

	// Claiming, reading, and deleting messages. The claim and read methods
	// return queue.XMessage values; ClaimMessages and DeleteMessages take a
	// variadic list of message IDs.
	AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, count int64) ([]queue.XMessage, error)
	ClaimMessages(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]queue.XMessage, error)
	ReadRange(ctx context.Context, streamKey, start, end string, count int64) ([]queue.XMessage, error)
	DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error

	// ACL user management. EnsureACLUser receives a username, password, and
	// namespace; DeleteACLUser receives only the username.
	EnsureACLUser(ctx context.Context, username, password, namespace string) error
	DeleteACLUser(ctx context.Context, username string) error
}
ValkeyClientInterface defines the interface for Valkey operations.