Documentation
¶
Overview ¶
Package processor imports RockSolid Light legacy configurations.
Index ¶
- Constants
- Variables
- func AnalyzeModeStandalone(host *string, port *int, username *string, password *string, ssl *bool, ...) error
- func CheckMessageIdFormat(messageID string) bool
- func ComputeMessageIDHash(messageID string) string
- func GetAnalysisCSVHeader() string
- func GetHostname() string
- func IsValidGroupName(name string) bool
- func ParseNNTPDate(dateStr string) time.Time
- func SetHostname(hostname string, db *database.Database) error
- type AnalyzeOptions
- type ArticleSizeStats
- type BatchItem
- type BatchQueue
- type BridgeConfig
- type BridgeManager
- type Counter
- type DateRangeResult
- type GroupAnalysis
- type GroupBatch
- type GroupsEntry
- type LegacyArticle
- type LegacyImporter
- func (leg *LegacyImporter) Close() error
- func (leg *LegacyImporter) GetSectionsSummary() error
- func (leg *LegacyImporter) ImportAllSQLiteDatabases(sqliteDir string, threads int) error
- func (leg *LegacyImporter) ImportSQLiteArticles(sqlitePath string) error
- func (leg *LegacyImporter) ImportSections() error
- func (leg *LegacyImporter) QuickOpenToGetNewsgroup(sqlitePath string) (string, error)
- type LegacyThread
- type MenuConfEntry
- type PostQueueWorker
- type Processor
- func (proc *Processor) AddProcessedArticleToHistory(msgIdItem *history.MessageIdItem, newsgroupPtr *string, articleNumber int64)
- func (proc *Processor) AnalyzeGroup(groupName string, options *AnalyzeOptions) (*GroupAnalysis, error)
- func (proc *Processor) AnalyzeMode(testGrp string, forceRefresh bool, maxAnalyzeArticles int64, startDate string, ...) (*GroupAnalysis, error)
- func (proc *Processor) CheckNoMoreWorkInHistory() bool
- func (proc *Processor) ClearCache(groupName string) error
- func (proc *Processor) Close() error
- func (proc *Processor) DisableBridges()
- func (proc *Processor) DownloadArticles(newsgroup string, DLParChan chan struct{}, progressDB *database.ProgressDB, ...) error
- func (proc *Processor) DownloadArticlesFromDate(groupName string, startDate time.Time, DLParChan chan struct{}, ...) error
- func (proc *Processor) EnableBridges(config *BridgeConfig)
- func (proc *Processor) FindArticlesByDateRange(groupName string, startDate, endDate time.Time) (*DateRangeResult, error)
- func (proc *Processor) FindStartArticleByDate(groupName string, targetDate time.Time, groupInfo *nntp.GroupInfo) (int64, error)
- func (proc *Processor) FindThreadRootInCache(newsgroupPtr *string, refs []string) *database.MsgIdTmpCacheItem
- func (proc *Processor) ForceCloseGroupDBs(groupsDB *database.GroupDBs) error
- func (proc *Processor) GetArticleCountByDateRange(groupName string, startDate, endDate time.Time) (int64, error)
- func (proc *Processor) GetCacheStats(groupName string) (*GroupAnalysis, error)
- func (proc *Processor) GetCachedMessageIDs(groupName string, startArticle, endArticle int64) ([]*nntp.HeaderLine, error)
- func (proc *Processor) GetHistoryStats() history.HistoryStats
- func (proc *Processor) GetXHDR(groupName string, header string, start, end int64) ([]*nntp.HeaderLine, error)
- func (proc *Processor) ImportOverview(groupName string) error
- func (proc *Processor) IsNewsGroupInSectionsDB(name *string) bool
- func (proc *Processor) Lookup(msgIdItem *history.MessageIdItem) (int, error)
- func (proc *Processor) MsgIdExists(group *string, messageID string) bool
- func (processor *Processor) NewPostQueueWorker() *PostQueueWorker
- func (proc *Processor) ProcessIncomingArticle(article *models.Article) (int, error)
- func (proc *Processor) ValidateCacheIntegrity(groupName string) error
- func (proc *Processor) WaitForBatchCompletion()
- func (proc *Processor) WantShutdown(shutdownChan <-chan struct{}) bool
Constants ¶
const (
AnalyzeBatchSize = 10000 // Process overview in batches of 10k articles
)
const DebugProcessorHeaders bool = false // flag for legacy header processing
Variables ¶
var ( // these vars can be set after importing the lib, before starting!!
	// MaxCrossPosts is the HARDCODED maximum number of crossposts to allow per article
	MaxCrossPosts int = 15
	// LocalNNTPHostname must be set before processing articles
	LocalNNTPHostname string = ""
	// MaxBatchSize defines the maximum number of articles to fetch in a single batch
	MaxBatchSize int64 = 128
	// UseStrictGroupValidation for group names, false allows upper-case in group names
	UseStrictGroupValidation = true
	// RunRSLIGHTImport is used to indicate if the importer should run the legacy RockSolid Light importer
	RunRSLIGHTImport = false
	// Global Batch Queue (proc_DLArt.go)
	Batch = &BatchQueue{
		Check:       make(chan *string),
		TodoQ:       make(chan *nntp.GroupInfo),
		GetQ:        make(chan *BatchItem),
		GroupQueues: make(map[string]*GroupBatch),
	}
)
var ErrIsEmptyGroup = fmt.Errorf("isEmptyGroup")
var ErrUpToDate = fmt.Errorf("up2date")
var GroupCounter = &Counter{
	Map: make(map[string]int64),
}
var InvalidDates = make(map[string]string)
var InvalidMutex = &sync.Mutex{}
var LOOPS_PER_GROUPS = 1
var NNTPDateLayouts = []string{}/* 335 elements not displayed */
Functions ¶
func AnalyzeModeStandalone ¶
func AnalyzeModeStandalone(host *string, port *int, username *string, password *string, ssl *bool, timeout *int, testGrp *string, forceRefresh *bool, maxAnalyzeArticles *int64, startDate *string, endDate *string, exportFormat *string, validateCache *bool, clearCache *bool, cacheStats *bool) error
AnalyzeModeStandalone is a wrapper for backward compatibility that creates its own pool
func CheckMessageIdFormat ¶
func ComputeMessageIDHash ¶
ComputeMessageIDHash computes MD5 hash of a message-ID
func GetAnalysisCSVHeader ¶
func GetAnalysisCSVHeader() string
GetAnalysisCSVHeader returns the CSV header for analysis exports
func IsValidGroupName ¶
IsValidGroupName validates a newsgroup name according to RFC standards. Returns true if the group name is valid (lowercase, alphanumeric components separated by dots).
func ParseNNTPDate ¶
ParseNNTPDate parses an NNTP date string to time.Time, handling common NNTP quirks.
func SetHostname ¶
SetHostname sets and validates the hostname for NNTP operations. This must be called before creating a new processor. If hostname is empty, it will attempt to retrieve it from the database. If hostname is provided, it will be saved to the database and used.
Types ¶
type AnalyzeOptions ¶
type AnalyzeOptions struct {
ForceRefresh bool // Force refresh even if cache exists
StartDate time.Time // Only analyze articles after this date
EndDate time.Time // Only analyze articles before this date
MaxArticles int64 // Maximum number of articles to analyze
IncludeHeaders bool // Include subject/from in cache for detailed analysis
}
AnalyzeOptions contains options for group analysis
type ArticleSizeStats ¶
type ArticleSizeStats struct {
Under4K int64 // < 4KB
Size4to16K int64 // 4KB - 16KB
Size16to32K int64 // 16KB - 32KB
Size32to64K int64 // 32KB - 64KB
Size64to128K int64 // 64KB - 128KB
Size128to256K int64 // 128KB - 256KB
Size256to512K int64 // 256KB - 512KB
Over512K int64 // > 512KB
TotalBytes int64 // Total bytes across all articles
TotalCount int64 // Total article count
}
ArticleSizeStats tracks distribution of article sizes
func (*ArticleSizeStats) AddArticleSize ¶
func (stats *ArticleSizeStats) AddArticleSize(bytes int64)
AddArticleSize adds an article size to the statistics
func (*ArticleSizeStats) PrintSizeDistribution ¶
func (stats *ArticleSizeStats) PrintSizeDistribution()
PrintSizeDistribution prints a detailed breakdown of article sizes
type BatchQueue ¶
type BatchQueue struct {
Mutex sync.RWMutex
GetQ chan *BatchItem // Channel to get articles for processing
Check chan *string // Channel to check newsgroups
TodoQ chan *nntp.GroupInfo // Channel to do newsgroups
GroupQueues map[string]*GroupBatch // Per-newsgroup queues
}
func (*BatchQueue) GetOrCreateGroupBatch ¶
func (bq *BatchQueue) GetOrCreateGroupBatch(newsgroup string) *GroupBatch
GetOrCreateGroupBatch returns the GroupBatch for a newsgroup, creating it if necessary. This now spawns a dedicated worker goroutine for efficient per-group processing.
type BridgeConfig ¶
type BridgeConfig struct {
// Fediverse configuration
FediverseEnabled bool `json:"fediverse_enabled"`
FediverseDomain string `json:"fediverse_domain"`
FediverseBaseURL string `json:"fediverse_base_url"`
// Matrix configuration
MatrixEnabled bool `json:"matrix_enabled"`
MatrixHomeserver string `json:"matrix_homeserver"`
MatrixAccessToken string `json:"matrix_access_token"`
MatrixUserID string `json:"matrix_user_id"`
}
type BridgeManager ¶
type BridgeManager struct {
// contains filtered or unexported fields
}
func NewBridgeManager ¶
func NewBridgeManager(config *BridgeConfig) *BridgeManager
func (*BridgeManager) BridgeArticle ¶
func (bm *BridgeManager) BridgeArticle(article *models.Article, newsgroup string)
func (*BridgeManager) Close ¶
func (bm *BridgeManager) Close()
func (*BridgeManager) IsAnyBridgeEnabled ¶
func (bm *BridgeManager) IsAnyBridgeEnabled() bool
func (*BridgeManager) RegisterNewsgroup ¶
func (bm *BridgeManager) RegisterNewsgroup(newsgroup *models.Newsgroup) error
type Counter ¶
type Counter struct {
Map map[string]int64 // Map to count articles per group
// contains filtered or unexported fields
}
func NewCounter ¶
func NewCounter() *Counter
func (*Counter) GetResetAll ¶
type DateRangeResult ¶
type DateRangeResult struct {
StartDate time.Time
EndDate time.Time
FirstArticleNum int64
LastArticleNum int64
ArticleCount int64
TotalBytes int64
}
DateRangeResult contains information about articles in a specific date range
type GroupAnalysis ¶
type GroupAnalysis struct {
GroupName string
ProviderName string
TotalArticles int64
TotalBytes int64
FirstArticle int64
LastArticle int64
OldestDate time.Time
NewestDate time.Time
AnalyzedAt time.Time
CacheExists bool
CachedArticles int64
SizeStats *ArticleSizeStats // Article size distribution statistics
}
GroupAnalysis contains analysis results for a newsgroup
func (*GroupAnalysis) ExportAnalysisToCSV ¶
func (analysis *GroupAnalysis) ExportAnalysisToCSV() string
ExportAnalysisToCSV exports group analysis results to CSV format
func (*GroupAnalysis) ExportAnalysisToJSON ¶
func (analysis *GroupAnalysis) ExportAnalysisToJSON() ([]byte, error)
ExportAnalysisToJSON exports group analysis results to JSON format
type GroupBatch ¶
type GroupBatch struct {
ReturnQ chan *BatchItem // Per-group channel to hold batch items to return
}
type GroupsEntry ¶
GroupsEntry represents a line from groups.txt
type LegacyArticle ¶
type LegacyArticle struct {
ID int `json:"id"`
Newsgroup string `json:"newsgroup"`
Number string `json:"number"`
MsgID string `json:"msgid"`
Date string `json:"date"`
Name string `json:"name"`
Subject string `json:"subject"`
//SearchSnippet string `json:"search_snippet"`
Article string `json:"article"` // wireformat! article incl. full headers, body and final DOT+CRLF pair!
}
LegacyArticle represents an article from the legacy SQLite database
type LegacyImporter ¶
type LegacyImporter struct {
// contains filtered or unexported fields
}
LegacyImporter handles importing RockSolid Light legacy configurations
func NewLegacyImporter ¶
func NewLegacyImporter(db *database.Database, etcPath, spoolPath string, useShortHashLen int) *LegacyImporter
NewLegacyImporter creates a new legacy importer
func (*LegacyImporter) Close ¶
func (leg *LegacyImporter) Close() error
Close properly shuts down the legacy importer. Note: We don't close the processor as it may share database connections with the main application.
func (*LegacyImporter) GetSectionsSummary ¶
func (leg *LegacyImporter) GetSectionsSummary() error
GetSectionsSummary returns a summary of imported sections
func (*LegacyImporter) ImportAllSQLiteDatabases ¶
func (leg *LegacyImporter) ImportAllSQLiteDatabases(sqliteDir string, threads int) error
ImportAllSQLiteDatabases imports from all SQLite databases in the legacy directory
func (*LegacyImporter) ImportSQLiteArticles ¶
func (leg *LegacyImporter) ImportSQLiteArticles(sqlitePath string) error
ImportSQLiteArticles imports articles from legacy SQLite databases
func (*LegacyImporter) ImportSections ¶
func (leg *LegacyImporter) ImportSections() error
ImportSections imports all sections from the legacy RockSolid Light installation
func (*LegacyImporter) QuickOpenToGetNewsgroup ¶
func (leg *LegacyImporter) QuickOpenToGetNewsgroup(sqlitePath string) (string, error)
type LegacyThread ¶
LegacyThread represents a thread from the legacy SQLite database
type MenuConfEntry ¶
MenuConfEntry represents a line from menu.conf
type PostQueueWorker ¶
type PostQueueWorker struct {
// contains filtered or unexported fields
}
PostQueueWorker processes articles from the web posting queue
func (*PostQueueWorker) Start ¶
func (w *PostQueueWorker) Start()
Start begins processing articles from the post queue
type Processor ¶
type Processor struct {
DB *database.Database
Pool *nntp.Pool
// Cache *MsgTmpCache // REMOVED: Migrated to MsgIdCache
ThreadCounter *Counter
LegacyCounter *Counter // Counter for legacy imports, if needed
//HisResChan chan int // Channel for history responses
History *history.History // History system for duplicate detection
MsgIdCache *history.MsgIdItemCache // Cache for message ID items
BridgeManager *BridgeManager // Bridge manager for Fediverse/Matrix (optional)
}
func NewProcessor ¶
func (*Processor) AddProcessedArticleToHistory ¶
func (proc *Processor) AddProcessedArticleToHistory(msgIdItem *history.MessageIdItem, newsgroupPtr *string, articleNumber int64)
AddProcessedArticleToHistory adds a successfully processed article to history with correct group and article number
func (*Processor) AnalyzeGroup ¶
func (proc *Processor) AnalyzeGroup(groupName string, options *AnalyzeOptions) (*GroupAnalysis, error)
AnalyzeGroup performs comprehensive analysis of a remote newsgroup
func (*Processor) AnalyzeMode ¶
func (proc *Processor) AnalyzeMode(testGrp string, forceRefresh bool, maxAnalyzeArticles int64, startDate string, endDate string, exportFormat string, validateCache bool, clearCache bool, cacheStats bool) (*GroupAnalysis, error)
AnalyzeMode performs newsgroup analysis operations using an existing processor instance
func (*Processor) CheckNoMoreWorkInHistory ¶
func (*Processor) ClearCache ¶
ClearCache removes cached overview data for a group
func (*Processor) DisableBridges ¶
func (proc *Processor) DisableBridges()
DisableBridges disables all bridges
func (*Processor) DownloadArticles ¶
func (proc *Processor) DownloadArticles(newsgroup string, DLParChan chan struct{}, progressDB *database.ProgressDB, start int64, end int64, shutdownChan <-chan struct{}) error
DownloadArticles fetches full articles and stores them in the articles DB.
func (*Processor) DownloadArticlesFromDate ¶
func (proc *Processor) DownloadArticlesFromDate(groupName string, startDate time.Time, DLParChan chan struct{}, progressDB *database.ProgressDB, groupInfo *nntp.GroupInfo, shutdownChan <-chan struct{}) error
DownloadArticlesFromDate fetches articles starting from a specific date. Uses special progress tracking: sets progress to startArticle-1, or -1 if starting from article 1. This prevents DownloadArticles from using "no progress detected" logic for existing groups.
func (*Processor) EnableBridges ¶
func (proc *Processor) EnableBridges(config *BridgeConfig)
EnableBridges enables Fediverse and/or Matrix bridges with the given configuration
func (*Processor) FindArticlesByDateRange ¶
func (proc *Processor) FindArticlesByDateRange(groupName string, startDate, endDate time.Time) (*DateRangeResult, error)
FindArticlesByDateRange finds articles within a specific date range
func (*Processor) FindStartArticleByDate ¶
func (proc *Processor) FindStartArticleByDate(groupName string, targetDate time.Time, groupInfo *nntp.GroupInfo) (int64, error)
FindStartArticleByDate finds the first article number on or after the given date using a simple binary search approach with XOVER data
func (*Processor) FindThreadRootInCache ¶
func (proc *Processor) FindThreadRootInCache(newsgroupPtr *string, refs []string) *database.MsgIdTmpCacheItem
FindThreadRootInCache - public wrapper for the interface
func (*Processor) ForceCloseGroupDBs ¶
ForceCloseGroupDBs implements the ThreadingProcessor interface. Forces closure of group database connections.
func (*Processor) GetArticleCountByDateRange ¶
func (proc *Processor) GetArticleCountByDateRange(groupName string, startDate, endDate time.Time) (int64, error)
GetArticleCountByDateRange returns the number of articles in a specific date range without full analysis
func (*Processor) GetCacheStats ¶
func (proc *Processor) GetCacheStats(groupName string) (*GroupAnalysis, error)
GetCacheStats returns statistics about cached overview data
func (*Processor) GetCachedMessageIDs ¶
func (proc *Processor) GetCachedMessageIDs(groupName string, startArticle, endArticle int64) ([]*nntp.HeaderLine, error)
GetCachedMessageIDs returns cached message IDs for download optimization
func (*Processor) GetHistoryStats ¶
func (proc *Processor) GetHistoryStats() history.HistoryStats
GetHistoryStats returns current history statistics
func (*Processor) GetXHDR ¶
func (proc *Processor) GetXHDR(groupName string, header string, start, end int64) ([]*nntp.HeaderLine, error)
GetXHDR fetches XHDR data for a group
func (*Processor) ImportOverview ¶
ImportOverview fetches XOVER data for a group and stores it in the overview DB.
func (*Processor) IsNewsGroupInSectionsDB ¶
func (*Processor) Lookup ¶
func (proc *Processor) Lookup(msgIdItem *history.MessageIdItem) (int, error)
Lookup looks up a message-ID in history and returns the storage token in the item
func (*Processor) MsgIdExists ¶
MsgIdExists implements the ThreadingProcessor interface. Returns true if the message ID exists in the cache for the given group.
func (*Processor) NewPostQueueWorker ¶
func (processor *Processor) NewPostQueueWorker() *PostQueueWorker
NewPostQueueWorker creates a new post queue worker
func (*Processor) ProcessIncomingArticle ¶
func (proc *Processor) ProcessIncomingArticle(article *models.Article) (int, error)
ProcessIncomingArticle processes an incoming article and stores it in the database
// AddArticleToHistory adds an article to history (public wrapper)
func (proc *Processor) AddArticleToHistory(article *nntp.Article, newsgroup string) {
	proc.addArticleToHistory(article, newsgroup)
}
func (*Processor) ValidateCacheIntegrity ¶
ValidateCacheIntegrity checks if the cache file is valid and consistent
func (*Processor) WaitForBatchCompletion ¶
func (proc *Processor) WaitForBatchCompletion()
WaitForBatchCompletion waits for all pending batch operations to complete. This should be called before closing the processor to ensure all articles are processed.