eventloop

package
v0.0.0-...-63d8188 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2026 License: AGPL-3.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AppendListenerIDs

func AppendListenerIDs(sig Signal, listeners ...SignalListener) error

AppendListenerIDs appends the provided listeners to the provided signal. It uses reflection, and relies on a BaseSignal being embedded in the provided signal.

func SignalHandlerWorkflowID

func SignalHandlerWorkflowID(sig Signal, req EventLoopRequest) string

SignalHandlerWorkflowID returns the standard ID to use for a signal handler child workflow.

func WorkflowIDTemplate

func WorkflowIDTemplate(ctx workflow.Context, req any, tmpl string) string

WorkflowIDTemplate creates a workflow ID by evaluating the provided template string in the context of the given request.

This is expected to be called from code generated by temporal-gen.

Types

type BaseSignal

// BaseSignal holds the fields shared by signals. AppendListenerIDs relies on a
// BaseSignal being embedded in the concrete signal it is given.
type BaseSignal struct {
	// CtxPayload is the context payload carried with the signal.
	// NOTE(review): presumably written by PropagateContext and read by
	// GetContext / GetWorkflowContext — confirm against the implementation.
	CtxPayload *propagator.Payload `json:"ctx_payload"`
	// SignalListeners holds the listeners attached to this signal.
	SignalListeners []SignalListener `json:"signal_listeners"`
	// CGroup is serialized under the key "cgroup". The tag must be a
	// well-formed key:"value" pair; an unterminated quote makes
	// reflect.StructTag ignore it, so the Go field name is used as the key.
	// NOTE(review): presumably the value returned by ConcurrencyGroup — confirm.
	CGroup string `json:"cgroup"`
}

func (BaseSignal) ConcurrencyGroup

func (b BaseSignal) ConcurrencyGroup() string

func (*BaseSignal) GetContext

func (b *BaseSignal) GetContext(ctx context.Context) context.Context

func (BaseSignal) GetOrg

func (BaseSignal) GetOrg(ctx context.Context, id string, db *gorm.DB) (*app.Org, error)

func (*BaseSignal) GetWorkflowContext

func (b *BaseSignal) GetWorkflowContext(ctx workflow.Context) workflow.Context

func (*BaseSignal) Listeners

func (b *BaseSignal) Listeners() []SignalListener

func (*BaseSignal) PropagateContext

func (b *BaseSignal) PropagateContext(ctx cctx.ValueContext) error

func (BaseSignal) WorkflowID

func (BaseSignal) WorkflowID(id string) string

func (BaseSignal) WorkflowName

func (BaseSignal) WorkflowName() string

type Client

// Client is the interface returned by New.
type Client interface {
	// Send takes an id and a signal. It returns nothing, so callers receive
	// no error from this call.
	Send(ctx context.Context, id string, signal Signal)
	// Cancel takes a namespace and an id and reports failure via the returned error.
	// NOTE(review): presumably cancels the workflow identified by id — confirm.
	Cancel(ctx context.Context, namespace, id string) error
	// GetWorkflowStatus returns the execution status for workflowID in namespace.
	GetWorkflowStatus(ctx context.Context, namespace string, workflowID string) (enumsv1.WorkflowExecutionStatus, error)
	// GetWorkflowCount returns a count for workflowID in namespace.
	// NOTE(review): what exactly is counted is not visible here — confirm.
	GetWorkflowCount(ctx context.Context, namespace string, workflowID string) (int64, error)
}

func New

func New(params Params) Client

type EventLoopRequest

// EventLoopRequest is the request type accepted, together with a Signal, by
// SignalHandlerWorkflowID.
type EventLoopRequest struct {
	// NOTE(review): ID presumably identifies the event loop, and SandboxMode
	// presumably toggles a sandbox/no-op mode — confirm against callers.
	ID          string
	SandboxMode bool

	// state managed between different signals
	Version            string
	RestartCount       int
	VersionChangeCount int
}

type IDTemplateVars

// IDTemplateVars groups a caller workflow ID with a workflow request.
// NOTE(review): presumably the data exposed to templates evaluated by
// WorkflowIDTemplate — confirm.
type IDTemplateVars struct {
	// CallerID is the workflow ID of the workflow that is calling this workflow as a child. Set to "root" if there is no parent.
	CallerID string

	// Req is the request argument passed in to the workflow. This is the same as the argument passed to the invoked workflow function.
	Req any
}

type Params

// Params is the argument accepted by New.
type Params struct {
	// NOTE(review): embedding fx.In presumably marks this as an fx parameter
	// struct whose fields are supplied by dependency injection — confirm.
	fx.In

	// The DB field carries the tag name:"psql".
	V      *validator.Validate
	L      *zap.Logger
	Client temporalclient.Client
	MW     metrics.Writer
	Cfg    *internal.Config
	DB     *gorm.DB `name:"psql"`
}

TODO: make event loop status consts

type Signal

// Signal is the interface accepted by Client.Send, AppendListenerIDs and
// SignalHandlerWorkflowID.
type Signal interface {
	// WorkflowID returns the string ID of the event loop workflow to which this signal is being sent.
	WorkflowID(id string) string
	// WorkflowName returns a workflow name as a string.
	// NOTE(review): presumably the event loop workflow's name — confirm.
	WorkflowName() string
	// Namespace returns a namespace as a string.
	// NOTE(review): presumably the Temporal namespace of the target workflow — confirm.
	Namespace() string
	// Name returns the signal name, which is the same as the Temporal workflow type.
	Name() string
	// SignalType returns the SignalType of this signal.
	SignalType() SignalType

	// for managing intra-workflow communication
	// Listeners returns the SignalListener values attached to this signal.
	Listeners() []SignalListener

	// ConcurrencyGroup returns the name of the concurrency group this signal belongs to.
	// Signals in different concurrency groups execute independently. Signals in the same group
	// execute sequentially in the order they are received by the event loop.
	//
	// The empty string identifies the default group.
	ConcurrencyGroup() string

	// for managing context
	GetOrg(ctx context.Context, id string, db *gorm.DB) (*app.Org, error)
	PropagateContext(ctx cctx.ValueContext) error
	GetWorkflowContext(ctx workflow.Context) workflow.Context
	GetContext(ctx context.Context) context.Context

	// lifecycle methods
	// Validate takes the validator and returns an error.
	Validate(*validator.Validate) error
	// NOTE(review): Stop, Restart and Start presumably report whether this
	// signal stops, restarts or starts the event loop — confirm.
	Stop() bool
	Restart() bool
	Start() bool
}

type SignalDoneMessage

// SignalDoneMessage is a special one-off signal type that is sent by the event
// loop to listeners that have registered for notifications about a particular
// signal.
//
// The type declares its own MarshalJSON and UnmarshalJSON methods.
// NOTE(review): presumably so the Error interface value survives JSON
// round-trips — confirm against the implementation.
type SignalDoneMessage struct {
	Result any   `json:"result"`
	Error  error `json:"error"`
}

SignalDoneMessage is a special one-off signal type that is sent by the event loop to listeners that have registered for notifications about a particular signal.

func (SignalDoneMessage) MarshalJSON

func (s SignalDoneMessage) MarshalJSON() ([]byte, error)

func (*SignalDoneMessage) UnmarshalJSON

func (s *SignalDoneMessage) UnmarshalJSON(b []byte) error

type SignalListener

// A SignalListener is a serializable representation of a workflow that wants
// to be notified about the completion of a particular signal.
//
// Field values represent the workflow to be notified, NOT the signal of interest.
type SignalListener struct {
	// WorkflowID is the id of the workflow that is waiting for a signal.
	WorkflowID string `json:"workflow_id"`
	// Namespace is the namespace of the workflow that is waiting for a signal.
	Namespace string `json:"namespace"`
	// SignalName is the name for the signal that the listening workflow is expecting. This value
	// should be dynamic and ephemeral.
	SignalName string `json:"signal_name"`
}

A SignalListener is a serializable representation of a workflow that wants to be notified about the completion of a particular signal.

Field values represent the workflow to be notified, NOT the signal of interest.

type SignalType

// SignalType is the string-based type returned by Signal.SignalType.
type SignalType string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL