Documentation
¶
Index ¶
- Constants
- Variables
- func BuildPassExecutionObjKey(passObjKey, execPeerID string) string
- func CheckPassType(ctx context.Context, ws world.WorldState, objKey string) error
- func CollectPassExecutions(ctx context.Context, ws world.WorldState, passObjectKeys ...string) ([]*forge_execution.Execution, []string, error)
- func ComputeOutputsWithStates(outputs []*forge_target.Output, execStates []*ExecState, replicas int) (forge_value.ValueSlice, error)
- func CreateExecutionWithPass(ctx context.Context, ws world.WorldState, sender peer.ID, execObjKey string, ...) (*bucket.ObjectRef, error)
- func CreatePassWithTarget(ctx context.Context, ws world.WorldState, sender peer.ID, objKey string, ...) (world.ObjectState, *bucket.ObjectRef, error)
- func ListPassExecutions(ctx context.Context, w world.WorldState, passKeys ...string) ([]string, error)
- func NewExecStateSubBlockSet(v *[]*ExecState, bcs *block.Cursor) *sbset.NamedSubBlockSet
- func NewExecStateSubBlockSetCtor(v *[]*ExecState) block.SubBlockCtor
- func NewPassBlock() block.Block
- func NewPassToExecutionQuad(passObjKey, executionObjKey string) world.GraphQuad
- type ExecState
- func (m *ExecState) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *ExecState) CloneVT() *ExecState
- func (this *ExecState) EqualMessageVT(thatMsg any) bool
- func (this *ExecState) EqualVT(that *ExecState) bool
- func (s *ExecState) Equals(ot *ExecState) bool
- func (x *ExecState) GetExecutionState() execution.State
- func (s *ExecState) GetName() string
- func (x *ExecState) GetObjectKey() string
- func (x *ExecState) GetPeerId() string
- func (x *ExecState) GetResult() *value.Result
- func (x *ExecState) GetTimestamp() *timestamppb.Timestamp
- func (x *ExecState) GetValueSet() *target.ValueSet
- func (s *ExecState) IsNil() bool
- func (x *ExecState) MarshalJSON() ([]byte, error)
- func (x *ExecState) MarshalProtoJSON(s *json.MarshalState)
- func (x *ExecState) MarshalProtoText() string
- func (m *ExecState) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *ExecState) MarshalToVT(dAtA []byte) (int, error)
- func (m *ExecState) MarshalVT() (dAtA []byte, err error)
- func (s *ExecState) MatchesExecution(exec *forge_execution.Execution) bool
- func (s *ExecState) ParsePeerID() (peer.ID, error)
- func (*ExecState) ProtoMessage()
- func (x *ExecState) Reset()
- func (m *ExecState) SizeVT() (n int)
- func (x *ExecState) String() string
- func (x *ExecState) UnmarshalJSON(b []byte) error
- func (x *ExecState) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *ExecState) UnmarshalVT(dAtA []byte) error
- func (s *ExecState) Validate() error
- type Pass
- func (e *Pass) ApplyBlockRef(id uint32, ptr *block.BlockRef) error
- func (e *Pass) ApplyExecStates(bcs *block.Cursor, execObjKeys []string, execObjs []*forge_execution.Execution) error
- func (e *Pass) ApplySubBlock(id uint32, next block.SubBlock) error
- func (m *Pass) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *Pass) CloneVT() *Pass
- func (this *Pass) EqualMessageVT(thatMsg any) bool
- func (this *Pass) EqualVT(that *Pass) bool
- func (e *Pass) FollowTargetRef(ctx context.Context, bcs *block.Cursor) (*forge_target.Target, *block.Cursor, error)
- func (e *Pass) GetBlockRefCtor(id uint32) block.Ctor
- func (e *Pass) GetBlockRefs() (map[uint32]*block.BlockRef, error)
- func (x *Pass) GetExecStates() []*ExecState
- func (x *Pass) GetPassNonce() uint64
- func (x *Pass) GetPassState() State
- func (x *Pass) GetPeerId() string
- func (x *Pass) GetReplicas() uint32
- func (x *Pass) GetResult() *value.Result
- func (e *Pass) GetSubBlockCtor(id uint32) block.SubBlockCtor
- func (e *Pass) GetSubBlocks() map[uint32]block.SubBlock
- func (x *Pass) GetTargetRef() *block.BlockRef
- func (x *Pass) GetTimestamp() *timestamppb.Timestamp
- func (x *Pass) GetValueSet() *target.ValueSet
- func (e *Pass) IsComplete() bool
- func (e *Pass) MarshalBlock() ([]byte, error)
- func (x *Pass) MarshalJSON() ([]byte, error)
- func (x *Pass) MarshalProtoJSON(s *json.MarshalState)
- func (x *Pass) MarshalProtoText() string
- func (m *Pass) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *Pass) MarshalToVT(dAtA []byte) (int, error)
- func (m *Pass) MarshalVT() (dAtA []byte, err error)
- func (e *Pass) ParsePeerID() (peer.ID, error)
- func (*Pass) ProtoMessage()
- func (x *Pass) Reset()
- func (m *Pass) SizeVT() (n int)
- func (x *Pass) String() string
- func (e *Pass) UnmarshalBlock(data []byte) error
- func (x *Pass) UnmarshalJSON(b []byte) error
- func (x *Pass) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *Pass) UnmarshalVT(dAtA []byte) error
- func (e *Pass) Validate(allowEmptyRefs bool) error
- type State
- func (s State) EnsureMatches(sts ...State) error
- func (x State) Enum() *State
- func (x State) MarshalJSON() ([]byte, error)
- func (x State) MarshalProtoJSON(s *json.MarshalState)
- func (x State) MarshalProtoText() string
- func (x State) MarshalText() ([]byte, error)
- func (x State) String() string
- func (x *State) UnmarshalJSON(b []byte) error
- func (x *State) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (x *State) UnmarshalText(b []byte) error
- func (s State) Validate(allowUnknown bool) error
Constants ¶
const ( // PassTypeID is the type identifier for a Pass. PassTypeID = "forge/pass" // PredPassToExecution is the predicate linking a Pass to an Execution. PredPassToExecution = quad.IRI("forge/pass-execution") )
Variables ¶
var ( State_name = map[int32]string{ 0: "PassState_UNKNOWN", 1: "PassState_PENDING", 2: "PassState_RUNNING", 3: "PassState_CHECKING", 4: "PassState_COMPLETE", } State_value = map[string]int32{ "PassState_UNKNOWN": 0, "PassState_PENDING": 1, "PassState_RUNNING": 2, "PassState_CHECKING": 3, "PassState_COMPLETE": 4, } )
Enum value maps for State.
var ErrUnknownState = errors.New("unexpected or unhandled state")
ErrUnknownState is returned if the state was unknown/unhandled.
Functions ¶
func BuildPassExecutionObjKey ¶
BuildPassExecutionObjKey builds the object key for a pass execution. execPeerID must be set.
func CheckPassType ¶
CheckPassType checks the type graph quad for a Pass.
func CollectPassExecutions ¶
func CollectPassExecutions( ctx context.Context, ws world.WorldState, passObjectKeys ...string, ) ([]*forge_execution.Execution, []string, error)
CollectPassExecutions collects all Executions linked to by the Pass. If any of the linked states are invalid, returns an error.
func ComputeOutputsWithStates ¶
func ComputeOutputsWithStates(outputs []*forge_target.Output, execStates []*ExecState, replicas int) (forge_value.ValueSlice, error)
ComputeOutputsWithStates computes the pass outputs with exec states.
func CreateExecutionWithPass ¶
func CreateExecutionWithPass( ctx context.Context, ws world.WorldState, sender peer.ID, execObjKey string, passObjKey string, passObjBcs *block.Cursor, passObj *Pass, execPeerID peer.ID, ) (*bucket.ObjectRef, error)
CreateExecutionWithPass creates a pending Execution object for a Pass.
Writes the Target to a block linked to by the Execution. execPeerID is the peer id to assign to the execution.
func CreatePassWithTarget ¶
func CreatePassWithTarget( ctx context.Context, ws world.WorldState, sender peer.ID, objKey string, valueSet *forge_target.ValueSet, tgt *forge_target.Target, nonce uint64, replicas uint32, passPeerID string, ts *timestamp.Timestamp, ) (world.ObjectState, *bucket.ObjectRef, error)
CreatePassWithTarget creates a pending Pass object in the world.
Writes the Target to a block linked to by the Pass.
func ListPassExecutions ¶
func ListPassExecutions(ctx context.Context, w world.WorldState, passKeys ...string) ([]string, error)
ListPassExecutions lists all Execution object keys that are linked to by the Pass.
func NewExecStateSubBlockSet ¶
func NewExecStateSubBlockSet(v *[]*ExecState, bcs *block.Cursor) *sbset.NamedSubBlockSet
NewExecStateSubBlockSet builds a new ExecState sub-block set container.
bcs should be located at the sub-block.
func NewExecStateSubBlockSetCtor ¶
func NewExecStateSubBlockSetCtor(v *[]*ExecState) block.SubBlockCtor
NewExecStateSubBlockSetCtor returns the sub-block constructor.
func NewPassToExecutionQuad ¶
NewPassToExecutionQuad creates a quad linking a Pass to an Execution.
Types ¶
type ExecState ¶
type ExecState struct {
// ObjectKey is the object key of the execution instance.
// Must exist before adding to Pass state.
// Graph quad also exists: <pass> <forge/pass-execution> <object_key>
ObjectKey string `protobuf:"bytes,1,opt,name=object_key,json=objectKey,proto3" json:"objectKey,omitempty"`
// ExecutionState is the current state of the execution.
ExecutionState execution.State `protobuf:"varint,2,opt,name=execution_state,json=executionState,proto3" json:"executionState,omitempty"`
// PeerId is the identifier of the peer assigned to the execution.
// Can be empty.
PeerId string `protobuf:"bytes,3,opt,name=peer_id,json=peerId,proto3" json:"peerId,omitempty"`
// Timestamp is the time the parent object (usually Pass) was created.
Timestamp *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// ValueSet is the set of inputs and outputs used in the execution.
// Outputs are updated when the execution reaches COMPLETE state.
ValueSet *target.ValueSet `protobuf:"bytes,5,opt,name=value_set,json=valueSet,proto3" json:"valueSet,omitempty"`
// Result is information about the outcome of the execution.
Result *value.Result `protobuf:"bytes,6,opt,name=result,proto3" json:"result,omitempty"`
// contains filtered or unexported fields
}
ExecState contains the previous snapshot of an execution state.
func NewExecState ¶
func NewExecState(objKey string, e *forge_execution.Execution) *ExecState
NewExecState creates a new ExecState from an Execution.
func (*ExecState) CloneMessageVT ¶
func (m *ExecState) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*ExecState) EqualMessageVT ¶
func (*ExecState) GetExecutionState ¶
func (*ExecState) GetObjectKey ¶
func (*ExecState) GetTimestamp ¶
func (x *ExecState) GetTimestamp() *timestamppb.Timestamp
func (*ExecState) GetValueSet ¶
func (*ExecState) MarshalJSON ¶
MarshalJSON marshals the ExecState to JSON.
func (*ExecState) MarshalProtoJSON ¶
func (x *ExecState) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the ExecState message to JSON.
func (*ExecState) MarshalProtoText ¶
func (*ExecState) MarshalToSizedBufferVT ¶
func (*ExecState) MatchesExecution ¶
func (s *ExecState) MatchesExecution(exec *forge_execution.Execution) bool
MatchesExecution checks if the details in the ExecState match the Execution.
func (*ExecState) ParsePeerID ¶
ParsePeerID parses the peer ID field.
func (*ExecState) ProtoMessage ¶
func (*ExecState) ProtoMessage()
func (*ExecState) UnmarshalJSON ¶
UnmarshalJSON unmarshals the ExecState from JSON.
func (*ExecState) UnmarshalProtoJSON ¶
func (x *ExecState) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the ExecState message from JSON.
func (*ExecState) UnmarshalVT ¶
type Pass ¶
type Pass struct {
// PassState is the current state of the pass.
PassState State `protobuf:"varint,1,opt,name=pass_state,json=passState,proto3" json:"passState,omitempty"`
// PeerId is the Pass controller peer ID.
// Usually the peer ID of the Cluster controller managing this Pass.
// Can be empty.
PeerId string `protobuf:"bytes,2,opt,name=peer_id,json=peerId,proto3" json:"peerId,omitempty"`
// TargetRef is the block reference to the Target for the pass.
TargetRef *block.BlockRef `protobuf:"bytes,3,opt,name=target_ref,json=targetRef,proto3" json:"targetRef,omitempty"`
// ValueSet is the set of inputs and outputs used in the pass.
// The input set is resolved before creating the Pass object.
// The inputs are copied to the Pass objects.
// The output set is updated when transitioning from CHECKING -> COMPLETE.
ValueSet *target.ValueSet `protobuf:"bytes,4,opt,name=value_set,json=valueSet,proto3" json:"valueSet,omitempty"`
// Result is information about the outcome of a completed Pass.
Result *value.Result `protobuf:"bytes,5,opt,name=result,proto3" json:"result,omitempty"`
// Replicas is the configured number of executions for the Pass.
//
// replicas must equal len(exec_states) for the Pass to complete.
Replicas uint32 `protobuf:"varint,6,opt,name=replicas,proto3" json:"replicas,omitempty"`
// PassNonce is the nonce of the pass set by the pass creator.
PassNonce uint64 `protobuf:"varint,7,opt,name=pass_nonce,json=passNonce,proto3" json:"passNonce,omitempty"`
// ExecStates contains the most recent snapshot of the execution states.
// Updated when:
// - PENDING to RUNNING: contains initial execution states (PENDING)
// - RUNNING: can add and remove execution states as needed.
// - RUNNING to CHECKING or COMPLETE: contains final states (COMPLETE)
// - Any to PENDING: cleared (set to len=0).
ExecStates []*ExecState `protobuf:"bytes,8,rep,name=exec_states,json=execStates,proto3" json:"execStates,omitempty"`
// Timestamp is the time the Pass was created.
// Used as a reference timestamp to make all ops deterministic.
// For example: all unixfs timestamps will be set to this value.
// Must be set.
Timestamp *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// contains filtered or unexported fields
}
Pass contains state for a Task pass. Contains a pointer to the Target used for the pass. Contains snapshots of the execution instance states. Execution instances can be added / removed.
The Pass is complete when len(exec_states) == replicas and all exec states are in the completed (terminal) state, or when a fatal error occurs.
If the Target object changes (in the world) or inputs change, a new Pass should be created (these fields are immutable).
World graph links:
- parent: usually a Task which created the Pass
- forge/pass-execution: all active execution instances for the pass
Incoming graph links:
- parent: from the Execution for the pass.
func LookupPass ¶
func LookupPass(ctx context.Context, ws world.WorldState, objKey string) (*Pass, *forge_target.Target, error)
LookupPass looks up a Pass in the world.
func UnmarshalPass ¶
UnmarshalPass unmarshals a pass block from the cursor.
func WaitPassComplete ¶
func WaitPassComplete( ctx context.Context, le *logrus.Entry, ws world.WorldState, passObjectKey string, ) (*Pass, error)
WaitPassComplete waits until the Pass is in the COMPLETE state.
func (*Pass) ApplyBlockRef ¶
ApplyBlockRef applies a ref change with a field id. The reference may be nil if the child block is nil.
func (*Pass) ApplyExecStates ¶
func (e *Pass) ApplyExecStates( bcs *block.Cursor, execObjKeys []string, execObjs []*forge_execution.Execution, ) error
ApplyExecStates updates the exec states field with the list of Executions. bcs can be nil.
func (*Pass) ApplySubBlock ¶
ApplySubBlock applies a sub-block change with a field id.
func (*Pass) CloneMessageVT ¶
func (m *Pass) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*Pass) EqualMessageVT ¶
func (*Pass) FollowTargetRef ¶
func (e *Pass) FollowTargetRef(ctx context.Context, bcs *block.Cursor) (*forge_target.Target, *block.Cursor, error)
FollowTargetRef follows the reference to the pass target. bcs should point to the pass.
func (*Pass) GetBlockRefCtor ¶
GetBlockRefCtor returns the constructor for the block at the ref id. Return nil to indicate invalid ref ID or unknown.
func (*Pass) GetBlockRefs ¶
GetBlockRefs returns all block references by ID. May return nil, and values may also be nil. Note: this does not include pending references (in a cursor)
func (*Pass) GetExecStates ¶
func (*Pass) GetPassNonce ¶
func (*Pass) GetPassState ¶
func (*Pass) GetReplicas ¶
func (*Pass) GetSubBlockCtor ¶
func (e *Pass) GetSubBlockCtor(id uint32) block.SubBlockCtor
GetSubBlockCtor returns a function which creates or returns the existing sub-block at reference id. Can return nil to indicate invalid reference id.
func (*Pass) GetSubBlocks ¶
GetSubBlocks returns all constructed sub-blocks by ID. May return nil, and values may also be nil.
func (*Pass) GetTargetRef ¶
func (*Pass) GetTimestamp ¶
func (x *Pass) GetTimestamp() *timestamppb.Timestamp
func (*Pass) GetValueSet ¶
func (*Pass) IsComplete ¶
IsComplete checks if the pass is in the COMPLETE state.
func (*Pass) MarshalBlock ¶
MarshalBlock marshals the block to binary. This is the initial step of marshaling, before transformations.
func (*Pass) MarshalJSON ¶
MarshalJSON marshals the Pass to JSON.
func (*Pass) MarshalProtoJSON ¶
func (x *Pass) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the Pass message to JSON.
func (*Pass) MarshalProtoText ¶
func (*Pass) MarshalToSizedBufferVT ¶
func (*Pass) ParsePeerID ¶
ParsePeerID parses the peer ID field. Returns empty if not set.
func (*Pass) ProtoMessage ¶
func (*Pass) ProtoMessage()
func (*Pass) UnmarshalBlock ¶
UnmarshalBlock unmarshals the block to the object. This is the final step of decoding, after transformations.
func (*Pass) UnmarshalJSON ¶
UnmarshalJSON unmarshals the Pass from JSON.
func (*Pass) UnmarshalProtoJSON ¶
func (x *Pass) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the Pass message from JSON.
func (*Pass) UnmarshalVT ¶
type State ¶
type State int32
State contains the possible Pass states.
const ( // PassState_UNKNOWN is the unknown type. State_PassState_UNKNOWN State = 0 // PassState_PENDING is the state when the pass is not yet running. // Transitions to RUNNING when the pass is promoted to running. State_PassState_PENDING State = 1 // PassState_RUNNING is the state when the executions are running. // ExecStates can be added, removed, updated during this state as needed. State_PassState_RUNNING State = 2 // PassState_CHECKING is the state when all exec states are completed. // If multiple executions were scheduled, they are checked for agreement. // Transition to COMPLETE state on failure (disagreement) or success (validation). State_PassState_CHECKING State = 3 // PassState_COMPLETE is the normal terminal state of the pass. // This includes both success and failure termination states. State_PassState_COMPLETE State = 4 )
func (State) EnsureMatches ¶
EnsureMatches checks if the state matches or returns an error.
func (State) MarshalJSON ¶
MarshalJSON marshals the State to JSON.
func (State) MarshalProtoJSON ¶
func (x State) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the State to JSON.
func (State) MarshalProtoText ¶
func (State) MarshalText ¶
MarshalText marshals the State to text.
func (*State) UnmarshalJSON ¶
UnmarshalJSON unmarshals the State from JSON.
func (*State) UnmarshalProtoJSON ¶
func (x *State) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the State from JSON.
func (*State) UnmarshalText ¶
UnmarshalText unmarshals the State from text.