opqueue

package
v1.13.1

This package is not in the latest version of its module.
Published: Feb 12, 2026 | License: MPL-2.0 | Imports: 15 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrQueueFull    = errors.New("operation queue is full")
	ErrQueueStopped = errors.New("operation queue is stopped")
)

Sentinel errors reported by the operation queue.
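Callers can tell the two errors apart with errors.Is, which also works when the error has been wrapped. The sketch below is a minimal illustration under stated assumptions: the error variables are local stand-ins that mirror the package's values so the example compiles on its own, and classify and the decisions it returns are hypothetical, not documented behavior of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the package's sentinel errors so the sketch
// compiles on its own; real code would reference the package's variables.
var (
	ErrQueueFull    = errors.New("operation queue is full")
	ErrQueueStopped = errors.New("operation queue is stopped")
)

// classify maps an enqueue error to a caller-side decision.
// The decisions are assumptions chosen for illustration.
func classify(err error) string {
	switch {
	case err == nil:
		return "queued"
	case errors.Is(err, ErrQueueFull):
		return "retry later"
	case errors.Is(err, ErrQueueStopped):
		return "give up"
	default:
		return "unexpected"
	}
}

func main() {
	// errors.Is still matches after wrapping with %w.
	wrapped := fmt.Errorf("enqueue ban: %w", ErrQueueFull)
	fmt.Println(classify(nil))
	fmt.Println(classify(wrapped))
	fmt.Println(classify(ErrQueueStopped))
}
```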

Functions

func GetAllSets

func GetAllSets() []string

GetAllSets returns all known set names

func GetSourceConfig

func GetSourceConfig(source string) (sourceConfig, bool)

GetSourceConfig returns the configuration for a source

func GetTargetSet

func GetTargetSet(source, ip string) string

GetTargetSet returns the appropriate set for a source and IP

func IsAllowedPath

func IsAllowedPath(path string) bool

IsAllowedPath checks if a path is in allowed directories

func SecureStreamElementsFile added in v1.13.1

func SecureStreamElementsFile(path string, config StreamingBatchConfig, callback BatchCallback) (elementCount int, err error)

SecureStreamElementsFile opens a file, streams its elements in batches via callback, and closes the file. This is a convenience function that handles the full lifecycle. Memory usage is O(batch_size) instead of O(n).

func WithTimeout

func WithTimeout(ctx context.Context, timeout time.Duration) context.Context

WithTimeout returns a context with timeout for operations

Types

type BatchCallback added in v1.13.1

type BatchCallback func(batch []string, batchNum int) error

BatchCallback is called for each batch of elements. Return an error to abort streaming.

type BulkJob added in v1.13.1

type BulkJob struct {
	SetName  string
	Elements []string
	Source   string
}

BulkJob represents a low-priority bulk operation (replace_set for feeds/geoban)

type FastOp added in v1.13.1

type FastOp struct {
	SetName string
	Op      *SetOp
}

FastOp represents a high-priority operation (ban, unban, flush_source, flush_set)

type FileReadResult

type FileReadResult struct {
	Elements []string
	Count    int
}

FileReadResult contains the result of reading a file

func SecureReadElementsFile

func SecureReadElementsFile(path string) (*FileReadResult, error)

SecureReadElementsFile reads elements from a file with security checks.

Security measures:

 1. Path prefix validation
 2. Symlink rejection (O_NOFOLLOW + Lstat)
 3. Regular file check
 4. Size limit
 5. Ownership check (root or nftban)
 6. World-writable check
 7. TOCTOU protection (inode verification)
 8. Line count limit
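Some of the measures above can be sketched with the standard library alone. The example covers only symlink rejection, the regular-file check, the world-writable check, and a same-file comparison between Lstat and the opened descriptor. It is not the package's implementation: checkMode and openChecked are hypothetical names, the path in main is a placeholder, and the prefix, size, ownership, and line-count checks are left out. It assumes a Unix-like system where syscall.O_NOFOLLOW is defined.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"syscall"
)

// checkMode rejects symlinks, non-regular files, and world-writable files.
func checkMode(mode os.FileMode) error {
	switch {
	case mode&os.ModeSymlink != 0:
		return errors.New("symlink rejected")
	case !mode.IsRegular():
		return errors.New("not a regular file")
	case mode.Perm()&0o002 != 0:
		return errors.New("world-writable file rejected")
	}
	return nil
}

// openChecked inspects the path with Lstat, opens it with O_NOFOLLOW, and
// then verifies that the opened descriptor refers to the same file that was
// inspected, which guards against a swap between check and open.
func openChecked(path string) (*os.File, error) {
	before, err := os.Lstat(path)
	if err != nil {
		return nil, err
	}
	if err := checkMode(before.Mode()); err != nil {
		return nil, err
	}
	f, err := os.OpenFile(path, os.O_RDONLY|syscall.O_NOFOLLOW, 0)
	if err != nil {
		return nil, err
	}
	after, err := f.Stat()
	if err != nil {
		f.Close()
		return nil, err
	}
	if !os.SameFile(before, after) {
		f.Close()
		return nil, errors.New("file changed between check and open")
	}
	return f, nil
}

func main() {
	f, err := openChecked("/etc/hostname") // placeholder path for the demo
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	defer f.Close()
	fmt.Println("opened:", f.Name())
}
```

Comparing the Lstat result with the Stat of the open descriptor means that later reads go through a file that has already been validated, rather than through whatever the path points to at read time.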

type FlushResult

type FlushResult struct {
	SetName     string
	Applied     int
	PostBarrier []*SetOp // Ops to re-enqueue after flush
	Err         error
}

FlushResult contains the outcome of a buffer flush

type IndexEntry

type IndexEntry struct {
	Set     string   `json:"set"`
	Element string   `json:"element"`
	Sources []string `json:"sources"`
	TTL     uint32   `json:"ttl,omitempty"`
	Updated int64    `json:"updated"`
}

IndexEntry represents a persisted index entry
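The struct tags determine the JSON shape of one entry. The sketch below copies the type definition from above and marshals it with encoding/json to show the effect of omitempty on TTL. The set name, element, and timestamp are made-up values, and how entries are laid out in the index file (array, one object per line, or something else) is not stated here, so that part is not shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// IndexEntry is copied from the type definition above.
type IndexEntry struct {
	Set     string   `json:"set"`
	Element string   `json:"element"`
	Sources []string `json:"sources"`
	TTL     uint32   `json:"ttl,omitempty"`
	Updated int64    `json:"updated"`
}

// encode returns the JSON form of one entry, or "" if marshalling fails.
func encode(e IndexEntry) string {
	b, err := json.Marshal(e)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	e := IndexEntry{
		Set:     "blocklist", // hypothetical set name
		Element: "192.0.2.10",
		Sources: []string{"login"},
		Updated: 1700000000,
	}
	fmt.Println(encode(e)) // TTL is zero, so the "ttl" key is omitted

	e.TTL = 3600
	fmt.Println(encode(e)) // now includes "ttl":3600
}
```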

type NFTBackendWrapper

type NFTBackendWrapper struct {
	// contains filtered or unexported fields
}

NFTBackendWrapper wraps nftsync.NFTManager to implement NetlinkBackend

func NewNFTBackendWrapper

func NewNFTBackendWrapper(nft *nftsync.NFTManager) (*NFTBackendWrapper, error)

NewNFTBackendWrapper creates a wrapper around an existing NFTManager

func (*NFTBackendWrapper) AddElements

func (w *NFTBackendWrapper) AddElements(tableName, setName string, elements []SetElement) error

AddElements adds elements to a set (batched)

func (*NFTBackendWrapper) Close

func (w *NFTBackendWrapper) Close()

Close is a no-op since we don't own the NFTManager

func (*NFTBackendWrapper) DeleteElements

func (w *NFTBackendWrapper) DeleteElements(tableName, setName string, elements []SetElement) error

DeleteElements removes elements from a set (batched)

func (*NFTBackendWrapper) FlushSet

func (w *NFTBackendWrapper) FlushSet(tableName, setName string) error

FlushSet clears all elements from a set

func (*NFTBackendWrapper) GetSetElements

func (w *NFTBackendWrapper) GetSetElements(tableName, setName string) ([]string, error)

GetSetElements returns all elements in a set

func (*NFTBackendWrapper) InvalidateCache

func (w *NFTBackendWrapper) InvalidateCache()

InvalidateCache clears the set cache (call after external nft changes)

type NetlinkBackend

type NetlinkBackend interface {
	// FlushSet clears all elements from a set
	FlushSet(table, set string) error

	// AddElements adds elements to a set (batched)
	AddElements(table, set string, elements []SetElement) error

	// DeleteElements removes elements from a set (batched)
	DeleteElements(table, set string, elements []SetElement) error

	// GetSetElements returns all elements in a set
	GetSetElements(table, set string) ([]string, error)
}

NetlinkBackend interface for nftables operations (allows mocking in tests)
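Because the queue depends only on this interface, a test can substitute an in-memory implementation. The sketch below is one possible mock, not something shipped with the package: memBackend and its helpers are hypothetical, the interface and SetElement are copied locally so the example is self-contained, and the table and set names are made up. It is not safe for concurrent use and ignores TTL and IsIPv6.

```go
package main

import (
	"fmt"
	"sort"
)

// SetElement and NetlinkBackend are copied from the definitions in this page.
type SetElement struct {
	Value  string
	TTL    uint32
	IsIPv6 bool
}

type NetlinkBackend interface {
	FlushSet(table, set string) error
	AddElements(table, set string, elements []SetElement) error
	DeleteElements(table, set string, elements []SetElement) error
	GetSetElements(table, set string) ([]string, error)
}

// memBackend is a hypothetical in-memory backend for tests.
type memBackend struct {
	sets map[string]map[string]struct{} // "table/set" -> element values
}

func newMemBackend() *memBackend {
	return &memBackend{sets: make(map[string]map[string]struct{})}
}

func key(table, set string) string { return table + "/" + set }

func (m *memBackend) FlushSet(table, set string) error {
	delete(m.sets, key(table, set))
	return nil
}

func (m *memBackend) AddElements(table, set string, elements []SetElement) error {
	k := key(table, set)
	if m.sets[k] == nil {
		m.sets[k] = make(map[string]struct{})
	}
	for _, e := range elements {
		m.sets[k][e.Value] = struct{}{}
	}
	return nil
}

func (m *memBackend) DeleteElements(table, set string, elements []SetElement) error {
	for _, e := range elements {
		delete(m.sets[key(table, set)], e.Value) // no-op if the set is absent
	}
	return nil
}

func (m *memBackend) GetSetElements(table, set string) ([]string, error) {
	out := make([]string, 0, len(m.sets[key(table, set)]))
	for v := range m.sets[key(table, set)] {
		out = append(out, v)
	}
	sort.Strings(out) // stable order makes test assertions simpler
	return out, nil
}

// Compile-time check that the mock satisfies the interface.
var _ NetlinkBackend = (*memBackend)(nil)

func main() {
	var b NetlinkBackend = newMemBackend()
	_ = b.AddElements("filter", "blocklist", []SetElement{{Value: "192.0.2.1"}, {Value: "192.0.2.2"}})
	_ = b.DeleteElements("filter", "blocklist", []SetElement{{Value: "192.0.2.1"}})
	elems, _ := b.GetSetElements("filter", "blocklist")
	fmt.Println(elems)
}
```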

type OpQueue

type OpQueue struct {
	// contains filtered or unexported fields
}

OpQueue manages all set buffers with async flush workers

func NewOpQueue

func NewOpQueue(backend NetlinkBackend, config QueueConfig) *OpQueue

NewOpQueue creates a new operation queue

func (*OpQueue) EnqueueBan

func (q *OpQueue) EnqueueBan(setName string, element string, ttl uint32, source, reason string) error

EnqueueBan adds a ban operation (async, non-blocking)

func (*OpQueue) EnqueueFlush

func (q *OpQueue) EnqueueFlush(setName string) error

EnqueueFlush adds a flush_set operation (async, non-blocking)

func (*OpQueue) EnqueueReplace

func (q *OpQueue) EnqueueReplace(setName string, elements []string, source string) error

EnqueueReplace adds a replace_set operation (async, non-blocking)

func (*OpQueue) EnqueueReplaceFromFile added in v1.13.1

func (q *OpQueue) EnqueueReplaceFromFile(setName, filePath, source string, batchConfig StreamingBatchConfig) (int, error)

EnqueueReplaceFromFile performs a streaming replace_set operation. Instead of loading all elements into memory, it:

 1. Flushes the set immediately
 2. Reads the file in batches and adds elements directly

Memory usage is O(batch_size) instead of O(n).

This is a SYNCHRONOUS operation (unlike EnqueueReplace) because it needs to stream from the file. The file can be deleted after return.
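The flush-then-add sequence described above can be sketched as follows. This is an illustration under assumptions, not the package's code: setWriter is a hypothetical, simplified stand-in for the backend, recorder only logs calls, and the batch source is a plain function that returns a nil batch when the input is exhausted.

```go
package main

import "fmt"

// setWriter is a hypothetical, simplified backend: flush plus batched add.
type setWriter interface {
	FlushSet(set string) error
	AddElements(set string, elements []string) error
}

// recorder logs the calls it receives so the call order can be inspected.
type recorder struct{ calls []string }

func (r *recorder) FlushSet(set string) error {
	r.calls = append(r.calls, "flush")
	return nil
}

func (r *recorder) AddElements(set string, elements []string) error {
	r.calls = append(r.calls, fmt.Sprintf("add %d", len(elements)))
	return nil
}

// replaceStreaming flushes the set, then pulls batches from next until it
// returns a nil batch, adding each batch as it arrives. Only one batch is
// held at a time. It returns the number of elements added.
func replaceStreaming(w setWriter, set string, next func() ([]string, error)) (int, error) {
	if err := w.FlushSet(set); err != nil {
		return 0, fmt.Errorf("flush %s: %w", set, err)
	}
	total := 0
	for {
		batch, err := next()
		if err != nil {
			return total, err
		}
		if batch == nil {
			return total, nil
		}
		if err := w.AddElements(set, batch); err != nil {
			return total, err
		}
		total += len(batch)
	}
}

// sliceBatches returns a batch source over pre-built batches (demo helper).
func sliceBatches(batches ...[]string) func() ([]string, error) {
	i := 0
	return func() ([]string, error) {
		if i >= len(batches) {
			return nil, nil
		}
		b := batches[i]
		i++
		return b, nil
	}
}

func main() {
	rec := &recorder{}
	n, err := replaceStreaming(rec, "feeds", sliceBatches(
		[]string{"198.51.100.0/24", "203.0.113.0/24"},
		[]string{"192.0.2.0/24"},
	))
	fmt.Println(n, err, rec.calls)
}
```

One consequence of this ordering, visible in the sketch, is that the set is empty or only partially filled between the flush and the last add, and stays partially filled if a batch fails midway.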

func (*OpQueue) EnqueueUnban

func (q *OpQueue) EnqueueUnban(setName string, element string, source string) error

EnqueueUnban adds an unban operation (async, non-blocking)

func (*OpQueue) QueueDepth

func (q *OpQueue) QueueDepth() int64

QueueDepth returns the total number of pending operations. It runs in O(1) and takes no locks.
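A lock-free O(1) depth reading is typically built on an atomic counter. The sketch below shows that general technique together with a backpressure limit. How this package actually tracks depth is not shown on this page, so depthGauge and its methods are hypothetical, as is the local errQueueFull value. It needs Go 1.19 or later for atomic.Int64.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errQueueFull is a local stand-in for the package's ErrQueueFull.
var errQueueFull = errors.New("operation queue is full")

// depthGauge is a hypothetical lock-free counter of pending operations.
type depthGauge struct {
	pending atomic.Int64
	max     int64
}

// reserve claims one slot, or reports errQueueFull when the limit is hit.
func (g *depthGauge) reserve() error {
	if g.pending.Add(1) > g.max {
		g.pending.Add(-1) // roll back the optimistic increment
		return errQueueFull
	}
	return nil
}

// release returns n slots once operations have been applied.
func (g *depthGauge) release(n int64) { g.pending.Add(-n) }

// depth reads the counter in O(1) without taking a lock.
func (g *depthGauge) depth() int64 { return g.pending.Load() }

func main() {
	g := &depthGauge{max: 2}
	fmt.Println(g.reserve(), g.reserve(), g.reserve())
	fmt.Println("depth:", g.depth())
	g.release(1)
	fmt.Println("depth after release:", g.depth())
}
```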

func (*OpQueue) Start

func (q *OpQueue) Start(ctx context.Context)

Start begins the async flush worker

func (*OpQueue) Stats

func (q *OpQueue) Stats() QueueStats

Stats returns queue statistics

func (*OpQueue) Stop

func (q *OpQueue) Stop()

Stop signals the queue to stop and drains remaining operations

type OpType

type OpType int

OpType represents the type of operation

const (
	OpAdd OpType = iota
	OpDelete
	OpReplaceSet
	OpFlushSet
)

func (OpType) String

func (t OpType) String() string

type PendingOp

type PendingOp struct {
	Type      OpType
	Element   string
	TTL       uint32 // Seconds, 0 = permanent (infinity in comparisons)
	Source    string // Latest source (for logging)
	Reason    string // Latest reason (for logging)
	CreatedAt time.Time
}

PendingOp represents a coalesced pending operation
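The TTL comment says that 0 means permanent and counts as infinity in comparisons. A minimal sketch of such a comparison is shown below. The helper names are hypothetical, and keeping the longer TTL when two operations for the same element are coalesced is an assumption made for the example; the actual coalescing rule is not documented on this page.

```go
package main

import "fmt"

// ttlLess reports whether TTL a expires before TTL b, treating 0 as
// "permanent", that is, longer than every finite TTL.
func ttlLess(a, b uint32) bool {
	switch {
	case a == 0:
		return false // permanent never expires first
	case b == 0:
		return true // any finite TTL expires before permanent
	default:
		return a < b
	}
}

// longerTTL returns whichever TTL lasts longer (assumed coalescing rule).
func longerTTL(a, b uint32) uint32 {
	if ttlLess(a, b) {
		return b
	}
	return a
}

func main() {
	fmt.Println(longerTTL(60, 3600)) // two finite TTLs
	fmt.Println(longerTTL(0, 3600))  // permanent against finite
}
```

A plain numeric maximum would get the second case wrong, because 0 would lose to every finite TTL.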

type QueueConfig

type QueueConfig struct {
	// Flush policies
	FlushInterval  time.Duration // Time-based flush interval (default: 100ms)
	FlushThreshold int           // Size-based flush threshold (default: 1000 ops)
	MaxBatchSize   int           // Max elements per netlink batch (default: 5000)
	MaxQueueDepth  int64         // Backpressure limit (default: 50000)
}

QueueConfig holds configuration for the operation queue

func DefaultQueueConfig

func DefaultQueueConfig() QueueConfig

DefaultQueueConfig returns sensible defaults

type QueueStats

type QueueStats struct {
	PendingCount  int64
	TotalQueued   uint64
	TotalApplied  uint64
	TotalDropped  uint64
	LastFlushTime time.Time
}

QueueStats holds queue statistics

type Scheduler added in v1.13.1

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages priority-based operation scheduling with two lanes
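One common way to give one lane priority over another in Go is to poll the high-priority channel first and fall back to the low-priority channel only when it is empty. The sketch below shows that pattern with plain string jobs. It is not the Scheduler's implementation: nextJob and drain are hypothetical, the job strings are made up, and the channels are assumed to stay open.

```go
package main

import "fmt"

// nextJob prefers the fast lane: it takes a fast job if one is ready and
// only looks at the bulk lane when the fast lane is empty. It returns
// ok=false when both lanes are empty. Channels are assumed to stay open.
func nextJob(fast, bulk <-chan string) (job string, ok bool) {
	select {
	case job = <-fast:
		return job, true
	default:
	}
	select {
	case job = <-bulk:
		return job, true
	default:
		return "", false
	}
}

// drain collects jobs in the order nextJob hands them out.
func drain(fast, bulk <-chan string) []string {
	var order []string
	for {
		job, ok := nextJob(fast, bulk)
		if !ok {
			return order
		}
		order = append(order, job)
	}
}

func main() {
	fast := make(chan string, 4)
	bulk := make(chan string, 4)
	bulk <- "replace_set feeds"
	fast <- "ban 192.0.2.1"
	bulk <- "replace_set geoban"
	fast <- "unban 192.0.2.2"
	fmt.Println(drain(fast, bulk))
}
```

A single select over both channels would not give this ordering, since Go picks among ready cases at random.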

func NewScheduler added in v1.13.1

func NewScheduler(backend NetlinkBackend, config QueueConfig) *Scheduler

NewScheduler creates a new priority scheduler

func (*Scheduler) EnqueueBulk added in v1.13.1

func (s *Scheduler) EnqueueBulk(setName string, elements []string, source string) error

EnqueueBulk adds a low-priority bulk operation (replace_set)

func (*Scheduler) EnqueueBulkFromFile added in v1.13.1

func (s *Scheduler) EnqueueBulkFromFile(ctx context.Context, setName, filePath, source string, batchConfig StreamingBatchConfig) (int, error)

EnqueueBulkFromFile performs a streaming replace_set operation. Instead of loading all elements into memory, it:

 1. Flushes the set immediately
 2. Reads the file in batches and adds elements directly

Memory usage is O(batch_size) instead of O(n).

This is a SYNCHRONOUS operation because it needs to stream from the file. The file can be deleted after this function returns.

func (*Scheduler) EnqueueFast added in v1.13.1

func (s *Scheduler) EnqueueFast(setName string, op *SetOp) error

EnqueueFast adds a high-priority operation (ban, unban, flush_source, flush_set)

func (*Scheduler) SchedulerStats added in v1.13.1

func (s *Scheduler) SchedulerStats() SchedulerStats

SchedulerStats returns scheduler statistics

func (*Scheduler) Start added in v1.13.1

func (s *Scheduler) Start(ctx context.Context)

Start begins the scheduler workers

func (*Scheduler) Stop added in v1.13.1

func (s *Scheduler) Stop()

Stop signals the scheduler to stop and waits for workers to finish

type SchedulerStats added in v1.13.1

type SchedulerStats struct {
	FastPending   int64
	BulkPending   int64
	TotalApplied  uint64
	TotalDropped  uint64
	LastFastFlush time.Time
	LastBulkFlush time.Time
}

SchedulerStats holds scheduler statistics

type SetBuffer

type SetBuffer struct {
	// contains filtered or unexported fields
}

SetBuffer holds pending operations for one set with coalescing

type SetElement

type SetElement struct {
	Value  string
	TTL    uint32
	IsIPv6 bool
}

SetElement represents an element to add/delete from nftables

type SetOp

type SetOp struct {
	Type     OpType
	Element  string   // IP or CIDR (for Add/Delete)
	Elements []string // For ReplaceSet
	TTL      uint32   // Seconds, 0 = permanent
	Reason   string   // For logging
	Source   string   // Origin module (login, portscan, ddos, etc.)
}

SetOp represents a single operation to be queued

type SourceIndex

type SourceIndex struct {
	// contains filtered or unexported fields
}

SourceIndex tracks which elements came from which source. This enables flush_source without dedicated sets per module.

func NewSourceIndex

func NewSourceIndex(indexPath string) *SourceIndex

NewSourceIndex creates a new source index

func (*SourceIndex) Add

func (si *SourceIndex) Add(setName, element, source string)

Add adds an element with its source

func (*SourceIndex) ClearSet

func (si *SourceIndex) ClearSet(setName string)

ClearSet removes all entries for a set

func (*SourceIndex) GetAllElements

func (si *SourceIndex) GetAllElements(setName string) map[string]bool

GetAllElements returns all elements in a set (as a map for O(1) lookup)

func (*SourceIndex) GetElementsBySource

func (si *SourceIndex) GetElementsBySource(setName, source string) []string

GetElementsBySource returns all elements in a set from a specific source

func (*SourceIndex) HasElement

func (si *SourceIndex) HasElement(setName, element string) bool

HasElement checks if an element exists in a set

func (*SourceIndex) LoadFromDisk

func (si *SourceIndex) LoadFromDisk() error

LoadFromDisk restores index on daemon startup

func (*SourceIndex) ReconcileWithBackend

func (si *SourceIndex) ReconcileWithBackend(backend NetlinkBackend)

ReconcileWithBackend syncs the index with the actual nft state. It uses O(n) map lookups instead of O(n²) nested loops.
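The map-based comparison can be sketched as a two-way set difference. The input shapes follow GetAllElements (a map) and GetSetElements (a slice). The function diff is hypothetical, and what the package does with each difference (re-add, drop from the index, or only log) is not stated on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// diff compares the elements the index holds for a set with the elements
// the backend reports. Each element costs one map lookup, so the comparison
// itself is linear in the combined input size.
func diff(indexed map[string]bool, live []string) (missingInBackend, unknownToIndex []string) {
	liveSet := make(map[string]bool, len(live))
	for _, e := range live {
		liveSet[e] = true
		if _, ok := indexed[e]; !ok {
			unknownToIndex = append(unknownToIndex, e)
		}
	}
	for e := range indexed {
		if !liveSet[e] {
			missingInBackend = append(missingInBackend, e)
		}
	}
	// Sorting is only for deterministic output in this demo.
	sort.Strings(missingInBackend)
	sort.Strings(unknownToIndex)
	return missingInBackend, unknownToIndex
}

func main() {
	indexed := map[string]bool{"192.0.2.1": true, "192.0.2.2": true}
	live := []string{"192.0.2.2", "192.0.2.3"}
	missing, unknown := diff(indexed, live)
	fmt.Println("in index only:", missing)
	fmt.Println("in backend only:", unknown)
}
```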

func (*SourceIndex) Remove

func (si *SourceIndex) Remove(setName, element, source string)

Remove removes a source from an element

func (*SourceIndex) RemoveAll

func (si *SourceIndex) RemoveAll(setName, element string)

RemoveAll removes all sources from an element

func (*SourceIndex) SaveToDisk

func (si *SourceIndex) SaveToDisk() error

SaveToDisk persists index (called after netlink apply, not on enqueue)

func (*SourceIndex) ShouldDelete

func (si *SourceIndex) ShouldDelete(setName, element string) bool

ShouldDelete returns true if element has no remaining sources
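The interplay of Add, Remove, and ShouldDelete amounts to reference counting by source: an element leaves the nft set only once no source still claims it. The sketch below models that with nested maps. It is a hypothetical in-memory model, not the package's data structure, and it leaves out locking and persistence.

```go
package main

import "fmt"

// sourceIndex is a hypothetical model: set -> element -> set of sources.
type sourceIndex struct {
	sets map[string]map[string]map[string]struct{}
}

func newSourceIndex() *sourceIndex {
	return &sourceIndex{sets: make(map[string]map[string]map[string]struct{})}
}

// add records that source wants element to be present in set.
func (si *sourceIndex) add(set, element, source string) {
	elems := si.sets[set]
	if elems == nil {
		elems = make(map[string]map[string]struct{})
		si.sets[set] = elems
	}
	srcs := elems[element]
	if srcs == nil {
		srcs = make(map[string]struct{})
		elems[element] = srcs
	}
	srcs[source] = struct{}{}
}

// remove drops one source's claim on an element. Indexing a missing key
// yields a nil map, and delete on a nil map is a no-op.
func (si *sourceIndex) remove(set, element, source string) {
	delete(si.sets[set][element], source)
}

// shouldDelete reports whether no source still claims the element.
func (si *sourceIndex) shouldDelete(set, element string) bool {
	return len(si.sets[set][element]) == 0
}

func main() {
	si := newSourceIndex()
	si.add("blocklist", "192.0.2.1", "login")
	si.add("blocklist", "192.0.2.1", "portscan")

	si.remove("blocklist", "192.0.2.1", "login")
	fmt.Println(si.shouldDelete("blocklist", "192.0.2.1")) // portscan still claims it

	si.remove("blocklist", "192.0.2.1", "portscan")
	fmt.Println(si.shouldDelete("blocklist", "192.0.2.1")) // no sources remain
}
```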

func (*SourceIndex) StartBackgroundSaver

func (si *SourceIndex) StartBackgroundSaver(ctx context.Context)

StartBackgroundSaver runs periodic persistence

func (*SourceIndex) Stop

func (si *SourceIndex) Stop()

Stop signals the background saver to stop

type StandaloneBackend

type StandaloneBackend struct {
	*NFTBackendWrapper
	// contains filtered or unexported fields
}

StandaloneBackend creates its own NFTManager connection. Use this only for testing or when no existing Backend is available.

func NewStandaloneBackend

func NewStandaloneBackend() (*StandaloneBackend, error)

NewStandaloneBackend creates a backend with its own netlink connection

func (*StandaloneBackend) Close

func (s *StandaloneBackend) Close()

Close closes the owned NFTManager connection

type StreamingBatchConfig added in v1.13.1

type StreamingBatchConfig struct {
	BatchSize int // Elements per batch (default: 10000)
}

StreamingBatchConfig configures the streaming file reader

func DefaultStreamingBatchConfig added in v1.13.1

func DefaultStreamingBatchConfig() StreamingBatchConfig

DefaultStreamingBatchConfig returns sensible defaults

type StreamingFileReader added in v1.13.1

type StreamingFileReader struct {
	// contains filtered or unexported fields
}

StreamingFileReader provides streaming access to element files. It maintains an open file handle and reads elements in batches.

func SecureOpenElementsFile added in v1.13.1

func SecureOpenElementsFile(path string, config StreamingBatchConfig) (*StreamingFileReader, error)

SecureOpenElementsFile opens a file with security checks for streaming. It returns a reader that must be closed after use. Security measures are identical to SecureReadElementsFile.

func (*StreamingFileReader) Close added in v1.13.1

func (r *StreamingFileReader) Close() error

Close closes the underlying file

func (*StreamingFileReader) ReadNextBatch added in v1.13.1

func (r *StreamingFileReader) ReadNextBatch() ([]string, error)

ReadNextBatch reads the next batch of elements. It returns the batch, which may be smaller than BatchSize at end of file. It returns nil, nil when EOF is reached, and nil, error on a read error or when a limit is exceeded.

func (*StreamingFileReader) Stats added in v1.13.1

func (r *StreamingFileReader) Stats() (lineCount, elementCount int)

Stats returns reading statistics

func (*StreamingFileReader) StreamElements added in v1.13.1

func (r *StreamingFileReader) StreamElements(callback BatchCallback) error

StreamElements reads elements in batches and calls the callback for each. This is the main streaming API; memory stays O(batch_size).
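The batch contract (a short final batch, then nil, nil at EOF) and the callback loop built on it can be sketched over any io.Reader. The example is an assumption-laden illustration rather than the package's reader: batchReader and its methods are hypothetical, skipping blank lines is an assumption about the file format, and numbering batches from 1 is a guess. The security checks and limits are omitted.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// batchReader is a hypothetical reader that hands out lines in batches.
type batchReader struct {
	sc        *bufio.Scanner
	batchSize int
}

func newBatchReader(r io.Reader, batchSize int) *batchReader {
	if batchSize <= 0 {
		batchSize = 10000 // the documented StreamingBatchConfig default
	}
	return &batchReader{sc: bufio.NewScanner(r), batchSize: batchSize}
}

// readNextBatch returns up to batchSize elements, nil, nil at EOF, and
// nil, err on a read error. Each call allocates a fresh slice, so the
// caller may keep the batch.
func (r *batchReader) readNextBatch() ([]string, error) {
	var batch []string
	for len(batch) < r.batchSize && r.sc.Scan() {
		if line := strings.TrimSpace(r.sc.Text()); line != "" {
			batch = append(batch, line)
		}
	}
	if err := r.sc.Err(); err != nil {
		return nil, err
	}
	return batch, nil // batch is nil once the input is exhausted
}

// streamElements calls cb for each batch and stops at the first error.
func (r *batchReader) streamElements(cb func(batch []string, batchNum int) error) error {
	for n := 1; ; n++ { // numbering from 1 is an assumption
		batch, err := r.readNextBatch()
		if err != nil {
			return err
		}
		if batch == nil {
			return nil
		}
		if err := cb(batch, n); err != nil {
			return err
		}
	}
}

// batchSizes streams text and reports the size of each batch (demo helper).
func batchSizes(text string, batchSize int) ([]int, error) {
	var sizes []int
	err := newBatchReader(strings.NewReader(text), batchSize).streamElements(
		func(batch []string, _ int) error {
			sizes = append(sizes, len(batch))
			return nil
		})
	return sizes, err
}

func main() {
	sizes, err := batchSizes("192.0.2.1\n192.0.2.2\n\n192.0.2.3\n", 2)
	fmt.Println(sizes, err)
}
```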
