Documentation
¶
Index ¶
- type Checkpoint
- type ProcessedFile
- type Progress
- type ResumeInfo
- type State
- type Statistics
- type Tracker
- func (t *Tracker) CleanupOldProgress(olderThan time.Duration) error
- func (t *Tracker) GetProgress(sourceHash string) (*Progress, error)
- func (t *Tracker) GetResumeInfo() (*ResumeInfo, error)
- func (t *Tracker) GetStatistics() (*Statistics, error)
- func (t *Tracker) IsFileProcessed(fileName string) bool
- func (t *Tracker) MarkCompleted() error
- func (t *Tracker) MarkFailed(errorMsg string) error
- func (t *Tracker) RecordFileProcessed(fileName string, fileSize int64, recordsCount int, checksum string) error
- func (t *Tracker) SaveCheckpointFile(dir string) error
- func (t *Tracker) SetCheckpointInterval(interval time.Duration)
- func (t *Tracker) StartOrResume(sourceURL string, forceRestart bool) (*Progress, error)
- func (t *Tracker) UpdateDownloadProgress(downloadedBytes, totalBytes int64) error
- func (t *Tracker) UpdateProcessingProgress(tarPosition int64, processedBytes int64, lastFile string, records int64) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
// Checkpoint represents a checkpoint in the ingestion process.
// Every field carries a json tag and a db tag with the same snake_case name.
type Checkpoint struct {
// ID is this checkpoint's own identifier.
ID int64 `json:"id" db:"id"`
// ProgressID presumably refers to Progress.ID of the owning record — confirm against the schema.
ProgressID int64 `json:"progress_id" db:"progress_id"`
// CheckpointTime is the timestamp associated with the checkpoint.
CheckpointTime time.Time `json:"checkpoint_time" db:"checkpoint_time"`
// TarPosition is a position within the tar input; the unit (byte offset vs. entry
// index) is not visible here — TODO confirm.
TarPosition int64 `json:"tar_position" db:"tar_position"`
// BytesProcessed and RecordsProcessed are counters captured with the checkpoint.
BytesProcessed int64 `json:"bytes_processed" db:"bytes_processed"`
RecordsProcessed int64 `json:"records_processed" db:"records_processed"`
// NOTE(review): none of the exported Tracker method signatures takes a transaction ID,
// so it is unclear from the public API where LastTransactionID is populated — confirm.
LastTransactionID string `json:"last_transaction_id" db:"last_transaction_id"`
}
Checkpoint represents a checkpoint in the ingestion process
type ProcessedFile ¶
// ProcessedFile represents a file that has been processed.
// FileName, FileSize, RecordsCount and Checksum have the same names and types as the
// parameters of Tracker.RecordFileProcessed(fileName, fileSize, recordsCount, checksum).
type ProcessedFile struct {
// ID is this record's own identifier.
ID int64 `json:"id" db:"id"`
// ProgressID presumably refers to Progress.ID of the owning record — confirm against the schema.
ProgressID int64 `json:"progress_id" db:"progress_id"`
FileName string `json:"file_name" db:"file_name"`
FileSize int64 `json:"file_size" db:"file_size"`
// RecordsCount is a plain int here, whereas the RecordsProcessed counters on
// Progress, Checkpoint and Statistics are int64.
RecordsCount int `json:"records_count" db:"records_count"`
ProcessedAt time.Time `json:"processed_at" db:"processed_at"`
// Checksum is stored as a string; the algorithm and encoding are not visible here — TODO confirm.
Checksum string `json:"checksum" db:"checksum"`
}
ProcessedFile represents a file that has been processed
type Progress ¶
// Progress represents the ingestion progress for a source.
// It is the type returned by Tracker.StartOrResume and Tracker.GetProgress.
// Every field carries a json tag and a db tag with the same snake_case name.
type Progress struct {
// ID is this progress record's own identifier.
ID int64 `json:"id" db:"id"`
// SourceURL has the same name as the sourceURL parameter of StartOrResume; SourceHash
// has the same name as the sourceHash lookup key of GetProgress. How the hash is
// derived from the URL is not visible here — TODO confirm.
SourceURL string `json:"source_url" db:"source_url"`
SourceHash string `json:"source_hash" db:"source_hash"`
// TotalBytes and DownloadedBytes have the same names as the parameters of
// UpdateDownloadProgress(downloadedBytes, totalBytes).
TotalBytes int64 `json:"total_bytes" db:"total_bytes"`
DownloadedBytes int64 `json:"downloaded_bytes" db:"downloaded_bytes"`
// The next four fields presumably track the arguments of
// UpdateProcessingProgress(tarPosition, processedBytes, lastFile, records) — confirm.
ProcessedBytes int64 `json:"processed_bytes" db:"processed_bytes"`
LastTarPosition int64 `json:"last_tar_position" db:"last_tar_position"`
LastXMLFile string `json:"last_xml_file" db:"last_xml_file"`
RecordsProcessed int64 `json:"records_processed" db:"records_processed"`
// State uses the package's State type, whose definition and values are not shown here.
State State `json:"state" db:"state"`
StartedAt time.Time `json:"started_at" db:"started_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
// CompletedAt is a pointer tagged omitempty, so encoding/json leaves the key out while it is nil.
CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`
// ErrorMessage is tagged omitempty, so encoding/json leaves the key out while it is empty.
// Presumably set from MarkFailed(errorMsg) — confirm.
ErrorMessage string `json:"error_message,omitempty" db:"error_message"`
}
Progress represents the ingestion progress for a source
type ResumeInfo ¶
// ResumeInfo contains information needed to resume processing.
// It is the type returned by Tracker.GetResumeInfo. Unlike Progress, Checkpoint and
// ProcessedFile, its fields carry json tags only (no db tags).
type ResumeInfo struct {
DownloadedBytes int64 `json:"downloaded_bytes"`
// TarPosition is a position within the tar input; the unit is not visible here — TODO confirm.
TarPosition int64 `json:"tar_position"`
LastFile string `json:"last_file"`
RecordsProcessed int64 `json:"records_processed"`
// ProcessedFiles has no omitempty, so with encoding/json a nil slice is emitted as
// null rather than [].
ProcessedFiles []string `json:"processed_files"`
}
ResumeInfo contains information needed to resume processing
type Statistics ¶
// Statistics contains ingestion statistics.
// It is the type returned by Tracker.GetStatistics. Its fields carry json tags only
// (no db tags).
type Statistics struct {
TotalBytes int64 `json:"total_bytes"`
DownloadedBytes int64 `json:"downloaded_bytes"`
ProcessedBytes int64 `json:"processed_bytes"`
RecordsProcessed int64 `json:"records_processed"`
FilesProcessed int `json:"files_processed"`
// Duration is a time.Duration; encoding/json writes it as an integer number of
// nanoseconds, not as a formatted string.
Duration time.Duration `json:"duration"`
// How the two rates are derived is not visible here — TODO confirm.
BytesPerSecond float64 `json:"bytes_per_second"`
RecordsPerSecond float64 `json:"records_per_second"`
// NOTE(review): the scale of PercentComplete (0–1 vs. 0–100) is not visible here — confirm.
PercentComplete float64 `json:"percent_complete"`
// EstimatedTimeRemaining is likewise written as integer nanoseconds by encoding/json.
EstimatedTimeRemaining time.Duration `json:"estimated_time_remaining"`
}
Statistics contains ingestion statistics
type Tracker ¶
// Tracker manages ingestion progress tracking.
// All of its exported methods are declared on the pointer receiver (*Tracker).
type Tracker struct {
// contains filtered or unexported fields
}
Tracker manages ingestion progress tracking
func NewTracker ¶
NewTracker creates a new progress tracker
func (*Tracker) CleanupOldProgress ¶
func (t *Tracker) CleanupOldProgress(olderThan time.Duration) error
CleanupOldProgress removes old completed or failed progress records
func (*Tracker) GetProgress ¶
func (t *Tracker) GetProgress(sourceHash string) (*Progress, error)
GetProgress retrieves progress for a source hash
func (*Tracker) GetResumeInfo ¶
func (t *Tracker) GetResumeInfo() (*ResumeInfo, error)
GetResumeInfo returns information needed to resume an ingestion
func (*Tracker) GetStatistics ¶
func (t *Tracker) GetStatistics() (*Statistics, error)
GetStatistics returns ingestion statistics
func (*Tracker) IsFileProcessed ¶
func (t *Tracker) IsFileProcessed(fileName string) bool
IsFileProcessed checks if a file has already been processed
func (*Tracker) MarkCompleted ¶
func (t *Tracker) MarkCompleted() error
MarkCompleted marks the ingestion as completed
func (*Tracker) MarkFailed ¶
func (t *Tracker) MarkFailed(errorMsg string) error
MarkFailed marks the ingestion as failed
func (*Tracker) RecordFileProcessed ¶
func (t *Tracker) RecordFileProcessed(fileName string, fileSize int64, recordsCount int, checksum string) error
RecordFileProcessed records that a file has been processed
func (*Tracker) SaveCheckpointFile ¶
func (t *Tracker) SaveCheckpointFile(dir string) error
SaveCheckpointFile saves a checkpoint to a JSON file for backup
func (*Tracker) SetCheckpointInterval ¶
func (t *Tracker) SetCheckpointInterval(interval time.Duration)
SetCheckpointInterval sets the checkpoint interval
func (*Tracker) StartOrResume ¶
func (t *Tracker) StartOrResume(sourceURL string, forceRestart bool) (*Progress, error)
StartOrResume starts a new ingestion or resumes an existing one
func (*Tracker) UpdateDownloadProgress ¶
func (t *Tracker) UpdateDownloadProgress(downloadedBytes, totalBytes int64) error
UpdateDownloadProgress updates download progress