Documentation ¶
Overview ¶
Package server provides the Server-side control plane for Croupier, including session management for connected Agents.
Index ¶
- func IsLocalTCP(addr string) bool
- type AgentSession
- type AgentSessionLoader
- type AgentSessionStore
- func (store *AgentSessionStore) Add(sess *AgentSession) error
- func (store *AgentSessionStore) Count() int
- func (store *AgentSessionStore) Get(agentID string) (*AgentSession, bool)
- func (store *AgentSessionStore) List() []*AgentSession
- func (store *AgentSessionStore) PruneStale(ttl time.Duration) int
- func (store *AgentSessionStore) Remove(agentID string)
- func (store *AgentSessionStore) ResolveAgentConn(agentID string) (*tcptr.MuxConn, bool)
- func (store *AgentSessionStore) ResolveSessionCaller(agentID string) (transport.SessionCaller, bool)
- func (store *AgentSessionStore) Upsert(sess *AgentSession)
- type ControlHandler
- func (h *ControlHandler) HandleHeartbeat(ctx context.Context, req *agentv1.HeartbeatRequest) (*agentv1.HeartbeatResponse, error)
- func (h *ControlHandler) HandleRegister(ctx context.Context, req *agentv1.RegisterRequest) (*agentv1.RegisterResponse, error)
- func (h *ControlHandler) HandleRegisterCapabilities(ctx context.Context, req *agentv1.RegisterCapabilitiesRequest) (*agentv1.RegisterCapabilitiesResponse, error)
- type ControlService
- func (s *ControlService) GetStats() map[string]interface{}
- func (s *ControlService) LoadAgentSessions() error
- func (s *ControlService) MetricsStore() *reg.MetricsStore
- func (s *ControlService) SetDefaultSessionTTL(ttl time.Duration)
- func (s *ControlService) SetLogger(logger *slog.Logger)
- func (s *ControlService) SetTaskStore(store *tasks.Store)
- func (s *ControlService) SetUpstreamHandler(h Handler)
- func (s *ControlService) StartBackgroundTasks()
- func (s *ControlService) Stop()
- func (s *ControlService) Store() *reg.Store
- func (s *ControlService) SystemInfoCache() *reg.SystemInfoCache
- func (s *ControlService) TransportHandler() transportcore.Handler
- type Handler
- type ListenAddr
- type SessionResolverAdapter
- type TCPListener
- type TCPListenerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsLocalTCP ¶
func IsLocalTCP(addr string) bool
IsLocalTCP reports whether addr is a local TCP address.
Types ¶
type AgentSession ¶
type AgentSession struct {
// AgentID is the unique identifier of the connected Agent.
AgentID string
// SessionID is the unique session identifier assigned on registration.
SessionID string
// GameID is the game this agent belongs to.
GameID string
// Env is the deployment environment (e.g., "dev", "prod").
Env string
// Version is the agent's reported version.
Version string
// RPCAddr mirrors the legacy compatibility address published during register.
// Session routing should prefer the live TCP session instead of this field.
RPCAddr string
// ConnectedAt is the time the session was established.
ConnectedAt time.Time
// LastSeen is the most recent heartbeat or activity time.
LastSeen atomic.Int64 // Unix timestamp
// contains filtered or unexported fields
}
AgentSession represents an established session with a connected Agent. The Server dispatches Invoke/StartTask/CancelTask requests through this session.
func (*AgentSession) Close ¶
func (s *AgentSession) Close() error
Close closes the underlying connection.
func (*AgentSession) Conn ¶
func (s *AgentSession) Conn() *tcptr.MuxConn
Conn returns the underlying MuxConn for sending requests to this Agent.
func (*AgentSession) GetLastSeen ¶
func (s *AgentSession) GetLastSeen() time.Time
GetLastSeen returns the LastSeen timestamp as a time.Time.
func (*AgentSession) UpdateLastSeen ¶
func (s *AgentSession) UpdateLastSeen()
UpdateLastSeen updates the LastSeen timestamp to now.
type AgentSessionLoader ¶
type AgentSessionLoader interface {
LoadActiveSessions(ctx context.Context) ([]*reg.AgentSession, error)
Upsert(ctx context.Context, sess *reg.AgentSession) error
DeleteExpired(ctx context.Context) (int64, error)
}
AgentSessionLoader defines the interface for loading and managing agent sessions from a database.
type AgentSessionStore ¶
type AgentSessionStore struct {
// contains filtered or unexported fields
}
AgentSessionStore manages active Agent sessions. It provides thread-safe CRUD operations indexed by AgentID.
func NewAgentSessionStore ¶
func NewAgentSessionStore() *AgentSessionStore
NewAgentSessionStore creates a new AgentSessionStore.
func (*AgentSessionStore) Add ¶
func (store *AgentSessionStore) Add(sess *AgentSession) error
Add registers a new Agent session. Returns an error if a session with the same AgentID already exists.
func (*AgentSessionStore) Count ¶
func (store *AgentSessionStore) Count() int
Count returns the number of active sessions.
func (*AgentSessionStore) Get ¶
func (store *AgentSessionStore) Get(agentID string) (*AgentSession, bool)
Get returns the Agent session by AgentID.
func (*AgentSessionStore) List ¶
func (store *AgentSessionStore) List() []*AgentSession
List returns all active Agent sessions.
func (*AgentSessionStore) PruneStale ¶
func (store *AgentSessionStore) PruneStale(ttl time.Duration) int
PruneStale removes sessions that haven't been seen within the given TTL.
func (*AgentSessionStore) Remove ¶
func (store *AgentSessionStore) Remove(agentID string)
Remove removes an Agent session by AgentID and closes its connection.
func (*AgentSessionStore) ResolveAgentConn ¶
func (store *AgentSessionStore) ResolveAgentConn(agentID string) (*tcptr.MuxConn, bool)
ResolveAgentConn returns the MuxConn for an agent's active TCP session. This implements the dispatch.AgentSessionResolver interface. It returns the MuxConn (which has Call()) and true, or false if no session exists.
func (*AgentSessionStore) ResolveSessionCaller ¶
func (store *AgentSessionStore) ResolveSessionCaller(agentID string) (transport.SessionCaller, bool)
ResolveSessionCaller returns a SessionCaller for dispatch.AgentSessionResolver.
func (*AgentSessionStore) Upsert ¶
func (store *AgentSessionStore) Upsert(sess *AgentSession)
Upsert adds or replaces an Agent session.
type ControlHandler ¶
type ControlHandler struct {
// contains filtered or unexported fields
}
ControlHandler wraps ControlService to implement the Handler interface.
func NewControlHandler ¶
func NewControlHandler(service *ControlService) *ControlHandler
NewControlHandler creates a new control handler.
func (*ControlHandler) HandleHeartbeat ¶
func (h *ControlHandler) HandleHeartbeat(ctx context.Context, req *agentv1.HeartbeatRequest) (*agentv1.HeartbeatResponse, error)
func (*ControlHandler) HandleRegister ¶
func (h *ControlHandler) HandleRegister(ctx context.Context, req *agentv1.RegisterRequest) (*agentv1.RegisterResponse, error)
func (*ControlHandler) HandleRegisterCapabilities ¶
func (h *ControlHandler) HandleRegisterCapabilities(ctx context.Context, req *agentv1.RegisterCapabilitiesRequest) (*agentv1.RegisterCapabilitiesResponse, error)
type ControlService ¶
type ControlService struct {
// contains filtered or unexported fields
}
ControlService implements the control-plane business logic (register, heartbeat, capabilities). It is transport-agnostic — TCPListener delegates inbound frames to this service.
func NewControlService ¶
func NewControlService(registry *reg.Store, loader AgentSessionLoader) *ControlService
NewControlService creates a new ControlService.
func (*ControlService) GetStats ¶
func (s *ControlService) GetStats() map[string]interface{}
GetStats returns server statistics.
func (*ControlService) LoadAgentSessions ¶
func (s *ControlService) LoadAgentSessions() error
LoadAgentSessions loads active agent sessions from the database into memory.
func (*ControlService) MetricsStore ¶
func (s *ControlService) MetricsStore() *reg.MetricsStore
MetricsStore returns the metrics store.
func (*ControlService) SetDefaultSessionTTL ¶
func (s *ControlService) SetDefaultSessionTTL(ttl time.Duration)
SetDefaultSessionTTL sets the default session TTL.
func (*ControlService) SetLogger ¶
func (s *ControlService) SetLogger(logger *slog.Logger)
SetLogger sets the logger.
func (*ControlService) SetTaskStore ¶
func (s *ControlService) SetTaskStore(store *tasks.Store)
func (*ControlService) SetUpstreamHandler ¶
func (s *ControlService) SetUpstreamHandler(h Handler)
SetUpstreamHandler sets an upstream handler for forwarding requests.
func (*ControlService) StartBackgroundTasks ¶
func (s *ControlService) StartBackgroundTasks()
StartBackgroundTasks starts background maintenance loops (DB loading, metrics pruning, etc.).
func (*ControlService) Store ¶
func (s *ControlService) Store() *reg.Store
Store returns the registry store.
func (*ControlService) SystemInfoCache ¶
func (s *ControlService) SystemInfoCache() *reg.SystemInfoCache
SystemInfoCache returns the system info cache.
func (*ControlService) TransportHandler ¶
func (s *ControlService) TransportHandler() transportcore.Handler
TransportHandler exposes the control-plane request handler for TCP transport.
type Handler ¶
type Handler interface {
HandleRegister(ctx context.Context, req *agentv1.RegisterRequest) (*agentv1.RegisterResponse, error)
HandleHeartbeat(ctx context.Context, req *agentv1.HeartbeatRequest) (*agentv1.HeartbeatResponse, error)
HandleRegisterCapabilities(ctx context.Context, req *agentv1.RegisterCapabilitiesRequest) (*agentv1.RegisterCapabilitiesResponse, error)
}
Handler handles control service requests.
type ListenAddr ¶
type ListenAddr struct {
Addr string // Raw address (e.g., ":19090", "ipc://croupier-server")
Transport string // Transport type: "tcp", "ipc", etc.
URL string // Full URL (e.g., "tcp://:19090", "ipc://croupier-server")
}
ListenAddr represents a single listen address with transport type.
func ParseListenAddr ¶
func ParseListenAddr(addr string) ListenAddr
ParseListenAddr parses a string address into a ListenAddr.
type SessionResolverAdapter ¶
type SessionResolverAdapter struct {
// contains filtered or unexported fields
}
SessionResolverAdapter adapts AgentSessionStore to dispatch.AgentSessionResolver.
func NewSessionResolverAdapter ¶
func NewSessionResolverAdapter(store *AgentSessionStore) *SessionResolverAdapter
NewSessionResolverAdapter creates a new adapter.
func (*SessionResolverAdapter) ResolveAgentConn ¶
func (a *SessionResolverAdapter) ResolveAgentConn(agentID string) (transport.SessionCaller, bool)
ResolveAgentConn implements dispatch.AgentSessionResolver.
type TCPListener ¶
type TCPListener struct {
// contains filtered or unexported fields
}
TCPListener accepts Agent TCP sessions and creates AgentSession entries.
Lifecycle:
- Agent dials in → first frame must be RegisterRequest
- Server validates and creates AgentSession → stores in AgentSessionStore
- Subsequent Heartbeat/Invoke/StartTask/CancelTask flow through MuxConn
- On disconnect, session is removed from store
func NewTCPListener ¶
func NewTCPListener(config *TCPListenerConfig, sessionStore *AgentSessionStore, registry *reg.Store, logger *slog.Logger) (*TCPListener, error)
NewTCPListener creates a new TCP session listener.
func (*TCPListener) Addr ¶
func (l *TCPListener) Addr() string
Addr returns the bound listener address.
func (*TCPListener) Close ¶
func (l *TCPListener) Close() error
Close stops accepting new connections and waits for active ones to finish.
func (*TCPListener) IsClosed ¶
func (l *TCPListener) IsClosed() bool
IsClosed reports whether the listener has been closed.
func (*TCPListener) Serve ¶
func (l *TCPListener) Serve(ctx context.Context) error
Serve accepts connections until ctx is done or the listener is closed.
func (*TCPListener) SessionStore ¶
func (l *TCPListener) SessionStore() *AgentSessionStore
SessionStore returns the Agent session store.
func (*TCPListener) SetHandler ¶
func (l *TCPListener) SetHandler(h *ControlService)
SetHandler sets the control service for processing register/heartbeat.
type TCPListenerConfig ¶
type TCPListenerConfig struct {
// Address is the listen address (e.g., ":19090").
Address string
// Insecure controls whether TLS is disabled.
Insecure bool
// TLS certificate, key, and CA files.
CertFile string
KeyFile string
CAFile string
// Timeouts.
RecvTimeout time.Duration
SendTimeout time.Duration
}
TCPListenerConfig holds the configuration for the Server-side TCP session listener.