Documentation
¶
Index ¶
- Constants
- func BuildHTTPRequest(c *fiber.Ctx) (*http.Request, error)
- func CheckWritePermissions(c *fiber.Ctx, rbacManager RBACChecker, logger zerolog.Logger, database string, ...) error
- func CopyResponse(c *fiber.Ctx, resp *http.Response) error
- func GetDecompBufferDiscards() int64
- func HandleRoutingError(c *fiber.Ctx, err error) error
- func HandleShardRoutingError(c *fiber.Ctx, err error) error
- func ListenAddr(host string, port int) (string, string)
- func OptimizeLikePatterns(sql string) (string, bool)
- func RewriteRegexToStringFuncs(sql string) (string, bool)
- func RouteShardedQuery(shardRouter *sharding.ShardRouter, database string, c *fiber.Ctx) (*http.Response, error)
- func RouteShardedWrite(shardRouter *sharding.ShardRouter, c *fiber.Ctx) (*http.Response, error)
- func ShouldForwardQuery(router *cluster.Router, c *fiber.Ctx) bool
- func ShouldForwardWrite(router *cluster.Router, c *fiber.Ctx) bool
- func ValidateSQLRequest(sql string) error
- type AuditHandler
- type AuthHandler
- type AuthManager
- type BackupHandler
- func (h *BackupHandler) CreateBackup(c *fiber.Ctx) error
- func (h *BackupHandler) DeleteBackup(c *fiber.Ctx) error
- func (h *BackupHandler) GetBackup(c *fiber.Ctx) error
- func (h *BackupHandler) GetStatus(c *fiber.Ctx) error
- func (h *BackupHandler) ListBackups(c *fiber.Ctx) error
- func (h *BackupHandler) RegisterRoutes(app fiber.Router)
- func (h *BackupHandler) RestoreBackup(c *fiber.Ctx) error
- type CQCoordinator
- type CQExecution
- type CQSchedulerInterface
- type CQSchedulerReloader
- type CacheInvalidateHandler
- type ClusterHandler
- type CompactionHandler
- type ContinuousQuery
- type ContinuousQueryHandler
- func (h *ContinuousQueryHandler) Close() error
- func (h *ContinuousQueryHandler) ExecuteCQ(ctx context.Context, queryID int64) (*ExecuteCQResponse, error)
- func (h *ContinuousQueryHandler) GetActiveCQs() ([]ContinuousQuery, error)
- func (h *ContinuousQueryHandler) GetCQ(queryID int64) (*ContinuousQuery, error)
- func (h *ContinuousQueryHandler) RegisterRoutes(app *fiber.App)
- func (h *ContinuousQueryHandler) SetCoordinator(c CQCoordinator)
- func (h *ContinuousQueryHandler) SetScheduler(s CQSchedulerReloader)
- type ContinuousQueryRequest
- type CreateBackupRequest
- type CreateDatabaseRequest
- type CreateTokenRequest
- type DatabaseInfo
- type DatabaseListResponse
- type DatabaseMeasurement
- type DatabasesHandler
- type DebugHandler
- type DeleteConfigResponse
- type DeleteCoordinator
- type DeleteHandler
- type DeleteRequest
- type DeleteResponse
- type EstimateResponse
- type ExecuteCQRequest
- type ExecuteCQResponse
- type ExecuteRetentionRequest
- type ExecuteRetentionResponse
- type GovernanceHandler
- func (h *GovernanceHandler) CreatePolicy(c *fiber.Ctx) error
- func (h *GovernanceHandler) DeletePolicy(c *fiber.Ctx) error
- func (h *GovernanceHandler) GetPolicy(c *fiber.Ctx) error
- func (h *GovernanceHandler) GetUsage(c *fiber.Ctx) error
- func (h *GovernanceHandler) ListPolicies(c *fiber.Ctx) error
- func (h *GovernanceHandler) RegisterRoutes(app fiber.Router)
- func (h *GovernanceHandler) UpdatePolicy(c *fiber.Ctx) error
- type ImportHandler
- type ImportResult
- type LPImportResult
- type LineProtocolHandler
- func (h *LineProtocolHandler) Flush(c *fiber.Ctx) error
- func (h *LineProtocolHandler) GetStats() map[string]int64
- func (h *LineProtocolHandler) Health(c *fiber.Ctx) error
- func (h *LineProtocolHandler) RegisterRoutes(app *fiber.App)
- func (h *LineProtocolHandler) SetAuthAndRBAC(authManager *auth.AuthManager, rbacManager RBACChecker)
- func (h *LineProtocolHandler) SetRouter(router *cluster.Router)
- func (h *LineProtocolHandler) Stats(c *fiber.Ctx) error
- func (h *LineProtocolHandler) WriteInfluxDB(c *fiber.Ctx) error
- func (h *LineProtocolHandler) WriteSimple(c *fiber.Ctx) error
- func (h *LineProtocolHandler) WriteV1(c *fiber.Ctx) error
- type MQTTHandler
- type MQTTSubscriptionHandler
- type MeasurementInfo
- type MeasurementListResponse
- type MsgPackHandler
- type ParallelQueryInfo
- type PolicyRequest
- type PooledBuffer
- type QueryHandler
- func (h *QueryHandler) InvalidateCaches()
- func (h *QueryHandler) QueryGate503Total() int64
- func (h *QueryHandler) RegisterRoutes(app *fiber.App)
- func (h *QueryHandler) SetAuthAndRBAC(am AuthManager, rm RBACChecker)
- func (h *QueryHandler) SetCluster(coordinator *cluster.Coordinator, queryGateOnCatchup bool)
- func (h *QueryHandler) SetGovernance(manager *governance.Manager, lc *license.Client)
- func (h *QueryHandler) SetQueryRegistry(registry *queryregistry.Registry)
- func (h *QueryHandler) SetRouter(router *cluster.Router)
- func (h *QueryHandler) SetTieringManager(manager *tiering.Manager)
- func (h *QueryHandler) StartBackgroundWorkers(ctx context.Context)
- type QueryManagementHandler
- type QueryRequest
- type QueryResponse
- type RBACChecker
- type RBACHandler
- type ReconciliationHandler
- type ReconciliationSchedulerInterface
- type RestoreRequest
- type RetentionCoordinator
- type RetentionExecution
- type RetentionHandler
- func (h *RetentionHandler) Close() error
- func (h *RetentionHandler) ExecutePolicy(ctx context.Context, policyID int64) (*ExecuteRetentionResponse, error)
- func (h *RetentionHandler) GetActivePolicies() ([]RetentionPolicy, error)
- func (h *RetentionHandler) GetPolicy(policyID int64) (*RetentionPolicy, error)
- func (h *RetentionHandler) RegisterRoutes(app *fiber.App)
- func (h *RetentionHandler) SetCoordinator(c RetentionCoordinator)
- type RetentionPolicy
- type RetentionPolicyRequest
- type RetentionSchedulerInterface
- type SQLValidationError
- type SchedulerHandler
- type Server
- func (s *Server) GetApp() *fiber.App
- func (s *Server) GetMaxPayloadSize() int64
- func (s *Server) MarkNotReady()
- func (s *Server) MarkReady()
- func (s *Server) RegisterRoutes()
- func (s *Server) Shutdown(timeout time.Duration) error
- func (s *Server) Start() error
- func (s *Server) WaitForShutdown(shutdownTimeout time.Duration)
- type ServerConfig
- type TLEHandler
- type TLEImportResult
- type TableReference
- type TieringHandler
- func (h *TieringHandler) GetFiles(c *fiber.Ctx) error
- func (h *TieringHandler) GetStats(c *fiber.Ctx) error
- func (h *TieringHandler) GetStatus(c *fiber.Ctx) error
- func (h *TieringHandler) RegisterRoutes(app fiber.Router)
- func (h *TieringHandler) ScanFiles(c *fiber.Ctx) error
- func (h *TieringHandler) TriggerMigration(c *fiber.Ctx) error
- type TieringPoliciesHandler
- func (h *TieringPoliciesHandler) DeletePolicy(c *fiber.Ctx) error
- func (h *TieringPoliciesHandler) GetEffectivePolicy(c *fiber.Ctx) error
- func (h *TieringPoliciesHandler) GetPolicy(c *fiber.Ctx) error
- func (h *TieringPoliciesHandler) ListPolicies(c *fiber.Ctx) error
- func (h *TieringPoliciesHandler) RegisterRoutes(app fiber.Router)
- func (h *TieringPoliciesHandler) SetPolicy(c *fiber.Ctx) error
- type TriggerMigrationRequest
- type UpdateTokenRequest
Constants ¶
const CacheInvalidatePath = "/api/v1/internal/cache/invalidate"
CacheInvalidatePath is the single source of truth for the /api/v1/internal/cache/invalidate route. The receiver registers it in CacheInvalidateHandler.Register; the cluster post-compaction sender (cmd/arc/main.go) and the auth middleware's PublicRoutes config both reference this constant so a future typo cannot silently break fan-out.
const ForwardedByHeader = "X-Arc-Forwarded-By"
ForwardedByHeader is the header used to detect forwarded requests and prevent routing loops.
const ShardRoutedHeader = "X-Arc-Shard-Routed"
ShardRoutedHeader indicates a request has been routed by the shard router.
Variables ¶
This section is empty.
Functions ¶
func BuildHTTPRequest ¶
BuildHTTPRequest converts a Fiber context to a net/http Request for forwarding via the router.
Lifecycle: every caller in this codebase (lineprotocol.go, msgpack.go, tle.go, query.go, routing.go's RouteShardedWrite/Query) invokes the returned *http.Request synchronously within the Fiber handler — they pass it to router.RouteWrite/RouteQuery (which blocks on http.Client.Do, and internally already buffers the body via io.ReadAll for retry support, see internal/cluster/router.go forwardRequest) or to shardRouter.RouteWrite/RouteQuery, then read+close the response body inside CopyResponse, then return. fasthttp does not recycle the RequestCtx until after the handler returns (fasthttp server.go releaseCtx is post-handler), so wrapping c.Body() in bytes.NewReader directly is safe and avoids a copy that can be large at max payload (up to 1GB). Using c.Context() (returns *fasthttp.RequestCtx which implements context.Context) is also correct here — it propagates client-disconnect cancellation to the forwarded request, which c.UserContext() (returns context.Background()) does NOT.
If a future refactor moves any of this work to a background goroutine or stream writer, those properties no longer hold and the caller must take a defensive copy + use a non-pooled context. Today's callers are all synchronous; revisit this if that changes.
func CheckWritePermissions ¶
func CheckWritePermissions(c *fiber.Ctx, rbacManager RBACChecker, logger zerolog.Logger, database string, measurements []string) error
CheckWritePermissions checks if the token has write permission for the database and measurements. This is a shared implementation used by both LineProtocol and MsgPack handlers.
func CopyResponse ¶
CopyResponse writes an HTTP response from the router to a Fiber context. It copies the status code, headers (excluding hop-by-hop), and body.
func GetDecompBufferDiscards ¶
func GetDecompBufferDiscards() int64
GetDecompBufferDiscards returns the count of oversized buffers that were not returned to the pool.
func HandleRoutingError ¶
HandleRoutingError returns an appropriate error response for routing failures.
func HandleShardRoutingError ¶
HandleShardRoutingError returns an appropriate error response for shard routing failures.
func ListenAddr ¶
ListenAddr canonicalizes (host, port) into the host:port string callers hand to a listener — Fiber on the HTTP path, the cluster coordinator's advertise/peer logic on the cluster path.
Behavior:
- host="" preserves the historical wildcard (":<port>") so existing deployments keep dual-stack binding on Linux on upgrade.
- net.JoinHostPort adds IPv6 brackets when needed; surrounding brackets in user input are stripped first so we never produce an invalid double-bracketed address like "[[::1]]:8000".
The canonicalized host is returned alongside the address so callers can use it for logging without re-parsing. Exported so non-api callers (cmd/arc) format the same address shape this server does — historically this was duplicated with fmt.Sprintf("%s:%d", …) which mis-formatted IPv6 literals.
func OptimizeLikePatterns ¶
OptimizeLikePatterns rewrites SQL to reorder WHERE clause predicates for better query performance. Returns the optimized SQL and whether any changes were made.
func RewriteRegexToStringFuncs ¶
RewriteRegexToStringFuncs rewrites regex function calls to equivalent string functions. This provides a 2x performance improvement for common patterns like URL domain extraction. Returns the rewritten SQL and whether any rewrites were applied.
func RouteShardedQuery ¶
func RouteShardedQuery(shardRouter *sharding.ShardRouter, database string, c *fiber.Ctx) (*http.Response, error)
RouteShardedQuery routes a query request using the shard router. Returns:
- (nil, nil) if request should be handled locally
- (*http.Response, nil) if request was forwarded successfully
- (nil, error) if routing failed
func RouteShardedWrite ¶
RouteShardedWrite routes a write request using the shard router. Returns:
- (nil, nil) if request should be handled locally
- (*http.Response, nil) if request was forwarded successfully
- (nil, error) if routing failed
func ShouldForwardQuery ¶
ShouldForwardQuery checks if a query request should be forwarded to another node. Returns true if the request should be forwarded (local node cannot handle queries). Returns false if:
- Router is nil (no clustering)
- Request is already forwarded (loop prevention)
- Local node can handle queries
func ShouldForwardWrite ¶
ShouldForwardWrite checks if a write request should be forwarded to another node. Returns true if the request should be forwarded (local node cannot handle writes). Returns false if:
- Router is nil (no clustering)
- Request is already forwarded (loop prevention)
- Local node can handle writes
func ValidateSQLRequest ¶
ValidateSQLRequest validates an SQL query for common issues. Returns nil if valid, or an error with appropriate message. This is a shared function used by multiple query endpoints.
SECURITY: The denylist regex must run on a normalised version of the SQL — comments stripped (so `DROP /* */ TABLE x` does not bypass the `\b...\b` token boundary) and string literals masked (so `SELECT 'DROP TABLE x'` does not false-positive). The cost is one full pass over the SQL; for the 10KB cap that's microseconds.
Types ¶
type AuditHandler ¶
type AuditHandler struct {
// contains filtered or unexported fields
}
AuditHandler handles audit log query endpoints
func NewAuditHandler ¶
func NewAuditHandler(auditLogger *audit.Logger, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *AuditHandler
NewAuditHandler creates a new audit handler
func (*AuditHandler) RegisterRoutes ¶
func (h *AuditHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers audit log query endpoints
type AuthHandler ¶
type AuthHandler struct {
// contains filtered or unexported fields
}
AuthHandler handles authentication-related endpoints
func NewAuthHandler ¶
func NewAuthHandler(authManager *auth.AuthManager, logger zerolog.Logger) *AuthHandler
NewAuthHandler creates a new auth handler
func (*AuthHandler) RegisterRoutes ¶
func (h *AuthHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers auth-related endpoints
func (*AuthHandler) RegisterTokenMembershipRoutes ¶
func (h *AuthHandler) RegisterTokenMembershipRoutes(app *fiber.App)
RegisterTokenMembershipRoutes registers token membership endpoints (requires RBAC manager)
func (*AuthHandler) SetRBACManager ¶
func (h *AuthHandler) SetRBACManager(rbacManager *auth.RBACManager)
SetRBACManager sets the RBAC manager for token membership endpoints
type AuthManager ¶
AuthManager interface for getting token info from context
type BackupHandler ¶
type BackupHandler struct {
// contains filtered or unexported fields
}
BackupHandler handles backup and restore API operations.
func NewBackupHandler ¶
func NewBackupHandler(manager *backup.Manager, authManager *auth.AuthManager, logger zerolog.Logger) *BackupHandler
NewBackupHandler creates a new backup handler.
func (*BackupHandler) CreateBackup ¶
func (h *BackupHandler) CreateBackup(c *fiber.Ctx) error
CreateBackup triggers a new backup. POST /api/v1/backup
func (*BackupHandler) DeleteBackup ¶
func (h *BackupHandler) DeleteBackup(c *fiber.Ctx) error
DeleteBackup removes a backup. DELETE /api/v1/backup/:id
func (*BackupHandler) GetBackup ¶
func (h *BackupHandler) GetBackup(c *fiber.Ctx) error
GetBackup returns the manifest for a specific backup. GET /api/v1/backup/:id
func (*BackupHandler) GetStatus ¶
func (h *BackupHandler) GetStatus(c *fiber.Ctx) error
GetStatus returns the progress of the current active operation. GET /api/v1/backup/status
func (*BackupHandler) ListBackups ¶
func (h *BackupHandler) ListBackups(c *fiber.Ctx) error
ListBackups returns all available backups. GET /api/v1/backup
func (*BackupHandler) RegisterRoutes ¶
func (h *BackupHandler) RegisterRoutes(app fiber.Router)
RegisterRoutes registers backup and restore API routes.
func (*BackupHandler) RestoreBackup ¶
func (h *BackupHandler) RestoreBackup(c *fiber.Ctx) error
RestoreBackup triggers a restore from a backup. POST /api/v1/backup/restore
type CQCoordinator ¶
CQCoordinator is the minimal cluster interface the CQ handler needs to gate manual execute requests to the primary writer. nil = standalone mode (no gate).
type CQExecution ¶
type CQExecution struct {
ID int64 `json:"id"`
QueryID int64 `json:"query_id"`
ExecutionID string `json:"execution_id"`
ExecutionTime string `json:"execution_time"`
Status string `json:"status"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
RecordsRead *int64 `json:"records_read"`
RecordsWritten int64 `json:"records_written"`
ExecutionDurationSeconds float64 `json:"execution_duration_seconds"`
ErrorMessage *string `json:"error_message"`
}
CQExecution represents an execution history record
type CQSchedulerInterface ¶
type CQSchedulerInterface interface {
Status() map[string]interface{}
ReloadAll() error
JobCount() int
}
CQSchedulerInterface defines the interface for CQ scheduler operations
type CQSchedulerReloader ¶
type CQSchedulerReloader interface {
ReloadCQ(cqID int64) error
// StartJobDirect schedules a job using data already in hand, avoiding a
// redundant SQLite read that could race with the just-committed INSERT.
StartJobDirect(cqID int64, name, interval string, isActive bool) error
}
CQSchedulerReloader is an interface for reloading CQ schedules after updates. This avoids a circular import between api and scheduler packages.
type CacheInvalidateHandler ¶
type CacheInvalidateHandler struct {
// contains filtered or unexported fields
}
CacheInvalidateHandler serves the post-compaction cluster cache-invalidate endpoint. It exists only when cluster mode is active AND cluster.shared_secret is configured — callers in cmd/arc/main.go construct it conditionally and skip Register entirely otherwise.
Closes audit finding #3 from 2026-05-19: the previous gate was a static header (`X-Arc-Internal: cache-invalidate`) which any network-reachable caller could replay, forcing DuckDB's cache_httpfs glob results to repopulate from S3 — cost amplification + p99 spikes with no auth.
The "unconfigured = no route" shape (rather than a runtime secret-string check inside the handler) means an unconfigured Arc instance returns 404 for this path — there is nothing to probe, not even a 403. Configured-but-misauthed callers still get a uniform 403 across every rejection branch (see handle).
Per-decision: the audit deliberately assigned NO CVE to this finding because OSS does not reach the endpoint (post-compaction invalidation is in-process via db.ClearHTTPCache) and the endpoint carries no secrets — it is only a cluster-mode "clear your caches now" signal. Do NOT retroactively file a CVE without re-evaluating the threat model.
func NewCacheInvalidateHandler ¶
func NewCacheInvalidateHandler( sharedSecret, clusterName, localNodeID string, nonceCache *security.NonceCache, tolerance time.Duration, onInvalidate func(), logger zerolog.Logger, ) *CacheInvalidateHandler
NewCacheInvalidateHandler constructs the handler. All arguments are required; the constructor panics on missing pieces so a misconfigured caller is loudly broken at startup instead of producing a silently half-armed endpoint. By construction, every field on the returned value is non-zero — handle() needs no nil-guards.
func (*CacheInvalidateHandler) Register ¶
func (h *CacheInvalidateHandler) Register(app fiber.Router)
Register wires POST CacheInvalidatePath. Callers MUST add CacheInvalidatePath to the auth middleware's PublicRoutes — the endpoint authenticates via HMAC, not via the user-token auth middleware (cluster peers do not carry user auth).
type ClusterHandler ¶
type ClusterHandler struct {
// contains filtered or unexported fields
}
ClusterHandler handles cluster management API endpoints.
func NewClusterHandler ¶
func NewClusterHandler( coordinator *cluster.Coordinator, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger, ) *ClusterHandler
NewClusterHandler creates a new cluster handler. The coordinator can be nil if clustering is not enabled.
func (*ClusterHandler) RegisterRoutes ¶
func (h *ClusterHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers cluster API routes.
type CompactionHandler ¶
type CompactionHandler struct {
// contains filtered or unexported fields
}
CompactionHandler handles compaction API endpoints
func NewCompactionHandler ¶
func NewCompactionHandler(manager *compaction.Manager, hourlyScheduler, dailyScheduler *compaction.Scheduler, authManager *auth.AuthManager, logger zerolog.Logger) *CompactionHandler
NewCompactionHandler creates a new compaction handler
func (*CompactionHandler) RegisterRoutes ¶
func (h *CompactionHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers compaction endpoints
type ContinuousQuery ¶
type ContinuousQuery struct {
ID int64 `json:"id"`
Name string `json:"name"`
Description *string `json:"description"`
Database string `json:"database"`
SourceMeasurement string `json:"source_measurement"`
DestinationMeasurement string `json:"destination_measurement"`
Query string `json:"query"`
Interval string `json:"interval"`
RetentionDays *int `json:"retention_days"`
DeleteSourceAfterDays *int `json:"delete_source_after_days"`
IsActive bool `json:"is_active"`
LastExecutionTime *string `json:"last_execution_time"`
LastExecutionStatus *string `json:"last_execution_status"`
LastProcessedTime *string `json:"last_processed_time"`
LastRecordsWritten *int64 `json:"last_records_written"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
ContinuousQuery represents a continuous query definition
type ContinuousQueryHandler ¶
type ContinuousQueryHandler struct {
// contains filtered or unexported fields
}
ContinuousQueryHandler handles continuous query operations
func NewContinuousQueryHandler ¶
func NewContinuousQueryHandler(db *database.DuckDB, storage storage.Backend, arrowBuffer *ingest.ArrowBuffer, cfg *config.ContinuousQueryConfig, authManager *auth.AuthManager, logger zerolog.Logger) (*ContinuousQueryHandler, error)
NewContinuousQueryHandler creates a new continuous query handler
func (*ContinuousQueryHandler) Close ¶
func (h *ContinuousQueryHandler) Close() error
Close closes the database connection
func (*ContinuousQueryHandler) ExecuteCQ ¶
func (h *ContinuousQueryHandler) ExecuteCQ(ctx context.Context, queryID int64) (*ExecuteCQResponse, error)
ExecuteCQ executes a continuous query by ID programmatically (used by the scheduler). Returns the execution response and any error.
func (*ContinuousQueryHandler) GetActiveCQs ¶
func (h *ContinuousQueryHandler) GetActiveCQs() ([]ContinuousQuery, error)
GetActiveCQs returns all active continuous queries (used by scheduler)
func (*ContinuousQueryHandler) GetCQ ¶
func (h *ContinuousQueryHandler) GetCQ(queryID int64) (*ContinuousQuery, error)
GetCQ returns a continuous query by ID (used by scheduler)
func (*ContinuousQueryHandler) RegisterRoutes ¶
func (h *ContinuousQueryHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers continuous query endpoints
func (*ContinuousQueryHandler) SetCoordinator ¶
func (h *ContinuousQueryHandler) SetCoordinator(c CQCoordinator)
SetCoordinator sets the cluster coordinator for writer-gate checks. Called after coordinator creation since it depends on the handler.
func (*ContinuousQueryHandler) SetScheduler ¶
func (h *ContinuousQueryHandler) SetScheduler(s CQSchedulerReloader)
SetScheduler sets the CQ scheduler for reloading after updates. Called after scheduler creation since it depends on the handler.
type ContinuousQueryRequest ¶
type ContinuousQueryRequest struct {
Name string `json:"name"`
Description *string `json:"description"`
Database string `json:"database"`
SourceMeasurement string `json:"source_measurement"`
DestinationMeasurement string `json:"destination_measurement"`
Query string `json:"query"`
Interval string `json:"interval"`
RetentionDays *int `json:"retention_days"`
DeleteSourceAfterDays *int `json:"delete_source_after_days"`
IsActive bool `json:"is_active"`
}
ContinuousQueryRequest represents a request to create/update a CQ
type CreateBackupRequest ¶
type CreateBackupRequest struct {
IncludeMetadata *bool `json:"include_metadata"` // default: true
IncludeConfig *bool `json:"include_config"` // default: true
}
CreateBackupRequest is the request body for POST /api/v1/backup.
type CreateDatabaseRequest ¶
type CreateDatabaseRequest struct {
Name string `json:"name"`
}
CreateDatabaseRequest represents a request to create a new database
type CreateTokenRequest ¶
type CreateTokenRequest struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Permissions *[]string `json:"permissions,omitempty"` // nil = default (read,write), empty array = no permissions (RBAC-only)
ExpiresIn string `json:"expires_in,omitempty"` // e.g., "24h", "7d", "30d"
}
CreateTokenRequest represents a token creation request
type DatabaseInfo ¶
type DatabaseInfo struct {
Name string `json:"name"`
MeasurementCount int `json:"measurement_count"`
CreatedAt string `json:"created_at,omitempty"`
}
DatabaseInfo represents information about a database
type DatabaseListResponse ¶
type DatabaseListResponse struct {
Databases []DatabaseInfo `json:"databases"`
Count int `json:"count"`
}
DatabaseListResponse represents the response for listing databases
type DatabaseMeasurement ¶
type DatabaseMeasurement struct {
Name string `json:"name"`
FileCount int `json:"file_count,omitempty"`
}
DatabaseMeasurement represents a measurement within a database
type DatabasesHandler ¶
type DatabasesHandler struct {
// contains filtered or unexported fields
}
DatabasesHandler handles database management API endpoints
func NewDatabasesHandler ¶
func NewDatabasesHandler(storage storage.Backend, deleteConfig *config.DeleteConfig, authManager *auth.AuthManager, logger zerolog.Logger) *DatabasesHandler
NewDatabasesHandler creates a new databases handler
func (*DatabasesHandler) RegisterRoutes ¶
func (h *DatabasesHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers the database management routes.
Read-only routes (list, get, list measurements) pass through the global any-valid-token middleware mounted in cmd/arc/main.go — listing what exists is appropriate for any authenticated caller.
Mutating routes (Create, Delete) require admin permission. The global middleware would accept any-valid-token; the route-level admin wrapper tightens that to admin only. Without it, a read-scoped token could provision/destroy databases — a privilege escalation by configuration.
`withAdminAuth` returns auth.RequireAdmin(am) when am is non-nil and a passthrough when am is nil (OSS / auth-disabled). This matches the convention in import.go, lineprotocol.go, msgpack.go, and tle.go — single source of truth for the nil-am branching.
func (*DatabasesHandler) SetTieringManager ¶
func (h *DatabasesHandler) SetTieringManager(tm *tiering.Manager)
SetTieringManager sets the tiering manager for multi-tier database/measurement listing. This is called after initialization when tiering is enabled and licensed.
type DebugHandler ¶
type DebugHandler struct {
// contains filtered or unexported fields
}
DebugHandler exposes process- and DuckDB-level memory diagnostics. Used to attribute RSS to Go heap, DuckDB native heap, or glibc arenas during support cases where the customer reports unexplained memory growth.
func NewDebugHandler ¶
func NewDebugHandler(db *database.DuckDB, authManager *auth.AuthManager, logger zerolog.Logger) *DebugHandler
func (*DebugHandler) RegisterRoutes ¶
func (h *DebugHandler) RegisterRoutes(app *fiber.App)
type DeleteConfigResponse ¶
type DeleteConfigResponse struct {
Enabled bool `json:"enabled"`
ConfirmationThreshold int `json:"confirmation_threshold"`
MaxRowsPerDelete int `json:"max_rows_per_delete"`
Implementation string `json:"implementation"`
PerformanceImpact map[string]string `json:"performance_impact"`
}
DeleteConfigResponse represents delete configuration info
type DeleteCoordinator ¶
type DeleteCoordinator interface {
BatchFileOpsInManifest(ops []raft.BatchFileOp) error
UpdateFileInManifest(file raft.FileEntry) error
GetFileEntry(path string) (*raft.FileEntry, bool)
IsPrimaryWriter() bool
Role() string
}
DeleteCoordinator is the minimal cluster interface the delete handler needs to gate execution to the primary writer and propagate file manifest changes. nil = standalone mode (no gate, manifest not updated).
type DeleteHandler ¶
type DeleteHandler struct {
// contains filtered or unexported fields
}
DeleteHandler handles delete operations using file rewrite strategy
func NewDeleteHandler ¶
func NewDeleteHandler(db *database.DuckDB, storage storage.Backend, cfg *config.DeleteConfig, authManager *auth.AuthManager, tempDir string, logger zerolog.Logger) *DeleteHandler
NewDeleteHandler creates a new delete handler. tempDir MUST be the same path cmd/arc/main.go added to the DuckDB sandbox's allowed_directories, otherwise the COPY ... TO inside rewriteS3File fails on S3-backed deployments. Logs a Warn on empty tempDir so misconfigured deployments surface the issue at startup rather than at the first DELETE.
func (*DeleteHandler) RegisterRoutes ¶
func (h *DeleteHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers delete endpoints
func (*DeleteHandler) SetCoordinator ¶
func (h *DeleteHandler) SetCoordinator(c DeleteCoordinator)
SetCoordinator wires the cluster coordinator for manifest updates and role gating. Called after construction when cluster mode is enabled.
type DeleteRequest ¶
type DeleteRequest struct {
Database string `json:"database"`
Measurement string `json:"measurement"`
Where string `json:"where"`
DryRun bool `json:"dry_run"`
Confirm bool `json:"confirm"`
}
DeleteRequest represents a delete operation request
type DeleteResponse ¶
type DeleteResponse struct {
Success bool `json:"success"`
DeletedCount int64 `json:"deleted_count"`
AffectedFiles int `json:"affected_files"`
RewrittenFiles int `json:"rewritten_files"`
ExecutionTimeMs float64 `json:"execution_time_ms"`
DryRun bool `json:"dry_run"`
FilesProcessed []string `json:"files_processed"`
FailedFiles []string `json:"failed_files,omitempty"`
Error string `json:"error,omitempty"`
}
DeleteResponse represents a delete operation response
type EstimateResponse ¶
type EstimateResponse struct {
Success bool `json:"success"`
EstimatedRows *int64 `json:"estimated_rows"`
WarningLevel string `json:"warning_level"`
WarningMessage string `json:"warning_message,omitempty"`
ExecutionTimeMs float64 `json:"execution_time_ms"`
Error string `json:"error,omitempty"`
}
EstimateResponse represents the response for query estimation
type ExecuteCQRequest ¶
type ExecuteCQRequest struct {
StartTime *string `json:"start_time"`
EndTime *string `json:"end_time"`
DryRun bool `json:"dry_run"`
}
ExecuteCQRequest represents a request to execute a CQ
type ExecuteCQResponse ¶
type ExecuteCQResponse struct {
QueryID int64 `json:"query_id"`
QueryName string `json:"query_name"`
ExecutionID string `json:"execution_id"`
Status string `json:"status"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
RecordsRead *int64 `json:"records_read"`
RecordsWritten int64 `json:"records_written"`
ExecutionTimeSeconds float64 `json:"execution_time_seconds"`
DestinationMeasurement string `json:"destination_measurement"`
DryRun bool `json:"dry_run"`
ExecutedAt string `json:"executed_at"`
ExecutedQuery string `json:"executed_query,omitempty"`
}
ExecuteCQResponse represents the result of executing a CQ
type ExecuteRetentionRequest ¶
ExecuteRetentionRequest represents a request to execute a policy
type ExecuteRetentionResponse ¶
type ExecuteRetentionResponse struct {
PolicyID int64 `json:"policy_id"`
PolicyName string `json:"policy_name"`
DeletedCount int64 `json:"deleted_count"`
FilesDeleted int `json:"files_deleted"`
ExecutionTimeMs float64 `json:"execution_time_ms"`
DryRun bool `json:"dry_run"`
CutoffDate string `json:"cutoff_date"`
AffectedMeasurements []string `json:"affected_measurements"`
}
ExecuteRetentionResponse represents the result of executing a policy
type GovernanceHandler ¶
type GovernanceHandler struct {
// contains filtered or unexported fields
}
GovernanceHandler handles governance policy API operations.
func NewGovernanceHandler ¶
func NewGovernanceHandler(manager *governance.Manager, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *GovernanceHandler
NewGovernanceHandler creates a new governance handler.
func (*GovernanceHandler) CreatePolicy ¶
func (h *GovernanceHandler) CreatePolicy(c *fiber.Ctx) error
CreatePolicy creates a governance policy for a token. POST /api/v1/governance/policies
func (*GovernanceHandler) DeletePolicy ¶
func (h *GovernanceHandler) DeletePolicy(c *fiber.Ctx) error
DeletePolicy removes the governance policy for a specific token. DELETE /api/v1/governance/policies/:token_id
func (*GovernanceHandler) GetPolicy ¶
func (h *GovernanceHandler) GetPolicy(c *fiber.Ctx) error
GetPolicy returns the governance policy for a specific token. GET /api/v1/governance/policies/:token_id
func (*GovernanceHandler) GetUsage ¶
func (h *GovernanceHandler) GetUsage(c *fiber.Ctx) error
GetUsage returns current usage statistics for a specific token. GET /api/v1/governance/usage/:token_id
func (*GovernanceHandler) ListPolicies ¶
func (h *GovernanceHandler) ListPolicies(c *fiber.Ctx) error
ListPolicies returns all governance policies. GET /api/v1/governance/policies
func (*GovernanceHandler) RegisterRoutes ¶
func (h *GovernanceHandler) RegisterRoutes(app fiber.Router)
RegisterRoutes registers governance API routes.
func (*GovernanceHandler) UpdatePolicy ¶
func (h *GovernanceHandler) UpdatePolicy(c *fiber.Ctx) error
UpdatePolicy updates the governance policy for a specific token. PUT /api/v1/governance/policies/:token_id
type ImportHandler ¶
type ImportHandler struct {
// contains filtered or unexported fields
}
ImportHandler handles bulk CSV, Parquet, Line Protocol, and TLE file imports.
All formats parse rows in-process and ingest through the ArrowBuffer pipeline (the same path as streaming writes). No import path issues DuckDB queries against the uploaded file, so imports do not require the upload directory to be in the DuckDB sandbox allowlist. (The allowlist entry that still exists is retained solely for delete.go's S3 COPY ... TO staging — see cmd/arc/main.go.)
func NewImportHandler ¶
func NewImportHandler(logger zerolog.Logger) *ImportHandler
NewImportHandler creates a new ImportHandler. The ArrowBuffer must be set via SetArrowBuffer before any import is served.
func (*ImportHandler) RegisterRoutes ¶
func (h *ImportHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers import API routes. Import endpoints write historical data and use admin-tier auth (not write-tier) — bulk imports can rewrite or supplant existing partitions, so the stricter gate is appropriate.
func (*ImportHandler) SetArrowBuffer ¶
func (h *ImportHandler) SetArrowBuffer(buf *ingest.ArrowBuffer)
SetArrowBuffer sets the ArrowBuffer that every import format ingests through. It must be called before any import is served.
func (*ImportHandler) SetAuthAndRBAC ¶
func (h *ImportHandler) SetAuthAndRBAC(authManager *auth.AuthManager, rbacManager RBACChecker)
SetAuthAndRBAC sets the auth and RBAC managers. See MsgPackHandler.SetAuthAndRBAC for the full rationale.
type ImportResult ¶
type ImportResult struct {
Database string `json:"database"`
Measurement string `json:"measurement"`
RowsImported int64 `json:"rows_imported"`
PartitionsCreated int `json:"partitions_created"`
TimeRangeMin string `json:"time_range_min,omitempty"`
TimeRangeMax string `json:"time_range_max,omitempty"`
Columns []string `json:"columns"`
DurationMs int64 `json:"duration_ms"`
}
ImportResult holds the result of a bulk import operation
type LPImportResult ¶
type LPImportResult struct {
Database string `json:"database"`
Measurements []string `json:"measurements"`
RowsImported int64 `json:"rows_imported"`
Precision string `json:"precision"`
DurationMs int64 `json:"duration_ms"`
}
LPImportResult holds the result of a Line Protocol bulk import operation
type LineProtocolHandler ¶
type LineProtocolHandler struct {
// contains filtered or unexported fields
}
LineProtocolHandler handles Line Protocol write requests
func NewLineProtocolHandler ¶
func NewLineProtocolHandler(buffer *ingest.ArrowBuffer, logger zerolog.Logger) *LineProtocolHandler
NewLineProtocolHandler creates a new Line Protocol handler
func (*LineProtocolHandler) Flush ¶
func (h *LineProtocolHandler) Flush(c *fiber.Ctx) error
Flush triggers a buffer flush
func (*LineProtocolHandler) GetStats ¶
func (h *LineProtocolHandler) GetStats() map[string]int64
GetStats returns stats as a map (for programmatic access)
func (*LineProtocolHandler) Health ¶
func (h *LineProtocolHandler) Health(c *fiber.Ctx) error
Health returns health status
func (*LineProtocolHandler) RegisterRoutes ¶
func (h *LineProtocolHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers Line Protocol routes. All write endpoints are gated by auth.RequireWrite when auth is enabled; the /flush endpoint is admin-gated because it forces a global flush across all shards (operationally heavy + spammable). Per CLAUDE.md, mutating endpoints MUST have an explicit auth middleware.
func (*LineProtocolHandler) SetAuthAndRBAC ¶
func (h *LineProtocolHandler) SetAuthAndRBAC(authManager *auth.AuthManager, rbacManager RBACChecker)
SetAuthAndRBAC sets the auth and RBAC managers. See MsgPackHandler.SetAuthAndRBAC for the full rationale.
func (*LineProtocolHandler) SetRouter ¶
func (h *LineProtocolHandler) SetRouter(router *cluster.Router)
SetRouter sets the cluster router for request forwarding. When set, write requests from reader nodes will be forwarded to writer nodes.
func (*LineProtocolHandler) Stats ¶
func (h *LineProtocolHandler) Stats(c *fiber.Ctx) error
Stats returns Line Protocol handler statistics
func (*LineProtocolHandler) WriteInfluxDB ¶
func (h *LineProtocolHandler) WriteInfluxDB(c *fiber.Ctx) error
WriteInfluxDB handles InfluxDB 2.x compatible write requests. POST /api/v2/write?org=myorg&bucket=mybucket&precision=ns
func (*LineProtocolHandler) WriteSimple ¶
func (h *LineProtocolHandler) WriteSimple(c *fiber.Ctx) error
WriteSimple handles simple write requests without query parameters. POST /api/v1/write/line-protocol
type MQTTHandler ¶
type MQTTHandler struct {
// contains filtered or unexported fields
}
MQTTHandler handles MQTT stats and health endpoints
func NewMQTTHandler ¶
func NewMQTTHandler(manager mqtt.Manager, authManager *auth.AuthManager, logger zerolog.Logger) *MQTTHandler
NewMQTTHandler creates a new MQTT handler
func (*MQTTHandler) RegisterRoutes ¶
func (h *MQTTHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers the MQTT stats/health API routes
type MQTTSubscriptionHandler ¶
type MQTTSubscriptionHandler struct {
// contains filtered or unexported fields
}
MQTTSubscriptionHandler handles MQTT subscription REST API endpoints
func NewMQTTSubscriptionHandler ¶
func NewMQTTSubscriptionHandler(manager mqtt.Manager, authManager *auth.AuthManager, logger zerolog.Logger) *MQTTSubscriptionHandler
NewMQTTSubscriptionHandler creates a new MQTT subscription handler
func (*MQTTSubscriptionHandler) RegisterRoutes ¶
func (h *MQTTSubscriptionHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers the MQTT subscription API routes.
The requireEnabled middleware is applied to the route group so every current and future endpoint is automatically guarded against a nil manager. This is the project policy for MQTT API surfaces — see also MQTTHandler in mqtt.go.
type MeasurementInfo ¶
type MeasurementInfo struct {
Database string `json:"database"`
Measurement string `json:"measurement"`
FileCount int `json:"file_count"`
TotalSizeMB float64 `json:"total_size_mb"`
StoragePath string `json:"storage_path"`
}
MeasurementInfo represents information about a measurement
type MeasurementListResponse ¶
type MeasurementListResponse struct {
Database string `json:"database"`
Measurements []DatabaseMeasurement `json:"measurements"`
Count int `json:"count"`
}
MeasurementListResponse represents the response for listing measurements
type MsgPackHandler ¶
type MsgPackHandler struct {
// contains filtered or unexported fields
}
MsgPackHandler handles MessagePack binary protocol endpoints
func NewMsgPackHandler ¶
func NewMsgPackHandler(logger zerolog.Logger, arrowBuffer *ingest.ArrowBuffer, maxPayloadSize int64) *MsgPackHandler
NewMsgPackHandler creates a new MessagePack handler
func (*MsgPackHandler) RegisterRoutes ¶
func (h *MsgPackHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers MessagePack endpoints. The write endpoint is gated by auth.RequireWrite when an auth manager is configured — per CLAUDE.md, mutating endpoints MUST have an explicit write-tier auth middleware. Stats and spec remain at any-authenticated-token level (gated by the global auth middleware in main.go).
func (*MsgPackHandler) SetAuthAndRBAC ¶
func (h *MsgPackHandler) SetAuthAndRBAC(authManager *auth.AuthManager, rbacManager RBACChecker)
SetAuthAndRBAC sets the auth and RBAC managers. Takes the concrete *auth.AuthManager (not the local AuthManager interface) so route-level auth.RequireWrite middleware can be wired without a type assertion that could silently miss on a mock and disable protection. nil = auth disabled (OSS default).
func (*MsgPackHandler) SetRouter ¶
func (h *MsgPackHandler) SetRouter(router *cluster.Router)
SetRouter sets the cluster router for request forwarding. When set, write requests from reader nodes will be forwarded to writer nodes.
type ParallelQueryInfo ¶
type ParallelQueryInfo struct {
// Paths contains the partition paths to execute in parallel
Paths []string
// QueryTemplate is the SQL with {PARTITION_PATH} placeholder
QueryTemplate string
// ReadParquetOptions are the options to pass to read_parquet
ReadParquetOptions string
}
ParallelQueryInfo contains information for parallel partition execution. When set, the query should be executed using the parallel executor.
type PolicyRequest ¶
type PolicyRequest struct {
HotOnly bool `json:"hot_only,omitempty"` // Exclude from tiering entirely
HotMaxAgeDays *int `json:"hot_max_age_days,omitempty"` // nil = use global default
}
PolicyRequest represents a request to create/update a tiering policy. Tiering is a 2-tier system: data older than HotMaxAgeDays moves from hot to cold.
type PooledBuffer ¶
type PooledBuffer struct {
Data []byte // The decompressed data (slice of pooled buffer)
// contains filtered or unexported fields
}
PooledBuffer wraps a decompression buffer that must be returned to the pool after use. This enables zero-copy decompression by returning the pooled buffer directly.
func (*PooledBuffer) Release ¶
func (pb *PooledBuffer) Release()
Release returns the buffer to the pool — it MUST be called after use. Safe to call multiple times (idempotent).
type QueryHandler ¶
type QueryHandler struct {
// contains filtered or unexported fields
}
QueryHandler handles SQL query endpoints
func NewQueryHandler ¶
func NewQueryHandler(db *database.DuckDB, storage storage.Backend, logger zerolog.Logger, queryTimeoutSeconds int, slowQueryThresholdMs int) *QueryHandler
NewQueryHandler creates a new query handler. queryTimeoutSeconds is the timeout for query execution in seconds (0 = no timeout); slowQueryThresholdMs is the threshold in milliseconds for slow query WARN logging (0 = disabled).
func (*QueryHandler) InvalidateCaches ¶
func (h *QueryHandler) InvalidateCaches()
InvalidateCaches clears all internal caches (partition pruner and SQL transform cache). This should be called after compaction to prevent stale file references.
func (*QueryHandler) QueryGate503Total ¶
func (h *QueryHandler) QueryGate503Total() int64
QueryGate503Total returns the total number of times the catch-up gate has fired since process start. Zero when the gate is disabled or has never fired. Exposed for Prometheus / metrics dashboards to alert on a non-zero rate without inferring from HTTP error logs.
func (*QueryHandler) RegisterRoutes ¶
func (h *QueryHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers query endpoints
func (*QueryHandler) SetAuthAndRBAC ¶
func (h *QueryHandler) SetAuthAndRBAC(am AuthManager, rm RBACChecker)
SetAuthAndRBAC sets the auth and RBAC managers for permission checking
func (*QueryHandler) SetCluster ¶
func (h *QueryHandler) SetCluster(coordinator *cluster.Coordinator, queryGateOnCatchup bool)
SetCluster wires the cluster coordinator and the query-gate-on-catchup behavior flag into the query handler. Consulted by checkReplicationReady to short-circuit queries with 503 while peer file replication is still draining (#392). Both arguments are optional — nil coordinator OR gate=false makes the gate a no-op.
Initialization-only: must be called once during process startup before the HTTP server starts accepting requests. Order relative to RegisterRoutes does not matter (Fiber resolves field reads at request time, and request handling does not begin until server.Start). Runtime re-wiring is not supported — the fields are read lock-free per request.
The explicit nil branch protects against the typed-nil interface trap: assigning a nil *cluster.Coordinator to an interface field yields a non-nil interface that panics on method call. The current caller in main.go is gated on clusterCoordinator != nil, so the branch is dead today, but we keep it so that a future caller that passes a nil coordinator cannot break the gate.
func (*QueryHandler) SetGovernance ¶
func (h *QueryHandler) SetGovernance(manager *governance.Manager, lc *license.Client)
SetGovernance sets the governance manager and license client for query rate limiting and quotas.
func (*QueryHandler) SetQueryRegistry ¶
func (h *QueryHandler) SetQueryRegistry(registry *queryregistry.Registry)
SetQueryRegistry sets the query registry for long-running query management.
func (*QueryHandler) SetRouter ¶
func (h *QueryHandler) SetRouter(router *cluster.Router)
SetRouter sets the cluster router for request forwarding. When set, query requests from nodes that cannot handle queries will be forwarded. Note: Currently writers always process queries locally (CanQuery=true). This is provided for future extensibility (e.g., prefer_readers mode).
func (*QueryHandler) SetTieringManager ¶
func (h *QueryHandler) SetTieringManager(manager *tiering.Manager)
SetTieringManager sets the tiering manager for multi-tier query routing. When set, queries will check both hot and cold tiers for data.
func (*QueryHandler) StartBackgroundWorkers ¶
func (h *QueryHandler) StartBackgroundWorkers(ctx context.Context)
StartBackgroundWorkers spawns the long-lived goroutines the handler needs: today this is the partition pruner cache janitor, which sweeps expired entries from globCache + partitionCache so they don't accumulate over the process lifetime. (Both caches are TTL-only with no max-size cap and no read-side eviction — get() returns "expired" as a miss but leaves the stale entry in the map.)
The workers stop when ctx is cancelled. The handler itself remains usable after that, and InvalidateCaches() (called post-compaction) continues to reset both maps, but expired entries are no longer swept on a schedule — they accumulate until the next compaction flushes everything or the process exits. Production callers should pass a context tied to process lifetime via the shutdown coordinator.
type QueryManagementHandler ¶
type QueryManagementHandler struct {
// contains filtered or unexported fields
}
QueryManagementHandler handles query management API operations.
func NewQueryManagementHandler ¶
func NewQueryManagementHandler(registry *queryregistry.Registry, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *QueryManagementHandler
NewQueryManagementHandler creates a new query management handler.
func (*QueryManagementHandler) RegisterRoutes ¶
func (h *QueryManagementHandler) RegisterRoutes(app fiber.Router)
RegisterRoutes registers query management API routes.
type QueryRequest ¶
type QueryRequest struct {
SQL string `json:"sql"`
}
QueryRequest represents a SQL query request
type QueryResponse ¶
type QueryResponse struct {
Success bool `json:"success"`
Columns []string `json:"columns"`
Data [][]interface{} `json:"data"`
RowCount int `json:"row_count"`
ExecutionTimeMs float64 `json:"execution_time_ms"`
Timestamp string `json:"timestamp"`
Error string `json:"error,omitempty"`
Profile *database.QueryProfile `json:"profile,omitempty"`
}
QueryResponse represents a SQL query response
type RBACChecker ¶
type RBACChecker interface {
IsRBACEnabled() bool
CheckPermission(req *auth.PermissionCheckRequest) *auth.PermissionCheckResult
CheckPermissionsBatch(reqs []*auth.PermissionCheckRequest) []*auth.PermissionCheckResult
}
RBACChecker interface for RBAC permission checking
type RBACHandler ¶
type RBACHandler struct {
// contains filtered or unexported fields
}
RBACHandler handles RBAC-related endpoints
func NewRBACHandler ¶
func NewRBACHandler(authManager *auth.AuthManager, rbacManager *auth.RBACManager, logger zerolog.Logger) *RBACHandler
NewRBACHandler creates a new RBAC handler
func (*RBACHandler) RegisterRoutes ¶
func (h *RBACHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers RBAC-related endpoints
type ReconciliationHandler ¶
type ReconciliationHandler struct {
// contains filtered or unexported fields
}
ReconciliationHandler exposes the reconciliation engine over HTTP. Mirrors the SchedulerHandler shape: GET /status open to any authenticated token, mutating routes wrapped with RequireAdmin.
func NewReconciliationHandler ¶
func NewReconciliationHandler( scheduler ReconciliationSchedulerInterface, licenseClient *license.Client, authManager *auth.AuthManager, logger zerolog.Logger, ) *ReconciliationHandler
NewReconciliationHandler constructs the handler. scheduler may be nil when reconciliation is not available (no clustering, no license, or not configured); in that case all routes return 503.
func (*ReconciliationHandler) RegisterRoutes ¶
func (h *ReconciliationHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes wires the routes under /api/v1/reconciliation. Every route is gated by an explicit auth middleware per CLAUDE.md's "every new API endpoint MUST have auth middleware" rule, then by a license re-check (per CLAUDE.md "License middleware goes after auth middleware") on the mutating routes:
- /status — RequireRead. NOT license-gated so an expired cluster can still report its disabled state to the dashboard.
- /trigger — RequireAdmin + requireClusteringLicense (kicks off destructive cluster-wide deletes).
- /runs/:id — RequireAdmin + requireClusteringLicense (run details include sample paths).
The authManager-nil branch exists for tests and OSS no-auth deployments where the operator has explicitly disabled auth.
type ReconciliationSchedulerInterface ¶
type ReconciliationSchedulerInterface interface {
IsRunning() bool
Status() map[string]interface{}
TriggerNow(ctx context.Context, dryRun bool) (*reconciliation.Run, error)
FindRun(id string) (*reconciliation.Run, bool)
}
ReconciliationSchedulerInterface is the minimal surface the API needs. Defining it as an interface here (rather than depending on the concrete reconciliation.Scheduler) keeps the API package mockable in tests without dragging in the full reconciliation package.
type RestoreRequest ¶
type RestoreRequest struct {
BackupID string `json:"backup_id"`
RestoreData *bool `json:"restore_data"` // default: true
RestoreMetadata *bool `json:"restore_metadata"` // default: true
RestoreConfig *bool `json:"restore_config"` // default: false
Confirm bool `json:"confirm"` // must be true
}
RestoreRequest is the request body for POST /api/v1/backup/restore.
type RetentionCoordinator ¶
type RetentionCoordinator interface {
BatchFileOpsInManifest(ops []raft.BatchFileOp) error
// IsPrimaryWriter reports whether this node may execute writer-only mutations.
// Returns true unconditionally for standalone (coordinator is nil).
IsPrimaryWriter() bool
// Role returns a human-readable role string for log messages.
Role() string
}
RetentionCoordinator is the minimal cluster interface retention needs to propagate file deletes to the Raft manifest and gate execution to the primary writer. Using a minimal interface avoids a compile-time dependency on the cluster package.
type RetentionExecution ¶
type RetentionExecution struct {
ID int64 `json:"id"`
PolicyID int64 `json:"policy_id"`
ExecutionTime string `json:"execution_time"`
Status string `json:"status"`
DeletedCount int64 `json:"deleted_count"`
CutoffDate *string `json:"cutoff_date"`
ExecutionDurationMs float64 `json:"execution_duration_ms"`
ErrorMessage *string `json:"error_message"`
}
RetentionExecution represents an execution history record
type RetentionHandler ¶
type RetentionHandler struct {
// contains filtered or unexported fields
}
RetentionHandler handles retention policy operations
func NewRetentionHandler ¶
func NewRetentionHandler(storage storage.Backend, duckdb *database.DuckDB, cfg *config.RetentionConfig, licenseClient *license.Client, authManager *auth.AuthManager, logger zerolog.Logger) (*RetentionHandler, error)
NewRetentionHandler creates a new retention handler
func (*RetentionHandler) Close ¶
func (h *RetentionHandler) Close() error
Close closes the database connection
func (*RetentionHandler) ExecutePolicy ¶
func (h *RetentionHandler) ExecutePolicy(ctx context.Context, policyID int64) (*ExecuteRetentionResponse, error)
ExecutePolicy executes a retention policy by ID programmatically (used by scheduler). Returns the execution response and any error.
func (*RetentionHandler) GetActivePolicies ¶
func (h *RetentionHandler) GetActivePolicies() ([]RetentionPolicy, error)
GetActivePolicies returns all active retention policies (used by scheduler)
func (*RetentionHandler) GetPolicy ¶
func (h *RetentionHandler) GetPolicy(policyID int64) (*RetentionPolicy, error)
GetPolicy returns a retention policy by ID (used by scheduler)
func (*RetentionHandler) RegisterRoutes ¶
func (h *RetentionHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers retention endpoints
func (*RetentionHandler) SetCoordinator ¶
func (h *RetentionHandler) SetCoordinator(c RetentionCoordinator)
SetCoordinator wires the cluster coordinator for manifest updates. Called after construction when cluster mode is enabled.
type RetentionPolicy ¶
type RetentionPolicy struct {
ID int64 `json:"id"`
Name string `json:"name"`
Database string `json:"database"`
Measurement *string `json:"measurement"`
RetentionDays int `json:"retention_days"`
BufferDays int `json:"buffer_days"`
IsActive bool `json:"is_active"`
LastExecutionTime *string `json:"last_execution_time"`
LastExecutionStatus *string `json:"last_execution_status"`
LastDeletedCount *int64 `json:"last_deleted_count"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
RetentionPolicy represents a retention policy
type RetentionPolicyRequest ¶
type RetentionPolicyRequest struct {
Name string `json:"name"`
Database string `json:"database"`
Measurement *string `json:"measurement"`
RetentionDays int `json:"retention_days"`
BufferDays int `json:"buffer_days"`
IsActive bool `json:"is_active"`
}
RetentionPolicyRequest represents a request to create/update a policy
type RetentionSchedulerInterface ¶
type RetentionSchedulerInterface interface {
Status() map[string]interface{}
TriggerNow(ctx context.Context) error
}
RetentionSchedulerInterface defines the interface for retention scheduler operations
type SQLValidationError ¶
type SQLValidationError struct {
Message string
}
SQLValidationError represents an error from SQL validation
func (*SQLValidationError) Error ¶
func (e *SQLValidationError) Error() string
type SchedulerHandler ¶
type SchedulerHandler struct {
// contains filtered or unexported fields
}
SchedulerHandler handles scheduler status API endpoints
func NewSchedulerHandler ¶
func NewSchedulerHandler( cqScheduler CQSchedulerInterface, retentionScheduler RetentionSchedulerInterface, licenseClient *license.Client, authManager *auth.AuthManager, logger zerolog.Logger, ) *SchedulerHandler
NewSchedulerHandler creates a new scheduler handler
func (*SchedulerHandler) RegisterRoutes ¶
func (h *SchedulerHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers scheduler API routes
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server represents the HTTP API server
func NewServer ¶
func NewServer(config *ServerConfig, logger zerolog.Logger) *Server
NewServer creates a new HTTP server with Fiber
func (*Server) GetMaxPayloadSize ¶
GetMaxPayloadSize returns the configured maximum payload size in bytes
func (*Server) MarkNotReady ¶
func (s *Server) MarkNotReady()
MarkNotReady flips the readiness flag to false. Called by the graceful-shutdown hook so the load balancer stops routing new traffic to this node before the HTTP listener closes. In-flight requests still drain via Fiber's normal shutdown handling.
func (*Server) MarkReady ¶
func (s *Server) MarkReady()
MarkReady flips the readiness flag to true. Called by main.go once all blocking startup work (notably WAL recovery in Pattern 2 shared-storage multi-writer mode) has completed and the server is safe for the load balancer to route traffic to.
func (*Server) RegisterRoutes ¶
func (s *Server) RegisterRoutes()
RegisterRoutes registers all API routes
func (*Server) WaitForShutdown ¶
WaitForShutdown blocks until shutdown signal is received
type ServerConfig ¶
type ServerConfig struct {
// Host is the bind address. Empty preserves the historical
// dual-stack wildcard (":<port>" on Linux binds both IPv4 and
// IPv6 via IPv4-mapped addresses). Set to a specific address
// (e.g. "127.0.0.1", "::1", "192.0.2.10") to restrict the bind.
// Explicit "0.0.0.0" forces IPv4-only.
Host string
Port int
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
ShutdownTimeout time.Duration
MaxPayloadSize int64 // Maximum request payload size in bytes
// TLS Configuration
TLSEnabled bool
TLSCertFile string
TLSKeyFile string
}
ServerConfig holds server configuration
func DefaultServerConfig ¶
func DefaultServerConfig() *ServerConfig
DefaultServerConfig returns default server configuration
type TLEHandler ¶
type TLEHandler struct {
// contains filtered or unexported fields
}
TLEHandler handles streaming TLE (Two-Line Element) write requests. TLE data is parsed into the satellite_tle measurement with orbital elements as fields and satellite identifiers as tags.
func NewTLEHandler ¶
func NewTLEHandler(buffer *ingest.ArrowBuffer, logger zerolog.Logger) *TLEHandler
NewTLEHandler creates a new TLE handler
func (*TLEHandler) RegisterRoutes ¶
func (h *TLEHandler) RegisterRoutes(app *fiber.App)
RegisterRoutes registers TLE routes. The write endpoint is gated by auth.RequireWrite when auth is enabled — per CLAUDE.md, mutating endpoints MUST have an explicit write-tier auth middleware.
func (*TLEHandler) SetAuthAndRBAC ¶
func (h *TLEHandler) SetAuthAndRBAC(authManager *auth.AuthManager, rbacManager RBACChecker)
SetAuthAndRBAC sets the auth and RBAC managers. See MsgPackHandler.SetAuthAndRBAC for the full rationale.
func (*TLEHandler) SetRouter ¶
func (h *TLEHandler) SetRouter(router *cluster.Router)
SetRouter sets the cluster router for request forwarding. When set, write requests from reader nodes will be forwarded to writer nodes.
type TLEImportResult ¶
type TLEImportResult struct {
Database string `json:"database"`
Measurement string `json:"measurement"`
SatelliteCount int `json:"satellite_count"`
RowsImported int64 `json:"rows_imported"`
ParseWarnings []string `json:"parse_warnings,omitempty"`
DurationMs int64 `json:"duration_ms"`
}
TLEImportResult holds the result of a TLE bulk import operation
type TableReference ¶
TableReference represents a database.measurement reference extracted from SQL
type TieringHandler ¶
type TieringHandler struct {
// contains filtered or unexported fields
}
TieringHandler handles tiered storage API operations
func NewTieringHandler ¶
func NewTieringHandler(manager *tiering.Manager, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *TieringHandler
NewTieringHandler creates a new tiering handler
func (*TieringHandler) GetFiles ¶
func (h *TieringHandler) GetFiles(c *fiber.Ctx) error
GetFiles returns files by tier. GET /api/v1/tiering/files. Query params: tier (optional), database (optional), limit (optional).
func (*TieringHandler) GetStats ¶
func (h *TieringHandler) GetStats(c *fiber.Ctx) error
GetStats returns migration statistics. GET /api/v1/tiering/stats
func (*TieringHandler) GetStatus ¶
func (h *TieringHandler) GetStatus(c *fiber.Ctx) error
GetStatus returns the current tiering status. GET /api/v1/tiering/status
func (*TieringHandler) RegisterRoutes ¶
func (h *TieringHandler) RegisterRoutes(app fiber.Router)
RegisterRoutes registers tiering API routes
func (*TieringHandler) ScanFiles ¶
func (h *TieringHandler) ScanFiles(c *fiber.Ctx) error
ScanFiles scans the hot tier storage and registers all existing files. POST /api/v1/tiering/scan
func (*TieringHandler) TriggerMigration ¶
func (h *TieringHandler) TriggerMigration(c *fiber.Ctx) error
TriggerMigration triggers a manual migration. POST /api/v1/tiering/migrate
type TieringPoliciesHandler ¶
type TieringPoliciesHandler struct {
// contains filtered or unexported fields
}
TieringPoliciesHandler handles tiering policy API operations
func NewTieringPoliciesHandler ¶
func NewTieringPoliciesHandler(manager *tiering.Manager, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *TieringPoliciesHandler
NewTieringPoliciesHandler creates a new tiering policies handler
func (*TieringPoliciesHandler) DeletePolicy ¶
func (h *TieringPoliciesHandler) DeletePolicy(c *fiber.Ctx) error
DeletePolicy removes the custom policy for a database (reverts to global defaults). DELETE /api/v1/tiering/policies/:database
func (*TieringPoliciesHandler) GetEffectivePolicy ¶
func (h *TieringPoliciesHandler) GetEffectivePolicy(c *fiber.Ctx) error
GetEffectivePolicy returns the effective policy for a database (resolved with global defaults). GET /api/v1/tiering/policies/:database/effective
func (*TieringPoliciesHandler) GetPolicy ¶
func (h *TieringPoliciesHandler) GetPolicy(c *fiber.Ctx) error
GetPolicy returns the policy for a specific database. GET /api/v1/tiering/policies/:database
func (*TieringPoliciesHandler) ListPolicies ¶
func (h *TieringPoliciesHandler) ListPolicies(c *fiber.Ctx) error
ListPolicies returns all custom tiering policies. GET /api/v1/tiering/policies
func (*TieringPoliciesHandler) RegisterRoutes ¶
func (h *TieringPoliciesHandler) RegisterRoutes(app fiber.Router)
RegisterRoutes registers tiering policy API routes
type TriggerMigrationRequest ¶
type TriggerMigrationRequest struct {
FromTier string `json:"from_tier,omitempty"` // Optional: "hot" (2-tier system)
ToTier string `json:"to_tier,omitempty"` // Optional: "cold" (2-tier system)
Database string `json:"database,omitempty"` // Optional: filter by database
Measurement string `json:"measurement,omitempty"` // Optional: filter by measurement
DryRun bool `json:"dry_run,omitempty"` // If true, only report what would be migrated
}
TriggerMigrationRequest represents a manual migration request
type UpdateTokenRequest ¶
type UpdateTokenRequest struct {
Name *string `json:"name,omitempty"`
Description *string `json:"description,omitempty"`
Permissions *[]string `json:"permissions,omitempty"` // nil = don't change, empty array = clear permissions (RBAC-only)
ExpiresIn *string `json:"expires_in,omitempty"`
}
UpdateTokenRequest represents a token update request
Source Files
¶
- audit_routes.go
- auth_middleware.go
- auth_routes.go
- backup_routes.go
- cache_invalidate.go
- cluster.go
- compaction.go
- continuous_query.go
- databases.go
- debug.go
- debug_proc_linux.go
- delete.go
- governance.go
- import.go
- import_inprocess.go
- like_optimizer.go
- lineprotocol.go
- mqtt.go
- mqtt_subscriptions.go
- msgpack.go
- permissions.go
- query.go
- query_arrow_stub.go
- query_json_writer.go
- query_management.go
- query_msgpack_response.go
- rbac_routes.go
- reconciliation.go
- regex_rewriter.go
- retention.go
- routing.go
- scheduler.go
- server.go
- tiering.go
- tiering_policies.go
- tle.go