Documentation
¶
Overview ¶
Package pool provides unified high-performance object pooling for Nebula. This is the SINGLE pool implementation that replaces all other pool packages.
Index ¶
- Variables
- func GenerateID(prefix string) string
- func GetBatchChannel() chan []*Record
- func GetByteSlice() []byte
- func GetCSVRow(capacity int) []string
- func GetColumnName(index int) string
- func GetErrorChannel() chan error
- func GetErrorSlice(expectedSize int) []error
- func GetFieldName(prefix string, index int) string
- func GetGlobalStats() map[string]Stats
- func GetInternStats() (size, hits, misses int64)
- func GetMap() map[string]interface{}
- func GetRecordChannel(size int) chan *Record
- func GetRecordID(index int) string
- func GetStringBatch(capacity int) [][]string
- func GetStringSlice() []string
- func InternBytes(b []byte) string
- func InternString(s string) string
- func PreInternFieldNames(names []string)
- func PutBatchChannel(ch chan []*Record)
- func PutBatchSlice(batch []*Record)
- func PutByteSlice(b []byte)
- func PutCSVRow(slice []string)
- func PutErrorChannel(ch chan error)
- func PutErrorSlice(errs []error)
- func PutMap(m map[string]interface{})
- func PutRecord(record *Record)
- func PutRecordChannel(ch chan *Record)
- func PutStringBatch(batch [][]string)
- func PutStringSlice(s []string)
- func PutTypedRecord(tr *TypedRecord)
- type Arena
- type ArenaPool
- type BufferPool
- type ChannelPool
- type ErrorSlicePool
- type Pool
- type Record
- func GetBatchSlice(capacity int) []*Record
- func GetRecord() *Record
- func NewCDCRecord(database, table, operation string, before, after map[string]interface{}) *Record
- func NewRecord(source string, data map[string]interface{}) *Record
- func NewRecordFromPool(source string) *Record
- func NewStreamingRecord(streamID string, offset int64, data map[string]interface{}) *Record
- func (r *Record) GetCDCBefore() map[string]interface{}
- func (r *Record) GetCDCDatabase() string
- func (r *Record) GetCDCOperation() string
- func (r *Record) GetCDCPosition() string
- func (r *Record) GetCDCTable() string
- func (r *Record) GetCDCTransaction() string
- func (r *Record) GetData(key string) (interface{}, bool)
- func (r *Record) GetMetadata(key string) (interface{}, bool)
- func (r *Record) GetOffset() int64
- func (r *Record) GetStreamID() string
- func (r *Record) GetTimestamp() time.Time
- func (r *Record) IsCDCRecord() bool
- func (r *Record) IsDelete() bool
- func (r *Record) IsInsert() bool
- func (r *Record) IsStreamingRecord() bool
- func (r *Record) IsUpdate() bool
- func (r *Record) Release()
- func (r *Record) SetCDCBefore(before map[string]interface{})
- func (r *Record) SetCDCDatabase(database string)
- func (r *Record) SetCDCOperation(operation string)
- func (r *Record) SetCDCPosition(position string)
- func (r *Record) SetCDCTable(table string)
- func (r *Record) SetCDCTransaction(txID string)
- func (r *Record) SetData(key string, value interface{})
- func (r *Record) SetMetadata(key string, value interface{})
- func (r *Record) SetOffset(offset int64)
- func (r *Record) SetStreamID(streamID string)
- func (r *Record) SetTimestamp(t time.Time)
- type RecordMetadata
- type Stats
- type StringInternPool
- type TypedRecord
- func (tr *TypedRecord) GetBool(key string) (bool, bool)
- func (tr *TypedRecord) GetBytes(key string) ([]byte, bool)
- func (tr *TypedRecord) GetFloat(key string) (float64, bool)
- func (tr *TypedRecord) GetInt(key string) (int64, bool)
- func (tr *TypedRecord) GetString(key string) (string, bool)
- func (tr *TypedRecord) GetTime(key string) (time.Time, bool)
- func (tr *TypedRecord) Release()
- func (tr *TypedRecord) SetBool(key string, value bool)
- func (tr *TypedRecord) SetBytes(key string, value []byte)
- func (tr *TypedRecord) SetFloat(key string, value float64)
- func (tr *TypedRecord) SetInt(key string, value int64)
- func (tr *TypedRecord) SetString(key, value string)
- func (tr *TypedRecord) SetTime(key string, value time.Time)
Constants ¶
This section is empty.
Variables ¶
var ( // RecordChannelPool for single record channels RecordChannelPool = NewChannelPool[*Record](10000) // BatchChannelPool for batch channels BatchChannelPool = NewChannelPool[[]*Record](100) // SmallRecordChannelPool for smaller buffers SmallRecordChannelPool = NewChannelPool[*Record](100) // ErrorChannelPool for error channels ErrorChannelPool = NewChannelPool[error](10) )
Global channel pools for common types
var ( // RecordPool provides optimized pooling for Record objects RecordPool = New( func() *Record { return &Record{ Data: make(map[string]interface{}, 16), } }, func(r *Record) { r.ID = "" r.Schema = nil r.RawData = nil for k := range r.Data { delete(r.Data, k) } if r.Metadata.Custom != nil { for k := range r.Metadata.Custom { delete(r.Metadata.Custom, k) } } if r.Metadata.Before != nil { for k := range r.Metadata.Before { delete(r.Metadata.Before, k) } } r.Metadata = RecordMetadata{} }, ) // MapPool for map[string]interface{} objects MapPool = New( func() map[string]interface{} { return make(map[string]interface{}, 16) }, func(m map[string]interface{}) { for k := range m { delete(m, k) } }, ) // StringSlicePool for []string objects StringSlicePool = New( func() []string { return make([]string, 0, 32) }, func(s []string) { for i := range s { s[i] = "" } s = s[:0] }, ) // ByteSlicePool for []byte objects ByteSlicePool = New( func() []byte { return make([]byte, 0, 1024) }, func(b []byte) { b = b[:0] }, ) // IDBufferPool for ID generation IDBufferPool = New( func() []byte { return make([]byte, 0, 64) }, func(b []byte) { b = b[:0] }, ) // BatchSlicePool for []*Record objects (used in pipeline batching) BatchSlicePool = New( func() []*Record { return make([]*Record, 0, 1000) }, func(s []*Record) { for i := range s { s[i] = nil } s = s[:0] }, ) )
Global unified pools for the entire system
var ( // GlobalBufferPool for byte buffer pooling GlobalBufferPool = NewBufferPool() // GlobalArenaPool for arena allocation GlobalArenaPool = NewArenaPool(16*1024*1024, 10) // 16MB chunks, 10 arenas )
Global pools for advanced use cases
var (
	// StringBatchPool recycles [][]string batches (CSV rows). Its New hook
	// produces an empty batch with room for 5000 rows.
	StringBatchPool = &sync.Pool{
		New: func() any {
			rows := make([][]string, 0, 5000)
			return rows
		},
	}
)
String batch pools for CSV and similar operations
Functions ¶
func GenerateID ¶
GenerateID generates a unique ID using pooled buffers
func GetBatchChannel ¶
func GetBatchChannel() chan []*Record
GetBatchChannel gets a batch channel from the pool
func GetColumnName ¶
GetColumnName returns an interned column name. Optimized for CSV and similar columnar data.
func GetErrorChannel ¶
func GetErrorChannel() chan error
GetErrorChannel gets an error channel from the pool
func GetErrorSlice ¶
GetErrorSlice gets an error slice from the pool based on expected size
func GetFieldName ¶
GetFieldName returns an interned field name for common patterns. This avoids allocations for frequently used field names.
func GetGlobalStats ¶
GetGlobalStats returns statistics for all global pools
func GetInternStats ¶
func GetInternStats() (size, hits, misses int64)
GetInternStats returns global intern pool statistics
func GetRecordChannel ¶
GetRecordChannel gets a record channel from the pool
func GetRecordID ¶
GetRecordID returns an interned record ID. Optimized for sequential access patterns.
func GetStringBatch ¶
GetStringBatch gets a [][]string from the pool
func GetStringSlice ¶
func GetStringSlice() []string
GetStringSlice gets a string slice from the global pool
func InternBytes ¶
InternBytes interns a byte slice as a string using the global pool
func InternString ¶
InternString interns a string using the global pool
func PreInternFieldNames ¶
func PreInternFieldNames(names []string)
PreInternFieldNames pre-interns a batch of field names. Useful for known schemas or repeated patterns.
func PutBatchChannel ¶
func PutBatchChannel(ch chan []*Record)
PutBatchChannel returns a batch channel to the pool
func PutBatchSlice ¶
func PutBatchSlice(batch []*Record)
PutBatchSlice returns a batch slice to the global pool
func PutByteSlice ¶
func PutByteSlice(b []byte)
PutByteSlice returns a byte slice to the global pool
func PutErrorChannel ¶
func PutErrorChannel(ch chan error)
PutErrorChannel returns an error channel to the pool
func PutErrorSlice ¶
func PutErrorSlice(errs []error)
PutErrorSlice returns an error slice to the appropriate pool
func PutRecordChannel ¶
func PutRecordChannel(ch chan *Record)
PutRecordChannel returns a record channel to the pool
func PutStringBatch ¶
func PutStringBatch(batch [][]string)
PutStringBatch returns a [][]string to the pool
func PutStringSlice ¶
func PutStringSlice(s []string)
PutStringSlice returns a string slice to the global pool
func PutTypedRecord ¶
func PutTypedRecord(tr *TypedRecord)
PutTypedRecord returns a typed record to the pool
Types ¶
type Arena ¶
type Arena struct {
// contains filtered or unexported fields
}
Arena represents a memory arena
type ArenaPool ¶
type ArenaPool struct {
// contains filtered or unexported fields
}
ArenaPool provides arena-style allocation
func NewArenaPool ¶
NewArenaPool creates a new arena pool
type BufferPool ¶
type BufferPool struct {
// contains filtered or unexported fields
}
BufferPool manages byte buffer pooling with size-based buckets
func NewBufferPool ¶
func NewBufferPool() *BufferPool
NewBufferPool creates a new buffer pool with predefined sizes
func (*BufferPool) Get ¶
func (p *BufferPool) Get(size int) []byte
Get returns a buffer of at least the requested size
type ChannelPool ¶
type ChannelPool[T any] struct { // contains filtered or unexported fields }
ChannelPool provides pooling for channels to reduce allocations
func NewChannelPool ¶
func NewChannelPool[T any](size int) *ChannelPool[T]
NewChannelPool creates a new channel pool with specified buffer size
func (*ChannelPool[T]) Get ¶
func (p *ChannelPool[T]) Get() chan T
Get retrieves a channel from the pool
func (*ChannelPool[T]) Put ¶
func (p *ChannelPool[T]) Put(ch chan T)
Put returns a channel to the pool after draining it
type ErrorSlicePool ¶
type ErrorSlicePool struct {
// contains filtered or unexported fields
}
ErrorSlicePool provides pooling for error slices
type Pool ¶
type Pool[T any] struct { // contains filtered or unexported fields }
Pool represents a generic object pool with type safety
type Record ¶
// Record is the unified record type: an ID, a key/value Data payload,
// structured Metadata, and optional Schema and RawData attachments.
type Record struct {
// ID is serialized as "id". RecordPool's reset callback sets it back to "".
ID string `json:"id"`
// Data is the key/value payload, serialized as "data". Records created by
// RecordPool start with an empty map pre-sized for 16 entries.
Data map[string]interface{} `json:"data"`
// Metadata is the structured metadata block, serialized as "metadata".
Metadata RecordMetadata `json:"metadata"`
// Schema is an untyped schema attachment; left out of the JSON when nil.
Schema interface{} `json:"schema,omitempty"`
// RawData is never serialized to JSON (tag "-").
RawData []byte `json:"-"`
}
Record represents the unified record type (compatible with models.Record)
func GetBatchSlice ¶
GetBatchSlice gets a batch slice from the global pool with specified capacity
func NewCDCRecord ¶
NewCDCRecord creates a new record for Change Data Capture
func NewRecordFromPool ¶
NewRecordFromPool creates a new record using pooled resources
func NewStreamingRecord ¶
NewStreamingRecord creates a new record for streaming
func (*Record) GetCDCBefore ¶
GetCDCBefore returns the before state for UPDATE/DELETE operations
func (*Record) GetCDCDatabase ¶
GetCDCDatabase returns the database name for CDC records
func (*Record) GetCDCOperation ¶
GetCDCOperation returns the CDC operation type
func (*Record) GetCDCPosition ¶
GetCDCPosition returns the replication position
func (*Record) GetCDCTable ¶
GetCDCTable returns the table name for CDC records
func (*Record) GetCDCTransaction ¶
GetCDCTransaction returns the transaction ID
func (*Record) GetMetadata ¶
GetMetadata efficiently gets a custom metadata field
func (*Record) GetStreamID ¶
GetStreamID returns the stream identifier
func (*Record) GetTimestamp ¶
GetTimestamp returns the timestamp
func (*Record) IsCDCRecord ¶
IsCDCRecord returns true if this is a CDC record
func (*Record) IsStreamingRecord ¶
IsStreamingRecord returns true if this is a streaming record
func (*Record) Release ¶
func (r *Record) Release()
Release returns the record and its resources to the pool
func (*Record) SetCDCBefore ¶
SetCDCBefore sets the before state for UPDATE/DELETE operations
func (*Record) SetCDCDatabase ¶
SetCDCDatabase sets the database name for CDC records
func (*Record) SetCDCOperation ¶
SetCDCOperation sets the CDC operation type (INSERT, UPDATE, DELETE)
func (*Record) SetCDCPosition ¶
SetCDCPosition sets the replication position
func (*Record) SetCDCTable ¶
SetCDCTable sets the table name for CDC records
func (*Record) SetCDCTransaction ¶
SetCDCTransaction sets the transaction ID
func (*Record) SetMetadata ¶
SetMetadata efficiently sets a custom metadata field
func (*Record) SetStreamID ¶
SetStreamID sets the stream identifier
func (*Record) SetTimestamp ¶
SetTimestamp sets the timestamp
type RecordMetadata ¶
// RecordMetadata is the structured metadata embedded in Record.Metadata.
// Every field except Timestamp carries `omitempty`, so zero values are left
// out of the JSON encoding; Timestamp is always emitted.
type RecordMetadata struct {
// NOTE(review): presumably the `source` argument of NewRecord — confirm.
Source string `json:"source,omitempty"`
// NOTE(review): presumably the CDC table name (SetCDCTable/GetCDCTable) — confirm.
Table string `json:"table,omitempty"`
// NOTE(review): presumably the CDC operation type (INSERT, UPDATE, DELETE) — confirm.
Operation string `json:"operation,omitempty"`
// NOTE(review): presumably read/written by GetOffset/SetOffset — confirm.
Offset int64 `json:"offset,omitempty"`
// NOTE(review): presumably the stream identifier (SetStreamID/GetStreamID) — confirm.
StreamID string `json:"stream_id,omitempty"`
// Timestamp has no omitempty tag and is therefore always serialized.
Timestamp time.Time `json:"timestamp"`
// NOTE(review): presumably the CDC database name (SetCDCDatabase/GetCDCDatabase) — confirm.
Database string `json:"database,omitempty"`
// Schema is a string here, unlike the untyped Record.Schema field.
Schema string `json:"schema,omitempty"`
// NOTE(review): presumably the replication position (SetCDCPosition/GetCDCPosition) — confirm.
Position string `json:"position,omitempty"`
// NOTE(review): presumably the transaction ID (SetCDCTransaction/GetCDCTransaction) — confirm.
Transaction string `json:"transaction,omitempty"`
// Before is emptied key-by-key by RecordPool's reset callback.
// NOTE(review): presumably the pre-change state for UPDATE/DELETE (SetCDCBefore) — confirm.
Before map[string]interface{} `json:"before,omitempty"`
// Custom is emptied key-by-key by RecordPool's reset callback.
// NOTE(review): presumably the store behind SetMetadata/GetMetadata — confirm.
Custom map[string]interface{} `json:"custom,omitempty"`
}
RecordMetadata contains structured metadata (forward declaration for models.Record compatibility)
type StringInternPool ¶
type StringInternPool struct {
// contains filtered or unexported fields
}
StringInternPool provides string interning to reduce memory allocations for frequently used strings (like field names, map keys, etc.)
func (*StringInternPool) Clear ¶
func (p *StringInternPool) Clear()
Clear clears the intern pool (useful for tests)
func (*StringInternPool) Intern ¶
func (p *StringInternPool) Intern(s string) string
Intern returns an interned version of the string
func (*StringInternPool) InternBytes ¶
func (p *StringInternPool) InternBytes(b []byte) string
InternBytes interns a byte slice as a string
func (*StringInternPool) Stats ¶
func (p *StringInternPool) Stats() (size, hits, misses int64)
Stats returns intern pool statistics
type TypedRecord ¶
// TypedRecord embeds *Record and adds one map per concrete value type so
// values can be kept without being boxed into interface{}.
type TypedRecord struct {
// Embedded pointer: Record's fields and methods are promoted onto TypedRecord.
*Record
// Typed fields for common data types
// NOTE(review): each map is presumably the backing store for the matching
// Set*/Get* accessor pair (e.g. StringFields for SetString/GetString) —
// confirm against the method bodies.
StringFields map[string]string
IntFields map[string]int64
FloatFields map[string]float64
BoolFields map[string]bool
TimeFields map[string]time.Time
BytesFields map[string][]byte
}
TypedRecord provides specialized record types to avoid interface{} boxing
func GetTypedRecord ¶
func GetTypedRecord() *TypedRecord
GetTypedRecord gets a typed record from the pool
func (*TypedRecord) GetBool ¶
func (tr *TypedRecord) GetBool(key string) (bool, bool)
GetBool gets a bool field without unboxing
func (*TypedRecord) GetBytes ¶
func (tr *TypedRecord) GetBytes(key string) ([]byte, bool)
GetBytes gets a bytes field without unboxing
func (*TypedRecord) GetFloat ¶
func (tr *TypedRecord) GetFloat(key string) (float64, bool)
GetFloat gets a float field without unboxing
func (*TypedRecord) GetInt ¶
func (tr *TypedRecord) GetInt(key string) (int64, bool)
GetInt gets an int field without unboxing
func (*TypedRecord) GetString ¶
func (tr *TypedRecord) GetString(key string) (string, bool)
GetString gets a string field without unboxing
func (*TypedRecord) GetTime ¶
func (tr *TypedRecord) GetTime(key string) (time.Time, bool)
GetTime gets a time field without unboxing
func (*TypedRecord) SetBool ¶
func (tr *TypedRecord) SetBool(key string, value bool)
SetBool sets a bool field without boxing
func (*TypedRecord) SetBytes ¶
func (tr *TypedRecord) SetBytes(key string, value []byte)
SetBytes sets a bytes field without boxing
func (*TypedRecord) SetFloat ¶
func (tr *TypedRecord) SetFloat(key string, value float64)
SetFloat sets a float field without boxing
func (*TypedRecord) SetInt ¶
func (tr *TypedRecord) SetInt(key string, value int64)
SetInt sets an int field without boxing
func (*TypedRecord) SetString ¶
func (tr *TypedRecord) SetString(key, value string)
SetString sets a string field without boxing