stream

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBackfillRequestFilter

func NewBackfillRequestFilter(streamClass string) predicate.TypedPredicate[*v1.BackfillRequest]

func NewJobFilter

func NewJobFilter() predicate.TypedPredicate[*v1.Job]

func NewStreamControllerFactory

func NewStreamControllerFactory(client client.Client, jobBuilder JobBuilder, manager manager.Manager, eventRecorder record.EventRecorder) stream_class.UnmanagedControllerFactory

NewStreamControllerFactory creates a new instance of StreamControllerFactory.

func NewStreamMetadataService

func NewStreamMetadataService(streamClass *v1.StreamClass, streamDefinition Definition) job.ConfiguratorProvider

func NewStreamReconciler

func NewStreamReconciler(client client.Client, gvk schema.GroupVersionKind, jobBuilder JobBuilder, streamClass *v1.StreamClass, eventRecorder record.EventRecorder) controllers.UnmanagedReconciler

NewStreamReconciler creates a new StreamReconciler instance.

Types

type BackfillRequestFilter

type BackfillRequestFilter struct {
	// contains filtered or unexported fields
}

func (*BackfillRequestFilter) Create

Create filters BackfillRequests that are not completed and match the specified stream class.

func (*BackfillRequestFilter) Delete

Delete filters BackfillRequests that are not completed and match the specified stream class.

func (*BackfillRequestFilter) Generic

Generic always returns false to ignore generic events.

func (*BackfillRequestFilter) Update

Update always returns false to ignore update events.

type Definition

// Definition is the abstraction over a stream definition object. It exposes
// the definition's phase, suspension flag, configuration hashes and
// conditions, and converts the definition into the other representations
// declared below (unstructured object, owner reference, job configurator
// provider).
type Definition interface {
	// GetPhase returns the current phase of the stream definition.
	GetPhase() Phase

	// Suspended returns true if the stream definition is suspended.
	Suspended() bool

	// CurrentConfiguration returns the hash sum of the current configuration (spec) of the stream definition.
	// NOTE(review): the backfill request is presumably folded into the hash — confirm against the implementation.
	CurrentConfiguration(request *v1.BackfillRequest) (string, error)

	// LastAppliedConfiguration returns the hash sum of the last observed configuration (spec) of the stream definition.
	LastAppliedConfiguration() string

	// RecomputeConfiguration recomputes and updates the last observed configuration hash.
	// This should be called after any changes to the spec have been applied and the object saved to the API server.
	RecomputeConfiguration(request *v1.BackfillRequest) error

	// NamespacedName returns the namespaced name of the stream definition.
	NamespacedName() types.NamespacedName

	// ToUnstructured converts the stream definition to an unstructured object.
	ToUnstructured() *unstructured.Unstructured

	// SetPhase sets the phase of the stream definition.
	SetPhase(status Phase) error

	// SetSuspended sets the suspended state of the stream definition.
	SetSuspended(suspended bool) error

	// StateString returns a string representation of the current state.
	// This is primarily used for logging and debugging purposes.
	StateString() string

	// ToOwnerReference converts the stream definition to an owner reference.
	ToOwnerReference() metav1.OwnerReference

	// ToConfiguratorProvider converts the stream definition to a JobConfiguratorProvider.
	ToConfiguratorProvider() job.ConfiguratorProvider

	// GetJobTemplate returns the job template reference based on the stream definition and backfill request.
	GetJobTemplate(request *v1.BackfillRequest) types.NamespacedName

	// SetConditions sets the conditions of the stream definition.
	SetConditions(conditions []metav1.Condition) error

	// ComputeConditions computes the conditions for the stream definition based on the backfill request.
	ComputeConditions(bfr *v1.BackfillRequest) []metav1.Condition

	// GetReferenceForSecret returns a LocalObjectReference for the given secret name.
	GetReferenceForSecret(name string) (*corev1.LocalObjectReference, error)
}

type JobBuilder

// JobBuilder builds Kubernetes Jobs from a named job template and a
// configurator that modifies the resulting Job object.
type JobBuilder interface {

	// BuildJob constructs a Kubernetes Job from the template identified by templateName using the provided configurator.
	// It returns the built Job, or an error.
	BuildJob(ctx context.Context, templateName types.NamespacedName, configurator job.Configurator) (*batchv1.Job, error)
}

JobBuilder defines an interface for building Kubernetes Jobs based on a specified template type and a configurator that modifies the Job object.

type JobFilter

type JobFilter struct {
}

JobFilter is a predicate that allows job events to pass through to the Stream controller.

func (*JobFilter) Create

func (j *JobFilter) Create(_ event.TypedCreateEvent[*v1.Job]) bool

Create is called when a new object is created. It returns true to allow the event to be processed.

func (*JobFilter) Delete

func (j *JobFilter) Delete(_ event.TypedDeleteEvent[*v1.Job]) bool

Delete is called when an object is deleted. It returns true to allow the event to be processed.

func (*JobFilter) Generic

func (j *JobFilter) Generic(_ event.TypedGenericEvent[*v1.Job]) bool

Generic is called for generic events. We're filtering out generic events by returning false.

func (*JobFilter) Update

func (j *JobFilter) Update(e event.TypedUpdateEvent[*v1.Job]) bool

Update is called when an object is updated.

type Phase

// Phase is the string-valued phase of a stream definition, as returned by
// Definition.GetPhase and accepted by Definition.SetPhase.
type Phase string

// Phase values. Apart from New, each constant's value is its own name.
const (
	// New is the empty string, i.e. the zero value of Phase.
	// NOTE(review): presumably the phase of a definition whose phase has never been set — confirm.
	New         Phase = ""
	Pending     Phase = "Pending"
	Running     Phase = "Running"
	Backfilling Phase = "Backfilling"
	Suspended   Phase = "Suspended"
	Failed      Phase = "Failed"
)

type StreamingJob

type StreamingJob v1.Job

func NewStreamingJobFromV1Job

func NewStreamingJobFromV1Job(job *v1.Job) StreamingJob

func (StreamingJob) CurrentConfiguration

func (j StreamingJob) CurrentConfiguration() (string, error)

func (StreamingJob) IsBackfill

func (j StreamingJob) IsBackfill() bool

func (StreamingJob) IsCompleted

func (j StreamingJob) IsCompleted() bool

func (StreamingJob) IsFailed

func (j StreamingJob) IsFailed() bool

func (StreamingJob) ToV1Job

func (j StreamingJob) ToV1Job() *v1.Job

type TypedSecondaryWatcher

type TypedSecondaryWatcher[object client.Object] struct {
	// contains filtered or unexported fields
}

TypedSecondaryWatcher watches secondary resources and enqueues reconcile requests for the primary resource.

func (*TypedSecondaryWatcher[object]) SetupWithController

func (w *TypedSecondaryWatcher[object]) SetupWithController(controller controller.Controller, obj object) error

SetupWithController sets up the watcher with the given controller.

type TypedSecondaryWatcherBuilder

type TypedSecondaryWatcherBuilder[object client.Object] struct {
	// contains filtered or unexported fields
}

TypedSecondaryWatcherBuilder builds a TypedSecondaryWatcher instance.

func NewTypedSecondaryWatcherBuilder

func NewTypedSecondaryWatcherBuilder[object client.Object]() *TypedSecondaryWatcherBuilder[object]

NewTypedSecondaryWatcherBuilder creates a new builder for TypedSecondaryWatcher.

func (*TypedSecondaryWatcherBuilder[object]) Build

func (b *TypedSecondaryWatcherBuilder[object]) Build() *TypedSecondaryWatcher[object]

Build creates the TypedSecondaryWatcher instance.

func (*TypedSecondaryWatcherBuilder[object]) WithCache

func (b *TypedSecondaryWatcherBuilder[object]) WithCache(cache cache.Cache) *TypedSecondaryWatcherBuilder[object]

WithCache sets the cache for the watcher.

func (*TypedSecondaryWatcherBuilder[object]) WithFilter

func (b *TypedSecondaryWatcherBuilder[object]) WithFilter(filter predicate.TypedPredicate[object]) *TypedSecondaryWatcherBuilder[object]

WithFilter sets the predicate filter for the watcher.

func (*TypedSecondaryWatcherBuilder[object]) WithHandler

func (b *TypedSecondaryWatcherBuilder[object]) WithHandler(handler handler.TypedEventHandler[object, reconcile.Request]) *TypedSecondaryWatcherBuilder[object]

WithHandler sets the event handler for the watcher.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL