events

package
v1.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Triggers
	ActionIDContactEnrollmentTrigger = "contact-enrollment-trigger"
	ActionIDOrderEnrollmentTrigger   = "order-enrollment-trigger"
	ActionIDPointEnrollmentTrigger   = "point-enrollment-trigger"

	// Condition
	ActionIDQuery = "query-action"

	// Notifications
	ActionIDSlack = "slack-action"
	ActionIDEmail = "email-action"
	ActionIDSMS   = "sms-action"
	ActionIDLine  = "line-action"

	// Actions
	ActionIDWebhook = "webhook-action"
	ActionIDTag     = "tag-action"

	// Utility
	ActionIDWait = "wait-action"
	ActionIDLog  = "log-action"
)

ActionID constants - aligned with mca system These are the action_id values used in workflow_actions

View Source
const (
	RecipientTypeContacts = "contacts"
	RecipientTypeOrders   = "orders"
)

Variables

This section is empty.

Functions

func GetWorkflowActionsTopic added in v1.7.0

func GetWorkflowActionsTopic() string

GetWorkflowActionsTopic returns topic name for action executions (notifications, etc.) Topics: mca.workflow.actions.local, mca.workflow.actions.uat, mca.workflow.actions.prod

func GetWorkflowExecutionsTopic added in v1.7.0

func GetWorkflowExecutionsTopic() string

GetWorkflowExecutionsTopic returns topic name for workflow executions (started/completed/exited) Topics: mca.workflow.executions.local, mca.workflow.executions.uat, mca.workflow.executions.prod

func GetWorkflowNodesTopic added in v1.7.0

func GetWorkflowNodesTopic() string

GetWorkflowNodesTopic returns topic name for node executions (entered/exited) Topics: mca.workflow.nodes.local, mca.workflow.nodes.uat, mca.workflow.nodes.prod

Types

type ActionChannel added in v1.10.0

type ActionChannel string

ActionChannel represents the action/notification channel type

const (
	ActionChannelLINE  ActionChannel = "line"
	ActionChannelSlack ActionChannel = "slack"
	ActionChannelEmail ActionChannel = "email"
	ActionChannelSMS   ActionChannel = "sms"
)

type ActionExecutionPayload added in v1.10.0

type ActionExecutionPayload struct {
	TenantID         string                 `json:"tenant_id,omitempty"`
	WorkflowID       string                 `json:"workflow_id"`
	SessionID        string                 `json:"session_id"`
	RecipientID      string                 `json:"recipient_id"`
	NodeID           string                 `json:"node_id,omitempty"`   // NEW - Node ID from workflow definition
	NodeName         string                 `json:"node_name,omitempty"` // NEW - Node display name
	ActionKey        string                 `json:"action_key"`
	ActionID         string                 `json:"action_id,omitempty"`
	ActionType       ActionChannel          `json:"action_type"`            // line, slack, email, sms (was: channel)
	ActionLabel      string                 `json:"action_label,omitempty"` // For backward compatibility
	MessageContent   string                 `json:"message_content,omitempty"`
	HasTrackingLink  bool                   `json:"has_tracking_link"`
	TrackingLink     string                 `json:"tracking_link,omitempty"` // Generated tracking URL
	OriginalLink     string                 `json:"original_link,omitempty"` // Original destination URL
	DeliveryStatus   DeliveryStatus         `json:"delivery_status"`         // pending, sent, delivered, failed, bounced
	ExecutionStatus  string                 `json:"execution_status"`        // success, failed (was: status)
	ErrorMessage     string                 `json:"error_message,omitempty"`
	ProviderResponse map[string]interface{} `json:"provider_response,omitempty"`
	EventTime        time.Time              `json:"event_time"`
	Timestamp        time.Time              `json:"timestamp"`
}

ActionExecutionPayload is the message structure for action executions Kafka topic (mca.workflow.actions.{env}) Field names aligned with REPORT_DB.md

func (*ActionExecutionPayload) ToJSON added in v1.10.0

func (p *ActionExecutionPayload) ToJSON() ([]byte, error)

ToJSON converts payload to JSON bytes

type ActionPublisher added in v1.10.0

type ActionPublisher struct {
	// contains filtered or unexported fields
}

ActionPublisher publishes action execution events to Kafka (mca.workflow.actions.{env})

func NewActionPublisher added in v1.10.0

func NewActionPublisher(brokers []string) *ActionPublisher

NewActionPublisher creates a new Kafka publisher for action events (no auth)

func NewActionPublisherWithAuth added in v1.10.0

func NewActionPublisherWithAuth(config PublisherConfig) *ActionPublisher

NewActionPublisherWithAuth creates a new Kafka publisher with SASL authentication

func (*ActionPublisher) Close added in v1.10.0

func (p *ActionPublisher) Close() error

Close closes the Kafka writer

func (*ActionPublisher) Publish added in v1.10.0

func (p *ActionPublisher) Publish(ctx context.Context, payload *ActionExecutionPayload) error

Publish sends an action execution event to Kafka

func (*ActionPublisher) PublishDelivered added in v1.18.0

func (p *ActionPublisher) PublishDelivered(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	providerResponse map[string]interface{},
) error

PublishDelivered publishes an action_delivered event (delivery confirmed by provider webhook)

func (*ActionPublisher) PublishFailed added in v1.10.0

func (p *ActionPublisher) PublishFailed(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	errorMessage string,
) error

PublishFailed publishes a failed action execution

func (*ActionPublisher) PublishLinkOpened added in v1.20.0

func (p *ActionPublisher) PublishLinkOpened(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	trackingID, originalLink string,
) error

PublishLinkOpened publishes a link_opened event when a user clicks a tracking link

func (*ActionPublisher) PublishSent added in v1.18.0

func (p *ActionPublisher) PublishSent(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	messageContent string,
	hasTrackingLink bool,
	success bool,
	errorMessage string,
) error

PublishSent publishes an action_sent event (message sent to provider)

func (*ActionPublisher) PublishSuccess added in v1.10.0

func (p *ActionPublisher) PublishSuccess(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	messageContent string,
	hasTrackingLink bool,
) error

PublishSuccess publishes a successful action execution Deprecated: Use PublishSent for action_sent events and PublishDelivered for action_delivered events

type DeliveryStatus added in v1.5.0

type DeliveryStatus string

DeliveryStatus represents the delivery status

const (
	DeliveryStatusSent      DeliveryStatus = "sent"
	DeliveryStatusDelivered DeliveryStatus = "delivered"
	DeliveryStatusFailed    DeliveryStatus = "failed"
	DeliveryStatusBounced   DeliveryStatus = "bounced"
)

type EventType

type EventType string

EventType represents the type of workflow event Aligned with REPORT_DB.md: entered / exited pattern

const (
	// Workflow lifecycle events (for mca.workflow.executions topic)
	EventTypeStarted   EventType = "started"   // User enrolled/started workflow (trigger fired)
	EventTypeCompleted EventType = "completed" // Workflow instance finished successfully

	// Node/Action events (for mca.workflow.nodes topic)
	EventTypeEntered EventType = "entered" // User entered a node
	EventTypeExited  EventType = "exited"  // User exited a node (with status: success/failed)

)

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher publishes workflow events to Kafka It uses separate writers for different event types: - nodesWriter: for node entered/exited events -> mca.workflow.nodes.{env} - executionsWriter: for workflow started/completed/exited events -> mca.workflow.executions.{env}

func NewPublisher

func NewPublisher(brokers []string) *Publisher

NewPublisher creates a new Kafka publisher without authentication (for local/testing)

func NewPublisherWithAuth added in v1.4.2

func NewPublisherWithAuth(config PublisherConfig) *Publisher

NewPublisherWithAuth creates a new Kafka publisher with SASL authentication

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes all Kafka writers

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, event *WorkflowEventPayload) error

Publish sends an event to the appropriate Kafka topic based on event type

func (*Publisher) PublishEntered

func (p *Publisher) PublishEntered(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	nodeID, nodeName string,
	actionKey, actionID string,
) error

PublishEntered publishes event when user enters a node

func (*Publisher) PublishExited

func (p *Publisher) PublishExited(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	nodeID, nodeName string,
	actionKey, actionID string,
	status string,
	errorMessage string,
	payload map[string]interface{},
) error

PublishExited publishes event when user exits a node (success or failed)

func (*Publisher) PublishExitedFailed

func (p *Publisher) PublishExitedFailed(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	nodeID, nodeName string,
	actionKey, actionID string,
	errorMessage string,
) error

PublishExitedFailed is a convenience method for failed exit

func (*Publisher) PublishExitedSuccess

func (p *Publisher) PublishExitedSuccess(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	nodeID, nodeName string,
	actionKey, actionID string,
	payload map[string]interface{},
) error

PublishExitedSuccess is a convenience method for successful exit

func (*Publisher) PublishWorkflowCompleted

func (p *Publisher) PublishWorkflowCompleted(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
) error

PublishWorkflowCompleted publishes workflow completed event

func (*Publisher) PublishWorkflowExited added in v1.11.0

func (p *Publisher) PublishWorkflowExited(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
	executionStatus string,
	errorMessage string,
) error

PublishWorkflowExited publishes workflow exited event (timeout/manual/failed)

func (*Publisher) PublishWorkflowStarted

func (p *Publisher) PublishWorkflowStarted(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	payload map[string]interface{},
) error

PublishWorkflowStarted publishes workflow started event (when user enters workflow at trigger node)

type PublisherConfig added in v1.4.2

type PublisherConfig struct {
	Brokers  []string
	Username string
	Password string
}

PublisherConfig holds configuration for the Kafka publisher

type RecipientType

type RecipientType string

RecipientType for tracking who triggered the workflow

type WorkflowEventPayload

type WorkflowEventPayload struct {
	TenantID        string                 `json:"tenant_id"`
	WorkflowID      string                 `json:"workflow_id"`
	WorkflowName    string                 `json:"workflow_name,omitempty"`
	SessionID       string                 `json:"session_id"`               // Workflow instance ID (execution_id in DB)
	TaskID          string                 `json:"task_id,omitempty"`        // Task ID within session
	RecipientID     string                 `json:"recipient_id,omitempty"`   // Contact/Order ID (user_id)
	RecipientType   RecipientType          `json:"recipient_type,omitempty"` // contacts / orders
	EventType       EventType              `json:"event_type"`
	ExecutionStatus string                 `json:"execution_status,omitempty"` // Detailed status (timeout/manual/failed) for workflow exited (for node_executions table)
	NodeID          string                 `json:"node_id,omitempty"`          // Node ID from workflow definition (for workflow_events table)
	NodeName        string                 `json:"node_name,omitempty"`        // Node display name (for workflow_events table)
	ActionKey       string                 `json:"action_key,omitempty"`       // Node key (a1, a2, etc.)
	ActionID        string                 `json:"action_id,omitempty"`        // Action type (query-action, slack-action, etc.)
	ActionLabel     string                 `json:"action_label,omitempty"`     // Node display name (for node_executions table)
	Status          string                 `json:"status,omitempty"`           // success / failed (for node exited events)
	Payload         map[string]interface{} `json:"payload,omitempty"`
	ErrorMessage    string                 `json:"error_message,omitempty"`
	EventTime       time.Time              `json:"event_time"`
	Timestamp       time.Time              `json:"timestamp"`
}

WorkflowEventPayload is the message structure for Kafka Field names aligned with mca-engine-sdk conventions and REPORT_DB.md

func (*WorkflowEventPayload) ToJSON

func (p *WorkflowEventPayload) ToJSON() ([]byte, error)

ToJSON converts payload to JSON bytes

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL