cmd

package
v1.3.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 7, 2025 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
// Stage constants are the string labels used for processing stages.
// NOTE(review): presumably assigned to ProcessResult.Stage — confirm at call sites.
const (
	// StageSetup labels the setup stage.
	StageSetup = "Setup"
	// StageSkipped labels a partition that was skipped.
	StageSkipped = "Skipped"
	// StageCancelled labels work that was cancelled.
	StageCancelled = "Cancelled"
)

Stage constants

View Source
// Duration constants name the time granularity used to split archive output.
// They are exactly the values listed in ErrOutputDurationInvalid, so they are
// presumably the accepted values of Config.OutputDuration.
const (
	DurationYearly  = "yearly"  // yearly granularity
	DurationMonthly = "monthly" // monthly granularity
	DurationWeekly  = "weekly"  // weekly granularity
	DurationDaily   = "daily"   // daily granularity
	DurationHourly  = "hourly"  // hourly granularity
)

Duration constants for output splitting

Variables

View Source
// Permission errors reported when source tables cannot be read.
var (
	ErrInsufficientPermissions = errors.New("insufficient permissions to read table")
	ErrPartitionNoPermissions  = errors.New("partition tables exist but you don't have SELECT permissions")
)

// Errors reported when an S3 operation is attempted before its client or
// uploader has been set up.
var (
	ErrS3ClientNotInitialized   = errors.New("S3 client not initialized")
	ErrS3UploaderNotInitialized = errors.New("S3 uploader not initialized")
)

Error definitions

View Source
// Database settings validation errors.
var (
	ErrDatabaseUserRequired    = errors.New("database user is required")
	ErrDatabaseNameRequired    = errors.New("database name is required")
	ErrDatabasePortInvalid     = errors.New("database port must be between 1 and 65535")
	ErrStatementTimeoutInvalid = errors.New("database statement timeout must be >= 0")
	ErrMaxRetriesInvalid       = errors.New("database max retries must be >= 0")
	ErrRetryDelayInvalid       = errors.New("database retry delay must be >= 0")
)

// S3 settings validation errors, including the object path template.
var (
	ErrS3EndpointRequired   = errors.New("S3 endpoint is required")
	ErrS3BucketRequired     = errors.New("S3 bucket is required")
	ErrS3AccessKeyRequired  = errors.New("S3 access key is required")
	ErrS3SecretKeyRequired  = errors.New("S3 secret key is required")
	ErrS3RegionInvalid      = errors.New("S3 region contains invalid characters or is too long")
	ErrPathTemplateRequired = errors.New("path template is required")
	ErrPathTemplateInvalid  = errors.New("path template must contain {table} placeholder")
)

// Table, date-range and date-column validation errors.
var (
	ErrTableNameRequired      = errors.New("table name is required")
	ErrTableNameInvalid       = errors.New("table name is invalid: must be 1-63 characters, start with a letter or underscore, and contain only letters, numbers, and underscores")
	ErrStartDateFormatInvalid = errors.New("invalid start date format")
	ErrEndDateFormatInvalid   = errors.New("invalid end date format")
	ErrDateColumnInvalid      = errors.New("date column is invalid: must start with a letter or underscore, and contain only letters, numbers, and underscores")
)

// Worker-count bounds.
var (
	ErrWorkersMinimum = errors.New("workers must be at least 1")
	ErrWorkersMaximum = errors.New("workers must not exceed 1000")
)

// Output layout, format and compression validation errors.
var (
	ErrOutputDurationInvalid   = errors.New("output duration must be one of: hourly, daily, weekly, monthly, yearly")
	ErrOutputFormatInvalid     = errors.New("output format must be one of: jsonl, csv, parquet")
	ErrCompressionInvalid      = errors.New("compression must be one of: zstd, lz4, gzip, none")
	ErrCompressionLevelInvalid = errors.New("compression level must be between 1 and 22 (zstd), 1-9 (lz4/gzip)")
)

Static errors for configuration validation

View Source
// ErrVersionCheckFailed is the sentinel error for a failed check for a newer
// release.
var ErrVersionCheckFailed = errors.New("version check failed")

Static errors for version checking

View Source
// Version is the build version of the binary. It is meant to be injected at
// link time, for example:
//
//	go build -ldflags "-X github.com/airframesio/data-archiver/cmd.Version=1.2.3"
//
// Builds that do not inject a value report "dev".
var Version = "dev"

Functions

func Execute

func Execute() error

func GenerateFilename

func GenerateFilename(tableName string, timestamp time.Time, duration string, formatExt string, compressionExt string) string

GenerateFilename creates a filename based on duration and timestamp

func GetPIDFilePath

func GetPIDFilePath() string

GetPIDFilePath returns the path to the PID file

func GetTaskFilePath

func GetTaskFilePath() string

GetTaskFilePath returns the path to the task info file

func GetTimeRangeForDuration

func GetTimeRangeForDuration(baseTime time.Time, duration string) (time.Time, time.Time)

GetTimeRangeForDuration returns the start and end time for a given duration

func IsProcessRunning

func IsProcessRunning(pid int) bool

IsProcessRunning checks whether a process with the given PID is running. It works on both Unix and Windows systems.

func ReadPIDFile

func ReadPIDFile() (int, error)

ReadPIDFile reads the PID from file

func RemovePIDFile

func RemovePIDFile() error

RemovePIDFile removes the PID file

func RemoveTaskFile

func RemoveTaskFile() error

RemoveTaskFile removes the task info file

func SetSignalContext

func SetSignalContext(ctx context.Context, stopFile string)

SetSignalContext stores the signal-aware context created in main(). It must be called before Execute() to ensure proper signal handling.

func SplitPartitionByDuration

func SplitPartitionByDuration(partitionStart, partitionEnd time.Time, duration string) []struct {
	Start time.Time
	End   time.Time
}

SplitPartitionByDuration splits a partition's date range into multiple time ranges based on duration

func WritePIDFile

func WritePIDFile() error

WritePIDFile writes the current process PID to a file

func WriteTaskInfo

func WriteTaskInfo(info *TaskInfo) error

WriteTaskInfo writes current task information to file

Types

type Archiver

// Archiver performs archive runs. Run executes a run under a context, and
// ProcessPartitionWithProgress processes a single PartitionInfo while
// reporting to a *tea.Program (presumably Bubble Tea). Create one with
// NewArchiver.
type Archiver struct {
	// contains filtered or unexported fields
}

func NewArchiver

func NewArchiver(config *Config, logger *slog.Logger) *Archiver

func (*Archiver) ProcessPartitionWithProgress

func (a *Archiver) ProcessPartitionWithProgress(partition PartitionInfo, program *tea.Program) ProcessResult

func (*Archiver) Run

func (a *Archiver) Run(ctx context.Context) error

type CacheEntry

// CacheEntry is the camelCase-JSON view of one cached partition. It carries
// the same data as PartitionCacheEntry plus the Table and Partition it
// belongs to, and is listed in TableCache.Entries.
// Unlike PartitionCacheEntry, no field is omitempty, so zero values
// (including zero times) are always serialized.
type CacheEntry struct {
	Table            string    `json:"table"`
	Partition        string    `json:"partition"`
	RowCount         int64     `json:"rowCount"`
	CountTime        time.Time `json:"countTime"`
	FileSize         int64     `json:"fileSize"`
	UncompressedSize int64     `json:"uncompressedSize"`
	FileMD5          string    `json:"fileMD5"`
	FileTime         time.Time `json:"fileTime"`
	S3Key            string    `json:"s3Key"`
	S3Uploaded       bool      `json:"s3Uploaded"`
	S3UploadTime     time.Time `json:"s3UploadTime"`
	LastError        string    `json:"lastError"`
	ErrorTime        time.Time `json:"errorTime"`
}

type CacheResponse

// CacheResponse is a JSON payload listing cache entries grouped per table
// (Tables). Timestamp is presumably the time the response was built —
// confirm at the producer.
type CacheResponse struct {
	Tables    []TableCache `json:"tables"`
	Timestamp time.Time    `json:"timestamp"`
}

type Config

// Config holds every setting for an archive run: runtime and logging flags,
// database and S3 settings, the table and date range to archive, and output
// format options. Validate checks it. The field notes below are taken from the
// configuration-validation error messages; presumably Validate returns those
// errors when a constraint is violated.
type Config struct {
	Debug            bool
	LogFormat        string
	DryRun           bool
	Workers          int // 1..1000 per ErrWorkersMinimum / ErrWorkersMaximum
	SkipCount        bool
	CacheViewer      bool
	ViewerPort       int
	Database         DatabaseConfig
	S3               S3Config
	Table            string // required; 1-63 chars, identifier-like (see ErrTableNameInvalid)
	StartDate        string // must parse as a date (see ErrStartDateFormatInvalid)
	EndDate          string // must parse as a date (see ErrEndDateFormatInvalid)
	OutputDuration   string // one of the Duration* constants (see ErrOutputDurationInvalid)
	OutputFormat     string // "jsonl", "csv" or "parquet" (see ErrOutputFormatInvalid)
	Compression      string // "zstd", "lz4", "gzip" or "none" (see ErrCompressionInvalid)
	CompressionLevel int    // range depends on Compression (see ErrCompressionLevelInvalid)
	DateColumn       string // identifier-like column name (see ErrDateColumnInvalid)
}

func (*Config) Validate

func (c *Config) Validate() error

type DatabaseConfig

// DatabaseConfig holds connection and query-retry settings for the database
// whose tables are archived.
// NOTE(review): SSLMode looks like a libpq-style sslmode value — confirm
// against the connection code.
type DatabaseConfig struct {
	Host             string
	Port             int    // must be 1-65535 (see ErrDatabasePortInvalid)
	User             string // required (see ErrDatabaseUserRequired)
	Password         string
	Name             string // database name; required (see ErrDatabaseNameRequired)
	SSLMode          string
	StatementTimeout int // Statement timeout in seconds (0 = no timeout, default 300)
	MaxRetries       int // Maximum number of retry attempts for failed queries (default 3)
	RetryDelay       int // Delay in seconds between retry attempts (default 5)
}

type GitHubRelease

// GitHubRelease holds the fields decoded from GitHub's latest-release API
// response; the JSON tags match the API's snake_case keys.
type GitHubRelease struct {
	TagName     string    `json:"tag_name"`
	Name        string    `json:"name"`
	PublishedAt time.Time `json:"published_at"`
	HTMLURL     string    `json:"html_url"`
}

GitHubRelease represents the structure of GitHub's latest release API response

type PartitionCache

// PartitionCache maps a string key to a PartitionCacheEntry, serialized under
// the "entries" JSON key.
// NOTE(review): keys are presumably partition table names — confirm.
type PartitionCache struct {
	Entries map[string]PartitionCacheEntry `json:"entries"`
}

PartitionCache stores both row counts and file metadata

type PartitionCacheEntry

// PartitionCacheEntry is one entry of PartitionCache. It records the cached
// row count for a partition, metadata about the file produced for it, its S3
// upload, and the last error seen while processing it.
//
// The optional time.Time fields carry both omitempty and omitzero.
// encoding/json never treats a struct value such as time.Time as "empty", so
// omitempty alone still writes "0001-01-01T00:00:00Z" for unset times.
// omitzero (Go 1.24+) omits them. Older toolchains ignore the unknown option,
// so the tags stay compatible and decoding is unaffected: a missing key
// decodes to the zero time either way.
type PartitionCacheEntry struct {
	// Row count information
	RowCount  int64     `json:"row_count"`
	CountTime time.Time `json:"count_time"`

	// File metadata (stored after processing)
	FileSize         int64     `json:"file_size,omitempty"`         // Compressed size
	UncompressedSize int64     `json:"uncompressed_size,omitempty"` // Original size
	FileMD5          string    `json:"file_md5,omitempty"`
	FileTime         time.Time `json:"file_time,omitempty,omitzero"`

	// S3 information
	S3Key        string    `json:"s3_key,omitempty"`
	S3Uploaded   bool      `json:"s3_uploaded,omitempty"`
	S3UploadTime time.Time `json:"s3_upload_time,omitempty,omitzero"`

	// Error tracking
	LastError string    `json:"last_error,omitempty"`
	ErrorTime time.Time `json:"error_time,omitempty,omitzero"`
}

type PartitionInfo

// PartitionInfo identifies one partition to archive: its table name, the date
// it covers, and its row count.
// NOTE(review): RowCount may be unset when counting is skipped
// (Config.SkipCount) — confirm.
type PartitionInfo struct {
	TableName string
	Date      time.Time
	RowCount  int64
}

type PathTemplate

// PathTemplate builds S3 object paths from a template string. Create it with
// NewPathTemplate. Generate substitutes {table}, {YYYY}, {MM}, {DD} and {HH}.
type PathTemplate struct {
	// contains filtered or unexported fields
}

PathTemplate provides functionality to generate S3 paths from templates

func NewPathTemplate

func NewPathTemplate(template string) *PathTemplate

NewPathTemplate creates a new PathTemplate instance

func (*PathTemplate) Generate

func (pt *PathTemplate) Generate(tableName string, timestamp time.Time) string

Generate replaces placeholders in the template with actual values. Supported placeholders: {table}, {YYYY}, {MM}, {DD}, {HH}.

type Phase

// Phase identifies a stage of an archive run. The constants are numbered
// from zero with iota in declaration order, so their numeric values depend on
// that order. Append new phases rather than reordering or inserting if the
// values are ever compared numerically or persisted.
type Phase int
const (
	PhaseConnecting Phase = iota
	PhaseCheckingPermissions
	PhaseDiscovering
	PhaseCounting
	PhaseProcessing
	PhaseComplete
)

type ProcessResult

// ProcessResult is the outcome of processing one partition. It records which
// steps happened (Compressed, Uploaded), whether the partition was Skipped and
// why, any Error, the bytes written, the stage reached, the S3 key, and
// timing.
type ProcessResult struct {
	Partition    PartitionInfo
	Compressed   bool
	Uploaded     bool
	Skipped      bool
	SkipReason   string
	Error        error
	BytesWritten int64
	Stage        string        // presumably one of the Stage* constants or a step name — confirm
	S3Key        string        // S3 object key for uploaded file
	StartTime    time.Time     // When partition processing started
	Duration     time.Duration // How long partition processing took
}

type RowCountCache

// RowCountCache is the legacy cache layout: row counts only, stored under the
// "counts" JSON key. It is kept for backward compatibility and presumably
// superseded by PartitionCache, which also stores file metadata.
type RowCountCache struct {
	Counts map[string]RowCountEntry `json:"counts"`
}

Legacy support: the old structure is kept for backward compatibility.

type RowCountEntry

// RowCountEntry is one row count in the legacy RowCountCache format.
// Timestamp is presumably when the count was taken — confirm.
type RowCountEntry struct {
	Count     int64     `json:"count"`
	Timestamp time.Time `json:"timestamp"`
}

type S3Config

// S3Config holds the S3 connection settings and the object-key template for
// uploads. The notes below come from the configuration-validation errors.
type S3Config struct {
	Endpoint     string // required (see ErrS3EndpointRequired)
	Bucket       string // required (see ErrS3BucketRequired)
	AccessKey    string // required (see ErrS3AccessKeyRequired)
	SecretKey    string // required (see ErrS3SecretKeyRequired)
	Region       string // character set and length are validated (see ErrS3RegionInvalid)
	PathTemplate string // object path template; must contain {table} (see ErrPathTemplateInvalid)
}

type StatusResponse

// StatusResponse is a JSON status payload. It reports whether the archiver is
// running (PID and CurrentTask are omitted when unset), the running Version,
// and update-availability details (LatestVersion and ReleaseURL are omitted
// when empty).
type StatusResponse struct {
	ArchiverRunning bool      `json:"archiverRunning"`
	PID             int       `json:"pid,omitempty"`
	CurrentTask     *TaskInfo `json:"currentTask,omitempty"`
	Version         string    `json:"version"`
	UpdateAvailable bool      `json:"updateAvailable"`
	LatestVersion   string    `json:"latestVersion,omitempty"`
	ReleaseURL      string    `json:"releaseUrl,omitempty"`
}

type TableCache

// TableCache groups the cache entries of one table and is listed in
// CacheResponse.Tables.
type TableCache struct {
	TableName string       `json:"tableName"`
	Entries   []CacheEntry `json:"entries"`
}

type TaskInfo

// TaskInfo is a snapshot of the current archiving task. WriteTaskInfo writes
// it to the task info file (see GetTaskFilePath) and ReadTaskInfo reads it
// back; StatusResponse exposes it as CurrentTask.
type TaskInfo struct {
	PID              int       `json:"pid"`
	StartTime        time.Time `json:"start_time"`
	Table            string    `json:"table"`
	StartDate        string    `json:"start_date"`
	EndDate          string    `json:"end_date"`
	CurrentTask      string    `json:"current_task"`
	CurrentPartition string    `json:"current_partition,omitempty"`
	CurrentStep      string    `json:"current_step,omitempty"`
	Progress         float64   `json:"progress"` // NOTE(review): scale (0-1 vs 0-100) not visible here — confirm
	TotalItems       int       `json:"total_items"`
	CompletedItems   int       `json:"completed_items"`
	LastUpdate       time.Time `json:"last_update"`
}

TaskInfo represents the current archiving task status

func ReadTaskInfo

func ReadTaskInfo() (*TaskInfo, error)

ReadTaskInfo reads current task information from file

type VersionCheckCache

// VersionCheckCache is the cached result of a version check, stored with the
// Timestamp of that check.
// NOTE(review): presumably used to avoid calling the GitHub API on every
// run — confirm the expiry logic at the reader.
type VersionCheckCache struct {
	UpdateAvailable bool      `json:"update_available"`
	LatestVersion   string    `json:"latest_version"`
	ReleaseURL      string    `json:"release_url"`
	Timestamp       time.Time `json:"timestamp"`
}

VersionCheckCache represents cached version check data

type VersionCheckResult

// VersionCheckResult is the outcome of checking for a newer release: the
// current and latest versions, whether an update is available, and its
// release URL. Error is set when the check failed (presumably wrapping
// ErrVersionCheckFailed — confirm).
type VersionCheckResult struct {
	UpdateAvailable bool
	CurrentVersion  string
	LatestVersion   string
	ReleaseURL      string
	Error           error
}

VersionCheckResult contains the result of checking for updates

type WSMessage

// WSMessage is the envelope for WebSocket messages: Type names the message
// kind and Data carries an arbitrary JSON-encodable payload.
type WSMessage struct {
	Type string `json:"type"`
	Data any    `json:"data"`
}

WebSocket message types

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL