controller

package
v0.1.0
Published: Oct 30, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

const (
	// Annotation keys for pod queue metadata
	AnnotationMessageID = "queue.valkey.mid"
	AnnotationFile      = "queue.file"
	AnnotationAttempts  = "queue.attempts"

	// Annotation values
	AnnotationValueTrue = "true"

	// Condition types
	ConditionTypeProgressing = "Progressing"
	ConditionTypeSucceeded   = "Succeeded"
	ConditionTypeDegraded    = "Degraded"

	// Reconciliation intervals
	StatusUpdateInterval = 30 * time.Second

	// DefaultVideoInputPath is where the claimer stores downloaded artifacts when not overridden.
	DefaultVideoInputPath = "/ws/input.mp4"
)
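The annotation keys above suggest that queue metadata travels on pods as string annotations. The following is a minimal sketch of how such a map could be built and read back. The helpers queueAnnotations and attemptsFrom are hypothetical and not part of this package, and the assumption that the attempts counter is stored as a decimal string is an illustration, not something the documentation confirms.

```go
package main

import (
	"fmt"
	"strconv"
)

// Annotation keys copied from the package constants above.
const (
	AnnotationMessageID = "queue.valkey.mid"
	AnnotationFile      = "queue.file"
	AnnotationAttempts  = "queue.attempts"
)

// queueAnnotations is a hypothetical helper that builds the annotation map
// for a pod handling one queued file. Storing attempts as a decimal string
// is an assumption made for this sketch.
func queueAnnotations(messageID, file string, attempts int) map[string]string {
	return map[string]string{
		AnnotationMessageID: messageID,
		AnnotationFile:      file,
		AnnotationAttempts:  strconv.Itoa(attempts),
	}
}

// attemptsFrom is a hypothetical helper that reads the attempt counter back.
// A missing or malformed value is treated as zero (also an assumption).
func attemptsFrom(annotations map[string]string) int {
	n, err := strconv.Atoi(annotations[AnnotationAttempts])
	if err != nil {
		return 0
	}
	return n
}

func main() {
	a := queueAnnotations("1730300000000-0", "/ws/input.mp4", 2)
	fmt.Println(a[AnnotationMessageID], a[AnnotationFile], attemptsFrom(a))
}
```

Kubernetes annotation values are always strings, so any numeric metadata has to be encoded and decoded at the boundary, as the sketch does.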

Variables

This section is empty.

Functions

This section is empty.

Types

type PipelineReconciler

type PipelineReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

PipelineReconciler reconciles a Pipeline object.

func (*PipelineReconciler) Reconcile

func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

Reconcile is part of the main Kubernetes reconciliation loop, which aims to move the current state of the cluster closer to the desired state.

TODO(user): Modify the Reconcile function to compare the state specified by the Pipeline object against the actual cluster state, and then perform operations to make the cluster state reflect the state specified by the user.

For more details on Reconcile and its Result, see https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/reconcile

func (*PipelineReconciler) SetupWithManager

func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type PipelineRunReconciler

type PipelineRunReconciler struct {
	client.Client
	Scheme         *runtime.Scheme
	ValkeyClient   ValkeyClientInterface
	ValkeyAddr     string
	ValkeyPassword string
	ClaimerImage   string // Image for the claimer init container
}

PipelineRunReconciler reconciles a PipelineRun object.

func (*PipelineRunReconciler) Reconcile

func (r *PipelineRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

Reconcile is part of the main Kubernetes reconciliation loop, which aims to move the current state of the cluster closer to the desired state.

The reconciler branches on Pipeline mode (Batch or Stream) and delegates to mode-specific reconciliation functions defined in:

  - pipelinerun_controller_batch.go: Batch mode reconciliation
  - pipelinerun_controller_streaming.go: Streaming mode reconciliation
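The mode branching described above can be sketched as a simple switch. This is an illustration only: the Mode type, its constants, and the functions reconcileBatch, reconcileStreaming, and dispatch are hypothetical stand-ins, not the package's actual identifiers, and the real functions take and return controller-runtime types rather than strings.

```go
package main

import (
	"errors"
	"fmt"
)

// Mode is a hypothetical stand-in for the Pipeline mode field.
type Mode string

const (
	ModeBatch  Mode = "Batch"
	ModeStream Mode = "Stream"
)

// errUnknownMode is returned for any mode other than Batch or Stream.
var errUnknownMode = errors.New("unknown pipeline mode")

// reconcileBatch and reconcileStreaming are placeholders for the
// mode-specific functions; here they only report which path ran.
func reconcileBatch() (string, error)     { return "batch", nil }
func reconcileStreaming() (string, error) { return "streaming", nil }

// dispatch selects the mode-specific path and rejects unrecognized modes
// instead of silently doing nothing.
func dispatch(mode Mode) (string, error) {
	switch mode {
	case ModeBatch:
		return reconcileBatch()
	case ModeStream:
		return reconcileStreaming()
	default:
		return "", fmt.Errorf("%w: %q", errUnknownMode, mode)
	}
}

func main() {
	for _, m := range []Mode{ModeBatch, ModeStream, "Other"} {
		path, err := dispatch(m)
		fmt.Println(m, path, err)
	}
}
```

Returning an explicit error from the default case keeps an unsupported mode visible in the reconcile result rather than letting the run stall quietly.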

func (*PipelineRunReconciler) SetupWithManager

func (r *PipelineRunReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ValkeyClientInterface

type ValkeyClientInterface interface {
	CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error
	GetStreamLength(ctx context.Context, streamKey string) (int64, error)
	GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)
	GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)
	GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)
	AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
	EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)
	AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, reason string) error
	AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, count int64) ([]queue.XMessage, error)
	ReadRange(ctx context.Context, streamKey, start, end string, count int64) ([]queue.XMessage, error)
	DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error
}

ValkeyClientInterface defines the interface for Valkey operations.
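Because PipelineRunReconciler holds its Valkey client as an interface, a test can presumably substitute an in-memory fake. The sketch below shows the idea for two of the methods only. The pendingAcker interface is a narrowed copy made for this example, and fakeValkey, addPending, groupKey, and runDemo are hypothetical names; a full fake would need every method of ValkeyClientInterface, including those that use queue.XMessage.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// pendingAcker copies two method signatures from ValkeyClientInterface so the
// example stays self-contained. It is not part of the package.
type pendingAcker interface {
	GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)
	AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
}

// fakeValkey tracks pending message IDs per stream and consumer group in memory.
type fakeValkey struct {
	mu      sync.Mutex
	pending map[string]map[string]struct{}
}

func newFakeValkey() *fakeValkey {
	return &fakeValkey{pending: make(map[string]map[string]struct{})}
}

func groupKey(streamKey, groupName string) string { return streamKey + "|" + groupName }

// addPending is a test-only helper that marks a message as delivered but unacknowledged.
func (f *fakeValkey) addPending(streamKey, groupName, messageID string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	k := groupKey(streamKey, groupName)
	if f.pending[k] == nil {
		f.pending[k] = make(map[string]struct{})
	}
	f.pending[k][messageID] = struct{}{}
}

func (f *fakeValkey) GetPendingCount(_ context.Context, streamKey, groupName string) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	return int64(len(f.pending[groupKey(streamKey, groupName)])), nil
}

// AckMessage removes the ID from the pending set; acknowledging an unknown ID is a no-op.
func (f *fakeValkey) AckMessage(_ context.Context, streamKey, groupName, messageID string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.pending[groupKey(streamKey, groupName)], messageID)
	return nil
}

// Compile-time check that the fake satisfies the narrowed interface.
var _ pendingAcker = (*fakeValkey)(nil)

// runDemo queues two pending messages, acknowledges one, and returns what remains.
func runDemo() (int64, error) {
	ctx := context.Background()
	fake := newFakeValkey()
	fake.addPending("runs:demo", "workers", "1-0")
	fake.addPending("runs:demo", "workers", "2-0")
	var c pendingAcker = fake
	if err := c.AckMessage(ctx, "runs:demo", "workers", "1-0"); err != nil {
		return 0, err
	}
	return c.GetPendingCount(ctx, "runs:demo", "workers")
}

func main() {
	n, err := runDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("pending after ack:", n)
}
```

The stream key and group name used here are made up for the demo; the package's real key layout is not documented on this page.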
