Documentation
¶
Overview ¶
Package agent provides business logic for handling local agent requests.
Package agent implements Agent-side session management for connected SDK Providers.
Index ¶
- type LocalHandler
- func (h *LocalHandler) Handle(ctx context.Context, msgID uint32, reqID uint32, body []byte) ([]byte, error)
- func (h *LocalHandler) SetOpsServer(ops OpsServerWrapper)
- func (h *LocalHandler) SetProviderManager(pm ProviderManager)
- func (h *LocalHandler) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)
- func (h *LocalHandler) SetTaskEventReporter(reporter TaskEventReporter)
- type OpsServerWrapper
- type ProviderManager
- type ProviderSession
- type ProviderSessionStore
- func (store *ProviderSessionStore) Add(sess *ProviderSession) error
- func (store *ProviderSessionStore) Count() int
- func (store *ProviderSessionStore) GetByServiceID(serviceID string) (*ProviderSession, bool)
- func (store *ProviderSessionStore) GetBySessionID(sessionID string) (*ProviderSession, bool)
- func (store *ProviderSessionStore) List() []*ProviderSession
- func (store *ProviderSessionStore) PruneStale(ttl time.Duration) int
- func (store *ProviderSessionStore) Remove(sessionID string)
- func (store *ProviderSessionStore) RemoveByServiceID(serviceID string)
- func (store *ProviderSessionStore) Upsert(sess *ProviderSession)
- type TCPLocalListener
- func (l *TCPLocalListener) Addr() string
- func (l *TCPLocalListener) Close() error
- func (l *TCPLocalListener) IsClosed() bool
- func (l *TCPLocalListener) Serve(ctx context.Context) error
- func (l *TCPLocalListener) SessionStore() *ProviderSessionStore
- func (l *TCPLocalListener) SetLocalHandler(h transportcore.Handler)
- func (l *TCPLocalListener) SetOnConnect(fn func(sess *ProviderSession))
- func (l *TCPLocalListener) SetOnDisconnect(fn func(sess *ProviderSession))
- type TCPLocalListenerConfig
- type TaskEventReporter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type LocalHandler ¶
type LocalHandler struct {
// contains filtered or unexported fields
}
LocalHandler contains the business logic for handling agent requests without any transport-specific dependencies.
func NewLocalHandler ¶
func NewLocalHandler(store *agentlocal.LocalStore, configDir, agentID string, logger *slog.Logger) *LocalHandler
NewLocalHandler creates a new LocalHandler instance.
func (*LocalHandler) Handle ¶
func (h *LocalHandler) Handle(ctx context.Context, msgID uint32, reqID uint32, body []byte) ([]byte, error)
Handle implements the transportcore.Handler interface. It dispatches requests to the appropriate handler based on message type.
func (*LocalHandler) SetOpsServer ¶
func (h *LocalHandler) SetOpsServer(ops OpsServerWrapper)
SetOpsServer sets the ops server wrapper.
func (*LocalHandler) SetProviderManager ¶
func (h *LocalHandler) SetProviderManager(pm ProviderManager)
SetProviderManager sets the provider manager.
func (*LocalHandler) SetTLSConfig ¶
func (h *LocalHandler) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)
SetTLSConfig sets the TLS config for outbound connections.
func (*LocalHandler) SetTaskEventReporter ¶
func (h *LocalHandler) SetTaskEventReporter(reporter TaskEventReporter)
type OpsServerWrapper ¶
type OpsServerWrapper interface {
GetSystemInfo(ctx context.Context, req *emptypb.Empty) (*opsv1.SystemInfo, error)
ListProcesses(ctx context.Context, req *emptypb.Empty) (*opsv1.ListProcessesResponse, error)
ReportMetrics(ctx context.Context, req *opsv1.MetricsReport) (*emptypb.Empty, error)
RestartProcess(ctx context.Context, req *opsv1.RestartProcessRequest) (*opsv1.RestartProcessResponse, error)
StopProcess(ctx context.Context, req *opsv1.StopProcessRequest) (*opsv1.StopProcessResponse, error)
StartProcess(ctx context.Context, req *opsv1.StartProcessRequest) (*opsv1.StartProcessResponse, error)
ExecuteCommand(ctx context.Context, req *opsv1.ExecuteCommandRequest) (*opsv1.ExecuteCommandResponse, error)
// System services (platform-specific, read-only)
// These use JSON request/response to avoid circular dependencies
ListServicesJSON(ctx context.Context, jsonReq []byte) ([]byte, error)
GetServiceStatusJSON(ctx context.Context, jsonReq []byte) ([]byte, error)
ListCronJobsJSON(ctx context.Context) ([]byte, error)
}
OpsServerWrapper wraps the OpsServer functionality.
type ProviderManager ¶
type ProviderManager interface {
IsPlatformFunction(functionID string) bool
Call(ctx context.Context, functionID string, request []byte) ([]byte, error)
}
ProviderManager is the interface for provider function calls.
type ProviderSession ¶
type ProviderSession struct {
// SessionID is the unique session identifier assigned on connect.
SessionID string
// ServiceID is the Provider's service identifier (e.g., "prom-adapter").
ServiceID string
// Version is the Provider's reported version.
Version string
// Functions are the function descriptors registered by this Provider.
Functions []*sdkv1.LocalFunctionDescriptor
// ConnectedAt is the time the session was established.
ConnectedAt time.Time
// LastSeen is the most recent heartbeat or activity time.
LastSeen atomic.Int64 // Unix timestamp
// SDKLanguage is the SDK language (e.g., "go", "java", "python").
SDKLanguage string
// SDKVersion is the SDK release version.
SDKVersion string
// contains filtered or unexported fields
}
ProviderSession represents an established session with a connected SDK Provider. Agent dispatches Invoke/StartTask/CancelTask requests to the Provider through this session.
func (*ProviderSession) Close ¶
func (s *ProviderSession) Close() error
Close closes the underlying connection.
func (*ProviderSession) Conn ¶
func (s *ProviderSession) Conn() *tcptr.MuxConn
Conn returns the underlying MuxConn for sending requests to this Provider.
func (*ProviderSession) FunctionIDs ¶
func (s *ProviderSession) FunctionIDs() []string
FunctionIDs returns the list of function IDs registered by this Provider.
func (*ProviderSession) GetLastSeen ¶
func (s *ProviderSession) GetLastSeen() time.Time
GetLastSeen returns the LastSeen time.
func (*ProviderSession) UpdateLastSeen ¶
func (s *ProviderSession) UpdateLastSeen()
UpdateLastSeen updates the LastSeen timestamp to now.
type ProviderSessionStore ¶
type ProviderSessionStore struct {
// contains filtered or unexported fields
}
ProviderSessionStore manages active Provider sessions. It provides thread-safe CRUD operations indexed by SessionID and ServiceID.
func NewProviderSessionStore ¶
func NewProviderSessionStore() *ProviderSessionStore
NewProviderSessionStore creates a new ProviderSessionStore.
func (*ProviderSessionStore) Add ¶
func (store *ProviderSessionStore) Add(sess *ProviderSession) error
Add registers a new Provider session. It returns an error if a session with the same SessionID already exists.
func (*ProviderSessionStore) Count ¶
func (store *ProviderSessionStore) Count() int
Count returns the number of active sessions.
func (*ProviderSessionStore) GetByServiceID ¶
func (store *ProviderSessionStore) GetByServiceID(serviceID string) (*ProviderSession, bool)
GetByServiceID returns the Provider session by ServiceID.
func (*ProviderSessionStore) GetBySessionID ¶
func (store *ProviderSessionStore) GetBySessionID(sessionID string) (*ProviderSession, bool)
GetBySessionID returns the Provider session by SessionID.
func (*ProviderSessionStore) List ¶
func (store *ProviderSessionStore) List() []*ProviderSession
List returns all active Provider sessions.
func (*ProviderSessionStore) PruneStale ¶
func (store *ProviderSessionStore) PruneStale(ttl time.Duration) int
PruneStale removes sessions that haven't been seen within the given TTL.
func (*ProviderSessionStore) Remove ¶
func (store *ProviderSessionStore) Remove(sessionID string)
Remove removes a Provider session by SessionID and closes its connection.
func (*ProviderSessionStore) RemoveByServiceID ¶
func (store *ProviderSessionStore) RemoveByServiceID(serviceID string)
RemoveByServiceID removes a Provider session by ServiceID.
func (*ProviderSessionStore) Upsert ¶
func (store *ProviderSessionStore) Upsert(sess *ProviderSession)
Upsert adds or replaces a Provider session.
type TCPLocalListener ¶
type TCPLocalListener struct {
// contains filtered or unexported fields
}
TCPLocalListener accepts SDK Provider TCP sessions on the Agent's local gateway.
Lifecycle:
- SDK Provider dials in → first frame must be ProviderConnectRequest
- Agent validates and creates ProviderSession → stores in ProviderSessionStore
- Heartbeat/Drain flow through MuxConn
- Agent can send Invoke/StartTask to Provider via session.conn.Call()
- On disconnect, session is removed from store
func NewTCPLocalListener ¶
func NewTCPLocalListener(config *TCPLocalListenerConfig, sessionStore *ProviderSessionStore, logger *slog.Logger) (*TCPLocalListener, error)
NewTCPLocalListener creates a new local TCP listener for SDK connections.
func (*TCPLocalListener) Addr ¶
func (l *TCPLocalListener) Addr() string
Addr returns the bound listener address.
func (*TCPLocalListener) Close ¶
func (l *TCPLocalListener) Close() error
Close stops accepting new connections and waits for active ones to finish.
func (*TCPLocalListener) IsClosed ¶
func (l *TCPLocalListener) IsClosed() bool
IsClosed reports whether the listener has been closed.
func (*TCPLocalListener) Serve ¶
func (l *TCPLocalListener) Serve(ctx context.Context) error
Serve accepts connections until ctx is done or the listener is closed.
func (*TCPLocalListener) SessionStore ¶
func (l *TCPLocalListener) SessionStore() *ProviderSessionStore
SessionStore returns the Provider session store.
func (*TCPLocalListener) SetLocalHandler ¶
func (l *TCPLocalListener) SetLocalHandler(h transportcore.Handler)
SetLocalHandler sets the handler for inbound requests from providers (e.g., invoke responses).
func (*TCPLocalListener) SetOnConnect ¶
func (l *TCPLocalListener) SetOnConnect(fn func(sess *ProviderSession))
SetOnConnect sets the callback invoked after a Provider successfully connects.
func (*TCPLocalListener) SetOnDisconnect ¶
func (l *TCPLocalListener) SetOnDisconnect(fn func(sess *ProviderSession))
SetOnDisconnect sets the callback invoked when a Provider disconnects.
type TCPLocalListenerConfig ¶
type TCPLocalListenerConfig struct {
// Address is the listen address (e.g., "127.0.0.1:19091").
Address string
// Timeouts.
RecvTimeout time.Duration
SendTimeout time.Duration
}
TCPLocalListenerConfig holds configuration for the Agent's local TCP listener that accepts SDK Provider connections.