agent

package
v0.1.10 Latest
Published: Apr 30, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

Package agent provides business logic for handling local agent requests.

Package agent implements Agent-side session management for connected SDK Providers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type LocalHandler

type LocalHandler struct {
	// contains filtered or unexported fields
}

LocalHandler contains the business logic for handling agent requests without any transport-specific dependencies.

func NewLocalHandler

func NewLocalHandler(store *agentlocal.LocalStore, configDir, agentID string, logger *slog.Logger) *LocalHandler

NewLocalHandler creates a new LocalHandler instance.

func (*LocalHandler) Handle

func (h *LocalHandler) Handle(ctx context.Context, msgID uint32, reqID uint32, body []byte) ([]byte, error)

Handle implements the transportcore.Handler interface. It dispatches each request to the appropriate handler based on its message type.

func (*LocalHandler) SetOpsServer

func (h *LocalHandler) SetOpsServer(ops OpsServerWrapper)

SetOpsServer sets the ops server wrapper.

func (*LocalHandler) SetProviderManager

func (h *LocalHandler) SetProviderManager(pm ProviderManager)

SetProviderManager sets the provider manager.

func (*LocalHandler) SetTLSConfig

func (h *LocalHandler) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)

SetTLSConfig sets the TLS config for outbound connections.

func (*LocalHandler) SetTaskEventReporter

func (h *LocalHandler) SetTaskEventReporter(reporter TaskEventReporter)

type OpsServerWrapper

type OpsServerWrapper interface {
	GetSystemInfo(ctx context.Context, req *emptypb.Empty) (*opsv1.SystemInfo, error)
	ListProcesses(ctx context.Context, req *emptypb.Empty) (*opsv1.ListProcessesResponse, error)
	ReportMetrics(ctx context.Context, req *opsv1.MetricsReport) (*emptypb.Empty, error)
	RestartProcess(ctx context.Context, req *opsv1.RestartProcessRequest) (*opsv1.RestartProcessResponse, error)
	StopProcess(ctx context.Context, req *opsv1.StopProcessRequest) (*opsv1.StopProcessResponse, error)
	StartProcess(ctx context.Context, req *opsv1.StartProcessRequest) (*opsv1.StartProcessResponse, error)
	ExecuteCommand(ctx context.Context, req *opsv1.ExecuteCommandRequest) (*opsv1.ExecuteCommandResponse, error)

	// System services (platform-specific, read-only)
	// These use JSON request/response to avoid circular dependencies
	ListServicesJSON(ctx context.Context, jsonReq []byte) ([]byte, error)
	GetServiceStatusJSON(ctx context.Context, jsonReq []byte) ([]byte, error)
	ListCronJobsJSON(ctx context.Context) ([]byte, error)
}

OpsServerWrapper wraps the OpsServer functionality.

type ProviderManager

type ProviderManager interface {
	IsPlatformFunction(functionID string) bool
	Call(ctx context.Context, functionID string, request []byte) ([]byte, error)
}

ProviderManager is the interface for provider function calls.

type ProviderSession

type ProviderSession struct {

	// SessionID is the unique session identifier assigned on connect.
	SessionID string

	// ServiceID is the Provider's service identifier (e.g., "prom-adapter").
	ServiceID string

	// Version is the Provider's reported version.
	Version string

	// Functions are the function descriptors registered by this Provider.
	Functions []*sdkv1.LocalFunctionDescriptor

	// ConnectedAt is the time the session was established.
	ConnectedAt time.Time

	// LastSeen is the most recent heartbeat or activity time.
	LastSeen atomic.Int64 // Unix timestamp

	// SDKLanguage is the SDK language (e.g., "go", "java", "python").
	SDKLanguage string

	// SDKVersion is the SDK release version.
	SDKVersion string
	// contains filtered or unexported fields
}

ProviderSession represents an established session with a connected SDK Provider. The Agent dispatches Invoke, StartTask, and CancelTask requests to the Provider through this session.

func (*ProviderSession) Close

func (s *ProviderSession) Close() error

Close closes the underlying connection.

func (*ProviderSession) Conn

func (s *ProviderSession) Conn() *tcptr.MuxConn

Conn returns the underlying MuxConn for sending requests to this Provider.

func (*ProviderSession) FunctionIDs

func (s *ProviderSession) FunctionIDs() []string

FunctionIDs returns the list of function IDs registered by this Provider.

func (*ProviderSession) GetLastSeen

func (s *ProviderSession) GetLastSeen() time.Time

GetLastSeen returns the LastSeen time.

func (*ProviderSession) UpdateLastSeen

func (s *ProviderSession) UpdateLastSeen()

UpdateLastSeen updates the LastSeen timestamp to now.

type ProviderSessionStore

type ProviderSessionStore struct {
	// contains filtered or unexported fields
}

ProviderSessionStore manages active Provider sessions. It provides thread-safe CRUD operations indexed by SessionID and ServiceID.
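One common way to get thread-safe access indexed by two keys is a pair of maps guarded by a single sync.RWMutex. The sketch below assumes that layout; it is not the package's actual implementation, and the session and store types are hypothetical stand-ins. Connection handling is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in carrying only the two index keys.
type session struct {
	SessionID string
	ServiceID string
}

// store is a hypothetical two-index store guarded by one RWMutex.
type store struct {
	mu        sync.RWMutex
	bySession map[string]*session
	byService map[string]*session
}

func newStore() *store {
	return &store{
		bySession: make(map[string]*session),
		byService: make(map[string]*session),
	}
}

// Add rejects a duplicate SessionID, mirroring the documented Add contract.
func (st *store) Add(s *session) error {
	st.mu.Lock()
	defer st.mu.Unlock()
	if _, ok := st.bySession[s.SessionID]; ok {
		return fmt.Errorf("session %q already exists", s.SessionID)
	}
	st.bySession[s.SessionID] = s
	st.byService[s.ServiceID] = s
	return nil
}

// GetByServiceID takes only a read lock, so lookups can run concurrently.
func (st *store) GetByServiceID(id string) (*session, bool) {
	st.mu.RLock()
	defer st.mu.RUnlock()
	s, ok := st.byService[id]
	return s, ok
}

// Remove deletes the session from both indexes under one write lock.
func (st *store) Remove(sessionID string) {
	st.mu.Lock()
	defer st.mu.Unlock()
	s, ok := st.bySession[sessionID]
	if !ok {
		return
	}
	delete(st.bySession, sessionID)
	// Only drop the service entry if it still points at this session.
	if st.byService[s.ServiceID] == s {
		delete(st.byService, s.ServiceID)
	}
}

func main() {
	st := newStore()
	fmt.Println(st.Add(&session{SessionID: "s1", ServiceID: "prom-adapter"}))
	fmt.Println(st.Add(&session{SessionID: "s1", ServiceID: "prom-adapter"}))
	_, ok := st.GetByServiceID("prom-adapter")
	fmt.Println(ok)
}
```

Updating both maps inside the same critical section is what keeps the two indexes from disagreeing under concurrent use.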

func NewProviderSessionStore

func NewProviderSessionStore() *ProviderSessionStore

NewProviderSessionStore creates a new ProviderSessionStore.

func (*ProviderSessionStore) Add

func (store *ProviderSessionStore) Add(sess *ProviderSession) error

Add registers a new Provider session. Returns an error if a session with the same SessionID already exists.

func (*ProviderSessionStore) Count

func (store *ProviderSessionStore) Count() int

Count returns the number of active sessions.

func (*ProviderSessionStore) GetByServiceID

func (store *ProviderSessionStore) GetByServiceID(serviceID string) (*ProviderSession, bool)

GetByServiceID returns the Provider session by ServiceID.

func (*ProviderSessionStore) GetBySessionID

func (store *ProviderSessionStore) GetBySessionID(sessionID string) (*ProviderSession, bool)

GetBySessionID returns the Provider session by SessionID.

func (*ProviderSessionStore) List

func (store *ProviderSessionStore) List() []*ProviderSession

List returns all active Provider sessions.

func (*ProviderSessionStore) PruneStale

func (store *ProviderSessionStore) PruneStale(ttl time.Duration) int

PruneStale removes sessions that haven't been seen within the given TTL.

func (*ProviderSessionStore) Remove

func (store *ProviderSessionStore) Remove(sessionID string)

Remove removes a Provider session by SessionID and closes its connection.

func (*ProviderSessionStore) RemoveByServiceID

func (store *ProviderSessionStore) RemoveByServiceID(serviceID string)

RemoveByServiceID removes a Provider session by ServiceID.

func (*ProviderSessionStore) Upsert

func (store *ProviderSessionStore) Upsert(sess *ProviderSession)

Upsert adds or replaces a Provider session.

type TCPLocalListener

type TCPLocalListener struct {
	// contains filtered or unexported fields
}

TCPLocalListener accepts SDK Provider TCP sessions on the Agent's local gateway.

Lifecycle:

  1. SDK Provider dials in → first frame must be ProviderConnectRequest
  2. Agent validates and creates ProviderSession → stores in ProviderSessionStore
  3. Heartbeat/Drain flow through MuxConn
  4. Agent can send Invoke/StartTask to Provider via session.conn.Call()
  5. On disconnect, session is removed from store

func NewTCPLocalListener

func NewTCPLocalListener(config *TCPLocalListenerConfig, sessionStore *ProviderSessionStore, logger *slog.Logger) (*TCPLocalListener, error)

NewTCPLocalListener creates a new local TCP listener for SDK connections.

func (*TCPLocalListener) Addr

func (l *TCPLocalListener) Addr() string

Addr returns the bound listener address.

func (*TCPLocalListener) Close

func (l *TCPLocalListener) Close() error

Close stops accepting new connections and waits for active ones to finish.

func (*TCPLocalListener) IsClosed

func (l *TCPLocalListener) IsClosed() bool

IsClosed reports whether the listener has been closed.

func (*TCPLocalListener) Serve

func (l *TCPLocalListener) Serve(ctx context.Context) error

Serve accepts connections until ctx is done or the listener is closed.

func (*TCPLocalListener) SessionStore

func (l *TCPLocalListener) SessionStore() *ProviderSessionStore

SessionStore returns the Provider session store.

func (*TCPLocalListener) SetLocalHandler

func (l *TCPLocalListener) SetLocalHandler(h transportcore.Handler)

SetLocalHandler sets the handler for inbound requests from providers (e.g., invoke responses).

func (*TCPLocalListener) SetOnConnect

func (l *TCPLocalListener) SetOnConnect(fn func(sess *ProviderSession))

SetOnConnect sets the callback invoked after a Provider successfully connects.

func (*TCPLocalListener) SetOnDisconnect

func (l *TCPLocalListener) SetOnDisconnect(fn func(sess *ProviderSession))

SetOnDisconnect sets the callback invoked when a Provider disconnects.

type TCPLocalListenerConfig

type TCPLocalListenerConfig struct {
	// Address is the listen address (e.g., "127.0.0.1:19091").
	Address string

	// Timeouts.
	RecvTimeout time.Duration
	SendTimeout time.Duration
}

TCPLocalListenerConfig holds configuration for the Agent's local TCP listener that accepts SDK Provider connections.

type TaskEventReporter

type TaskEventReporter interface {
	ReportTaskEvent(ctx context.Context, event *sdkv1.TaskEvent) error
}
