Documentation ¶
Index ¶
- Constants
- func GetWorkflowActionsTopic() string
- func GetWorkflowExecutionsTopic() string
- func GetWorkflowNodesTopic() string
- type ActionChannel
- type ActionExecutionPayload
- type ActionPublisher
- func (p *ActionPublisher) Close() error
- func (p *ActionPublisher) Publish(ctx context.Context, payload *ActionExecutionPayload) error
- func (p *ActionPublisher) PublishDelivered(ctx context.Context, tenantID, sessionID, workflowID, recipientID string, ...) error
- func (p *ActionPublisher) PublishFailed(ctx context.Context, tenantID, sessionID, workflowID, recipientID string, ...) error
- func (p *ActionPublisher) PublishLinkOpened(ctx context.Context, tenantID, sessionID, workflowID, recipientID string, ...) error
- func (p *ActionPublisher) PublishSent(ctx context.Context, tenantID, sessionID, workflowID, recipientID string, ...) error
- func (p *ActionPublisher) PublishSuccess(ctx context.Context, tenantID, sessionID, workflowID, recipientID string, ...) error
- type DeliveryStatus
- type EventType
- type Publisher
- func (p *Publisher) Close() error
- func (p *Publisher) Publish(ctx context.Context, event *WorkflowEventPayload) error
- func (p *Publisher) PublishEntered(ctx context.Context, ...) error
- func (p *Publisher) PublishExited(ctx context.Context, ...) error
- func (p *Publisher) PublishExitedFailed(ctx context.Context, ...) error
- func (p *Publisher) PublishExitedSuccess(ctx context.Context, ...) error
- func (p *Publisher) PublishWorkflowCompleted(ctx context.Context, tenantID, workflowID, workflowName, sessionID string, ...) error
- func (p *Publisher) PublishWorkflowExited(ctx context.Context, tenantID, workflowID, workflowName, sessionID string, ...) error
- func (p *Publisher) PublishWorkflowStarted(ctx context.Context, tenantID, workflowID, workflowName, sessionID string, ...) error
- type PublisherConfig
- type RecipientType
- type WorkflowEventPayload
Constants ¶
const (
	// Triggers
	ActionIDContactEnrollmentTrigger = "contact-enrollment-trigger"
	ActionIDOrderEnrollmentTrigger   = "order-enrollment-trigger"
	ActionIDPointEnrollmentTrigger   = "point-enrollment-trigger"

	// Condition
	ActionIDQuery = "query-action"

	// Notifications
	ActionIDSlack = "slack-action"
	ActionIDEmail = "email-action"
	ActionIDSMS   = "sms-action"
	ActionIDLine  = "line-action"

	// Actions
	ActionIDWebhook = "webhook-action"
	ActionIDTag     = "tag-action"

	// Utility
	ActionIDWait = "wait-action"
	ActionIDLog  = "log-action"
)
ActionID constants, aligned with the mca system. These are the action_id values used in workflow_actions.
const (
	RecipientTypeContacts = "contacts"
	RecipientTypeOrders   = "orders"
)
Variables ¶
This section is empty.
Functions ¶
func GetWorkflowActionsTopic ¶ added in v1.7.0
func GetWorkflowActionsTopic() string
GetWorkflowActionsTopic returns the topic name for action executions (notifications, etc.).
Topics: mca.workflow.actions.local, mca.workflow.actions.uat, mca.workflow.actions.prod
func GetWorkflowExecutionsTopic ¶ added in v1.7.0
func GetWorkflowExecutionsTopic() string
GetWorkflowExecutionsTopic returns the topic name for workflow executions (started/completed/exited).
Topics: mca.workflow.executions.local, mca.workflow.executions.uat, mca.workflow.executions.prod
func GetWorkflowNodesTopic ¶ added in v1.7.0
func GetWorkflowNodesTopic() string
GetWorkflowNodesTopic returns the topic name for node executions (entered/exited).
Topics: mca.workflow.nodes.local, mca.workflow.nodes.uat, mca.workflow.nodes.prod
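The three topic helpers all follow the pattern mca.workflow.<kind>.<env>. The sketch below only illustrates that naming scheme. How the package actually determines the environment is not documented here, so the `APP_ENV` variable, the `local` fallback, and the `topicName` and `envOrDefault` helpers are all hypothetical.

```go
package main

import (
	"fmt"
	"os"
)

// topicName builds a topic following the documented pattern
// mca.workflow.<kind>.<env>. It is a hypothetical helper, not package API.
func topicName(kind, env string) string {
	return fmt.Sprintf("mca.workflow.%s.%s", kind, env)
}

// envOrDefault reads a hypothetical APP_ENV variable and falls back to
// "local". The real package may resolve the environment differently.
func envOrDefault() string {
	if env := os.Getenv("APP_ENV"); env != "" {
		return env
	}
	return "local"
}

func main() {
	env := envOrDefault()
	for _, kind := range []string{"actions", "executions", "nodes"} {
		fmt.Println(topicName(kind, env))
	}
}
```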
Types ¶
type ActionChannel ¶ added in v1.10.0
type ActionChannel string
ActionChannel represents the action/notification channel type
const (
	ActionChannelLINE  ActionChannel = "line"
	ActionChannelSlack ActionChannel = "slack"
	ActionChannelEmail ActionChannel = "email"
	ActionChannelSMS   ActionChannel = "sms"
)
type ActionExecutionPayload ¶ added in v1.10.0
type ActionExecutionPayload struct {
TenantID string `json:"tenant_id,omitempty"`
WorkflowID string `json:"workflow_id"`
SessionID string `json:"session_id"`
RecipientID string `json:"recipient_id"`
NodeID string `json:"node_id,omitempty"` // NEW - Node ID from workflow definition
NodeName string `json:"node_name,omitempty"` // NEW - Node display name
ActionKey string `json:"action_key"`
ActionID string `json:"action_id,omitempty"`
ActionType ActionChannel `json:"action_type"` // line, slack, email, sms (was: channel)
ActionLabel string `json:"action_label,omitempty"` // For backward compatibility
MessageContent string `json:"message_content,omitempty"`
HasTrackingLink bool `json:"has_tracking_link"`
TrackingLink string `json:"tracking_link,omitempty"` // Generated tracking URL
OriginalLink string `json:"original_link,omitempty"` // Original destination URL
DeliveryStatus DeliveryStatus `json:"delivery_status"` // pending, sent, delivered, failed, bounced
ExecutionStatus string `json:"execution_status"` // success, failed (was: status)
ErrorMessage string `json:"error_message,omitempty"`
ProviderResponse map[string]interface{} `json:"provider_response,omitempty"`
EventTime time.Time `json:"event_time"`
Timestamp time.Time `json:"timestamp"`
}
ActionExecutionPayload is the message structure for the action executions Kafka topic (mca.workflow.actions.{env}). Field names are aligned with REPORT_DB.md.
func (*ActionExecutionPayload) ToJSON ¶ added in v1.10.0
func (p *ActionExecutionPayload) ToJSON() ([]byte, error)
ToJSON converts payload to JSON bytes
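The struct tags above decide which keys appear in the encoded message: fields tagged `omitempty` are dropped when empty, while fields such as `has_tracking_link` are always written. The following sketch shows that behavior on a trimmed local copy of the struct. It assumes ToJSON behaves like `encoding/json` marshaling, which the source does not confirm. The `actionPayload` type and the `samplePayload` and `hasKey` helpers are illustrative names only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// actionPayload is a trimmed local copy of a few documented fields,
// kept only so this sketch compiles on its own.
type actionPayload struct {
	TenantID        string    `json:"tenant_id,omitempty"`
	WorkflowID      string    `json:"workflow_id"`
	SessionID       string    `json:"session_id"`
	RecipientID     string    `json:"recipient_id"`
	ActionKey       string    `json:"action_key"`
	ActionType      string    `json:"action_type"`
	HasTrackingLink bool      `json:"has_tracking_link"`
	TrackingLink    string    `json:"tracking_link,omitempty"`
	DeliveryStatus  string    `json:"delivery_status"`
	ExecutionStatus string    `json:"execution_status"`
	ErrorMessage    string    `json:"error_message,omitempty"`
	EventTime       time.Time `json:"event_time"`
}

// toJSON assumes plain json.Marshal semantics.
func (p *actionPayload) toJSON() ([]byte, error) { return json.Marshal(p) }

// samplePayload returns a payload with no tracking link and no error.
func samplePayload() *actionPayload {
	return &actionPayload{
		WorkflowID:      "workflow-1",
		SessionID:       "session-1",
		RecipientID:     "contact-1",
		ActionKey:       "a1",
		ActionType:      "email",
		DeliveryStatus:  "sent",
		ExecutionStatus: "success",
		EventTime:       time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
	}
}

// hasKey reports whether the JSON object in raw has the given top-level key.
func hasKey(raw []byte, key string) bool {
	var m map[string]json.RawMessage
	if err := json.Unmarshal(raw, &m); err != nil {
		return false
	}
	_, ok := m[key]
	return ok
}

func main() {
	raw, err := samplePayload().toJSON()
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(raw))
	fmt.Println("tracking_link present:", hasKey(raw, "tracking_link"))
}
```

Consumers should therefore treat keys such as `tenant_id`, `tracking_link`, and `error_message` as optional.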
type ActionPublisher ¶ added in v1.10.0
type ActionPublisher struct {
// contains filtered or unexported fields
}
ActionPublisher publishes action execution events to Kafka (mca.workflow.actions.{env})
func NewActionPublisher ¶ added in v1.10.0
func NewActionPublisher(brokers []string) *ActionPublisher
NewActionPublisher creates a new Kafka publisher for action events (no auth)
func NewActionPublisherWithAuth ¶ added in v1.10.0
func NewActionPublisherWithAuth(config PublisherConfig) *ActionPublisher
NewActionPublisherWithAuth creates a new Kafka publisher with SASL authentication
func (*ActionPublisher) Close ¶ added in v1.10.0
func (p *ActionPublisher) Close() error
Close closes the Kafka writer
func (*ActionPublisher) Publish ¶ added in v1.10.0
func (p *ActionPublisher) Publish(ctx context.Context, payload *ActionExecutionPayload) error
Publish sends an action execution event to Kafka
func (*ActionPublisher) PublishDelivered ¶ added in v1.18.0
func (p *ActionPublisher) PublishDelivered(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	providerResponse map[string]interface{},
) error
PublishDelivered publishes an action_delivered event (delivery confirmed by provider webhook)
func (*ActionPublisher) PublishFailed ¶ added in v1.10.0
func (p *ActionPublisher) PublishFailed(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	errorMessage string,
) error
PublishFailed publishes a failed action execution
func (*ActionPublisher) PublishLinkOpened ¶ added in v1.20.0
func (p *ActionPublisher) PublishLinkOpened(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	trackingID, originalLink string,
) error
PublishLinkOpened publishes a link_opened event when a user clicks a tracking link
func (*ActionPublisher) PublishSent ¶ added in v1.18.0
func (p *ActionPublisher) PublishSent(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	messageContent string,
	hasTrackingLink bool,
	success bool,
	errorMessage string,
) error
PublishSent publishes an action_sent event (message sent to provider)
func (*ActionPublisher) PublishSuccess ¶ added in v1.10.0
func (p *ActionPublisher) PublishSuccess(
	ctx context.Context,
	tenantID, sessionID, workflowID, recipientID string,
	nodeID, nodeName string,
	actionKey, actionID, actionLabel string,
	actionType ActionChannel,
	messageContent string,
	hasTrackingLink bool,
) error
PublishSuccess publishes a successful action execution.
Deprecated: Use PublishSent for action_sent events and PublishDelivered for action_delivered events.
type DeliveryStatus ¶ added in v1.5.0
type DeliveryStatus string
DeliveryStatus represents the delivery status
const (
	DeliveryStatusSent      DeliveryStatus = "sent"
	DeliveryStatusDelivered DeliveryStatus = "delivered"
	DeliveryStatusFailed    DeliveryStatus = "failed"
	DeliveryStatusBounced   DeliveryStatus = "bounced"
)
type EventType ¶
type EventType string
EventType represents the type of workflow event. It is aligned with the entered / exited pattern in REPORT_DB.md.
const (
	// Workflow lifecycle events (for mca.workflow.executions topic)
	EventTypeStarted   EventType = "started"   // User enrolled/started workflow (trigger fired)
	EventTypeCompleted EventType = "completed" // Workflow instance finished successfully

	// Node/Action events (for mca.workflow.nodes topic)
	EventTypeEntered EventType = "entered" // User entered a node
	EventTypeExited  EventType = "exited"  // User exited a node (with status: success/failed)
)
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher publishes workflow events to Kafka. It uses separate writers for different event types:
- nodesWriter: for node entered/exited events -> mca.workflow.nodes.{env}
- executionsWriter: for workflow started/completed/exited events -> mca.workflow.executions.{env}
func NewPublisher ¶
NewPublisher creates a new Kafka publisher without authentication (for local/testing)
func NewPublisherWithAuth ¶ added in v1.4.2
func NewPublisherWithAuth(config PublisherConfig) *Publisher
NewPublisherWithAuth creates a new Kafka publisher with SASL authentication
func (*Publisher) Publish ¶
func (p *Publisher) Publish(ctx context.Context, event *WorkflowEventPayload) error
Publish sends an event to the appropriate Kafka topic based on event type
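The routing described for Publish can be pictured as a switch over the event type. The sketch below is not the package's implementation. It redeclares `EventType` locally and returns the topic kind ("nodes" or "executions") rather than writing to Kafka. Since "exited" is documented for both node exits and workflow exits, the sketch takes a hypothetical `workflowLevel` flag to tell them apart; how the real Publisher distinguishes the two is not stated in this documentation.

```go
package main

import "fmt"

// EventType is a local stand-in for the package type of the same name.
type EventType string

const (
	EventTypeStarted   EventType = "started"
	EventTypeCompleted EventType = "completed"
	EventTypeEntered   EventType = "entered"
	EventTypeExited    EventType = "exited"
)

// topicKind returns which topic family an event would go to under the
// documented mapping. workflowLevel is a hypothetical disambiguator for
// "exited", which applies to both nodes and whole workflows.
func topicKind(t EventType, workflowLevel bool) (string, error) {
	switch t {
	case EventTypeStarted, EventTypeCompleted:
		return "executions", nil
	case EventTypeEntered:
		return "nodes", nil
	case EventTypeExited:
		if workflowLevel {
			return "executions", nil
		}
		return "nodes", nil
	default:
		return "", fmt.Errorf("unknown event type %q", t)
	}
}

func main() {
	for _, t := range []EventType{EventTypeStarted, EventTypeEntered, EventTypeExited} {
		kind, err := topicKind(t, false)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> mca.workflow.%s.{env}\n", t, kind)
	}
}
```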
func (*Publisher) PublishEntered ¶
func (p *Publisher) PublishEntered(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string,
	recipientType RecipientType,
	nodeID, nodeName string,
	actionKey, actionID string,
) error
PublishEntered publishes an event when a user enters a node
func (*Publisher) PublishExited ¶
func (p *Publisher) PublishExited(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string,
	recipientType RecipientType,
	nodeID, nodeName string,
	actionKey, actionID string,
	status string,
	errorMessage string,
	payload map[string]interface{},
) error
PublishExited publishes an event when a user exits a node (success or failed)
func (*Publisher) PublishExitedFailed ¶
func (p *Publisher) PublishExitedFailed(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string,
	recipientType RecipientType,
	nodeID, nodeName string,
	actionKey, actionID string,
	errorMessage string,
) error
PublishExitedFailed is a convenience method for a failed exit
func (*Publisher) PublishExitedSuccess ¶
func (p *Publisher) PublishExitedSuccess(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string,
	recipientType RecipientType,
	nodeID, nodeName string,
	actionKey, actionID string,
	payload map[string]interface{},
) error
PublishExitedSuccess is a convenience method for a successful exit
func (*Publisher) PublishWorkflowCompleted ¶
func (p *Publisher) PublishWorkflowCompleted(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string,
	recipientType RecipientType,
) error
PublishWorkflowCompleted publishes a workflow completed event
func (*Publisher) PublishWorkflowExited ¶ added in v1.11.0
func (p *Publisher) PublishWorkflowExited(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string,
	recipientType RecipientType,
	executionStatus string,
	errorMessage string,
) error
PublishWorkflowExited publishes a workflow exited event (timeout/manual/failed)
func (*Publisher) PublishWorkflowStarted ¶
func (p *Publisher) PublishWorkflowStarted(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string,
	recipientType RecipientType,
	actionKey, actionID string,
	payload map[string]interface{},
) error
PublishWorkflowStarted publishes a workflow started event (when a user enters the workflow at the trigger node)
type PublisherConfig ¶ added in v1.4.2
PublisherConfig holds configuration for the Kafka publisher
type RecipientType ¶
type RecipientType string
RecipientType for tracking who triggered the workflow
type WorkflowEventPayload ¶
type WorkflowEventPayload struct {
TenantID string `json:"tenant_id"`
WorkflowID string `json:"workflow_id"`
WorkflowName string `json:"workflow_name,omitempty"`
SessionID string `json:"session_id"` // Workflow instance ID (execution_id in DB)
TaskID string `json:"task_id,omitempty"` // Task ID within session
RecipientID string `json:"recipient_id,omitempty"` // Contact/Order ID (user_id)
RecipientType RecipientType `json:"recipient_type,omitempty"` // contacts / orders
EventType EventType `json:"event_type"`
ExecutionStatus string `json:"execution_status,omitempty"` // Detailed status (timeout/manual/failed) for workflow exited (for node_executions table)
NodeID string `json:"node_id,omitempty"` // Node ID from workflow definition (for workflow_events table)
NodeName string `json:"node_name,omitempty"` // Node display name (for workflow_events table)
ActionKey string `json:"action_key,omitempty"` // Node key (a1, a2, etc.)
ActionID string `json:"action_id,omitempty"` // Action type (query-action, slack-action, etc.)
ActionLabel string `json:"action_label,omitempty"` // Node display name (for node_executions table)
Status string `json:"status,omitempty"` // success / failed (for node exited events)
Payload map[string]interface{} `json:"payload,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
EventTime time.Time `json:"event_time"`
Timestamp time.Time `json:"timestamp"`
}
WorkflowEventPayload is the message structure for Kafka. Field names are aligned with mca-engine-sdk conventions and REPORT_DB.md.
func (*WorkflowEventPayload) ToJSON ¶
func (p *WorkflowEventPayload) ToJSON() ([]byte, error)
ToJSON converts payload to JSON bytes
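On the consuming side, the JSON tags of WorkflowEventPayload define the wire format. As a rough sketch, a consumer might decode a message into a local struct that mirrors a subset of those tags and then branch on `event_type` and `status`. The `workflowEvent` type, the `decodeEvent` and `isFailedNodeExit` helpers, and the sample message are all illustrative. The sketch assumes timestamps are encoded in the default `encoding/json` format for `time.Time` (RFC 3339), which the source does not state.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// workflowEvent mirrors a subset of the documented JSON field names.
type workflowEvent struct {
	TenantID     string    `json:"tenant_id"`
	WorkflowID   string    `json:"workflow_id"`
	SessionID    string    `json:"session_id"`
	EventType    string    `json:"event_type"`
	NodeID       string    `json:"node_id,omitempty"`
	Status       string    `json:"status,omitempty"`
	ErrorMessage string    `json:"error_message,omitempty"`
	EventTime    time.Time `json:"event_time"`
}

// decodeEvent parses one Kafka message value into a workflowEvent.
func decodeEvent(raw []byte) (workflowEvent, error) {
	var e workflowEvent
	if err := json.Unmarshal(raw, &e); err != nil {
		return workflowEvent{}, fmt.Errorf("decode workflow event: %w", err)
	}
	return e, nil
}

// isFailedNodeExit reports whether the event is an "exited" event whose
// status is "failed", per the documented status values.
func isFailedNodeExit(e workflowEvent) bool {
	return e.EventType == "exited" && e.Status == "failed"
}

// sampleMessage is a made-up message used only for illustration.
const sampleMessage = `{
  "tenant_id": "tenant-1",
  "workflow_id": "workflow-1",
  "session_id": "session-1",
  "event_type": "exited",
  "node_id": "node-1",
  "status": "failed",
  "error_message": "provider timeout",
  "event_time": "2024-01-02T03:04:05Z"
}`

func main() {
	e, err := decodeEvent([]byte(sampleMessage))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("failed node exit:", isFailedNodeExit(e), "reason:", e.ErrorMessage)
}
```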