Documentation
¶
Index ¶
- Constants
- Variables
- func CheckTaskHasPass(ctx context.Context, w world.WorldState, taskKey, passKey string) (bool, error)
- func CheckTaskType(ctx context.Context, ws world.WorldState, objKey string) error
- func CollectTaskPasses(ctx context.Context, ws world.WorldState, taskKeys ...string) ([]*forge_pass.Pass, []*forge_target.Target, []string, error)
- func CollectTaskTargets(ctx context.Context, ws world.WorldState, taskKeys ...string) ([]*forge_target.Target, []string, error)
- func CreateTaskWithTarget(ctx context.Context, ws world.WorldState, sender peer.ID, objKey string, ...) (world.ObjectState, *bucket.ObjectRef, error)
- func EnsureTaskHasPass(ctx context.Context, w world.WorldState, taskKey, passKey string) error
- func FindPassWithNonce(passNonce uint64, passes []*forge_pass.Pass) (*forge_pass.Pass, int)
- func LinkTaskCached(ctx context.Context, ws world.WorldState, taskKey, cachedTaskKey string) error
- func LinkTaskSubtask(ctx context.Context, ws world.WorldState, parentTaskKey, childTaskKey string) error
- func ListTaskPasses(ctx context.Context, w world.WorldState, taskKeys ...string) ([]string, error)
- func ListTaskSubtasks(ctx context.Context, w world.WorldState, taskKeys ...string) ([]string, error)
- func ListTaskTargets(ctx context.Context, w world.WorldState, taskKeys ...string) ([]string, error)
- func LookupTaskCached(ctx context.Context, ws world.WorldState, taskKey string) (string, error)
- func LookupTaskPass(ctx context.Context, ws world.WorldState, taskKey string, nonce uint64) (*forge_pass.Pass, *forge_target.Target, string, error)
- func LookupTaskTarget(ctx context.Context, ws world.WorldState, taskKey string) (*forge_target.Target, string, error)
- func NewPassKey(taskObjKey string, passNonce uint64) string
- func NewTargetKey(taskObjKey string) string
- func NewTaskBlock() block.Block
- func NewTaskToCachedQuad(taskKey, cachedTaskKey string) world.GraphQuad
- func NewTaskToPassQuad(taskObjKey, passObjKey string, passNonce uint64) world.GraphQuad
- func NewTaskToSubtaskQuad(parentTaskKey, childTaskKey string) world.GraphQuad
- func NewTaskToTargetQuad(taskObjKey, targetObjKey string) world.GraphQuad
- func ValidateName(name string) error
- type State
- func (s State) EnsureMatches(sts ...State) error
- func (x State) Enum() *State
- func (x State) MarshalJSON() ([]byte, error)
- func (x State) MarshalProtoJSON(s *json.MarshalState)
- func (x State) MarshalProtoText() string
- func (x State) MarshalText() ([]byte, error)
- func (x State) String() string
- func (x *State) UnmarshalJSON(b []byte) error
- func (x *State) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (x *State) UnmarshalText(b []byte) error
- func (s State) Validate(allowUnknown bool) error
- type Task
- func (e *Task) ApplyBlockRef(id uint32, ptr *block.BlockRef) error
- func (e *Task) ApplySubBlock(id uint32, next block.SubBlock) error
- func (m *Task) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *Task) CloneVT() *Task
- func (this *Task) EqualMessageVT(thatMsg any) bool
- func (this *Task) EqualVT(that *Task) bool
- func (e *Task) FollowTargetRef(ctx context.Context, bcs *block.Cursor) (*forge_target.Target, *block.Cursor, error)
- func (e *Task) GetBlockRefCtor(id uint32) block.Ctor
- func (e *Task) GetBlockRefs() (map[uint32]*block.BlockRef, error)
- func (x *Task) GetName() string
- func (x *Task) GetPassNonce() uint64
- func (x *Task) GetPeerId() string
- func (x *Task) GetReplicas() uint32
- func (x *Task) GetResult() *value.Result
- func (e *Task) GetSubBlockCtor(id uint32) block.SubBlockCtor
- func (e *Task) GetSubBlocks() map[uint32]block.SubBlock
- func (x *Task) GetTargetRef() *block.BlockRef
- func (x *Task) GetTaskState() State
- func (x *Task) GetTimestamp() *timestamppb.Timestamp
- func (x *Task) GetValueSet() *target.ValueSet
- func (e *Task) IsComplete() bool
- func (e *Task) MarshalBlock() ([]byte, error)
- func (x *Task) MarshalJSON() ([]byte, error)
- func (x *Task) MarshalProtoJSON(s *json.MarshalState)
- func (x *Task) MarshalProtoText() string
- func (m *Task) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *Task) MarshalToVT(dAtA []byte) (int, error)
- func (m *Task) MarshalVT() (dAtA []byte, err error)
- func (*Task) ProtoMessage()
- func (x *Task) Reset()
- func (e *Task) SetTarget(bcs *block.Cursor, tgt *forge_target.Target)
- func (m *Task) SizeVT() (n int)
- func (x *Task) String() string
- func (e *Task) UnmarshalBlock(data []byte) error
- func (x *Task) UnmarshalJSON(b []byte) error
- func (x *Task) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *Task) UnmarshalVT(dAtA []byte) error
- func (e *Task) Validate() error
Constants ¶
const ( // TaskTypeID is the type identifier for a Task. TaskTypeID = "forge/task" // PredTaskToTarget is the predicate linking Task to a Target. PredTaskToTarget = quad.IRI("forge/task-target") // PredTaskToPass is the predicate linking Task to a Pass. PredTaskToPass = quad.IRI("forge/task-pass") // PredTaskToSubtask is a graph predicate linking a parent Task to child Tasks. PredTaskToSubtask = quad.IRI("forge/task-subtask") // PredTaskToCached is a graph predicate linking a Task to a previous Task // whose result is inherited/cached. PredTaskToCached = quad.IRI("forge/task-cached") )
Variables ¶
var ( State_name = map[int32]string{ 0: "TaskState_UNKNOWN", 1: "TaskState_PENDING", 2: "TaskState_RUNNING", 3: "TaskState_CHECKING", 4: "TaskState_COMPLETE", 5: "TaskState_RETRY", } State_value = map[string]int32{ "TaskState_UNKNOWN": 0, "TaskState_PENDING": 1, "TaskState_RUNNING": 2, "TaskState_CHECKING": 3, "TaskState_COMPLETE": 4, "TaskState_RETRY": 5, } )
Enum value maps for State.
var ErrUnknownState = errors.New("unexpected or unhandled state")
ErrUnknownState is returned if the state was unknown/unhandled.
Functions ¶
func CheckTaskHasPass ¶
func CheckTaskHasPass(ctx context.Context, w world.WorldState, taskKey, passKey string) (bool, error)
CheckTaskHasPass checks if the Task is linked to a Pass.
func CheckTaskType ¶
CheckTaskType checks the type graph quad for a Task.
func CollectTaskPasses ¶
func CollectTaskPasses( ctx context.Context, ws world.WorldState, taskKeys ...string, ) ([]*forge_pass.Pass, []*forge_target.Target, []string, error)
CollectTaskPasses collects all active Pass linked to by the Task. If any of the linked states are invalid, returns an error.
func CollectTaskTargets ¶
func CollectTaskTargets( ctx context.Context, ws world.WorldState, taskKeys ...string, ) ([]*forge_target.Target, []string, error)
CollectTaskTargets collects all active Target linked to by the Tasks. If any of the linked states are invalid, returns an error.
func CreateTaskWithTarget ¶
func CreateTaskWithTarget( ctx context.Context, ws world.WorldState, sender peer.ID, objKey string, name string, tgt *forge_target.Target, peerID peer.ID, replicas uint32, ts *timestamp.Timestamp, ) (world.ObjectState, *bucket.ObjectRef, error)
CreateTaskWithTarget creates a pending Task and Target object in the world.
func EnsureTaskHasPass ¶
EnsureTaskHasPass checks if the Task has the Pass and returns an error otherwise.
func FindPassWithNonce ¶
func FindPassWithNonce(passNonce uint64, passes []*forge_pass.Pass) (*forge_pass.Pass, int)
FindPassWithNonce searches for the Pass with the given nonce in a set. returns nil, -1 if not found
func LinkTaskCached ¶
LinkTaskCached creates a graph link from a Task to a previous Task whose result is inherited.
func LinkTaskSubtask ¶
func LinkTaskSubtask(ctx context.Context, ws world.WorldState, parentTaskKey, childTaskKey string) error
LinkTaskSubtask creates the graph links between parent and child Tasks. Sets both forge/task-subtask (parent->child) and hydra/world/parent (child->parent).
func ListTaskPasses ¶
ListTaskPasses lists all Pass object keys that are linked to by the Task.
func ListTaskSubtasks ¶
func ListTaskSubtasks(ctx context.Context, w world.WorldState, taskKeys ...string) ([]string, error)
ListTaskSubtasks lists all subtask object keys for a parent Task.
func ListTaskTargets ¶
ListTaskTargets lists all Target object keys that are linked to by the Tasks. note: we only expect 1 target to be linked to each at any given time.
func LookupTaskCached ¶
LookupTaskCached looks up the cached task linked to a given task. Returns "", nil if no cached task is linked.
func LookupTaskPass ¶
func LookupTaskPass( ctx context.Context, ws world.WorldState, taskKey string, nonce uint64, ) (*forge_pass.Pass, *forge_target.Target, string, error)
LookupTaskPass looks up the task pass with the given nonce. Queries via the <value> field, which must be set correctly. If not found, returns nil, "", nil If nonce = 0, looks up any pass associated with the task.
func LookupTaskTarget ¶
func LookupTaskTarget( ctx context.Context, ws world.WorldState, taskKey string, ) (*forge_target.Target, string, error)
LookupTaskTarget looks up a single Target for a given Task. Returns nil, nil if no Target is resolved. Returns an error if more than one Target is resolved.
func NewPassKey ¶
NewPassKey builds a object key for a task pass.
func NewTargetKey ¶
NewTargetKey builds a object key for a task target.
func NewTaskToCachedQuad ¶
NewTaskToCachedQuad creates a quad linking a Task to a previous Task whose result is inherited/cached.
func NewTaskToPassQuad ¶
NewTaskToPassQuad creates a quad linking a Task to a Pass.
func NewTaskToSubtaskQuad ¶
NewTaskToSubtaskQuad creates a quad linking a parent Task to a child Task.
func NewTaskToTargetQuad ¶
NewTaskToTargetQuad creates a quad linking a Task to a Target.
Types ¶
type State ¶
type State int32
State contains the possible Task states.
const ( // TaskState_UNKNOWN is the unknown type. State_TaskState_UNKNOWN State = 0 // TaskState_PENDING is the state when waiting for target & inputs to be resolved. // If the <task/target> link or other inputs are not set, remains in PENDING state. // Transitions to RUNNING state when inputs are resolved and Pass is created. // Transitions to PENDING when the target or input values are updated. State_TaskState_PENDING State = 1 // TaskState_RUNNING is the state when a Pass is assigned and currently executing. // If the linked Target does not match the linked pass, cancel the pass. // If the inputs do not match the linked Pass inputs, cancel the pass. // If the pass becomes canceled, return to PENDING state. State_TaskState_RUNNING State = 2 // TaskState_CHECKING is the state when the Pass has completed. // The Task controller will then check the Pass results. // Transition to COMPLETE state on failure (invalid) or success (validation). State_TaskState_CHECKING State = 3 // TaskState_COMPLETE is the terminal state of the task. // This includes both success and failure termination states. State_TaskState_COMPLETE State = 4 // TaskState_RETRY wait for a change in Inputs before retrying with a new Pass. // When the assigned Pass is deleted or Inputs are different from latest resolved, // remove the <task/pass> graph link and transition to PENDING. State_TaskState_RETRY State = 5 )
func (State) EnsureMatches ¶
EnsureMatches checks if the state matches or returns an error.
func (State) MarshalJSON ¶
MarshalJSON marshals the State to JSON.
func (State) MarshalProtoJSON ¶
func (x State) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the State to JSON.
func (State) MarshalProtoText ¶
func (State) MarshalText ¶
MarshalText marshals the State to text.
func (*State) UnmarshalJSON ¶
UnmarshalJSON unmarshals the State from JSON.
func (*State) UnmarshalProtoJSON ¶
func (x *State) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the State from JSON.
func (*State) UnmarshalText ¶
UnmarshalText unmarshals the State from text.
type Task ¶
type Task struct {
// TaskState is the current state of the task.
TaskState State `protobuf:"varint,1,opt,name=task_state,json=taskState,proto3" json:"taskState,omitempty"`
// Name is the human readable Task name.
// Example: "my-task-1"
// Must be a valid DNS label as defined in RFC 1123.
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
// PeerId is the Task controller peer ID.
// Usually the peer ID of the Cluster controller managing this Task.
// Can be empty.
PeerId string `protobuf:"bytes,3,opt,name=peer_id,json=peerId,proto3" json:"peerId,omitempty"`
// Replicas is the configured number of replicas for the created Pass.
// Cannot be zero.
// Task transitions to PENDING if different from latest Pass.
Replicas uint32 `protobuf:"varint,5,opt,name=replicas,proto3" json:"replicas,omitempty"`
// PassNonce is the most recent pass index.
// Incremented when a new pass is added.
// Can be initially zero when no Pass exists.
// Task transitions to PENDING if different from latest Pass.
PassNonce uint64 `protobuf:"varint,6,opt,name=pass_nonce,json=passNonce,proto3" json:"passNonce,omitempty"`
// TargetRef is the block reference to the Target for the Task.
// Can be initially empty.
// Task transitions to PENDING when changed.
TargetRef *block.BlockRef `protobuf:"bytes,7,opt,name=target_ref,json=targetRef,proto3" json:"targetRef,omitempty"`
// ValueSet is the set of inputs and outputs for the Task.
// The output set is updated when transitioning from CHECKING -> COMPLETE.
// Can be initially empty.
// Task transitions to PENDING when inputs are changed.
ValueSet *target.ValueSet `protobuf:"bytes,8,opt,name=value_set,json=valueSet,proto3" json:"valueSet,omitempty"`
// Result is information about the outcome of a completed Pass.
Result *value.Result `protobuf:"bytes,9,opt,name=result,proto3" json:"result,omitempty"`
// Timestamp is the time the Task was created.
// Used as a reference timestamp to make all ops deterministic.
// For example: all unixfs timestamps will be set to this value.
// Must be set.
Timestamp *timestamppb.Timestamp `protobuf:"bytes,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// contains filtered or unexported fields
}
Task contains state for running a Target.
World graph links:
- parent: usually the Job that created the Target.
- forge/task-pass: all Pass for the Task
- forge/task-target: current active Target of the Task, max 1
Incoming graph links:
- parent: from the Pass.
func CollectTaskSubtasks ¶
func CollectTaskSubtasks( ctx context.Context, ws world.WorldState, taskKeys ...string, ) ([]*Task, []string, error)
CollectTaskSubtasks collects all subtask objects for a parent Task. If any of the linked tasks are invalid, returns an error.
func LookupTask ¶
func LookupTask(ctx context.Context, ws world.WorldState, objKey string) (*Task, world.ObjectState, error)
LookupTask looks up a task in the world.
func UnmarshalTask ¶
UnmarshalTask unmarshals a task block from the cursor.
func (*Task) ApplyBlockRef ¶
ApplyBlockRef applies a ref change with a field id. The reference may be nil if the child block is nil.
func (*Task) ApplySubBlock ¶
ApplySubBlock applies a sub-block change with a field id.
func (*Task) CloneMessageVT ¶
func (m *Task) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*Task) EqualMessageVT ¶
func (*Task) FollowTargetRef ¶
func (e *Task) FollowTargetRef(ctx context.Context, bcs *block.Cursor) (*forge_target.Target, *block.Cursor, error)
FollowTargetRef follows the reference to the Task target. bcs should point to the task.
func (*Task) GetBlockRefCtor ¶
GetBlockRefCtor returns the constructor for the block at the ref id. Return nil to indicate invalid ref ID or unknown.
func (*Task) GetBlockRefs ¶
GetBlockRefs returns all block references by ID. May return nil, and values may also be nil. Note: this does not include pending references (in a cursor)
func (*Task) GetPassNonce ¶
func (*Task) GetReplicas ¶
func (*Task) GetSubBlockCtor ¶
func (e *Task) GetSubBlockCtor(id uint32) block.SubBlockCtor
GetSubBlockCtor returns a function which creates or returns the existing sub-block at reference id. Can return nil to indicate invalid reference id.
func (*Task) GetSubBlocks ¶
GetSubBlocks returns all constructed sub-blocks by ID. May return nil, and values may also be nil.
func (*Task) GetTargetRef ¶
func (*Task) GetTaskState ¶
func (*Task) GetTimestamp ¶
func (x *Task) GetTimestamp() *timestamppb.Timestamp
func (*Task) GetValueSet ¶
func (*Task) IsComplete ¶
IsComplete checks if the execution is in the COMPLETE state.
func (*Task) MarshalBlock ¶
MarshalBlock marshals the block to binary. This is the initial step of marshaling, before transformations.
func (*Task) MarshalJSON ¶
MarshalJSON marshals the Task to JSON.
func (*Task) MarshalProtoJSON ¶
func (x *Task) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the Task message to JSON.
func (*Task) MarshalProtoText ¶
func (*Task) MarshalToSizedBufferVT ¶
func (*Task) ProtoMessage ¶
func (*Task) ProtoMessage()
func (*Task) SetTarget ¶
func (e *Task) SetTarget(bcs *block.Cursor, tgt *forge_target.Target)
SetTarget updates the target with a new block. bcs should point to the task
func (*Task) UnmarshalBlock ¶
UnmarshalBlock unmarshals the block to the object. This is the final step of decoding, after transformations.
func (*Task) UnmarshalJSON ¶
UnmarshalJSON unmarshals the Task from JSON.
func (*Task) UnmarshalProtoJSON ¶
func (x *Task) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the Task message from JSON.