events

package
v1.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Triggers
	ActionIDContactEnrollmentTrigger = "contact-enrollment-trigger"
	ActionIDOrderEnrollmentTrigger   = "order-enrollment-trigger"
	ActionIDPointEnrollmentTrigger   = "point-enrollment-trigger"

	// Condition
	ActionIDQuery = "query-action"

	// Notifications
	ActionIDSlack = "slack-action"
	ActionIDEmail = "email-action"
	ActionIDSMS   = "sms-action"
	ActionIDLine  = "line-action"

	// Actions
	ActionIDWebhook = "webhook-action"
	ActionIDTag     = "tag-action"

	// Utility
	ActionIDWait = "wait-action"
	ActionIDLog  = "log-action"
)

ActionID constants - aligned with mca system These are the action_id values used in workflow_actions

View Source
const (
	RecipientTypeContacts = "contacts"
	RecipientTypeOrders   = "orders"
)

Variables

This section is empty.

Functions

func GetWorkflowActionsTopic added in v1.7.0

func GetWorkflowActionsTopic() string

GetWorkflowActionsTopic returns topic name for action executions (notifications, etc.) Topics: mca.workflow.actions.local, mca.workflow.actions.uat, mca.workflow.actions.prod

func GetWorkflowEventsTopic deprecated

func GetWorkflowEventsTopic() string

Deprecated: Use GetWorkflowNodesTopic instead

func GetWorkflowExecutionsTopic added in v1.7.0

func GetWorkflowExecutionsTopic() string

GetWorkflowExecutionsTopic returns topic name for workflow executions (started/completed/exited) Topics: mca.workflow.executions.local, mca.workflow.executions.uat, mca.workflow.executions.prod

func GetWorkflowNodesTopic added in v1.7.0

func GetWorkflowNodesTopic() string

GetWorkflowNodesTopic returns topic name for node executions (entered/exited) Topics: mca.workflow.nodes.local, mca.workflow.nodes.uat, mca.workflow.nodes.prod

func GetWorkflowNotificationsTopic deprecated added in v1.5.0

func GetWorkflowNotificationsTopic() string

Deprecated: Use GetWorkflowActionsTopic instead

func GetWorkflowRecipientsTopic deprecated added in v1.5.0

func GetWorkflowRecipientsTopic() string

Deprecated: Use GetWorkflowExecutionsTopic instead

Types

type DeliveryStatus added in v1.5.0

type DeliveryStatus string

DeliveryStatus represents the notification delivery status

const (
	DeliveryStatusSent      DeliveryStatus = "sent"
	DeliveryStatusDelivered DeliveryStatus = "delivered"
	DeliveryStatusFailed    DeliveryStatus = "failed"
	DeliveryStatusBounced   DeliveryStatus = "bounced"
)

type EventType

type EventType string

EventType represents the type of workflow event Aligned with REPORT_DB.md: entered / exited pattern

const (
	// Workflow lifecycle events
	EventTypeWorkflowStarted   EventType = "workflow_started"   // User enrolled/started workflow (trigger fired)
	EventTypeWorkflowCompleted EventType = "workflow_completed" // Workflow instance finished successfully
	EventTypeWorkflowFailed    EventType = "workflow_failed"    // Workflow instance terminated with error

	// Node/Action events (entered/exited pattern from REPORT_DB)
	EventTypeEntered EventType = "entered" // User entered a node
	EventTypeExited  EventType = "exited"  // User exited a node (with status: success/failed)
)

type NotificationChannel added in v1.5.0

type NotificationChannel string

NotificationChannel represents the notification channel type

const (
	NotificationChannelLINE  NotificationChannel = "line"
	NotificationChannelSlack NotificationChannel = "slack"
	NotificationChannelEmail NotificationChannel = "email"
	NotificationChannelSMS   NotificationChannel = "sms"
)

type NotificationPublisher added in v1.5.0

type NotificationPublisher struct {
	// contains filtered or unexported fields
}

NotificationPublisher publishes notification execution events to Kafka

func NewNotificationPublisher added in v1.5.0

func NewNotificationPublisher(brokers []string) *NotificationPublisher

NewNotificationPublisher creates a new Kafka publisher for notification events (no auth)

func NewNotificationPublisherWithAuth added in v1.5.0

func NewNotificationPublisherWithAuth(config PublisherConfig) *NotificationPublisher

NewNotificationPublisherWithAuth creates a new Kafka publisher with SASL authentication

func (*NotificationPublisher) Close added in v1.5.0

func (p *NotificationPublisher) Close() error

Close closes the Kafka writer

func (*NotificationPublisher) Publish added in v1.5.0

Publish sends a notification event to Kafka

func (*NotificationPublisher) PublishFailed added in v1.5.0

func (p *NotificationPublisher) PublishFailed(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	actionKey, actionID string,
	channel NotificationChannel,
	errorMessage string,
) error

PublishFailed publishes a failed notification execution

func (*NotificationPublisher) PublishSuccess added in v1.5.0

func (p *NotificationPublisher) PublishSuccess(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	actionKey, actionID string,
	channel NotificationChannel,
	messageContent string,
	hasTrackingLink bool,
) error

PublishSuccess publishes a successful notification execution

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher publishes workflow events to Kafka It uses separate writers for different event types: - nodesWriter: for node entered/exited events -> mca.workflow.nodes.{env} - executionsWriter: for workflow started/completed/exited events -> mca.workflow.executions.{env}

func NewPublisher

func NewPublisher(brokers []string) *Publisher

NewPublisher creates a new Kafka publisher without authentication (for local/testing)

func NewPublisherWithAuth added in v1.4.2

func NewPublisherWithAuth(config PublisherConfig) *Publisher

NewPublisherWithAuth creates a new Kafka publisher with SASL authentication

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes all Kafka writers

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, event *WorkflowEventPayload) error

Publish sends an event to the appropriate Kafka topic based on event type

func (*Publisher) PublishEntered

func (p *Publisher) PublishEntered(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
) error

PublishEntered publishes event when user enters a node

func (*Publisher) PublishExited

func (p *Publisher) PublishExited(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	status string,
	errorMessage string,
	payload map[string]interface{},
) error

PublishExited publishes event when user exits a node (success or failed)

func (*Publisher) PublishExitedFailed

func (p *Publisher) PublishExitedFailed(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	errorMessage string,
) error

PublishExitedFailed is a convenience method for failed exit

func (*Publisher) PublishExitedSuccess

func (p *Publisher) PublishExitedSuccess(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	payload map[string]interface{},
) error

PublishExitedSuccess is a convenience method for successful exit

func (*Publisher) PublishWorkflowCompleted

func (p *Publisher) PublishWorkflowCompleted(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
) error

PublishWorkflowCompleted publishes workflow completed event

func (*Publisher) PublishWorkflowFailed

func (p *Publisher) PublishWorkflowFailed(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
	errorMessage string,
) error

PublishWorkflowFailed publishes workflow failed event

func (*Publisher) PublishWorkflowStarted

func (p *Publisher) PublishWorkflowStarted(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	payload map[string]interface{},
) error

PublishWorkflowStarted publishes workflow started event (when user enters workflow at trigger node)

type PublisherConfig added in v1.4.2

type PublisherConfig struct {
	Brokers  []string
	Username string
	Password string
}

PublisherConfig holds configuration for the Kafka publisher

type RecipientEventType added in v1.5.0

type RecipientEventType string

RecipientEventType represents the type of recipient event

const (
	RecipientEventTypeStarted   RecipientEventType = "started"   // Workflow started for recipient
	RecipientEventTypeCompleted RecipientEventType = "completed" // Workflow completed for recipient
	RecipientEventTypeExited    RecipientEventType = "exited"    // Workflow exited early for recipient
)

type RecipientPublisher added in v1.5.0

type RecipientPublisher struct {
	// contains filtered or unexported fields
}

RecipientPublisher publishes workflow recipient events to Kafka

func NewRecipientPublisher added in v1.5.0

func NewRecipientPublisher(brokers []string) *RecipientPublisher

NewRecipientPublisher creates a new Kafka publisher for recipient events (no auth)

func NewRecipientPublisherWithAuth added in v1.5.0

func NewRecipientPublisherWithAuth(config PublisherConfig) *RecipientPublisher

NewRecipientPublisherWithAuth creates a new Kafka publisher with SASL authentication

func (*RecipientPublisher) Close added in v1.5.0

func (p *RecipientPublisher) Close() error

Close closes the Kafka writer

func (*RecipientPublisher) Publish added in v1.5.0

Publish sends a recipient event to Kafka

func (*RecipientPublisher) PublishCompleted added in v1.5.0

func (p *RecipientPublisher) PublishCompleted(
	ctx context.Context,
	tenantID, workflowID string,
	recipientID, sessionID string,
) error

PublishCompleted publishes workflow completed event for a recipient

func (*RecipientPublisher) PublishExited added in v1.5.0

func (p *RecipientPublisher) PublishExited(
	ctx context.Context,
	tenantID, workflowID string,
	recipientID, sessionID string,
	exitReason string,
) error

PublishExited publishes workflow exited event for a recipient

func (*RecipientPublisher) PublishStarted added in v1.5.0

func (p *RecipientPublisher) PublishStarted(
	ctx context.Context,
	tenantID, workflowID, workflowName string,
	recipientID, recipientType, sessionID string,
) error

PublishStarted publishes workflow started event for a recipient

type RecipientType

type RecipientType string

RecipientType for tracking who triggered the workflow

type WorkflowEventPayload

type WorkflowEventPayload struct {
	TenantID      string                 `json:"tenant_id"`
	WorkflowID    string                 `json:"workflow_id"`
	WorkflowName  string                 `json:"workflow_name,omitempty"`
	SessionID     string                 `json:"session_id"`               // Workflow instance ID (execution_id in DB)
	TaskID        string                 `json:"task_id,omitempty"`        // Task ID within session
	RecipientID   string                 `json:"recipient_id,omitempty"`   // Contact/Order ID (user_id)
	RecipientType RecipientType          `json:"recipient_type,omitempty"` // contacts / orders
	EventType     EventType              `json:"event_type"`
	ActionKey     string                 `json:"action_key,omitempty"` // Node key (a1, a2, etc.)
	ActionID      string                 `json:"action_id,omitempty"`  // Action type (query-action, slack-action, etc.)
	Status        string                 `json:"status,omitempty"`     // success / failed (for exited events)
	Payload       map[string]interface{} `json:"payload,omitempty"`
	ErrorMessage  string                 `json:"error_message,omitempty"`
	EventTime     time.Time              `json:"event_time"`
	Timestamp     time.Time              `json:"timestamp"`
}

WorkflowEventPayload is the message structure for Kafka Field names aligned with mca-engine-sdk conventions and REPORT_DB.md

func (*WorkflowEventPayload) ToJSON

func (p *WorkflowEventPayload) ToJSON() ([]byte, error)

ToJSON converts payload to JSON bytes

type WorkflowNotificationPayload added in v1.5.0

type WorkflowNotificationPayload struct {
	TenantID         string                 `json:"tenant_id,omitempty"` // Tenant ID for multi-tenancy
	SessionID        string                 `json:"session_id"`          // Session/execution ID
	WorkflowID       string                 `json:"workflow_id"`         // For lookups
	RecipientID      string                 `json:"recipient_id"`        // For linking
	ActionKey        string                 `json:"action_key"`          // Node key (a1, a2, etc.)
	ActionID         string                 `json:"action_id"`           // line-action, slack-action, etc.
	ActionLabel      string                 `json:"action_label,omitempty"`
	Channel          NotificationChannel    `json:"channel"` // line, slack, email, sms
	MessageContent   string                 `json:"message_content,omitempty"`
	HasTrackingLink  bool                   `json:"has_tracking_link"`
	DeliveryStatus   DeliveryStatus         `json:"delivery_status"` // sent, delivered, failed, bounced
	Status           string                 `json:"status"`          // success, failed (execution status)
	ErrorMessage     string                 `json:"error_message,omitempty"`
	ProviderResponse map[string]interface{} `json:"provider_response,omitempty"`
	EventTime        time.Time              `json:"event_time"`
	Timestamp        time.Time              `json:"timestamp"`
}

WorkflowNotificationPayload is the message structure for notification executions Kafka topic

func (*WorkflowNotificationPayload) ToJSON added in v1.5.0

func (p *WorkflowNotificationPayload) ToJSON() ([]byte, error)

ToJSON converts payload to JSON bytes

type WorkflowRecipientPayload added in v1.5.0

type WorkflowRecipientPayload struct {
	TenantID      string             `json:"tenant_id"`
	WorkflowID    string             `json:"workflow_id"`
	WorkflowName  string             `json:"workflow_name,omitempty"`
	RecipientID   string             `json:"recipient_id"`
	RecipientType string             `json:"recipient_type"` // "contacts", "orders", "points"
	SessionID     string             `json:"session_id"`     // Session/execution ID
	EventType     RecipientEventType `json:"event_type"`     // started, completed, exited
	ExitReason    string             `json:"exit_reason,omitempty"`
	EventTime     time.Time          `json:"event_time"`
	Timestamp     time.Time          `json:"timestamp"`
}

WorkflowRecipientPayload is the message structure for workflow recipients Kafka topic

func (*WorkflowRecipientPayload) ToJSON added in v1.5.0

func (p *WorkflowRecipientPayload) ToJSON() ([]byte, error)

ToJSON converts payload to JSON bytes

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL