server

package
v0.1.10
Published: Apr 30, 2026 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package server provides the Server-side control plane for Croupier, including session management for connected Agents.

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsLocalTCP

func IsLocalTCP(addr string) bool

IsLocalTCP checks if an address is a local TCP address.
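
The documentation does not say what counts as "local". As a rough illustration only, the sketch below assumes it means a host:port address whose host is localhost or a loopback IP. The name isLoopbackTCP and that rule are hypothetical and may differ from what IsLocalTCP actually does.

```go
package main

import (
	"fmt"
	"net"
)

// isLoopbackTCP is a hypothetical stand-in, not the package's IsLocalTCP.
// Assumption: "local" means the host part is "localhost" or a loopback IP.
func isLoopbackTCP(addr string) bool {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return false // not a host:port address
	}
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

func main() {
	for _, a := range []string{"127.0.0.1:19090", "[::1]:19090", "localhost:19090", "10.0.0.5:19090"} {
		fmt.Printf("%-18s %v\n", a, isLoopbackTCP(a))
	}
}
```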

Types

type AgentSession

type AgentSession struct {

	// AgentID is the unique identifier of the connected Agent.
	AgentID string

	// SessionID is the unique session identifier assigned on registration.
	SessionID string

	// GameID is the game this agent belongs to.
	GameID string

	// Env is the deployment environment (e.g., "dev", "prod").
	Env string

	// Version is the agent's reported version.
	Version string

	// RPCAddr mirrors the legacy compatibility address published during register.
	// Session routing should prefer the live TCP session instead of this field.
	RPCAddr string

	// ConnectedAt is the time the session was established.
	ConnectedAt time.Time

	// LastSeen is the most recent heartbeat or activity time.
	LastSeen atomic.Int64 // Unix timestamp
	// contains filtered or unexported fields
}

AgentSession represents an established session with a connected Agent. The Server dispatches Invoke/StartTask/CancelTask requests through this session.

func (*AgentSession) Close

func (s *AgentSession) Close() error

Close closes the underlying connection.

func (*AgentSession) Conn

func (s *AgentSession) Conn() *tcptr.MuxConn

Conn returns the underlying MuxConn for sending requests to this Agent.

func (*AgentSession) GetLastSeen

func (s *AgentSession) GetLastSeen() time.Time

GetLastSeen returns the LastSeen time.

func (*AgentSession) UpdateLastSeen

func (s *AgentSession) UpdateLastSeen()

UpdateLastSeen updates the LastSeen timestamp to now.
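
Since LastSeen is declared as an atomic.Int64 holding a Unix timestamp, the two accessors can plausibly be thought of as a store and a load around that counter. The sketch below is a minimal illustration with a stand-in session type. It assumes the timestamp is in seconds, which the documentation does not state, and the method names touch and lastSeen are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// session is a stand-in for AgentSession; only LastSeen mirrors the documented field.
type session struct {
	LastSeen atomic.Int64 // Unix timestamp (assumed to be seconds)
}

// touch records "now" without taking a lock, so heartbeats from
// several goroutines do not contend on a mutex.
func (s *session) touch() { s.LastSeen.Store(time.Now().Unix()) }

// lastSeen converts the stored counter back into a time.Time.
func (s *session) lastSeen() time.Time { return time.Unix(s.LastSeen.Load(), 0) }

func main() {
	s := &session{}
	s.touch()
	fmt.Println("last seen:", s.lastSeen().Format(time.RFC3339))
}
```

A plain integer behind an atomic keeps the hot path (heartbeats) cheap, at the cost of sub-second precision under the seconds assumption.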

type AgentSessionLoader

type AgentSessionLoader interface {
	LoadActiveSessions(ctx context.Context) ([]*reg.AgentSession, error)
	Upsert(ctx context.Context, sess *reg.AgentSession) error
	DeleteExpired(ctx context.Context) (int64, error)
}

AgentSessionLoader defines the interface for loading and managing agent sessions from a database.

type AgentSessionStore

type AgentSessionStore struct {
	// contains filtered or unexported fields
}

AgentSessionStore manages active Agent sessions. It provides thread-safe CRUD operations indexed by AgentID.

func NewAgentSessionStore

func NewAgentSessionStore() *AgentSessionStore

NewAgentSessionStore creates a new AgentSessionStore.

func (*AgentSessionStore) Add

func (store *AgentSessionStore) Add(sess *AgentSession) error

Add registers a new Agent session. Returns an error if a session with the same AgentID already exists.

func (*AgentSessionStore) Count

func (store *AgentSessionStore) Count() int

Count returns the number of active sessions.

func (*AgentSessionStore) Get

func (store *AgentSessionStore) Get(agentID string) (*AgentSession, bool)

Get returns the Agent session by AgentID.

func (*AgentSessionStore) List

func (store *AgentSessionStore) List() []*AgentSession

List returns all active Agent sessions.

func (*AgentSessionStore) PruneStale

func (store *AgentSessionStore) PruneStale(ttl time.Duration) int

PruneStale removes sessions that haven't been seen within the given TTL.
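
To make the TTL rule concrete, here is a small sketch of pruning over a mutex-guarded map. The store and entry types are stand-ins rather than the package's own. The sketch passes the current time in explicitly to keep the example deterministic, and it does not close any connection, whereas the real method may do more.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry and store are hypothetical stand-ins for AgentSession and AgentSessionStore.
type entry struct {
	lastSeen time.Time
}

type store struct {
	mu       sync.Mutex
	sessions map[string]*entry
}

// pruneStale deletes every entry last seen more than ttl before now
// and reports how many were removed.
func (s *store) pruneStale(now time.Time, ttl time.Duration) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	removed := 0
	for id, e := range s.sessions {
		if now.Sub(e.lastSeen) > ttl {
			delete(s.sessions, id) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

// demo prunes one stale and one fresh entry with a one-minute TTL.
func demo() (removed, remaining int) {
	now := time.Now()
	s := &store{sessions: map[string]*entry{
		"fresh": {lastSeen: now.Add(-10 * time.Second)},
		"stale": {lastSeen: now.Add(-5 * time.Minute)},
	}}
	removed = s.pruneStale(now, time.Minute)
	return removed, len(s.sessions)
}

func main() {
	removed, remaining := demo()
	fmt.Println("removed:", removed, "remaining:", remaining)
}
```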

func (*AgentSessionStore) Remove

func (store *AgentSessionStore) Remove(agentID string)

Remove removes an Agent session by AgentID and closes its connection.

func (*AgentSessionStore) ResolveAgentConn

func (store *AgentSessionStore) ResolveAgentConn(agentID string) (*tcptr.MuxConn, bool)

ResolveAgentConn returns the MuxConn for an agent's active TCP session, implementing the dispatch.AgentSessionResolver interface. It returns the MuxConn (which has Call()) and true, or false if no session exists.

func (*AgentSessionStore) ResolveSessionCaller

func (store *AgentSessionStore) ResolveSessionCaller(agentID string) (transport.SessionCaller, bool)

ResolveSessionCaller returns a SessionCaller for dispatch.AgentSessionResolver.

func (*AgentSessionStore) Upsert

func (store *AgentSessionStore) Upsert(sess *AgentSession)

Upsert adds or replaces an Agent session.
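
The difference between Add and Upsert is easiest to see side by side. The sketch below uses stand-in session and store types guarded by a sync.RWMutex. It only illustrates the documented contract (Add rejects a duplicate AgentID, Upsert replaces) and makes no claim about how the real store handles the replaced session's connection.

```go
package main

import (
	"fmt"
	"sync"
)

// session and store are hypothetical stand-ins for AgentSession and AgentSessionStore.
type session struct {
	AgentID string
	Version string
}

type store struct {
	mu   sync.RWMutex
	byID map[string]*session
}

func newStore() *store { return &store{byID: make(map[string]*session)} }

// add fails when the AgentID is already present.
func (s *store) add(sess *session) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.byID[sess.AgentID]; exists {
		return fmt.Errorf("agent %q already has a session", sess.AgentID)
	}
	s.byID[sess.AgentID] = sess
	return nil
}

// upsert silently replaces any existing entry.
func (s *store) upsert(sess *session) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.byID[sess.AgentID] = sess
}

// get takes only a read lock, so lookups can run concurrently.
func (s *store) get(agentID string) (*session, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	sess, ok := s.byID[agentID]
	return sess, ok
}

func main() {
	st := newStore()
	fmt.Println("first add:", st.add(&session{AgentID: "agent-1", Version: "1.0"}))
	fmt.Println("second add:", st.add(&session{AgentID: "agent-1", Version: "1.1"}))
	st.upsert(&session{AgentID: "agent-1", Version: "1.1"})
	sess, ok := st.get("agent-1")
	fmt.Println("after upsert:", sess.Version, ok)
}
```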

type ControlHandler

type ControlHandler struct {
	// contains filtered or unexported fields
}

ControlHandler wraps ControlService to implement the Handler interface.

func NewControlHandler

func NewControlHandler(service *ControlService) *ControlHandler

NewControlHandler creates a new control handler.

func (*ControlHandler) HandleHeartbeat

func (*ControlHandler) HandleRegister

type ControlService

type ControlService struct {
	// contains filtered or unexported fields
}

ControlService implements the control-plane business logic (register, heartbeat, capabilities). It is transport-agnostic — TCPListener delegates inbound frames to this service.

func NewControlService

func NewControlService(registry *reg.Store, loader AgentSessionLoader) *ControlService

NewControlService creates a new ControlService.

func (*ControlService) GetStats

func (s *ControlService) GetStats() map[string]interface{}

GetStats returns server statistics.

func (*ControlService) LoadAgentSessions

func (s *ControlService) LoadAgentSessions() error

LoadAgentSessions loads active agent sessions from the database into memory.

func (*ControlService) MetricsStore

func (s *ControlService) MetricsStore() *reg.MetricsStore

MetricsStore returns the metrics store.

func (*ControlService) SetDefaultSessionTTL

func (s *ControlService) SetDefaultSessionTTL(ttl time.Duration)

SetDefaultSessionTTL sets the default session TTL.

func (*ControlService) SetLogger

func (s *ControlService) SetLogger(logger *slog.Logger)

SetLogger sets the logger.

func (*ControlService) SetTaskStore

func (s *ControlService) SetTaskStore(store *tasks.Store)

func (*ControlService) SetUpstreamHandler

func (s *ControlService) SetUpstreamHandler(h Handler)

SetUpstreamHandler sets an upstream handler for forwarding requests.

func (*ControlService) StartBackgroundTasks

func (s *ControlService) StartBackgroundTasks()

StartBackgroundTasks starts background maintenance loops (DB loading, metrics pruning, etc.).

func (*ControlService) Stop

func (s *ControlService) Stop()

Stop cancels background goroutines.

func (*ControlService) Store

func (s *ControlService) Store() *reg.Store

Store returns the registry store.

func (*ControlService) SystemInfoCache

func (s *ControlService) SystemInfoCache() *reg.SystemInfoCache

SystemInfoCache returns the system info cache.

func (*ControlService) TransportHandler

func (s *ControlService) TransportHandler() transportcore.Handler

TransportHandler exposes the control-plane request handler for TCP transport.

type Handler

type Handler interface {
	HandleRegister(ctx context.Context, req *agentv1.RegisterRequest) (*agentv1.RegisterResponse, error)
	HandleHeartbeat(ctx context.Context, req *agentv1.HeartbeatRequest) (*agentv1.HeartbeatResponse, error)
	HandleRegisterCapabilities(ctx context.Context, req *agentv1.RegisterCapabilitiesRequest) (*agentv1.RegisterCapabilitiesResponse, error)
}

Handler handles control service requests.

type ListenAddr

type ListenAddr struct {
	Addr      string // Raw address (e.g., ":19090", "ipc://croupier-server")
	Transport string // Transport type: "tcp", "ipc", etc.
	URL       string // Full URL (e.g., "tcp://:19090", "ipc://croupier-server")
}

ListenAddr represents a single listen address with transport type.

func ParseListenAddr

func ParseListenAddr(addr string) ListenAddr

ParseListenAddr parses a string address into a ListenAddr.
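
The field comments on ListenAddr suggest how inputs map to fields: ":19090" pairs with the URL "tcp://:19090", while "ipc://croupier-server" appears as both Addr and URL. The sketch below reproduces that mapping under two assumptions that the documentation does not confirm: an address without a scheme defaults to "tcp", and Addr keeps the input unchanged. parseListenAddr is a hypothetical name, not the package function.

```go
package main

import (
	"fmt"
	"strings"
)

// listenAddr mirrors the documented ListenAddr fields.
type listenAddr struct {
	Addr      string
	Transport string
	URL       string
}

// parseListenAddr is a hypothetical parser, not the package's ParseListenAddr.
// Assumptions: no scheme means "tcp"; Addr keeps the raw input.
func parseListenAddr(addr string) listenAddr {
	if scheme, _, ok := strings.Cut(addr, "://"); ok {
		return listenAddr{Addr: addr, Transport: scheme, URL: addr}
	}
	return listenAddr{Addr: addr, Transport: "tcp", URL: "tcp://" + addr}
}

func main() {
	for _, in := range []string{":19090", "ipc://croupier-server"} {
		fmt.Printf("%+v\n", parseListenAddr(in))
	}
}
```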

type SessionResolverAdapter

type SessionResolverAdapter struct {
	// contains filtered or unexported fields
}

SessionResolverAdapter adapts AgentSessionStore to dispatch.AgentSessionResolver.

func NewSessionResolverAdapter

func NewSessionResolverAdapter(store *AgentSessionStore) *SessionResolverAdapter

NewSessionResolverAdapter creates a new adapter.

func (*SessionResolverAdapter) ResolveAgentConn

func (a *SessionResolverAdapter) ResolveAgentConn(agentID string) (transport.SessionCaller, bool)

ResolveAgentConn implements dispatch.AgentSessionResolver.
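
The adapter exists because the store hands back a concrete *tcptr.MuxConn while the resolver interface expects a transport.SessionCaller. The sketch below shows that shape with stand-in types. The Call signature is an assumption, as are all lowercase names. It also shows one detail worth getting right in any such adapter: returning a literal nil on a miss, so callers never receive a non-nil interface wrapping a nil pointer.

```go
package main

import "fmt"

// sessionCaller stands in for transport.SessionCaller; the Call signature is assumed.
type sessionCaller interface {
	Call(method string) string
}

// muxConn stands in for *tcptr.MuxConn.
type muxConn struct{ agentID string }

func (c *muxConn) Call(method string) string { return c.agentID + ":" + method }

// connStore stands in for AgentSessionStore and returns the concrete type.
type connStore struct{ conns map[string]*muxConn }

func (s *connStore) resolveAgentConn(id string) (*muxConn, bool) {
	c, ok := s.conns[id]
	return c, ok
}

// resolverAdapter converts the concrete return type into the interface.
type resolverAdapter struct{ store *connStore }

func (a *resolverAdapter) ResolveAgentConn(id string) (sessionCaller, bool) {
	c, ok := a.store.resolveAgentConn(id)
	if !ok || c == nil {
		// Return a true nil interface, not a (*muxConn)(nil) wrapped in one.
		return nil, false
	}
	return c, true
}

func main() {
	a := &resolverAdapter{store: &connStore{conns: map[string]*muxConn{"agent-1": {agentID: "agent-1"}}}}
	if c, ok := a.ResolveAgentConn("agent-1"); ok {
		fmt.Println(c.Call("Invoke"))
	}
	c, ok := a.ResolveAgentConn("missing")
	fmt.Println(c == nil, ok)
}
```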

type TCPListener

type TCPListener struct {
	// contains filtered or unexported fields
}

TCPListener accepts Agent TCP sessions and creates AgentSession entries.

Lifecycle:

  1. Agent dials in → first frame must be RegisterRequest
  2. Server validates and creates AgentSession → stores in AgentSessionStore
  3. Subsequent Heartbeat/Invoke/StartTask/CancelTask flow through MuxConn
  4. On disconnect, session is removed from store
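
The four lifecycle steps above can be sketched with a toy line-based protocol over net.Pipe. Everything here is a stand-in: the real listener exchanges RegisterRequest frames over a MuxConn, not text lines, and the names handleConn and sessionStore are hypothetical.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
	"sync"
)

// sessionStore is a minimal stand-in for AgentSessionStore.
type sessionStore struct {
	mu  sync.Mutex
	ids map[string]bool
}

func (s *sessionStore) set(id string, present bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if present {
		s.ids[id] = true
	} else {
		delete(s.ids, id)
	}
}

func (s *sessionStore) has(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.ids[id]
}

// handleConn walks one connection through the lifecycle.
func handleConn(conn net.Conn, store *sessionStore) error {
	defer conn.Close()
	r := bufio.NewReader(conn)

	// Step 1: the first frame must be a register request.
	line, err := r.ReadString('\n')
	if err != nil {
		return err
	}
	line = strings.TrimSpace(line)
	if !strings.HasPrefix(line, "REGISTER ") {
		return fmt.Errorf("first frame must be a register request, got %q", line)
	}
	id := strings.TrimPrefix(line, "REGISTER ")

	// Step 2: create the session. Step 4: remove it on disconnect.
	store.set(id, true)
	defer store.set(id, false)
	fmt.Fprintln(conn, "OK")

	// Step 3: later frames flow over the same connection.
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			return nil // peer disconnected
		}
		fmt.Fprintln(conn, "ACK "+strings.TrimSpace(line))
	}
}

// demo registers one agent, then disconnects it.
func demo() (during, after bool, err error) {
	store := &sessionStore{ids: map[string]bool{}}
	client, server := net.Pipe()
	done := make(chan error, 1)
	go func() { done <- handleConn(server, store) }()

	fmt.Fprintln(client, "REGISTER agent-1")
	if _, err := bufio.NewReader(client).ReadString('\n'); err != nil {
		return false, false, err
	}
	during = store.has("agent-1")
	client.Close()
	err = <-done
	return during, store.has("agent-1"), err
}

func main() {
	during, after, err := demo()
	fmt.Println("registered while connected:", during, "after disconnect:", after, "err:", err)
}
```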

func NewTCPListener

func NewTCPListener(config *TCPListenerConfig, sessionStore *AgentSessionStore, registry *reg.Store, logger *slog.Logger) (*TCPListener, error)

NewTCPListener creates a new TCP session listener.

func (*TCPListener) Addr

func (l *TCPListener) Addr() string

Addr returns the bound listener address.

func (*TCPListener) Close

func (l *TCPListener) Close() error

Close stops accepting new connections and waits for active ones to finish.

func (*TCPListener) IsClosed

func (l *TCPListener) IsClosed() bool

IsClosed reports whether the listener has been closed.

func (*TCPListener) Serve

func (l *TCPListener) Serve(ctx context.Context) error

Serve accepts connections until ctx is done or the listener is closed.
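
A common way to get "accept until ctx is done or the listener is closed" in Go is to close the listener when the context is cancelled, which unblocks Accept. The sketch below shows that pattern with the standard library only. It is not the package's implementation, and whether the real Serve returns nil or an error on shutdown is not documented, so returning nil here is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"sync"
)

// serve is a hypothetical accept loop. It returns nil on a clean shutdown
// (context cancelled or listener closed) and the Accept error otherwise.
func serve(ctx context.Context, ln net.Listener, handle func(net.Conn)) error {
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			ln.Close() // unblocks the pending Accept
		case <-done:
		}
	}()

	var wg sync.WaitGroup
	defer wg.Wait() // let active connections finish before returning
	for {
		conn, err := ln.Accept()
		if err != nil {
			if ctx.Err() != nil || errors.Is(err, net.ErrClosed) {
				return nil
			}
			return err
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			handle(conn)
		}()
	}
}

// demo starts a loopback listener and cancels it right away.
func demo() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	ctx, cancel := context.WithCancel(context.Background())
	errc := make(chan error, 1)
	go func() { errc <- serve(ctx, ln, func(c net.Conn) { c.Close() }) }()
	cancel()
	return <-errc
}

func main() {
	fmt.Println("serve returned:", demo())
}
```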

func (*TCPListener) SessionStore

func (l *TCPListener) SessionStore() *AgentSessionStore

SessionStore returns the Agent session store.

func (*TCPListener) SetHandler

func (l *TCPListener) SetHandler(h *ControlService)

SetHandler sets the control service for processing register/heartbeat.

type TCPListenerConfig

type TCPListenerConfig struct {
	// Address is the listen address (e.g., ":19090").
	Address string

	// Insecure controls whether TLS is disabled.
	Insecure bool

	// TLS files.
	CertFile string
	KeyFile  string
	CAFile   string

	// Timeouts.
	RecvTimeout time.Duration
	SendTimeout time.Duration
}

TCPListenerConfig holds the configuration for the Server-side TCP session listener.
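
How the TLS fields are combined is not documented. As one plausible reading, the sketch below builds a *tls.Config from a stand-in config struct: Insecure skips TLS entirely, CertFile and KeyFile supply the server certificate, and a non-empty CAFile turns on client-certificate verification. That last point, the TLS 1.2 minimum, and the buildTLS name are all assumptions.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// listenerConfig is a stand-in carrying the documented TLS fields.
type listenerConfig struct {
	Insecure bool
	CertFile string
	KeyFile  string
	CAFile   string
}

// buildTLS is a hypothetical helper. It returns a nil config when Insecure is set.
func buildTLS(cfg listenerConfig) (*tls.Config, error) {
	if cfg.Insecure {
		return nil, nil // caller falls back to a plain TCP listener
	}
	cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
	if err != nil {
		return nil, fmt.Errorf("load key pair: %w", err)
	}
	tc := &tls.Config{
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12,
	}
	if cfg.CAFile != "" {
		// Assumption: a CA file means Agents must present a certificate it signed.
		pem, err := os.ReadFile(cfg.CAFile)
		if err != nil {
			return nil, fmt.Errorf("read CA file: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("CA file contains no valid certificates")
		}
		tc.ClientCAs = pool
		tc.ClientAuth = tls.RequireAndVerifyClientCert
	}
	return tc, nil
}

func main() {
	tc, err := buildTLS(listenerConfig{Insecure: true})
	fmt.Println("insecure:", tc == nil, err)

	_, err = buildTLS(listenerConfig{CertFile: "missing.crt", KeyFile: "missing.key"})
	fmt.Println("missing files produce an error:", err != nil)
}
```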
