controller

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Annotation keys for pod queue metadata
	AnnotationMessageID = "queue.valkey.mid"
	AnnotationFile      = "queue.file"
	AnnotationAttempts  = "queue.attempts"

	// Annotation values
	AnnotationValueTrue = "true"

	// Condition types
	ConditionTypeProgressing = "Progressing"
	ConditionTypeSucceeded   = "Succeeded"
	ConditionTypeDegraded    = "Degraded"

	// Reconciliation intervals
	StatusUpdateInterval = 30 * time.Second

	// DefaultVideoInputPath is where the claimer stores downloaded artifacts when not overridden.
	DefaultVideoInputPath = "/ws/input.mp4"

	// DefaultValkeyNSSecretName is the default name for per-namespace Valkey credentials secrets.
	DefaultValkeyNSSecretName = "valkey-ns-credentials"

	// FinalizerValkeyCredentials ensures Valkey ACL users are cleaned up on deletion.
	FinalizerValkeyCredentials = "filter.plainsight.ai/valkey-credentials"

	// FinalizerStreamingCleanup ensures streaming resources (Deployment, Services) are cleaned up on deletion.
	FinalizerStreamingCleanup = "filter.plainsight.ai/streaming-cleanup"

	// TraceparentAnnotation is the PipelineInstance annotation key carrying the
	// W3C `traceparent` header that the upstream span context wrote during the
	// API → controller → filter handoff. The controller copies it into the
	// TRACEPARENT env var on each filter container so openfilter's OTel SDK
	// continues the trace. Owned cross-repo with plainsight-deployment-agent
	// (PLAT-851), which writes the same key.
	TraceparentAnnotation = "traces.opentelemetry.io/traceparent"

	// TracestateAnnotation is the PipelineInstance annotation key carrying the
	// W3C `tracestate` header. Propagated as TRACESTATE env var alongside
	// TRACEPARENT so vendor-specific trace context survives the controller hop.
	TracestateAnnotation = "traces.opentelemetry.io/tracestate"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type PipelineInstanceReconciler added in v0.2.0

type PipelineInstanceReconciler struct {
	client.Client
	Scheme                *runtime.Scheme
	ValkeyClient          ValkeyClientInterface
	ValkeyAddr            string
	ValkeyNSSecretName    string            // Name of the per-namespace Valkey credentials secret (default: valkey-ns-credentials)
	ClaimerImage          string            // Image for the claimer init container
	GPUNodeSelectorLabels map[string]string // Node selector labels applied to pods that request nvidia.com/gpu resources; nil disables the feature
	GPULibraryPath        string            // Value injected as LD_LIBRARY_PATH for GPU containers; empty string disables injection
	GPUBinPath            string            // Value injected as OPENFILTER_APPEND_PATH for GPU containers; empty string disables injection

	// TelemetryExporterType and TelemetryExporterOTLPEndpoint are injected into
	// filter containers as TELEMETRY_EXPORTER_TYPE and TELEMETRY_EXPORTER_OTLP_ENDPOINT
	// so openfilter's OTel client ships spans and metrics to the configured collector.
	// Both empty disables injection and openfilter falls back to its silent exporter.
	TelemetryExporterType         string
	TelemetryExporterOTLPEndpoint string
}

PipelineInstanceReconciler reconciles a PipelineInstance object

func (*PipelineInstanceReconciler) Reconcile added in v0.2.0

Reconcile is part of the main kubernetes reconciliation loop which aims to move the current state of the cluster closer to the desired state.

The reconciler branches on Pipeline mode (Batch or Stream) and delegates to mode-specific reconciliation functions defined in: - pipelineinstance_controller_batch.go: Batch mode reconciliation - pipelineinstance_controller_streaming.go: Streaming mode reconciliation

func (*PipelineInstanceReconciler) SetupWithManager added in v0.2.0

func (r *PipelineInstanceReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type PipelineReconciler

type PipelineReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

PipelineReconciler reconciles a Pipeline object

func (*PipelineReconciler) Reconcile

func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

Reconcile is part of the main kubernetes reconciliation loop which aims to move the current state of the cluster closer to the desired state. TODO(user): Modify the Reconcile function to compare the state specified by the Pipeline object against the actual cluster state, and then perform operations to make the cluster state reflect the state specified by the user.

For more details, check Reconcile and its Result here: - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/reconcile

func (*PipelineReconciler) SetupWithManager

func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ValkeyClientInterface

type ValkeyClientInterface interface {
	CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error
	GetStreamLength(ctx context.Context, streamKey string) (int64, error)
	GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)
	GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)
	GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)
	GetPendingEntryDetails(ctx context.Context, streamKey, groupName string, minIdleTime int64, count int64) ([]queue.PendingEntry, error)
	AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
	EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)
	AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, reason string) error
	AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, count int64) ([]queue.XMessage, error)
	ClaimMessages(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]queue.XMessage, error)
	ReadRange(ctx context.Context, streamKey, start, end string, count int64) ([]queue.XMessage, error)
	DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error
	EnsureACLUser(ctx context.Context, username, password, namespace string) error
	DeleteACLUser(ctx context.Context, username string) error
}

ValkeyClientInterface defines the interface for Valkey operations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL