correlation

package
v0.4.3
Warning

This package is not in the latest version of its module.
Published: Oct 3, 2025 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DAO

type DAO interface {
	Save(ctx context.Context, g *Group) error
	Load(ctx context.Context, id string) (*Group, error)
	Delete(ctx context.Context, id string) error
	List(ctx context.Context) ([]*Group, error)
}

DAO abstracts persistence operations for correlation groups so that the allocator can recover its in-memory state after a restart.

type Group

type Group struct {
	ID              string
	ParentProcessID string
	ParentExecID    string

	Expected int

	Outputs []interface{}

	DoneAt    *time.Time
	TimeoutAt *time.Time // nil means no timeout

	Mode  string
	Merge string
	// contains filtered or unexported fields
}

Group represents a rendezvous for a set of asynchronous executions emitted by a parent task. The group tracks how many children are expected and how many have already reported completion.

func (*Group) AggregateOutputs

func (g *Group) AggregateOutputs() []interface{}

AggregateOutputs returns the slice of collected child outputs.

func (*Group) Done

func (g *Group) Done() bool

Done returns whether the group has completed.

func (*Group) Failed

func (g *Group) Failed() bool

Failed returns true when at least one child reported failure.

func (*Group) MarkDone

func (g *Group) MarkDone(failed bool, output interface{}) (groupComplete bool)

MarkDone registers the completion of a child execution and returns true when the rendezvous condition (all children finished) has been satisfied. Pass failed=true if the child ended in error.
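A minimal sketch of the counting behaviour described above, using a local stand-in type. The real Group hides its bookkeeping in unexported fields, so the `completed` and `failed` fields, the mutex, and the choice to record the output of failed children are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// group is a hypothetical stand-in for correlation.Group.
type group struct {
	Expected int
	Outputs  []interface{}

	mu        sync.Mutex
	completed int  // assumed: number of children that have reported
	failed    bool // assumed: set once any child reports failure
}

// MarkDone records one child result and reports whether all expected
// children have now finished.
func (g *group) MarkDone(failed bool, output interface{}) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.completed++
	if failed {
		g.failed = true
	}
	// Assumption: outputs are recorded for failed children as well.
	g.Outputs = append(g.Outputs, output)
	return g.completed >= g.Expected
}

// Failed reports whether at least one child reported failure.
func (g *group) Failed() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.failed
}

func main() {
	g := &group{Expected: 3}
	fmt.Println(g.MarkDone(false, "a")) // false: 1 of 3 reported
	fmt.Println(g.MarkDone(true, nil))  // false: 2 of 3 reported
	fmt.Println(g.MarkDone(false, "c")) // true: rendezvous reached
	fmt.Println(g.Failed())             // true: one child failed
}
```

Returning the completion flag from the same call that records the result lets exactly one caller observe the transition to "complete", which is convenient when the last child to report is the one that should trigger the parent.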

func (*Group) TimedOut

func (g *Group) TimedOut() bool

TimedOut returns true if TimeoutAt is set and the current time is past it.

type MemoryDAO

type MemoryDAO struct {
	// contains filtered or unexported fields
}

MemoryDAO stores correlation groups purely in memory; useful for unit tests and single-instance deployments.

func NewMemoryDAO

func NewMemoryDAO() *MemoryDAO

func (*MemoryDAO) Delete

func (d *MemoryDAO) Delete(_ context.Context, id string) error

func (*MemoryDAO) List

func (d *MemoryDAO) List(_ context.Context) ([]*Group, error)

func (*MemoryDAO) Load

func (d *MemoryDAO) Load(_ context.Context, id string) (*Group, error)

func (*MemoryDAO) Save

func (d *MemoryDAO) Save(_ context.Context, g *Group) error

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is an in-memory implementation of the basic operations needed for allocator/processor communication. It can be replaced by a Redis or database implementation later without changing callers.

func NewStore

func NewStore() *Store

func (*Store) Create

func (s *Store) Create(g *Group) *Group

Create registers a new group. If the group already exists, the existing pointer is returned.

func (*Store) Delete

func (s *Store) Delete(id string)

func (*Store) Get

func (s *Store) Get(id string) *Group

func (*Store) Iterate

func (s *Store) Iterate(fn func(id string, g *Group))

Iterate executes fn for each group under a read lock.
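The get-or-create behaviour of Create and the locking behaviour of Iterate can be sketched together. This is an illustration under assumptions, not the package's source: the stand-in `store` type, its field names, and keying groups by ID are hypothetical, since the real Store's fields are unexported.

```go
package main

import (
	"fmt"
	"sync"
)

// Group is a trimmed stand-in for correlation.Group.
type Group struct {
	ID string
}

// store is a hypothetical stand-in for correlation.Store.
type store struct {
	mu     sync.RWMutex
	groups map[string]*Group // assumption: groups are keyed by ID
}

func newStore() *store {
	return &store{groups: make(map[string]*Group)}
}

// Create registers g, or returns the already-registered group with the
// same ID so that every caller shares one pointer.
func (s *store) Create(g *Group) *Group {
	s.mu.Lock()
	defer s.mu.Unlock()
	if existing, ok := s.groups[g.ID]; ok {
		return existing
	}
	s.groups[g.ID] = g
	return g
}

// Iterate calls fn for each group while holding the read lock.
func (s *store) Iterate(fn func(id string, g *Group)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for id, g := range s.groups {
		fn(id, g)
	}
}

func main() {
	s := newStore()
	first := s.Create(&Group{ID: "g1"})
	second := s.Create(&Group{ID: "g1"})
	fmt.Println(first == second) // true: the existing pointer is returned

	count := 0
	s.Iterate(func(id string, g *Group) { count++ })
	fmt.Println(count) // 1
}
```

With a `sync.RWMutex` as assumed here, a callback that takes the write lock (for example by calling Create) from inside Iterate would deadlock, so callbacks in this sketch should only read.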
