db

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 29, 2026 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var DevMode = false

DevMode controls whether to use filesystem or embedded migrations. Set to true in development for hot-reloading, false in production.

Functions

func CompareSchemas

func CompareSchemas(schema1, schema2 map[string]string) (score int, differences []string)

CompareSchemas compares two schemas and returns a similarity score (0-100) and a list of differences.
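The scoring method is not documented here. As a minimal sketch of one way a 0-100 similarity score could be derived, the hypothetical compareSchemasSketch below assumes the score is the percentage of object names (over the union of both schemas) whose SQL definitions are identical. The real CompareSchemas may weigh differences differently.

```go
package main

import (
	"fmt"
	"sort"
)

// compareSchemasSketch is a hypothetical stand-in for CompareSchemas.
// Assumption: score = percentage of object names, over the union of both
// schemas, whose definitions match exactly.
func compareSchemasSketch(schema1, schema2 map[string]string) (score int, differences []string) {
	names := make(map[string]struct{}, len(schema1)+len(schema2))
	for name := range schema1 {
		names[name] = struct{}{}
	}
	for name := range schema2 {
		names[name] = struct{}{}
	}
	if len(names) == 0 {
		return 100, nil // two empty schemas are trivially identical
	}

	matched := 0
	for name := range names {
		def1, in1 := schema1[name]
		def2, in2 := schema2[name]
		switch {
		case !in1:
			differences = append(differences, "only in schema2: "+name)
		case !in2:
			differences = append(differences, "only in schema1: "+name)
		case def1 != def2:
			differences = append(differences, "definition differs: "+name)
		default:
			matched++
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(differences)
	return matched * 100 / len(names), differences
}

func main() {
	s1 := map[string]string{
		"site":       "CREATE TABLE site (id INTEGER)",
		"radar_data": "CREATE TABLE radar_data (raw TEXT)",
	}
	s2 := map[string]string{
		"site":       "CREATE TABLE site (id INTEGER)",
		"radar_data": "CREATE TABLE radar_data (raw JSON)",
	}
	score, diffs := compareSchemasSketch(s1, s2)
	fmt.Println(score, diffs) // 50 [definition differs: radar_data]
}
```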

func GetLatestMigrationVersion

func GetLatestMigrationVersion(migrationsFS fs.FS) (uint, error)

GetLatestMigrationVersion returns the latest available migration version by scanning the migrations filesystem.

func PrintMigrateHelp

func PrintMigrateHelp()

PrintMigrateHelp displays the help message for the migrate command.

func RunMigrateCommand

func RunMigrateCommand(args []string, dbPath string)

RunMigrateCommand handles dispatching for the 'migrate' subcommand.

Types

type DB

type DB struct {
	*sql.DB
}

func NewDB

func NewDB(path string) (*DB, error)

func NewDBWithMigrationCheck

func NewDBWithMigrationCheck(path string, checkMigrations bool) (*DB, error)

NewDBWithMigrationCheck opens a database and optionally checks for pending migrations. If checkMigrations is true and migrations are pending, it returns an error prompting the user to run migrations.

func OpenDB

func OpenDB(path string) (*DB, error)

OpenDB opens a database connection without running schema initialization. This is useful for migration commands that manage schema independently. Note: PRAGMAs are still applied for performance and concurrency.

func (*DB) AnalyseTransitOverlaps

func (db *DB) AnalyseTransitOverlaps(ctx context.Context) (*TransitOverlapStats, error)

AnalyseTransitOverlaps returns statistics about overlapping transits across model versions.

func (*DB) AttachAdminRoutes

func (db *DB) AttachAdminRoutes(mux *http.ServeMux)

func (*DB) BaselineAtVersion

func (db *DB) BaselineAtVersion(version uint) error

BaselineAtVersion creates a schema_migrations entry at the specified version without running any migrations. This is useful for existing databases that already have the schema from that version applied.

func (*DB) CheckAndPromptMigrations

func (db *DB) CheckAndPromptMigrations(migrationsFS fs.FS) (bool, error)

CheckAndPromptMigrations checks if the database version differs from the latest available migration version. If they differ, it prompts the user to apply migrations. Returns true if migrations were needed but not applied (should exit), false otherwise.

func (*DB) CountUniqueBgSnapshotHashes

func (db *DB) CountUniqueBgSnapshotHashes(sensorID string) (int, error)

CountUniqueBgSnapshotHashes counts the unique grid_blob hashes for a sensor, including both hashes shared by several snapshots (duplicates) and hashes that appear only once (singletons).

func (*DB) CreateSite

func (db *DB) CreateSite(site *Site) error

CreateSite creates a new site in the database.

func (*DB) CreateSiteConfigPeriod

func (db *DB) CreateSiteConfigPeriod(period *SiteConfigPeriod) error

CreateSiteConfigPeriod inserts a new site config period.

func (*DB) CreateSiteReport

func (db *DB) CreateSiteReport(report *SiteReport) error

CreateSiteReport creates a new report record in the database.

func (*DB) DeleteBgSnapshots

func (db *DB) DeleteBgSnapshots(snapshotIDs []int64) (int64, error)

DeleteBgSnapshots deletes snapshots by their IDs. Returns the number of rows deleted.

func (*DB) DeleteDuplicateBgSnapshots

func (db *DB) DeleteDuplicateBgSnapshots(sensorID string) (int64, error)

DeleteDuplicateBgSnapshots removes duplicate snapshots for a given sensor_id. Duplicates are defined as sharing the same grid_blob content, regardless of timestamp. This deduplicates history, keeping only the most recent snapshot (highest ID) for each unique grid configuration.
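The grouping described above can be sketched in memory. The example assumes SHA-256 as the content hash and uses hypothetical types (snapshot, dupGroup) rather than the package's own; it keeps the highest ID in each group, following the description above.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// snapshot is a hypothetical, simplified snapshot row.
type snapshot struct {
	ID   int64
	Blob []byte
}

// dupGroup is a hypothetical, simplified duplicate group.
type dupGroup struct {
	BlobHash  string
	KeepID    int64
	DeleteIDs []int64
}

// findDuplicatesSketch groups snapshots by the hash of their blob and, for
// every hash seen more than once, keeps the highest ID and marks the rest
// for deletion. Assumption: SHA-256 is used as the content hash.
func findDuplicatesSketch(snaps []snapshot) []dupGroup {
	byHash := make(map[string][]int64)
	for _, s := range snaps {
		sum := sha256.Sum256(s.Blob)
		h := hex.EncodeToString(sum[:])
		byHash[h] = append(byHash[h], s.ID)
	}

	var groups []dupGroup
	for h, ids := range byHash {
		if len(ids) < 2 {
			continue // singleton: nothing to deduplicate
		}
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
		last := len(ids) - 1
		groups = append(groups, dupGroup{BlobHash: h, KeepID: ids[last], DeleteIDs: ids[:last]})
	}
	// Sort groups so the result does not depend on map iteration order.
	sort.Slice(groups, func(i, j int) bool { return groups[i].KeepID < groups[j].KeepID })
	return groups
}

func main() {
	groups := findDuplicatesSketch([]snapshot{
		{ID: 1, Blob: []byte("grid-a")},
		{ID: 2, Blob: []byte("grid-b")},
		{ID: 3, Blob: []byte("grid-a")},
		{ID: 4, Blob: []byte("grid-a")},
	})
	for _, g := range groups {
		fmt.Println("keep", g.KeepID, "delete", g.DeleteIDs) // keep 4 delete [1 3]
	}
}
```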

func (*DB) DeleteSite

func (db *DB) DeleteSite(id int) error

DeleteSite deletes a site from the database.

func (*DB) DeleteSiteReport

func (db *DB) DeleteSiteReport(id int) error

DeleteSiteReport deletes a site report by ID.

func (*DB) DetectSchemaVersion

func (db *DB) DetectSchemaVersion(migrationsFS fs.FS) (detectedVersion uint, matchScore int, differences []string, err error)

DetectSchemaVersion attempts to detect which migration version a database is at by comparing its schema to known schemas at each migration point. Returns the detected version, match score, and any differences.

Performance optimization: Iterates from version 1 to latest, applying migrations incrementally to a single temporary database. This is much more efficient than creating a new database for each version. Stops early if a high similarity threshold (98%) is reached or a perfect match (100%) is found.
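The incremental search with early stopping can be sketched without a database. Here a plain map stands in for the temporary database, each migration is a hypothetical function that mutates it, and the score is the share of identical definitions over the union of object names. All names, and the scoring rule, are assumptions for illustration.

```go
package main

import "fmt"

// migration is a hypothetical step that mutates a scratch schema in place.
type migration func(schema map[string]string)

// similarity returns the percentage of object names (over the union of both
// schemas) with identical definitions. This scoring rule is an assumption.
func similarity(a, b map[string]string) int {
	union := make(map[string]struct{}, len(a)+len(b))
	for name := range a {
		union[name] = struct{}{}
	}
	for name := range b {
		union[name] = struct{}{}
	}
	if len(union) == 0 {
		return 100
	}
	matched := 0
	for name := range union {
		defA, inA := a[name]
		defB, inB := b[name]
		if inA && inB && defA == defB {
			matched++
		}
	}
	return matched * 100 / len(union)
}

// detectVersionSketch applies migrations one at a time to a single scratch
// schema, scores the result against target after each step, and stops as
// soon as the best score reaches threshold.
func detectVersionSketch(target map[string]string, migrations []migration, threshold int) (version uint, best int) {
	scratch := map[string]string{}
	for i, apply := range migrations {
		apply(scratch) // reuse the same scratch schema instead of rebuilding it
		if s := similarity(target, scratch); s > best {
			best, version = s, uint(i+1)
		}
		if best >= threshold {
			break
		}
	}
	return version, best
}

func main() {
	migrations := []migration{
		func(s map[string]string) { s["site"] = "CREATE TABLE site (id INTEGER)" },
		func(s map[string]string) { s["radar_data"] = "CREATE TABLE radar_data (raw TEXT)" },
		func(s map[string]string) { s["site_reports"] = "CREATE TABLE site_reports (id INTEGER)" },
	}
	target := map[string]string{
		"site":       "CREATE TABLE site (id INTEGER)",
		"radar_data": "CREATE TABLE radar_data (raw TEXT)",
	}
	version, score := detectVersionSketch(target, migrations, 98)
	fmt.Println(version, score) // 2 100
}
```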

func (*DB) Events

func (db *DB) Events() ([]Event, error)

func (*DB) FindDuplicateBgSnapshots

func (db *DB) FindDuplicateBgSnapshots(sensorID string) ([]DuplicateSnapshotGroup, error)

FindDuplicateBgSnapshots finds groups of snapshots with identical grid_blob data. Returns groups where Count > 1 (i.e., duplicates exist).

func (*DB) GetActiveSiteConfigPeriod

func (db *DB) GetActiveSiteConfigPeriod(siteID int) (*SiteConfigPeriod, error)

GetActiveSiteConfigPeriod returns the active config period for a site.

func (*DB) GetAllSites

func (db *DB) GetAllSites() ([]Site, error)

GetAllSites retrieves all sites from the database.

func (*DB) GetDatabaseSchema

func (db *DB) GetDatabaseSchema() (map[string]string, error)

GetDatabaseSchema extracts the current schema from the database. Returns a map of object names to their SQL definitions (normalized).

func (*DB) GetDatabaseStats

func (db *DB) GetDatabaseStats() (*DatabaseStats, error)

GetDatabaseStats returns size and row count information for all tables in the database. Uses SQLite's dbstat virtual table to get accurate size information.

func (*DB) GetLatestBgSnapshot

func (db *DB) GetLatestBgSnapshot(sensorID string) (*lidar.BgSnapshot, error)

GetLatestBgSnapshot returns the most recent BgSnapshot for the given sensor_id, or nil if none.

func (*DB) GetMigrationStatus

func (db *DB) GetMigrationStatus(migrationsFS fs.FS) (map[string]interface{}, error)

GetMigrationStatus returns a summary of the migration status including current version, dirty state, and whether migrations are needed.

func (*DB) GetRecentReportsAllSites

func (db *DB) GetRecentReportsAllSites(limit int) ([]SiteReport, error)

GetRecentReportsAllSites retrieves the most recent reports across all sites, up to limit.

func (*DB) GetRecentReportsForSite

func (db *DB) GetRecentReportsForSite(siteID int, limit int) ([]SiteReport, error)

GetRecentReportsForSite retrieves the most recent reports for a specific site, up to limit.

func (*DB) GetSchemaAtMigration

func (db *DB) GetSchemaAtMigration(migrationsFS fs.FS, targetVersion uint) (map[string]string, error)

GetSchemaAtMigration returns the schema that would result from applying migrations up to a specific version. This is done by creating a temporary database and applying migrations.

func (*DB) GetSite

func (db *DB) GetSite(id int) (*Site, error)

GetSite retrieves a site by ID.

func (*DB) GetSiteConfigPeriod

func (db *DB) GetSiteConfigPeriod(id int) (*SiteConfigPeriod, error)

GetSiteConfigPeriod retrieves a single config period by ID.

func (*DB) GetSiteReport

func (db *DB) GetSiteReport(id int) (*SiteReport, error)

GetSiteReport retrieves a report by ID.

func (*DB) InsertBgSnapshot

func (db *DB) InsertBgSnapshot(s *lidar.BgSnapshot) (int64, error)

InsertBgSnapshot persists a Background snapshot into the lidar_bg_snapshot table and returns the new snapshot_id.

func (*DB) ListRecentBgSnapshots

func (db *DB) ListRecentBgSnapshots(sensorID string, limit int) ([]*lidar.BgSnapshot, error)

ListRecentBgSnapshots returns the last N BgSnapshots for a sensor_id, ordered by most recent.

func (*DB) ListSiteConfigPeriods

func (db *DB) ListSiteConfigPeriods(siteID *int) ([]SiteConfigPeriod, error)

ListSiteConfigPeriods returns all site config periods, optionally filtered by site.

func (*DB) MigrateDown

func (db *DB) MigrateDown(migrationsFS fs.FS) error

MigrateDown rolls back the most recent migration.

func (*DB) MigrateForce

func (db *DB) MigrateForce(migrationsFS fs.FS, version int) error

MigrateForce forces the migration version to a specific value. This should only be used to recover from a dirty migration state.

func (*DB) MigrateTo

func (db *DB) MigrateTo(migrationsFS fs.FS, version uint) error

MigrateTo migrates to a specific version. Use this to migrate up or down to a specific version.

func (*DB) MigrateUp

func (db *DB) MigrateUp(migrationsFS fs.FS) error

MigrateUp runs all pending migrations up to the latest version. Returns nil if no migrations were needed (already at latest version).

func (*DB) MigrateVersion

func (db *DB) MigrateVersion(migrationsFS fs.FS) (version uint, dirty bool, err error)

MigrateVersion returns the current migration version and dirty state. Returns 0, false, nil if no migrations have been applied yet.

func (*DB) RadarDataRange

func (db *DB) RadarDataRange() (*DataRange, error)

RadarDataRange returns the earliest and latest radar object timestamps.

func (*DB) RadarObjectRollupRange

func (db *DB) RadarObjectRollupRange(startUnix, endUnix, groupSeconds int64, minSpeed float64, dataSource string, modelVersion string, histBucketSize, histMax float64, siteID int, boundaryThreshold int) (*RadarStatsResult, error)

RadarObjectRollupRange aggregates radar speed sources into time buckets and optionally computes a histogram.

dataSource may be one of "radar_objects" (default), "radar_data", or "radar_data_transits".

If histBucketSize > 0, a histogram is computed; histMax (if > 0) clips histogram values above that threshold. Both histBucketSize and histMax are in meters per second (mps).

If boundaryThreshold > 0, boundary hours (the first and last hour of each day) with fewer than this many data points are filtered out. This helps exclude incomplete survey periods. Set boundaryThreshold to 0 to disable boundary filtering.

NOTE: Boundary filtering is always applied at 1-hour granularity, regardless of the requested groupSeconds. This ensures consistent filtering whether requesting hourly, daily (24h), or overall (all) aggregation.
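The two bucketing steps can be sketched as follows. The helper names are hypothetical, timestamps are assumed to be non-negative, and "clips" is read here as dropping speeds above histMax; the real implementation may instead clamp them into the top bucket.

```go
package main

import (
	"fmt"
	"math"
)

// bucketStart floors a non-negative unix timestamp to the start of its
// groupSeconds-wide time bucket.
func bucketStart(unix, groupSeconds int64) int64 {
	return unix - unix%groupSeconds
}

// histogramSketch counts speeds (mps) per bucket, keyed by bucket start.
// It returns nil when bucketSize <= 0, mirroring "histogram not requested".
// Assumption: speeds above histMax (when histMax > 0) are dropped.
func histogramSketch(speeds []float64, bucketSize, histMax float64) map[float64]int64 {
	if bucketSize <= 0 {
		return nil
	}
	hist := make(map[float64]int64)
	for _, s := range speeds {
		if histMax > 0 && s > histMax {
			continue
		}
		start := math.Floor(s/bucketSize) * bucketSize
		hist[start]++
	}
	return hist
}

func main() {
	// 01:02:05 falls into the hourly bucket starting at 01:00:00.
	fmt.Println(bucketStart(3725, 3600)) // 3600

	hist := histogramSketch([]float64{1.0, 4.9, 5.0, 12.5, 31.0}, 5, 30)
	fmt.Println(hist[0], hist[5], hist[10], len(hist)) // 2 1 1 3
}
```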

func (*DB) RadarObjects

func (db *DB) RadarObjects() ([]RadarObject, error)

func (*DB) RecordRadarObject

func (db *DB) RecordRadarObject(rawRadarJSON string) error

func (*DB) RecordRawData

func (db *DB) RecordRawData(rawDataJSON string) error

func (*DB) UpdateSite

func (db *DB) UpdateSite(site *Site) error

UpdateSite updates an existing site in the database.

func (*DB) UpdateSiteConfigPeriod

func (db *DB) UpdateSiteConfigPeriod(period *SiteConfigPeriod) error

UpdateSiteConfigPeriod updates an existing site config period.

type DataRange

type DataRange struct {
	StartUnix float64 `json:"start_unix"`
	EndUnix   float64 `json:"end_unix"`
}

DataRange represents a data coverage window in unix seconds.

type DatabaseStats

type DatabaseStats struct {
	TotalSizeMB float64      `json:"total_size_mb"`
	Tables      []TableStats `json:"tables"`
}

DatabaseStats contains overall database statistics.

type DuplicateSnapshotGroup

type DuplicateSnapshotGroup struct {
	BlobHash    string  // hex-encoded hash of grid_blob
	Count       int     // number of snapshots with this hash
	SnapshotIDs []int64 // list of snapshot IDs with this hash
	KeepID      int64   // the snapshot ID to keep (oldest)
	DeleteIDs   []int64 // snapshot IDs that would be deleted
	BlobBytes   int     // size of the blob in bytes
	SensorID    string  // sensor ID for this group
}

DuplicateSnapshotGroup represents a group of snapshots with the same grid_blob hash.

type Event

type Event struct {
	Magnitude sql.NullFloat64
	Uptime    sql.NullFloat64
	Speed     sql.NullFloat64
}

func (*Event) String

func (e *Event) String() string

type EventAPI

type EventAPI struct {
	Magnitude *float64 `json:"Magnitude,omitempty"`
	Uptime    *float64 `json:"Uptime,omitempty"`
	Speed     *float64 `json:"Speed,omitempty"`
}

func EventToAPI

func EventToAPI(e Event) EventAPI

type RadarObject

type RadarObject struct {
	Classifier   string
	StartTime    time.Time
	EndTime      time.Time
	DeltaTimeMs  int64
	MaxSpeed     float64
	MinSpeed     float64
	SpeedChange  float64
	MaxMagnitude int64
	AvgMagnitude int64
	TotalFrames  int64
	FramesPerMps float64
	Length       float64
}

func (*RadarObject) String

func (e *RadarObject) String() string

type RadarObjectsRollupRow

type RadarObjectsRollupRow struct {
	Classifier string
	StartTime  time.Time
	Count      int64
	P50Speed   float64
	P85Speed   float64
	P98Speed   float64
	MaxSpeed   float64
}

RadarObjectsRollupRow represents an aggregate row for radar object rollup.
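How P50Speed, P85Speed, and P98Speed are computed is not stated here. As one plausible reading, the sketch below uses the nearest-rank percentile method; both the method and the helper name percentileSketch are assumptions.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// percentileSketch returns the p-th percentile (0 < p <= 100) of speeds using
// the nearest-rank method: the value at rank ceil(p*n/100) in sorted order.
// It returns 0 for an empty input.
func percentileSketch(speeds []float64, p float64) float64 {
	n := len(speeds)
	if n == 0 {
		return 0
	}
	sorted := append([]float64(nil), speeds...) // copy so the caller's slice is untouched
	sort.Float64s(sorted)

	rank := int(math.Ceil(p * float64(n) / 100))
	if rank < 1 {
		rank = 1
	}
	if rank > n {
		rank = n
	}
	return sorted[rank-1]
}

func main() {
	speeds := []float64{30, 10, 50, 20, 40, 60, 100, 70, 90, 80}
	fmt.Println(
		percentileSketch(speeds, 50),
		percentileSketch(speeds, 85),
		percentileSketch(speeds, 98),
	) // 50 90 100
}
```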

func (*RadarObjectsRollupRow) String

func (e *RadarObjectsRollupRow) String() string

type RadarStatsResult

type RadarStatsResult struct {
	Metrics      []RadarObjectsRollupRow
	Histogram    map[float64]int64 // bucket start (mps) -> count; nil if histogram not requested
	MinSpeedUsed float64           // actual minimum speed filter applied (mps)
}

RadarStatsResult combines time-aggregated metrics with an optional histogram.

type Site

type Site struct {
	ID              int       `json:"id"`
	Name            string    `json:"name"`
	Location        string    `json:"location"`
	Description     *string   `json:"description"`
	Surveyor        string    `json:"surveyor"`
	Contact         string    `json:"contact"`
	Address         *string   `json:"address"`
	Latitude        *float64  `json:"latitude"`
	Longitude       *float64  `json:"longitude"`
	MapAngle        *float64  `json:"map_angle"`
	IncludeMap      bool      `json:"include_map"`
	SiteDescription *string   `json:"site_description"`
	CreatedAt       time.Time `json:"created_at"`
	UpdatedAt       time.Time `json:"updated_at"`
}

Site represents a survey site configuration.

type SiteConfigPeriod

type SiteConfigPeriod struct {
	ID                 int       `json:"id"`
	SiteID             int       `json:"site_id"`
	EffectiveStartUnix float64   `json:"effective_start_unix"`
	EffectiveEndUnix   *float64  `json:"effective_end_unix"`
	IsActive           bool      `json:"is_active"`
	Notes              *string   `json:"notes"`
	CosineErrorAngle   float64   `json:"cosine_error_angle"`
	CreatedAt          time.Time `json:"created_at"`
	UpdatedAt          time.Time `json:"updated_at"`
}

SiteConfigPeriod represents a time-based configuration period for a site.

type SiteReport

type SiteReport struct {
	ID          int       `json:"id"`
	SiteID      int       `json:"site_id"`
	StartDate   string    `json:"start_date"`   // YYYY-MM-DD
	EndDate     string    `json:"end_date"`     // YYYY-MM-DD
	Filepath    string    `json:"filepath"`     // Relative path from pdf-generator directory
	Filename    string    `json:"filename"`     // PDF filename
	ZipFilepath *string   `json:"zip_filepath"` // Relative path to sources ZIP
	ZipFilename *string   `json:"zip_filename"` // ZIP filename
	RunID       string    `json:"run_id"`       // Timestamp-based run ID
	Timezone    string    `json:"timezone"`     // Report timezone
	Units       string    `json:"units"`        // mph or kph
	Source      string    `json:"source"`       // radar_objects, radar_data, or radar_data_transits
	CreatedAt   time.Time `json:"created_at"`
}

SiteReport represents a generated PDF report for a site.

type TableStats

type TableStats struct {
	Name     string  `json:"name"`
	RowCount int64   `json:"row_count"`
	SizeMB   float64 `json:"size_mb"`
}

TableStats contains size and row count information for a database table.

type TransitController

type TransitController struct {
	// contains filtered or unexported fields
}

TransitController manages the state and execution of the transit worker. It provides thread-safe control over whether the transit worker runs, and supports manual triggering from the UI.

func NewTransitController

func NewTransitController(worker *TransitWorker) *TransitController

NewTransitController creates a new controller for the transit worker.

func (*TransitController) GetStatus

func (tc *TransitController) GetStatus() TransitStatus

GetStatus returns the current status of the transit worker.

func (*TransitController) IsEnabled

func (tc *TransitController) IsEnabled() bool

IsEnabled returns whether the transit worker is currently enabled.

func (*TransitController) Run

func (tc *TransitController) Run(ctx context.Context) error

Run starts the transit worker loop. This should be called in a goroutine. It will run periodically based on the worker's Interval, but only when enabled. It also responds to manual triggers from the UI.

func (*TransitController) SetEnabled

func (tc *TransitController) SetEnabled(enabled bool)

SetEnabled sets whether the transit worker should run. If enabling, it also triggers an immediate run.

func (*TransitController) TriggerFullHistoryRun

func (tc *TransitController) TriggerFullHistoryRun()

TriggerFullHistoryRun triggers a full-history run of the transit worker. This is non-blocking and safe to call multiple times.

func (*TransitController) TriggerManualRun

func (tc *TransitController) TriggerManualRun()

TriggerManualRun triggers a manual run of the transit worker. This is non-blocking and safe to call multiple times.

type TransitOverlap

type TransitOverlap struct {
	ModelVersion1 string
	ModelVersion2 string
	OverlapCount  int64
}

TransitOverlap records the number of overlapping transits between a pair of different model versions.

type TransitOverlapStats

type TransitOverlapStats struct {
	TotalTransits      int64
	ModelVersionCounts map[string]int64
	Overlaps           []TransitOverlap
}

TransitOverlapStats contains statistics about overlapping transits.

type TransitRunInfo

type TransitRunInfo struct {
	Trigger    string    `json:"trigger,omitempty"`
	StartedAt  time.Time `json:"started_at"`
	FinishedAt time.Time `json:"finished_at,omitempty"`
	DurationMs int64     `json:"duration_ms,omitempty"`
	Error      string    `json:"error,omitempty"`
}

TransitRunInfo captures details about a single transit worker run.

type TransitStatus

type TransitStatus struct {
	Enabled      bool            `json:"enabled"`
	LastRunAt    time.Time       `json:"last_run_at"`
	LastRunError string          `json:"last_run_error,omitempty"`
	RunCount     int64           `json:"run_count"`
	IsHealthy    bool            `json:"is_healthy"`
	CurrentRun   *TransitRunInfo `json:"current_run,omitempty"`
	LastRun      *TransitRunInfo `json:"last_run,omitempty"`
}

TransitStatus represents the current state of the transit worker.

type TransitWorker

type TransitWorker struct {
	DB *DB
	// Threshold in seconds used to split sessions (1,2,3,4,5 -> 1000,2000,...ms)
	ThresholdSeconds int
	ModelVersion     string
	Interval         time.Duration // how often to run (e.g., 15m)
	Window           time.Duration // lookback window (e.g., 20m)
	StopChan         chan struct{}
}

TransitWorker periodically scans recent radar_data and upserts sessionized transits into radar_data_transits and radar_transit_links. It is designed to run every 15 minutes and process the last 20-minute window (with a small overlap to allow updates).
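Sessionization here presumably means splitting a stream of radar readings into transits wherever the gap between consecutive readings exceeds ThresholdSeconds. The sketch below shows that gap-based split on bare timestamps; the transit type and sessionizeSketch are hypothetical, and the real worker operates on database rows rather than slices.

```go
package main

import (
	"fmt"
	"sort"
)

// transit is a hypothetical, simplified session of consecutive readings.
type transit struct {
	Start, End float64 // unix seconds
	Points     int
}

// sessionizeSketch sorts the timestamps and starts a new transit whenever the
// gap to the previous reading is larger than thresholdSeconds.
func sessionizeSketch(timestamps []float64, thresholdSeconds int) []transit {
	if len(timestamps) == 0 {
		return nil
	}
	sorted := append([]float64(nil), timestamps...) // copy; leave the input untouched
	sort.Float64s(sorted)

	gap := float64(thresholdSeconds)
	current := transit{Start: sorted[0], End: sorted[0], Points: 1}
	var transits []transit
	for _, ts := range sorted[1:] {
		if ts-current.End > gap {
			transits = append(transits, current)
			current = transit{Start: ts, End: ts, Points: 1}
			continue
		}
		current.End = ts
		current.Points++
	}
	return append(transits, current)
}

func main() {
	readings := []float64{100, 100.5, 101, 105, 105.2}
	for _, t := range sessionizeSketch(readings, 1) {
		fmt.Printf("%.1f-%.1f (%d points)\n", t.Start, t.End, t.Points)
	}
	// 100.0-101.0 (3 points)
	// 105.0-105.2 (2 points)
}
```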

func NewTransitWorker

func NewTransitWorker(db *DB, thresholdSeconds int, modelVersion string) *TransitWorker

func (*TransitWorker) DeleteAllTransits

func (w *TransitWorker) DeleteAllTransits(ctx context.Context, modelVersion string) (int64, error)

DeleteAllTransits removes all transits for a given model version.

func (*TransitWorker) MigrateModelVersion

func (w *TransitWorker) MigrateModelVersion(ctx context.Context, oldVersion string) error

MigrateModelVersion replaces all transits from oldVersion with the worker's current ModelVersion by deleting old transits and re-running over full history.

func (*TransitWorker) RunFullHistory

func (w *TransitWorker) RunFullHistory(ctx context.Context) error

RunFullHistory scans the full available radar_data range and upserts transits.

func (*TransitWorker) RunOnce

func (w *TransitWorker) RunOnce(ctx context.Context) error

RunOnce scans the last w.Window (+ small overlap) and upserts transits.

func (*TransitWorker) RunRange

func (w *TransitWorker) RunRange(ctx context.Context, start, end float64) error

RunRange scans the provided range [start, end] (unix seconds as float64) and upserts transits.

func (*TransitWorker) Start

func (w *TransitWorker) Start()

Start runs the periodic worker loop in a goroutine.

func (*TransitWorker) Stop

func (w *TransitWorker) Stop()

Stop requests the worker to stop.
