nexus

package
v1.32.0
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2026 License: MIT Imports: 43 Imported by: 0

Documentation


Constants

const (
	// Currently supported token version.
	TokenVersion = 1
	// Header key for the callback token in StartOperation requests.
	CallbackTokenHeader = "Temporal-Callback-Token"
)
const (
	// FailureSourceHeaderName is the header used to indicate from where the Nexus failure originated.
	FailureSourceHeaderName = "Temporal-Nexus-Failure-Source"
	// FailureSourceWorker indicates the failure originated from outside the server (e.g. bad request or on the Nexus worker).
	FailureSourceWorker = "worker"
)
const SystemCallbackURL = "temporal://system"

SystemCallbackURL is the reserved callback URL used to route Nexus operation callbacks internally within Temporal. It must match the scheme/host used in validation and routing logic.
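As a rough illustration of the scheme/host match mentioned above, the sketch below parses a candidate URL with net/url and compares it against the reserved value. The helper name isSystemCallbackURL is hypothetical and not part of this package; the actual validation and routing logic may differ.

```go
package main

import (
	"fmt"
	"net/url"
)

// systemCallbackURL mirrors the SystemCallbackURL constant documented above.
const systemCallbackURL = "temporal://system"

// isSystemCallbackURL is a hypothetical helper (not part of the package). It
// reports whether raw has the same scheme and host as the reserved URL.
func isSystemCallbackURL(raw string) bool {
	want, err := url.Parse(systemCallbackURL)
	if err != nil {
		return false
	}
	got, err := url.Parse(raw)
	if err != nil {
		return false
	}
	return got.Scheme == want.Scheme && got.Host == want.Host
}

func main() {
	fmt.Println(isSystemCallbackURL("temporal://system"))
	fmt.Println(isSystemCallbackURL("https://example.com/callback"))
}
```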

const SystemEndpoint = "__temporal_system"

SystemEndpoint is the reserved endpoint name for Temporal system operations. Operation requests for this endpoint are routed internally within the history service.

Variables

var ErrNexusDisabled = serviceerror.NewFailedPrecondition("nexus is disabled")
var FailureSourceContextKey = failureSourceContextKeyType{}
var HTTPTraceConfig = dynamicconfig.NewGlobalTypedSettingWithConverter(
	"system.nexusHTTPTraceConfig",
	convertHTTPClientTraceConfig,
	defaultHTTPClientTraceConfig,
	`Configuration options for controlling additional tracing for Nexus HTTP requests. Fields: Enabled, ForwardingEnabled, MinAttempt, MaxAttempt, Hooks. See HTTPClientTraceConfig comments for more detail.`,
)

HTTPTraceConfig is the dynamic config for controlling Nexus HTTP request tracing behavior.

var PathCompletionCallbackNoIdentifier = "/nexus/callback"

PathCompletionCallbackNoIdentifier is an HTTP route for completing a Nexus operation via callback. Unlike RouteCompletionCallback, it does not require any identifying information in the URL path. Instead, the handler for this path is expected to look up identifying information via the callback token in the headers.

var PayloadSerializer nexus.Serializer = payloadSerializer{}
var RouteCompletionCallback = routing.NewBuilder[string]().
	Constant("namespaces").
	StringVariable("namespace", func(namespace *string) *string { return namespace }).
	Constant("nexus", "callback").
	Build()

RouteCompletionCallback is an HTTP route for completing a Nexus operation via callback.

var RouteDispatchNexusTaskByEndpoint = routing.NewBuilder[string]().
	Constant("nexus", "endpoints").
	StringVariable("endpoint", func(endpoint *string) *string { return endpoint }).
	Constant("services").
	Build()
var RouteDispatchNexusTaskByNamespaceAndTaskQueue = routing.NewBuilder[NamespaceAndTaskQueue]().
	Constant("namespaces").
	StringVariable("namespace", func(params *NamespaceAndTaskQueue) *string { return &params.Namespace }).
	Constant("task-queues").
	StringVariable("task_queue", func(params *NamespaceAndTaskQueue) *string { return &params.TaskQueue }).
	Constant("nexus-services").
	Build()
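To make the shape of these routes easier to picture, the sketch below rebuilds the paths by hand from the constant and variable segments listed in the builders above. It is an approximation under stated assumptions: segments are joined with "/" in declaration order, variables are path-escaped, and a leading slash is added as in PathCompletionCallbackNoIdentifier. The helper names are hypothetical, and the real routing package may append further segments or escape differently.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// joinPath joins already-escaped segments with "/" and adds a leading slash.
func joinPath(segments ...string) string {
	return "/" + strings.Join(segments, "/")
}

// completionCallbackPath approximates RouteCompletionCallback.
func completionCallbackPath(namespace string) string {
	return joinPath("namespaces", url.PathEscape(namespace), "nexus", "callback")
}

// dispatchByNamespaceAndTaskQueuePath approximates
// RouteDispatchNexusTaskByNamespaceAndTaskQueue.
func dispatchByNamespaceAndTaskQueuePath(namespace, taskQueue string) string {
	return joinPath(
		"namespaces", url.PathEscape(namespace),
		"task-queues", url.PathEscape(taskQueue),
		"nexus-services",
	)
}

// dispatchByEndpointPath approximates RouteDispatchNexusTaskByEndpoint.
func dispatchByEndpointPath(endpoint string) string {
	return joinPath("nexus", "endpoints", url.PathEscape(endpoint), "services")
}

func main() {
	fmt.Println(completionCallbackPath("default"))
	fmt.Println(dispatchByNamespaceAndTaskQueuePath("default", "my queue"))
	fmt.Println(dispatchByEndpointPath("billing"))
}
```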

Functions

func AdaptAuthorizeError

func AdaptAuthorizeError(permissionDeniedError *serviceerror.PermissionDenied) error

func ConvertGRPCError

func ConvertGRPCError(err error, exposeDetails bool) error

ConvertGRPCError converts either a serviceerror or a gRPC status error into a Nexus HandlerError if possible. If exposeDetails is true, the converted HandlerError carries the error message from the given error; otherwise, it carries a default message with minimal information. The mapping is roughly taken from https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto and https://github.com/grpc-ecosystem/grpc-gateway/blob/a7cf811e6ffabeaddcfb4ff65602c12671ff326e/runtime/errors.go#L56.

func ConvertLinkWorkflowEventToNexusLink

func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link

ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.

NOTE: Experimental

func ConvertLinksFromProto

func ConvertLinksFromProto(links []*nexuspb.Link) []nexus.Link

ConvertLinksFromProto converts protobuf links to Nexus SDK links.

func ConvertLinksToProto

func ConvertLinksToProto(links []nexus.Link) []*nexuspb.Link

ConvertLinksToProto converts Nexus SDK links to protobuf links.

func ConvertNexusLinkToLinkWorkflowEvent

func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)

ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.

NOTE: Experimental

func FormatDuration

func FormatDuration(d time.Duration) string

FormatDuration converts a duration into a string representation in millisecond resolution. TODO: replace this with the version exported from the Nexus SDK
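A minimal sketch of millisecond-resolution formatting is shown below. The function name formatDurationMillis is hypothetical, and the "<n>ms" output shape is an assumption; the source only states that the resolution is milliseconds.

```go
package main

import (
	"fmt"
	"time"
)

// formatDurationMillis is a hypothetical stand-in for FormatDuration. It
// truncates d to whole milliseconds and appends an "ms" suffix (assumed format).
func formatDurationMillis(d time.Duration) string {
	return fmt.Sprintf("%dms", d.Milliseconds())
}

func main() {
	fmt.Println(formatDurationMillis(1500 * time.Millisecond))
	// Sub-millisecond precision is dropped by Duration.Milliseconds.
	fmt.Println(formatDurationMillis(2*time.Second + 300*time.Microsecond))
}
```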

func NexusFailureToProtoFailure

func NexusFailureToProtoFailure(failure nexus.Failure) *nexuspb.Failure

NexusFailureToProtoFailure converts a Nexus SDK Failure to a proto Nexus Failure. Always returns a non-nil value.

func NexusFailureToTemporalFailure

func NexusFailureToTemporalFailure(f nexus.Failure) (*failurepb.Failure, error)

NexusFailureToTemporalFailure converts a Nexus Failure to an API proto Failure. If the failure metadata "type" field is set to the full name of the Temporal API Failure message, the failure is reconstructed using protojson.Unmarshal on the failure details field. Otherwise, the failure is reconstructed based on the known Nexus SDK failure types. Returns an error if the failure cannot be converted.

func ProtoFailureToNexusFailure

func ProtoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure

ProtoFailureToNexusFailure converts a proto Nexus Failure to a Nexus SDK Failure.

func SetFailureSourceOnContext

func SetFailureSourceOnContext(ctx context.Context, response *http.Response)

func TemporalFailureToNexusFailure

func TemporalFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error)

TemporalFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure, setting the metadata "type" field to the proto full name of the Temporal API Failure message or to one of the standard Nexus SDK failure types. Returns an error if the failure cannot be converted. The function temporarily mutates the failure, unsetting the Message and StackTrace fields to avoid duplicating that information in the serialized failure. Mutation was chosen over cloning for performance reasons, since this function may be called frequently.
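The mutate-then-restore approach described here can be sketched as follows. The failure and envelope types are simplified, hypothetical stand-ins for the proto Failure and the Nexus SDK Failure, and JSON stands in for the real serialization; only the pattern is the point.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// failure is a simplified, hypothetical stand-in for the API proto Failure.
type failure struct {
	Message    string `json:"message,omitempty"`
	StackTrace string `json:"stackTrace,omitempty"`
	Source     string `json:"source,omitempty"`
}

// envelope is a simplified, hypothetical stand-in for a Nexus SDK Failure.
type envelope struct {
	Message    string
	StackTrace string
	Details    json.RawMessage
}

// toEnvelope clears Message and StackTrace while serializing, so they are not
// duplicated inside Details, then restores them before returning.
func toEnvelope(f *failure) (envelope, error) {
	msg, trace := f.Message, f.StackTrace
	f.Message, f.StackTrace = "", ""
	// Restore on every return path so the caller's value ends up unchanged.
	defer func() { f.Message, f.StackTrace = msg, trace }()

	details, err := json.Marshal(f)
	if err != nil {
		return envelope{}, err
	}
	return envelope{Message: msg, StackTrace: trace, Details: details}, nil
}

func main() {
	f := &failure{Message: "boom", StackTrace: "main.go:1", Source: "worker"}
	env, err := toEnvelope(f)
	if err != nil {
		panic(err)
	}
	fmt.Println(env.Message, string(env.Details))
	fmt.Println(f.Message, f.StackTrace)
}
```

One consequence of this pattern is that the input must not be read or written by another goroutine while the conversion is in progress, which is the trade-off for avoiding a clone.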

Types

type CallbackToken

type CallbackToken struct {
	// Token version - currently only [TokenVersion] is supported.
	Version int `json:"v"`
	// Encoded [tokenspb.NexusOperationCompletion].
	Data string `json:"d"`
}

CallbackToken contains an encoded NexusOperationCompletion message.

func DecodeCallbackToken

func DecodeCallbackToken(encoded string) (token *CallbackToken, err error)

DecodeCallbackToken unmarshals the given token applying minimal data verification.
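The sketch below shows one plausible shape for this decode step. It mirrors the CallbackToken struct and TokenVersion constant locally, and it assumes the token is JSON (suggested by the struct tags) and that "minimal data verification" means a version check plus a non-empty data check. Both assumptions, and the name decodeToken, are illustrative rather than taken from the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// tokenVersion mirrors the TokenVersion constant documented above.
const tokenVersion = 1

// callbackToken mirrors the CallbackToken struct documented above.
type callbackToken struct {
	Version int    `json:"v"`
	Data    string `json:"d"`
}

// decodeToken is a hypothetical stand-in for DecodeCallbackToken. The specific
// checks performed here are assumptions.
func decodeToken(encoded string) (*callbackToken, error) {
	var token callbackToken
	if err := json.Unmarshal([]byte(encoded), &token); err != nil {
		return nil, fmt.Errorf("malformed token: %w", err)
	}
	if token.Version != tokenVersion {
		return nil, fmt.Errorf("unsupported token version: %d", token.Version)
	}
	if token.Data == "" {
		return nil, errors.New("token has no data")
	}
	return &token, nil
}

func main() {
	token, err := decodeToken(`{"v":1,"d":"ZXhhbXBsZQ"}`)
	fmt.Println(token, err)

	_, err = decodeToken(`{"v":2,"d":"ZXhhbXBsZQ"}`)
	fmt.Println(err)
}
```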

type CallbackTokenGenerator

type CallbackTokenGenerator struct {
}

func NewCallbackTokenGenerator

func NewCallbackTokenGenerator() *CallbackTokenGenerator

func (*CallbackTokenGenerator) DecodeCompletion

DecodeCompletion decodes a callback token unwrapping the contained NexusOperationCompletion proto struct.

func (*CallbackTokenGenerator) Tokenize

type EndpointRegistry

type EndpointRegistry interface {
	// GetByName returns an endpoint entry for the endpoint name for a caller from the given namespace ID.
	// Note that the default implementation is global to the cluster and can ignore the namespace ID param.
	GetByName(ctx context.Context, namespaceID namespace.ID, endpointName string) (*persistencespb.NexusEndpointEntry, error)
	GetByID(ctx context.Context, endpointID string) (*persistencespb.NexusEndpointEntry, error)
	StartLifecycle()
	StopLifecycle()
}

type EndpointRegistryConfig

type EndpointRegistryConfig struct {
	// contains filtered or unexported fields
}

func NewEndpointRegistryConfig

func NewEndpointRegistryConfig(dc *dynamicconfig.Collection) *EndpointRegistryConfig

type EndpointRegistryImpl

type EndpointRegistryImpl struct {
	// contains filtered or unexported fields
}

EndpointRegistryImpl manages a cached view of Nexus endpoints. Endpoints are lazily loaded into memory on the first read. Thereafter, endpoint data is kept up to date by background long polling on the matching service's ListNexusEndpoints.
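The load-on-first-read behavior can be sketched with sync.Once, as below. This is a simplified illustration, not the package's implementation: lazyRegistry and its loader are hypothetical, endpoints are reduced to a name-to-ID map, and the background long-poll refresh is omitted entirely.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyRegistry is a hypothetical, simplified cache that loads its data on the
// first read only.
type lazyRegistry struct {
	once   sync.Once
	load   func() (map[string]string, error) // endpoint name -> endpoint ID
	byName map[string]string
	err    error
}

// getByName triggers the initial load if needed, then serves from memory.
func (r *lazyRegistry) getByName(name string) (string, error) {
	r.once.Do(func() { r.byName, r.err = r.load() })
	if r.err != nil {
		return "", r.err
	}
	id, ok := r.byName[name]
	if !ok {
		return "", fmt.Errorf("endpoint %q not found", name)
	}
	return id, nil
}

func main() {
	loads := 0
	r := &lazyRegistry{load: func() (map[string]string, error) {
		loads++
		return map[string]string{"billing": "endpoint-1"}, nil
	}}
	fmt.Println(loads) // nothing is loaded until the first read
	id, err := r.getByName("billing")
	fmt.Println(id, err)
	_, _ = r.getByName("billing")
	fmt.Println(loads)
}
```

Note that sync.Once never retries, so a failed initial load is sticky in this sketch; a production registry would need its own retry and refresh handling.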

func NewEndpointRegistry

func NewEndpointRegistry(
	config *EndpointRegistryConfig,
	matchingClient matchingservice.MatchingServiceClient,
	persistence p.NexusEndpointManager,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *EndpointRegistryImpl

func (*EndpointRegistryImpl) GetByID

func (*EndpointRegistryImpl) GetByName

func (*EndpointRegistryImpl) StartLifecycle

func (r *EndpointRegistryImpl) StartLifecycle()

StartLifecycle starts this component. It should only be invoked by an fx lifecycle hook. It should not be called multiple times or concurrently with StopLifecycle().

func (*EndpointRegistryImpl) StopLifecycle

func (r *EndpointRegistryImpl) StopLifecycle()

StopLifecycle stops this component. It should only be invoked by an fx lifecycle hook. It should not be called multiple times or concurrently with StartLifecycle().

type HTTPClientTraceConfig

type HTTPClientTraceConfig struct {
	// Enabled controls whether any additional tracing will be invoked. Default false.
	Enabled bool
	// ForwardingEnabled controls whether any additional tracing will be invoked for forwarded requests. Default false. Forwarded requests do not have an attempt count, so MinAttempt and MaxAttempt are ignored for these requests.
	ForwardingEnabled bool
	// MinAttempt is the first operation attempt to include additional tracing. Default 2. Setting to 0 or 1 will add tracing to all requests and may be expensive.
	MinAttempt int32
	// MaxAttempt is the maximum operation attempt to include additional tracing. Default 2. Setting to 0 means no maximum.
	MaxAttempt int32
	// Hooks is the list of method names to invoke with extra tracing. See httptrace.ClientTrace for more detail.
	// Defaults to all implemented hooks: GetConn, GotConn, ConnectStart, ConnectDone, DNSStart, DNSDone, TLSHandshakeStart, TLSHandshakeDone, WroteRequest, GotFirstResponseByte.
	Hooks []string
}
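The MinAttempt/MaxAttempt window described in the field comments can be read as the following predicate. This is a sketch of one interpretation of those comments: traceConfig mirrors only the relevant fields, shouldTrace is a hypothetical helper, and attempts are assumed to be numbered from 1.

```go
package main

import "fmt"

// traceConfig mirrors the attempt-related fields of HTTPClientTraceConfig.
type traceConfig struct {
	Enabled    bool
	MinAttempt int32
	MaxAttempt int32
}

// shouldTrace is a hypothetical helper that reports whether the given attempt
// falls inside the configured tracing window.
func shouldTrace(cfg traceConfig, attempt int32) bool {
	if !cfg.Enabled {
		return false
	}
	if attempt < cfg.MinAttempt {
		return false
	}
	// A MaxAttempt of 0 means there is no upper bound.
	if cfg.MaxAttempt > 0 && attempt > cfg.MaxAttempt {
		return false
	}
	return true
}

func main() {
	// With the documented defaults (2 and 2), only the second attempt is traced.
	cfg := traceConfig{Enabled: true, MinAttempt: 2, MaxAttempt: 2}
	for attempt := int32(1); attempt <= 3; attempt++ {
		fmt.Println(attempt, shouldTrace(cfg, attempt))
	}
}
```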

type HTTPClientTraceProvider

type HTTPClientTraceProvider interface {
	// NewTrace returns a *httptrace.ClientTrace which provides hooks to invoke at each point in the HTTP request
	// lifecycle. This trace must be added to the HTTP request context using httptrace.WithClientTrace for the hooks to
	// be invoked. The provided logger should already be tagged with relevant request information
	// e.g. using log.With(logger, tag.RequestID(id), tag.Operation(op), ...).
	NewTrace(attempt int32, logger log.Logger) *httptrace.ClientTrace
	// NewForwardingTrace functions the same as NewTrace but forwarded requests do not have an associated attempt count,
	// so all forwarded requests will be traced if enabled.
	NewForwardingTrace(logger log.Logger) *httptrace.ClientTrace
}
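For readers unfamiliar with net/http/httptrace, the sketch below shows how a *httptrace.ClientTrace is attached to a request context with httptrace.WithClientTrace, as the NewTrace comment requires. The helpers newRecordingTrace, attachTrace, and demoHooks are hypothetical, the URL is a placeholder, and the hooks are invoked by hand so the example stays offline; in real use the HTTP transport calls them while the request is sent.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptrace"
)

// newRecordingTrace returns a ClientTrace whose hooks report their own name.
// A provider would typically log through a tagged logger instead.
func newRecordingTrace(record func(hook string)) *httptrace.ClientTrace {
	return &httptrace.ClientTrace{
		GetConn:              func(hostPort string) { record("GetConn") },
		GotConn:              func(httptrace.GotConnInfo) { record("GotConn") },
		WroteRequest:         func(httptrace.WroteRequestInfo) { record("WroteRequest") },
		GotFirstResponseByte: func() { record("GotFirstResponseByte") },
	}
}

// attachTrace adds the trace to the request context so the transport can
// find it when the request is executed.
func attachTrace(req *http.Request, trace *httptrace.ClientTrace) *http.Request {
	return req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
}

// demoHooks builds a traced request and fires two hooks manually.
func demoHooks() ([]string, error) {
	var hooks []string
	trace := newRecordingTrace(func(hook string) { hooks = append(hooks, hook) })

	req, err := http.NewRequest(http.MethodGet, "http://localhost:7243/example", nil)
	if err != nil {
		return nil, err
	}
	req = attachTrace(req, trace)

	// Look the trace up the same way the transport does, then call hooks by hand.
	attached := httptrace.ContextClientTrace(req.Context())
	attached.GetConn("localhost:7243")
	attached.GotFirstResponseByte()
	return hooks, nil
}

func main() {
	hooks, err := demoHooks()
	if err != nil {
		panic(err)
	}
	fmt.Println(hooks)
}
```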

func NewLoggedHTTPClientTraceProvider

func NewLoggedHTTPClientTraceProvider(dc *dynamicconfig.Collection) HTTPClientTraceProvider

type LoggedHTTPClientTraceProvider

type LoggedHTTPClientTraceProvider struct {
	Config dynamicconfig.TypedPropertyFn[HTTPClientTraceConfig]
}

func (*LoggedHTTPClientTraceProvider) NewForwardingTrace

func (p *LoggedHTTPClientTraceProvider) NewForwardingTrace(logger log.Logger) *httptrace.ClientTrace

func (*LoggedHTTPClientTraceProvider) NewTrace

func (p *LoggedHTTPClientTraceProvider) NewTrace(attempt int32, logger log.Logger) *httptrace.ClientTrace

type NamespaceAndTaskQueue

type NamespaceAndTaskQueue struct {
	Namespace string
	TaskQueue string
}

Directories

Path Synopsis
Package nexusrpc provides client and server implementations of the Nexus HTTP API (https://github.com/nexus-rpc/api).
