async

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2026 License: GPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CloudTasksEnqueuer

// CloudTasksEnqueuer is an opaque type; its fields are not shown in this
// listing. Values are obtained from NewCloudTasksEnqueuer, which takes a
// *cloudtasks.Client and a CloudTasksEnqueuerConfig. Its EnqueueRun method has
// the same signature as the method declared by the Enqueuer interface.
type CloudTasksEnqueuer struct {
	// contains filtered or unexported fields
}

func NewCloudTasksEnqueuer

func NewCloudTasksEnqueuer(client *cloudtasks.Client, cfg CloudTasksEnqueuerConfig) *CloudTasksEnqueuer

func (*CloudTasksEnqueuer) EnqueueRun

func (e *CloudTasksEnqueuer) EnqueueRun(ctx context.Context, ref ExecutionRef, delay time.Duration) error

type CloudTasksEnqueuerConfig

// CloudTasksEnqueuerConfig is the configuration value accepted by
// NewCloudTasksEnqueuer. Every field is a plain string.
//
// NOTE(review): the field meanings below are inferred from the field names
// only; the code that consumes them is not visible here — confirm against the
// implementation.
type CloudTasksEnqueuerConfig struct {
	// ProjectID, Location and Queue presumably identify the target Cloud Tasks
	// queue — TODO confirm.
	ProjectID string
	Location  string
	Queue     string

	// RunURL is presumably the URL that an enqueued task requests — TODO confirm.
	RunURL string

	// InvokerServiceAccountEmail and OIDCAudience presumably configure the OIDC
	// token attached to each task — TODO confirm.
	InvokerServiceAccountEmail string
	OIDCAudience               string
}

type Engine

// Engine is an opaque type; its fields are not shown in this listing. It is
// constructed by NewEngine from a Store, an Enqueuer, a *Registry and a lease
// TTL, and it exposes the Kick and RunOnce methods.
type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(store Store, enq Enqueuer, reg *Registry, leaseTTL time.Duration) *Engine

func (*Engine) Kick

func (e *Engine) Kick(ctx context.Context, kind, key string, wakeNow bool, delay time.Duration) error

func (*Engine) RunOnce

func (e *Engine) RunOnce(ctx context.Context, ref ExecutionRef, owner string) (int, error)

type Enqueuer

// Enqueuer is the single-method interface that NewEngine accepts as its enq
// argument. *CloudTasksEnqueuer declares a method with this exact signature.
type Enqueuer interface {
	// EnqueueRun takes an execution reference and a delay, and reports failure
	// through the returned error.
	// NOTE(review): presumably schedules a run of ref after delay — confirm
	// against an implementation.
	EnqueueRun(ctx context.Context, ref ExecutionRef, delay time.Duration) error
}

type Execution

// Execution is the record describing a single execution. It is the value
// passed to Handler.Run and returned by Store.Get and Store.AcquireLease.
//
// NOTE(review): the group comments below are inferred from field names and
// types only; confirm against the Store implementation.
type Execution struct {
	// Identity, status and lease bookkeeping. Ref carries the Kind/Key pair and
	// Status holds an ExecutionStatus value; LeaseOwner and LeaseUntil
	// presumably mirror the owner and ttl arguments of Store.AcquireLease —
	// TODO confirm.
	Ref        ExecutionRef
	Status     ExecutionStatus
	Attempt    int
	LeaseOwner string
	LeaseUntil time.Time

	// Scheduling and progress state. WakeAt is a pointer, so it can be nil;
	// what nil means is not visible here.
	WakeAt     *time.Time
	Dirty      bool
	Checkpoint string

	// Diagnostics. WaitReason and LastError presumably hold the reason and
	// errMsg passed to Store.MarkWaiting and Store.MarkFailed — TODO confirm.
	WaitReason string
	LastError  string
	UpdatedAt  time.Time
}

type ExecutionRef

// ExecutionRef names an execution by a Kind/Key pair of strings. It is the
// reference type taken by every Store method, by Enqueuer.EnqueueRun and by
// Engine.RunOnce. The struct tags map the fields to the JSON keys "kind" and
// "key". An ID method returning a string is also declared on this type.
type ExecutionRef struct {
	// Kind is a string; Registry methods also look handlers up by a kind string.
	// NOTE(review): presumably the same value — confirm.
	Kind string `json:"kind"`
	// Key is the second half of the pair; its format is not visible here.
	Key  string `json:"key"`
}

func (ExecutionRef) ID

func (r ExecutionRef) ID() string

type ExecutionStatus

// ExecutionStatus is a string-backed type used for Execution.Status.
type ExecutionStatus string
// The five declared ExecutionStatus values. Each constant's string value is
// the lowercase form of its name suffix.
// NOTE(review): the allowed transitions between these states are not visible
// in this listing.
const (
	ExecutionStatusQueued  ExecutionStatus = "queued"
	ExecutionStatusRunning ExecutionStatus = "running"
	ExecutionStatusWaiting ExecutionStatus = "waiting"
	ExecutionStatusDone    ExecutionStatus = "done"
	ExecutionStatusFailed  ExecutionStatus = "failed"
)

type Handler

// Handler is the single-method interface stored in and returned by a Registry
// (see Registry.Register, Registry.Get and Registry.MustGet), keyed by a kind
// string.
type Handler interface {
	// Run receives an Execution by value and returns an Outcome together with
	// an error.
	// NOTE(review): how a non-nil error relates to Outcome.Err is not visible
	// here — confirm against Engine.RunOnce.
	Run(ctx context.Context, exec Execution) (Outcome, error)
}

type Outcome

// Outcome is the result value returned by Handler.Run. The package-level
// functions Done, Fail, Retry and Wait each return an Outcome.
//
// Type holds an OutcomeType value. The remaining fields share names with the
// constructor parameters: Wait takes delay, reason and checkpoint; Done takes
// checkpoint; Fail and Retry take err.
// NOTE(review): presumably each constructor fills exactly the matching
// fields — confirm against their implementations.
type Outcome struct {
	Type       OutcomeType
	Delay      time.Duration
	Reason     string
	Checkpoint string
	Err        error
}

func Done

func Done(checkpoint string) Outcome

func Fail

func Fail(err error) Outcome

func Retry

func Retry(err error) Outcome

func Wait

func Wait(delay time.Duration, reason string, checkpoint string) Outcome

type OutcomeType

// OutcomeType is a string-backed type used for Outcome.Type.
type OutcomeType string
// The four declared OutcomeType values. Their names line up with the
// Outcome-returning functions Done, Wait, Retry and Fail.
// NOTE(review): presumably each function sets the matching value — confirm.
const (
	OutcomeDone  OutcomeType = "done"
	OutcomeWait  OutcomeType = "wait"
	OutcomeRetry OutcomeType = "retry"
	OutcomeFail  OutcomeType = "fail"
)

type Registry

// Registry is an opaque type; its fields are not shown in this listing. It is
// created by NewRegistry and associates kind strings with Handler values:
// Register takes a kind and a Handler, Get returns a Handler plus a bool, and
// MustGet returns only a Handler.
// NOTE(review): MustGet's behavior for an unknown kind is not visible here.
type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry() *Registry

func (*Registry) Get

func (r *Registry) Get(kind string) (Handler, bool)

func (*Registry) MustGet

func (r *Registry) MustGet(kind string) Handler

func (*Registry) Register

func (r *Registry) Register(kind string, h Handler)

type Store

// Store is the interface that NewEngine accepts as its store argument. Every
// method takes a context and an ExecutionRef.
//
// NOTE(review): no implementation is visible in this listing; the per-method
// notes describe the signatures, and anything beyond that is marked as an
// assumption to confirm.
type Store interface {
	// UpsertQueued takes a wakeNow flag and returns a bool and an error.
	// NOTE(review): the meaning of the bool result is not visible here.
	UpsertQueued(ctx context.Context, ref ExecutionRef, wakeNow bool) (bool, error)
	// AcquireLease takes an owner string and a ttl, and returns a bool, an
	// Execution and an error.
	// NOTE(review): presumably the bool reports whether the lease was
	// obtained — confirm.
	AcquireLease(ctx context.Context, ref ExecutionRef, owner string, ttl time.Duration) (bool, Execution, error)
	// Get returns the Execution for ref, or an error.
	Get(ctx context.Context, ref ExecutionRef) (Execution, error)
	// MarkWaiting takes a wake-up time, a reason and a checkpoint string.
	// NOTE(review): presumably recorded in Execution.WakeAt, WaitReason and
	// Checkpoint — confirm.
	MarkWaiting(ctx context.Context, ref ExecutionRef, wakeAt time.Time, reason string, checkpoint string) error
	// MarkDone takes a checkpoint string.
	MarkDone(ctx context.Context, ref ExecutionRef, checkpoint string) error
	// MarkFailed takes an error message string.
	// NOTE(review): presumably recorded in Execution.LastError — confirm.
	MarkFailed(ctx context.Context, ref ExecutionRef, errMsg string) error
	// FinalizeAfterRun returns a bool and an error.
	// NOTE(review): the meaning of the bool result is not visible here.
	FinalizeAfterRun(ctx context.Context, ref ExecutionRef) (bool, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL