pipeline

package
v0.1.4 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 12, 2026 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecutePipeline

func ExecutePipeline(
	ctx context.Context,
	req *model.PipelineRequest,
	stages []Stage,
	queue *FIFOQueue,
	auditFn func(*model.AuditEntry),
	rateLimitRecordFn func(agentID, toolName string),
) (*model.Response, error)

ExecutePipeline runs a tools/call request through ACL → RateLimit → Approval → Queue → Forward → Audit.

Types

type ACL

type ACL struct {
	// contains filtered or unexported fields
}

ACL checks whether an agent is allowed to call a specific tool on a backend.

func NewACL

func NewACL(cfgMgr *config.Manager) *ACL

func (*ACL) Name

func (a *ACL) Name() string

func (*ACL) Process

func (a *ACL) Process(_ context.Context, req *model.PipelineRequest) (*model.StageResult, error)

type ApprovalGate

type ApprovalGate struct {
	// contains filtered or unexported fields
}

ApprovalGate blocks destructive operations until a human approves them.

func NewApprovalGate

func NewApprovalGate(cfgMgr *config.Manager, store *approval.Store) *ApprovalGate

func (*ApprovalGate) Name

func (a *ApprovalGate) Name() string

func (*ApprovalGate) Process

type FIFOQueue

type FIFOQueue struct {
	// contains filtered or unexported fields
}

FIFOQueue serializes tool calls per backend with random delays.

func NewFIFOQueue

func NewFIFOQueue(cfgMgr *config.Manager, forward ForwardFunc) *FIFOQueue

func (*FIFOQueue) Enqueue

func (q *FIFOQueue) Enqueue(ctx context.Context, req *model.PipelineRequest) (*model.Response, int, error)

Enqueue adds a request to the backend's FIFO queue and blocks until the request has been executed. It returns the backend response, or an error if the queue is full or the context is cancelled.

func (*FIFOQueue) QueueStatus

func (q *FIFOQueue) QueueStatus() map[string]int

QueueStatus returns the current pending count per backend.

func (*FIFOQueue) Start

func (q *FIFOQueue) Start()

Start initializes queue workers for all configured backends.

func (*FIFOQueue) Stop

func (q *FIFOQueue) Stop()

Stop gracefully shuts down all queue workers and waits for them to finish.

type ForwardFunc

type ForwardFunc func(ctx context.Context, backendID string, rpcReq *model.Request, sessionID string) (*model.Response, string, error)

ForwardFunc sends a JSON-RPC request to the backend and returns the response.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter enforces sliding-window rate limits at two levels: per agent per tool, and globally per tool.

func NewRateLimiter

func NewRateLimiter(cfgMgr *config.Manager, logger *audit.Logger) *RateLimiter

func (*RateLimiter) Name

func (r *RateLimiter) Name() string

func (*RateLimiter) Process

type Stage

type Stage interface {
	Name() string
	Process(ctx context.Context, req *model.PipelineRequest) (*model.StageResult, error)
}

Stage processes a request and returns a verdict.
