opqueue

package
v1.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2026 License: MPL-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueFull    = errors.New("operation queue is full")
	ErrQueueStopped = errors.New("operation queue is stopped")
)

Errors

Functions

func GetAllSets

func GetAllSets() []string

GetAllSets returns all known set names

func GetSourceConfig

func GetSourceConfig(source string) (sourceConfig, bool)

GetSourceConfig returns the configuration for a source

func GetTargetSet

func GetTargetSet(source, ip string) string

GetTargetSet returns the appropriate set for a source and IP

func IsAllowedPath

func IsAllowedPath(path string) bool

IsAllowedPath checks if a path is in allowed directories

func WithTimeout

func WithTimeout(ctx context.Context, timeout time.Duration) context.Context

WithTimeout returns a context with timeout for operations

Types

type FileReadResult

type FileReadResult struct {
	Elements []string
	Count    int
}

FileReadResult contains the result of reading a file

func SecureReadElementsFile

func SecureReadElementsFile(path string) (*FileReadResult, error)

SecureReadElementsFile reads elements from a file with security checks Security measures: 1. Path prefix validation 2. Symlink rejection (O_NOFOLLOW + Lstat) 3. Regular file check 4. Size limit 5. Ownership check (root or nftban) 6. World-writable check 7. TOCTOU protection (inode verification) 8. Line count limit

type FlushResult

type FlushResult struct {
	SetName     string
	Applied     int
	PostBarrier []*SetOp // Ops to re-enqueue after flush
	Err         error
}

FlushResult contains the outcome of a buffer flush

type IndexEntry

type IndexEntry struct {
	Set     string   `json:"set"`
	Element string   `json:"element"`
	Sources []string `json:"sources"`
	TTL     uint32   `json:"ttl,omitempty"`
	Updated int64    `json:"updated"`
}

IndexEntry represents a persisted index entry

type NFTBackendWrapper

type NFTBackendWrapper struct {
	// contains filtered or unexported fields
}

NFTBackendWrapper wraps nftsync.NFTManager to implement NetlinkBackend

func NewNFTBackendWrapper

func NewNFTBackendWrapper(nft *nftsync.NFTManager) (*NFTBackendWrapper, error)

NewNFTBackendWrapper creates a wrapper around an existing NFTManager

func (*NFTBackendWrapper) AddElements

func (w *NFTBackendWrapper) AddElements(tableName, setName string, elements []SetElement) error

AddElements adds elements to a set (batched)

func (*NFTBackendWrapper) Close

func (w *NFTBackendWrapper) Close()

Close is a no-op since we don't own the NFTManager

func (*NFTBackendWrapper) DeleteElements

func (w *NFTBackendWrapper) DeleteElements(tableName, setName string, elements []SetElement) error

DeleteElements removes elements from a set (batched)

func (*NFTBackendWrapper) FlushSet

func (w *NFTBackendWrapper) FlushSet(tableName, setName string) error

FlushSet clears all elements from a set

func (*NFTBackendWrapper) GetSetElements

func (w *NFTBackendWrapper) GetSetElements(tableName, setName string) ([]string, error)

GetSetElements returns all elements in a set

func (*NFTBackendWrapper) InvalidateCache

func (w *NFTBackendWrapper) InvalidateCache()

InvalidateCache clears the set cache (call after external nft changes)

type NetlinkBackend

type NetlinkBackend interface {
	// FlushSet clears all elements from a set
	FlushSet(table, set string) error

	// AddElements adds elements to a set (batched)
	AddElements(table, set string, elements []SetElement) error

	// DeleteElements removes elements from a set (batched)
	DeleteElements(table, set string, elements []SetElement) error

	// GetSetElements returns all elements in a set
	GetSetElements(table, set string) ([]string, error)
}

NetlinkBackend interface for nftables operations (allows mocking in tests)

type OpQueue

type OpQueue struct {
	// contains filtered or unexported fields
}

OpQueue manages all set buffers with async flush workers

func NewOpQueue

func NewOpQueue(backend NetlinkBackend, config QueueConfig) *OpQueue

NewOpQueue creates a new operation queue

func (*OpQueue) EnqueueBan

func (q *OpQueue) EnqueueBan(setName string, element string, ttl uint32, source, reason string) error

EnqueueBan adds a ban operation (async, non-blocking)

func (*OpQueue) EnqueueFlush

func (q *OpQueue) EnqueueFlush(setName string) error

EnqueueFlush adds a flush_set operation (async, non-blocking)

func (*OpQueue) EnqueueReplace

func (q *OpQueue) EnqueueReplace(setName string, elements []string, source string) error

EnqueueReplace adds a replace_set operation (async, non-blocking)

func (*OpQueue) EnqueueUnban

func (q *OpQueue) EnqueueUnban(setName string, element string, source string) error

EnqueueUnban adds an unban operation (async, non-blocking)

func (*OpQueue) QueueDepth

func (q *OpQueue) QueueDepth() int64

QueueDepth returns total pending operations - O(1), no locks

func (*OpQueue) Start

func (q *OpQueue) Start(ctx context.Context)

Start begins the async flush worker

func (*OpQueue) Stats

func (q *OpQueue) Stats() QueueStats

Stats returns queue statistics

func (*OpQueue) Stop

func (q *OpQueue) Stop()

Stop signals the queue to stop and drains remaining operations

type OpType

type OpType int

OpType represents the type of operation

const (
	OpAdd OpType = iota
	OpDelete
	OpReplaceSet
	OpFlushSet
)

func (OpType) String

func (t OpType) String() string

type PendingOp

type PendingOp struct {
	Type      OpType
	Element   string
	TTL       uint32 // Seconds, 0 = permanent (infinity in comparisons)
	Source    string // Latest source (for logging)
	Reason    string // Latest reason (for logging)
	CreatedAt time.Time
}

PendingOp represents a coalesced pending operation

type QueueConfig

type QueueConfig struct {
	// Flush policies
	FlushInterval  time.Duration // Time-based flush interval (default: 100ms)
	FlushThreshold int           // Size-based flush threshold (default: 1000 ops)
	MaxBatchSize   int           // Max elements per netlink batch (default: 5000)
	MaxQueueDepth  int64         // Backpressure limit (default: 50000)
}

QueueConfig holds configuration for the operation queue

func DefaultQueueConfig

func DefaultQueueConfig() QueueConfig

DefaultQueueConfig returns sensible defaults

type QueueStats

type QueueStats struct {
	PendingCount  int64
	TotalQueued   uint64
	TotalApplied  uint64
	TotalDropped  uint64
	LastFlushTime time.Time
}

QueueStats holds queue statistics

type SetBuffer

type SetBuffer struct {
	// contains filtered or unexported fields
}

SetBuffer holds pending operations for one set with coalescing

type SetElement

type SetElement struct {
	Value  string
	TTL    uint32
	IsIPv6 bool
}

SetElement represents an element to add/delete from nftables

type SetOp

type SetOp struct {
	Type     OpType
	Element  string   // IP or CIDR (for Add/Delete)
	Elements []string // For ReplaceSet
	TTL      uint32   // Seconds, 0 = permanent
	Reason   string   // For logging
	Source   string   // Origin module (login, portscan, ddos, etc.)
}

SetOp represents a single operation to be queued

type SourceIndex

type SourceIndex struct {
	// contains filtered or unexported fields
}

SourceIndex tracks which elements came from which source Enables flush_source without dedicated sets per module

func NewSourceIndex

func NewSourceIndex(indexPath string) *SourceIndex

NewSourceIndex creates a new source index

func (*SourceIndex) Add

func (si *SourceIndex) Add(setName, element, source string)

Add adds an element with its source

func (*SourceIndex) ClearSet

func (si *SourceIndex) ClearSet(setName string)

ClearSet removes all entries for a set

func (*SourceIndex) GetAllElements

func (si *SourceIndex) GetAllElements(setName string) map[string]bool

GetAllElements returns all elements in a set (as a map for O(1) lookup)

func (*SourceIndex) GetElementsBySource

func (si *SourceIndex) GetElementsBySource(setName, source string) []string

GetElementsBySource returns all elements in a set from a specific source

func (*SourceIndex) HasElement

func (si *SourceIndex) HasElement(setName, element string) bool

HasElement checks if an element exists in a set

func (*SourceIndex) LoadFromDisk

func (si *SourceIndex) LoadFromDisk() error

LoadFromDisk restores index on daemon startup

func (*SourceIndex) ReconcileWithBackend

func (si *SourceIndex) ReconcileWithBackend(backend NetlinkBackend)

ReconcileWithBackend syncs index with actual nft state Uses O(n) map lookups instead of O(n²) nested loops

func (*SourceIndex) Remove

func (si *SourceIndex) Remove(setName, element, source string)

Remove removes a source from an element

func (*SourceIndex) RemoveAll

func (si *SourceIndex) RemoveAll(setName, element string)

RemoveAll removes all sources from an element

func (*SourceIndex) SaveToDisk

func (si *SourceIndex) SaveToDisk() error

SaveToDisk persists index (called after netlink apply, not on enqueue)

func (*SourceIndex) ShouldDelete

func (si *SourceIndex) ShouldDelete(setName, element string) bool

ShouldDelete returns true if element has no remaining sources

func (*SourceIndex) StartBackgroundSaver

func (si *SourceIndex) StartBackgroundSaver(ctx context.Context)

StartBackgroundSaver runs periodic persistence

func (*SourceIndex) Stop

func (si *SourceIndex) Stop()

Stop signals the background saver to stop

type StandaloneBackend

type StandaloneBackend struct {
	*NFTBackendWrapper
	// contains filtered or unexported fields
}

StandaloneBackend creates its own NFTManager connection Use this only for testing or when no existing Backend is available

func NewStandaloneBackend

func NewStandaloneBackend() (*StandaloneBackend, error)

NewStandaloneBackend creates a backend with its own netlink connection

func (*StandaloneBackend) Close

func (s *StandaloneBackend) Close()

Close closes the owned NFTManager connection

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL