chunk

package
v0.2.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2026 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

Package chunk provides chunked upload functionality for large scan reports.

The chunk package enables splitting large reports into smaller chunks for gradual upload, with SQLite-based local storage for persistence and resumability.

Key components:

  • Config: Configuration for chunking behavior
  • Splitter: Algorithm to split reports into chunks
  • Storage: SQLite-based chunk storage
  • Manager: Main orchestration of chunking and upload

Example usage:

manager, err := chunk.NewManager(chunk.DefaultConfig())
if err != nil {
    log.Fatal(err)
}
defer manager.Close()

report, err := manager.SubmitReport(ctx, largeReport)
if err != nil {
    log.Fatal(err)
}

manager.Start(ctx) // Start background upload

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Chunk

type Chunk struct {
	ID               string      `json:"id"`
	ReportID         string      `json:"report_id"`
	ChunkIndex       int         `json:"chunk_index"`
	TotalChunks      int         `json:"total_chunks"`
	Data             []byte      `json:"-"` // Compressed data (not in JSON)
	UncompressedSize int         `json:"uncompressed_size"`
	CompressedSize   int         `json:"compressed_size"`
	Status           ChunkStatus `json:"status"`
	RetryCount       int         `json:"retry_count"`
	LastError        string      `json:"last_error,omitempty"`
	CreatedAt        time.Time   `json:"created_at"`
	UploadedAt       *time.Time  `json:"uploaded_at,omitempty"`
}

Chunk represents a single chunk of a report.

func (*Chunk) CanRetry

func (c *Chunk) CanRetry(maxRetries int) bool

CanRetry reports whether this chunk is eligible for another upload attempt, i.e. its retry count is still below maxRetries.

type ChunkData

type ChunkData struct {
	ReportID    string               `json:"report_id"`
	ChunkIndex  int                  `json:"chunk_index"`
	TotalChunks int                  `json:"total_chunks"`
	Tool        *ctis.Tool           `json:"tool,omitempty"`
	Metadata    *ctis.ReportMetadata `json:"metadata,omitempty"`
	Assets      []ctis.Asset         `json:"assets,omitempty"`
	Findings    []ctis.Finding       `json:"findings,omitempty"`
	IsFinal     bool                 `json:"is_final"`
}

ChunkData represents the actual data in a chunk. This is what gets serialized, compressed, and uploaded.

type ChunkStatus

type ChunkStatus string

ChunkStatus represents the status of a single chunk.

const (
	// ChunkStatusPending indicates the chunk is waiting for upload.
	ChunkStatusPending ChunkStatus = "pending"

	// ChunkStatusUploading indicates the chunk is currently uploading.
	ChunkStatusUploading ChunkStatus = "uploading"

	// ChunkStatusCompleted indicates the chunk was successfully uploaded.
	ChunkStatusCompleted ChunkStatus = "completed"

	// ChunkStatusFailed indicates the chunk failed after all retries.
	ChunkStatusFailed ChunkStatus = "failed"
)

type Config

type Config struct {
	// Chunking thresholds - when to trigger chunking
	MinFindingsForChunking int // Minimum findings to trigger chunking (default: 2000)
	MinAssetsForChunking   int // Minimum assets to trigger chunking (default: 200)
	MinSizeForChunking     int // Minimum raw size in bytes to trigger chunking (default: 5MB)

	// Chunk size limits
	MaxFindingsPerChunk int // Max findings per chunk (default: 500)
	MaxAssetsPerChunk   int // Max assets per chunk (default: 100)
	MaxChunkSizeBytes   int // Max uncompressed size per chunk (default: 2MB)

	// Upload behavior
	UploadDelayMs        int // Delay between chunk uploads in ms (default: 100)
	MaxConcurrentUploads int // Max concurrent upload workers (default: 2)
	UploadTimeoutSeconds int // Timeout per chunk upload (default: 30)

	// Retry configuration
	MaxRetries     int // Max retries per chunk (default: 3)
	RetryBackoffMs int // Initial backoff between retries (default: 1000)

	// Storage configuration
	DatabasePath   string // SQLite database path (default: ~/.openctem/chunks.db)
	RetentionHours int    // How long to keep completed chunks (default: 24)
	MaxStorageMB   int    // Max storage for chunk DB (default: 500)

	// Compression (chunks are always compressed before storage)
	CompressionLevel int // gzip/zstd level 1-9 (default: 3)

	// Auto-cleanup configuration
	AutoCleanupOnUpload     bool // Delete chunk data immediately after successful upload (default: true)
	CleanupOnReportComplete bool // Delete all chunks when report completes (default: true)
	CleanupIntervalMinutes  int  // How often to run cleanup in minutes (default: 15)
	AggressiveCleanup       bool // Enable aggressive cleanup when storage exceeds MaxStorageMB (default: true)
}

Config configures chunking behavior.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns sensible defaults for most environments.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles chunking, storage, and upload coordination.

func NewManager

func NewManager(cfg *Config) (*Manager, error)

NewManager creates a new chunk manager.

func (*Manager) Close

func (m *Manager) Close() error

Close releases resources.

func (*Manager) GetProgress

func (m *Manager) GetProgress(ctx context.Context, reportID string) (*Progress, error)

GetProgress returns upload progress for a report.

func (*Manager) GetStats

func (m *Manager) GetStats(ctx context.Context) (*StorageStats, error)

GetStats returns storage statistics.

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning returns whether the manager is running.

func (*Manager) NeedsChunking

func (m *Manager) NeedsChunking(report *ctis.Report) bool

NeedsChunking checks if a report needs to be chunked.

func (*Manager) ProcessPending

func (m *Manager) ProcessPending(ctx context.Context) error

ProcessPending processes all pending chunks immediately (for testing).

func (*Manager) SetCallbacks

func (m *Manager) SetCallbacks(onProgress func(*Progress), onComplete func(string), onError func(string, error))

SetCallbacks sets the callback functions.

func (*Manager) SetUploader

func (m *Manager) SetUploader(uploader Uploader)

SetUploader configures the uploader.

func (*Manager) SetVerbose

func (m *Manager) SetVerbose(v bool)

SetVerbose enables or disables verbose logging.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start begins background upload processing.

func (*Manager) Stop

func (m *Manager) Stop()

Stop gracefully stops the upload process.

func (*Manager) SubmitReport

func (m *Manager) SubmitReport(ctx context.Context, report *ctis.Report) (*Report, error)

SubmitReport queues a report for chunked upload. Returns immediately after storing chunks to SQLite.

type Metadata

type Metadata struct {
	ToolName    string    `json:"tool_name,omitempty"`
	ToolVersion string    `json:"tool_version,omitempty"`
	ScanID      string    `json:"scan_id,omitempty"`
	AgentID     string    `json:"agent_id,omitempty"`
	StartedAt   time.Time `json:"started_at,omitempty"`
	FinishedAt  time.Time `json:"finished_at,omitempty"`
}

Metadata stores additional report context.

type Progress

type Progress struct {
	ReportID        string  `json:"report_id"`
	TotalChunks     int     `json:"total_chunks"`
	CompletedChunks int     `json:"completed_chunks"`
	FailedChunks    int     `json:"failed_chunks"`
	PendingChunks   int     `json:"pending_chunks"`
	PercentComplete float64 `json:"percent_complete"`
	BytesUploaded   int64   `json:"bytes_uploaded"`
	BytesTotal      int64   `json:"bytes_total"`
	Status          string  `json:"status"`
}

Progress represents upload progress for a report.

type Report

type Report struct {
	ID                    string       `json:"id"`
	OriginalFindingsCount int          `json:"original_findings_count"`
	OriginalAssetsCount   int          `json:"original_assets_count"`
	OriginalSize          int          `json:"original_size"`
	CompressedSize        int          `json:"compressed_size"`
	TotalChunks           int          `json:"total_chunks"`
	CompletedChunks       int          `json:"completed_chunks"`
	FailedChunks          int          `json:"failed_chunks"`
	Status                ReportStatus `json:"status"`
	CompressionAlgo       string       `json:"compression_algo"`
	CreatedAt             time.Time    `json:"created_at"`
	UpdatedAt             time.Time    `json:"updated_at"`
	CompletedAt           *time.Time   `json:"completed_at,omitempty"`
	Metadata              *Metadata    `json:"metadata,omitempty"`
}

Report represents a chunked report in the database.

func (*Report) CalculateProgress

func (r *Report) CalculateProgress() *Progress

CalculateProgress calculates progress from a report.

func (*Report) HasFailed

func (r *Report) HasFailed() bool

HasFailed checks if the report has failed.

func (*Report) IsComplete

func (r *Report) IsComplete() bool

IsComplete checks if all chunks have been uploaded.

type ReportStatus

type ReportStatus string

ReportStatus represents the status of a chunked report.

const (
	// ReportStatusPending indicates the report is queued for upload.
	ReportStatusPending ReportStatus = "pending"

	// ReportStatusUploading indicates chunks are being uploaded.
	ReportStatusUploading ReportStatus = "uploading"

	// ReportStatusCompleted indicates all chunks have been uploaded.
	ReportStatusCompleted ReportStatus = "completed"

	// ReportStatusFailed indicates the upload failed after all retries.
	ReportStatusFailed ReportStatus = "failed"
)

type Splitter

type Splitter struct {
	// contains filtered or unexported fields
}

Splitter handles report chunking logic.

func NewSplitter

func NewSplitter(cfg *Config) *Splitter

NewSplitter creates a new splitter with the given config.

func (*Splitter) EstimateChunks

func (s *Splitter) EstimateChunks(findingsCount, assetsCount int) int

EstimateChunks estimates the number of chunks without actually splitting.

func (*Splitter) NeedsChunking

func (s *Splitter) NeedsChunking(report *ctis.Report) bool

NeedsChunking determines if a report should be chunked.

func (*Splitter) Split

func (s *Splitter) Split(report *ctis.Report) ([]*ChunkData, error)

Split divides a report into chunks.

Algorithm:

 1. If the report is small enough, return a single chunk.
 2. Group findings by their asset_ref.
 3. Create chunks that respect both the findings and assets limits.
 4. The first chunk always includes tool and metadata.
 5. Each chunk is self-contained, carrying its own assets and findings.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage provides SQLite-based chunk storage.

func NewStorage

func NewStorage(cfg *Config) (*Storage, error)

NewStorage creates a new chunk storage.

func (*Storage) Cleanup

func (s *Storage) Cleanup(ctx context.Context, maxAge time.Duration) (int64, error)

Cleanup removes old completed reports and their chunks.

func (*Storage) CleanupToSize

func (s *Storage) CleanupToSize(ctx context.Context, maxBytes int64) (int, error)

CleanupToSize removes oldest completed reports until storage is under limit. Returns the number of reports deleted.

func (*Storage) Close

func (s *Storage) Close() error

Close closes the storage.

func (*Storage) DeleteChunkData

func (s *Storage) DeleteChunkData(ctx context.Context, chunkID string) error

DeleteChunkData removes the data blob from a chunk (keeps metadata). This is used for immediate cleanup after successful upload.

func (*Storage) DeleteReportChunks

func (s *Storage) DeleteReportChunks(ctx context.Context, reportID string) (int, error)

DeleteReportChunks removes all chunk data for a report. Returns the number of chunks deleted.

func (*Storage) GetChunk

func (s *Storage) GetChunk(ctx context.Context, id string) (*Chunk, error)

GetChunk retrieves a chunk by ID.

func (*Storage) GetNextPendingChunk

func (s *Storage) GetNextPendingChunk(ctx context.Context) (*Chunk, error)

GetNextPendingChunk returns the next chunk ready for upload.

func (*Storage) GetPendingChunks

func (s *Storage) GetPendingChunks(ctx context.Context, reportID string, limit int) ([]*Chunk, error)

GetPendingChunks returns all pending chunks for a report.

func (*Storage) GetPendingReports

func (s *Storage) GetPendingReports(ctx context.Context) ([]*Report, error)

GetPendingReports returns all reports with pending chunks.

func (*Storage) GetReport

func (s *Storage) GetReport(ctx context.Context, id string) (*Report, error)

GetReport retrieves a report by ID.

func (*Storage) GetStorageStats

func (s *Storage) GetStorageStats(ctx context.Context) (*StorageStats, error)

GetStorageStats returns storage statistics.

func (*Storage) IncrementCompletedChunks

func (s *Storage) IncrementCompletedChunks(ctx context.Context, reportID string) error

IncrementCompletedChunks increments the completed chunk count.

func (*Storage) IncrementFailedChunks

func (s *Storage) IncrementFailedChunks(ctx context.Context, reportID string) error

IncrementFailedChunks increments the failed chunk count.

func (*Storage) SaveChunk

func (s *Storage) SaveChunk(ctx context.Context, chunk *Chunk) error

SaveChunk saves a chunk record.

func (*Storage) SaveReport

func (s *Storage) SaveReport(ctx context.Context, report *Report) error

SaveReport saves a report record.

func (*Storage) UpdateChunkStatus

func (s *Storage) UpdateChunkStatus(ctx context.Context, chunkID string, status ChunkStatus, lastError string) error

UpdateChunkStatus updates the status of a chunk.

func (*Storage) UpdateReportStatus

func (s *Storage) UpdateReportStatus(ctx context.Context, reportID string, status ReportStatus) error

UpdateReportStatus updates the status of a report.

type StorageStats

type StorageStats struct {
	TotalReports      int   `json:"total_reports"`
	PendingReports    int   `json:"pending_reports"`
	UploadingReports  int   `json:"uploading_reports"`
	CompletedReports  int   `json:"completed_reports"`
	FailedReports     int   `json:"failed_reports"`
	TotalChunks       int   `json:"total_chunks"`
	PendingChunks     int   `json:"pending_chunks"`
	CompletedChunks   int   `json:"completed_chunks"`
	FailedChunks      int   `json:"failed_chunks"`
	TotalStorageBytes int64 `json:"total_storage_bytes"`
}

StorageStats contains storage statistics.

type Uploader

type Uploader interface {
	// UploadChunk uploads a single chunk.
	UploadChunk(ctx context.Context, data *ChunkData) error
}

Uploader is the interface for uploading chunks.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL