Documentation
¶
Index ¶
- Variables
- func GetAllSets() []string
- func GetSourceConfig(source string) (sourceConfig, bool)
- func GetTargetSet(source, ip string) string
- func IsAllowedPath(path string) bool
- func SecureStreamElementsFile(path string, config StreamingBatchConfig, callback BatchCallback) (elementCount int, err error)
- func WithTimeout(ctx context.Context, timeout time.Duration) context.Context
- type BatchCallback
- type BulkJob
- type FastOp
- type FileReadResult
- type FlushResult
- type IndexEntry
- type NFTBackendWrapper
- func (w *NFTBackendWrapper) AddElements(tableName, setName string, elements []SetElement) error
- func (w *NFTBackendWrapper) Close()
- func (w *NFTBackendWrapper) DeleteElements(tableName, setName string, elements []SetElement) error
- func (w *NFTBackendWrapper) FlushSet(tableName, setName string) error
- func (w *NFTBackendWrapper) GetSetElements(tableName, setName string) ([]string, error)
- func (w *NFTBackendWrapper) InvalidateCache()
- type NetlinkBackend
- type OpQueue
- func (q *OpQueue) EnqueueBan(setName string, element string, ttl uint32, source, reason string) error
- func (q *OpQueue) EnqueueFlush(setName string) error
- func (q *OpQueue) EnqueueReplace(setName string, elements []string, source string) error
- func (q *OpQueue) EnqueueReplaceFromFile(setName, filePath, source string, batchConfig StreamingBatchConfig) (int, error)
- func (q *OpQueue) EnqueueUnban(setName string, element string, source string) error
- func (q *OpQueue) QueueDepth() int64
- func (q *OpQueue) Start(ctx context.Context)
- func (q *OpQueue) Stats() QueueStats
- func (q *OpQueue) Stop()
- type OpType
- type PendingOp
- type QueueConfig
- type QueueStats
- type Scheduler
- func (s *Scheduler) EnqueueBulk(setName string, elements []string, source string) error
- func (s *Scheduler) EnqueueBulkFromFile(ctx context.Context, setName, filePath, source string, ...) (int, error)
- func (s *Scheduler) EnqueueFast(setName string, op *SetOp) error
- func (s *Scheduler) SchedulerStats() SchedulerStats
- func (s *Scheduler) Start(ctx context.Context)
- func (s *Scheduler) Stop()
- type SchedulerStats
- type SetBuffer
- type SetElement
- type SetOp
- type SourceIndex
- func (si *SourceIndex) Add(setName, element, source string)
- func (si *SourceIndex) ClearSet(setName string)
- func (si *SourceIndex) GetAllElements(setName string) map[string]bool
- func (si *SourceIndex) GetElementsBySource(setName, source string) []string
- func (si *SourceIndex) HasElement(setName, element string) bool
- func (si *SourceIndex) LoadFromDisk() error
- func (si *SourceIndex) ReconcileWithBackend(backend NetlinkBackend)
- func (si *SourceIndex) Remove(setName, element, source string)
- func (si *SourceIndex) RemoveAll(setName, element string)
- func (si *SourceIndex) SaveToDisk() error
- func (si *SourceIndex) ShouldDelete(setName, element string) bool
- func (si *SourceIndex) StartBackgroundSaver(ctx context.Context)
- func (si *SourceIndex) Stop()
- type StandaloneBackend
- type StreamingBatchConfig
- type StreamingFileReader
Constants ¶
This section is empty.
Variables ¶
var ( ErrQueueFull = errors.New("operation queue is full") ErrQueueStopped = errors.New("operation queue is stopped") )
Errors
Functions ¶
func GetSourceConfig ¶
GetSourceConfig returns the configuration for a source
func GetTargetSet ¶
GetTargetSet returns the appropriate set for a source and IP
func IsAllowedPath ¶
IsAllowedPath checks if a path is in allowed directories
func SecureStreamElementsFile ¶ added in v1.13.1
func SecureStreamElementsFile(path string, config StreamingBatchConfig, callback BatchCallback) (elementCount int, err error)
SecureStreamElementsFile opens a file, streams elements in batches via callback, and closes This is a convenience function that handles the full lifecycle Memory usage is O(batch_size) instead of O(n)
Types ¶
type BatchCallback ¶ added in v1.13.1
BatchCallback is called for each batch of elements Return error to abort streaming
type BulkJob ¶ added in v1.13.1
BulkJob represents a low-priority bulk operation (replace_set for feeds/geoban)
type FastOp ¶ added in v1.13.1
FastOp represents a high-priority operation (ban, unban, flush_source, flush_set)
type FileReadResult ¶
FileReadResult contains the result of reading a file
func SecureReadElementsFile ¶
func SecureReadElementsFile(path string) (*FileReadResult, error)
SecureReadElementsFile reads elements from a file with security checks Security measures: 1. Path prefix validation 2. Symlink rejection (O_NOFOLLOW + Lstat) 3. Regular file check 4. Size limit 5. Ownership check (root or nftban) 6. World-writable check 7. TOCTOU protection (inode verification) 8. Line count limit
type FlushResult ¶
type FlushResult struct {
SetName string
Applied int
PostBarrier []*SetOp // Ops to re-enqueue after flush
Err error
}
FlushResult contains the outcome of a buffer flush
type IndexEntry ¶
type IndexEntry struct {
Set string `json:"set"`
Element string `json:"element"`
Sources []string `json:"sources"`
TTL uint32 `json:"ttl,omitempty"`
Updated int64 `json:"updated"`
}
IndexEntry represents a persisted index entry
type NFTBackendWrapper ¶
type NFTBackendWrapper struct {
// contains filtered or unexported fields
}
NFTBackendWrapper wraps nftsync.NFTManager to implement NetlinkBackend
func NewNFTBackendWrapper ¶
func NewNFTBackendWrapper(nft *nftsync.NFTManager) (*NFTBackendWrapper, error)
NewNFTBackendWrapper creates a wrapper around an existing NFTManager
func (*NFTBackendWrapper) AddElements ¶
func (w *NFTBackendWrapper) AddElements(tableName, setName string, elements []SetElement) error
AddElements adds elements to a set (batched)
func (*NFTBackendWrapper) Close ¶
func (w *NFTBackendWrapper) Close()
Close is a no-op since we don't own the NFTManager
func (*NFTBackendWrapper) DeleteElements ¶
func (w *NFTBackendWrapper) DeleteElements(tableName, setName string, elements []SetElement) error
DeleteElements removes elements from a set (batched)
func (*NFTBackendWrapper) FlushSet ¶
func (w *NFTBackendWrapper) FlushSet(tableName, setName string) error
FlushSet clears all elements from a set
func (*NFTBackendWrapper) GetSetElements ¶
func (w *NFTBackendWrapper) GetSetElements(tableName, setName string) ([]string, error)
GetSetElements returns all elements in a set
func (*NFTBackendWrapper) InvalidateCache ¶
func (w *NFTBackendWrapper) InvalidateCache()
InvalidateCache clears the set cache (call after external nft changes)
type NetlinkBackend ¶
type NetlinkBackend interface {
// FlushSet clears all elements from a set
FlushSet(table, set string) error
// AddElements adds elements to a set (batched)
AddElements(table, set string, elements []SetElement) error
// DeleteElements removes elements from a set (batched)
DeleteElements(table, set string, elements []SetElement) error
// GetSetElements returns all elements in a set
GetSetElements(table, set string) ([]string, error)
}
NetlinkBackend interface for nftables operations (allows mocking in tests)
type OpQueue ¶
type OpQueue struct {
// contains filtered or unexported fields
}
OpQueue manages all set buffers with async flush workers
func NewOpQueue ¶
func NewOpQueue(backend NetlinkBackend, config QueueConfig) *OpQueue
NewOpQueue creates a new operation queue
func (*OpQueue) EnqueueBan ¶
func (q *OpQueue) EnqueueBan(setName string, element string, ttl uint32, source, reason string) error
EnqueueBan adds a ban operation (async, non-blocking)
func (*OpQueue) EnqueueFlush ¶
EnqueueFlush adds a flush_set operation (async, non-blocking)
func (*OpQueue) EnqueueReplace ¶
EnqueueReplace adds a replace_set operation (async, non-blocking)
func (*OpQueue) EnqueueReplaceFromFile ¶ added in v1.13.1
func (q *OpQueue) EnqueueReplaceFromFile(setName, filePath, source string, batchConfig StreamingBatchConfig) (int, error)
EnqueueReplaceFromFile performs a streaming replace_set operation Instead of loading all elements into memory, this: 1. Flushes the set immediately 2. Reads the file in batches and adds elements directly Memory usage is O(batch_size) instead of O(n)
This is a SYNCHRONOUS operation (unlike EnqueueReplace) because it needs to stream from the file. The file can be deleted after return.
func (*OpQueue) EnqueueUnban ¶
EnqueueUnban adds an unban operation (async, non-blocking)
func (*OpQueue) QueueDepth ¶
QueueDepth returns total pending operations - O(1), no locks
type PendingOp ¶
type PendingOp struct {
Type OpType
Element string
TTL uint32 // Seconds, 0 = permanent (infinity in comparisons)
Source string // Latest source (for logging)
Reason string // Latest reason (for logging)
CreatedAt time.Time
}
PendingOp represents a coalesced pending operation
type QueueConfig ¶
type QueueConfig struct {
// Flush policies
FlushInterval time.Duration // Time-based flush interval (default: 100ms)
FlushThreshold int // Size-based flush threshold (default: 1000 ops)
MaxBatchSize int // Max elements per netlink batch (default: 5000)
MaxQueueDepth int64 // Backpressure limit (default: 50000)
}
QueueConfig holds configuration for the operation queue
func DefaultQueueConfig ¶
func DefaultQueueConfig() QueueConfig
DefaultQueueConfig returns sensible defaults
type QueueStats ¶
type QueueStats struct {
PendingCount int64
TotalQueued uint64
TotalApplied uint64
TotalDropped uint64
LastFlushTime time.Time
}
QueueStats holds queue statistics
type Scheduler ¶ added in v1.13.1
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages priority-based operation scheduling with two lanes
func NewScheduler ¶ added in v1.13.1
func NewScheduler(backend NetlinkBackend, config QueueConfig) *Scheduler
NewScheduler creates a new priority scheduler
func (*Scheduler) EnqueueBulk ¶ added in v1.13.1
EnqueueBulk adds a low-priority bulk operation (replace_set)
func (*Scheduler) EnqueueBulkFromFile ¶ added in v1.13.1
func (s *Scheduler) EnqueueBulkFromFile(ctx context.Context, setName, filePath, source string, batchConfig StreamingBatchConfig) (int, error)
EnqueueBulkFromFile performs a streaming replace_set operation Instead of loading all elements into memory, this: 1. Flushes the set immediately 2. Reads the file in batches and adds elements directly Memory usage is O(batch_size) instead of O(n)
This is a SYNCHRONOUS operation because it needs to stream from the file. The file can be deleted after this function returns.
func (*Scheduler) EnqueueFast ¶ added in v1.13.1
EnqueueFast adds a high-priority operation (ban, unban, flush_source, flush_set)
func (*Scheduler) SchedulerStats ¶ added in v1.13.1
func (s *Scheduler) SchedulerStats() SchedulerStats
SchedulerStats returns scheduler statistics
type SchedulerStats ¶ added in v1.13.1
type SchedulerStats struct {
FastPending int64
BulkPending int64
TotalApplied uint64
TotalDropped uint64
LastFastFlush time.Time
LastBulkFlush time.Time
}
SchedulerStats holds scheduler statistics
type SetBuffer ¶
type SetBuffer struct {
// contains filtered or unexported fields
}
SetBuffer holds pending operations for one set with coalescing
type SetElement ¶
SetElement represents an element to add/delete from nftables
type SetOp ¶
type SetOp struct {
Type OpType
Element string // IP or CIDR (for Add/Delete)
Elements []string // For ReplaceSet
TTL uint32 // Seconds, 0 = permanent
Reason string // For logging
Source string // Origin module (login, portscan, ddos, etc.)
}
SetOp represents a single operation to be queued
type SourceIndex ¶
type SourceIndex struct {
// contains filtered or unexported fields
}
SourceIndex tracks which elements came from which source Enables flush_source without dedicated sets per module
func NewSourceIndex ¶
func NewSourceIndex(indexPath string) *SourceIndex
NewSourceIndex creates a new source index
func (*SourceIndex) Add ¶
func (si *SourceIndex) Add(setName, element, source string)
Add adds an element with its source
func (*SourceIndex) ClearSet ¶
func (si *SourceIndex) ClearSet(setName string)
ClearSet removes all entries for a set
func (*SourceIndex) GetAllElements ¶
func (si *SourceIndex) GetAllElements(setName string) map[string]bool
GetAllElements returns all elements in a set (as a map for O(1) lookup)
func (*SourceIndex) GetElementsBySource ¶
func (si *SourceIndex) GetElementsBySource(setName, source string) []string
GetElementsBySource returns all elements in a set from a specific source
func (*SourceIndex) HasElement ¶
func (si *SourceIndex) HasElement(setName, element string) bool
HasElement checks if an element exists in a set
func (*SourceIndex) LoadFromDisk ¶
func (si *SourceIndex) LoadFromDisk() error
LoadFromDisk restores index on daemon startup
func (*SourceIndex) ReconcileWithBackend ¶
func (si *SourceIndex) ReconcileWithBackend(backend NetlinkBackend)
ReconcileWithBackend syncs index with actual nft state Uses O(n) map lookups instead of O(n²) nested loops
func (*SourceIndex) Remove ¶
func (si *SourceIndex) Remove(setName, element, source string)
Remove removes a source from an element
func (*SourceIndex) RemoveAll ¶
func (si *SourceIndex) RemoveAll(setName, element string)
RemoveAll removes all sources from an element
func (*SourceIndex) SaveToDisk ¶
func (si *SourceIndex) SaveToDisk() error
SaveToDisk persists index (called after netlink apply, not on enqueue)
func (*SourceIndex) ShouldDelete ¶
func (si *SourceIndex) ShouldDelete(setName, element string) bool
ShouldDelete returns true if element has no remaining sources
func (*SourceIndex) StartBackgroundSaver ¶
func (si *SourceIndex) StartBackgroundSaver(ctx context.Context)
StartBackgroundSaver runs periodic persistence
type StandaloneBackend ¶
type StandaloneBackend struct {
*NFTBackendWrapper
// contains filtered or unexported fields
}
StandaloneBackend creates its own NFTManager connection Use this only for testing or when no existing Backend is available
func NewStandaloneBackend ¶
func NewStandaloneBackend() (*StandaloneBackend, error)
NewStandaloneBackend creates a backend with its own netlink connection
func (*StandaloneBackend) Close ¶
func (s *StandaloneBackend) Close()
Close closes the owned NFTManager connection
type StreamingBatchConfig ¶ added in v1.13.1
type StreamingBatchConfig struct {
BatchSize int // Elements per batch (default: 10000)
}
StreamingBatchConfig configures the streaming file reader
func DefaultStreamingBatchConfig ¶ added in v1.13.1
func DefaultStreamingBatchConfig() StreamingBatchConfig
DefaultStreamingBatchConfig returns sensible defaults
type StreamingFileReader ¶ added in v1.13.1
type StreamingFileReader struct {
// contains filtered or unexported fields
}
StreamingFileReader provides streaming access to element files It maintains an open file handle and reads elements in batches
func SecureOpenElementsFile ¶ added in v1.13.1
func SecureOpenElementsFile(path string, config StreamingBatchConfig) (*StreamingFileReader, error)
SecureOpenElementsFile opens a file with security checks for streaming Returns a reader that must be closed after use Security measures are identical to SecureReadElementsFile
func (*StreamingFileReader) Close ¶ added in v1.13.1
func (r *StreamingFileReader) Close() error
Close closes the underlying file
func (*StreamingFileReader) ReadNextBatch ¶ added in v1.13.1
func (r *StreamingFileReader) ReadNextBatch() ([]string, error)
ReadNextBatch reads the next batch of elements Returns the batch (may be smaller than BatchSize at end of file) Returns nil, nil when EOF is reached Returns nil, error on read error or limit exceeded
func (*StreamingFileReader) Stats ¶ added in v1.13.1
func (r *StreamingFileReader) Stats() (lineCount, elementCount int)
Stats returns reading statistics
func (*StreamingFileReader) StreamElements ¶ added in v1.13.1
func (r *StreamingFileReader) StreamElements(callback BatchCallback) error
StreamElements reads elements in batches and calls the callback for each This is the main streaming API - memory stays O(batch_size)