Documentation
¶
Overview ¶
Package chunk provides chunked upload functionality for large scan reports.
The chunk package enables splitting large reports into smaller chunks for gradual upload, with SQLite-based local storage for persistence and resumability.
Key components:
- Config: Configuration for chunking behavior
- Splitter: Algorithm to split reports into chunks
- Storage: SQLite-based chunk storage
- Manager: Main orchestration of chunking and upload
Example usage:
manager, err := chunk.NewManager(chunk.DefaultConfig())
if err != nil {
log.Fatal(err)
}
defer manager.Close()
report, err := manager.SubmitReport(ctx, largeReport)
if err != nil {
log.Fatal(err)
}
manager.Start(ctx) // Start background upload
Index ¶
- type Chunk
- type ChunkData
- type ChunkStatus
- type Config
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) GetProgress(ctx context.Context, reportID string) (*Progress, error)
- func (m *Manager) GetStats(ctx context.Context) (*StorageStats, error)
- func (m *Manager) IsRunning() bool
- func (m *Manager) NeedsChunking(report *ctis.Report) bool
- func (m *Manager) ProcessPending(ctx context.Context) error
- func (m *Manager) SetCallbacks(onProgress func(*Progress), onComplete func(string), ...)
- func (m *Manager) SetUploader(uploader Uploader)
- func (m *Manager) SetVerbose(v bool)
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop()
- func (m *Manager) SubmitReport(ctx context.Context, report *ctis.Report) (*Report, error)
- type Metadata
- type Progress
- type Report
- type ReportStatus
- type Splitter
- type Storage
- func (s *Storage) Cleanup(ctx context.Context, maxAge time.Duration) (int64, error)
- func (s *Storage) CleanupToSize(ctx context.Context, maxBytes int64) (int, error)
- func (s *Storage) Close() error
- func (s *Storage) DeleteChunkData(ctx context.Context, chunkID string) error
- func (s *Storage) DeleteReportChunks(ctx context.Context, reportID string) (int, error)
- func (s *Storage) GetChunk(ctx context.Context, id string) (*Chunk, error)
- func (s *Storage) GetNextPendingChunk(ctx context.Context) (*Chunk, error)
- func (s *Storage) GetPendingChunks(ctx context.Context, reportID string, limit int) ([]*Chunk, error)
- func (s *Storage) GetPendingReports(ctx context.Context) ([]*Report, error)
- func (s *Storage) GetReport(ctx context.Context, id string) (*Report, error)
- func (s *Storage) GetStorageStats(ctx context.Context) (*StorageStats, error)
- func (s *Storage) IncrementCompletedChunks(ctx context.Context, reportID string) error
- func (s *Storage) IncrementFailedChunks(ctx context.Context, reportID string) error
- func (s *Storage) SaveChunk(ctx context.Context, chunk *Chunk) error
- func (s *Storage) SaveReport(ctx context.Context, report *Report) error
- func (s *Storage) UpdateChunkStatus(ctx context.Context, chunkID string, status ChunkStatus, lastError string) error
- func (s *Storage) UpdateReportStatus(ctx context.Context, reportID string, status ReportStatus) error
- type StorageStats
- type Uploader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Chunk ¶
type Chunk struct {
ID string `json:"id"`
ReportID string `json:"report_id"`
ChunkIndex int `json:"chunk_index"`
TotalChunks int `json:"total_chunks"`
Data []byte `json:"-"` // Compressed data (not in JSON)
UncompressedSize int `json:"uncompressed_size"`
CompressedSize int `json:"compressed_size"`
Status ChunkStatus `json:"status"`
RetryCount int `json:"retry_count"`
LastError string `json:"last_error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UploadedAt *time.Time `json:"uploaded_at,omitempty"`
}
Chunk represents a single chunk of a report.
type ChunkData ¶
type ChunkData struct {
ReportID string `json:"report_id"`
ChunkIndex int `json:"chunk_index"`
TotalChunks int `json:"total_chunks"`
Tool *ctis.Tool `json:"tool,omitempty"`
Metadata *ctis.ReportMetadata `json:"metadata,omitempty"`
Assets []ctis.Asset `json:"assets,omitempty"`
Findings []ctis.Finding `json:"findings,omitempty"`
IsFinal bool `json:"is_final"`
}
ChunkData represents the actual data in a chunk. This is what gets serialized, compressed, and uploaded.
type ChunkStatus ¶
type ChunkStatus string
ChunkStatus represents the status of a single chunk.
const (
	// ChunkStatusPending indicates the chunk is waiting for upload.
	ChunkStatusPending ChunkStatus = "pending"
	// ChunkStatusUploading indicates the chunk is currently uploading.
	ChunkStatusUploading ChunkStatus = "uploading"
	// ChunkStatusCompleted indicates the chunk was successfully uploaded.
	ChunkStatusCompleted ChunkStatus = "completed"
	// ChunkStatusFailed indicates the chunk failed after all retries.
	ChunkStatusFailed ChunkStatus = "failed"
)
type Config ¶
type Config struct {
// Chunking thresholds - when to trigger chunking
MinFindingsForChunking int // Minimum findings to trigger chunking (default: 2000)
MinAssetsForChunking int // Minimum assets to trigger chunking (default: 200)
MinSizeForChunking int // Minimum raw size in bytes to trigger chunking (default: 5MB)
// Chunk size limits
MaxFindingsPerChunk int // Max findings per chunk (default: 500)
MaxAssetsPerChunk int // Max assets per chunk (default: 100)
MaxChunkSizeBytes int // Max uncompressed size per chunk (default: 2MB)
// Upload behavior
UploadDelayMs int // Delay between chunk uploads in ms (default: 100)
MaxConcurrentUploads int // Max concurrent upload workers (default: 2)
UploadTimeoutSeconds int // Timeout per chunk upload (default: 30)
// Retry configuration
MaxRetries int // Max retries per chunk (default: 3)
RetryBackoffMs int // Initial backoff between retries (default: 1000)
// Storage configuration
DatabasePath string // SQLite database path (default: ~/.openctem/chunks.db)
RetentionHours int // How long to keep completed chunks (default: 24)
MaxStorageMB int // Max storage for chunk DB (default: 500)
// Compression (chunks are always compressed before storage)
CompressionLevel int // gzip/zstd level 1-9 (default: 3)
// Auto-cleanup configuration
AutoCleanupOnUpload bool // Delete chunk data immediately after successful upload (default: true)
CleanupOnReportComplete bool // Delete all chunks when report completes (default: true)
CleanupIntervalMinutes int // How often to run cleanup in minutes (default: 15)
AggressiveCleanup bool // Enable aggressive cleanup when storage exceeds MaxStorageMB (default: true)
}
Config configures chunking behavior.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns sensible defaults for most environments.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager handles chunking, storage, and upload coordination.
func NewManager ¶
NewManager creates a new chunk manager.
func (*Manager) GetProgress ¶
GetProgress returns upload progress for a report.
func (*Manager) GetStats ¶
func (m *Manager) GetStats(ctx context.Context) (*StorageStats, error)
GetStats returns storage statistics.
func (*Manager) NeedsChunking ¶
NeedsChunking checks if a report needs to be chunked.
func (*Manager) ProcessPending ¶
ProcessPending processes all pending chunks immediately (for testing).
func (*Manager) SetCallbacks ¶
func (m *Manager) SetCallbacks(onProgress func(*Progress), onComplete func(string), onError func(string, error))
SetCallbacks sets the callback functions.
func (*Manager) SetUploader ¶
SetUploader configures the uploader.
func (*Manager) SetVerbose ¶
SetVerbose enables verbose logging.
type Metadata ¶
type Metadata struct {
ToolName string `json:"tool_name,omitempty"`
ToolVersion string `json:"tool_version,omitempty"`
ScanID string `json:"scan_id,omitempty"`
AgentID string `json:"agent_id,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
}
Metadata stores additional report context.
type Progress ¶
type Progress struct {
ReportID string `json:"report_id"`
TotalChunks int `json:"total_chunks"`
CompletedChunks int `json:"completed_chunks"`
FailedChunks int `json:"failed_chunks"`
PendingChunks int `json:"pending_chunks"`
PercentComplete float64 `json:"percent_complete"`
BytesUploaded int64 `json:"bytes_uploaded"`
BytesTotal int64 `json:"bytes_total"`
Status string `json:"status"`
}
Progress represents upload progress for a report.
type Report ¶
type Report struct {
ID string `json:"id"`
OriginalFindingsCount int `json:"original_findings_count"`
OriginalAssetsCount int `json:"original_assets_count"`
OriginalSize int `json:"original_size"`
CompressedSize int `json:"compressed_size"`
TotalChunks int `json:"total_chunks"`
CompletedChunks int `json:"completed_chunks"`
FailedChunks int `json:"failed_chunks"`
Status ReportStatus `json:"status"`
CompressionAlgo string `json:"compression_algo"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Metadata *Metadata `json:"metadata,omitempty"`
}
Report represents a chunked report in the database.
func (*Report) CalculateProgress ¶
CalculateProgress calculates progress from a report.
func (*Report) IsComplete ¶
IsComplete checks if all chunks have been uploaded.
type ReportStatus ¶
type ReportStatus string
ReportStatus represents the status of a chunked report.
const (
	// ReportStatusPending indicates the report is queued for upload.
	ReportStatusPending ReportStatus = "pending"
	// ReportStatusUploading indicates chunks are being uploaded.
	ReportStatusUploading ReportStatus = "uploading"
	// ReportStatusCompleted indicates all chunks have been uploaded.
	ReportStatusCompleted ReportStatus = "completed"
	// ReportStatusFailed indicates the upload failed after all retries.
	ReportStatusFailed ReportStatus = "failed"
)
type Splitter ¶
type Splitter struct {
// contains filtered or unexported fields
}
Splitter handles report chunking logic.
func NewSplitter ¶
NewSplitter creates a new splitter with the given config.
func (*Splitter) EstimateChunks ¶
EstimateChunks estimates the number of chunks without actually splitting.
func (*Splitter) NeedsChunking ¶
NeedsChunking determines if a report should be chunked.
func (*Splitter) Split ¶
Split divides a report into chunks.
Algorithm:

 1. If the report is small enough, return a single chunk.
 2. Group findings by their asset_ref.
 3. Create chunks that respect both the findings and assets limits.
 4. The first chunk always includes the tool and metadata.
 5. Each chunk is self-contained with its assets and findings.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage provides SQLite-based chunk storage.
func NewStorage ¶
NewStorage creates a new chunk storage.
func (*Storage) CleanupToSize ¶
CleanupToSize removes oldest completed reports until storage is under limit. Returns the number of reports deleted.
func (*Storage) DeleteChunkData ¶
DeleteChunkData removes the data blob from a chunk (keeps metadata). This is used for immediate cleanup after successful upload.
func (*Storage) DeleteReportChunks ¶
DeleteReportChunks removes all chunk data for a report. Returns the number of chunks deleted.
func (*Storage) GetNextPendingChunk ¶
GetNextPendingChunk returns the next chunk ready for upload.
func (*Storage) GetPendingChunks ¶
func (s *Storage) GetPendingChunks(ctx context.Context, reportID string, limit int) ([]*Chunk, error)
GetPendingChunks returns all pending chunks for a report.
func (*Storage) GetPendingReports ¶
GetPendingReports returns all reports with pending chunks.
func (*Storage) GetStorageStats ¶
func (s *Storage) GetStorageStats(ctx context.Context) (*StorageStats, error)
GetStorageStats returns storage statistics.
func (*Storage) IncrementCompletedChunks ¶
IncrementCompletedChunks increments the completed chunk count.
func (*Storage) IncrementFailedChunks ¶
IncrementFailedChunks increments the failed chunk count.
func (*Storage) SaveReport ¶
SaveReport saves a report record.
func (*Storage) UpdateChunkStatus ¶
func (s *Storage) UpdateChunkStatus(ctx context.Context, chunkID string, status ChunkStatus, lastError string) error
UpdateChunkStatus updates the status of a chunk.
func (*Storage) UpdateReportStatus ¶
func (s *Storage) UpdateReportStatus(ctx context.Context, reportID string, status ReportStatus) error
UpdateReportStatus updates the status of a report.
type StorageStats ¶
type StorageStats struct {
TotalReports int `json:"total_reports"`
PendingReports int `json:"pending_reports"`
UploadingReports int `json:"uploading_reports"`
CompletedReports int `json:"completed_reports"`
FailedReports int `json:"failed_reports"`
TotalChunks int `json:"total_chunks"`
PendingChunks int `json:"pending_chunks"`
CompletedChunks int `json:"completed_chunks"`
FailedChunks int `json:"failed_chunks"`
TotalStorageBytes int64 `json:"total_storage_bytes"`
}
StorageStats contains storage statistics.