Documentation ¶
Constants ¶
const (
	// Annotation keys for pod queue metadata
	AnnotationMessageID = "queue.valkey.mid"
	AnnotationFile      = "queue.file"
	AnnotationAttempts  = "queue.attempts"

	// Annotation values
	AnnotationValueTrue = "true"

	// Condition types
	ConditionTypeProgressing = "Progressing"
	ConditionTypeSucceeded   = "Succeeded"
	ConditionTypeDegraded    = "Degraded"

	// Reconciliation intervals
	StatusUpdateInterval = 30 * time.Second

	// DefaultVideoInputPath is where the claimer stores downloaded artifacts when not overridden.
	DefaultVideoInputPath = "/ws/input.mp4"
)
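As a rough illustration of how the queue annotation keys might be used, the sketch below builds and reads back a pod annotation map. The helpers `queueAnnotations` and `attemptsFrom` are hypothetical and not part of this package, and encoding the attempt counter as a decimal string is an assumption. The keys are redeclared locally only so the example compiles on its own.

```go
package main

import (
	"fmt"
	"strconv"
)

// Keys copied from the package constants above; redeclared so the sketch is self-contained.
const (
	AnnotationMessageID = "queue.valkey.mid"
	AnnotationFile      = "queue.file"
	AnnotationAttempts  = "queue.attempts"
)

// queueAnnotations is a hypothetical helper that builds the annotation map
// a pod could carry for one queued file. The decimal encoding of attempts
// is an assumption, not something the package documents.
func queueAnnotations(messageID, file string, attempts int) map[string]string {
	return map[string]string{
		AnnotationMessageID: messageID,
		AnnotationFile:      file,
		AnnotationAttempts:  strconv.Itoa(attempts),
	}
}

// attemptsFrom is a hypothetical helper that reads the attempt counter back.
// A missing, malformed, or negative value is treated as zero.
func attemptsFrom(annotations map[string]string) int {
	n, err := strconv.Atoi(annotations[AnnotationAttempts])
	if err != nil || n < 0 {
		return 0
	}
	return n
}

func main() {
	a := queueAnnotations("1700000000000-0", "videos/a.mp4", 2)
	fmt.Println(a[AnnotationMessageID], a[AnnotationFile], attemptsFrom(a))
}
```

Treating an unparsable counter as zero keeps a reader of these annotations from failing on pods that were annotated by hand or by an older version.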
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type PipelineReconciler ¶
PipelineReconciler reconciles a Pipeline object.
func (*PipelineReconciler) Reconcile ¶
Reconcile is part of the main Kubernetes reconciliation loop, which aims to move the current state of the cluster closer to the desired state.
TODO(user): Modify the Reconcile function to compare the state specified by the Pipeline object against the actual cluster state, and then perform operations to make the cluster state reflect the state specified by the user.
For more details on Reconcile and its Result, see https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/reconcile
func (*PipelineReconciler) SetupWithManager ¶
func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type PipelineRunReconciler ¶
type PipelineRunReconciler struct {
client.Client
Scheme *runtime.Scheme
ValkeyClient ValkeyClientInterface
ValkeyAddr string
ValkeyPassword string
ClaimerImage string // Image for the claimer init container
}
PipelineRunReconciler reconciles a PipelineRun object.
func (*PipelineRunReconciler) Reconcile ¶
func (r *PipelineRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
Reconcile is part of the main Kubernetes reconciliation loop, which aims to move the current state of the cluster closer to the desired state.
The reconciler branches on the Pipeline mode (Batch or Stream) and delegates to mode-specific reconciliation functions defined in:
- pipelinerun_controller_batch.go: Batch mode reconciliation
- pipelinerun_controller_streaming.go: Streaming mode reconciliation
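The mode branching described above could look roughly like the following. This is a minimal sketch, not the controller's actual code: the `Mode` type, its constants, `errUnknownMode`, and `reconcileByMode` are all hypothetical names, and the real reconciler works with controller-runtime types rather than plain callbacks.

```go
package main

import (
	"errors"
	"fmt"
)

// Mode is a hypothetical stand-in for the Pipeline mode field.
type Mode string

const (
	ModeBatch  Mode = "Batch"
	ModeStream Mode = "Stream"
)

// errUnknownMode is returned when the mode matches neither branch.
var errUnknownMode = errors.New("unknown pipeline mode")

// reconcileByMode dispatches to the mode-specific function. The callbacks
// stand in for the batch and streaming reconcilers named in the docs.
func reconcileByMode(mode Mode, batch, stream func() error) error {
	switch mode {
	case ModeBatch:
		return batch()
	case ModeStream:
		return stream()
	default:
		return fmt.Errorf("%w: %q", errUnknownMode, string(mode))
	}
}

func main() {
	err := reconcileByMode(ModeStream,
		func() error { fmt.Println("batch reconcile"); return nil },
		func() error { fmt.Println("stream reconcile"); return nil },
	)
	fmt.Println("error:", err)
}
```

Returning an explicit error for an unrecognized mode, rather than silently picking a default, makes a misconfigured Pipeline visible to the caller.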
func (*PipelineRunReconciler) SetupWithManager ¶
func (r *PipelineRunReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ValkeyClientInterface ¶
type ValkeyClientInterface interface {
CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error
GetStreamLength(ctx context.Context, streamKey string) (int64, error)
GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)
GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)
GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)
AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)
AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, reason string) error
AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, count int64) ([]queue.XMessage, error)
ReadRange(ctx context.Context, streamKey, start, end string, count int64) ([]queue.XMessage, error)
DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error
}
ValkeyClientInterface defines the interface for Valkey operations.