Documentation
¶
Index ¶
- Constants
- Variables
- func CopyInto(ctx context.Context, wrappedSink Sink, spliterator spliterator.Spliterator)
- type AbstractPipeline
- func (p *AbstractPipeline) Evaluate(ctx context.Context, terminalOp Operation, split spliterator.Spliterator) optional.Optional
- func (p *AbstractPipeline) GetTargetSize(sizeEstimate int) int
- func (p *AbstractPipeline) IsLinkedOrConsumed() bool
- func (p *AbstractPipeline) IsParallel() bool
- func (p *AbstractPipeline) SetLinkedOrConsumed(linkedOrConsumed bool)
- func (p *AbstractPipeline) SetParallel(parallel bool)
- func (p *AbstractPipeline) SetTargetSize(targetSize int)
- func (p *AbstractPipeline) SuggestTargetSize(sizeEstimate int) int
- type Operation
- type ShortCircuitTask
- type Sink
- type TODOOperation
- type TODOShortCircuitTask
- func (task *TODOShortCircuitTask) Cancel()
- func (task *TODOShortCircuitTask) CancelLaterNodes()
- func (task *TODOShortCircuitTask) Compute(ctx context.Context)
- func (task *TODOShortCircuitTask) GetLocalResult() Sink
- func (task *TODOShortCircuitTask) GetSharedResult() Sink
- func (task *TODOShortCircuitTask) SetLocalResult(localResult Sink)
- func (task TODOShortCircuitTask) SetRawResult(result Sink)
- func (task *TODOShortCircuitTask) SharedResult() *sharedResult
- func (task *TODOShortCircuitTask) ShortCircuit(result Sink)
- func (task *TODOShortCircuitTask) TaskCanceled() bool
- func (task *TODOShortCircuitTask) WithParent(parent ShortCircuitTask, spliterator spliterator.Spliterator) *TODOShortCircuitTask
- func (task *TODOShortCircuitTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOShortCircuitTask
- type TODOSink
- type TODOTask
- func (task *TODOTask) Compute(ctx context.Context)
- func (task *TODOTask) DoLeaf(ctx context.Context) Sink
- func (task *TODOTask) Fork(ctx context.Context)
- func (task *TODOTask) GetLeafTarget() int
- func (task *TODOTask) GetLocalResult() Sink
- func (task *TODOTask) GetParent() Task
- func (task *TODOTask) GetSpliterator() spliterator.Spliterator
- func (task *TODOTask) Invoke(ctx context.Context) Sink
- func (task *TODOTask) IsLeaf() bool
- func (task *TODOTask) IsLeftmostNode() bool
- func (task *TODOTask) IsRoot() bool
- func (task *TODOTask) Join() Sink
- func (task *TODOTask) LeftChild() Task
- func (task *TODOTask) MakeChild(spliterator spliterator.Spliterator) Task
- func (task *TODOTask) OnCompletion(caller Task)
- func (task *TODOTask) RightChild() Task
- func (task *TODOTask) SetLeftChild(task_ Task)
- func (task *TODOTask) SetLocalResult(localResult Sink)
- func (task *TODOTask) SetRawResult(result Sink)
- func (task *TODOTask) SetRightChild(task_ Task)
- func (task *TODOTask) TargetSize() int
- func (task *TODOTask) WithParent(parent Task, spliterator spliterator.Spliterator) *TODOTask
- func (task *TODOTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOTask
- type Task
Constants ¶
View Source
const ( MsgStreamLinked = "stream has already been operated upon or closed" MsgConsumed = "source already consumed or closed" )
Variables ¶
View Source
var (
ParallelTargetSize = runtime.GOMAXPROCS(-1)
)
Functions ¶
func CopyInto ¶
func CopyInto(ctx context.Context, wrappedSink Sink, spliterator spliterator.Spliterator)
Types ¶
type AbstractPipeline ¶
*
- Abstract base class for "pipeline" classes, which are the core
- implementations of the Stream interface and its primitive specializations.
- Manages construction and evaluation of stream pipelines. *
- <p>An {@code AbstractPipeline} represents an initial portion of a stream
- pipeline, encapsulating a stream source and zero or more intermediate
- operations. The individual {@code AbstractPipeline} objects are often
- referred to as <em>stages</em>, where each stage describes either the stream
- source or an intermediate operation. *
- <p>A concrete intermediate stage is generally built from an
- {@code AbstractPipeline}, a shape-specific pipeline class which extends it
- (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
- concrete class which extends that. {@code AbstractPipeline} contains most of
- the mechanics of evaluating the pipeline, and implements methods that will be
- used by the operation; the shape-specific classes add helper methods for
- dealing with collection of results into the appropriate shape-specific
- containers. *
- <p>After chaining a new intermediate operation, or executing a terminal
- operation, the stream is considered to be consumed, and no more intermediate
- or terminal operations are permitted on this stream instance. *
- @implNote
- <p>For sequential streams, and parallel streams without
- <a href="package-summary.html#StreamOps">stateful intermediate
- operations</a>, parallel streams, pipeline evaluation is done in a single
- pass that "jams" all the operations together. For parallel streams with
- stateful operations, execution is divided into segments, where each
- stateful operations marks the end of a segment, and each segment is
- evaluated separately and the result used as the input to the next
- segment. In all cases, the source data is not consumed until a terminal
- operation begins. *
- @param <E_IN> type of input elements
- @param <E_OUT> type of output elements
- @param <S> type of the subclass implementing {@code BaseStream}
- @since 1.8
func (*AbstractPipeline) Evaluate ¶
func (p *AbstractPipeline) Evaluate(ctx context.Context, terminalOp Operation, split spliterator.Spliterator) optional.Optional
*
- Evaluate the pipeline with a terminal operation to produce a result. *
- @param <R> the type of result
- @param terminalOp the terminal operation to be applied to the pipeline.
- @return the result
func (*AbstractPipeline) GetTargetSize ¶
func (p *AbstractPipeline) GetTargetSize(sizeEstimate int) int
*
- Returns the targetSize, initializing it via the supplied
- size estimate if not already initialized.
func (*AbstractPipeline) IsLinkedOrConsumed ¶
func (p *AbstractPipeline) IsLinkedOrConsumed() bool
func (*AbstractPipeline) IsParallel ¶
func (p *AbstractPipeline) IsParallel() bool
func (*AbstractPipeline) SetLinkedOrConsumed ¶
func (p *AbstractPipeline) SetLinkedOrConsumed(linkedOrConsumed bool)
func (*AbstractPipeline) SetParallel ¶
func (p *AbstractPipeline) SetParallel(parallel bool)
func (*AbstractPipeline) SetTargetSize ¶
func (p *AbstractPipeline) SetTargetSize(targetSize int)
func (*AbstractPipeline) SuggestTargetSize ¶
func (p *AbstractPipeline) SuggestTargetSize(sizeEstimate int) int
*
- Returns a suggested target leaf size based on the initial size estimate. *
- @return suggested target leaf size
type Operation ¶
type Operation interface {
/**
* Gets the stream flags of the operation. Terminal operations may set a
* limited subset of the stream flags defined in {@link StreamOpFlag}, and
* these flags are combined with the previously combined stream and
* intermediate operation flags for the pipeline.
*
* @implSpec The default implementation returns zero.
*
* @return the stream flags for this operation
* @see StreamOpFlag
*/
GetOpFlags() int
/**
* Performs a parallel evaluation of the operation using the specified
* {@code PipelineHelper}, which describes the upstream intermediate
* operations.
*
* @implSpec The default performs a sequential evaluation of the operation
* using the specified {@code PipelineHelper}.
*
* @param helper the pipeline helper
* @param spliterator the source spliterator
* @return the result of the evaluation
*/
EvaluateParallel(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional
/**
* Performs a sequential evaluation of the operation using the specified
* {@code PipelineHelper}, which describes the upstream intermediate
* operations.
*
* @param helper the pipeline helper
* @param spliterator the source spliterator
* @return the result of the evaluation
*/
EvaluateSequential(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional
}
*
- An operation in a stream pipeline that takes a stream as input and produces
- a result or side-effect. A {@code Op} has an input type and stream
- shape, and a result type. A {@code Op} also has a set of
- <em>operation flags</em> that describes how the operation processes elements
- of the stream (such as short-circuiting or respecting encounter order; see
- {@link StreamOpFlag}). *
- <p>A {@code Op} must provide a sequential and parallel implementation
- of the operation relative to a given stream source and set of intermediate
- operations. *
- @param <E_IN> the type of input elements
- @param <R> the type of the result
- @since 1.8
type ShortCircuitTask ¶
type ShortCircuitTask interface {
Task
GetSharedResult() Sink
/**
* Declares that a globally valid result has been found. If another task has
* not already found the answer, the result is installed in
* {@code sharedResult}. The {@code compute()} method will check
* {@code sharedResult} before proceeding with computation, so this causes
* the computation to terminate early.
*
* @param result the result found
*/
ShortCircuit(result Sink)
/**
* Mark this task as canceled
*/
Cancel()
/**
* Queries whether this task is canceled. A task is considered canceled if
* it or any of its parents have been canceled.
*
* @return {@code true} if this task or any parent is canceled.
*/
TaskCanceled() bool
/**
* Cancels all tasks which succeed this one in the encounter order. This
* includes canceling all the current task's right sibling, as well as the
* later right siblings of all its parents.
*/
CancelLaterNodes()
}
*
- Abstract class for fork-join tasks used to implement short-circuiting
- stream ops, which can produce a result without processing all elements of the
- stream. *
- @param <P_IN> type of input elements to the pipeline
- @param <P_OUT> type of output elements from the pipeline
- @param <R> type of intermediate result, may be different from operation
- result type
- @param <K> type of child and sibling tasks
- @since 1.8
type Sink ¶
type Sink interface {
sink.Sink
supplier.OptionalSupplier
}
*
- A {@link Sink} which accumulates state as elements are accepted, and allows
- a result to be retrieved after the computation is finished. *
- @param <T> the type of elements to be accepted
- @param <R> the type of the result *
- @since 1.8
func WrapAndCopyInto ¶
func WrapAndCopyInto(ctx context.Context, sink Sink, spliterator spliterator.Spliterator) Sink
*
- Applies the pipeline stages described by this {@code PipelineHelper} to
- the provided {@code Spliterator} and send the results to the provided
- {@code Sink}. *
- @implSpec
- The implementation behaves as if:
- <pre>{@code
- copyInto(wrapSink(sink), spliterator);
- }</pre> *
- @param sink the {@code Sink} to receive the results
- @param spliterator the spliterator describing the source input to process
type TODOOperation ¶
func (*TODOOperation) EvaluateParallel ¶
func (op *TODOOperation) EvaluateParallel(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional
func (*TODOOperation) EvaluateSequential ¶
func (op *TODOOperation) EvaluateSequential(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional
func (*TODOOperation) GetOpFlags ¶
func (op *TODOOperation) GetOpFlags() int
type TODOShortCircuitTask ¶
type TODOShortCircuitTask struct {
TODOTask
// contains filtered or unexported fields
}
func (*TODOShortCircuitTask) Cancel ¶
func (task *TODOShortCircuitTask) Cancel()
func (*TODOShortCircuitTask) CancelLaterNodes ¶
func (task *TODOShortCircuitTask) CancelLaterNodes()
func (*TODOShortCircuitTask) Compute ¶
func (task *TODOShortCircuitTask) Compute(ctx context.Context)
*
- Overrides TODOTask version to include checks for early
- exits while splitting or computing.
func (*TODOShortCircuitTask) GetLocalResult ¶
func (task *TODOShortCircuitTask) GetLocalResult() Sink
*
- Retrieves the local result for this task. If this task is the root,
- retrieves the shared result instead.
func (*TODOShortCircuitTask) GetSharedResult ¶
func (task *TODOShortCircuitTask) GetSharedResult() Sink
func (*TODOShortCircuitTask) SetLocalResult ¶
func (task *TODOShortCircuitTask) SetLocalResult(localResult Sink)
*
- Sets a local result for this task. If this task is the root, set the
- shared result instead (if not already set). *
- @param localResult The result to set for this task
func (TODOShortCircuitTask) SetRawResult ¶
func (task TODOShortCircuitTask) SetRawResult(result Sink)
*
- Does nothing; instead, subclasses should use
- {@link #setLocalResult(Object)}} to manage results. *
- @param result must be null, or an exception is thrown (this is a safety
- tripwire to detect when {@code setRawResult()} is being used
- instead of {@code setLocalResult()}
func (*TODOShortCircuitTask) SharedResult ¶
func (task *TODOShortCircuitTask) SharedResult() *sharedResult
func (*TODOShortCircuitTask) ShortCircuit ¶
func (task *TODOShortCircuitTask) ShortCircuit(result Sink)
func (*TODOShortCircuitTask) TaskCanceled ¶
func (task *TODOShortCircuitTask) TaskCanceled() bool
func (*TODOShortCircuitTask) WithParent ¶
func (task *TODOShortCircuitTask) WithParent(parent ShortCircuitTask, spliterator spliterator.Spliterator) *TODOShortCircuitTask
*
- Constructor for non-root nodes. *
- @param parent parent task in the computation tree
- @param spliterator the {@code Spliterator} for the portion of the
- computation tree described by this task
func (*TODOShortCircuitTask) WithSpliterator ¶
func (task *TODOShortCircuitTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOShortCircuitTask
*
- Constructor for root tasks. *
- @param helper the {@code PipelineHelper} describing the stream pipeline
- up to this operation
- @param spliterator the {@code Spliterator} describing the source for this
- pipeline
type TODOTask ¶
func (*TODOTask) GetLeafTarget ¶
func (*TODOTask) GetLocalResult ¶
*
- Retrieves a result previously stored with {@link #setLocalResult} *
- @return local result for this node previously stored with
- {@link #setLocalResult}
func (*TODOTask) GetSpliterator ¶
func (task *TODOTask) GetSpliterator() spliterator.Spliterator
func (*TODOTask) IsLeftmostNode ¶
func (*TODOTask) MakeChild ¶
func (task *TODOTask) MakeChild(spliterator spliterator.Spliterator) Task
func (*TODOTask) OnCompletion ¶
func (*TODOTask) RightChild ¶
func (*TODOTask) SetLeftChild ¶
func (*TODOTask) SetLocalResult ¶
*
- Associates the result with the task, can be retrieved with
- {@link #GetLocalResult} *
- @param localResult local result for this node
func (*TODOTask) SetRawResult ¶
*
- Does nothing; instead, subclasses should use
- {@link #setLocalResult(Object)}} to manage results. *
- @param result must be null, or an exception is thrown (this is a safety
- tripwire to detect when {@code setRawResult()} is being used
- instead of {@code setLocalResult()}
func (*TODOTask) SetRightChild ¶
func (*TODOTask) TargetSize ¶
func (*TODOTask) WithParent ¶
func (task *TODOTask) WithParent(parent Task, spliterator spliterator.Spliterator) *TODOTask
*
- Constructor for non-root nodes. *
- @param parent this node's parent task
- @param spliterator {@code Spliterator} describing the subtree rooted at
- this node, obtained by splitting the parent {@code Spliterator}
func (*TODOTask) WithSpliterator ¶
func (task *TODOTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOTask
*
- Constructor for root nodes. *
- @param helper The {@code PipelineHelper} describing the stream pipeline
- up to this operation
- @param spliterator The {@code Spliterator} describing the source for this
- pipeline
type Task ¶
type Task interface {
GetSpliterator() spliterator.Spliterator
/**
* Returns the parent of this task, or null if this task is the root
*
* @return the parent of this task, or null if this task is the root
*/
GetParent() Task
/**
* The left child.
* null if no children
* if non-null rightChild is non-null
*
* @return the right child
*/
LeftChild() Task
SetLeftChild(task Task)
/**
* The right child.
* null if no children
* if non-null rightChild is non-null
*
* @return the right child
*/
RightChild() Task
SetRightChild(task Task)
/**
* Indicates whether this task is a leaf node. (Only valid after
* {@link #compute} has been called on this node). If the node is not a
* leaf node, then children will be non-null and numChildren will be
* positive.
*
* @return {@code true} if this task is a leaf node
*/
IsLeaf() bool
/**
* Indicates whether this task is the root node
*
* @return {@code true} if this task is the root node.
*/
IsRoot() bool
/**
* Target leaf size, common to all tasks in a computation
*
* @return target leaf size.
*/
TargetSize() int
/**
* Default target of leaf tasks for parallel decomposition.
* To allow load balancing, we over-partition, currently to approximately
* four tasks per processor, which enables others to help out
* if leaf tasks are uneven or some processors are otherwise busy.
*
* @return the default target size of leaf tasks
*/
GetLeafTarget() int
/**
* Constructs a new node of type T whose parent is the receiver; must call
* the TODOTask(T, Spliterator) constructor with the receiver and the
* provided Spliterator.
*
* @param spliterator {@code Spliterator} describing the subtree rooted at
* this node, obtained by splitting the parent {@code Spliterator}
* @return newly constructed child node
*/
MakeChild(spliterator spliterator.Spliterator) Task
/**
* Computes the result associated with a leaf node. Will be called by
* {@code compute()} and the result passed to @{code setLocalResult()}
*
* @return the computed result of a leaf node
*/
DoLeaf(ctx context.Context) Sink
/**
* Retrieves a result previously stored with {@link #setLocalResult}
*
* @return local result for this node previously stored with
* {@link #setLocalResult}
*/
GetLocalResult() Sink
/**
* Associates the result with the task, can be retrieved with
* {@link #GetLocalResult}
*
* @param localResult local result for this node
*/
SetLocalResult(localResult Sink)
/**
* Decides whether or not to split a task further or compute it
* directly. If computing directly, calls {@code doLeaf} and pass
* the result to {@code setRawResult}. Otherwise splits off
* subtasks, forking one and continuing as the other.
*
* <p> The method is structured to conserve resources across a
* range of uses. The loop continues with one of the child tasks
* when split, to avoid deep recursion. To cope with spliterators
* that may be systematically biased toward left-heavy or
* right-heavy splits, we alternate which child is forked versus
* continued in the loop.
*/
Compute(ctx context.Context)
/**
* {@inheritDoc}
*
* @implNote
* Clears spliterator and children fields. Overriders MUST call
* {@code super.onCompletion} as the last thing they do if they want these
* cleared.
*/
OnCompletion(caller Task)
/**
* Returns whether this node is a "leftmost" node -- whether the path from
* the root to this node involves only traversing leftmost child links. For
* a leaf node, this means it is the first leaf node in the encounter order.
*
* @return {@code true} if this node is a "leftmost" node
*/
IsLeftmostNode() bool
/**
* Arranges to asynchronously execute this task in the pool the
* current task is running in, if applicable, or using the {@link
* ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
* it is not necessarily enforced, it is a usage error to fork a
* task more than once unless it has completed and been
* reinitialized. Subsequent modifications to the state of this
* task or any data it operates on are not necessarily
* consistently observable by any thread other than the one
* executing it unless preceded by a call to {@link #join} or
* related methods, or a call to {@link #isDone} returning {@code
* true}.
*
* @return {@code this}, to simplify usage
*/
Fork(ctx context.Context)
/**
* Returns the result of the computation when it
* {@linkplain #isDone is done}.
* This method differs from {@link #get()} in that abnormal
* completion results in {@code RuntimeException} or {@code Error},
* not {@code ExecutionException}, and that interrupts of the
* calling thread do <em>not</em> cause the method to abruptly
* return by throwing {@code InterruptedException}.
*
* @return the computed result
*/
Join() Sink
/**
* Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
* {@code RuntimeException} or {@code Error} if the underlying
* computation did so.
*
* @return the computed result
*/
Invoke(ctx context.Context) Sink
}
Click to show internal directories.
Click to hide internal directories.