natsutils

package
v0.12.1
Warning

This package is not in the latest version of its module.

Published: Apr 24, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Constants

const (
	// NATS header that services check for; when it is set, they
	// send their responses to the appropriate subject.
	ReplyToHeader = "claudie-internal-reply-to"

	// If the [ReplyToHeader] is not set, the reply can be interpreted
	// as [ReplyDiscard]
	ReplyDiscard = ""

	// The ID of the task/work that was picked up by the
	// worker from the received message header's key [nats.MsgIdHdr].
	WorkID = "claudie-internal-work-id"

	// The name of the input manifest that the task is scheduled for.
	InputManifestName = "claudie-internal-input-manifest-name"

	// The name of the kubernetes cluster that the task is scheduled for.
	//
	// Note that this value is set even when only load balancers are being
	// worked on. Load balancers do not exist without a Kubernetes cluster,
	// so the Kubernetes cluster name is used to identify all of the data
	// related to that cluster.
	ClusterName = "claudie-internal-cluster-name"
)

Utility constants for replying to received NATS messages.
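
As a rough illustration of how a worker might interpret these headers, the sketch below looks up ReplyToHeader and treats a missing value as ReplyDiscard. It is a minimal sketch, not the package's implementation: the `header` type is a local stand-in with the same map[string][]string shape as nats.Header so that the example runs without the NATS client, and `replySubject` is a hypothetical helper name.

```go
package main

import "fmt"

// Values copied from the constants documented above.
const (
	ReplyToHeader = "claudie-internal-reply-to"
	ReplyDiscard  = ""
)

// header is a stand-in with the same underlying shape as nats.Header
// (map[string][]string). It is an assumption made to keep the sketch
// free of external dependencies.
type header map[string][]string

// get returns the first value stored under key, or "" if the key is absent.
func (h header) get(key string) string {
	if v := h[key]; len(v) > 0 {
		return v[0]
	}
	return ""
}

// replySubject (hypothetical helper) returns the subject a reply should be
// sent to and whether a reply should be sent at all.
func replySubject(h header) (string, bool) {
	subject := h.get(ReplyToHeader)
	return subject, subject != ReplyDiscard
}

func main() {
	withReply := header{ReplyToHeader: {"claudie-internal-cluster-response-kuber"}}
	if s, ok := replySubject(withReply); ok {
		fmt.Println("reply to:", s)
	}

	// A message without the header is interpreted as ReplyDiscard.
	if _, ok := replySubject(header{}); !ok {
		fmt.Println("no reply requested")
	}
}
```

Because ReplyDiscard is the empty string, a missing header and an explicitly empty header are handled identically in this sketch.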

const (
	// Subject for Ansibler-only request messages.
	AnsiblerRequests = "claudie-internal-cluster-requests-ansibler"

	// Subject for Ansibler-only response messages.
	AnsiblerResponse = "claudie-internal-cluster-response-ansibler"

	// Subject for Kuber-only request messages.
	KuberRequests = "claudie-internal-cluster-requests-kuber"

	// Subject for Kuber-only response messages.
	KuberResponse = "claudie-internal-cluster-response-kuber"

	// Subject for KubeEleven-only request messages.
	KubeElevenRequests = "claudie-internal-cluster-request-kube-eleven"

	// Subject for KubeEleven-only response messages.
	KubeElevenResponse = "claudie-internal-cluster-response-kube-eleven"

	// Subject for Terraformer-only request messages.
	TerraformerRequests = "claudie-internal-cluster-request-terraformer"

	// Subject for Terraformer-only response messages.
	TerraformerResponse = "claudie-internal-cluster-response-terraformer"
)

A list of default Claudie-related NATS subjects.
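
The subject strings do not follow one uniform pattern: the Ansibler and Kuber request subjects contain "requests", while the KubeEleven and Terraformer ones contain "request". The sketch below, which assumes a hypothetical `subjects` pair type and `byService` lookup table that are not part of the package, pairs each service with its constants rather than building the names from a format string.

```go
package main

import "fmt"

// Values copied from the constants documented above.
const (
	AnsiblerRequests    = "claudie-internal-cluster-requests-ansibler"
	AnsiblerResponse    = "claudie-internal-cluster-response-ansibler"
	KuberRequests       = "claudie-internal-cluster-requests-kuber"
	KuberResponse       = "claudie-internal-cluster-response-kuber"
	KubeElevenRequests  = "claudie-internal-cluster-request-kube-eleven"
	KubeElevenResponse  = "claudie-internal-cluster-response-kube-eleven"
	TerraformerRequests = "claudie-internal-cluster-request-terraformer"
	TerraformerResponse = "claudie-internal-cluster-response-terraformer"
)

// subjects (hypothetical type) pairs the request and response subject
// of a single service.
type subjects struct {
	Requests string
	Response string
}

// byService (hypothetical table) maps an illustrative service key to its
// subject pair. The keys are chosen for this example only.
var byService = map[string]subjects{
	"ansibler":    {AnsiblerRequests, AnsiblerResponse},
	"kuber":       {KuberRequests, KuberResponse},
	"kube-eleven": {KubeElevenRequests, KubeElevenResponse},
	"terraformer": {TerraformerRequests, TerraformerResponse},
}

func main() {
	// Iterate over a fixed slice so the output order is deterministic.
	for _, name := range []string{"ansibler", "kuber", "kube-eleven", "terraformer"} {
		s := byService[name]
		fmt.Printf("%s: %s -> %s\n", name, s.Requests, s.Response)
	}
}
```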

Variables

Default subjects that are used if none are supplied to the Client.JetStreamWorkQueue func.

Functions

func ReplyTo

func ReplyTo(
	ctx context.Context,
	logger zerolog.Logger,
	js jetstream.JetStream,
	result ReplyMsg,
) error

func TryReplyErrorFTL

func TryReplyErrorFTL(
	logger zerolog.Logger,
	err error,
	reply ReplyMsg,
	js jetstream.JetStream,
	msg jetstream.Msg,
)

Utility function used to send back a message that signals an unrecoverable fatal error, with no changes made to the infrastructure for the received task. The passed-in `msg` is acknowledged and the Noop reply is sent to the targeted `replyChannel` channel.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClientWithJetStream

func NewClientWithJetStream(url string, size int, opts ...nats.Option) (*Client, error)

func (*Client) Close

func (c *Client) Close()

func (*Client) Conn

func (c *Client) Conn() *nats.Conn

func (*Client) JSWorkQueueConsumer

func (c *Client) JSWorkQueueConsumer(
	ctx context.Context,
	durableName,
	stream string,
	ackWait time.Duration,
	subjects ...string,
) (jetstream.Consumer, error)

Creates a Work Queue jetstream consumer with sensible default values.

func (*Client) JetStream

func (c *Client) JetStream() jetstream.JetStream

func (*Client) JetStreamWorkQueue

func (c *Client) JetStreamWorkQueue(ctx context.Context, name string, subjects ...string) error

Creates a new jetstream.JetStream instance with sensible default values. If a jetstream with the given name already exists and its configuration differs from the provided one, it will be updated.

type ReplyMsg

type ReplyMsg struct {
	// Name of the InputManifest at which the reply is targeted.
	InputManifest string

	// Name of the cluster within the [ReplyMsg.InputManifest] at
	// which the reply is targeted.
	Cluster string

	// TaskID is the ID from the picked up [nats.Msg] that was received
	// via the [nats.MsgIdHdr]. This is the actual ID of the task that was
	// scheduled by the manager, and this information is given back to the reply
	// channel in the header [WorkID]. This ID is also used to construct the ID
	// of the reply message. Because deduplication tracking of messages is
	// stream-wide, each stage needs to have its own ID, so a simple concatenation,
	// fmt.Sprintf("%v-%v", TaskID, Subject), is used.
	TaskID string

	// Subject to which the reply should be sent.
	Subject string

	// Result of the processed task.
	Result *spec.TaskResult
}
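
The TaskID comment above describes how the reply message ID is derived. A minimal sketch of that concatenation follows; `replyMsgID` is a hypothetical helper name and the task ID value is made up for illustration, with only the fmt.Sprintf format string taken from the documentation.

```go
package main

import "fmt"

// replyMsgID (hypothetical helper) builds the per-stage message ID described
// in the TaskID comment: the task ID joined with the reply subject, so that
// replies from different stages of the same task get distinct IDs.
func replyMsgID(taskID, subject string) string {
	return fmt.Sprintf("%v-%v", taskID, subject)
}

func main() {
	taskID := "example-task" // made-up task ID for illustration

	a := replyMsgID(taskID, "claudie-internal-cluster-response-terraformer")
	b := replyMsgID(taskID, "claudie-internal-cluster-response-kuber")

	fmt.Println(a)
	fmt.Println(b)
	// The same task yields a different ID per reply subject.
	fmt.Println("distinct:", a != b)
}
```

Since the same task ID is reused across stages, using TaskID alone as the message ID would collide within a stream-wide deduplication scope, which is the reason the comment gives for appending the subject.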
