Documentation
¶
Index ¶
- Constants
- Variables
- func Execute() error
- func GenerateFilename(tableName string, timestamp time.Time, duration string, formatExt string, ...) string
- func GetPIDFilePath() string
- func GetTaskFilePath() string
- func GetTimeRangeForDuration(baseTime time.Time, duration string) (time.Time, time.Time)
- func IsProcessRunning(pid int) bool
- func ReadPIDFile() (int, error)
- func RemovePIDFile() error
- func RemoveTaskFile() error
- func SetSignalContext(ctx context.Context, stopFile string)
- func SplitPartitionByDuration(partitionStart, partitionEnd time.Time, duration string) []struct{ ... }
- func WritePIDFile() error
- func WriteTaskInfo(info *TaskInfo) error
- type Archiver
- type CacheEntry
- type CacheResponse
- type CacheScope
- type ColumnInfo
- type ColumnTypeMismatch
- type CompareConfig
- type Comparer
- type ComparisonResult
- type ComparisonSource
- type Config
- type DataComparisonResult
- type DatabaseConfig
- type GitHubRelease
- type LogMessage
- type PartitionCache
- type PartitionCacheEntry
- type PartitionInfo
- type PathTemplate
- type PgDumpExecutor
- type Phase
- type ProcessResult
- type Restorer
- type RowByRowDiff
- type RowCountCache
- type RowCountDiff
- type RowCountEntry
- type S3Config
- type S3File
- type SampleDiff
- type SchemaComparisonResult
- type StatusResponse
- type TableCache
- type TableSchema
- type TableSchemaDiff
- type TaskInfo
- type VersionCheckCache
- type VersionCheckResult
- type WSMessage
Constants ¶
const (
StageSkipped = "Skipped"
StageCancelled = "Cancelled"
StageSetup = "Setup"
)
Stage constants
const (
DurationHourly = "hourly"
DurationDaily = "daily"
DurationWeekly = "weekly"
DurationMonthly = "monthly"
DurationYearly = "yearly"
)
Duration constants for output splitting
Variables ¶
var (
ErrInsufficientPermissions = errors.New("insufficient permissions to read table")
ErrPartitionNoPermissions = errors.New("partition tables exist but you don't have SELECT permissions")
ErrS3ClientNotInitialized = errors.New("S3 client not initialized")
ErrS3UploaderNotInitialized = errors.New("S3 uploader not initialized")
)
Error definitions
var (
ErrDatabaseUserRequired = errors.New("database user is required")
ErrDatabaseNameRequired = errors.New("database name is required")
ErrDatabasePortInvalid = errors.New("database port must be between 1 and 65535")
ErrStatementTimeoutInvalid = errors.New("database statement timeout must be >= 0")
ErrMaxRetriesInvalid = errors.New("database max retries must be >= 0")
ErrRetryDelayInvalid = errors.New("database retry delay must be >= 0")
ErrS3EndpointRequired = errors.New("S3 endpoint is required")
ErrS3BucketRequired = errors.New("S3 bucket is required")
ErrS3AccessKeyRequired = errors.New("S3 access key is required")
ErrS3SecretKeyRequired = errors.New("S3 secret key is required")
ErrS3RegionInvalid = errors.New("S3 region contains invalid characters or is too long")
ErrTableNameRequired = errors.New("table name is required")
ErrTableNameInvalid = errors.New("table name is invalid: must be 1-63 characters, start with a letter or underscore, and contain only letters, numbers, and underscores")
ErrStartDateFormatInvalid = errors.New("invalid start date format")
ErrEndDateFormatInvalid = errors.New("invalid end date format")
ErrWorkersMinimum = errors.New("workers must be at least 1")
ErrWorkersMaximum = errors.New("workers must not exceed 1000")
ErrChunkSizeMinimum = errors.New("chunk size must be at least 100")
ErrChunkSizeMaximum = errors.New("chunk size must not exceed 1000000")
ErrPathTemplateRequired = errors.New("path template is required")
ErrPathTemplateInvalid = errors.New("path template must contain {table} placeholder")
ErrOutputDurationInvalid = errors.New("output duration must be one of: hourly, daily, weekly, monthly, yearly")
ErrOutputFormatInvalid = errors.New("output format must be one of: jsonl, csv, parquet")
ErrCompressionInvalid = errors.New("compression must be one of: zstd, lz4, gzip, none")
ErrCompressionLevelInvalid = errors.New("compression level must be between 1 and 22 (zstd), 1-9 (lz4/gzip)")
ErrDateColumnInvalid = errors.New("date column is invalid: must start with a letter or underscore, and contain only letters, numbers, and underscores")
ErrDumpModeInvalid = errors.New("dump mode must be one of: schema-only, data-only, schema-and-data")
)
Static errors for configuration validation
var ErrTableHasNoColumns = errors.New("table exists but has no columns")
ErrTableHasNoColumns is returned when a table exists but has no columns
var ErrTableNotFound = errors.New("table not found")
ErrTableNotFound is returned when a table does not exist
var ErrTableNotFoundOrEmpty = errors.New("table not found or has no columns")
ErrTableNotFoundOrEmpty is returned when a table is not found or has no columns
var (
ErrVersionCheckFailed = errors.New("version check failed")
)
Static errors for version checking
var (
// Version information - set via ldflags during build
// Example: go build -ldflags "-X github.com/airframesio/data-archiver/cmd.Version=1.2.3"
Version = "dev" // Default to "dev" if not set during build
)
Functions ¶
func GenerateFilename ¶
func GenerateFilename(tableName string, timestamp time.Time, duration string, formatExt string, compressionExt string) string
GenerateFilename creates a filename based on duration and timestamp
func GetTaskFilePath ¶
func GetTaskFilePath() string
GetTaskFilePath returns the path to the task info file
func GetTimeRangeForDuration ¶
GetTimeRangeForDuration returns the start and end time for a given duration
func IsProcessRunning ¶
IsProcessRunning checks if a process with the given PID is running. Works on both Unix and Windows systems.
func SetSignalContext ¶
SetSignalContext stores the signal-aware context created in main(). This must be called before Execute() to ensure proper signal handling.
func SplitPartitionByDuration ¶
func SplitPartitionByDuration(partitionStart, partitionEnd time.Time, duration string) []struct { Start time.Time; End time.Time }
SplitPartitionByDuration splits a partition's date range into multiple time ranges based on duration
func WriteTaskInfo ¶
WriteTaskInfo writes current task information to file
Types ¶
type Archiver ¶
type Archiver struct {
// contains filtered or unexported fields
}
func (*Archiver) ProcessPartitionWithProgress ¶
func (a *Archiver) ProcessPartitionWithProgress(partition PartitionInfo, program *tea.Program) ProcessResult
type CacheEntry ¶
type CacheEntry struct {
Table string `json:"table"`
Partition string `json:"partition"`
RowCount int64 `json:"rowCount"`
CountTime time.Time `json:"countTime"`
FileSize int64 `json:"fileSize"`
UncompressedSize int64 `json:"uncompressedSize"`
FileMD5 string `json:"fileMD5"`
MultipartETag string `json:"multipartETag,omitempty"` // S3 multipart ETag for files >100MB
FileTime time.Time `json:"fileTime"`
S3Key string `json:"s3Key"`
S3Uploaded bool `json:"s3Uploaded"`
S3UploadTime time.Time `json:"s3UploadTime"`
LastError string `json:"lastError"`
ErrorTime time.Time `json:"errorTime"`
ProcessStartTime time.Time `json:"processStartTime,omitempty"` // When processing started for this job
}
type CacheResponse ¶
type CacheResponse struct {
Tables []TableCache `json:"tables"`
Timestamp time.Time `json:"timestamp"`
}
type CacheScope ¶ added in v1.6.0
CacheScope represents a unique namespace for cached metadata. It combines the executing subcommand with the fully-qualified output path so that different commands or destinations do not clobber each other's cache.
func NewCacheScope ¶ added in v1.6.0
func NewCacheScope(command string, cfg *Config) CacheScope
NewCacheScope builds a cache scope for the provided command/config pair.
type ColumnInfo ¶ added in v1.4.0
type ColumnInfo struct {
Name string
DataType string
UDTName string // PostgreSQL user-defined type name (e.g., int4, varchar, timestamp)
}
ColumnInfo represents metadata about a database column
func (*ColumnInfo) GetName ¶ added in v1.4.0
func (c *ColumnInfo) GetName() string
GetName implements formatters.ColumnSchema
func (*ColumnInfo) GetType ¶ added in v1.4.0
func (c *ColumnInfo) GetType() string
GetType implements formatters.ColumnSchema
type ColumnTypeMismatch ¶ added in v1.5.5
type ColumnTypeMismatch struct {
ColumnName string `json:"column_name"`
Source1Type string `json:"source1_type"`
Source2Type string `json:"source2_type"`
}
ColumnTypeMismatch represents a column type difference
type CompareConfig ¶ added in v1.5.5
type CompareConfig struct {
Mode string // schema-only, data-only, schema-and-data
DataCompareType string // row-count, row-by-row, sample
SampleSize int
Tables []string // Empty = all tables
OutputFormat string // text, json
OutputFile string
Debug bool
DryRun bool
}
CompareConfig contains comparison configuration
type Comparer ¶ added in v1.5.5
type Comparer struct {
// contains filtered or unexported fields
}
Comparer handles comparison operations
func NewComparer ¶ added in v1.5.5
func NewComparer(source1, source2 *ComparisonSource, config *CompareConfig, logger *slog.Logger) *Comparer
NewComparer creates a new Comparer instance
type ComparisonResult ¶ added in v1.5.5
type ComparisonResult struct {
Schema *SchemaComparisonResult `json:"schema,omitempty"`
Data *DataComparisonResult `json:"data,omitempty"`
}
ComparisonResult contains the results of a comparison
type ComparisonSource ¶ added in v1.5.5
type ComparisonSource struct {
Type string // "db" or "s3"
Database DatabaseConfig
S3 S3Config
SchemaPath string // S3 path for schemas
DataPath string // S3 path for data
SchemaSource string // pg_dump, inferred, auto
}
ComparisonSource represents a source for comparison (database or S3)
type Config ¶
type Config struct {
Debug bool
LogFormat string
DryRun bool
Workers int
SkipCount bool
CacheViewer bool
ViewerPort int
ChunkSize int // Number of rows to process in each chunk (streaming mode)
Database DatabaseConfig
S3 S3Config
Table string
StartDate string
EndDate string
OutputDuration string
OutputFormat string
Compression string
CompressionLevel int
DateColumn string
DumpMode string // pg_dump mode: schema-only, data-only, schema-and-data
CacheScope CacheScope
}
type DataComparisonResult ¶ added in v1.5.5
type DataComparisonResult struct {
RowCountDiffs map[string]*RowCountDiff `json:"row_count_diffs,omitempty"`
RowByRowDiffs map[string]*RowByRowDiff `json:"row_by_row_diffs,omitempty"`
SampleDiffs map[string]*SampleDiff `json:"sample_diffs,omitempty"`
}
DataComparisonResult contains data comparison results
type DatabaseConfig ¶
type DatabaseConfig struct {
Host string
Port int
User string
Password string
Name string
SSLMode string
StatementTimeout int // Statement timeout in seconds (0 = no timeout, default 300)
MaxRetries int // Maximum number of retry attempts for failed queries (default 3)
RetryDelay int // Delay in seconds between retry attempts (default 5)
}
type GitHubRelease ¶
type GitHubRelease struct {
TagName string `json:"tag_name"`
Name string `json:"name"`
PublishedAt time.Time `json:"published_at"`
HTMLURL string `json:"html_url"`
}
GitHubRelease represents the structure of GitHub's latest release API response
type LogMessage ¶ added in v1.5.0
type PartitionCache ¶
type PartitionCache struct {
Entries map[string]PartitionCacheEntry `json:"entries"`
}
PartitionCache stores both row counts and file metadata
type PartitionCacheEntry ¶
type PartitionCacheEntry struct {
// Row count information
RowCount int64 `json:"row_count"`
CountTime time.Time `json:"count_time"`
// File metadata (stored after processing)
FileSize int64 `json:"file_size,omitempty"` // Compressed size
UncompressedSize int64 `json:"uncompressed_size,omitempty"` // Original size
FileMD5 string `json:"file_md5,omitempty"`
MultipartETag string `json:"multipart_etag,omitempty"` // S3 multipart ETag for files >100MB
FileTime time.Time `json:"file_time,omitempty"`
// S3 information
S3Key string `json:"s3_key,omitempty"`
S3Uploaded bool `json:"s3_uploaded,omitempty"`
S3UploadTime time.Time `json:"s3_upload_time,omitempty"`
// Error tracking
LastError string `json:"last_error,omitempty"`
ErrorTime time.Time `json:"error_time,omitempty"`
// Processing time tracking
ProcessStartTime time.Time `json:"process_start_time,omitempty"` // When processing started for this job
}
type PartitionInfo ¶
type PartitionInfo struct {
TableName string
Date time.Time
RowCount int64
RangeStart time.Time
RangeEnd time.Time
}
func (PartitionInfo) HasCustomRange ¶ added in v1.6.0
func (p PartitionInfo) HasCustomRange() bool
type PathTemplate ¶
type PathTemplate struct {
// contains filtered or unexported fields
}
PathTemplate provides functionality to generate S3 paths from templates
func NewPathTemplate ¶
func NewPathTemplate(template string) *PathTemplate
NewPathTemplate creates a new PathTemplate instance
type PgDumpExecutor ¶ added in v1.5.4
type PgDumpExecutor struct {
// contains filtered or unexported fields
}
PgDumpExecutor handles pg_dump operations with S3 upload
func NewPgDumpExecutor ¶ added in v1.5.4
func NewPgDumpExecutor(config *Config, logger *slog.Logger) *PgDumpExecutor
NewPgDumpExecutor creates a new pg_dump executor
type ProcessResult ¶
type ProcessResult struct {
Partition PartitionInfo
Compressed bool
Uploaded bool
Skipped bool
SkipReason string
Error error
BytesWritten int64
Stage string
S3Key string // S3 object key for uploaded file
StartTime time.Time // When partition processing started
Duration time.Duration // How long partition processing took
}
type Restorer ¶ added in v1.5.3
type Restorer struct {
// contains filtered or unexported fields
}
Restorer handles restoration of tables from S3
func NewRestorer ¶ added in v1.5.3
NewRestorer creates a new Restorer instance
type RowByRowDiff ¶ added in v1.5.5
type RowByRowDiff struct {
Source1TotalRows int64 `json:"source1_total_rows"`
Source2TotalRows int64 `json:"source2_total_rows"`
MatchingRows int64 `json:"matching_rows"`
MissingInSource2 int64 `json:"missing_in_source2"`
ExtraInSource2 int64 `json:"extra_in_source2"`
}
RowByRowDiff contains row-by-row comparison results
type RowCountCache ¶
type RowCountCache struct {
Counts map[string]RowCountEntry `json:"counts"`
}
RowCountCache is the legacy cache structure, kept for backward compatibility.
type RowCountDiff ¶ added in v1.5.5
type RowCountDiff struct {
Source1Count int64 `json:"source1_count"`
Source2Count int64 `json:"source2_count"`
Difference int64 `json:"difference"`
}
RowCountDiff contains row count differences
type RowCountEntry ¶
type S3File ¶ added in v1.5.3
type S3File struct {
Key string
Size int64
LastModified time.Time
DetectedFormat string
DetectedCompression string
Date time.Time // Extracted from filename
}
S3File represents a file found in S3
type SampleDiff ¶ added in v1.5.5
type SampleDiff struct {
Source1Sample []map[string]interface{} `json:"source1_sample,omitempty"`
Source2Sample []map[string]interface{} `json:"source2_sample,omitempty"`
Differences []string `json:"differences"`
}
SampleDiff contains sample comparison results
type SchemaComparisonResult ¶ added in v1.5.5
type SchemaComparisonResult struct {
TablesOnlyInSource1 []string `json:"tables_only_in_source1"`
TablesOnlyInSource2 []string `json:"tables_only_in_source2"`
TableDiffs map[string]*TableSchemaDiff `json:"table_diffs"`
}
SchemaComparisonResult contains schema comparison results
type StatusResponse ¶
type StatusResponse struct {
ArchiverRunning bool `json:"archiverRunning"`
PID int `json:"pid,omitempty"`
CurrentTask *TaskInfo `json:"currentTask,omitempty"`
Version string `json:"version"`
UpdateAvailable bool `json:"updateAvailable"`
LatestVersion string `json:"latestVersion,omitempty"`
ReleaseURL string `json:"releaseUrl,omitempty"`
// Slice tracking fields
CurrentSliceIndex int `json:"currentSliceIndex,omitempty"`
TotalSlices int `json:"totalSlices,omitempty"`
CurrentSliceDate string `json:"currentSliceDate,omitempty"`
IsSlicing bool `json:"isSlicing"`
}
type TableCache ¶
type TableCache struct {
TableName string `json:"tableName"`
Entries []CacheEntry `json:"entries"`
}
type TableSchema ¶ added in v1.4.0
type TableSchema struct {
TableName string
Columns []ColumnInfo
}
TableSchema represents the schema of a database table
func (*TableSchema) GetColumns ¶ added in v1.4.0
func (s *TableSchema) GetColumns() []formatters.ColumnSchema
GetColumns implements formatters.TableSchema
type TableSchemaDiff ¶ added in v1.5.5
type TableSchemaDiff struct {
ColumnsOnlyInSource1 []ColumnInfo `json:"columns_only_in_source1"`
ColumnsOnlyInSource2 []ColumnInfo `json:"columns_only_in_source2"`
TypeMismatches []ColumnTypeMismatch `json:"type_mismatches"`
}
TableSchemaDiff contains differences for a single table
type TaskInfo ¶
type TaskInfo struct {
PID int `json:"pid"`
StartTime time.Time `json:"start_time"`
Table string `json:"table"`
StartDate string `json:"start_date"`
EndDate string `json:"end_date"`
CurrentTask string `json:"current_task"`
CurrentPartition string `json:"current_partition,omitempty"`
CurrentStep string `json:"current_step,omitempty"`
Progress float64 `json:"progress"`
TotalItems int `json:"total_items"`
CompletedItems int `json:"completed_items"`
LastUpdate time.Time `json:"last_update"`
// Slice tracking fields
CurrentSliceIndex int `json:"current_slice_index,omitempty"`
TotalSlices int `json:"total_slices,omitempty"`
CurrentSliceDate string `json:"current_slice_date,omitempty"`
// Partition and slice statistics
TotalPartitions int `json:"total_partitions,omitempty"` // Total partitions discovered
PartitionsCounted int `json:"partitions_counted,omitempty"` // Partitions that have been counted
PartitionsProcessed int `json:"partitions_processed,omitempty"` // Partitions that have been processed
SlicesProcessed int `json:"slices_processed,omitempty"` // Total slices processed across all partitions
}
TaskInfo represents the current archiving task status
func ReadTaskInfo ¶
ReadTaskInfo reads current task information from file
type VersionCheckCache ¶
type VersionCheckCache struct {
UpdateAvailable bool `json:"update_available"`
LatestVersion string `json:"latest_version"`
ReleaseURL string `json:"release_url"`
Timestamp time.Time `json:"timestamp"`
}
VersionCheckCache represents cached version check data