processor

package
v0.4.7-1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2025 License: GPL-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package processor handles importing RockSolid Light legacy configurations

Index

Constants

View Source
const (
	AnalyzeBatchSize = 10000 // Process overview in batches of 10k articles
)
View Source
const DebugProcessorHeaders bool = false // flag for legacy header processing

Variables

View Source
var (
	// These variables can be set after importing the lib, before starting.
	MaxCrossPosts int = 15 // HARDCODED Maximum number of crossposts to allow per article

	LocalNNTPHostname string = "" // Hostname must be set before processing articles

	// MaxBatch defines the maximum number of articles to fetch in a single batch
	MaxBatchSize int64 = 128

	// UseStrictGroupValidation for group names, false allows upper-case in group names
	UseStrictGroupValidation = true

	// RunRSLIGHTImport is used to indicate if the importer should run the legacy RockSolid Light importer
	RunRSLIGHTImport = false

	// Global Batch Queue (proc_DLArt.go)
	Batch = &BatchQueue{
		Check:       make(chan *string),
		TodoQ:       make(chan *nntp.GroupInfo),
		GetQ:        make(chan *BatchItem),
		GroupQueues: make(map[string]*GroupBatch),
	}
)
View Source
var ErrIsEmptyGroup = fmt.Errorf("isEmptyGroup")
View Source
var ErrUpToDate = fmt.Errorf("up2date")
View Source
var GroupCounter = &Counter{
	Map: make(map[string]int64),
}
View Source
var InvalidDates = make(map[string]string)
View Source
var InvalidMutex = &sync.Mutex{}
View Source
var LOOPS_PER_GROUPS = 1
View Source
var NNTPDateLayouts = []string{}/* 335 elements not displayed */

Functions

func AnalyzeModeStandalone

func AnalyzeModeStandalone(host *string, port *int, username *string, password *string, ssl *bool, timeout *int,
	testGrp *string, forceRefresh *bool, maxAnalyzeArticles *int64,
	startDate *string, endDate *string, exportFormat *string, validateCache *bool,
	clearCache *bool, cacheStats *bool) error

AnalyzeModeStandalone is a wrapper for backward compatibility that creates its own pool

func CheckMessageIdFormat

func CheckMessageIdFormat(messageID string) bool

func ComputeMessageIDHash

func ComputeMessageIDHash(messageID string) string

ComputeMessageIDHash computes MD5 hash of a message-ID

func GetAnalysisCSVHeader

func GetAnalysisCSVHeader() string

GetAnalysisCSVHeader returns the CSV header for analysis exports

func GetHostname

func GetHostname() string

GetHostname returns the currently configured hostname

func IsValidGroupName

func IsValidGroupName(name string) bool

IsValidGroupName validates a newsgroup name according to RFC standards. Returns true if the group name is valid (lowercase, alphanumeric components separated by dots).

func ParseNNTPDate

func ParseNNTPDate(dateStr string) time.Time

ParseNNTPDate parses an NNTP date string to time.Time, handling common NNTP quirks.

func SetHostname

func SetHostname(hostname string, db *database.Database) error

SetHostname sets and validates the hostname for NNTP operations. This must be called before creating a new processor. If hostname is empty, it will attempt to retrieve it from the database. If hostname is provided, it will be saved to the database and used.

Types

type AnalyzeOptions

type AnalyzeOptions struct {
	ForceRefresh   bool      // Force refresh even if cache exists
	StartDate      time.Time // Only analyze articles after this date
	EndDate        time.Time // Only analyze articles before this date
	MaxArticles    int64     // Maximum number of articles to analyze
	IncludeHeaders bool      // Include subject/from in cache for detailed analysis
}

AnalyzeOptions contains options for group analysis

type ArticleSizeStats

type ArticleSizeStats struct {
	Under4K       int64 // < 4KB
	Size4to16K    int64 // 4KB - 16KB
	Size16to32K   int64 // 16KB - 32KB
	Size32to64K   int64 // 32KB - 64KB
	Size64to128K  int64 // 64KB - 128KB
	Size128to256K int64 // 128KB - 256KB
	Size256to512K int64 // 256KB - 512KB
	Over512K      int64 // > 512KB
	TotalBytes    int64 // Total bytes across all articles
	TotalCount    int64 // Total article count
}

ArticleSizeStats tracks distribution of article sizes

func (*ArticleSizeStats) AddArticleSize

func (stats *ArticleSizeStats) AddArticleSize(bytes int64)

AddArticleSize adds an article size to the statistics

func (*ArticleSizeStats) PrintSizeDistribution

func (stats *ArticleSizeStats) PrintSizeDistribution()

PrintSizeDistribution prints a detailed breakdown of article sizes

type BatchItem

type BatchItem struct {
	MessageID  *string
	ArticleNum int64
	GroupName  *string
	Article    *models.Article
	Error      error
	ReturnQ    chan *BatchItem // Channel to return processed items
}

type BatchQueue

type BatchQueue struct {
	Mutex       sync.RWMutex
	GetQ        chan *BatchItem        // Channel to get articles for processing
	Check       chan *string           // Channel to check newsgroups
	TodoQ       chan *nntp.GroupInfo   // Channel to do newsgroups
	GroupQueues map[string]*GroupBatch // Per-newsgroup queues
}

func (*BatchQueue) GetOrCreateGroupBatch

func (bq *BatchQueue) GetOrCreateGroupBatch(newsgroup string) *GroupBatch

GetOrCreateGroupBatch returns the GroupBatch for a newsgroup, creating it if necessary This now spawns a dedicated worker goroutine for efficient per-group processing

type BridgeConfig

type BridgeConfig struct {
	// Fediverse configuration
	FediverseEnabled bool   `json:"fediverse_enabled"`
	FediverseDomain  string `json:"fediverse_domain"`
	FediverseBaseURL string `json:"fediverse_base_url"`

	// Matrix configuration
	MatrixEnabled     bool   `json:"matrix_enabled"`
	MatrixHomeserver  string `json:"matrix_homeserver"`
	MatrixAccessToken string `json:"matrix_access_token"`
	MatrixUserID      string `json:"matrix_user_id"`
}

type BridgeManager

type BridgeManager struct {
	// contains filtered or unexported fields
}

func NewBridgeManager

func NewBridgeManager(config *BridgeConfig) *BridgeManager

func (*BridgeManager) BridgeArticle

func (bm *BridgeManager) BridgeArticle(article *models.Article, newsgroup string)

func (*BridgeManager) Close

func (bm *BridgeManager) Close()

func (*BridgeManager) IsAnyBridgeEnabled

func (bm *BridgeManager) IsAnyBridgeEnabled() bool

func (*BridgeManager) RegisterNewsgroup

func (bm *BridgeManager) RegisterNewsgroup(newsgroup *models.Newsgroup) error

type Counter

type Counter struct {
	Map map[string]int64 // Map to count articles per group
	// contains filtered or unexported fields
}

func NewCounter

func NewCounter() *Counter

func (*Counter) Add

func (*Counter) Add(group string, value int64)

func (*Counter) GetReset

func (*Counter) GetReset(group string) int64

func (*Counter) GetResetAll

func (*Counter) GetResetAll() map[string]int64

func (*Counter) Increment

func (*Counter) Increment(group string)

type DateRangeResult

type DateRangeResult struct {
	StartDate       time.Time
	EndDate         time.Time
	FirstArticleNum int64
	LastArticleNum  int64
	ArticleCount    int64
	TotalBytes      int64
}

DateRangeResult contains information about articles in a specific date range

type GroupAnalysis

type GroupAnalysis struct {
	GroupName      string
	ProviderName   string
	TotalArticles  int64
	TotalBytes     int64
	FirstArticle   int64
	LastArticle    int64
	OldestDate     time.Time
	NewestDate     time.Time
	AnalyzedAt     time.Time
	CacheExists    bool
	CachedArticles int64
	SizeStats      *ArticleSizeStats // Article size distribution statistics
}

GroupAnalysis contains analysis results for a newsgroup

func (*GroupAnalysis) ExportAnalysisToCSV

func (analysis *GroupAnalysis) ExportAnalysisToCSV() string

ExportAnalysisToCSV exports group analysis results to CSV format

func (*GroupAnalysis) ExportAnalysisToJSON

func (analysis *GroupAnalysis) ExportAnalysisToJSON() ([]byte, error)

ExportAnalysisToJSON exports group analysis results to JSON format

type GroupBatch

type GroupBatch struct {
	ReturnQ chan *BatchItem // Per-group channel to hold batch items to return
}

type GroupsEntry

type GroupsEntry struct {
	Name        string
	Description string
	IsHeader    bool
}

GroupsEntry represents a line from groups.txt

type LegacyArticle

type LegacyArticle struct {
	ID        int    `json:"id"`
	Newsgroup string `json:"newsgroup"`
	Number    string `json:"number"`
	MsgID     string `json:"msgid"`
	Date      string `json:"date"`
	Name      string `json:"name"`
	Subject   string `json:"subject"`
	//SearchSnippet string `json:"search_snippet"`
	Article string `json:"article"` // wireformat! article incl. full headers, body and final DOT+CRLF pair!
}

LegacyArticle represents an article from the legacy SQLite database

type LegacyImporter

type LegacyImporter struct {
	// contains filtered or unexported fields
}

LegacyImporter handles importing RockSolid Light legacy configurations

func NewLegacyImporter

func NewLegacyImporter(db *database.Database, etcPath, spoolPath string, useShortHashLen int) *LegacyImporter

NewLegacyImporter creates a new legacy importer

func (*LegacyImporter) Close

func (leg *LegacyImporter) Close() error

Close properly shuts down the legacy importer. Note: we don't close the processor as it may share database connections with the main application.

func (*LegacyImporter) GetSectionsSummary

func (leg *LegacyImporter) GetSectionsSummary() error

GetSectionsSummary returns a summary of imported sections

func (*LegacyImporter) ImportAllSQLiteDatabases

func (leg *LegacyImporter) ImportAllSQLiteDatabases(sqliteDir string, threads int) error

ImportAllSQLiteDatabases imports from all SQLite databases in the legacy directory

func (*LegacyImporter) ImportSQLiteArticles

func (leg *LegacyImporter) ImportSQLiteArticles(sqlitePath string) error

ImportSQLiteArticles imports articles from legacy SQLite databases

func (*LegacyImporter) ImportSections

func (leg *LegacyImporter) ImportSections() error

ImportSections imports all sections from the legacy RockSolid Light installation

func (*LegacyImporter) QuickOpenToGetNewsgroup

func (leg *LegacyImporter) QuickOpenToGetNewsgroup(sqlitePath string) (string, error)

type LegacyThread

type LegacyThread struct {
	ID      int    `json:"id"`
	Headers string `json:"headers"`
}

LegacyThread represents a thread from the legacy SQLite database

type MenuConfEntry struct {
	Name             string
	ShowInHeader     bool
	EnableLocalSpool bool
}

MenuConfEntry represents a line from menu.conf

type PostQueueWorker

type PostQueueWorker struct {
	// contains filtered or unexported fields
}

PostQueueWorker processes articles from the web posting queue

func (*PostQueueWorker) Start

func (w *PostQueueWorker) Start()

Start begins processing articles from the post queue

func (*PostQueueWorker) Stop

func (w *PostQueueWorker) Stop()

Stop gracefully stops the worker

type Processor

type Processor struct {
	DB   *database.Database
	Pool *nntp.Pool
	// Cache         *MsgTmpCache // REMOVED: Migrated to MsgIdCache
	ThreadCounter *Counter
	LegacyCounter *Counter // Counter for legacy imports, if needed
	//HisResChan    chan int                // Channel for history responses
	History       *history.History        // History system for duplicate detection
	MsgIdCache    *history.MsgIdItemCache // Cache for message ID items
	BridgeManager *BridgeManager          // Bridge manager for Fediverse/Matrix (optional)
}

func NewProcessor

func NewProcessor(db *database.Database, nntpPool *nntp.Pool, useShortHashLen int) *Processor

func (*Processor) AddProcessedArticleToHistory

func (proc *Processor) AddProcessedArticleToHistory(msgIdItem *history.MessageIdItem, newsgroupPtr *string, articleNumber int64)

AddProcessedArticleToHistory adds a successfully processed article to history with correct group and article number

func (*Processor) AnalyzeGroup

func (proc *Processor) AnalyzeGroup(groupName string, options *AnalyzeOptions) (*GroupAnalysis, error)

AnalyzeGroup performs comprehensive analysis of a remote newsgroup

func (*Processor) AnalyzeMode

func (proc *Processor) AnalyzeMode(testGrp string, forceRefresh bool, maxAnalyzeArticles int64,
	startDate string, endDate string, exportFormat string, validateCache bool,
	clearCache bool, cacheStats bool) (*GroupAnalysis, error)

AnalyzeMode performs newsgroup analysis operations using an existing processor instance

func (*Processor) CheckNoMoreWorkInHistory

func (proc *Processor) CheckNoMoreWorkInHistory() bool

func (*Processor) ClearCache

func (proc *Processor) ClearCache(groupName string) error

ClearCache removes cached overview data for a group

func (*Processor) Close

func (proc *Processor) Close() error

Close gracefully shuts down the processor and history system

func (*Processor) DisableBridges

func (proc *Processor) DisableBridges()

DisableBridges disables all bridges

func (*Processor) DownloadArticles

func (proc *Processor) DownloadArticles(newsgroup string, DLParChan chan struct{}, progressDB *database.ProgressDB, start int64, end int64, shutdownChan <-chan struct{}) error

DownloadArticles fetches full articles and stores them in the articles DB.

func (*Processor) DownloadArticlesFromDate

func (proc *Processor) DownloadArticlesFromDate(groupName string, startDate time.Time, DLParChan chan struct{}, progressDB *database.ProgressDB, groupInfo *nntp.GroupInfo, shutdownChan <-chan struct{}) error

DownloadArticlesFromDate fetches articles starting from a specific date. Uses special progress tracking: sets progress to startArticle-1, or -1 if starting from article 1. This prevents DownloadArticles from using "no progress detected" logic for existing groups.

func (*Processor) EnableBridges

func (proc *Processor) EnableBridges(config *BridgeConfig)

EnableBridges enables Fediverse and/or Matrix bridges with the given configuration

func (*Processor) FindArticlesByDateRange

func (proc *Processor) FindArticlesByDateRange(groupName string, startDate, endDate time.Time) (*DateRangeResult, error)

FindArticlesByDateRange finds articles within a specific date range

func (*Processor) FindStartArticleByDate

func (proc *Processor) FindStartArticleByDate(groupName string, targetDate time.Time, groupInfo *nntp.GroupInfo) (int64, error)

FindStartArticleByDate finds the first article number on or after the given date using a simple binary search approach with XOVER data

func (*Processor) FindThreadRootInCache

func (proc *Processor) FindThreadRootInCache(newsgroupPtr *string, refs []string) *database.MsgIdTmpCacheItem

FindThreadRootInCache - public wrapper for the interface

func (*Processor) ForceCloseGroupDBs

func (proc *Processor) ForceCloseGroupDBs(groupsDB *database.GroupDBs) error

ForceCloseGroupDBs implements the ThreadingProcessor interface Forces closure of group database connections

func (*Processor) GetArticleCountByDateRange

func (proc *Processor) GetArticleCountByDateRange(groupName string, startDate, endDate time.Time) (int64, error)

GetArticleCountByDateRange returns the number of articles in a specific date range without full analysis

func (*Processor) GetCacheStats

func (proc *Processor) GetCacheStats(groupName string) (*GroupAnalysis, error)

GetCacheStats returns statistics about cached overview data

func (*Processor) GetCachedMessageIDs

func (proc *Processor) GetCachedMessageIDs(groupName string, startArticle, endArticle int64) ([]*nntp.HeaderLine, error)

GetCachedMessageIDs returns cached message IDs for download optimization

func (*Processor) GetHistoryStats

func (proc *Processor) GetHistoryStats() history.HistoryStats

GetHistoryStats returns current history statistics

func (*Processor) GetXHDR

func (proc *Processor) GetXHDR(groupName string, header string, start, end int64) ([]*nntp.HeaderLine, error)

GetXHDR fetches XHDR data for a group

func (*Processor) ImportOverview

func (proc *Processor) ImportOverview(groupName string) error

ImportOverview fetches XOVER data for a group and stores it in the overview DB.

func (*Processor) IsNewsGroupInSectionsDB

func (proc *Processor) IsNewsGroupInSectionsDB(name *string) bool

func (*Processor) Lookup

func (proc *Processor) Lookup(msgIdItem *history.MessageIdItem) (int, error)

Lookup looks up a message-ID in history and returns the storage token in the item

func (*Processor) MsgIdExists

func (proc *Processor) MsgIdExists(group *string, messageID string) bool

MsgIdExists implements the ThreadingProcessor interface Returns true if the message ID exists in the cache for the given group

func (*Processor) NewPostQueueWorker

func (processor *Processor) NewPostQueueWorker() *PostQueueWorker

NewPostQueueWorker creates a new post queue worker

func (*Processor) ProcessIncomingArticle

func (proc *Processor) ProcessIncomingArticle(article *models.Article) (int, error)

// AddArticleToHistory adds an article to history (public wrapper)

func (proc *Processor) AddArticleToHistory(article *nntp.Article, newsgroup string) {
	proc.addArticleToHistory(article, newsgroup)
}

ProcessIncomingArticle processes an incoming article and stores it in the database

func (*Processor) ValidateCacheIntegrity

func (proc *Processor) ValidateCacheIntegrity(groupName string) error

ValidateCacheIntegrity checks if the cache file is valid and consistent

func (*Processor) WaitForBatchCompletion

func (proc *Processor) WaitForBatchCompletion()

WaitForBatchCompletion waits for all pending batch operations to complete. This should be called before closing the processor to ensure all articles are processed.

func (*Processor) WantShutdown

func (proc *Processor) WantShutdown(shutdownChan <-chan struct{}) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL