Documentation
¶
Index ¶
- Constants
- Variables
- func ReplyTo(ctx context.Context, logger zerolog.Logger, js jetstream.JetStream, ...) error
- func TryReplyErrorFTL(logger zerolog.Logger, err error, reply ReplyMsg, js jetstream.JetStream, ...)
- type Client
- func (c *Client) Close()
- func (c *Client) Conn() *nats.Conn
- func (c *Client) JSWorkQueueConsumer(ctx context.Context, durableName, stream string, ackWait time.Duration, ...) (jetstream.Consumer, error)
- func (c *Client) JetStream() jetstream.JetStream
- func (c *Client) JetStreamWorkQueue(ctx context.Context, name string, subjects ...string) error
- type ReplyMsg
Constants ¶
const ( // Nats Header to set, so that services can check, if set // and send the responses to the approriate subjects. ReplyToHeader = "claudie-internal-reply-to" // If the [ReplyToHeader] is not set, the reply can be interpreted // as [ReplyDiscard] ReplyDiscard = "" // The ID of the task/work that was picked up by the // worker from the received message header's key [nats.MsgIdHdr]. WorkID = "claudie-internal-work-id" // The name of the input manifest that the task is scheduled for. InputManifestName = "claudie-internal-input-manifest-name" // The name of the kubernetes cluster that the task is scheduled for. // // Note that this value is set even in the case if just loadbalancers // are being worked on, as LoadBalancers do not exist without a kubernetes // cluster, thus a kuberentes cluster name is used for the identification // of all of the data related to that cluster. ClusterName = "claudie-internal-cluster-name" )
utility constants around replying for received NATs messages.
const ( // Subject related to Ansibler service only request Messages. AnsiblerRequests = "claudie-internal-cluster-requests-ansibler" // Subject related to Ansibler service only response Messages. AnsiblerResponse = "claudie-internal-cluster-response-ansibler" // Subject related to Kuber service only request Messages. KuberRequests = "claudie-internal-cluster-requests-kuber" // Subject related to Kuber service only response Messages. KuberResponse = "claudie-internal-cluster-response-kuber" // Subject related to KubeEleven service only request Messages. KubeElevenRequests = "claudie-internal-cluster-request-kube-eleven" // Subject related to KubeEleven service only response Messages. KubeElevenResponse = "claudie-internal-cluster-response-kube-eleven" // Subject related to Terraformer service only request Messages. TerraformerRequests = "claudie-internal-cluster-request-terraformer" // Subject related to Terraformer service only response Messages. TerraformerResponse = "claudie-internal-cluster-response-terraformer" )
A list of default claudie related NATS subjects.
Variables ¶
var DefaultSubjects = [...]string{ AnsiblerRequests, AnsiblerResponse, KuberRequests, KuberResponse, KubeElevenRequests, KubeElevenResponse, TerraformerRequests, TerraformerResponse, }
Default subjects that are used if no are supplied in the Client.JetStreamWorkQueue func.
Functions ¶
func TryReplyErrorFTL ¶
func TryReplyErrorFTL( logger zerolog.Logger, err error, reply ReplyMsg, js jetstream.JetStream, msg jetstream.Msg, )
Utility function used to send back a message that signals an unrecoverable fatal error with no changes made to the infrastructure for the received task to. The passed in `msg` is acknowledged and the Noop Reply is send to the targeted `replyChannel` channel.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClientWithJetStream ¶
func (*Client) JSWorkQueueConsumer ¶
func (c *Client) JSWorkQueueConsumer( ctx context.Context, durableName, stream string, ackWait time.Duration, subjects ...string, ) (jetstream.Consumer, error)
Creates a Work Queue jetstream consumer with sensible default values.
func (*Client) JetStreamWorkQueue ¶
Creates a new jetstream.JetStream instance with sensible default values. If a jetstream with given name already exists and its configuration differs from the provided one, it will be updated.
type ReplyMsg ¶
type ReplyMsg struct {
// Name of the InputManifest for which the reply is targeted at.
InputManifest string
// Name of the cluster within the [ReplyMsg.InputManifest] for
// which the reply is targeted at.
Cluster string
// TaskID is the ID from the picked up [nats.Msg], that was received
// via the [nats.MsgIdHdr]. This is the actuall ID of the task that was
// scheduled by the manager and this information is given back to the reply
// channel in the header [WorkID]. This ID is also used to construct the ID
// of the reply message, as each deplication tracking of messages is stream-wide
// each stage needs to have its own ID, thus a simply concatenation of the
// fmt.Sprintf("%v-%v", TaskID, Subject) is used.
TaskID string
// To which subject should the reply be send to.
Subject string
// Result of the processed task.
Result *spec.TaskResult
}