history

package
v0.4.7-1-testing-2025-... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2025 License: GPL-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// History file constants
	DefaultHistoryDir = "data/history" // TODO set via config
	HistoryFileName   = "history.dat"

	// Cache configuration
	DefaultCacheExpires = 15 // seconds
	DefaultCachePurge   = 5  // seconds

	// History Write Batching configuration
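	DefaultBatchSize    = 10000 // Number of entries to batch before flushing (NOTE: an earlier remark said "reduced from 10000 to fix memory bloat", which matches the current value; the previous value needs confirming)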
	DefaultBatchTimeout = 5000  // Milliseconds to wait before forced flush

	// Sharding configuration constants
	SHARD_16_256 = 2 // 16 DBs with 256 tables each (recommended)
)
View Source
const (
	DefaultL1CacheExpires int64 = 5  // gets x2 BatchFlushEvery x2
	DefaultL1CacheExtend  int64 = 5  // extends cached items after writes
	DefaultL1CachePurge   int64 = 1  // checks ttl every N seconds. affects CacheExpires/Extend max to + Purge
	DefaultEvictsCapacity       = 16 // his.cEvCap (size of Extend chan) is normally fine as is.
	ClearEveryN                 = 16
)
View Source
const (
	CaseLock  = 0xFF // internal cache state. reply with CaseRetry while CaseLock
	CasePass  = 0xF1 // is a reply to L1Lock and IndexQuery
	CaseDupes = 0x1C // is a reply and cache state
	CaseRetry = 0x2C // is a reply sent while the cache state is CaseLock or CaseWrite, or if history.dat returns EOF
	//CaseAdded = 0x3C // is a reply to WriterChan:responseChan
	CaseWrite = 0x4C // internal cache state. is not a reply. reply with CaseRetry while CaseWrite is happening
	CaseError = 0xE1 // some things drop this error
)
View Source
const DefaultStorageSystem = 0x1
View Source
const DefaultStorageType = StorageTypeSQLite3 // Default storage type for history (no other available, maybe in the future!)
View Source
const ENABLE_HISTORY = false // EXPERIMENTAL !
View Source
const (
	StorageTypeSQLite3 = 0x17 // Storage type for SQLite3

)

Variables

View Source
var (
	DEBUGL1         bool  = false
	L1              bool  = true // better not disable L1 cache...
	L1CacheExpires  int64 = DefaultL1CacheExpires
	L1ExtendExpires int64 = DefaultL1CacheExtend
	L1Purge         int64 = DefaultL1CachePurge
	L1InitSize      int   = 1024

	// L1LockDelay delays L1 locking by N milliseconds.
	// L1 locking is most likely done per client connection, so
	// setting this greater than 0 limits the number of articles a client can lock & send:
	//    1ms is a max of 1000 messages/sec per conn
	//  100ms is a max of   10 messages/sec per conn
	//  250ms is a max of    4 messages/sec per conn
	// 1000ms is a max of    1 message /sec per conn
	// Text peers mostly don't need more than 4 messages per second.
	L1LockDelay int = 0
)
View Source
var (
	MIICmutex  sync.RWMutex    // Global mutex for the cache
	MsgIdCache *MsgIdItemCache // Global instance of the message ID item cache

	UpperLimitMsgIdCacheSize = 1024 * 1024

	DefaultMsgIdCacheSize = 4 * 1024 // N buckets
	MaxLoadFactor         = 0.75     // Resize when load factor exceeds this
	ResizeMultiplier      = 2        // Double the size when resizing

	// TTL Configuration Constants
	TmpCacheTTL       = 15 * time.Second // TTL for temporary processing (CaseWrite items) // @AI!!! DO NOT CHANGE THIS!!!!
	CachedEntryTTL    = 15 * time.Second // TTL for cache entries (CaseDupes items after flushing) // @AI!!! DO NOT CHANGE THIS!!!!
	ErrorCaseTTL      = 15 * time.Second // TTL for error cases (shorter to retry sooner)  // @AI!!! DO NOT CHANGE THIS!!!!
	MaxCachedEntryTTL = 15 * time.Second // Maximum TTL for any cache entry // @AI!!! DO NOT CHANGE THIS!!!!
)
View Source
var DBTABLES = generateCombinations(HEXCHARS, 2, []string{}, []string{}) // 256
View Source
var HEXCHARS = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} // 16
View Source
var HistoryDEBUG = false // Set to true for spammy debug logs
View Source
var MaxLookupWorkers = runtime.NumCPU() // Use number of CPU cores for lookup workers
View Source
var NOTIFY = struct{}{}

Functions

func ComputeMessageIDHash

func ComputeMessageIDHash(messageID string) string

ComputeMessageIDHash computes MD5 hash of a message-ID

func DirExists

func DirExists(path string) bool

DirExists checks if directory exists

func FileExists

func FileExists(path string) bool

FileExists checks if file exists

func GetShardConfig

func GetShardConfig(mode int) (numDBs, tablesPerDB int, description string)

GetShardConfig returns the configuration for a given shard mode

func Mkdir

func Mkdir(path string) bool

Mkdir creates directory and all parent directories

func Str2int64

func Str2int64(s string) int64

Str2int64 converts string to int64, returns 0 on error

func UnixTimeSec

func UnixTimeSec() int64

UnixTimeSec returns current Unix timestamp in seconds

Types

type CCC

type CCC struct {
	Counter map[string]uint64 // counter key: value
}

CharCacheCounter

type ClearCache

type ClearCache struct {
	// contains filtered or unexported fields
}

type ClearCacheChan

type ClearCacheChan struct {
	// contains filtered or unexported fields
}

type DatabaseWorkChecker

type DatabaseWorkChecker interface {
	CheckNoMoreWorkInMaps() bool
}

DatabaseWorkChecker interface allows History to check if database batch system has pending work

type History

type History struct {
	HistoryFilePath string
	// contains filtered or unexported fields
}

History manages message-ID history tracking using INN2-style architecture

func NewHistory

func NewHistory(config *HistoryConfig, mainWG *sync.WaitGroup) (*History, error)

NewHistory creates a new history manager. The mainWG parameter should be the main application's waitgroup that will coordinate shutdown.

func (*History) Add

func (h *History) Add(msgIdItem *MessageIdItem)

Add adds a new message-ID to history

func (*History) CheckNoMoreWorkInHistory

func (h *History) CheckNoMoreWorkInHistory() bool

CheckNoMoreWorkInHistory checks if there's no more pending work (similar to CheckNoMoreWorkInMaps)

func (*History) Close

func (h *History) Close() error

Close gracefully shuts down the history system

func (*History) GetStats

func (h *History) GetStats() HistoryStats

GetStats returns current statistics

func (*History) Lookup

func (h *History) Lookup(msgIdItem *MessageIdItem) (int, error)

Lookup checks if a message-ID exists in history. Returns: ResponsePass (0) = not found, ResponseDuplicate (1) = found, ResponseRetry (2) = error.

func (*History) LookupWorker

func (h *History) LookupWorker(wid int)

func (*History) ServerShutdown

func (h *History) ServerShutdown() bool

func (*History) SetDatabaseWorkChecker

func (h *History) SetDatabaseWorkChecker(checker DatabaseWorkChecker)

SetDatabaseWorkChecker sets the database work checker interface for coordinated shutdown

type HistoryConfig

type HistoryConfig struct {
	HistoryDir      string `yaml:"history_dir" json:"history_dir"`
	CacheExpires    int64  `yaml:"cache_expires" json:"cache_expires"`
	CachePurge      int64  `yaml:"cache_purge" json:"cache_purge"`
	ShardMode       int    `yaml:"shard_mode" json:"shard_mode"`
	MaxConnections  int    `yaml:"max_connections" json:"max_connections"`
	UseShortHashLen int    `yaml:"use_short_hash_len" json:"use_short_hash_len"` // 2-7 chars stored in DB (default 3)

	// Batching configuration for high-throughput writes
	BatchSize    int   `yaml:"batch_size" json:"batch_size"`       // Number of entries to batch (default 200)
	BatchTimeout int64 `yaml:"batch_timeout" json:"batch_timeout"` // Timeout in milliseconds for forced flush (default 5000)

}

HistoryConfig holds configuration for the history system

func DefaultConfig

func DefaultConfig() *HistoryConfig

DefaultConfig returns a default history configuration

func (*HistoryConfig) ValidateConfig

func (c *HistoryConfig) ValidateConfig() error

ValidateConfig validates and adjusts configuration values

type HistoryStats

type HistoryStats struct {
	TotalLookups     int64
	TotalFileLookups int64
	TotalAdds        int64
	CacheHits        int64
	CacheMisses      int64
	Duplicates       int64
	Errors           int64
	// contains filtered or unexported fields
}

HistoryStats tracks statistics for the history system.

type L1CACHE

type L1CACHE struct {
	Caches  map[string]*L1CACHEMAP
	Extend  map[string]*L1ECH
	Muxers  map[string]*L1MUXER
	Counter map[string]*CCC
	// contains filtered or unexported fields
}

func (*L1CACHE) BootL1Cache

func (l1 *L1CACHE) BootL1Cache()

The BootL1Cache method initializes the cache system. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically purge expired entries.

func (*L1CACHE) L1Stats

func (l1 *L1CACHE) L1Stats(statskey string) (retval uint64, retmap map[string]uint64)

func (*L1CACHE) LockL1Cache

func (l1 *L1CACHE) LockL1Cache(hash string, value int) int

The LockL1Cache method is used to LOCK a `MessageIDHash` for processing. If the value is not in the cache or has expired, it locks the cache, updates the cache with a new value, and returns the value. Possible return values:

CaseLock == already in processing
CaseWrite == already in processing
CaseDupes == is a duplicate
CasePass == not a duplicate == locked article for processing

func (*L1CACHE) Set

func (l1 *L1CACHE) Set(hash string, char string, value int, flagexpires bool)

The Set method is used to set a value in the cache. If the cache size is close to its maximum, it grows the cache.

type L1CACHEMAP

type L1CACHEMAP struct {
	// contains filtered or unexported fields
}

type L1ECH

type L1ECH struct {
	// contains filtered or unexported fields
}

L1ExtendChan

type L1ITEM

type L1ITEM struct {
	// contains filtered or unexported fields
}

type L1MUXER

type L1MUXER struct {
	// contains filtered or unexported fields
}

type L1PQ

type L1PQ []*L1PQItem

type L1PQItem

type L1PQItem struct {
	Key     string
	Expires int64
}

type L1pqQ

type L1pqQ struct {
	// contains filtered or unexported fields
}

func (*L1pqQ) Pop

func (pq *L1pqQ) Pop() (*L1PQItem, int)

func (*L1pqQ) Push

func (pq *L1pqQ) Push(item *L1PQItem)

type MessageIdItem

type MessageIdItem struct {
	Mux                sync.RWMutex // Protects all fields below
	CachedEntryExpires time.Time    // Exported field for cache entry expiration
	MessageIdHash      string       // Computed hash of the message-ID
	StorageToken       string       // pointer to storage token
	MessageId          string       // pointer to article.messageid
	ArtNum             int64        // Article number in the history (primary/first occurrence)
	GroupName          *string      // Group name this article belongs to (primary group)
	Arrival            int64        // When article arrived
	Response           int          // @AI IGNORE `Response` FOR THE MOMENT; RESPONSE LOGIC NEEDS PROPER THINKING! NOT A JOB FOR NOW!
	FileOffset         int64        // File offset in history.dat where this entry is stored

	// Group-specific threading information (replaces global threading fields)
	GroupThreading map[*string]*ThreadingInfo // Per-group threading info for crossposted articles
}

type MsgIdItemCache

type MsgIdItemCache struct {
	Pages map[int]*MsgIdItemCachePage
	// contains filtered or unexported fields
}

func NewMsgIdItemCache

func NewMsgIdItemCache() *MsgIdItemCache

func (*MsgIdItemCache) AddMsgIdToCache

func (c *MsgIdItemCache) AddMsgIdToCache(newsgroupPtr *string, messageID string, articleNum int64) bool

AddMsgIdToCache adds a message ID with article number to the cache for a specific group

func (*MsgIdItemCache) CleanExpiredEntries

func (c *MsgIdItemCache) CleanExpiredEntries() int

CleanExpiredEntries removes expired temporary cache entries. This replaces the functionality from MsgTmpCache.CronClean. It efficiently unlinks expired items during chain traversal to avoid double-walking.

func (*MsgIdItemCache) Clear

func (c *MsgIdItemCache) Clear()

Clear removes all items from the cache

func (*MsgIdItemCache) Delete

func (c *MsgIdItemCache) Delete(messageId string) bool

func (*MsgIdItemCache) DetailedStats

func (c *MsgIdItemCache) DetailedStats() (totalBuckets, occupiedBuckets, items, maxChainLength int, loadFactor float64)

DetailedStats returns comprehensive cache statistics for monitoring and debugging

func (*MsgIdItemCache) FNVKey

func (c *MsgIdItemCache) FNVKey(str string) int

FNVKey efficiently calculates a hash for the given string. We use a simple approach that's still efficient for moderate loads.

func (*MsgIdItemCache) FindThreadRootInCache

func (c *MsgIdItemCache) FindThreadRootInCache(newsgroupPtr *string, references []string) *MessageIdItem

FindThreadRootInCache searches for the thread root in the cache by following references. This replaces the functionality from MsgTmpCache.FindThreadRootInCache.

func (*MsgIdItemCache) GetMsgIdFromCache

func (c *MsgIdItemCache) GetMsgIdFromCache(newsgroupPtr *string, messageID string) (int64, int64, bool)

GetMsgIdFromCache retrieves threading information for a message ID in a specific group. This replaces the functionality from MsgTmpCache.GetMsgIdFromTmpCache.

func (*MsgIdItemCache) GetORCreate

func (c *MsgIdItemCache) GetORCreate(messageId string) *MessageIdItem

func (*MsgIdItemCache) GetOrCreateForGroup

func (c *MsgIdItemCache) GetOrCreateForGroup(messageID string, newsgroupPtr *string) *MessageIdItem

GetOrCreateForGroup gets or creates a message ID item for a specific group. This provides group-specific functionality similar to MsgTmpCache.

func (*MsgIdItemCache) GetResizeInfo

func (c *MsgIdItemCache) GetResizeInfo() (bucketCount int, itemCount int, loadFactor float64, isResizing bool)

GetResizeInfo returns information about cache resizing for monitoring

func (*MsgIdItemCache) HasMessageIDInGroup

func (c *MsgIdItemCache) HasMessageIDInGroup(messageID string, newsgroupPtr *string) bool

HasMessageIDInGroup checks if a message ID exists in a specific group and hasn't expired

func (*MsgIdItemCache) MsgIdExists

func (c *MsgIdItemCache) MsgIdExists(newsgroupPtr *string, messageID string) *MessageIdItem

MsgIdExists checks if a message ID exists in the cache for a specific group. This replaces the functionality from MsgTmpCache.MsgIdExists.

func (*MsgIdItemCache) NewMsgIdItem

func (c *MsgIdItemCache) NewMsgIdItem(messageId string) *MessageIdItem

func (*MsgIdItemCache) SetThreadingInfo

func (c *MsgIdItemCache) SetThreadingInfo(messageID string, rootArticle int64, isThreadRoot bool) bool

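SetThreadingInfo sets threading information for a message ID. Unlike SetThreadingInfoForGroup, it takes no group parameter (NOTE: which group's threading info is updated is not visible from the signature; confirm against the implementation).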

func (*MsgIdItemCache) SetThreadingInfoForGroup

func (c *MsgIdItemCache) SetThreadingInfoForGroup(newsgroupPtr *string, messageID string, artNum int64, rootArticle int64, isThreadRoot bool) bool

SetThreadingInfoForGroup sets threading information for a message ID in a specific group

func (*MsgIdItemCache) StartCleanupRoutine

func (c *MsgIdItemCache) StartCleanupRoutine()

StartCleanupRoutine starts a background goroutine to clean expired entries

func (*MsgIdItemCache) Stats

func (c *MsgIdItemCache) Stats() (buckets, items, maxChainLength int)

Stats returns cache statistics for monitoring and debugging

func (*MsgIdItemCache) UpdateThreadRootToTmpCache

func (c *MsgIdItemCache) UpdateThreadRootToTmpCache(newsgroupPtr *string, messageID string, rootArticle int64, isThreadRoot bool) bool

UpdateThreadRootToTmpCache updates an existing cache entry with thread root information. This replaces the functionality from MsgTmpCache.UpdateThreadRootToTmpCache.

type MsgIdItemCachePage

type MsgIdItemCachePage struct {
	MsgIdItem *MessageIdItem
	Next      *MsgIdItemCachePage
	Prev      *MsgIdItemCachePage
	// contains filtered or unexported fields
}

type SQLite3DB

type SQLite3DB struct {
	DB *sql.DB
	// contains filtered or unexported fields
}

SQLite3DB represents a SQLite database connection pool

func NewSQLite3DB

func NewSQLite3DB(opts *SQLite3Opts, createTables bool, useShortHashLen int, mode int) (*SQLite3DB, error)

NewSQLite3DB creates a new SQLite3 database pool

func (*SQLite3DB) Close

func (p *SQLite3DB) Close() error

Close closes the database connection

type SQLite3Opts

type SQLite3Opts struct {
	// contains filtered or unexported fields
}

SQLite3Opts holds configuration for SQLite3 database

type SQLite3Pool

type SQLite3Pool interface {
	GetDB(write bool) (*sql.DB, error)
	ReturnDB(db *sql.DB)
	Close() error
}

SQLite3Pool interface for database operations (legacy, single DB only)

type SQLite3ShardedDB

type SQLite3ShardedDB struct {
	DBPools []*SQLite3DB
	// contains filtered or unexported fields
}

SQLite3ShardedDB manages multiple SQLite databases for sharding

func NewSQLite3ShardedDB

func NewSQLite3ShardedDB(config *ShardConfig, createTables bool, useShortHashLen int) (*SQLite3ShardedDB, error)

NewSQLite3ShardedDB creates a new sharded SQLite3 database system

func (*SQLite3ShardedDB) Close

func (s *SQLite3ShardedDB) Close() error

Close closes all database connections. Its signature matches the Close method required by both the SQLite3Pool and SQLite3ShardedPool interfaces.

func (*SQLite3ShardedDB) CreateAllTables

func (s *SQLite3ShardedDB) CreateAllTables(useShortHashLen int) error

CreateAllTables creates all required tables across all databases

func (*SQLite3ShardedDB) GetShardedDB

func (s *SQLite3ShardedDB) GetShardedDB(dbIndex int, write bool) (*sql.DB, error)

GetShardedDB returns a database connection for a specific shard (implements SQLite3ShardedPool interface)

type SQLite3ShardedPool

type SQLite3ShardedPool interface {
	GetShardedDB(dbIndex int, write bool) (*sql.DB, error)
	//ReturnShardedDB(dbIndex int, db *sql.DB)
	Close() error
}

SQLite3ShardedPool interface for sharded database operations

type ShardConfig

type ShardConfig struct {
	Mode         int    // Sharding mode (0-5)
	BaseDir      string // Base directory for database files
	MaxOpenPerDB int    // Max connections per database
	Timeout      int64  // Connection timeout
}

ShardConfig defines the sharding configuration

type ThreadingInfo

type ThreadingInfo struct {
	RootArticle  int64 // Thread root article number (0 if this IS the root)
	ChildArticle int64 // Child article reference (for threading hierarchy)
	IsThreadRoot bool  // True if this article is a thread root
	ArtNum       int64 // Article number in this specific group
}

ThreadingInfo holds per-group threading information for a message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL