api

package
v0.0.0-...-17ad976 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: AGPL-3.0 Imports: 53 Imported by: 0

Documentation

Index

Constants

View Source
const ForwardedByHeader = "X-Arc-Forwarded-By"

ForwardedByHeader is the header used to detect forwarded requests and prevent routing loops.

View Source
const ShardRoutedHeader = "X-Arc-Shard-Routed"

ShardRoutedHeader indicates a request has been routed by the shard router.

Variables

This section is empty.

Functions

func BuildHTTPRequest

func BuildHTTPRequest(c *fiber.Ctx) (*http.Request, error)

BuildHTTPRequest converts a Fiber context to a net/http Request for forwarding via the router. It copies the method, URL, body, and headers from the Fiber request.

func CheckWritePermissions

func CheckWritePermissions(c *fiber.Ctx, rbacManager RBACChecker, logger zerolog.Logger, database string, measurements []string) error

CheckWritePermissions checks if the token has write permission for the database and measurements. This is a shared implementation used by both LineProtocol and MsgPack handlers.

func CopyResponse

func CopyResponse(c *fiber.Ctx, resp *http.Response) error

CopyResponse writes an HTTP response from the router to a Fiber context. It copies the status code, headers (excluding hop-by-hop), and body.

func GetDecompBufferDiscards

func GetDecompBufferDiscards() int64

GetDecompBufferDiscards returns the count of oversized buffers not returned to pool

func HandleRoutingError

func HandleRoutingError(c *fiber.Ctx, err error) error

HandleRoutingError returns an appropriate error response for routing failures.

func HandleShardRoutingError

func HandleShardRoutingError(c *fiber.Ctx, err error) error

HandleShardRoutingError returns an appropriate error response for shard routing failures.

func OptimizeLikePatterns

func OptimizeLikePatterns(sql string) (string, bool)

OptimizeLikePatterns rewrites SQL to reorder WHERE clause predicates for better query performance. Returns the optimized SQL and whether any changes were made.

func RewriteRegexToStringFuncs

func RewriteRegexToStringFuncs(sql string) (string, bool)

RewriteRegexToStringFuncs rewrites regex function calls to equivalent string functions. This provides 2x performance improvement for common patterns like URL domain extraction. Returns the rewritten SQL and whether any rewrites were applied.

func RouteShardedQuery

func RouteShardedQuery(shardRouter *sharding.ShardRouter, database string, c *fiber.Ctx) (*http.Response, error)

RouteShardedQuery routes a query request using the shard router. Returns:

  • (nil, nil) if request should be handled locally
  • (*http.Response, nil) if request was forwarded successfully
  • (nil, error) if routing failed

func RouteShardedWrite

func RouteShardedWrite(shardRouter *sharding.ShardRouter, c *fiber.Ctx) (*http.Response, error)

RouteShardedWrite routes a write request using the shard router. Returns:

  • (nil, nil) if request should be handled locally
  • (*http.Response, nil) if request was forwarded successfully
  • (nil, error) if routing failed

func ShouldForwardQuery

func ShouldForwardQuery(router *cluster.Router, c *fiber.Ctx) bool

ShouldForwardQuery checks if a query request should be forwarded to another node. Returns true if the request should be forwarded (local node cannot handle queries). Returns false if:

  • Router is nil (no clustering)
  • Request is already forwarded (loop prevention)
  • Local node can handle queries

func ShouldForwardWrite

func ShouldForwardWrite(router *cluster.Router, c *fiber.Ctx) bool

ShouldForwardWrite checks if a write request should be forwarded to another node. Returns true if the request should be forwarded (local node cannot handle writes). Returns false if:

  • Router is nil (no clustering)
  • Request is already forwarded (loop prevention)
  • Local node can handle writes

func ValidateSQLRequest

func ValidateSQLRequest(sql string) error

ValidateSQLRequest validates an SQL query for common issues. Returns nil if valid, or an error with appropriate message. This is a shared function used by multiple query endpoints.

Types

type AuditHandler

type AuditHandler struct {
	// contains filtered or unexported fields
}

AuditHandler handles audit log query endpoints

func NewAuditHandler

func NewAuditHandler(auditLogger *audit.Logger, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *AuditHandler

NewAuditHandler creates a new audit handler

func (*AuditHandler) RegisterRoutes

func (h *AuditHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers audit log query endpoints

type AuthHandler

type AuthHandler struct {
	// contains filtered or unexported fields
}

AuthHandler handles authentication-related endpoints

func NewAuthHandler

func NewAuthHandler(authManager *auth.AuthManager, logger zerolog.Logger) *AuthHandler

NewAuthHandler creates a new auth handler

func (*AuthHandler) RegisterRoutes

func (h *AuthHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers auth-related endpoints

func (*AuthHandler) RegisterTokenMembershipRoutes

func (h *AuthHandler) RegisterTokenMembershipRoutes(app *fiber.App)

RegisterTokenMembershipRoutes registers token membership endpoints (requires RBAC manager)

func (*AuthHandler) SetRBACManager

func (h *AuthHandler) SetRBACManager(rbacManager *auth.RBACManager)

SetRBACManager sets the RBAC manager for token membership endpoints

type AuthManager

type AuthManager interface {
	HasPermission(tokenInfo *auth.TokenInfo, permission string) bool
}

AuthManager interface for getting token info from context

type BackupHandler

type BackupHandler struct {
	// contains filtered or unexported fields
}

BackupHandler handles backup and restore API operations.

func NewBackupHandler

func NewBackupHandler(manager *backup.Manager, authManager *auth.AuthManager, logger zerolog.Logger) *BackupHandler

NewBackupHandler creates a new backup handler.

func (*BackupHandler) CreateBackup

func (h *BackupHandler) CreateBackup(c *fiber.Ctx) error

CreateBackup triggers a new backup. POST /api/v1/backup

func (*BackupHandler) DeleteBackup

func (h *BackupHandler) DeleteBackup(c *fiber.Ctx) error

DeleteBackup removes a backup. DELETE /api/v1/backup/:id

func (*BackupHandler) GetBackup

func (h *BackupHandler) GetBackup(c *fiber.Ctx) error

GetBackup returns the manifest for a specific backup. GET /api/v1/backup/:id

func (*BackupHandler) GetStatus

func (h *BackupHandler) GetStatus(c *fiber.Ctx) error

GetStatus returns the progress of the current active operation. GET /api/v1/backup/status

func (*BackupHandler) ListBackups

func (h *BackupHandler) ListBackups(c *fiber.Ctx) error

ListBackups returns all available backups. GET /api/v1/backup

func (*BackupHandler) RegisterRoutes

func (h *BackupHandler) RegisterRoutes(app fiber.Router)

RegisterRoutes registers backup and restore API routes.

func (*BackupHandler) RestoreBackup

func (h *BackupHandler) RestoreBackup(c *fiber.Ctx) error

RestoreBackup triggers a restore from a backup. POST /api/v1/backup/restore

type CQExecution

type CQExecution struct {
	ID                       int64   `json:"id"`
	QueryID                  int64   `json:"query_id"`
	ExecutionID              string  `json:"execution_id"`
	ExecutionTime            string  `json:"execution_time"`
	Status                   string  `json:"status"`
	StartTime                string  `json:"start_time"`
	EndTime                  string  `json:"end_time"`
	RecordsRead              *int64  `json:"records_read"`
	RecordsWritten           int64   `json:"records_written"`
	ExecutionDurationSeconds float64 `json:"execution_duration_seconds"`
	ErrorMessage             *string `json:"error_message"`
}

CQExecution represents an execution history record

type CQSchedulerInterface

type CQSchedulerInterface interface {
	Status() map[string]interface{}
	ReloadAll() error
	JobCount() int
}

CQSchedulerInterface defines the interface for CQ scheduler operations

type CQSchedulerReloader

type CQSchedulerReloader interface {
	ReloadCQ(cqID int64) error
}

CQSchedulerReloader is an interface for reloading CQ schedules after updates. This avoids a circular import between api and scheduler packages.

type ClusterHandler

type ClusterHandler struct {
	// contains filtered or unexported fields
}

ClusterHandler handles cluster management API endpoints.

func NewClusterHandler

func NewClusterHandler(
	coordinator *cluster.Coordinator,
	authManager *auth.AuthManager,
	licenseClient *license.Client,
	logger zerolog.Logger,
) *ClusterHandler

NewClusterHandler creates a new cluster handler. The coordinator can be nil if clustering is not enabled.

func (*ClusterHandler) RegisterRoutes

func (h *ClusterHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers cluster API routes.

type CompactionHandler

type CompactionHandler struct {
	// contains filtered or unexported fields
}

CompactionHandler handles compaction API endpoints

func NewCompactionHandler

func NewCompactionHandler(manager *compaction.Manager, hourlyScheduler, dailyScheduler *compaction.Scheduler, authManager *auth.AuthManager, logger zerolog.Logger) *CompactionHandler

NewCompactionHandler creates a new compaction handler

func (*CompactionHandler) RegisterRoutes

func (h *CompactionHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers compaction endpoints

type ContinuousQuery

type ContinuousQuery struct {
	ID                     int64   `json:"id"`
	Name                   string  `json:"name"`
	Description            *string `json:"description"`
	Database               string  `json:"database"`
	SourceMeasurement      string  `json:"source_measurement"`
	DestinationMeasurement string  `json:"destination_measurement"`
	Query                  string  `json:"query"`
	Interval               string  `json:"interval"`
	RetentionDays          *int    `json:"retention_days"`
	DeleteSourceAfterDays  *int    `json:"delete_source_after_days"`
	IsActive               bool    `json:"is_active"`
	LastExecutionTime      *string `json:"last_execution_time"`
	LastExecutionStatus    *string `json:"last_execution_status"`
	LastProcessedTime      *string `json:"last_processed_time"`
	LastRecordsWritten     *int64  `json:"last_records_written"`
	CreatedAt              string  `json:"created_at"`
	UpdatedAt              string  `json:"updated_at"`
}

ContinuousQuery represents a continuous query definition

type ContinuousQueryHandler

type ContinuousQueryHandler struct {
	// contains filtered or unexported fields
}

ContinuousQueryHandler handles continuous query operations

func NewContinuousQueryHandler

func NewContinuousQueryHandler(db *database.DuckDB, storage storage.Backend, arrowBuffer *ingest.ArrowBuffer, cfg *config.ContinuousQueryConfig, authManager *auth.AuthManager, logger zerolog.Logger) (*ContinuousQueryHandler, error)

NewContinuousQueryHandler creates a new continuous query handler

func (*ContinuousQueryHandler) Close

func (h *ContinuousQueryHandler) Close() error

Close closes the database connection

func (*ContinuousQueryHandler) ExecuteCQ

func (h *ContinuousQueryHandler) ExecuteCQ(ctx context.Context, queryID int64) (*ExecuteCQResponse, error)

ExecuteCQ executes a continuous query by ID programmatically (used by scheduler) Returns the execution response and any error

func (*ContinuousQueryHandler) GetActiveCQs

func (h *ContinuousQueryHandler) GetActiveCQs() ([]ContinuousQuery, error)

GetActiveCQs returns all active continuous queries (used by scheduler)

func (*ContinuousQueryHandler) GetCQ

func (h *ContinuousQueryHandler) GetCQ(queryID int64) (*ContinuousQuery, error)

GetCQ returns a continuous query by ID (used by scheduler)

func (*ContinuousQueryHandler) RegisterRoutes

func (h *ContinuousQueryHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers continuous query endpoints

func (*ContinuousQueryHandler) SetScheduler

func (h *ContinuousQueryHandler) SetScheduler(s CQSchedulerReloader)

SetScheduler sets the CQ scheduler for reloading after updates. Called after scheduler creation since it depends on the handler.

type ContinuousQueryRequest

type ContinuousQueryRequest struct {
	Name                   string  `json:"name"`
	Description            *string `json:"description"`
	Database               string  `json:"database"`
	SourceMeasurement      string  `json:"source_measurement"`
	DestinationMeasurement string  `json:"destination_measurement"`
	Query                  string  `json:"query"`
	Interval               string  `json:"interval"`
	RetentionDays          *int    `json:"retention_days"`
	DeleteSourceAfterDays  *int    `json:"delete_source_after_days"`
	IsActive               bool    `json:"is_active"`
}

ContinuousQueryRequest represents a request to create/update a CQ

type CreateBackupRequest

type CreateBackupRequest struct {
	IncludeMetadata *bool `json:"include_metadata"` // default: true
	IncludeConfig   *bool `json:"include_config"`   // default: true
}

CreateBackupRequest is the request body for POST /api/v1/backup.

type CreateDatabaseRequest

type CreateDatabaseRequest struct {
	Name string `json:"name"`
}

CreateDatabaseRequest represents a request to create a new database

type CreateTokenRequest

type CreateTokenRequest struct {
	Name        string    `json:"name"`
	Description string    `json:"description,omitempty"`
	Permissions *[]string `json:"permissions,omitempty"` // nil = default (read,write), empty array = no permissions (RBAC-only)
	ExpiresIn   string    `json:"expires_in,omitempty"`  // e.g., "24h", "7d", "30d"
}

CreateTokenRequest represents a token creation request

type DatabaseInfo

type DatabaseInfo struct {
	Name             string `json:"name"`
	MeasurementCount int    `json:"measurement_count"`
	CreatedAt        string `json:"created_at,omitempty"`
}

DatabaseInfo represents information about a database

type DatabaseListResponse

type DatabaseListResponse struct {
	Databases []DatabaseInfo `json:"databases"`
	Count     int            `json:"count"`
}

DatabaseListResponse represents the response for listing databases

type DatabaseMeasurement

type DatabaseMeasurement struct {
	Name      string `json:"name"`
	FileCount int    `json:"file_count,omitempty"`
}

DatabaseMeasurement represents a measurement within a database

type DatabasesHandler

type DatabasesHandler struct {
	// contains filtered or unexported fields
}

DatabasesHandler handles database management API endpoints

func NewDatabasesHandler

func NewDatabasesHandler(storage storage.Backend, deleteConfig *config.DeleteConfig, authManager *auth.AuthManager, logger zerolog.Logger) *DatabasesHandler

NewDatabasesHandler creates a new databases handler

func (*DatabasesHandler) RegisterRoutes

func (h *DatabasesHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers the database management routes

func (*DatabasesHandler) SetTieringManager

func (h *DatabasesHandler) SetTieringManager(tm *tiering.Manager)

SetTieringManager sets the tiering manager for multi-tier database/measurement listing. This is called after initialization when tiering is enabled and licensed.

type DeleteConfigResponse

type DeleteConfigResponse struct {
	Enabled               bool              `json:"enabled"`
	ConfirmationThreshold int               `json:"confirmation_threshold"`
	MaxRowsPerDelete      int               `json:"max_rows_per_delete"`
	Implementation        string            `json:"implementation"`
	PerformanceImpact     map[string]string `json:"performance_impact"`
}

DeleteConfigResponse represents delete configuration info

type DeleteHandler

type DeleteHandler struct {
	// contains filtered or unexported fields
}

DeleteHandler handles delete operations using file rewrite strategy

func NewDeleteHandler

func NewDeleteHandler(db *database.DuckDB, storage storage.Backend, cfg *config.DeleteConfig, authManager *auth.AuthManager, logger zerolog.Logger) *DeleteHandler

NewDeleteHandler creates a new delete handler

func (*DeleteHandler) RegisterRoutes

func (h *DeleteHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers delete endpoints

type DeleteRequest

type DeleteRequest struct {
	Database    string `json:"database"`
	Measurement string `json:"measurement"`
	Where       string `json:"where"`
	DryRun      bool   `json:"dry_run"`
	Confirm     bool   `json:"confirm"`
}

DeleteRequest represents a delete operation request

type DeleteResponse

type DeleteResponse struct {
	Success         bool     `json:"success"`
	DeletedCount    int64    `json:"deleted_count"`
	AffectedFiles   int      `json:"affected_files"`
	RewrittenFiles  int      `json:"rewritten_files"`
	ExecutionTimeMs float64  `json:"execution_time_ms"`
	DryRun          bool     `json:"dry_run"`
	FilesProcessed  []string `json:"files_processed"`
	FailedFiles     []string `json:"failed_files,omitempty"`
	Error           string   `json:"error,omitempty"`
}

DeleteResponse represents a delete operation response

type EstimateResponse

type EstimateResponse struct {
	Success         bool    `json:"success"`
	EstimatedRows   *int64  `json:"estimated_rows"`
	WarningLevel    string  `json:"warning_level"`
	WarningMessage  string  `json:"warning_message,omitempty"`
	ExecutionTimeMs float64 `json:"execution_time_ms"`
	Error           string  `json:"error,omitempty"`
}

EstimateResponse represents the response for query estimation

type ExecuteCQRequest

type ExecuteCQRequest struct {
	StartTime *string `json:"start_time"`
	EndTime   *string `json:"end_time"`
	DryRun    bool    `json:"dry_run"`
}

ExecuteCQRequest represents a request to execute a CQ

type ExecuteCQResponse

type ExecuteCQResponse struct {
	QueryID                int64   `json:"query_id"`
	QueryName              string  `json:"query_name"`
	ExecutionID            string  `json:"execution_id"`
	Status                 string  `json:"status"`
	StartTime              string  `json:"start_time"`
	EndTime                string  `json:"end_time"`
	RecordsRead            *int64  `json:"records_read"`
	RecordsWritten         int64   `json:"records_written"`
	ExecutionTimeSeconds   float64 `json:"execution_time_seconds"`
	DestinationMeasurement string  `json:"destination_measurement"`
	DryRun                 bool    `json:"dry_run"`
	ExecutedAt             string  `json:"executed_at"`
	ExecutedQuery          string  `json:"executed_query,omitempty"`
}

ExecuteCQResponse represents the result of executing a CQ

type ExecuteRetentionRequest

type ExecuteRetentionRequest struct {
	DryRun  bool `json:"dry_run"`
	Confirm bool `json:"confirm"`
}

ExecuteRetentionRequest represents a request to execute a policy

type ExecuteRetentionResponse

type ExecuteRetentionResponse struct {
	PolicyID             int64    `json:"policy_id"`
	PolicyName           string   `json:"policy_name"`
	DeletedCount         int64    `json:"deleted_count"`
	FilesDeleted         int      `json:"files_deleted"`
	ExecutionTimeMs      float64  `json:"execution_time_ms"`
	DryRun               bool     `json:"dry_run"`
	CutoffDate           string   `json:"cutoff_date"`
	AffectedMeasurements []string `json:"affected_measurements"`
}

ExecuteRetentionResponse represents the result of executing a policy

type GovernanceHandler

type GovernanceHandler struct {
	// contains filtered or unexported fields
}

GovernanceHandler handles governance policy API operations.

func NewGovernanceHandler

func NewGovernanceHandler(manager *governance.Manager, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *GovernanceHandler

NewGovernanceHandler creates a new governance handler.

func (*GovernanceHandler) CreatePolicy

func (h *GovernanceHandler) CreatePolicy(c *fiber.Ctx) error

CreatePolicy creates a governance policy for a token. POST /api/v1/governance/policies

func (*GovernanceHandler) DeletePolicy

func (h *GovernanceHandler) DeletePolicy(c *fiber.Ctx) error

DeletePolicy removes the governance policy for a specific token. DELETE /api/v1/governance/policies/:token_id

func (*GovernanceHandler) GetPolicy

func (h *GovernanceHandler) GetPolicy(c *fiber.Ctx) error

GetPolicy returns the governance policy for a specific token. GET /api/v1/governance/policies/:token_id

func (*GovernanceHandler) GetUsage

func (h *GovernanceHandler) GetUsage(c *fiber.Ctx) error

GetUsage returns current usage statistics for a specific token. GET /api/v1/governance/usage/:token_id

func (*GovernanceHandler) ListPolicies

func (h *GovernanceHandler) ListPolicies(c *fiber.Ctx) error

ListPolicies returns all governance policies. GET /api/v1/governance/policies

func (*GovernanceHandler) RegisterRoutes

func (h *GovernanceHandler) RegisterRoutes(app fiber.Router)

RegisterRoutes registers governance API routes.

func (*GovernanceHandler) UpdatePolicy

func (h *GovernanceHandler) UpdatePolicy(c *fiber.Ctx) error

UpdatePolicy updates the governance policy for a specific token. PUT /api/v1/governance/policies/:token_id

type ImportHandler

type ImportHandler struct {
	// contains filtered or unexported fields
}

ImportHandler handles bulk CSV, Parquet, and Line Protocol file imports

func NewImportHandler

func NewImportHandler(db *database.DuckDB, storage storage.Backend, logger zerolog.Logger) *ImportHandler

NewImportHandler creates a new ImportHandler

func (*ImportHandler) RegisterRoutes

func (h *ImportHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers import API routes

func (*ImportHandler) SetArrowBuffer

func (h *ImportHandler) SetArrowBuffer(buf *ingest.ArrowBuffer)

SetArrowBuffer sets the ArrowBuffer for Line Protocol import

func (*ImportHandler) SetAuthAndRBAC

func (h *ImportHandler) SetAuthAndRBAC(authManager AuthManager, rbacManager RBACChecker)

SetAuthAndRBAC sets the auth and RBAC managers for permission checking

func (*ImportHandler) Stats

func (h *ImportHandler) Stats(c *fiber.Ctx) error

Stats returns import handler statistics

type ImportResult

type ImportResult struct {
	Database          string   `json:"database"`
	Measurement       string   `json:"measurement"`
	RowsImported      int64    `json:"rows_imported"`
	PartitionsCreated int      `json:"partitions_created"`
	TimeRangeMin      string   `json:"time_range_min,omitempty"`
	TimeRangeMax      string   `json:"time_range_max,omitempty"`
	Columns           []string `json:"columns"`
	DurationMs        int64    `json:"duration_ms"`
}

ImportResult holds the result of a bulk import operation

type LPImportResult

type LPImportResult struct {
	Database     string   `json:"database"`
	Measurements []string `json:"measurements"`
	RowsImported int64    `json:"rows_imported"`
	Precision    string   `json:"precision"`
	DurationMs   int64    `json:"duration_ms"`
}

LPImportResult holds the result of a Line Protocol bulk import operation

type LineProtocolHandler

type LineProtocolHandler struct {
	// contains filtered or unexported fields
}

LineProtocolHandler handles Line Protocol write requests

func NewLineProtocolHandler

func NewLineProtocolHandler(buffer *ingest.ArrowBuffer, logger zerolog.Logger) *LineProtocolHandler

NewLineProtocolHandler creates a new Line Protocol handler

func (*LineProtocolHandler) Flush

func (h *LineProtocolHandler) Flush(c *fiber.Ctx) error

Flush triggers a buffer flush

func (*LineProtocolHandler) GetStats

func (h *LineProtocolHandler) GetStats() map[string]int64

GetStats returns stats as a map (for programmatic access)

func (*LineProtocolHandler) Health

func (h *LineProtocolHandler) Health(c *fiber.Ctx) error

Health returns health status

func (*LineProtocolHandler) RegisterRoutes

func (h *LineProtocolHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers Line Protocol routes

func (*LineProtocolHandler) SetAuthAndRBAC

func (h *LineProtocolHandler) SetAuthAndRBAC(authManager AuthManager, rbacManager RBACChecker)

SetAuthAndRBAC sets the auth and RBAC managers for permission checking

func (*LineProtocolHandler) SetRouter

func (h *LineProtocolHandler) SetRouter(router *cluster.Router)

SetRouter sets the cluster router for request forwarding. When set, write requests from reader nodes will be forwarded to writer nodes.

func (*LineProtocolHandler) Stats

func (h *LineProtocolHandler) Stats(c *fiber.Ctx) error

Stats returns Line Protocol handler statistics

func (*LineProtocolHandler) WriteInfluxDB

func (h *LineProtocolHandler) WriteInfluxDB(c *fiber.Ctx) error

WriteInfluxDB handles InfluxDB 2.x compatible write requests POST /api/v2/write?org=myorg&bucket=mybucket&precision=ns

func (*LineProtocolHandler) WriteSimple

func (h *LineProtocolHandler) WriteSimple(c *fiber.Ctx) error

WriteSimple handles simple write requests without query parameters POST /api/v1/write/line-protocol

func (*LineProtocolHandler) WriteV1

func (h *LineProtocolHandler) WriteV1(c *fiber.Ctx) error

WriteV1 handles InfluxDB 1.x compatible write requests POST /write?db=telegraf&rp=default&precision=ns

type MQTTHandler

type MQTTHandler struct {
	// contains filtered or unexported fields
}

MQTTHandler handles MQTT stats and health endpoints

func NewMQTTHandler

func NewMQTTHandler(manager mqtt.Manager, authManager *auth.AuthManager, logger zerolog.Logger) *MQTTHandler

NewMQTTHandler creates a new MQTT handler

func (*MQTTHandler) RegisterRoutes

func (h *MQTTHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers the MQTT stats/health API routes

type MQTTSubscriptionHandler

type MQTTSubscriptionHandler struct {
	// contains filtered or unexported fields
}

MQTTSubscriptionHandler handles MQTT subscription REST API endpoints

func NewMQTTSubscriptionHandler

func NewMQTTSubscriptionHandler(manager mqtt.Manager, authManager *auth.AuthManager, logger zerolog.Logger) *MQTTSubscriptionHandler

NewMQTTSubscriptionHandler creates a new MQTT subscription handler

func (*MQTTSubscriptionHandler) RegisterRoutes

func (h *MQTTSubscriptionHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers the MQTT subscription API routes

type MeasurementInfo

type MeasurementInfo struct {
	Database    string  `json:"database"`
	Measurement string  `json:"measurement"`
	FileCount   int     `json:"file_count"`
	TotalSizeMB float64 `json:"total_size_mb"`
	StoragePath string  `json:"storage_path"`
}

MeasurementInfo represents information about a measurement

type MeasurementListResponse

type MeasurementListResponse struct {
	Database     string                `json:"database"`
	Measurements []DatabaseMeasurement `json:"measurements"`
	Count        int                   `json:"count"`
}

MeasurementListResponse represents the response for listing measurements

type MsgPackHandler

type MsgPackHandler struct {
	// contains filtered or unexported fields
}

MsgPackHandler handles MessagePack binary protocol endpoints

func NewMsgPackHandler

func NewMsgPackHandler(logger zerolog.Logger, arrowBuffer *ingest.ArrowBuffer, maxPayloadSize int64) *MsgPackHandler

NewMsgPackHandler creates a new MessagePack handler

func (*MsgPackHandler) RegisterRoutes

func (h *MsgPackHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers MessagePack endpoints

func (*MsgPackHandler) SetAuthAndRBAC

func (h *MsgPackHandler) SetAuthAndRBAC(authManager AuthManager, rbacManager RBACChecker)

SetAuthAndRBAC sets the auth and RBAC managers for permission checking

func (*MsgPackHandler) SetRouter

func (h *MsgPackHandler) SetRouter(router *cluster.Router)

SetRouter sets the cluster router for request forwarding. When set, write requests from reader nodes will be forwarded to writer nodes.

type ParallelQueryInfo

type ParallelQueryInfo struct {
	// Paths contains the partition paths to execute in parallel
	Paths []string
	// QueryTemplate is the SQL with {PARTITION_PATH} placeholder
	QueryTemplate string
	// ReadParquetOptions are the options to pass to read_parquet
	ReadParquetOptions string
}

ParallelQueryInfo contains information for parallel partition execution. When set, the query should be executed using the parallel executor.

type PolicyRequest

type PolicyRequest struct {
	HotOnly       bool `json:"hot_only,omitempty"`         // Exclude from tiering entirely
	HotMaxAgeDays *int `json:"hot_max_age_days,omitempty"` // nil = use global default
}

PolicyRequest represents a request to create/update a tiering policy 2-tier system: data older than HotMaxAgeDays moves from hot to cold

type PooledBuffer

type PooledBuffer struct {
	Data []byte // The decompressed data (slice of pooled buffer)
	// contains filtered or unexported fields
}

PooledBuffer wraps a decompression buffer that must be returned to pool after use This enables zero-copy decompression by returning the pooled buffer directly

func (*PooledBuffer) Release

func (pb *PooledBuffer) Release()

Release returns the buffer to the pool - MUST be called after use Safe to call multiple times (idempotent)

type QueryHandler

type QueryHandler struct {
	// contains filtered or unexported fields
}

QueryHandler handles SQL query endpoints

func NewQueryHandler

func NewQueryHandler(db *database.DuckDB, storage storage.Backend, logger zerolog.Logger, queryTimeoutSeconds int, slowQueryThresholdMs int) *QueryHandler

NewQueryHandler creates a new query handler queryTimeoutSeconds: timeout for query execution in seconds (0 = no timeout) slowQueryThresholdMs: threshold in milliseconds for slow query WARN logging (0 = disabled)

func (*QueryHandler) InvalidateCaches

func (h *QueryHandler) InvalidateCaches()

InvalidateCaches clears all internal caches (partition pruner and SQL transform cache). This should be called after compaction to prevent stale file references.

func (*QueryHandler) RegisterRoutes

func (h *QueryHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers query endpoints

func (*QueryHandler) SetAuthAndRBAC

func (h *QueryHandler) SetAuthAndRBAC(am AuthManager, rm RBACChecker)

SetAuthAndRBAC sets the auth and RBAC managers for permission checking

func (*QueryHandler) SetGovernance

func (h *QueryHandler) SetGovernance(manager *governance.Manager, lc *license.Client)

SetGovernance sets the governance manager and license client for query rate limiting and quotas.

func (*QueryHandler) SetQueryRegistry

func (h *QueryHandler) SetQueryRegistry(registry *queryregistry.Registry)

SetQueryRegistry sets the query registry for long-running query management.

func (*QueryHandler) SetRouter

func (h *QueryHandler) SetRouter(router *cluster.Router)

SetRouter sets the cluster router for request forwarding. When set, query requests from nodes that cannot handle queries will be forwarded. Note: Currently writers always process queries locally (CanQuery=true). This is provided for future extensibility (e.g., prefer_readers mode).

func (*QueryHandler) SetTieringManager

func (h *QueryHandler) SetTieringManager(manager *tiering.Manager)

SetTieringManager sets the tiering manager for multi-tier query routing. When set, queries will check both hot and cold tiers for data.

type QueryManagementHandler

type QueryManagementHandler struct {
	// contains filtered or unexported fields
}

QueryManagementHandler handles query management API operations.

func NewQueryManagementHandler

func NewQueryManagementHandler(registry *queryregistry.Registry, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *QueryManagementHandler

NewQueryManagementHandler creates a new query management handler.

func (*QueryManagementHandler) RegisterRoutes

func (h *QueryManagementHandler) RegisterRoutes(app fiber.Router)

RegisterRoutes registers query management API routes.

type QueryRequest

type QueryRequest struct {
	SQL string `json:"sql"`
}

QueryRequest represents a SQL query request

type QueryResponse

type QueryResponse struct {
	Success         bool                   `json:"success"`
	Columns         []string               `json:"columns"`
	Data            [][]interface{}        `json:"data"`
	RowCount        int                    `json:"row_count"`
	ExecutionTimeMs float64                `json:"execution_time_ms"`
	Timestamp       string                 `json:"timestamp"`
	Error           string                 `json:"error,omitempty"`
	Profile         *database.QueryProfile `json:"profile,omitempty"`
}

QueryResponse represents a SQL query response

type RBACChecker

type RBACChecker interface {
	IsRBACEnabled() bool
	CheckPermission(req *auth.PermissionCheckRequest) *auth.PermissionCheckResult
	CheckPermissionsBatch(reqs []*auth.PermissionCheckRequest) []*auth.PermissionCheckResult
}

RBACChecker interface for RBAC permission checking

type RBACHandler

type RBACHandler struct {
	// contains filtered or unexported fields
}

RBACHandler handles RBAC-related endpoints

func NewRBACHandler

func NewRBACHandler(authManager *auth.AuthManager, rbacManager *auth.RBACManager, logger zerolog.Logger) *RBACHandler

NewRBACHandler creates a new RBAC handler

func (*RBACHandler) RegisterRoutes

func (h *RBACHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers RBAC-related endpoints

type RestoreRequest

type RestoreRequest struct {
	BackupID        string `json:"backup_id"`
	RestoreData     *bool  `json:"restore_data"`     // default: true
	RestoreMetadata *bool  `json:"restore_metadata"` // default: true
	RestoreConfig   *bool  `json:"restore_config"`   // default: false
	Confirm         bool   `json:"confirm"`          // must be true
}

RestoreRequest is the request body for POST /api/v1/backup/restore.

type RetentionExecution

type RetentionExecution struct {
	ID                  int64   `json:"id"`
	PolicyID            int64   `json:"policy_id"`
	ExecutionTime       string  `json:"execution_time"`
	Status              string  `json:"status"`
	DeletedCount        int64   `json:"deleted_count"`
	CutoffDate          *string `json:"cutoff_date"`
	ExecutionDurationMs float64 `json:"execution_duration_ms"`
	ErrorMessage        *string `json:"error_message"`
}

RetentionExecution represents an execution history record

type RetentionHandler

type RetentionHandler struct {
	// contains filtered or unexported fields
}

RetentionHandler handles retention policy operations

func NewRetentionHandler

func NewRetentionHandler(storage storage.Backend, duckdb *database.DuckDB, cfg *config.RetentionConfig, authManager *auth.AuthManager, logger zerolog.Logger) (*RetentionHandler, error)

NewRetentionHandler creates a new retention handler

func (*RetentionHandler) Close

func (h *RetentionHandler) Close() error

Close closes the database connection

func (*RetentionHandler) ExecutePolicy

func (h *RetentionHandler) ExecutePolicy(ctx context.Context, policyID int64) (*ExecuteRetentionResponse, error)

ExecutePolicy executes a retention policy by ID programmatically (used by scheduler) Returns the execution response and any error

func (*RetentionHandler) GetActivePolicies

func (h *RetentionHandler) GetActivePolicies() ([]RetentionPolicy, error)

GetActivePolicies returns all active retention policies (used by scheduler)

func (*RetentionHandler) GetPolicy

func (h *RetentionHandler) GetPolicy(policyID int64) (*RetentionPolicy, error)

GetPolicy returns a retention policy by ID (used by scheduler)

func (*RetentionHandler) RegisterRoutes

func (h *RetentionHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers retention endpoints

type RetentionPolicy

type RetentionPolicy struct {
	ID                  int64   `json:"id"`
	Name                string  `json:"name"`
	Database            string  `json:"database"`
	Measurement         *string `json:"measurement"`
	RetentionDays       int     `json:"retention_days"`
	BufferDays          int     `json:"buffer_days"`
	IsActive            bool    `json:"is_active"`
	LastExecutionTime   *string `json:"last_execution_time"`
	LastExecutionStatus *string `json:"last_execution_status"`
	LastDeletedCount    *int64  `json:"last_deleted_count"`
	CreatedAt           string  `json:"created_at"`
	UpdatedAt           string  `json:"updated_at"`
}

RetentionPolicy represents a retention policy

type RetentionPolicyRequest

type RetentionPolicyRequest struct {
	Name          string  `json:"name"`
	Database      string  `json:"database"`
	Measurement   *string `json:"measurement"`
	RetentionDays int     `json:"retention_days"`
	BufferDays    int     `json:"buffer_days"`
	IsActive      bool    `json:"is_active"`
}

RetentionPolicyRequest represents a request to create/update a policy

type RetentionSchedulerInterface

type RetentionSchedulerInterface interface {
	Status() map[string]interface{}
	TriggerNow(ctx context.Context) error
}

RetentionSchedulerInterface defines the interface for retention scheduler operations

type SQLValidationError

type SQLValidationError struct {
	Message string
}

SQLValidationError represents an error from SQL validation

func (*SQLValidationError) Error

func (e *SQLValidationError) Error() string

type SchedulerHandler

type SchedulerHandler struct {
	// contains filtered or unexported fields
}

SchedulerHandler handles scheduler status API endpoints

func NewSchedulerHandler

func NewSchedulerHandler(
	cqScheduler CQSchedulerInterface,
	retentionScheduler RetentionSchedulerInterface,
	licenseClient *license.Client,
	authManager *auth.AuthManager,
	logger zerolog.Logger,
) *SchedulerHandler

NewSchedulerHandler creates a new scheduler handler

func (*SchedulerHandler) RegisterRoutes

func (h *SchedulerHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers scheduler API routes

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents the HTTP API server

func NewServer

func NewServer(config *ServerConfig, logger zerolog.Logger) *Server

NewServer creates a new HTTP server with Fiber

func (*Server) GetApp

func (s *Server) GetApp() *fiber.App

GetApp returns the underlying Fiber app (for registering custom routes)

func (*Server) GetMaxPayloadSize

func (s *Server) GetMaxPayloadSize() int64

GetMaxPayloadSize returns the configured maximum payload size in bytes

func (*Server) RegisterRoutes

func (s *Server) RegisterRoutes()

RegisterRoutes registers all API routes

func (*Server) Shutdown

func (s *Server) Shutdown(timeout time.Duration) error

Shutdown gracefully shuts down the server

func (*Server) Start

func (s *Server) Start() error

Start starts the HTTP server

func (*Server) WaitForShutdown

func (s *Server) WaitForShutdown(shutdownTimeout time.Duration)

WaitForShutdown blocks until shutdown signal is received

type ServerConfig

type ServerConfig struct {
	Port            int
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration
	IdleTimeout     time.Duration
	ShutdownTimeout time.Duration
	MaxPayloadSize  int64 // Maximum request payload size in bytes
	// TLS Configuration
	TLSEnabled  bool
	TLSCertFile string
	TLSKeyFile  string
}

ServerConfig holds server configuration

func DefaultServerConfig

func DefaultServerConfig() *ServerConfig

DefaultServerConfig returns default server configuration

type TLEHandler

type TLEHandler struct {
	// contains filtered or unexported fields
}

TLEHandler handles streaming TLE (Two-Line Element) write requests. TLE data is parsed into the satellite_tle measurement with orbital elements as fields and satellite identifiers as tags.

func NewTLEHandler

func NewTLEHandler(buffer *ingest.ArrowBuffer, logger zerolog.Logger) *TLEHandler

NewTLEHandler creates a new TLE handler

func (*TLEHandler) RegisterRoutes

func (h *TLEHandler) RegisterRoutes(app *fiber.App)

RegisterRoutes registers TLE routes

func (*TLEHandler) SetAuthAndRBAC

func (h *TLEHandler) SetAuthAndRBAC(authManager AuthManager, rbacManager RBACChecker)

SetAuthAndRBAC sets the auth and RBAC managers for permission checking

func (*TLEHandler) SetRouter

func (h *TLEHandler) SetRouter(router *cluster.Router)

SetRouter sets the cluster router for request forwarding. When set, write requests from reader nodes will be forwarded to writer nodes.

func (*TLEHandler) Stats

func (h *TLEHandler) Stats(c *fiber.Ctx) error

Stats returns TLE handler statistics

type TLEImportResult

type TLEImportResult struct {
	Database       string   `json:"database"`
	Measurement    string   `json:"measurement"`
	SatelliteCount int      `json:"satellite_count"`
	RowsImported   int64    `json:"rows_imported"`
	ParseWarnings  []string `json:"parse_warnings,omitempty"`
	DurationMs     int64    `json:"duration_ms"`
}

TLEImportResult holds the result of a TLE bulk import operation

type TableReference

type TableReference struct {
	Database    string
	Measurement string
}

TableReference represents a database.measurement reference extracted from SQL

type TieringHandler

type TieringHandler struct {
	// contains filtered or unexported fields
}

TieringHandler handles tiered storage API operations

func NewTieringHandler

func NewTieringHandler(manager *tiering.Manager, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *TieringHandler

NewTieringHandler creates a new tiering handler

func (*TieringHandler) GetFiles

func (h *TieringHandler) GetFiles(c *fiber.Ctx) error

GetFiles returns files by tier GET /api/v1/tiering/files Query params: tier (optional), database (optional), limit (optional)

func (*TieringHandler) GetStats

func (h *TieringHandler) GetStats(c *fiber.Ctx) error

GetStats returns migration statistics GET /api/v1/tiering/stats

func (*TieringHandler) GetStatus

func (h *TieringHandler) GetStatus(c *fiber.Ctx) error

GetStatus returns the current tiering status GET /api/v1/tiering/status

func (*TieringHandler) RegisterRoutes

func (h *TieringHandler) RegisterRoutes(app fiber.Router)

RegisterRoutes registers tiering API routes

func (*TieringHandler) ScanFiles

func (h *TieringHandler) ScanFiles(c *fiber.Ctx) error

ScanFiles scans the hot tier storage and registers all existing files POST /api/v1/tiering/scan

func (*TieringHandler) TriggerMigration

func (h *TieringHandler) TriggerMigration(c *fiber.Ctx) error

TriggerMigration triggers a manual migration POST /api/v1/tiering/migrate

type TieringPoliciesHandler

type TieringPoliciesHandler struct {
	// contains filtered or unexported fields
}

TieringPoliciesHandler handles tiering policy API operations

func NewTieringPoliciesHandler

func NewTieringPoliciesHandler(manager *tiering.Manager, authManager *auth.AuthManager, licenseClient *license.Client, logger zerolog.Logger) *TieringPoliciesHandler

NewTieringPoliciesHandler creates a new tiering policies handler

func (*TieringPoliciesHandler) DeletePolicy

func (h *TieringPoliciesHandler) DeletePolicy(c *fiber.Ctx) error

DeletePolicy removes the custom policy for a database (reverts to global defaults) DELETE /api/v1/tiering/policies/:database

func (*TieringPoliciesHandler) GetEffectivePolicy

func (h *TieringPoliciesHandler) GetEffectivePolicy(c *fiber.Ctx) error

GetEffectivePolicy returns the effective policy for a database (resolved with global defaults) GET /api/v1/tiering/policies/:database/effective

func (*TieringPoliciesHandler) GetPolicy

func (h *TieringPoliciesHandler) GetPolicy(c *fiber.Ctx) error

GetPolicy returns the policy for a specific database GET /api/v1/tiering/policies/:database

func (*TieringPoliciesHandler) ListPolicies

func (h *TieringPoliciesHandler) ListPolicies(c *fiber.Ctx) error

ListPolicies returns all custom tiering policies GET /api/v1/tiering/policies

func (*TieringPoliciesHandler) RegisterRoutes

func (h *TieringPoliciesHandler) RegisterRoutes(app fiber.Router)

RegisterRoutes registers tiering policy API routes

func (*TieringPoliciesHandler) SetPolicy

func (h *TieringPoliciesHandler) SetPolicy(c *fiber.Ctx) error

SetPolicy creates or updates the policy for a database PUT /api/v1/tiering/policies/:database

type TriggerMigrationRequest

type TriggerMigrationRequest struct {
	FromTier    string `json:"from_tier,omitempty"`   // Optional: "hot" (2-tier system)
	ToTier      string `json:"to_tier,omitempty"`     // Optional: "cold" (2-tier system)
	Database    string `json:"database,omitempty"`    // Optional: filter by database
	Measurement string `json:"measurement,omitempty"` // Optional: filter by measurement
	DryRun      bool   `json:"dry_run,omitempty"`     // If true, only report what would be migrated
}

TriggerMigrationRequest represents a manual migration request

type UpdateTokenRequest

type UpdateTokenRequest struct {
	Name        *string   `json:"name,omitempty"`
	Description *string   `json:"description,omitempty"`
	Permissions *[]string `json:"permissions,omitempty"` // nil = don't change, empty array = clear permissions (RBAC-only)
	ExpiresIn   *string   `json:"expires_in,omitempty"`
}

UpdateTokenRequest represents a token update request

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL