stream

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewStreamMetadataService

func NewStreamMetadataService(streamClass *v1.StreamClass, streamDefinition job.SecretReferenceProvider) job.ConfiguratorProvider

func NewStreamReconciler

func NewStreamReconciler(client client.Client, gvk schema.GroupVersionKind, jobBuilder JobBuilder, streamClass *v1.StreamClass, eventRecorder record.EventRecorder, definitionParser DefinitionParser, managers map[Backend]BackendResourceManager, backfillResourceManager BackfillBackendResourceManager) controllers.UnmanagedReconciler

NewStreamReconciler creates a new StreamReconciler instance.

Types

type Backend added in v1.1.0

type Backend string
const (
	BatchJob Backend = "BatchJobBackend"
	CronJob  Backend = "CronJob"
)

type BackendResource added in v1.1.0

type BackendResource interface {
	// Name returns the name of the backend resource.
	Name() string

	// UID returns the Kubernetes UID of the backend resource.
	UID() types.UID

	// CurrentConfiguration returns the hash sum of the current configuration (spec) of the backend resource.
	CurrentConfiguration() (string, error)

	// IsCompleted return true if the workload represented by the backend resource has completed successfully.
	// e.g the job has completed with a Succeeded condition, or the cronjob has a last schedule time and no active jobs.
	IsCompleted() bool

	// IsFailed returns true if the workload represented by the backend resource has failed.
	IsFailed() bool

	// ToObject converts the backend resource to a client.Object for use with the Kubernetes API.
	ToObject() client.Object

	// IsBackfill returns true if the backend resource is associated with a backfill request.
	IsBackfill() bool
}

BackendResource defines an interface for resources that represent the backend of a Stream. This could be a Kubernetes Job or CronJob, see implementation in the backend folder.

type BackendResourceManager added in v1.1.0

type BackendResourceManager interface {

	// SetupWithController sets up the necessary watches and handlers for the backend resources with the provided controller.
	SetupWithController(cache cache.Cache, scheme *runtime.Scheme, mapper meta.RESTMapper, controller controller.Controller, primaryGvk schema.GroupVersionKind) error

	// Get retrieves the current state of the backend resource associated with the given stream definition.
	Get(ctx context.Context, key client.ObjectKey) (BackendResource, error)

	// Remove deletes the backend resource associated with the given stream definition and updates the stream phase accordingly.
	Remove(ctx context.Context, definition Definition, nextPhase Phase, eventFunc controllers.EventFunc) (reconcile.Result, error)

	// Apply creates or updates the backend resource based on the provided stream definition and backfill request, and updates the stream phase accordingly.
	Apply(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, nextPhase Phase, streamClass *v1.StreamClass, eventFunc controllers.EventFunc) (reconcile.Result, error)

	// NoOp Does not perform any changes, but updates the stream status.
	NoOp(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, nextPhase Phase, eventFunc controllers.EventFunc) (reconcile.Result, error)
}

BackendResourceManager defines the interface for managing backend resources associated with a stream definition

type BackfillBackendResourceManager added in v1.1.0

type BackfillBackendResourceManager interface {
	BackendResourceManager

	// GetBackfillRequest returns the current backfill request associated with the given stream definition, if any.
	GetBackfillRequest(ctx context.Context, definition Definition) (*v1.BackfillRequest, error)
}

type DefaultStatusManager added in v1.1.0

type DefaultStatusManager struct {
	// contains filtered or unexported fields
}

func NewDefaultStatusManager added in v1.1.0

func NewDefaultStatusManager(client client.Client, gvk schema.GroupVersionKind, streamClass *v1.StreamClass, definitionParser DefinitionParser) *DefaultStatusManager

func (*DefaultStatusManager) UpdateStreamPhase added in v1.1.0

func (s *DefaultStatusManager) UpdateStreamPhase(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, next Phase, eventFunc controllers.EventFunc) (reconcile.Result, error)

type Definition

type Definition interface {
	job.ConfiguratorProvider

	// GetPhase returns the current phase of the stream definition.
	GetPhase() Phase

	// Suspended returns true if the stream definition is suspended.
	Suspended() bool

	// CurrentConfiguration returns the hash sum of the current configuration (spec) of the stream definition.
	CurrentConfiguration(request *v1.BackfillRequest) (string, error)

	// LastAppliedConfiguration returns the hash sum of the last observed configuration (spec) of the stream definition.
	LastAppliedConfiguration() string

	// RecomputeConfiguration recomputes and updates the last observed configuration hash.
	// This should be called after any changes to the spec have been applied and the object saved to the API server.
	RecomputeConfiguration(request *v1.BackfillRequest) error

	// NamespacedName returns the namespaced name of the stream definition.
	NamespacedName() types.NamespacedName

	// ToUnstructured converts the stream definition to an unstructured object.
	ToUnstructured() *unstructured.Unstructured

	// SetPhase sets the status of the stream definition.
	SetPhase(status Phase) error

	// SetSuspended sets the suspended state of the stream definition.
	SetSuspended(suspended bool) error

	// StateString returns a string representation of the current state.
	// This is primarily used for logging and debugging purposes.
	StateString() string

	// ToOwnerReference converts the stream definition to an owner reference.
	ToOwnerReference() metav1.OwnerReference

	// GetJobTemplate returns the job template reference based on the stream definition and backfill request.
	GetJobTemplate(request *v1.BackfillRequest) types.NamespacedName

	// SetConditions returns the current conditions of the stream definition.
	SetConditions(conditions []metav1.Condition) error

	// ComputeConditions computes the conditions for the stream definition based on the backfill request.
	ComputeConditions(bfr *v1.BackfillRequest) []metav1.Condition

	// GetReferenceForSecret returns a LocalObjectReference for the given secret name.
	GetReferenceForSecret(name string) (*corev1.LocalObjectReference, error)

	// Validate validates the stream definition and returns an error if any required fields are missing or invalid.
	Validate() error

	// GetBackend returns the streaming backend type (e.g., BatchJob, CronJob) defined in the stream definition.
	GetBackend() Backend

	// GetPreviousBackend returns the streaming backend type (e.g., BatchJob, CronJob) associated previously
	// with the stream definition, which can be used to determine if the backend has changed during an update.
	GetPreviousBackend(ctx context.Context, client client.Client) (*Backend, error)

	// GetSchedule returns the schedule string for a CronJob backend, if applicable. For non-CronJob backends,
	// it can return an empty string or an error indicating that the schedule is not applicable.
	GetSchedule() (string, error)
}

func GetStreamForClass added in v1.0.8

func GetStreamForClass(ctx context.Context, client client.Client, sc *v1.StreamClass, name types.NamespacedName, definitionParser DefinitionParser) (Definition, error)

GetStreamForClass retrieves the stream definition for a given stream class and namespaced name.

type DefinitionParser added in v1.1.0

type DefinitionParser func(*unstructured.Unstructured) (Definition, error)

DefinitionParser is a function type that takes an unstructured object and returns a validated Definition or an error if the parsing fails. This allows for flexible parsing logic that can be customized based on the specific structure of the unstructured object.

type JobBuilder

type JobBuilder interface {

	// BuildJob constructs a Kubernetes Job of the specified template type using the provided configurator.
	BuildJob(ctx context.Context, templateName types.NamespacedName, configurator job.Configurator) (*batchv1.Job, error)
}

JobBuilder defines an interface for building Kubernetes Jobs based on a specified template type and a configurator that modifies the Job object.

type Phase

type Phase string
const (
	New         Phase = ""
	Pending     Phase = "Pending"
	Running     Phase = "Running"
	Backfilling Phase = "Backfilling"
	Suspended   Phase = "Suspended"
	Failed      Phase = "Failed"
	Scheduled   Phase = "Scheduled"
)

type StatusManager added in v1.1.0

type StatusManager interface {

	// UpdateStreamPhase updates the phase of the stream definition's status and emits an event if the phase has changed.
	UpdateStreamPhase(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, next Phase, eventFunc controllers.EventFunc) (reconcile.Result, error)
}

StatusManager defines an interface for managing the status sub resource of a stream definition.

Directories

Path Synopsis
job

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL