Documentation
¶
Index ¶
- Variables
- func Identity(e interface{}) interface{}
- func IdentityMapper(e interface{}) interface{}
- func ValueIterator(i ProcessingIterator) data.Iterator
- type AggregationFunction
- type BufferCreator
- type BufferFrame
- type BufferImplementation
- type CheckNext
- type CompareFunction
- type CompareIndexedFunction
- type ExplodeFunction
- type FilterFunction
- type IncrementalProcessingSource
- type Index
- type IndexArray
- type MappingFunction
- type ProcessChain
- func Append(chain, add ProcessChain, conditions ...options.Condition) ProcessChain
- func Chain(log logging.Context) ProcessChain
- func Explode(e ExplodeFunction) ProcessChain
- func Filter(f FilterFunction) ProcessChain
- func Map(m MappingFunction) ProcessChain
- func Parallel(n int) ProcessChain
- func Sort(c CompareFunction) ProcessChain
- func Transform(t TransformFunction) ProcessChain
- func Unordered() ProcessChain
- func WithPool(pool ProcessorPool) ProcessChain
- type ProcessingBuffer
- type ProcessingEntry
- type ProcessingIterable
- type ProcessingIterator
- type ProcessingResult
- type ProcessingSource
- type ProcessorPool
- type SubEntries
- type TransformFunction
Constants ¶
This section is empty.
Variables ¶
View Source
var REALM = ocmlog.DefineSubRealm("output processing chains", "processing")
Functions ¶
func IdentityMapper ¶
func IdentityMapper(e interface{}) interface{}
func ValueIterator ¶
func ValueIterator(i ProcessingIterator) data.Iterator
Types ¶
type AggregationFunction ¶
type AggregationFunction func(e, aggr interface{}) interface{}
type BufferCreator ¶
type BufferCreator func(log logging.Context) ProcessingBuffer
type BufferFrame ¶
type BufferFrame interface {
Lock()
Unlock()
Broadcast()
Wait()
IsClosed() bool
}
type BufferImplementation ¶
type BufferImplementation interface {
Add(e ProcessingEntry) bool
Open()
Close()
Len() int
Get(i int) interface{}
ProcessingIterable
SetFrame(frame BufferFrame)
}
type CompareFunction ¶
type CompareFunction = data.CompareFunction
type CompareIndexedFunction ¶
type CompareIndexedFunction = data.CompareIndexedFunction
type ExplodeFunction ¶
type ExplodeFunction func(interface{}) []interface{}
type FilterFunction ¶
type FilterFunction func(interface{}) bool
type IncrementalProcessingSource ¶
type IncrementalProcessingSource interface {
data.Iterable
Open()
Add(e ...interface{}) IncrementalProcessingSource
Close()
}
type IndexArray ¶
type IndexArray []int
func (IndexArray) After ¶
func (i IndexArray) After(o IndexArray) bool
func (IndexArray) Copy ¶
func (i IndexArray) Copy() IndexArray
func (IndexArray) Next ¶
func (i IndexArray) Next(max, sub int) IndexArray
func (IndexArray) Validate ¶
func (i IndexArray) Validate(max int)
type MappingFunction ¶
type MappingFunction data.MappingFunction
type ProcessChain ¶
type ProcessChain interface {
Transform(t TransformFunction) ProcessChain
Explode(m ExplodeFunction) ProcessChain
Map(m MappingFunction) ProcessChain
Filter(f FilterFunction) ProcessChain
Sort(c CompareFunction) ProcessChain
WithPool(p ProcessorPool) ProcessChain
Unordered() ProcessChain
Parallel(n int) ProcessChain
Append(p ProcessChain) ProcessChain
Process(data data.Iterable) ProcessingResult
}
ProcessChain is a data structure holding a chain definition, which is a chain of step creation functions used to instantiate the chain for a dedicated input processing. The instantiation is initiated by calling the Process method on a chain.
func Append ¶
func Append(chain, add ProcessChain, conditions ...options.Condition) ProcessChain
func Chain ¶
func Chain(log logging.Context) ProcessChain
func Explode ¶
func Explode(e ExplodeFunction) ProcessChain
func Filter ¶
func Filter(f FilterFunction) ProcessChain
func Map ¶
func Map(m MappingFunction) ProcessChain
func Parallel ¶
func Parallel(n int) ProcessChain
func Sort ¶
func Sort(c CompareFunction) ProcessChain
func Transform ¶
func Transform(t TransformFunction) ProcessChain
func Unordered ¶
func Unordered() ProcessChain
func WithPool ¶
func WithPool(pool ProcessorPool) ProcessChain
type ProcessingBuffer ¶
type ProcessingBuffer interface {
Add(e ProcessingEntry) ProcessingBuffer
Len() int
Get(int) interface{}
Open()
Close()
ProcessingIterable
IsClosed() bool
}
func NewOrderedBuffer ¶
func NewOrderedBuffer(log logging.Context) ProcessingBuffer
func NewProcessingBuffer ¶
func NewProcessingBuffer(log logging.Context, i BufferImplementation) ProcessingBuffer
func NewSimpleBuffer ¶
func NewSimpleBuffer(log logging.Context) ProcessingBuffer
type ProcessingEntry ¶
func NewEntry ¶
func NewEntry(i Index, v interface{}, opts ...interface{}) ProcessingEntry
type ProcessingIterable ¶
type ProcessingIterable interface {
ProcessingIterator() ProcessingIterator
Iterator() data.Iterator
}
func NewEntryIterableFromIterable ¶
func NewEntryIterableFromIterable(log logging.Context, data data.Iterable) ProcessingIterable
func ValueIterable ¶
func ValueIterable(i ProcessingIterable) ProcessingIterable
type ProcessingIterator ¶
type ProcessingIterator interface {
HasNext() bool
NextProcessingEntry() ProcessingEntry
}
type ProcessingResult ¶
type ProcessingResult interface {
data.Iterable
Transform(t TransformFunction) ProcessingResult
Explode(e ExplodeFunction) ProcessingResult
Map(m MappingFunction) ProcessingResult
Filter(f FilterFunction) ProcessingResult
Sort(c CompareFunction) ProcessingResult
Apply(c ProcessChain) ProcessingResult
Synchronously() ProcessingResult
Asynchronously() ProcessingResult
WithPool(ProcessorPool) ProcessingResult
Unordered() ProcessingResult
Parallel(n int) ProcessingResult
AsSlice() data.IndexedSliceAccess
}
type ProcessingSource ¶
type ProcessingSource interface {
IncrementalProcessingSource
}
func NewAsyncProcessingSource ¶
func NewAsyncProcessingSource(log logging.Context, f func() data.Iterable, pool ProcessorPool) ProcessingSource
func NewIncrementalProcessingSource ¶
func NewIncrementalProcessingSource(log logging.Context) ProcessingSource
type ProcessorPool ¶
type ProcessorPool interface {
Request()
Release()
Exec(func())
}
func NewProcessorPool ¶
func NewProcessorPool(n int) ProcessorPool
func NewUnlimitedProcessorPool ¶
func NewUnlimitedProcessorPool() ProcessorPool
type SubEntries ¶
type SubEntries int
Click to show internal directories.
Click to hide internal directories.