Documentation
¶
Index ¶
- Constants
- Variables
- func Execute() error
- func GenerateFilename(tableName string, timestamp time.Time, duration string, formatExt string, ...) string
- func GetPIDFilePath() string
- func GetTaskFilePath() string
- func GetTimeRangeForDuration(baseTime time.Time, duration string) (time.Time, time.Time)
- func IsProcessRunning(pid int) bool
- func ReadPIDFile() (int, error)
- func RemovePIDFile() error
- func RemoveTaskFile() error
- func SetSignalContext(ctx context.Context, stopFile string)
- func SplitPartitionByDuration(partitionStart, partitionEnd time.Time, duration string) []struct{ ... }
- func WritePIDFile() error
- func WriteTaskInfo(info *TaskInfo) error
- type Archiver
- type CacheEntry
- type CacheResponse
- type Config
- type DatabaseConfig
- type GitHubRelease
- type PartitionCache
- type PartitionCacheEntry
- type PartitionInfo
- type PathTemplate
- type Phase
- type ProcessResult
- type RowCountCache
- type RowCountEntry
- type S3Config
- type StatusResponse
- type TableCache
- type TaskInfo
- type VersionCheckCache
- type VersionCheckResult
- type WSMessage
Constants ¶
const ( StageSkipped = "Skipped" StageCancelled = "Cancelled" StageSetup = "Setup" )
Stage constants
const ( DurationHourly = "hourly" DurationDaily = "daily" DurationWeekly = "weekly" DurationMonthly = "monthly" DurationYearly = "yearly" )
Duration constants for output splitting
Variables ¶
var ( ErrInsufficientPermissions = errors.New("insufficient permissions to read table") ErrPartitionNoPermissions = errors.New("partition tables exist but you don't have SELECT permissions") ErrS3ClientNotInitialized = errors.New("S3 client not initialized") ErrS3UploaderNotInitialized = errors.New("S3 uploader not initialized") )
Error definitions
var ( ErrDatabaseUserRequired = errors.New("database user is required") ErrDatabaseNameRequired = errors.New("database name is required") ErrDatabasePortInvalid = errors.New("database port must be between 1 and 65535") ErrStatementTimeoutInvalid = errors.New("database statement timeout must be >= 0") ErrMaxRetriesInvalid = errors.New("database max retries must be >= 0") ErrRetryDelayInvalid = errors.New("database retry delay must be >= 0") ErrS3EndpointRequired = errors.New("S3 endpoint is required") ErrS3BucketRequired = errors.New("S3 bucket is required") ErrS3AccessKeyRequired = errors.New("S3 access key is required") ErrS3SecretKeyRequired = errors.New("S3 secret key is required") ErrS3RegionInvalid = errors.New("S3 region contains invalid characters or is too long") ErrTableNameRequired = errors.New("table name is required") ErrTableNameInvalid = errors.New("table name is invalid: must be 1-63 characters, start with a letter or underscore, and contain only letters, numbers, and underscores") ErrStartDateFormatInvalid = errors.New("invalid start date format") ErrEndDateFormatInvalid = errors.New("invalid end date format") ErrWorkersMinimum = errors.New("workers must be at least 1") ErrWorkersMaximum = errors.New("workers must not exceed 1000") ErrPathTemplateRequired = errors.New("path template is required") ErrPathTemplateInvalid = errors.New("path template must contain {table} placeholder") ErrOutputDurationInvalid = errors.New("output duration must be one of: hourly, daily, weekly, monthly, yearly") ErrOutputFormatInvalid = errors.New("output format must be one of: jsonl, csv, parquet") ErrCompressionInvalid = errors.New("compression must be one of: zstd, lz4, gzip, none") ErrCompressionLevelInvalid = errors.New("compression level must be between 1 and 22 (zstd), 1-9 (lz4/gzip)") ErrDateColumnInvalid = errors.New("date column is invalid: must start with a letter or underscore, and contain only letters, numbers, and underscores") )
Static errors for configuration validation
var (
ErrVersionCheckFailed = errors.New("version check failed")
)
Static errors for version checking
var ( // Version information - set via ldflags during build // Example: go build -ldflags "-X github.com/airframesio/data-archiver/cmd.Version=1.2.3" Version = "dev" // Default to "dev" if not set during build )
Functions ¶
func GenerateFilename ¶
func GenerateFilename(tableName string, timestamp time.Time, duration string, formatExt string, compressionExt string) string
GenerateFilename creates a filename based on duration and timestamp
func GetTaskFilePath ¶
func GetTaskFilePath() string
GetTaskFilePath returns the path to the task info file
func GetTimeRangeForDuration ¶
GetTimeRangeForDuration returns the start and end time for a given duration
func IsProcessRunning ¶
IsProcessRunning checks if a process with given PID is running. Works on both Unix and Windows systems.
func SetSignalContext ¶
SetSignalContext stores the signal-aware context created in main(). This must be called before Execute() to ensure proper signal handling.
func SplitPartitionByDuration ¶
func SplitPartitionByDuration(partitionStart, partitionEnd time.Time, duration string) []struct { Start time.Time End time.Time }
SplitPartitionByDuration splits a partition's date range into multiple time ranges based on duration
func WriteTaskInfo ¶
WriteTaskInfo writes current task information to file
Types ¶
type Archiver ¶
type Archiver struct {
// contains filtered or unexported fields
}
func (*Archiver) ProcessPartitionWithProgress ¶
func (a *Archiver) ProcessPartitionWithProgress(partition PartitionInfo, program *tea.Program) ProcessResult
type CacheEntry ¶
type CacheEntry struct {
Table string `json:"table"`
Partition string `json:"partition"`
RowCount int64 `json:"rowCount"`
CountTime time.Time `json:"countTime"`
FileSize int64 `json:"fileSize"`
UncompressedSize int64 `json:"uncompressedSize"`
FileMD5 string `json:"fileMD5"`
FileTime time.Time `json:"fileTime"`
S3Key string `json:"s3Key"`
S3Uploaded bool `json:"s3Uploaded"`
S3UploadTime time.Time `json:"s3UploadTime"`
LastError string `json:"lastError"`
ErrorTime time.Time `json:"errorTime"`
}
type CacheResponse ¶
type CacheResponse struct {
Tables []TableCache `json:"tables"`
Timestamp time.Time `json:"timestamp"`
}
type Config ¶
type Config struct {
Debug bool
LogFormat string
DryRun bool
Workers int
SkipCount bool
CacheViewer bool
ViewerPort int
Database DatabaseConfig
S3 S3Config
Table string
StartDate string
EndDate string
OutputDuration string
OutputFormat string
Compression string
CompressionLevel int
DateColumn string
}
type DatabaseConfig ¶
type DatabaseConfig struct {
Host string
Port int
User string
Password string
Name string
SSLMode string
StatementTimeout int // Statement timeout in seconds (0 = no timeout, default 300)
MaxRetries int // Maximum number of retry attempts for failed queries (default 3)
RetryDelay int // Delay in seconds between retry attempts (default 5)
}
type GitHubRelease ¶
type GitHubRelease struct {
TagName string `json:"tag_name"`
Name string `json:"name"`
PublishedAt time.Time `json:"published_at"`
HTMLURL string `json:"html_url"`
}
GitHubRelease represents the structure of GitHub's latest release API response
type PartitionCache ¶
type PartitionCache struct {
Entries map[string]PartitionCacheEntry `json:"entries"`
}
PartitionCache stores both row counts and file metadata
type PartitionCacheEntry ¶
type PartitionCacheEntry struct {
// Row count information
RowCount int64 `json:"row_count"`
CountTime time.Time `json:"count_time"`
// File metadata (stored after processing)
FileSize int64 `json:"file_size,omitempty"` // Compressed size
UncompressedSize int64 `json:"uncompressed_size,omitempty"` // Original size
FileMD5 string `json:"file_md5,omitempty"`
FileTime time.Time `json:"file_time,omitempty"`
// S3 information
S3Key string `json:"s3_key,omitempty"`
S3Uploaded bool `json:"s3_uploaded,omitempty"`
S3UploadTime time.Time `json:"s3_upload_time,omitempty"`
// Error tracking
LastError string `json:"last_error,omitempty"`
ErrorTime time.Time `json:"error_time,omitempty"`
}
type PathTemplate ¶
type PathTemplate struct {
// contains filtered or unexported fields
}
PathTemplate provides functionality to generate S3 paths from templates
func NewPathTemplate ¶
func NewPathTemplate(template string) *PathTemplate
NewPathTemplate creates a new PathTemplate instance
type ProcessResult ¶
type ProcessResult struct {
Partition PartitionInfo
Compressed bool
Uploaded bool
Skipped bool
SkipReason string
Error error
BytesWritten int64
Stage string
S3Key string // S3 object key for uploaded file
StartTime time.Time // When partition processing started
Duration time.Duration // How long partition processing took
}
type RowCountCache ¶
type RowCountCache struct {
Counts map[string]RowCountEntry `json:"counts"`
}
Legacy support - keep old structure for backward compatibility
type RowCountEntry ¶
type StatusResponse ¶
type StatusResponse struct {
ArchiverRunning bool `json:"archiverRunning"`
PID int `json:"pid,omitempty"`
CurrentTask *TaskInfo `json:"currentTask,omitempty"`
Version string `json:"version"`
UpdateAvailable bool `json:"updateAvailable"`
LatestVersion string `json:"latestVersion,omitempty"`
ReleaseURL string `json:"releaseUrl,omitempty"`
}
type TableCache ¶
type TableCache struct {
TableName string `json:"tableName"`
Entries []CacheEntry `json:"entries"`
}
type TaskInfo ¶
type TaskInfo struct {
PID int `json:"pid"`
StartTime time.Time `json:"start_time"`
Table string `json:"table"`
StartDate string `json:"start_date"`
EndDate string `json:"end_date"`
CurrentTask string `json:"current_task"`
CurrentPartition string `json:"current_partition,omitempty"`
CurrentStep string `json:"current_step,omitempty"`
Progress float64 `json:"progress"`
TotalItems int `json:"total_items"`
CompletedItems int `json:"completed_items"`
LastUpdate time.Time `json:"last_update"`
}
TaskInfo represents the current archiving task status
func ReadTaskInfo ¶
ReadTaskInfo reads current task information from file
type VersionCheckCache ¶
type VersionCheckCache struct {
UpdateAvailable bool `json:"update_available"`
LatestVersion string `json:"latest_version"`
ReleaseURL string `json:"release_url"`
Timestamp time.Time `json:"timestamp"`
}
VersionCheckCache represents cached version check data