Documentation
¶
Overview ¶
core.go - Central application structure and orchestration. Contains the Core struct, which holds Storage, ForwarderStorage, ForwarderManager, and Receiver. Also provides the HTTP stats endpoint handler and simpleStatsProvider for non-forwarder mode.
forwarderAnnounce.go - Unified announce execution logic for forwarders. Routes to HTTP or UDP protocol handlers, processes results, and handles retries. Key functions: executeAnnounce(), doHTTPAnnounce(), doUDPAnnounce(), handleAnnounceResult(). Also handles event forwarding (ForwardStoppedEvent, ForwardCompletedEvent).
forwarderManager.go - Core forwarder orchestration and worker pool management. Manages job queue, worker scaling, rate limiting, throttling, and forwarder lifecycle. Handles job scheduling, deduplication, and forwarder enable/disable/suspend logic.
forwarderStats.go - Statistics collection and reporting for forwarders. Implements StatsDataProvider interface for observability. Records response times (EMA) and tracks per-hash peer stats and client stats. Key function: GetHashPeerStats() returns seeders/leechers per hash.
forwarderStorage.go - Storage for forwarder state and cached peer responses. Tracks per-hash, per-forwarder: peers, intervals, and NextAnnounce times. Structure: map[InfoHash]map[ForwarderName]ForwarderPeerEntry
receiverAnnounce.go - HTTP announce request handler. Processes BitTorrent announce requests, manages peer storage updates, triggers forwarder announces, and builds responses with aggregated peers. Key function: getPeersForResponse() collects peers and counts seeders/leechers.
receiverUDP.go - UDP tracker server implementation (BEP 15). Handles connect, announce, and scrape requests over UDP. Manages connection state with TempStorage for connection ID validation. Supports both IPv4 and IPv6 peer formats.
scrape.go - HTTP and UDP scrape request handler (BEP 48 for HTTP scrape, BEP 15 for UDP scrape). Returns statistics (complete/incomplete/downloaded) for requested info_hashes. Aggregates counts from both local storage and forwarder storage.
storage.go - Thread-safe in-memory peer storage. Structure: map[InfoHash]map[PeerID]Request with mutex protection. Includes background purgeRoutine that removes stale peers based on Config.Age.
udpForwarder.go - UDP tracker client for forwarding to external UDP trackers (BEP 15). Manages connection IDs with 2-minute lifetime, handles connect/announce protocol, supports IPv4/IPv6 peer formats, and implements retry with exponential backoff.
Index ¶
- Constants
- Variables
- type AnnounceJob
- type AnnounceResult
- type Core
- type DisabledForwarder
- type ForwarderManager
- func (fm *ForwarderManager) CancelPendingJobs(infoHash common.InfoHash, peerID common.PeerID)
- func (fm *ForwarderManager) ForwardCompletedEvent(infoHash common.InfoHash, peerID common.PeerID, request tracker.Request)
- func (fm *ForwarderManager) ForwardStoppedEvent(infoHash common.InfoHash, peerID common.PeerID, request tracker.Request)
- func (fm *ForwarderManager) QueueEligibleAnnounces(infoHash common.InfoHash, request tracker.Request)
- func (fm *ForwarderManager) Start()
- func (fm *ForwarderManager) Stop()
- type ForwarderPeerEntry
- type ForwarderStats
- type ForwarderStorage
- func (fs *ForwarderStorage) Cleanup(infoHash common.InfoHash)
- func (fs *ForwarderStorage) GetAllPeers(infoHash common.InfoHash) []common.Peer
- func (fs *ForwarderStorage) GetNextAnnounceJobs(now time.Time) []AnnounceJob
- func (fs *ForwarderStorage) HasInfoHash(infoHash common.InfoHash) bool
- func (fs *ForwarderStorage) MarkAnnounced(infoHash common.InfoHash, forwarderName string)
- func (fs *ForwarderStorage) ShouldAnnounceNow(infoHash common.InfoHash, forwarderName string, now time.Time) bool
- func (fs *ForwarderStorage) UpdatePeers(infoHash common.InfoHash, forwarderName string, peers []common.Peer, ...)
- type Receiver
- type ReceiverAnnounce
- type ReceiverUDP
- type ScrapeResponse
- type ScrapeResponseHash
- type ScrapeStats
- type Storage
- type TempStorage
- type UDPAnnounceRequest
- type UDPAnnounceResponse
- type UDPConnectRequest
- type UDPConnectResponse
- type UDPForwarder
- type UDPPeer
- type UDPScrapeData
- type UDPScrapeRequest
- type UDPScrapeResponse
Constants ¶
const ( EventStopped = "stopped" EventCompleted = "completed" )
const ( UDPActionConnect = 0 UDPActionAnnounce = 1 UDPActionScrape = 2 UDPActionError = 3 UDPConnectMagic = 0x41727101980 // Magic constant for connect request )
Variables ¶
var ( DebugLogFwd = log.New(os.Stdout, `debug#`, log.Lshortfile) ErrorLogFwd = log.New(os.Stderr, `error#`, log.Lshortfile) )
var ( DebugLogAnnounce = log.New(os.Stdout, `debug#`, log.Lshortfile) ErrorLogAnnounce = log.New(os.Stderr, `error#`, log.Lshortfile) )
var ( DebugLogUDP = log.New(os.Stdout, `debug#`, log.Lshortfile) ErrorLogUDP = log.New(os.Stderr, `error#`, log.Lshortfile) )
var ( DebugLogScrape = log.New(os.Stdout, `debug#`, log.Lshortfile) ErrorLogScrape = log.New(os.Stderr, `error#`, log.Lshortfile) )
var ( ErrNoInfohashes = errors.New(`no infohashes found`) ErrBadInfohash = errors.New(`bad infohash`) )
var ( DebugLogUDPFwd = log.New(os.Stdout, `debug#`, log.Lshortfile) ErrorLogUDPFwd = log.New(os.Stderr, `error#`, log.Lshortfile) )
var DebugLog = log.New(os.Stdout, `debug#`, log.Lshortfile)
var ErrorLog = log.New(os.Stderr, `error#`, log.Lshortfile)
Functions ¶
This section is empty.
Types ¶
type AnnounceJob ¶
type AnnounceResult ¶
type AnnounceResult struct {
Success bool
Peers []common.Peer
Interval int
ResponseSize int
Duration time.Duration
Error error
NonRetryable bool // If true, error is non-retryable (disable forwarder)
Suspend bool // If true, forwarder should be suspended (e.g., 429)
}
AnnounceResult represents the outcome of an announce attempt
type Core ¶
type Core struct {
Config *config.Config
Storage *Storage
ForwarderStorage *ForwarderStorage
ForwarderManager *ForwarderManager
Receiver *Receiver
}
func (*Core) HTTPScrapeHandler ¶
func (core *Core) HTTPScrapeHandler(w http.ResponseWriter, r *http.Request)
func (*Core) HTTPStatsHandler ¶
func (core *Core) HTTPStatsHandler(w http.ResponseWriter, r *http.Request)
HTTPStatsHandler handles /stats HTTP requests
type DisabledForwarder ¶
DisabledForwarder stores info about a disabled forwarder
type ForwarderManager ¶
type ForwarderManager struct {
Config *config.Config
Storage *ForwarderStorage
MainStorage *Storage
Forwarders []CoreCommon.Forward
Workers int
Prometheus *observability.Prometheus
TempStorage *TempStorage
// contains filtered or unexported fields
}
ForwarderManager manages forwarding announces to external trackers
func NewForwarderManager ¶
func NewForwarderManager(cfg *config.Config, storage *ForwarderStorage, mainStorage *Storage, prom *observability.Prometheus, tempStorage *TempStorage) *ForwarderManager
NewForwarderManager creates a new ForwarderManager
func (*ForwarderManager) CancelPendingJobs ¶
func (fm *ForwarderManager) CancelPendingJobs(infoHash common.InfoHash, peerID common.PeerID)
CancelPendingJobs cancels any pending announce jobs for a specific peer
func (*ForwarderManager) ForwardCompletedEvent ¶
func (fm *ForwarderManager) ForwardCompletedEvent(infoHash common.InfoHash, peerID common.PeerID, request tracker.Request)
ForwardCompletedEvent forwards a completed event to all forwarders
func (*ForwarderManager) ForwardStoppedEvent ¶
func (fm *ForwarderManager) ForwardStoppedEvent(infoHash common.InfoHash, peerID common.PeerID, request tracker.Request)
ForwardStoppedEvent forwards a stopped event to all forwarders
func (*ForwarderManager) QueueEligibleAnnounces ¶
func (fm *ForwarderManager) QueueEligibleAnnounces(infoHash common.InfoHash, request tracker.Request)
QueueEligibleAnnounces enqueues announce jobs for forwarders that are ready
func (*ForwarderManager) Start ¶
func (fm *ForwarderManager) Start()
func (*ForwarderManager) Stop ¶
func (fm *ForwarderManager) Stop()
type ForwarderPeerEntry ¶
type ForwarderPeerEntry struct {
Peers []common.Peer
Interval int // Last reported interval in seconds from tracker response (per hash+forwarder)
LastUpdate time.Time
NextAnnounce time.Time // Calculated as LastUpdate + Interval
}
ForwarderPeerEntry stores the last response from a forwarder for a specific hash. Interval is the last reported interval from the tracker for this hash+forwarder combination.
type ForwarderStats ¶
type ForwarderStats struct {
AvgResponseTime time.Duration
LastInterval int
LastAnnounceTime time.Time
SampleCount int
// contains filtered or unexported fields
}
ForwarderStats tracks statistics per forwarder (aggregated across all hashes).
type ForwarderStorage ¶
type ForwarderStorage struct {
// map[InfoHash]map[ForwarderName]ForwarderPeerEntry
Entries map[common.InfoHash]map[string]ForwarderPeerEntry
// contains filtered or unexported fields
}
ForwarderStorage stores forwarder responses and intervals.
Interval Tracking Structure:
- Intervals are tracked PER-HASH, PER-FORWARDER
- Structure: map[InfoHash]map[ForwarderName]ForwarderPeerEntry
- Each hash maintains separate intervals for each forwarder
- When a forwarder responds for a hash, only that hash's interval for that forwarder is updated
- Different hashes can have different intervals for the same forwarder
Example:
Hash1 -> ForwarderA: interval=30s, ForwarderB: interval=60s
Hash2 -> ForwarderA: interval=45s, ForwarderB: interval=60s
Hash3 -> ForwarderA: interval=30s, ForwarderB: interval=90s
This allows each hash to respect each forwarder's specific interval requirements.
func NewForwarderStorage ¶
func NewForwarderStorage() *ForwarderStorage
func (*ForwarderStorage) Cleanup ¶
func (fs *ForwarderStorage) Cleanup(infoHash common.InfoHash)
func (*ForwarderStorage) GetAllPeers ¶
func (fs *ForwarderStorage) GetAllPeers(infoHash common.InfoHash) []common.Peer
func (*ForwarderStorage) GetNextAnnounceJobs ¶
func (fs *ForwarderStorage) GetNextAnnounceJobs(now time.Time) []AnnounceJob
func (*ForwarderStorage) HasInfoHash ¶
func (fs *ForwarderStorage) HasInfoHash(infoHash common.InfoHash) bool
func (*ForwarderStorage) MarkAnnounced ¶
func (fs *ForwarderStorage) MarkAnnounced(infoHash common.InfoHash, forwarderName string)
func (*ForwarderStorage) ShouldAnnounceNow ¶
func (fs *ForwarderStorage) ShouldAnnounceNow(infoHash common.InfoHash, forwarderName string, now time.Time) bool
ShouldAnnounceNow returns true when the tracker has never been contacted for the hash or when the stored NextAnnounce is due.
func (*ForwarderStorage) UpdatePeers ¶
type Receiver ¶
type Receiver struct {
Announce *ReceiverAnnounce
UDP *ReceiverUDP
}
func NewReceiver ¶
func NewReceiver(cfg *config.Config, storage *Storage, forwarderStorage *ForwarderStorage, forwarderManager *ForwarderManager) *Receiver
type ReceiverAnnounce ¶
type ReceiverAnnounce struct {
Config *config.Config
Storage *Storage
ForwarderStorage *ForwarderStorage
ForwarderManager *ForwarderManager
Prometheus *observability.Prometheus
TempStorage *TempStorage
}
func NewReceiverAnnounce ¶
func NewReceiverAnnounce(cfg *config.Config, storage *Storage, forwarderStorage *ForwarderStorage, forwarderManager *ForwarderManager) *ReceiverAnnounce
func (*ReceiverAnnounce) HTTPHandler ¶
func (ra *ReceiverAnnounce) HTTPHandler(w http.ResponseWriter, r *http.Request)
func (*ReceiverAnnounce) ProcessAnnounce ¶
func (ra *ReceiverAnnounce) ProcessAnnounce(remoteAddr, infoHash, peerID, port, uploaded, downloaded, left, ip, numwant, event, userAgent, compactFlag, noPeerIDFlag string) (*Response.Response, string)
type ReceiverUDP ¶
type ReceiverUDP struct {
Config *config.Config
Storage *Storage
ForwarderStorage *ForwarderStorage
ForwarderManager *ForwarderManager
Prometheus *observability.Prometheus
TempStorage *TempStorage
// contains filtered or unexported fields
}
func NewReceiverUDP ¶
func NewReceiverUDP(cfg *config.Config, storage *Storage, forwarderStorage *ForwarderStorage, forwarderManager *ForwarderManager) *ReceiverUDP
func (*ReceiverUDP) Close ¶
func (ru *ReceiverUDP) Close() error
func (*ReceiverUDP) ProcessAnnounce ¶
func (ru *ReceiverUDP) ProcessAnnounce(remoteAddr, infoHash, peerID, port, uploaded, downloaded, left, ip, numwant, event, userAgent, compactFlag, noPeerIDFlag string) (*Response.Response, string)
func (*ReceiverUDP) Start ¶
func (ru *ReceiverUDP) Start(listenAddr string) error
type ScrapeResponse ¶
type ScrapeResponse struct {
Files map[string]ScrapeResponseHash `bencode:"files"`
}
ScrapeResponse is the HTTP bencoded scrape response format.
type ScrapeResponseHash ¶
type ScrapeResponseHash struct {
Complete int `bencode:"complete"`
Incomplete int `bencode:"incomplete"`
Downloaded int `bencode:"downloaded"`
}
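ScrapeResponseHash is a single entry of ScrapeResponse.Files in the HTTP bencoded scrape response, carrying the complete, incomplete, and downloaded counts.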
type ScrapeStats ¶
type ScrapeStats struct {
Complete int // Number of seeders
Incomplete int // Number of leechers
Downloaded int // Number of completed downloads (not accurately tracked)
}
ScrapeStats holds statistics for a single info hash
type Storage ¶
type Storage struct {
Config *config.Config
Requests map[common.InfoHash]map[common.PeerID]tracker.Request
// contains filtered or unexported fields
}
func NewStorage ¶
type TempStorage ¶
type TempStorage struct {
// contains filtered or unexported fields
}
func NewTempStorage ¶
func NewTempStorage(_path string) (*TempStorage, error)
func (*TempStorage) SaveBencodeFromForwarder ¶
func (ts *TempStorage) SaveBencodeFromForwarder(p []byte, hash string, uri string) string
type UDPAnnounceRequest ¶
type UDPAnnounceResponse ¶
type UDPConnectRequest ¶
type UDPConnectResponse ¶
type UDPForwarder ¶
type UDPForwarder struct {
// contains filtered or unexported fields
}
UDPForwarder manages UDP connections to external trackers
func NewUDPForwarder ¶
func NewUDPForwarder(debug bool, timeout int, retries int, retryBaseMs int) *UDPForwarder
NewUDPForwarder creates a new UDP forwarder client
func (*UDPForwarder) Announce ¶
func (uf *UDPForwarder) Announce(forwarder CoreCommon.Forward, request tracker.Request) (*Response.Response, int, error)
Announce sends an announce request to the UDP tracker
func (*UDPForwarder) Stop ¶
func (uf *UDPForwarder) Stop()
Stop stops the UDP forwarder and cleans up resources
type UDPScrapeData ¶
type UDPScrapeRequest ¶
type UDPScrapeResponse ¶
type UDPScrapeResponse struct {
Action uint32
TransactionID uint32
ScrapeData []UDPScrapeData
}