forge_task

package
v0.51.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TaskTypeID is the type identifier for a Task.
	TaskTypeID = "forge/task"

	// PredTaskToTarget is the graph predicate linking a Task to a Target.
	PredTaskToTarget = quad.IRI("forge/task-target")
	// PredTaskToPass is the graph predicate linking a Task to a Pass.
	PredTaskToPass = quad.IRI("forge/task-pass")
	// PredTaskToSubtask is the graph predicate linking a parent Task to its child Tasks.
	PredTaskToSubtask = quad.IRI("forge/task-subtask")
	// PredTaskToCached is the graph predicate linking a Task to a previous Task
	// whose result is inherited/cached.
	PredTaskToCached = quad.IRI("forge/task-cached")
)

Variables

View Source
// Enum value maps for State.
var (
	// State_name maps each State enum number to its string name.
	State_name = map[int32]string{
		0: "TaskState_UNKNOWN",
		1: "TaskState_PENDING",
		2: "TaskState_RUNNING",
		3: "TaskState_CHECKING",
		4: "TaskState_COMPLETE",
		5: "TaskState_RETRY",
	}
	// State_value maps each State string name back to its enum number.
	State_value = map[string]int32{
		"TaskState_UNKNOWN":  0,
		"TaskState_PENDING":  1,
		"TaskState_RUNNING":  2,
		"TaskState_CHECKING": 3,
		"TaskState_COMPLETE": 4,
		"TaskState_RETRY":    5,
	}
)

Enum value maps for State.

View Source
// ErrUnknownState is returned if the state was unknown/unhandled.
var ErrUnknownState = errors.New("unexpected or unhandled state")

ErrUnknownState is returned if the state was unknown/unhandled.

Functions

func CheckTaskHasPass

func CheckTaskHasPass(ctx context.Context, w world.WorldState, taskKey, passKey string) (bool, error)

CheckTaskHasPass checks if the Task is linked to a Pass.

func CheckTaskType

func CheckTaskType(ctx context.Context, ws world.WorldState, objKey string) error

CheckTaskType checks the type graph quad for a Task.

func CollectTaskPasses

func CollectTaskPasses(
	ctx context.Context,
	ws world.WorldState,
	taskKeys ...string,
) ([]*forge_pass.Pass, []*forge_target.Target, []string, error)

CollectTaskPasses collects all active Passes linked to by the Tasks. If any of the linked states are invalid, returns an error.

func CollectTaskTargets

func CollectTaskTargets(
	ctx context.Context,
	ws world.WorldState,
	taskKeys ...string,
) ([]*forge_target.Target, []string, error)

CollectTaskTargets collects all active Targets linked to by the Tasks. If any of the linked states are invalid, returns an error.

func CreateTaskWithTarget

func CreateTaskWithTarget(
	ctx context.Context,
	ws world.WorldState,
	sender peer.ID,
	objKey string,
	name string,
	tgt *forge_target.Target,
	peerID peer.ID,
	replicas uint32,
	ts *timestamp.Timestamp,
) (world.ObjectState, *bucket.ObjectRef, error)

CreateTaskWithTarget creates a pending Task and Target object in the world.

func EnsureTaskHasPass

func EnsureTaskHasPass(ctx context.Context, w world.WorldState, taskKey, passKey string) error

EnsureTaskHasPass checks if the Task has the Pass and returns an error otherwise.

func FindPassWithNonce

func FindPassWithNonce(passNonce uint64, passes []*forge_pass.Pass) (*forge_pass.Pass, int)

FindPassWithNonce searches for the Pass with the given nonce in a set. Returns nil, -1 if not found.

func LinkTaskCached

func LinkTaskCached(ctx context.Context, ws world.WorldState, taskKey, cachedTaskKey string) error

LinkTaskCached creates a graph link from a Task to a previous Task whose result is inherited.

func LinkTaskSubtask

func LinkTaskSubtask(ctx context.Context, ws world.WorldState, parentTaskKey, childTaskKey string) error

LinkTaskSubtask creates the graph links between parent and child Tasks. Sets both forge/task-subtask (parent->child) and hydra/world/parent (child->parent).

func ListTaskPasses

func ListTaskPasses(ctx context.Context, w world.WorldState, taskKeys ...string) ([]string, error)

ListTaskPasses lists all Pass object keys that are linked to by the Tasks.

func ListTaskSubtasks

func ListTaskSubtasks(ctx context.Context, w world.WorldState, taskKeys ...string) ([]string, error)

ListTaskSubtasks lists all subtask object keys for a parent Task.

func ListTaskTargets

func ListTaskTargets(ctx context.Context, w world.WorldState, taskKeys ...string) ([]string, error)

ListTaskTargets lists all Target object keys that are linked to by the Tasks. Note: only one Target is expected to be linked to each Task at any given time.

func LookupTaskCached

func LookupTaskCached(ctx context.Context, ws world.WorldState, taskKey string) (string, error)

LookupTaskCached looks up the cached task linked to a given task. Returns "", nil if no cached task is linked.

func LookupTaskPass

func LookupTaskPass(
	ctx context.Context,
	ws world.WorldState,
	taskKey string,
	nonce uint64,
) (*forge_pass.Pass, *forge_target.Target, string, error)

LookupTaskPass looks up the task pass with the given nonce. Queries via the <value> field, which must be set correctly. If not found, returns nil, nil, "", nil. If nonce = 0, looks up any pass associated with the task.

func LookupTaskTarget

func LookupTaskTarget(
	ctx context.Context,
	ws world.WorldState,
	taskKey string,
) (*forge_target.Target, string, error)

LookupTaskTarget looks up a single Target for a given Task. Returns nil, "", nil if no Target is resolved. Returns an error if more than one Target is resolved.

func NewPassKey

func NewPassKey(taskObjKey string, passNonce uint64) string

NewPassKey builds an object key for a task pass.

func NewTargetKey

func NewTargetKey(taskObjKey string) string

NewTargetKey builds an object key for a task target.

func NewTaskBlock

func NewTaskBlock() block.Block

NewTaskBlock constructs a new Task block.

func NewTaskToCachedQuad

func NewTaskToCachedQuad(taskKey, cachedTaskKey string) world.GraphQuad

NewTaskToCachedQuad creates a quad linking a Task to a previous Task whose result is inherited/cached.

func NewTaskToPassQuad

func NewTaskToPassQuad(taskObjKey, passObjKey string, passNonce uint64) world.GraphQuad

NewTaskToPassQuad creates a quad linking a Task to a Pass.

func NewTaskToSubtaskQuad

func NewTaskToSubtaskQuad(parentTaskKey, childTaskKey string) world.GraphQuad

NewTaskToSubtaskQuad creates a quad linking a parent Task to a child Task.

func NewTaskToTargetQuad

func NewTaskToTargetQuad(taskObjKey, targetObjKey string) world.GraphQuad

NewTaskToTargetQuad creates a quad linking a Task to a Target.

func ValidateName

func ValidateName(name string) error

ValidateName validates the name of a task.

Types

type State

type State int32

State contains the possible Task states.

// Possible values of State, the Task state enum.
const (
	// State_TaskState_UNKNOWN is the unknown state.
	State_TaskState_UNKNOWN State = 0
	// State_TaskState_PENDING is the state when waiting for target & inputs to be resolved.
	// If the <task/target> link or other inputs are not set, remains in PENDING state.
	// Transitions to RUNNING state when inputs are resolved and Pass is created.
	// Transitions back to PENDING when the target or input values are updated.
	State_TaskState_PENDING State = 1
	// State_TaskState_RUNNING is the state when a Pass is assigned and currently executing.
	// If the linked Target does not match the linked Pass, cancel the Pass.
	// If the inputs do not match the linked Pass inputs, cancel the Pass.
	// If the Pass becomes canceled, return to PENDING state.
	State_TaskState_RUNNING State = 2
	// State_TaskState_CHECKING is the state when the Pass has completed.
	// The Task controller will then check the Pass results.
	// Transitions to COMPLETE state on failure (invalid) or success (validation).
	State_TaskState_CHECKING State = 3
	// State_TaskState_COMPLETE is the terminal state of the task.
	// This includes both success and failure termination states.
	State_TaskState_COMPLETE State = 4
	// State_TaskState_RETRY waits for a change in Inputs before retrying with a new Pass.
	// When the assigned Pass is deleted or Inputs are different from latest resolved,
	// remove the <task/pass> graph link and transition to PENDING.
	State_TaskState_RETRY State = 5
)

func (State) EnsureMatches

func (s State) EnsureMatches(sts ...State) error

EnsureMatches checks if the state matches or returns an error.

func (State) Enum

func (x State) Enum() *State

func (State) MarshalJSON

func (x State) MarshalJSON() ([]byte, error)

MarshalJSON marshals the State to JSON.

func (State) MarshalProtoJSON

func (x State) MarshalProtoJSON(s *json.MarshalState)

MarshalProtoJSON marshals the State to JSON.

func (State) MarshalProtoText

func (x State) MarshalProtoText() string

func (State) MarshalText

func (x State) MarshalText() ([]byte, error)

MarshalText marshals the State to text.

func (State) String

func (x State) String() string

func (*State) UnmarshalJSON

func (x *State) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals the State from JSON.

func (*State) UnmarshalProtoJSON

func (x *State) UnmarshalProtoJSON(s *json.UnmarshalState)

UnmarshalProtoJSON unmarshals the State from JSON.

func (*State) UnmarshalText

func (x *State) UnmarshalText(b []byte) error

UnmarshalText unmarshals the State from text.

func (State) Validate

func (s State) Validate(allowUnknown bool) error

Validate checks that the state is within known values.

type Task

// Task contains state for running a Target.
//
// NOTE(review): the protobuf field numbers below skip 4 — presumably a
// removed or reserved field; confirm against the .proto definition.
type Task struct {

	// TaskState is the current state of the task.
	TaskState State `protobuf:"varint,1,opt,name=task_state,json=taskState,proto3" json:"taskState,omitempty"`
	// Name is the human readable Task name.
	// Example: "my-task-1"
	// Must be a valid DNS label as defined in RFC 1123.
	Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	// PeerId is the Task controller peer ID.
	// Usually the peer ID of the Cluster controller managing this Task.
	// Can be empty.
	PeerId string `protobuf:"bytes,3,opt,name=peer_id,json=peerId,proto3" json:"peerId,omitempty"`
	// Replicas is the configured number of replicas for the created Pass.
	// Cannot be zero.
	// Task transitions to PENDING if different from latest Pass.
	Replicas uint32 `protobuf:"varint,5,opt,name=replicas,proto3" json:"replicas,omitempty"`
	// PassNonce is the most recent pass index.
	// Incremented when a new pass is added.
	// Can be initially zero when no Pass exists.
	// Task transitions to PENDING if different from latest Pass.
	PassNonce uint64 `protobuf:"varint,6,opt,name=pass_nonce,json=passNonce,proto3" json:"passNonce,omitempty"`
	// TargetRef is the block reference to the Target for the Task.
	// Can be initially empty.
	// Task transitions to PENDING when changed.
	TargetRef *block.BlockRef `protobuf:"bytes,7,opt,name=target_ref,json=targetRef,proto3" json:"targetRef,omitempty"`
	// ValueSet is the set of inputs and outputs for the Task.
	// The output set is updated when transitioning from CHECKING -> COMPLETE.
	// Can be initially empty.
	// Task transitions to PENDING when inputs are changed.
	ValueSet *target.ValueSet `protobuf:"bytes,8,opt,name=value_set,json=valueSet,proto3" json:"valueSet,omitempty"`
	// Result is information about the outcome of a completed Pass.
	Result *value.Result `protobuf:"bytes,9,opt,name=result,proto3" json:"result,omitempty"`
	// Timestamp is the time the Task was created.
	// Used as a reference timestamp to make all ops deterministic.
	// For example: all unixfs timestamps will be set to this value.
	// Must be set.
	Timestamp *timestamppb.Timestamp `protobuf:"bytes,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	// contains filtered or unexported fields
}

Task contains state for running a Target.

World graph links:

  • parent: usually the Job that created the Target.
  • forge/task-pass: all Pass for the Task
  • forge/task-target: current active Target of the Task, max 1

Incoming graph links:

  • parent: from the Pass.

func CollectTaskSubtasks

func CollectTaskSubtasks(
	ctx context.Context,
	ws world.WorldState,
	taskKeys ...string,
) ([]*Task, []string, error)

CollectTaskSubtasks collects all subtask objects for a parent Task. If any of the linked tasks are invalid, returns an error.

func LookupTask

func LookupTask(ctx context.Context, ws world.WorldState, objKey string) (*Task, world.ObjectState, error)

LookupTask looks up a task in the world.

func UnmarshalTask

func UnmarshalTask(ctx context.Context, bcs *block.Cursor) (*Task, error)

UnmarshalTask unmarshals a task block from the cursor.

func (*Task) ApplyBlockRef

func (e *Task) ApplyBlockRef(id uint32, ptr *block.BlockRef) error

ApplyBlockRef applies a ref change with a field id. The reference may be nil if the child block is nil.

func (*Task) ApplySubBlock

func (e *Task) ApplySubBlock(id uint32, next block.SubBlock) error

ApplySubBlock applies a sub-block change with a field id.

func (*Task) CloneMessageVT

func (m *Task) CloneMessageVT() protobuf_go_lite.CloneMessage

func (*Task) CloneVT

func (m *Task) CloneVT() *Task

func (*Task) EqualMessageVT

func (this *Task) EqualMessageVT(thatMsg any) bool

func (*Task) EqualVT

func (this *Task) EqualVT(that *Task) bool

func (*Task) FollowTargetRef

func (e *Task) FollowTargetRef(ctx context.Context, bcs *block.Cursor) (*forge_target.Target, *block.Cursor, error)

FollowTargetRef follows the reference to the Task target. bcs should point to the task.

func (*Task) GetBlockRefCtor

func (e *Task) GetBlockRefCtor(id uint32) block.Ctor

GetBlockRefCtor returns the constructor for the block at the ref id. Return nil to indicate invalid ref ID or unknown.

func (*Task) GetBlockRefs

func (e *Task) GetBlockRefs() (map[uint32]*block.BlockRef, error)

GetBlockRefs returns all block references by ID. May return nil, and values may also be nil. Note: this does not include pending references (in a cursor)

func (*Task) GetName

func (x *Task) GetName() string

func (*Task) GetPassNonce

func (x *Task) GetPassNonce() uint64

func (*Task) GetPeerId

func (x *Task) GetPeerId() string

func (*Task) GetReplicas

func (x *Task) GetReplicas() uint32

func (*Task) GetResult

func (x *Task) GetResult() *value.Result

func (*Task) GetSubBlockCtor

func (e *Task) GetSubBlockCtor(id uint32) block.SubBlockCtor

GetSubBlockCtor returns a function which creates or returns the existing sub-block at reference id. Can return nil to indicate invalid reference id.

func (*Task) GetSubBlocks

func (e *Task) GetSubBlocks() map[uint32]block.SubBlock

GetSubBlocks returns all constructed sub-blocks by ID. May return nil, and values may also be nil.

func (*Task) GetTargetRef

func (x *Task) GetTargetRef() *block.BlockRef

func (*Task) GetTaskState

func (x *Task) GetTaskState() State

func (*Task) GetTimestamp

func (x *Task) GetTimestamp() *timestamppb.Timestamp

func (*Task) GetValueSet

func (x *Task) GetValueSet() *target.ValueSet

func (*Task) IsComplete

func (e *Task) IsComplete() bool

IsComplete checks if the Task is in the COMPLETE state.

func (*Task) MarshalBlock

func (e *Task) MarshalBlock() ([]byte, error)

MarshalBlock marshals the block to binary. This is the initial step of marshaling, before transformations.

func (*Task) MarshalJSON

func (x *Task) MarshalJSON() ([]byte, error)

MarshalJSON marshals the Task to JSON.

func (*Task) MarshalProtoJSON

func (x *Task) MarshalProtoJSON(s *json.MarshalState)

MarshalProtoJSON marshals the Task message to JSON.

func (*Task) MarshalProtoText

func (x *Task) MarshalProtoText() string

func (*Task) MarshalToSizedBufferVT

func (m *Task) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*Task) MarshalToVT

func (m *Task) MarshalToVT(dAtA []byte) (int, error)

func (*Task) MarshalVT

func (m *Task) MarshalVT() (dAtA []byte, err error)

func (*Task) ProtoMessage

func (*Task) ProtoMessage()

func (*Task) Reset

func (x *Task) Reset()

func (*Task) SetTarget

func (e *Task) SetTarget(bcs *block.Cursor, tgt *forge_target.Target)

SetTarget updates the target with a new block. bcs should point to the task.

func (*Task) SizeVT

func (m *Task) SizeVT() (n int)

func (*Task) String

func (x *Task) String() string

func (*Task) UnmarshalBlock

func (e *Task) UnmarshalBlock(data []byte) error

UnmarshalBlock unmarshals the block to the object. This is the final step of decoding, after transformations.

func (*Task) UnmarshalJSON

func (x *Task) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals the Task from JSON.

func (*Task) UnmarshalProtoJSON

func (x *Task) UnmarshalProtoJSON(s *json.UnmarshalState)

UnmarshalProtoJSON unmarshals the Task message from JSON.

func (*Task) UnmarshalVT

func (m *Task) UnmarshalVT(dAtA []byte) error

func (*Task) Validate

func (e *Task) Validate() error

Validate performs cursory checks of the Task object.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL