dispatch

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher routes function invocations to live agents discovered via registry store. Now uses NNG instead of gRPC for Agent communication.

func NewDispatcher

func NewDispatcher(store *reg.Store) *Dispatcher

func NewDispatcherWithJobStore

func NewDispatcherWithJobStore(store *reg.Store, jobStore JobRoutingStore) *Dispatcher

NewDispatcherWithJobStore creates a new Dispatcher with optional job routing store

func (*Dispatcher) CancelJob

func (d *Dispatcher) CancelJob(ctx context.Context, jobID string) error

func (*Dispatcher) CleanupOldJobs

func (d *Dispatcher) CleanupOldJobs(ttl time.Duration) error

CleanupOldJobs removes old job routing entries

func (*Dispatcher) Close

func (d *Dispatcher) Close() error

Close closes the dispatcher and its resources

func (*Dispatcher) Invoke

func (d *Dispatcher) Invoke(ctx context.Context, functionID string, payload []byte) ([]byte, error)

func (*Dispatcher) InvokeRequest

func (d *Dispatcher) InvokeRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.InvokeResponse, error)

InvokeRequest forwards a fully populated InvokeRequest to a live agent.

func (*Dispatcher) JobAddr

func (d *Dispatcher) JobAddr(jobID string) (string, bool)

JobAddr exposes tracked job routing addresses (primarily for diagnostics).

func (*Dispatcher) ListFunctionAgents

func (d *Dispatcher) ListFunctionAgents(functionID string) []string

ListFunctionAgents returns agent IDs that currently expose the function.

func (*Dispatcher) RegisterJob

func (d *Dispatcher) RegisterJob(jobID, addr string)

RegisterJob registers a job routing (exported method)

func (*Dispatcher) SetTLSConfig

func (d *Dispatcher) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)

func (*Dispatcher) StartJob

func (d *Dispatcher) StartJob(ctx context.Context, functionID string, payload []byte) (string, error)

func (*Dispatcher) StartJobRequest

func (d *Dispatcher) StartJobRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.StartJobResponse, error)

StartJobRequest forwards a structured InvokeRequest to the agent StartJob RPC.

func (*Dispatcher) Store

func (d *Dispatcher) Store() *reg.Store

func (*Dispatcher) StreamJob

func (d *Dispatcher) StreamJob(ctx context.Context, jobID string) ([]*sdkv1.JobEvent, bool, error)

func (*Dispatcher) StreamJobRealtime

func (d *Dispatcher) StreamJobRealtime(ctx context.Context, jobID string, fn func(*sdkv1.JobEvent) bool) (bool, error)

StreamJobRealtime forwards job events to the provided callback.

func (*Dispatcher) UnregisterJob

func (d *Dispatcher) UnregisterJob(jobID string)

UnregisterJob unregisters a job routing (exported method)

type FileJobRoutingStore

type FileJobRoutingStore struct {
	// contains filtered or unexported fields
}

FileJobRoutingStore implements JobRoutingStore using file-based persistence

func NewFileJobRoutingStore

func NewFileJobRoutingStore(dataDir string) (*FileJobRoutingStore, error)

NewFileJobRoutingStore creates a new file-based job routing store

func (*FileJobRoutingStore) Cleanup

func (s *FileJobRoutingStore) Cleanup(ttl time.Duration) error

func (*FileJobRoutingStore) Close

func (s *FileJobRoutingStore) Close() error

func (*FileJobRoutingStore) Delete

func (s *FileJobRoutingStore) Delete(jobID string) error

func (*FileJobRoutingStore) Get

func (s *FileJobRoutingStore) Get(jobID string) (*JobRouting, error)

func (*FileJobRoutingStore) List

func (s *FileJobRoutingStore) List() ([]*JobRouting, error)

func (*FileJobRoutingStore) Set

func (s *FileJobRoutingStore) Set(jobID, agentAddr string) error

type JobRouting

type JobRouting struct {
	JobID     string    `json:"job_id"`
	AgentAddr string    `json:"agent_addr"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

JobRouting represents a job routing entry

type JobRoutingStore

type JobRoutingStore interface {
	// Get retrieves job routing by job ID
	Get(jobID string) (*JobRouting, error)

	// Set stores or updates job routing
	Set(jobID, agentAddr string) error

	// Delete removes job routing
	Delete(jobID string) error

	// List returns all job routings
	List() ([]*JobRouting, error)

	// Cleanup removes old entries (older than ttl)
	Cleanup(ttl time.Duration) error

	// Close closes the store
	Close() error
}

JobRoutingStore defines the interface for job routing persistence

type MemoryJobRoutingStore

type MemoryJobRoutingStore struct {
	// contains filtered or unexported fields
}

MemoryJobRoutingStore implements JobRoutingStore using in-memory storage

func NewMemoryJobRoutingStore

func NewMemoryJobRoutingStore() *MemoryJobRoutingStore

NewMemoryJobRoutingStore creates a new in-memory job routing store

func (*MemoryJobRoutingStore) Cleanup

func (s *MemoryJobRoutingStore) Cleanup(ttl time.Duration) error

func (*MemoryJobRoutingStore) Close

func (s *MemoryJobRoutingStore) Close() error

func (*MemoryJobRoutingStore) Delete

func (s *MemoryJobRoutingStore) Delete(jobID string) error

func (*MemoryJobRoutingStore) Get

func (s *MemoryJobRoutingStore) Get(jobID string) (*JobRouting, error)

func (*MemoryJobRoutingStore) List

func (s *MemoryJobRoutingStore) List() ([]*JobRouting, error)

func (*MemoryJobRoutingStore) Set

func (s *MemoryJobRoutingStore) Set(jobID, agentAddr string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL