Documentation
¶
Index ¶
- Variables
- func CompareSchemas(schema1, schema2 map[string]string) (score int, differences []string)
- func GetLatestMigrationVersion(migrationsFS fs.FS) (uint, error)
- func PrintMigrateHelp()
- func RunMigrateCommand(args []string, dbPath string)
- type DB
- func (db *DB) AnalyseTransitOverlaps(ctx context.Context) (*TransitOverlapStats, error)
- func (db *DB) AttachAdminRoutes(mux *http.ServeMux)
- func (db *DB) BaselineAtVersion(version uint) error
- func (db *DB) CheckAndPromptMigrations(migrationsFS fs.FS) (bool, error)
- func (db *DB) CountUniqueBgSnapshotHashes(sensorID string) (int, error)
- func (db *DB) CreateSite(site *Site) error
- func (db *DB) CreateSiteConfigPeriod(period *SiteConfigPeriod) error
- func (db *DB) CreateSiteReport(report *SiteReport) error
- func (db *DB) DeleteBgSnapshots(snapshotIDs []int64) (int64, error)
- func (db *DB) DeleteDuplicateBgSnapshots(sensorID string) (int64, error)
- func (db *DB) DeleteSite(id int) error
- func (db *DB) DeleteSiteReport(id int) error
- func (db *DB) DetectSchemaVersion(migrationsFS fs.FS) (detectedVersion uint, matchScore int, differences []string, err error)
- func (db *DB) Events() ([]Event, error)
- func (db *DB) FindDuplicateBgSnapshots(sensorID string) ([]DuplicateSnapshotGroup, error)
- func (db *DB) GetActiveSiteConfigPeriod(siteID int) (*SiteConfigPeriod, error)
- func (db *DB) GetAllSites() ([]Site, error)
- func (db *DB) GetDatabaseSchema() (map[string]string, error)
- func (db *DB) GetDatabaseStats() (*DatabaseStats, error)
- func (db *DB) GetLatestBgSnapshot(sensorID string) (*lidar.BgSnapshot, error)
- func (db *DB) GetMigrationStatus(migrationsFS fs.FS) (map[string]interface{}, error)
- func (db *DB) GetRecentReportsAllSites(limit int) ([]SiteReport, error)
- func (db *DB) GetRecentReportsForSite(siteID int, limit int) ([]SiteReport, error)
- func (db *DB) GetSchemaAtMigration(migrationsFS fs.FS, targetVersion uint) (map[string]string, error)
- func (db *DB) GetSite(id int) (*Site, error)
- func (db *DB) GetSiteConfigPeriod(id int) (*SiteConfigPeriod, error)
- func (db *DB) GetSiteReport(id int) (*SiteReport, error)
- func (db *DB) InsertBgSnapshot(s *lidar.BgSnapshot) (int64, error)
- func (db *DB) ListRecentBgSnapshots(sensorID string, limit int) ([]*lidar.BgSnapshot, error)
- func (db *DB) ListSiteConfigPeriods(siteID *int) ([]SiteConfigPeriod, error)
- func (db *DB) MigrateDown(migrationsFS fs.FS) error
- func (db *DB) MigrateForce(migrationsFS fs.FS, version int) error
- func (db *DB) MigrateTo(migrationsFS fs.FS, version uint) error
- func (db *DB) MigrateUp(migrationsFS fs.FS) error
- func (db *DB) MigrateVersion(migrationsFS fs.FS) (version uint, dirty bool, err error)
- func (db *DB) RadarDataRange() (*DataRange, error)
- func (db *DB) RadarObjectRollupRange(startUnix, endUnix, groupSeconds int64, minSpeed float64, dataSource string, ...) (*RadarStatsResult, error)
- func (db *DB) RadarObjects() ([]RadarObject, error)
- func (db *DB) RecordRadarObject(rawRadarJSON string) error
- func (db *DB) RecordRawData(rawDataJSON string) error
- func (db *DB) UpdateSite(site *Site) error
- func (db *DB) UpdateSiteConfigPeriod(period *SiteConfigPeriod) error
- type DataRange
- type DatabaseStats
- type DuplicateSnapshotGroup
- type Event
- type EventAPI
- type RadarObject
- type RadarObjectsRollupRow
- type RadarStatsResult
- type Site
- type SiteConfigPeriod
- type SiteReport
- type TableStats
- type TransitController
- func (tc *TransitController) GetStatus() TransitStatus
- func (tc *TransitController) IsEnabled() bool
- func (tc *TransitController) Run(ctx context.Context) error
- func (tc *TransitController) SetEnabled(enabled bool)
- func (tc *TransitController) TriggerFullHistoryRun()
- func (tc *TransitController) TriggerManualRun()
- type TransitOverlap
- type TransitOverlapStats
- type TransitRunInfo
- type TransitStatus
- type TransitWorker
- func (w *TransitWorker) DeleteAllTransits(ctx context.Context, modelVersion string) (int64, error)
- func (w *TransitWorker) MigrateModelVersion(ctx context.Context, oldVersion string) error
- func (w *TransitWorker) RunFullHistory(ctx context.Context) error
- func (w *TransitWorker) RunOnce(ctx context.Context) error
- func (w *TransitWorker) RunRange(ctx context.Context, start, end float64) error
- func (w *TransitWorker) Start()
- func (w *TransitWorker) Stop()
Constants ¶
This section is empty.
Variables ¶
var DevMode = false
DevMode controls whether to use filesystem or embedded migrations. Set to true in development for hot-reloading, false in production.
Functions ¶
func CompareSchemas ¶
CompareSchemas compares two schemas and returns a similarity score (0-100) and a list of differences.
func GetLatestMigrationVersion ¶
GetLatestMigrationVersion returns the latest available migration version by scanning the migrations filesystem.
func PrintMigrateHelp ¶
func PrintMigrateHelp()
PrintMigrateHelp displays the help message for the migrate command
func RunMigrateCommand ¶
RunMigrateCommand handles the 'migrate' subcommand dispatching
Types ¶
type DB ¶
func NewDBWithMigrationCheck ¶
NewDBWithMigrationCheck opens a database and optionally checks for pending migrations. If checkMigrations is true and migrations are pending, returns an error prompting user to run migrations.
func OpenDB ¶
OpenDB opens a database connection without running schema initialization. This is useful for migration commands that manage schema independently. Note: PRAGMAs are still applied for performance and concurrency.
func (*DB) AnalyseTransitOverlaps ¶
func (db *DB) AnalyseTransitOverlaps(ctx context.Context) (*TransitOverlapStats, error)
AnalyseTransitOverlaps returns statistics about overlapping transits across model versions.
func (*DB) AttachAdminRoutes ¶
func (*DB) BaselineAtVersion ¶
BaselineAtVersion creates a schema_migrations entry at the specified version without running any migrations. This is useful for existing databases that already have the schema from that version applied.
func (*DB) CheckAndPromptMigrations ¶
CheckAndPromptMigrations checks if the database version differs from the latest available migration version. If they differ, it prompts the user to apply migrations. Returns true if migrations were needed but not applied (should exit), false otherwise.
func (*DB) CountUniqueBgSnapshotHashes ¶
CountUniqueBgSnapshotHashes counts the total number of unique grid_blob hashes for a sensor, including both duplicates and singletons.
func (*DB) CreateSite ¶
CreateSite creates a new site in the database
func (*DB) CreateSiteConfigPeriod ¶
func (db *DB) CreateSiteConfigPeriod(period *SiteConfigPeriod) error
CreateSiteConfigPeriod inserts a new site config period.
func (*DB) CreateSiteReport ¶
func (db *DB) CreateSiteReport(report *SiteReport) error
CreateSiteReport creates a new report record in the database
func (*DB) DeleteBgSnapshots ¶
DeleteBgSnapshots deletes snapshots by their IDs. Returns the number of rows deleted.
func (*DB) DeleteDuplicateBgSnapshots ¶
DeleteDuplicateBgSnapshots removes duplicate snapshots for a given sensor_id. Duplicates are defined as sharing the same grid_blob content, regardless of timestamp. This deduplicates history, keeping only the most recent snapshot (highest ID) for each unique grid configuration.
func (*DB) DeleteSite ¶
DeleteSite deletes a site from the database
func (*DB) DeleteSiteReport ¶
DeleteSiteReport deletes a site report by ID
func (*DB) DetectSchemaVersion ¶
func (db *DB) DetectSchemaVersion(migrationsFS fs.FS) (detectedVersion uint, matchScore int, differences []string, err error)
DetectSchemaVersion attempts to detect which migration version a database is at by comparing its schema to known schemas at each migration point. Returns the detected version, match score, and any differences.
Performance optimization: Iterates from version 1 to latest, applying migrations incrementally to a single temporary database. This is much more efficient than creating a new database for each version. Stops early if a high similarity threshold (98%) is reached or a perfect match (100%) is found.
func (*DB) FindDuplicateBgSnapshots ¶
func (db *DB) FindDuplicateBgSnapshots(sensorID string) ([]DuplicateSnapshotGroup, error)
FindDuplicateBgSnapshots finds groups of snapshots with identical grid_blob data. Returns groups where Count > 1 (i.e., duplicates exist).
func (*DB) GetActiveSiteConfigPeriod ¶
func (db *DB) GetActiveSiteConfigPeriod(siteID int) (*SiteConfigPeriod, error)
GetActiveSiteConfigPeriod returns the active config period for a site.
func (*DB) GetAllSites ¶
GetAllSites retrieves all sites from the database
func (*DB) GetDatabaseSchema ¶
GetDatabaseSchema extracts the current schema from the database. Returns a map of object names to their SQL definitions (normalized).
func (*DB) GetDatabaseStats ¶
func (db *DB) GetDatabaseStats() (*DatabaseStats, error)
GetDatabaseStats returns size and row count information for all tables in the database. Uses SQLite's dbstat virtual table to get accurate size information.
func (*DB) GetLatestBgSnapshot ¶
func (db *DB) GetLatestBgSnapshot(sensorID string) (*lidar.BgSnapshot, error)
GetLatestBgSnapshot returns the most recent BgSnapshot for the given sensor_id, or nil if none.
func (*DB) GetMigrationStatus ¶
GetMigrationStatus returns a summary of the migration status including current version, dirty state, and whether migrations are needed.
func (*DB) GetRecentReportsAllSites ¶
func (db *DB) GetRecentReportsAllSites(limit int) ([]SiteReport, error)
GetRecentReportsAllSites retrieves the most recent N reports across all sites
func (*DB) GetRecentReportsForSite ¶
func (db *DB) GetRecentReportsForSite(siteID int, limit int) ([]SiteReport, error)
GetRecentReportsForSite retrieves the most recent N reports for a specific site
func (*DB) GetSchemaAtMigration ¶
func (db *DB) GetSchemaAtMigration(migrationsFS fs.FS, targetVersion uint) (map[string]string, error)
GetSchemaAtMigration returns the schema that would result from applying migrations up to a specific version. This is done by creating a temporary database and applying migrations.
func (*DB) GetSiteConfigPeriod ¶
func (db *DB) GetSiteConfigPeriod(id int) (*SiteConfigPeriod, error)
GetSiteConfigPeriod retrieves a single config period by ID.
func (*DB) GetSiteReport ¶
func (db *DB) GetSiteReport(id int) (*SiteReport, error)
GetSiteReport retrieves a report by ID
func (*DB) InsertBgSnapshot ¶
func (db *DB) InsertBgSnapshot(s *lidar.BgSnapshot) (int64, error)
InsertBgSnapshot persists a Background snapshot into the lidar_bg_snapshot table and returns the new snapshot_id.
func (*DB) ListRecentBgSnapshots ¶
ListRecentBgSnapshots returns the last N BgSnapshots for a sensor_id, ordered by most recent.
func (*DB) ListSiteConfigPeriods ¶
func (db *DB) ListSiteConfigPeriods(siteID *int) ([]SiteConfigPeriod, error)
ListSiteConfigPeriods returns all site config periods, optionally filtered by site.
func (*DB) MigrateDown ¶
MigrateDown rolls back the most recent migration.
func (*DB) MigrateForce ¶
MigrateForce forces the migration version to a specific value. This should only be used to recover from a dirty migration state.
func (*DB) MigrateTo ¶
MigrateTo migrates to a specific version. Use this to migrate up or down to a specific version.
func (*DB) MigrateUp ¶
MigrateUp runs all pending migrations up to the latest version. Returns nil if no migrations were needed (already at latest version).
func (*DB) MigrateVersion ¶
MigrateVersion returns the current migration version and dirty state. Returns 0, false, nil if no migrations have been applied yet.
func (*DB) RadarDataRange ¶
RadarDataRange returns the earliest and latest radar object timestamps.
func (*DB) RadarObjectRollupRange ¶
func (db *DB) RadarObjectRollupRange(startUnix, endUnix, groupSeconds int64, minSpeed float64, dataSource string, modelVersion string, histBucketSize, histMax float64, siteID int, boundaryThreshold int) (*RadarStatsResult, error)
RadarObjectRollupRange aggregates radar speed sources into time buckets and optionally computes a histogram. dataSource may be either "radar_objects" (default), "radar_data", or "radar_data_transits". If histBucketSize > 0, a histogram is computed; histMax (if > 0) clips histogram values above that threshold. Both histBucketSize and histMax are in meters-per-second (mps). boundaryThreshold: if > 0, filters out boundary hours (first/last hour of each day) with fewer than this many data points. This helps exclude incomplete survey periods. Set to 0 to disable boundary filtering. NOTE: Boundary filtering is always applied at 1-hour granularity, regardless of the requested groupSeconds. This ensures consistent filtering whether requesting hourly, daily (24h), or overall (all) aggregation.
func (*DB) RadarObjects ¶
func (db *DB) RadarObjects() ([]RadarObject, error)
func (*DB) RecordRadarObject ¶
func (*DB) RecordRawData ¶
func (*DB) UpdateSite ¶
UpdateSite updates an existing site in the database
func (*DB) UpdateSiteConfigPeriod ¶
func (db *DB) UpdateSiteConfigPeriod(period *SiteConfigPeriod) error
UpdateSiteConfigPeriod updates an existing site config period.
type DatabaseStats ¶
type DatabaseStats struct {
TotalSizeMB float64 `json:"total_size_mb"`
Tables []TableStats `json:"tables"`
}
DatabaseStats contains overall database statistics.
type DuplicateSnapshotGroup ¶
type DuplicateSnapshotGroup struct {
BlobHash string // hex-encoded hash of grid_blob
Count int // number of snapshots with this hash
SnapshotIDs []int64 // list of snapshot IDs with this hash
KeepID int64 // the snapshot ID to keep (oldest)
DeleteIDs []int64 // snapshot IDs that would be deleted
BlobBytes int // size of the blob in bytes
SensorID string // sensor ID for this group
}
DuplicateSnapshotGroup represents a group of snapshots with the same grid_blob hash.
type Event ¶
type Event struct {
Magnitude sql.NullFloat64
Uptime sql.NullFloat64
Speed sql.NullFloat64
}
type EventAPI ¶
type EventAPI struct {
Magnitude *float64 `json:"Magnitude,omitempty"`
Uptime *float64 `json:"Uptime,omitempty"`
Speed *float64 `json:"Speed,omitempty"`
}
func EventToAPI ¶
type RadarObject ¶
type RadarObject struct {
Classifier string
StartTime time.Time
EndTime time.Time
DeltaTimeMs int64
MaxSpeed float64
MinSpeed float64
SpeedChange float64
MaxMagnitude int64
AvgMagnitude int64
TotalFrames int64
FramesPerMps float64
Length float64
}
func (*RadarObject) String ¶
func (e *RadarObject) String() string
type RadarObjectsRollupRow ¶
type RadarObjectsRollupRow struct {
Classifier string
StartTime time.Time
Count int64
P50Speed float64
P85Speed float64
P98Speed float64
MaxSpeed float64
}
RadarObjectsRollupRow represents an aggregate row for radar object rollup.
func (*RadarObjectsRollupRow) String ¶
func (e *RadarObjectsRollupRow) String() string
type RadarStatsResult ¶
type RadarStatsResult struct {
Metrics []RadarObjectsRollupRow
Histogram map[float64]int64 // bucket start (mps) -> count; nil if histogram not requested
MinSpeedUsed float64 // actual minimum speed filter applied (mps)
}
RadarStatsResult combines time-aggregated metrics with an optional histogram.
type Site ¶
type Site struct {
ID int `json:"id"`
Name string `json:"name"`
Location string `json:"location"`
Description *string `json:"description"`
Surveyor string `json:"surveyor"`
Contact string `json:"contact"`
Address *string `json:"address"`
Latitude *float64 `json:"latitude"`
Longitude *float64 `json:"longitude"`
MapAngle *float64 `json:"map_angle"`
IncludeMap bool `json:"include_map"`
SiteDescription *string `json:"site_description"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Site represents a survey site configuration
type SiteConfigPeriod ¶
type SiteConfigPeriod struct {
ID int `json:"id"`
SiteID int `json:"site_id"`
EffectiveStartUnix float64 `json:"effective_start_unix"`
EffectiveEndUnix *float64 `json:"effective_end_unix"`
IsActive bool `json:"is_active"`
Notes *string `json:"notes"`
CosineErrorAngle float64 `json:"cosine_error_angle"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
SiteConfigPeriod represents a time-based configuration period for a site.
type SiteReport ¶
type SiteReport struct {
ID int `json:"id"`
SiteID int `json:"site_id"`
StartDate string `json:"start_date"` // YYYY-MM-DD
EndDate string `json:"end_date"` // YYYY-MM-DD
Filepath string `json:"filepath"` // Relative path from pdf-generator directory
Filename string `json:"filename"` // PDF filename
ZipFilepath *string `json:"zip_filepath"` // Relative path to sources ZIP
ZipFilename *string `json:"zip_filename"` // ZIP filename
RunID string `json:"run_id"` // Timestamp-based run ID
Timezone string `json:"timezone"` // Report timezone
Units string `json:"units"` // mph or kph
Source string `json:"source"` // radar_objects, radar_data, or radar_data_transits
CreatedAt time.Time `json:"created_at"`
}
SiteReport represents a generated PDF report for a site
type TableStats ¶
type TableStats struct {
Name string `json:"name"`
RowCount int64 `json:"row_count"`
SizeMB float64 `json:"size_mb"`
}
TableStats contains size and row count information for a database table.
type TransitController ¶
type TransitController struct {
// contains filtered or unexported fields
}
TransitController manages the state and execution of the transit worker. It provides thread-safe control over whether the transit worker runs, and supports manual triggering from the UI.
func NewTransitController ¶
func NewTransitController(worker *TransitWorker) *TransitController
NewTransitController creates a new controller for the transit worker.
func (*TransitController) GetStatus ¶
func (tc *TransitController) GetStatus() TransitStatus
GetStatus returns the current status of the transit worker.
func (*TransitController) IsEnabled ¶
func (tc *TransitController) IsEnabled() bool
IsEnabled returns whether the transit worker is currently enabled.
func (*TransitController) Run ¶
func (tc *TransitController) Run(ctx context.Context) error
Run starts the transit worker loop. This should be called in a goroutine. It will run periodically based on the worker's Interval, but only when enabled. It also responds to manual triggers from the UI.
func (*TransitController) SetEnabled ¶
func (tc *TransitController) SetEnabled(enabled bool)
SetEnabled sets whether the transit worker should run. If enabling, it also triggers an immediate run.
func (*TransitController) TriggerFullHistoryRun ¶
func (tc *TransitController) TriggerFullHistoryRun()
TriggerFullHistoryRun triggers a full-history run of the transit worker. This is non-blocking and safe to call multiple times.
func (*TransitController) TriggerManualRun ¶
func (tc *TransitController) TriggerManualRun()
TriggerManualRun triggers a manual run of the transit worker. This is non-blocking and safe to call multiple times.
type TransitOverlap ¶
TransitOverlap represents a pair of overlapping transits with different model versions.
type TransitOverlapStats ¶
type TransitOverlapStats struct {
TotalTransits int64
ModelVersionCounts map[string]int64
Overlaps []TransitOverlap
}
TransitOverlapStats contains statistics about overlapping transits.
type TransitRunInfo ¶
type TransitRunInfo struct {
Trigger string `json:"trigger,omitempty"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
Error string `json:"error,omitempty"`
}
TransitRunInfo captures details about a single transit worker run.
type TransitStatus ¶
type TransitStatus struct {
Enabled bool `json:"enabled"`
LastRunAt time.Time `json:"last_run_at"`
LastRunError string `json:"last_run_error,omitempty"`
RunCount int64 `json:"run_count"`
IsHealthy bool `json:"is_healthy"`
CurrentRun *TransitRunInfo `json:"current_run,omitempty"`
LastRun *TransitRunInfo `json:"last_run,omitempty"`
}
TransitStatus represents the current state of the transit worker.
type TransitWorker ¶
type TransitWorker struct {
DB *DB
// Threshold in seconds used to split sessions (1,2,3,4,5 -> 1000,2000,...ms)
ThresholdSeconds int
ModelVersion string
Interval time.Duration // how often to run (e.g., 15m)
Window time.Duration // lookback window (e.g., 20m)
StopChan chan struct{}
}
TransitWorker periodically scans recent radar_data and upserts sessionized transits into radar_data_transits and radar_transit_links. Designed to run every 15 minutes and process the last 20 minutes window (with a small overlap to allow updates).
func NewTransitWorker ¶
func NewTransitWorker(db *DB, thresholdSeconds int, modelVersion string) *TransitWorker
func (*TransitWorker) DeleteAllTransits ¶
DeleteAllTransits removes all transits for a given model version.
func (*TransitWorker) MigrateModelVersion ¶
func (w *TransitWorker) MigrateModelVersion(ctx context.Context, oldVersion string) error
MigrateModelVersion replaces all transits from oldVersion with the worker's current ModelVersion by deleting old transits and re-running over full history.
func (*TransitWorker) RunFullHistory ¶
func (w *TransitWorker) RunFullHistory(ctx context.Context) error
RunFullHistory scans the full available radar_data range and upserts transits.
func (*TransitWorker) RunOnce ¶
func (w *TransitWorker) RunOnce(ctx context.Context) error
RunOnce scans the last w.Window (+ small overlap) and upserts transits.
func (*TransitWorker) RunRange ¶
func (w *TransitWorker) RunRange(ctx context.Context, start, end float64) error
RunRange scans the provided [start,end] (unix seconds as float64) and upserts transits.
func (*TransitWorker) Start ¶
func (w *TransitWorker) Start()
Start runs the periodic worker loop in a goroutine.