Documentation
¶
Index ¶
- func NewStreamMetadataService(streamClass *v1.StreamClass, streamDefinition job.SecretReferenceProvider) job.ConfiguratorProvider
- func NewStreamReconciler(client client.Client, gvk schema.GroupVersionKind, jobBuilder JobBuilder, ...) controllers.UnmanagedReconciler
- type Backend
- type BackendResource
- type BackendResourceManager
- type BackfillBackendResourceManager
- type DefaultStatusManager
- type Definition
- type DefinitionParser
- type JobBuilder
- type Phase
- type StatusManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewStreamMetadataService ¶
func NewStreamMetadataService(streamClass *v1.StreamClass, streamDefinition job.SecretReferenceProvider) job.ConfiguratorProvider
func NewStreamReconciler ¶
func NewStreamReconciler(client client.Client, gvk schema.GroupVersionKind, jobBuilder JobBuilder, streamClass *v1.StreamClass, eventRecorder record.EventRecorder, definitionParser DefinitionParser, managers map[Backend]BackendResourceManager, backfillResourceManager BackfillBackendResourceManager) controllers.UnmanagedReconciler
NewStreamReconciler creates a new StreamReconciler instance.
Types ¶
type BackendResource ¶ added in v1.1.0
type BackendResource interface {
// Name returns the name of the backend resource.
Name() string
// UID returns the Kubernetes UID of the backend resource.
UID() types.UID
// CurrentConfiguration returns the hash sum of the current configuration (spec) of the backend resource.
CurrentConfiguration() (string, error)
// IsCompleted returns true if the workload represented by the backend resource has completed successfully,
// e.g., the job has completed with a Succeeded condition, or the cronjob has a last schedule time and no active jobs.
IsCompleted() bool
// IsFailed returns true if the workload represented by the backend resource has failed.
IsFailed() bool
// ToObject converts the backend resource to a client.Object for use with the Kubernetes API.
ToObject() client.Object
// IsBackfill returns true if the backend resource is associated with a backfill request.
IsBackfill() bool
}
BackendResource defines an interface for resources that represent the backend of a Stream. This could be a Kubernetes Job or CronJob; see the implementations in the backend folder.
type BackendResourceManager ¶ added in v1.1.0
type BackendResourceManager interface {
// SetupWithController sets up the necessary watches and handlers for the backend resources with the provided controller.
SetupWithController(cache cache.Cache, scheme *runtime.Scheme, mapper meta.RESTMapper, controller controller.Controller, primaryGvk schema.GroupVersionKind) error
// Get retrieves the current state of the backend resource identified by the given object key.
// NOTE(review): the key is presumably the namespaced name of the owning stream definition — confirm against the implementations.
Get(ctx context.Context, key client.ObjectKey) (BackendResource, error)
// Remove deletes the backend resource associated with the given stream definition and updates the stream phase accordingly.
Remove(ctx context.Context, definition Definition, nextPhase Phase, eventFunc controllers.EventFunc) (reconcile.Result, error)
// Apply creates or updates the backend resource based on the provided stream definition and backfill request, and updates the stream phase accordingly.
Apply(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, nextPhase Phase, streamClass *v1.StreamClass, eventFunc controllers.EventFunc) (reconcile.Result, error)
// NoOp does not perform any changes, but updates the stream status.
NoOp(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, nextPhase Phase, eventFunc controllers.EventFunc) (reconcile.Result, error)
}
BackendResourceManager defines the interface for managing backend resources associated with a stream definition.
type BackfillBackendResourceManager ¶ added in v1.1.0
// BackfillBackendResourceManager is a BackendResourceManager that can additionally look up the
// backfill request associated with a stream definition.
type BackfillBackendResourceManager interface {
BackendResourceManager
// GetBackfillRequest returns the current backfill request associated with the given stream definition, if any.
GetBackfillRequest(ctx context.Context, definition Definition) (*v1.BackfillRequest, error)
}
type DefaultStatusManager ¶ added in v1.1.0
// DefaultStatusManager is a struct whose fields are all unexported; it is created with NewDefaultStatusManager.
// NOTE(review): its UpdateStreamPhase method has the same signature as StatusManager.UpdateStreamPhase, so it is
// presumably the default StatusManager implementation — confirm against the implementation.
type DefaultStatusManager struct {
// contains filtered or unexported fields
}
func NewDefaultStatusManager ¶ added in v1.1.0
func NewDefaultStatusManager(client client.Client, gvk schema.GroupVersionKind, streamClass *v1.StreamClass, definitionParser DefinitionParser) *DefaultStatusManager
func (*DefaultStatusManager) UpdateStreamPhase ¶ added in v1.1.0
func (s *DefaultStatusManager) UpdateStreamPhase(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, next Phase, eventFunc controllers.EventFunc) (reconcile.Result, error)
type Definition ¶
// Definition describes a stream definition. It embeds job.ConfiguratorProvider and exposes the
// definition's phase, suspension state, configuration hashes, conditions, backend selection,
// and conversions to Kubernetes API objects.
type Definition interface {
job.ConfiguratorProvider
// GetPhase returns the current phase of the stream definition.
GetPhase() Phase
// Suspended returns true if the stream definition is suspended.
Suspended() bool
// CurrentConfiguration returns the hash sum of the current configuration (spec) of the stream definition.
CurrentConfiguration(request *v1.BackfillRequest) (string, error)
// LastAppliedConfiguration returns the hash sum of the last observed configuration (spec) of the stream definition.
LastAppliedConfiguration() string
// RecomputeConfiguration recomputes and updates the last observed configuration hash.
// This should be called after any changes to the spec have been applied and the object saved to the API server.
RecomputeConfiguration(request *v1.BackfillRequest) error
// NamespacedName returns the namespaced name of the stream definition.
NamespacedName() types.NamespacedName
// ToUnstructured converts the stream definition to an unstructured object.
ToUnstructured() *unstructured.Unstructured
// SetPhase sets the phase of the stream definition.
SetPhase(status Phase) error
// SetSuspended sets the suspended state of the stream definition.
SetSuspended(suspended bool) error
// StateString returns a string representation of the current state.
// This is primarily used for logging and debugging purposes.
StateString() string
// ToOwnerReference converts the stream definition to an owner reference.
ToOwnerReference() metav1.OwnerReference
// GetJobTemplate returns the job template reference based on the stream definition and backfill request.
GetJobTemplate(request *v1.BackfillRequest) types.NamespacedName
// SetConditions sets the conditions of the stream definition.
SetConditions(conditions []metav1.Condition) error
// ComputeConditions computes the conditions for the stream definition based on the backfill request.
ComputeConditions(bfr *v1.BackfillRequest) []metav1.Condition
// GetReferenceForSecret returns a LocalObjectReference for the given secret name.
GetReferenceForSecret(name string) (*corev1.LocalObjectReference, error)
// Validate validates the stream definition and returns an error if any required fields are missing or invalid.
Validate() error
// GetBackend returns the streaming backend type (e.g., BatchJob, CronJob) defined in the stream definition.
GetBackend() Backend
// GetPreviousBackend returns the streaming backend type (e.g., BatchJob, CronJob) previously associated
// with the stream definition, which can be used to determine if the backend has changed during an update.
GetPreviousBackend(ctx context.Context, client client.Client) (*Backend, error)
// GetSchedule returns the schedule string for a CronJob backend, if applicable. For non-CronJob backends,
// it can return an empty string or an error indicating that the schedule is not applicable.
GetSchedule() (string, error)
}
func GetStreamForClass ¶ added in v1.0.8
func GetStreamForClass(ctx context.Context, client client.Client, sc *v1.StreamClass, name types.NamespacedName, definitionParser DefinitionParser) (Definition, error)
GetStreamForClass retrieves the stream definition for a given stream class and namespaced name.
type DefinitionParser ¶ added in v1.1.0
// DefinitionParser converts an unstructured object into a Definition, returning an error when parsing fails.
type DefinitionParser func(*unstructured.Unstructured) (Definition, error)
DefinitionParser is a function type that takes an unstructured object and returns a validated Definition or an error if the parsing fails. This allows for flexible parsing logic that can be customized based on the specific structure of the unstructured object.
type JobBuilder ¶
type JobBuilder interface {
// BuildJob constructs a Kubernetes Job from the template referenced by templateName, using the provided configurator.
// It returns the built Job, or an error if the Job could not be constructed.
BuildJob(ctx context.Context, templateName types.NamespacedName, configurator job.Configurator) (*batchv1.Job, error)
}
JobBuilder defines an interface for building Kubernetes Jobs based on a specified template type and a configurator that modifies the Job object.
type StatusManager ¶ added in v1.1.0
type StatusManager interface {
// UpdateStreamPhase updates the phase of the stream definition's status and emits an event if the phase has changed.
// The next argument is the phase to transition to.
// NOTE(review): backfillRequest is a pointer; whether nil is accepted is not visible here — confirm with the implementation.
UpdateStreamPhase(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, next Phase, eventFunc controllers.EventFunc) (reconcile.Result, error)
}
StatusManager defines an interface for managing the status sub resource of a stream definition.