Documentation
¶
Overview ¶
Package temporalnexus provides utilities for exposing Temporal constructs as Nexus Operations.
Nexus RPC is a modern open-source service framework for arbitrary-length operations whose lifetime may extend beyond a traditional RPC. Nexus was designed with durable execution in mind, as an underpinning to connect durable executions within and across namespaces, clusters and regions – with a clean API contract to streamline multi-team collaboration. Any service can be exposed as a set of sync or async Nexus operations – the latter provides an operation identity and a uniform interface to get the status of an operation or its result, receive a completion callback, or cancel the operation.
Temporal leverages the Nexus RPC protocol to facilitate calling across namespace and cluster and boundaries.
See also:
Nexus over HTTP Spec: https://github.com/nexus-rpc/api/blob/main/SPEC.md
Nexus Go SDK: https://github.com/nexus-rpc/sdk-go
Index ¶
- func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
- func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
- func GetClient(ctx context.Context) client.Client
- func GetLogger(ctx context.Context) log.Logger
- func GetMetricsHandler(ctx context.Context) metrics.Handler
- func MustNewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) nexus.Operation[I, O]
- func NewSyncOperation[I any, O any](name string, ...) nexus.Operation[I, O]deprecated
- func NewWorkflowRunOperation[I, O any](name string, workflow func(workflow.Context, I) (O, error), ...) nexus.Operation[I, O]
- func NewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) (nexus.Operation[I, O], error)
- type WorkflowHandle
- type WorkflowRunOperationOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConvertLinkWorkflowEventToNexusLink ¶
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.
NOTE: Experimental
func ConvertNexusLinkToLinkWorkflowEvent ¶
func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.
NOTE: Experimental
func GetClient ¶
GetClient returns a client to be used in a Nexus operation's context, this is the same client that the worker was created with. Client methods will panic when called from the test environment.
func GetMetricsHandler ¶
GetMetricsHandler returns a metrics handler to be used in a Nexus operation's context.
func MustNewWorkflowRunOperationWithOptions ¶
func MustNewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) nexus.Operation[I, O]
MustNewWorkflowRunOperation map an operation to a workflow run with the given options. Panics if invalid options are provided.
func NewSyncOperation
deprecated
func NewSyncOperation[I any, O any]( name string, handler func(context.Context, client.Client, I, nexus.StartOperationOptions) (O, error), ) nexus.Operation[I, O]
NewSyncOperation is a helper for creating a synchronous-only nexus.Operation from a given name and handler function. The handler is passed the client that the worker was created with. Sync operations are useful for exposing short-lived Temporal client requests, such as signals, queries, sync update, list workflows, etc...
Deprecated: Use nexus.NewSyncOperation and get the client via temporalnexus.GetClient
Example ¶
package main
import (
"context"
"github.com/formulatehq/temporal-sdk-go/client"
"github.com/formulatehq/temporal-sdk-go/temporalnexus"
"github.com/formulatehq/temporal-sdk-go/worker"
"github.com/nexus-rpc/sdk-go/nexus"
)
type MyInput struct {
ID string
}
type MyQueryOutput struct {
}
func main() {
opRead := nexus.NewSyncOperation("my-read-only-operation", func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (MyQueryOutput, error) {
var ret MyQueryOutput
res, err := temporalnexus.GetClient(ctx).QueryWorkflow(ctx, input.ID, "", "some-query", nil)
if err != nil {
return ret, err
}
return ret, res.Get(&ret)
})
// Operations don't have to return values.
opWrite := nexus.NewSyncOperation("my-write-operation", func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (nexus.NoValue, error) {
return nil, temporalnexus.GetClient(ctx).SignalWorkflow(ctx, input.ID, "", "some-signal", nil)
})
service := nexus.NewService("my-service")
_ = service.Register(opRead, opWrite)
c, _ := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "my-namespace",
})
w := worker.New(c, "my-task-queue", worker.Options{})
w.RegisterNexusService(service)
}
func NewWorkflowRunOperation ¶
func NewWorkflowRunOperation[I, O any]( name string, workflow func(workflow.Context, I) (O, error), getOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error), ) nexus.Operation[I, O]
NewWorkflowRunOperation maps an operation to a workflow run.
Example ¶
package main
import (
"context"
"github.com/formulatehq/temporal-sdk-go/client"
"github.com/formulatehq/temporal-sdk-go/temporalnexus"
"github.com/formulatehq/temporal-sdk-go/worker"
"github.com/formulatehq/temporal-sdk-go/workflow"
"github.com/nexus-rpc/sdk-go/nexus"
)
type MyOutput struct {
}
type MyInput struct {
ID string
}
func MyHandlerWorkflow(workflow.Context, MyInput) (MyOutput, error) {
return MyOutput{}, nil
}
func main() {
op := temporalnexus.NewWorkflowRunOperation(
"my-async-operation",
MyHandlerWorkflow,
func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
return client.StartWorkflowOptions{
// Workflow ID is required and must be deterministically generated from the input in order
// for the operation to be idempotent as the request to start the operation may be retried.
ID: input.ID,
}, nil
})
service := nexus.NewService("my-service")
_ = service.Register(op)
c, _ := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "my-namespace",
})
w := worker.New(c, "my-task-queue", worker.Options{})
w.RegisterWorkflow(MyHandlerWorkflow)
w.RegisterNexusService(service)
}
func NewWorkflowRunOperationWithOptions ¶
func NewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) (nexus.Operation[I, O], error)
NewWorkflowRunOperation map an operation to a workflow run with the given options. Returns an error if invalid options are provided.
Example ¶
package main
import (
"context"
"github.com/formulatehq/temporal-sdk-go/client"
"github.com/formulatehq/temporal-sdk-go/temporalnexus"
"github.com/formulatehq/temporal-sdk-go/worker"
"github.com/formulatehq/temporal-sdk-go/workflow"
"github.com/nexus-rpc/sdk-go/nexus"
)
type MyWorkflowInput struct {
}
type MyOutput struct {
}
type MyInput struct {
ID string
}
func MyHandlerWorkflow(workflow.Context, MyInput) (MyOutput, error) {
return MyOutput{}, nil
}
func MyHandlerWorkflowWithAlternativeInput(workflow.Context, MyWorkflowInput) (MyOutput, error) {
return MyOutput{}, nil
}
func main() {
// Alternative 1 - long form version of NewWorkflowRunOperation.
opAlt1, _ := temporalnexus.NewWorkflowRunOperationWithOptions(
temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
Name: "my-async-op-1",
Workflow: MyHandlerWorkflow,
GetOptions: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
return client.StartWorkflowOptions{
// Workflow ID is required and must be deterministically generated from the input in order
// for the operation to be idempotent as the request to start the operation may be retried.
ID: input.ID,
}, nil
},
})
// Alternative 2 - start a workflow with alternative inputs.
opAlt2, _ := temporalnexus.NewWorkflowRunOperationWithOptions(
temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
Name: "my-async-op-2",
Handler: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (temporalnexus.WorkflowHandle[MyOutput], error) {
// Workflows started with this API must take a single input and return single output.
// To start workflows with different signatures, use ExecuteUntypedWorkflow.
return temporalnexus.ExecuteWorkflow(ctx, opts, client.StartWorkflowOptions{
// Workflow ID is required and must be deterministically generated from the input in order
// for the operation to be idempotent as the request to start the operation may be retried.
ID: input.ID,
}, MyHandlerWorkflowWithAlternativeInput, MyWorkflowInput{})
},
})
service := nexus.NewService("my-service")
_ = service.Register(opAlt1, opAlt2)
c, _ := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "my-namespace",
})
w := worker.New(c, "my-task-queue", worker.Options{})
w.RegisterWorkflow(MyHandlerWorkflow)
w.RegisterWorkflow(MyHandlerWorkflowWithAlternativeInput)
w.RegisterNexusService(service)
}
Types ¶
type WorkflowHandle ¶
type WorkflowHandle[T any] interface { // ID is the workflow's ID. ID() string // ID is the workflow's run ID. RunID() string // contains filtered or unexported methods }
WorkflowHandle is a readonly representation of a workflow run backing a Nexus operation. It's created via the ExecuteWorkflow and ExecuteUntypedWorkflow methods.
func ExecuteUntypedWorkflow ¶
func ExecuteUntypedWorkflow[R any]( ctx context.Context, nexusOptions nexus.StartOperationOptions, startWorkflowOptions client.StartWorkflowOptions, workflow any, args ...any, ) (WorkflowHandle[R], error)
ExecuteUntypedWorkflow starts a workflow with by function reference or string name, linking the execution chain to a Nexus operation. Useful for invoking workflows that don't follow the single argument - single return type signature. See ExecuteWorkflow for more information.
func ExecuteWorkflow ¶
func ExecuteWorkflow[I, O any, WF func(workflow.Context, I) (O, error)]( ctx context.Context, nexusOptions nexus.StartOperationOptions, startWorkflowOptions client.StartWorkflowOptions, workflow WF, arg I, ) (WorkflowHandle[O], error)
ExecuteWorkflow starts a workflow run for a WorkflowRunOperationOptions Handler, linking the execution chain to a Nexus operation (subsequent runs started from continue-as-new and retries). Automatically propagates the callback and request ID from the nexus options to the workflow.
type WorkflowRunOperationOptions ¶
type WorkflowRunOperationOptions[I, O any] struct { // Operation name. Name string // Workflow function to map this operation to. The operation input maps directly to workflow input. // The workflow name is resolved as it would when using this function in client.ExecuteOperation. // GetOptions must be provided when setting this option. Mutually exclusive with Handler. Workflow func(workflow.Context, I) (O, error) // Options for starting the workflow. Must be set if Workflow is set. Mutually exclusive with Handler. // The options returned must include a workflow ID that is deterministically generated from the input in order // for the operation to be idempotent as the request to start the operation may be retried. // TaskQueue is optional and defaults to the current worker's task queue. // WorkflowExecutionErrorWhenAlreadyStarted is ignored and always set to true. // WorkflowIDConflictPolicy is by default set to fail if a workflow is already running. That is, // if a caller executes another operation that starts the same workflow, it will fail. You can set // it to WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING to attach the caller's callback to the existing // running workflow. This way, all attached callers will be notified when the workflow completes. GetOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error) // Handler for starting a workflow with a different input than the operation. Mutually exclusive with Workflow // and GetOptions. Handler func(context.Context, I, nexus.StartOperationOptions) (WorkflowHandle[O], error) }
WorkflowRunOperationOptions are options for NewWorkflowRunOperationWithOptions.