controller

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WorkerId

func WorkerId(ctx context.Context) string

Types

type Controller

type Controller interface {
	Start(ctx context.Context) error
	Stop()
}

Controller processes entities of a specific kind

type ControllerEntity

type ControllerEntity interface {
	Decode(getter entity.AttrGetter)
	Encode() []entity.Attr
}

type ControllerManager

type ControllerManager struct {
	// contains filtered or unexported fields
}

ControllerManager manages multiple controllers

func NewControllerManager

func NewControllerManager() *ControllerManager

NewControllerManager creates a new controller manager

func (*ControllerManager) AddController

func (m *ControllerManager) AddController(controller Controller)

AddController adds a controller to the manager

func (*ControllerManager) Start

func (m *ControllerManager) Start(ctx context.Context) error

Start starts all controllers

func (*ControllerManager) Stop

func (m *ControllerManager) Stop()

Stop stops all controllers

type DeletingReconcileController

type DeletingReconcileController interface {
	Delete(ctx context.Context, e entity.Id) error
}

DeletingReconcileController is an optional interface that reconcile controllers can implement to handle deletion of their managed entities.

type EntityAccessClient

type EntityAccessClient interface {
	// Get returns a single entity by its ID
	Get(ctx context.Context, id string) (*entityserver_v1alpha.EntityAccessClientGetResults, error)

	// List returns all entities matching the given index
	List(ctx context.Context, index entity.Attr) (*entityserver_v1alpha.EntityAccessClientListResults, error)

	// WatchIndex watches for changes to entities matching the given index
	// and sends updates through the provided sender
	WatchIndex(ctx context.Context, index entity.Attr, sender stream.SendStream[*entityserver_v1alpha.EntityOp]) error
}

EntityAccessClient defines the interface for interacting with the entity server

type Event

type Event struct {
	Type EventType
	Id   entity.Id

	Entity *entity.Entity // The entity that was changed

	Rev, PrevRev int64 // Revision and previous revision for the entity
}

Event represents a change to an entity

type EventType

type EventType string

EventType represents the type of event that occurred on an entity

const (
	EventAdded   EventType = "ADDED"
	EventUpdated EventType = "UPDATED"
	EventDeleted EventType = "DELETED"
)

type GenericController

type GenericController[P ControllerEntity] interface {
	Init(context.Context) error
	Create(ctx context.Context, obj P, meta *entity.Meta) error
	Delete(ctx context.Context, e entity.Id) error
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, event Event) ([]entity.Attr, error)

HandlerFunc is a function that processes an entity

func AdaptController

func AdaptController[
	T any,
	P interface {
		*T
		ControllerEntity
	},
	C GenericController[P],
](cont C) HandlerFunc

func AdaptReconcileController

func AdaptReconcileController[
	T any,
	P interface {
		*T
		ControllerEntity
	},
	C ReconcileControllerI[P],
](cont C) HandlerFunc

AdaptReconcileController adapts a ReconcileControllerI into a HandlerFunc. It calls Reconcile() for both Add and Update events. If the controller implements DeletingReconcileController, Delete() is called for Delete events.

type ReconcileController

type ReconcileController struct {
	Log *slog.Logger
	// contains filtered or unexported fields
}

ReconcileController implements the Controller interface

func NewReconcileController

func NewReconcileController(name string, log *slog.Logger, index entity.Attr, esc *entityserver_v1alpha.EntityAccessClient, handler HandlerFunc, resyncPeriod time.Duration, workers int) *ReconcileController

NewReconcileController creates a new controller

func (*ReconcileController) Enqueue

func (c *ReconcileController) Enqueue(event Event)

Enqueue adds an event to the work queue for processing

func (*ReconcileController) ProcessEventForTest

func (c *ReconcileController) ProcessEventForTest(ctx context.Context, event Event) error

ProcessEventForTest processes a single event and applies any updates. This is exposed for testing controllers without starting the full controller loop.

func (*ReconcileController) RecordWrite

func (c *ReconcileController) RecordWrite(revision int64)

RecordWrite records a revision that was written by this controller. Subsequent watch events for this revision will be skipped to reduce unnecessary reconciliation noise from self-generated updates.

func (*ReconcileController) SetPeriodic

func (c *ReconcileController) SetPeriodic(often time.Duration, fn func(ctx context.Context) error)

SetPeriodic sets the periodic callback function

func (*ReconcileController) Start

func (c *ReconcileController) Start(top context.Context) error

Start starts the controller

func (*ReconcileController) Stop

func (c *ReconcileController) Stop()

Stop stops the controller

func (*ReconcileController) WriteTracker

func (c *ReconcileController) WriteTracker() WriteTracker

WriteTracker returns a WriteTracker interface that can be used by controllers to record manual entity writes outside the reconciliation framework.

type ReconcileControllerI

type ReconcileControllerI[P ControllerEntity] interface {
	Init(context.Context) error
	Reconcile(ctx context.Context, obj P, meta *entity.Meta) error
}

ReconcileControllerI is for controllers that maintain aggregate state across multiple entities. Unlike GenericController which maps 1:1 between an entity and a resource, ReconcileControllerI handles controllers where one entity drives reconciliation of N resources.

type RingSet

type RingSet struct {
	// contains filtered or unexported fields
}

RingSet maintains a fixed-size set of recently seen int64 values using a ring buffer. Controllers use this to track recently written entity revisions, allowing them to skip watch events for their own writes and reduce unnecessary reconciliation noise.

Thread-safe for concurrent access by multiple workers and the watch goroutine.

func NewRingSet

func NewRingSet(capacity int) *RingSet

func (*RingSet) Add

func (r *RingSet) Add(value int64)

func (*RingSet) Contains

func (r *RingSet) Contains(value int64) bool

type UpdatingController

type UpdatingController[P ControllerEntity] interface {
	Update(ctx context.Context, obj P, meta *entity.Meta) error
}

UpdatingController is an optional interface that controllers can implement to handle updates differently from creates

type WriteTracker

type WriteTracker interface {
	RecordWrite(revision int64)
}

WriteTracker provides a way to track entity write revisions to skip self-generated watch events. Controllers that make manual entity writes outside the reconciliation framework can use this to record their writes and avoid unnecessary re-reconciliation.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL