temporal

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2026 License: MIT Imports: 26 Imported by: 0

README

temporal

A module for the Modulus framework that integrates with the Temporal.io workflow engine.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DecorateStarterForTests

func DecorateStarterForTests(temporalModule *module.Module, timeout time.Duration) *module.Module

func ExecuteWorkflow

func ExecuteWorkflow(
	ctx context.Context,
	starter Starter,
	options client.StartWorkflowOptions,
	workflow interface{},
	args ...interface{},
) (client.WorkflowRun, error)

func Name

func Name(method interface{}) string

Name returns the full name of a workflow or activity function when that function is a method of a struct. This package registers and calls workflows and activities under names produced by this transformation. The native Temporal SDK uses only the function name as the name of the workflow or activity, so if you register several workflows or activities whose functions share the same name, the last one overrides the previous ones.

func NewFuture

func NewFuture[T any](ctx workflow.Context) (Future[T], Settable[T])

func NewManifesto

func NewManifesto() module.Manifesto

func NewModule

func NewModule() *module.Module

func OverrideStarter

func OverrideStarter[T Starter](temporalModule *module.Module) *module.Module

func Provide

func Provide[T Registerer](register interface{}, opts ...RegisterOption) fx.Option

func RegisterActivity

func RegisterActivity(registry worker.Registry, a interface{})

func RegisterWorkflow

func RegisterWorkflow(registry worker.Registry, w interface{})

func ScheduleAnnotation

func ScheduleAnnotation[T Schedule]() interface{}

func SchedulerCommand

func SchedulerCommand(scheduler *Scheduler) *cli.Command

SchedulerCommand returns a CLI command that adds or updates Temporal schedules.

func ShouldContinueAsNew

func ShouldContinueAsNew(ctx workflow.Context) bool

func SideEffect

func SideEffect[T any](ctx workflow.Context, f func(ctx workflow.Context) T) (T, error)

func WaitActivity

func WaitActivity[O any](ctx workflow.Context, activity interface{}, input ...any) (O, error)

func WaitActivityWithoutResult

func WaitActivityWithoutResult(ctx workflow.Context, activity interface{}, input ...any) error

func WorkerCommand

func WorkerCommand(w *Worker) *cli.Command

func WorkflowName

func WorkflowName(w interface{}) string

Types

type Channel

type Channel[T any] struct {
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel[T any](ctx workflow.Context) Channel[T]

func (Channel[T]) Close

func (c Channel[T]) Close()

func (Channel[T]) Receive

func (c Channel[T]) Receive(ctx workflow.Context) (t T, more bool)

func (Channel[T]) ReceiveWithTimeout

func (c Channel[T]) ReceiveWithTimeout(ctx workflow.Context, timeout time.Duration) (t T, ok bool, more bool)

func (Channel[T]) Send

func (c Channel[T]) Send(ctx workflow.Context, value T)

func (Channel[T]) SendAsync

func (c Channel[T]) SendAsync(value T) (ok bool)

type Config

type Config struct {
	Host      string `env:"TEMPORAL_HOST, default=localhost:7233"`
	Namespace string `env:"TEMPORAL_NAMESPACE"`
	ApiKey    string `env:"TEMPORAL_API_KEY"`
}

type Future

type Future[T any] struct {
	Future workflow.Future
}

func ExecuteActivity

func ExecuteActivity[O any](ctx workflow.Context, activity interface{}, args ...any) Future[O]

func (Future[T]) Get

func (f Future[T]) Get(ctx workflow.Context) (T, error)

func (Future[T]) GetOrDefault

func (f Future[T]) GetOrDefault(ctx workflow.Context) T

func (Future[T]) GetWithTimeout

func (f Future[T]) GetWithTimeout(ctx workflow.Context, timeout time.Duration) (T, error)

func (Future[T]) IsReady

func (f Future[T]) IsReady() bool

type RegisterOption

type RegisterOption func() interface{}

type Registerer

type Registerer interface {
	Register(worker.Registry)
}

type Schedule

type Schedule interface {
	Schedule(queue string) client.ScheduleOptions
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(params SchedulerParams) *Scheduler

func (*Scheduler) Invoke

func (w *Scheduler) Invoke(ctx context.Context, queue string) error

type SchedulerParams

type SchedulerParams struct {
	fx.In

	Logger    *slog.Logger
	Temporal  client.Client
	Schedules []Schedule `group:"temporal.schedules"`
}

type Settable

type Settable[T any] struct {
	Settable workflow.Settable
}

func (Settable[T]) Set

func (f Settable[T]) Set(value T, err error)

func (Settable[T]) SetError

func (f Settable[T]) SetError(err error)

func (Settable[T]) SetValue

func (f Settable[T]) SetValue(value T)

type Starter

type Starter interface {
	ExecuteWorkflow(
		ctx context.Context,
		options client.StartWorkflowOptions,
		workflow interface{},
		args ...interface{},
	) (client.WorkflowRun, error)

	SignalWithStartWorkflow(
		ctx context.Context,
		workflowID string,
		signalName string,
		signalArg interface{},
		options client.StartWorkflowOptions,
		workflow interface{},
		workflowArgs ...interface{},
	) (client.WorkflowRun, error)

	SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error
}

See the client.Client interface.

func NewStarter

func NewStarter(client client.Client) Starter

type TestingStarter

type TestingStarter struct {
	// contains filtered or unexported fields
}

func (TestingStarter) ExecuteWorkflow

func (s TestingStarter) ExecuteWorkflow(
	ctx context.Context,
	options client.StartWorkflowOptions,
	workflow interface{},
	args ...interface{},
) (client.WorkflowRun, error)

func (TestingStarter) SignalWithStartWorkflow

func (s TestingStarter) SignalWithStartWorkflow(
	ctx context.Context,
	workflowID string,
	signalName string,
	signalArg interface{},
	options client.StartWorkflowOptions,
	workflow interface{},
	workflowArgs ...interface{},
) (client.WorkflowRun, error)

func (TestingStarter) SignalWorkflow

func (s TestingStarter) SignalWorkflow(
	ctx context.Context,
	workflowID string,
	runID string,
	signalName string,
	arg interface{},
) error

type TestingWorkflowRun

type TestingWorkflowRun struct {
	// contains filtered or unexported fields
}

func (*TestingWorkflowRun) Get

func (r *TestingWorkflowRun) Get(ctx context.Context, valuePtr interface{}) error

func (*TestingWorkflowRun) GetID

func (r *TestingWorkflowRun) GetID() string

func (*TestingWorkflowRun) GetRunID

func (r *TestingWorkflowRun) GetRunID() string

func (*TestingWorkflowRun) GetWithOptions

func (r *TestingWorkflowRun) GetWithOptions(
	ctx context.Context,
	valuePtr interface{},
	options client.WorkflowRunGetOptions,
) error

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

func NewWaitGroup

func NewWaitGroup(ctx workflow.Context) *WaitGroup

func (*WaitGroup) Go

func (w *WaitGroup) Go(ctx workflow.Context, f func(ctx workflow.Context) error)

func (*WaitGroup) Wait

func (w *WaitGroup) Wait(ctx workflow.Context) error

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(params WorkersParams) *Worker

func (*Worker) Invoke

func (w *Worker) Invoke(cliCtx *cli.Context, queue string, enableSessionWorker bool) error

type WorkersParams

type WorkersParams struct {
	fx.In

	Runner      *infraCli.Runner
	Temporal    client.Client
	Registerers []Registerer `group:"temporal.registerers"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL