Documentation
¶
Index ¶
- Constants
- Variables
- func ComputeMessageIDHash(messageID string) string
- func DirExists(path string) bool
- func FileExists(path string) bool
- func GetShardConfig(mode int) (numDBs, tablesPerDB int, description string)
- func Mkdir(path string) bool
- func Str2int64(s string) int64
- func UnixTimeSec() int64
- type CCC
- type ClearCache
- type ClearCacheChan
- type DatabaseWorkChecker
- type History
- func (h *History) Add(msgIdItem *MessageIdItem)
- func (h *History) CheckNoMoreWorkInHistory() bool
- func (h *History) Close() error
- func (h *History) GetStats() HistoryStats
- func (h *History) Lookup(msgIdItem *MessageIdItem) (int, error)
- func (h *History) LookupWorker(wid int)
- func (h *History) ServerShutdown() bool
- func (h *History) SetDatabaseWorkChecker(checker DatabaseWorkChecker)
- type HistoryConfig
- type HistoryStats
- type L1CACHE
- type L1CACHEMAP
- type L1ECH
- type L1ITEM
- type L1MUXER
- type L1PQ
- type L1PQItem
- type L1pqQ
- type MessageIdItem
- type MsgIdItemCache
- func (c *MsgIdItemCache) AddMsgIdToCache(newsgroupPtr *string, messageID string, articleNum int64) bool
- func (c *MsgIdItemCache) CleanExpiredEntries() int
- func (c *MsgIdItemCache) Clear()
- func (c *MsgIdItemCache) Delete(messageId string) bool
- func (c *MsgIdItemCache) DetailedStats() (totalBuckets, occupiedBuckets, items, maxChainLength int, loadFactor float64)
- func (c *MsgIdItemCache) FNVKey(str string) int
- func (c *MsgIdItemCache) FindThreadRootInCache(newsgroupPtr *string, references []string) *MessageIdItem
- func (c *MsgIdItemCache) GetMsgIdFromCache(newsgroupPtr *string, messageID string) (int64, int64, bool)
- func (c *MsgIdItemCache) GetORCreate(messageId string) *MessageIdItem
- func (c *MsgIdItemCache) GetOrCreateForGroup(messageID string, newsgroupPtr *string) *MessageIdItem
- func (c *MsgIdItemCache) GetResizeInfo() (bucketCount int, itemCount int, loadFactor float64, isResizing bool)
- func (c *MsgIdItemCache) HasMessageIDInGroup(messageID string, newsgroupPtr *string) bool
- func (c *MsgIdItemCache) MsgIdExists(newsgroupPtr *string, messageID string) *MessageIdItem
- func (c *MsgIdItemCache) NewMsgIdItem(messageId string) *MessageIdItem
- func (c *MsgIdItemCache) SetThreadingInfo(messageID string, rootArticle int64, isThreadRoot bool) bool
- func (c *MsgIdItemCache) SetThreadingInfoForGroup(newsgroupPtr *string, messageID string, artNum int64, rootArticle int64, ...) bool
- func (c *MsgIdItemCache) StartCleanupRoutine()
- func (c *MsgIdItemCache) Stats() (buckets, items, maxChainLength int)
- func (c *MsgIdItemCache) UpdateThreadRootToTmpCache(newsgroupPtr *string, messageID string, rootArticle int64, isThreadRoot bool) bool
- type MsgIdItemCachePage
- type SQLite3DB
- type SQLite3Opts
- type SQLite3Pool
- type SQLite3ShardedDB
- type SQLite3ShardedPool
- type ShardConfig
- type ThreadingInfo
Constants ¶
const ( // History file constants DefaultHistoryDir = "data/history" // TODO set via config HistoryFileName = "history.dat" // Cache configuration DefaultCacheExpires = 15 // seconds DefaultCachePurge = 5 // seconds // History Write Batching configuration DefaultBatchSize = 10000 // Number of entries to batch before flushing (reduced from 10000 to fix memory bloat) DefaultBatchTimeout = 5000 // Milliseconds to wait before forced flush // Sharding configuration constants SHARD_16_256 = 2 // 16 DBs with 256 tables each (recommended) )
const ( DefaultL1CacheExpires int64 = 5 // gets x2 BatchFlushEvery x2 DefaultL1CacheExtend int64 = 5 // extends cached items after writes DefaultL1CachePurge int64 = 1 // checks ttl every N seconds. affects CacheExpires/Extend max to + Purge DefaultEvictsCapacity = 16 // his.cEvCap (size of Extend chan) is normally fine as is. ClearEveryN = 16 )
const ( CaseLock = 0xFF // internal cache state. reply with CaseRetry while CaseLock CasePass = 0xF1 // is a reply to L1Lock and IndexQuery CaseDupes = 0x1C // is a reply and cache state CaseRetry = 0x2C // is a reply to if CaseLock or CaseWrite or if history.dat returns EOF //CaseAdded = 0x3C // is a reply to WriterChan:responseChan CaseWrite = 0x4C // internal cache state. is not a reply. reply with CaseRetry while CaseWrite is happening CaseError = 0xE1 // some things drop this error )
const DefaultStorageSystem = 0x1
const DefaultStorageType = StorageTypeSQLite3 // Default storage type for history (no other available, maybe in the future!)
const ENABLE_HISTORY = false // EXPERIMENTAL !
const (
StorageTypeSQLite3 = 0x17 // Storage type for SQLite3
)
Variables ¶
var ( DEBUGL1 bool = false L1 bool = true // better not disable L1 cache... L1CacheExpires int64 = DefaultL1CacheExpires L1ExtendExpires int64 = DefaultL1CacheExtend L1Purge int64 = DefaultL1CachePurge L1InitSize int = 1024 // L1LockDelay: delays L1 locking by N milliseconds // L1 locking is most likely done per client-connection // settings this greater 0 limits the amount of articles a client can lock&send // 1ms is a max of 1000 messages/sec per conn // 100ms is a max of 10 messages/sec per conn // 250ms is a max of 4 messages/sec per conn // 1000ms is a max of 1 message /sec per conn // text peers mostly dont need more than 4 msg per sec L1LockDelay int = 0 )
var ( MIICmutex sync.RWMutex // Global mutex for the cache MsgIdCache *MsgIdItemCache // Global instance of the message ID item cache UpperLimitMsgIdCacheSize = 1024 * 1024 DefaultMsgIdCacheSize = 4 * 1024 // N buckets MaxLoadFactor = 0.75 // Resize when load factor exceeds this ResizeMultiplier = 2 // Double the size when resizing // TTL Configuration Constants TmpCacheTTL = 15 * time.Second // TTL for temporary processing (CaseWrite items) // @AI!!! DO NOT CHANGE THIS!!!! CachedEntryTTL = 15 * time.Second // TTL for cache entries (CaseDupes items after flushing) // @AI!!! DO NOT CHANGE THIS!!!! ErrorCaseTTL = 15 * time.Second // TTL for error cases (shorter to retry sooner) // @AI!!! DO NOT CHANGE THIS!!!! MaxCachedEntryTTL = 15 * time.Second // Maximum TTL for any cache entry // @AI!!! DO NOT CHANGE THIS!!!! )
var DBTABLES = generateCombinations(HEXCHARS, 2, []string{}, []string{}) // 256
var HEXCHARS = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} // 16
var HistoryDEBUG = false // Set to true for spammy debug logs
var MaxLookupWorkers = runtime.NumCPU() // Use number of CPU cores for lookup workers
var NOTIFY = struct{}{}
Functions ¶
func ComputeMessageIDHash ¶
ComputeMessageIDHash computes MD5 hash of a message-ID
func GetShardConfig ¶
GetShardConfig returns the configuration for a given shard mode
Types ¶
type ClearCache ¶
type ClearCache struct {
// contains filtered or unexported fields
}
type ClearCacheChan ¶
type ClearCacheChan struct {
// contains filtered or unexported fields
}
type DatabaseWorkChecker ¶
type DatabaseWorkChecker interface {
CheckNoMoreWorkInMaps() bool
}
DatabaseWorkChecker interface allows History to check if database batch system has pending work
type History ¶
type History struct {
HistoryFilePath string
// contains filtered or unexported fields
}
History manages message-ID history tracking using INN2-style architecture
func NewHistory ¶
func NewHistory(config *HistoryConfig, mainWG *sync.WaitGroup) (*History, error)
NewHistory creates a new history manager The mainWG parameter should be the main application's waitgroup that will coordinate shutdown
func (*History) Add ¶
func (h *History) Add(msgIdItem *MessageIdItem)
Add adds a new message-ID to history
func (*History) CheckNoMoreWorkInHistory ¶
CheckNoMoreWorkInHistory checks if there's no more pending work (similar to CheckNoMoreWorkInMaps)
func (*History) GetStats ¶
func (h *History) GetStats() HistoryStats
GetStats returns current statistics
func (*History) Lookup ¶
func (h *History) Lookup(msgIdItem *MessageIdItem) (int, error)
Lookup checks if a message-ID exists in history Returns: ResponsePass (0) = not found, ResponseDuplicate (1) = found, ResponseRetry (2) = error
func (*History) LookupWorker ¶
func (*History) ServerShutdown ¶
func (*History) SetDatabaseWorkChecker ¶
func (h *History) SetDatabaseWorkChecker(checker DatabaseWorkChecker)
SetDatabaseWorkChecker sets the database work checker interface for coordinated shutdown
type HistoryConfig ¶
type HistoryConfig struct {
HistoryDir string `yaml:"history_dir" json:"history_dir"`
CacheExpires int64 `yaml:"cache_expires" json:"cache_expires"`
CachePurge int64 `yaml:"cache_purge" json:"cache_purge"`
ShardMode int `yaml:"shard_mode" json:"shard_mode"`
MaxConnections int `yaml:"max_connections" json:"max_connections"`
UseShortHashLen int `yaml:"use_short_hash_len" json:"use_short_hash_len"` // 2-7 chars stored in DB (default 3)
// Batching configuration for high-throughput writes
BatchSize int `yaml:"batch_size" json:"batch_size"` // Number of entries to batch (default 200)
BatchTimeout int64 `yaml:"batch_timeout" json:"batch_timeout"` // Timeout in milliseconds for forced flush (default 5000)
}
HistoryConfig holds configuration for the history system
func DefaultConfig ¶
func DefaultConfig() *HistoryConfig
DefaultConfig returns a default history configuration
func (*HistoryConfig) ValidateConfig ¶
func (c *HistoryConfig) ValidateConfig() error
ValidateConfig validates and adjusts configuration values
type HistoryStats ¶
type HistoryStats struct {
TotalLookups int64
TotalFileLookups int64
TotalAdds int64
CacheHits int64
CacheMisses int64
Duplicates int64
Errors int64
// contains filtered or unexported fields
}
HistoryEntry represents an entry in the history system HistoryStats tracks statistics for the history system
type L1CACHE ¶
type L1CACHE struct {
Caches map[string]*L1CACHEMAP
Extend map[string]*L1ECH
Muxers map[string]*L1MUXER
Counter map[string]*CCC
// contains filtered or unexported fields
}
func (*L1CACHE) BootL1Cache ¶
func (l1 *L1CACHE) BootL1Cache()
The BootL1Cache method initializes the cache system. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically purge expired entries.
func (*L1CACHE) LockL1Cache ¶
The LockL1Cache method is used to LOCK a `MessageIDHash` for processing. If the value is not in the cache or has expired, it locks the cache, updates the cache with a new value, and returns the value. Possible return values:
CaseLock == already in processing CaseWrite == already in processing CaseDupes == is a duplicate CasePass == not a duplicate == locked article for processing
type L1CACHEMAP ¶
type L1CACHEMAP struct {
// contains filtered or unexported fields
}
type MessageIdItem ¶
type MessageIdItem struct {
Mux sync.RWMutex // Protects all fields below
CachedEntryExpires time.Time // Exported field for cache entry expiration
MessageIdHash string // Computed hash of the message-ID
StorageToken string // pointer to storage token
MessageId string // pointer to article.messageid
ArtNum int64 // Article number in the history (primary/first occurrence)
GroupName *string // Group name this article belongs to (primary group)
Arrival int64 // When article arrived
Response int // @AI IGNORE Response“` FOR THE MOMENT; RESPONSE LOGIC NEEDS PROPER THINKING! NOT A JOB FOR NOW!
FileOffset int64 // File offset in history.dat where this entry is stored
// Group-specific threading information (replaces global threading fields)
GroupThreading map[*string]*ThreadingInfo // Per-group threading info for crossposted articles
}
type MsgIdItemCache ¶
type MsgIdItemCache struct {
Pages map[int]*MsgIdItemCachePage
// contains filtered or unexported fields
}
func NewMsgIdItemCache ¶
func NewMsgIdItemCache() *MsgIdItemCache
func (*MsgIdItemCache) AddMsgIdToCache ¶
func (c *MsgIdItemCache) AddMsgIdToCache(newsgroupPtr *string, messageID string, articleNum int64) bool
AddMsgIdToCache adds a message ID with article number to the cache for a specific group
func (*MsgIdItemCache) CleanExpiredEntries ¶
func (c *MsgIdItemCache) CleanExpiredEntries() int
CleanExpiredEntries removes expired temporary cache entries This replaces the functionality from MsgTmpCache.CronClean Efficiently unlinks expired items during chain traversal to avoid double-walking
func (*MsgIdItemCache) Clear ¶
func (c *MsgIdItemCache) Clear()
Clear removes all items from the cache
func (*MsgIdItemCache) Delete ¶
func (c *MsgIdItemCache) Delete(messageId string) bool
func (*MsgIdItemCache) DetailedStats ¶
func (c *MsgIdItemCache) DetailedStats() (totalBuckets, occupiedBuckets, items, maxChainLength int, loadFactor float64)
DetailedStats returns comprehensive cache statistics for monitoring and debugging
func (*MsgIdItemCache) FNVKey ¶
func (c *MsgIdItemCache) FNVKey(str string) int
FNVKey efficiently calculates a hash for the given string We use a simple approach that's still efficient for moderate loads
func (*MsgIdItemCache) FindThreadRootInCache ¶
func (c *MsgIdItemCache) FindThreadRootInCache(newsgroupPtr *string, references []string) *MessageIdItem
FindThreadRootInCache searches for thread root in cache by following references This replaces the functionality from MsgTmpCache.FindThreadRootInCache
func (*MsgIdItemCache) GetMsgIdFromCache ¶
func (c *MsgIdItemCache) GetMsgIdFromCache(newsgroupPtr *string, messageID string) (int64, int64, bool)
GetMsgIdFromCache retrieves threading information for a message ID in a specific group This replaces the functionality from MsgTmpCache.GetMsgIdFromTmpCache
func (*MsgIdItemCache) GetORCreate ¶
func (c *MsgIdItemCache) GetORCreate(messageId string) *MessageIdItem
func (*MsgIdItemCache) GetOrCreateForGroup ¶
func (c *MsgIdItemCache) GetOrCreateForGroup(messageID string, newsgroupPtr *string) *MessageIdItem
GetOrCreateForGroup gets or creates a message ID item for a specific group This provides group-specific functionality similar to MsgTmpCache
func (*MsgIdItemCache) GetResizeInfo ¶
func (c *MsgIdItemCache) GetResizeInfo() (bucketCount int, itemCount int, loadFactor float64, isResizing bool)
GetResizeInfo returns information about cache resizing for monitoring
func (*MsgIdItemCache) HasMessageIDInGroup ¶
func (c *MsgIdItemCache) HasMessageIDInGroup(messageID string, newsgroupPtr *string) bool
HasMessageIDInGroup checks if a message ID exists in a specific group and hasn't expired
func (*MsgIdItemCache) MsgIdExists ¶
func (c *MsgIdItemCache) MsgIdExists(newsgroupPtr *string, messageID string) *MessageIdItem
MsgIdExists checks if a message ID exists in the cache for a specific group This replaces the functionality from MsgTmpCache.MsgIdExists
func (*MsgIdItemCache) NewMsgIdItem ¶
func (c *MsgIdItemCache) NewMsgIdItem(messageId string) *MessageIdItem
func (*MsgIdItemCache) SetThreadingInfo ¶
func (c *MsgIdItemCache) SetThreadingInfo(messageID string, rootArticle int64, isThreadRoot bool) bool
SetThreadingInfo sets threading information for a message ID in a specific group
func (*MsgIdItemCache) SetThreadingInfoForGroup ¶
func (c *MsgIdItemCache) SetThreadingInfoForGroup(newsgroupPtr *string, messageID string, artNum int64, rootArticle int64, isThreadRoot bool) bool
SetThreadingInfoForGroup sets threading information for a message ID in a specific group
func (*MsgIdItemCache) StartCleanupRoutine ¶
func (c *MsgIdItemCache) StartCleanupRoutine()
StartCleanupRoutine starts a background goroutine to clean expired entries
func (*MsgIdItemCache) Stats ¶
func (c *MsgIdItemCache) Stats() (buckets, items, maxChainLength int)
Stats returns cache statistics for monitoring and debugging
func (*MsgIdItemCache) UpdateThreadRootToTmpCache ¶
func (c *MsgIdItemCache) UpdateThreadRootToTmpCache(newsgroupPtr *string, messageID string, rootArticle int64, isThreadRoot bool) bool
UpdateThreadRootToTmpCache updates an existing cache entry with thread root information This replaces the functionality from MsgTmpCache.UpdateThreadRootToTmpCache
type MsgIdItemCachePage ¶
type MsgIdItemCachePage struct {
MsgIdItem *MessageIdItem
Next *MsgIdItemCachePage
Prev *MsgIdItemCachePage
// contains filtered or unexported fields
}
type SQLite3DB ¶
SQLite3DB represents a SQLite database connection pool
func NewSQLite3DB ¶
func NewSQLite3DB(opts *SQLite3Opts, createTables bool, useShortHashLen int, mode int) (*SQLite3DB, error)
NewSQLite3DB creates a new SQLite3 database pool
type SQLite3Opts ¶
type SQLite3Opts struct {
// contains filtered or unexported fields
}
SQLite3Opts holds configuration for SQLite3 database
type SQLite3Pool ¶
type SQLite3Pool interface {
GetDB(write bool) (*sql.DB, error)
ReturnDB(db *sql.DB)
Close() error
}
SQLite3Pool interface for database operations (legacy, single DB only)
type SQLite3ShardedDB ¶
type SQLite3ShardedDB struct {
DBPools []*SQLite3DB
// contains filtered or unexported fields
}
SQLite3ShardedDB manages multiple SQLite databases for sharding
func NewSQLite3ShardedDB ¶
func NewSQLite3ShardedDB(config *ShardConfig, createTables bool, useShortHashLen int) (*SQLite3ShardedDB, error)
NewSQLite3ShardedDB creates a new sharded SQLite3 database system
func (*SQLite3ShardedDB) Close ¶
func (s *SQLite3ShardedDB) Close() error
Close closes all database connections (implements both SQLite3Pool and SQLite3ShardedPool interfaces)
func (*SQLite3ShardedDB) CreateAllTables ¶
func (s *SQLite3ShardedDB) CreateAllTables(useShortHashLen int) error
CreateAllTables creates all required tables across all databases
func (*SQLite3ShardedDB) GetShardedDB ¶
GetShardedDB returns a database connection for a specific shard (implements SQLite3ShardedPool interface)
type SQLite3ShardedPool ¶
type SQLite3ShardedPool interface {
GetShardedDB(dbIndex int, write bool) (*sql.DB, error)
//ReturnShardedDB(dbIndex int, db *sql.DB)
Close() error
}
SQLite3ShardedPool interface for sharded database operations
type ShardConfig ¶
type ShardConfig struct {
Mode int // Sharding mode (0-5)
BaseDir string // Base directory for database files
MaxOpenPerDB int // Max connections per database
Timeout int64 // Connection timeout
}
ShardConfig defines the sharding configuration
type ThreadingInfo ¶
type ThreadingInfo struct {
RootArticle int64 // Thread root article number (0 if this IS the root)
ChildArticle int64 // Child article reference (for threading hierarchy)
IsThreadRoot bool // True if this article is a thread root
ArtNum int64 // Article number in this specific group
}
ThreadingInfo holds per-group threading information for a message