events

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Triggers
	ActionIDContactEnrollmentTrigger = "contact-enrollment-trigger"
	ActionIDOrderEnrollmentTrigger   = "order-enrollment-trigger"
	ActionIDPointEnrollmentTrigger   = "point-enrollment-trigger"

	// Condition
	ActionIDQuery = "query-action"

	// Notifications
	ActionIDSlack = "slack-action"
	ActionIDEmail = "email-action"
	ActionIDSMS   = "sms-action"
	ActionIDLine  = "line-action"

	// Actions
	ActionIDWebhook = "webhook-action"
	ActionIDTag     = "tag-action"

	// Utility
	ActionIDWait = "wait-action"
	ActionIDLog  = "log-action"
)

ActionID constants - aligned with mca system These are the action_id values used in workflow_actions

View Source
const (
	RecipientTypeContacts = "contacts"
	RecipientTypeOrders   = "orders"
)

Variables

This section is empty.

Functions

func GetWorkflowEventsTopic

func GetWorkflowEventsTopic() string

GetWorkflowEventsTopic returns topic name based on environment Topics: mca.workflow.events.local, mca.workflow.events.uat, mca.workflow.events.prod

Types

type EventType

type EventType string

EventType represents the type of workflow event Aligned with REPORT_DB.md: entered / exited pattern

const (
	// Workflow lifecycle events
	EventTypeWorkflowStarted   EventType = "workflow_started"   // User enrolled/started workflow (trigger fired)
	EventTypeWorkflowCompleted EventType = "workflow_completed" // Workflow instance finished successfully
	EventTypeWorkflowFailed    EventType = "workflow_failed"    // Workflow instance terminated with error

	// Node/Action events (entered/exited pattern from REPORT_DB)
	EventTypeEntered EventType = "entered" // User entered a node
	EventTypeExited  EventType = "exited"  // User exited a node (with status: success/failed)
)

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher publishes workflow events to Kafka

func NewPublisher

func NewPublisher(brokers []string) *Publisher

NewPublisher creates a new Kafka publisher

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the Kafka writer

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, event *WorkflowEventPayload) error

Publish sends an event to Kafka

func (*Publisher) PublishEntered

func (p *Publisher) PublishEntered(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
) error

PublishEntered publishes event when user enters a node

func (*Publisher) PublishExited

func (p *Publisher) PublishExited(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	status string,
	errorMessage string,
	payload map[string]interface{},
) error

PublishExited publishes event when user exits a node (success or failed)

func (*Publisher) PublishExitedFailed

func (p *Publisher) PublishExitedFailed(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	errorMessage string,
) error

PublishExitedFailed is a convenience method for failed exit

func (*Publisher) PublishExitedSuccess

func (p *Publisher) PublishExitedSuccess(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID, taskID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	payload map[string]interface{},
) error

PublishExitedSuccess is a convenience method for successful exit

func (*Publisher) PublishWorkflowCompleted

func (p *Publisher) PublishWorkflowCompleted(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
) error

PublishWorkflowCompleted publishes workflow completed event

func (*Publisher) PublishWorkflowFailed

func (p *Publisher) PublishWorkflowFailed(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
	errorMessage string,
) error

PublishWorkflowFailed publishes workflow failed event

func (*Publisher) PublishWorkflowStarted

func (p *Publisher) PublishWorkflowStarted(
	ctx context.Context,
	tenantID, workflowID, workflowName, sessionID string,
	recipientID string, recipientType RecipientType,
	actionKey, actionID string,
	payload map[string]interface{},
) error

PublishWorkflowStarted publishes workflow started event (when user enters workflow at trigger node)

type RecipientType

type RecipientType string

RecipientType for tracking who triggered the workflow

type WorkflowEventPayload

type WorkflowEventPayload struct {
	TenantID      string                 `json:"tenant_id"`
	WorkflowID    string                 `json:"workflow_id"`
	WorkflowName  string                 `json:"workflow_name,omitempty"`
	SessionID     string                 `json:"session_id"`               // Workflow instance ID (execution_id in DB)
	TaskID        string                 `json:"task_id,omitempty"`        // Task ID within session
	RecipientID   string                 `json:"recipient_id,omitempty"`   // Contact/Order ID (user_id)
	RecipientType RecipientType          `json:"recipient_type,omitempty"` // contacts / orders
	EventType     EventType              `json:"event_type"`
	ActionKey     string                 `json:"action_key,omitempty"` // Node key (a1, a2, etc.)
	ActionID      string                 `json:"action_id,omitempty"`  // Action type (query-action, slack-action, etc.)
	Status        string                 `json:"status,omitempty"`     // success / failed (for exited events)
	Payload       map[string]interface{} `json:"payload,omitempty"`
	ErrorMessage  string                 `json:"error_message,omitempty"`
	EventTime     time.Time              `json:"event_time"`
	Timestamp     time.Time              `json:"timestamp"`
}

WorkflowEventPayload is the message structure for Kafka Field names aligned with mca-engine-sdk conventions and REPORT_DB.md

func (*WorkflowEventPayload) ToJSON

func (p *WorkflowEventPayload) ToJSON() ([]byte, error)

ToJSON converts payload to JSON bytes

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL