Documentation
¶
Index ¶
- func NewBackfillRequestFilter(streamClass string) predicate.TypedPredicate[*v1.BackfillRequest]
- func NewJobFilter() predicate.TypedPredicate[*v1.Job]
- func NewStreamControllerFactory(client client.Client, jobBuilder JobBuilder, manager manager.Manager, ...) stream_class.UnmanagedControllerFactory
- func NewStreamMetadataService(streamClass *v1.StreamClass, streamDefinition Definition) job.ConfiguratorProvider
- func NewStreamReconciler(client client.Client, gvk schema.GroupVersionKind, jobBuilder JobBuilder, ...) controllers.UnmanagedReconciler
- type BackfillRequestFilter
- func (j *BackfillRequestFilter) Create(e event.TypedCreateEvent[*v1.BackfillRequest]) bool
- func (j *BackfillRequestFilter) Delete(e event.TypedDeleteEvent[*v1.BackfillRequest]) bool
- func (j *BackfillRequestFilter) Generic(_ event.TypedGenericEvent[*v1.BackfillRequest]) bool
- func (j *BackfillRequestFilter) Update(_ event.TypedUpdateEvent[*v1.BackfillRequest]) bool
- type Definition
- type JobBuilder
- type JobFilter
- type Phase
- type StreamingJob
- type TypedSecondaryWatcher
- type TypedSecondaryWatcherBuilder
- func (b *TypedSecondaryWatcherBuilder[object]) Build() *TypedSecondaryWatcher[object]
- func (b *TypedSecondaryWatcherBuilder[object]) WithCache(cache cache.Cache) *TypedSecondaryWatcherBuilder[object]
- func (b *TypedSecondaryWatcherBuilder[object]) WithFilter(filter predicate.TypedPredicate[object]) *TypedSecondaryWatcherBuilder[object]
- func (b *TypedSecondaryWatcherBuilder[object]) WithHandler(handler handler.TypedEventHandler[object, reconcile.Request]) *TypedSecondaryWatcherBuilder[object]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewBackfillRequestFilter ¶
func NewBackfillRequestFilter(streamClass string) predicate.TypedPredicate[*v1.BackfillRequest]
func NewJobFilter ¶
func NewJobFilter() predicate.TypedPredicate[*v1.Job]
func NewStreamControllerFactory ¶
func NewStreamControllerFactory(client client.Client, jobBuilder JobBuilder, manager manager.Manager, eventRecorder record.EventRecorder) stream_class.UnmanagedControllerFactory
NewStreamControllerFactory creates a new instance of StreamControllerFactory.
func NewStreamMetadataService ¶
func NewStreamMetadataService(streamClass *v1.StreamClass, streamDefinition Definition) job.ConfiguratorProvider
func NewStreamReconciler ¶
func NewStreamReconciler(client client.Client, gvk schema.GroupVersionKind, jobBuilder JobBuilder, streamClass *v1.StreamClass, eventRecorder record.EventRecorder) controllers.UnmanagedReconciler
NewStreamReconciler creates a new StreamReconciler instance.
Types ¶
type BackfillRequestFilter ¶
type BackfillRequestFilter struct {
// contains filtered or unexported fields
}
func (*BackfillRequestFilter) Create ¶
func (j *BackfillRequestFilter) Create(e event.TypedCreateEvent[*v1.BackfillRequest]) bool
Create filters BackfillRequests that are not completed and match the specified stream class.
func (*BackfillRequestFilter) Delete ¶
func (j *BackfillRequestFilter) Delete(e event.TypedDeleteEvent[*v1.BackfillRequest]) bool
Delete filters BackfillRequests that are not completed and match the specified stream class.
func (*BackfillRequestFilter) Generic ¶
func (j *BackfillRequestFilter) Generic(_ event.TypedGenericEvent[*v1.BackfillRequest]) bool
Generic always returns false to ignore generic events.
func (*BackfillRequestFilter) Update ¶
func (j *BackfillRequestFilter) Update(_ event.TypedUpdateEvent[*v1.BackfillRequest]) bool
Update always returns false to ignore update events.
type Definition ¶
type Definition interface {
// GetPhase returns the current phase of the stream definition.
GetPhase() Phase
// Suspended returns true if the stream definition is suspended.
Suspended() bool
// CurrentConfiguration returns the hash sum of the current configuration (spec) of the stream definition.
CurrentConfiguration(request *v1.BackfillRequest) (string, error)
// LastAppliedConfiguration returns the hash sum of the last observed configuration (spec) of the stream definition.
LastAppliedConfiguration() string
// RecomputeConfiguration recomputes and updates the last observed configuration hash.
// This should be called after any changes to the spec have been applied and the object saved to the API server.
RecomputeConfiguration(request *v1.BackfillRequest) error
// NamespacedName returns the namespaced name of the stream definition.
NamespacedName() types.NamespacedName
// ToUnstructured converts the stream definition to an unstructured object.
ToUnstructured() *unstructured.Unstructured
// SetPhase sets the phase of the stream definition.
SetPhase(status Phase) error
// SetSuspended sets the suspended state of the stream definition.
SetSuspended(suspended bool) error
// StateString returns a string representation of the current state.
// This is primarily used for logging and debugging purposes.
StateString() string
// ToOwnerReference converts the stream definition to an owner reference.
ToOwnerReference() metav1.OwnerReference
// ToConfiguratorProvider converts the stream definition to a job.ConfiguratorProvider.
ToConfiguratorProvider() job.ConfiguratorProvider
// GetJobTemplate returns the job template reference based on the stream definition and backfill request.
GetJobTemplate(request *v1.BackfillRequest) types.NamespacedName
// SetConditions sets the conditions of the stream definition.
SetConditions(conditions []metav1.Condition) error
// ComputeConditions computes the conditions for the stream definition based on the backfill request.
ComputeConditions(bfr *v1.BackfillRequest) []metav1.Condition
// GetReferenceForSecret returns a LocalObjectReference for the given secret name.
GetReferenceForSecret(name string) (*corev1.LocalObjectReference, error)
}
func FromUnstructured ¶ added in v1.0.9
func FromUnstructured(obj *unstructured.Unstructured) (Definition, error)
func GetStreamForClass ¶ added in v1.0.8
func GetStreamForClass(ctx context.Context, client client.Client, sc *v1.StreamClass, name types.NamespacedName) (Definition, error)
GetStreamForClass retrieves the stream definition for a given stream class and namespaced name.
type JobBuilder ¶
type JobBuilder interface {
// BuildJob constructs a Kubernetes Job from the template identified by templateName, using the provided configurator.
BuildJob(ctx context.Context, templateName types.NamespacedName, configurator job.Configurator) (*batchv1.Job, error)
}
JobBuilder defines an interface for building Kubernetes Jobs based on a specified template type and a configurator that modifies the Job object.
type JobFilter ¶
type JobFilter struct {
}
JobFilter is a predicate that allows job events to pass through to the Stream controller.
func (*JobFilter) Create ¶
Create is called when a new object is created. It returns true to allow the event to be processed.
func (*JobFilter) Delete ¶
Delete is called when an object is deleted. It returns true to allow the event to be processed.
type StreamingJob ¶
func NewStreamingJobFromV1Job ¶
func NewStreamingJobFromV1Job(job *v1.Job) StreamingJob
func (StreamingJob) CurrentConfiguration ¶
func (j StreamingJob) CurrentConfiguration() (string, error)
func (StreamingJob) IsBackfill ¶
func (j StreamingJob) IsBackfill() bool
func (StreamingJob) IsCompleted ¶
func (j StreamingJob) IsCompleted() bool
func (StreamingJob) IsFailed ¶
func (j StreamingJob) IsFailed() bool
func (StreamingJob) ToV1Job ¶
func (j StreamingJob) ToV1Job() *v1.Job
type TypedSecondaryWatcher ¶
type TypedSecondaryWatcher[object client.Object] struct {
// contains filtered or unexported fields
}
TypedSecondaryWatcher watches secondary resources and enqueues reconcile requests for the primary resource.
func (*TypedSecondaryWatcher[object]) SetupWithController ¶
func (w *TypedSecondaryWatcher[object]) SetupWithController(controller controller.Controller, obj object) error
SetupWithController sets up the watcher with the given controller.
type TypedSecondaryWatcherBuilder ¶
type TypedSecondaryWatcherBuilder[object client.Object] struct {
// contains filtered or unexported fields
}
TypedSecondaryWatcherBuilder builds a TypedSecondaryWatcher instance.
func NewTypedSecondaryWatcherBuilder ¶
func NewTypedSecondaryWatcherBuilder[object client.Object]() *TypedSecondaryWatcherBuilder[object]
NewTypedSecondaryWatcherBuilder creates a new builder for TypedSecondaryWatcher.
func (*TypedSecondaryWatcherBuilder[object]) Build ¶
func (b *TypedSecondaryWatcherBuilder[object]) Build() *TypedSecondaryWatcher[object]
Build creates the TypedSecondaryWatcher instance.
func (*TypedSecondaryWatcherBuilder[object]) WithCache ¶
func (b *TypedSecondaryWatcherBuilder[object]) WithCache(cache cache.Cache) *TypedSecondaryWatcherBuilder[object]
WithCache sets the cache for the watcher.
func (*TypedSecondaryWatcherBuilder[object]) WithFilter ¶
func (b *TypedSecondaryWatcherBuilder[object]) WithFilter(filter predicate.TypedPredicate[object]) *TypedSecondaryWatcherBuilder[object]
WithFilter sets the predicate filter for the watcher.
func (*TypedSecondaryWatcherBuilder[object]) WithHandler ¶
func (b *TypedSecondaryWatcherBuilder[object]) WithHandler(handler handler.TypedEventHandler[object, reconcile.Request]) *TypedSecondaryWatcherBuilder[object]
WithHandler sets the event handler for the watcher.