concurrent

package
v0.8.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 11, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActiveTask

type ActiveTask struct {
	Task          types.Task
	CurrentOffset atomic.Int64
	StopAt        atomic.Int64

	// Health monitoring fields
	LastActivity atomic.Int64       // Unix nano timestamp of last data received
	Speed        float64            // EMA-smoothed speed in bytes/sec (protected by mutex)
	StartTime    time.Time          // When this task started
	Cancel       context.CancelFunc // Cancel function to abort this task
	SpeedMu      sync.Mutex         // Protects Speed field

	// Sliding window for recent speed tracking
	WindowStart time.Time    // When current measurement window started
	WindowBytes atomic.Int64 // Bytes downloaded in current window

	// Hedged request tracking
	Hedged          atomic.Int32  // 1 if an idle worker is already racing this task
	SharedMaxOffset *atomic.Int64 // Highest offset reached by any racing worker
}

ActiveTask tracks a task currently being processed by a worker.

func (*ActiveTask) GetSpeed

func (at *ActiveTask) GetSpeed() float64

GetSpeed returns the current EMA-smoothed speed, decaying if stalled.
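To make "EMA-smoothed, decaying if stalled" concrete, here is a minimal sketch of one way such a reading could be computed. The `speedTracker` type, the `alpha` and `stallAfter` constants, and the halving decay rule are all assumptions for illustration; the package's actual smoothing factor and decay behavior are not documented on this page.

```go
package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// Assumed tuning values; the package's real constants are not documented here.
const (
	alpha      = 0.3             // weight given to the newest sample
	stallAfter = 2 * time.Second // idle time tolerated before decay starts
)

// speedTracker is a hypothetical stand-in for ActiveTask's Speed, SpeedMu,
// and LastActivity fields.
type speedTracker struct {
	mu    sync.Mutex
	speed float64   // EMA-smoothed bytes/sec
	last  time.Time // time of the most recent sample
}

// Update folds a new bytes/sec sample into the moving average.
func (s *speedTracker) Update(sample float64, now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.last.IsZero() {
		s.speed = sample // the first sample seeds the average
	} else {
		s.speed = alpha*sample + (1-alpha)*s.speed
	}
	s.last = now
}

// Get returns the smoothed speed, halved for every stallAfter of idle time
// beyond the first (an assumed decay rule).
func (s *speedTracker) Get(now time.Time) float64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	idle := now.Sub(s.last)
	if idle <= stallAfter {
		return s.speed
	}
	extra := float64(idle-stallAfter) / float64(stallAfter)
	return s.speed * math.Pow(0.5, extra)
}

// demoSpeeds feeds two samples one second apart, then reads the speed
// immediately and again after four seconds of silence.
func demoSpeeds() (fresh, stalled float64) {
	t0 := time.Unix(0, 0)
	var s speedTracker
	s.Update(1000, t0)
	s.Update(2000, t0.Add(time.Second))
	return s.Get(t0.Add(time.Second)), s.Get(t0.Add(5 * time.Second))
}

func main() {
	fresh, stalled := demoSpeeds()
	fmt.Printf("fresh:   %.0f B/s\n", fresh)
	fmt.Printf("stalled: %.0f B/s\n", stalled)
}
```

Decaying on read rather than on write means a worker that stops receiving data looks progressively slower to a health monitor even though no new samples arrive.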

func (*ActiveTask) RemainingBytes

func (at *ActiveTask) RemainingBytes() int64

RemainingBytes returns the number of bytes left for this task.

func (*ActiveTask) RemainingTask

func (at *ActiveTask) RemainingTask() *types.Task

RemainingTask returns a Task representing the remaining work, or nil if complete.
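The two accessors above can plausibly be derived from `CurrentOffset` and `StopAt` alone. The sketch below assumes a hypothetical `Task` with `Offset` and `Length` fields (the real `types.Task` layout is not shown on this page) and a trimmed-down `activeTask` holding only the two atomics.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Task is a hypothetical byte range; the real types.Task may differ.
type Task struct {
	Offset int64
	Length int64
}

// activeTask keeps only the two fields this sketch needs.
type activeTask struct {
	CurrentOffset atomic.Int64 // next byte to download
	StopAt        atomic.Int64 // first byte NOT owned by this task
}

// RemainingBytes reports how many bytes are still to be fetched, never negative.
func (a *activeTask) RemainingBytes() int64 {
	rem := a.StopAt.Load() - a.CurrentOffset.Load()
	if rem < 0 {
		return 0
	}
	return rem
}

// RemainingTask describes the unfinished range, or returns nil when done.
func (a *activeTask) RemainingTask() *Task {
	cur, stop := a.CurrentOffset.Load(), a.StopAt.Load()
	if cur >= stop {
		return nil
	}
	return &Task{Offset: cur, Length: stop - cur}
}

func main() {
	a := &activeTask{}
	a.CurrentOffset.Store(300)
	a.StopAt.Store(1000)
	fmt.Println(a.RemainingBytes(), *a.RemainingTask())

	a.CurrentOffset.Store(1000) // worker reached its stop offset
	fmt.Println(a.RemainingBytes(), a.RemainingTask() == nil)
}
```

Returning a pointer lets callers distinguish "nothing left" from a zero-length range, which is convenient when persisting unfinished work for pause/resume.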

type ConcurrentDownloader

type ConcurrentDownloader struct {
	ProgressChan chan<- any           // Channel for events (start/complete/error)
	ID           string               // Download ID
	State        *types.ProgressState // Shared state for TUI polling

	URL      string // For pause/resume
	DestPath string // For pause/resume
	Runtime  *types.RuntimeConfig

	Headers map[string]string // Custom HTTP headers from browser (cookies, auth, etc.)
	// contains filtered or unexported fields
}

ConcurrentDownloader handles multi-connection downloads.

func NewConcurrentDownloader

func NewConcurrentDownloader(id string, progressCh chan<- any, progState *types.ProgressState, runtime *types.RuntimeConfig) *ConcurrentDownloader

NewConcurrentDownloader creates a new concurrent downloader with all required parameters.

func (*ConcurrentDownloader) Download

func (d *ConcurrentDownloader) Download(ctx context.Context, rawurl string, candidateMirrors []string, activeMirrors []string, destPath string, fileSize int64) error

Download downloads a file using multiple concurrent connections. It uses pre-probed metadata, so the file size is already known.

func (*ConcurrentDownloader) HedgeWork

func (d *ConcurrentDownloader) HedgeWork(queue *TaskQueue) bool

HedgeWork creates a duplicate task when stealing isn't possible (chunks too small). An idle worker picks up the duplicate and races the original on a fresh HTTP connection. Both workers write identical data to the same file offsets (WriteAt is idempotent), so the file is always correct. Whichever finishes first wins; the other exits naturally when the queue closes or its next read returns data already counted.
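The hedging idea can be illustrated without any networking: two goroutines write the same bytes to the same offsets with `WriteAt`, and a shared maximum offset decides which writes count as new progress. Everything in this sketch (`advanceMax`, `writeChunks`, `hedgedWrite`, the chunk sizes) is hypothetical and only mirrors the behavior described above; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"os"
	"sync"
	"sync/atomic"
)

// advanceMax raises the shared maximum to end if end is larger, and returns
// how many bytes that advance newly covers (0 if another worker got there first).
func advanceMax(maxOff *atomic.Int64, end int64) int64 {
	for {
		cur := maxOff.Load()
		if end <= cur {
			return 0
		}
		if maxOff.CompareAndSwap(cur, end) {
			return end - cur
		}
	}
}

// writeChunks plays one racing worker: it writes src in pieces of size chunk
// and counts only the bytes that extend the shared maximum offset.
func writeChunks(f *os.File, src []byte, chunk int, maxOff, counted *atomic.Int64) error {
	for off := 0; off < len(src); off += chunk {
		end := off + chunk
		if end > len(src) {
			end = len(src)
		}
		if _, err := f.WriteAt(src[off:end], int64(off)); err != nil {
			return err
		}
		counted.Add(advanceMax(maxOff, int64(end)))
	}
	return nil
}

// hedgedWrite races two workers over the same range of a temp file and
// returns the file contents plus the number of bytes counted as progress.
func hedgedWrite(payload []byte) (string, int64, error) {
	f, err := os.CreateTemp("", "hedge-*")
	if err != nil {
		return "", 0, err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	var maxOff, counted atomic.Int64
	var wg sync.WaitGroup
	errs := make([]error, 2)
	for i, chunk := range []int{5, 7} { // different sizes to interleave writes
		wg.Add(1)
		go func(i, chunk int) {
			defer wg.Done()
			errs[i] = writeChunks(f, payload, chunk, &maxOff, &counted)
		}(i, chunk)
	}
	wg.Wait()
	for _, e := range errs {
		if e != nil {
			return "", 0, e
		}
	}
	got, err := os.ReadFile(f.Name())
	if err != nil {
		return "", 0, err
	}
	return string(got), counted.Load(), nil
}

func main() {
	payload := []byte("abcdefghijklmnopqrstuvwxyz0123456789")
	got, counted, err := hedgedWrite(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got == string(payload), counted == int64(len(payload)))
}
```

Because each successful compare-and-swap adds exactly the distance the maximum moved, the counted total equals the range length regardless of how the two workers interleave, so duplicated writes never inflate the progress figure.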

func (*ConcurrentDownloader) ReportMirrorError

func (d *ConcurrentDownloader) ReportMirrorError(url string)

ReportMirrorError marks a mirror as having an error in the state.

func (*ConcurrentDownloader) StealWork

func (d *ConcurrentDownloader) StealWork(queue *TaskQueue) bool

StealWork tries to split an active task from a busy worker. It greedily targets the worker with the most remaining work.
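A greedy steal of this kind could look roughly like the sketch below: scan the active tasks, pick the one with the most bytes left, and hand its second half to the idle worker by lowering the victim's stop offset. The `Task` and `activeTask` types, the `minSplit` threshold, and the midpoint split are assumptions; the real splitting policy is not documented on this page.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Task is a hypothetical byte range; the real types.Task may differ.
type Task struct {
	Offset int64
	Length int64
}

// activeTask keeps only the two offsets this sketch needs.
type activeTask struct {
	CurrentOffset atomic.Int64
	StopAt        atomic.Int64
}

// minSplit is an assumed lower bound: each half must be at least this large.
const minSplit = 64

// stealLargest picks the task with the most remaining bytes and splits it at
// the midpoint. It returns false when nothing is large enough to split or when
// the victim's StopAt changed concurrently. A real implementation would also
// need to handle the victim reading past the midpoint before it notices.
func stealLargest(workers []*activeTask) (Task, bool) {
	var victim *activeTask
	var most int64
	for _, w := range workers {
		if rem := w.StopAt.Load() - w.CurrentOffset.Load(); rem > most {
			victim, most = w, rem
		}
	}
	if victim == nil || most < 2*minSplit {
		return Task{}, false
	}
	cur := victim.CurrentOffset.Load()
	stop := victim.StopAt.Load()
	mid := cur + (stop-cur)/2
	// Shrink the victim's range; give up if another stealer raced us.
	if !victim.StopAt.CompareAndSwap(stop, mid) {
		return Task{}, false
	}
	return Task{Offset: mid, Length: stop - mid}, true
}

func main() {
	small := &activeTask{}
	small.StopAt.Store(100)

	big := &activeTask{}
	big.CurrentOffset.Store(200)
	big.StopAt.Store(1000)

	stolen, ok := stealLargest([]*activeTask{small, big})
	fmt.Println(ok, stolen, big.StopAt.Load())
}
```

When no task is large enough to split, a function like this returns false, which is the situation in which the documentation above says hedging takes over.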

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue is a thread-safe work-stealing queue.

func NewTaskQueue

func NewTaskQueue() *TaskQueue

func (*TaskQueue) Close

func (q *TaskQueue) Close()

func (*TaskQueue) DrainRemaining

func (q *TaskQueue) DrainRemaining() []types.Task

DrainRemaining returns all remaining tasks in the queue (used for pause/resume).

func (*TaskQueue) IdleWorkers

func (q *TaskQueue) IdleWorkers() int64

func (*TaskQueue) Len

func (q *TaskQueue) Len() int

func (*TaskQueue) Pop

func (q *TaskQueue) Pop() (types.Task, bool)

func (*TaskQueue) Push

func (q *TaskQueue) Push(t types.Task)

func (*TaskQueue) PushMultiple

func (q *TaskQueue) PushMultiple(tasks []types.Task)
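Most TaskQueue methods are listed without descriptions, so the following is only a sketch of how a queue with this method set might behave. It assumes that `Pop` blocks until a task arrives or the queue is closed, that `Pop` returns false only once the queue is both closed and empty, and that `IdleWorkers` counts goroutines currently blocked in `Pop`. The `queue` and `Task` types here are hypothetical stand-ins, not the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical byte range; the real types.Task may differ.
type Task struct {
	Offset int64
	Length int64
}

// queue is a mutex-and-condition-variable FIFO mirroring TaskQueue's methods.
type queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	tasks  []Task
	closed bool
	idle   int64 // goroutines currently waiting in Pop
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *queue) Push(t Task) { q.PushMultiple([]Task{t}) }

func (q *queue) PushMultiple(ts []Task) {
	q.mu.Lock()
	q.tasks = append(q.tasks, ts...)
	q.mu.Unlock()
	q.cond.Broadcast() // wake every waiter; several tasks may have arrived
}

// Pop blocks until a task is available or the queue is closed and empty.
func (q *queue) Pop() (Task, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.tasks) == 0 && !q.closed {
		q.idle++
		q.cond.Wait()
		q.idle--
	}
	if len(q.tasks) == 0 {
		return Task{}, false
	}
	t := q.tasks[0]
	q.tasks = q.tasks[1:]
	return t, true
}

func (q *queue) Close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

func (q *queue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.tasks)
}

func (q *queue) IdleWorkers() int64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.idle
}

// DrainRemaining empties the queue and returns what was left.
func (q *queue) DrainRemaining() []Task {
	q.mu.Lock()
	defer q.mu.Unlock()
	rest := q.tasks
	q.tasks = nil
	return rest
}

func main() {
	q := newQueue()
	q.PushMultiple([]Task{{Offset: 0, Length: 10}, {Offset: 10, Length: 10}})
	first, _ := q.Pop()
	fmt.Println("popped:", first, "left for resume:", q.DrainRemaining())
	q.Close()
	_, ok := q.Pop()
	fmt.Println("pop after close:", ok)
}
```

An idle-worker count of this kind gives a scheduler a cheap signal for when stealing or hedging is worth attempting.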
