pubsub

package
v0.14.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package pubsub wraps Google Cloud Pub/Sub for AILANG messaging. Pub/Sub serves as a notification/transport layer on top of Firestore storage. Messages are stored durably in Firestore; Pub/Sub provides instant push notification that new work is available (replacing SQLite polling).

Index

Constants

View Source
const (
	TopicMessages    = "messages"    // New message notifications (attribute-routed by inbox)
	TopicTasks       = "tasks"       // Task dispatch to Cloud Run Jobs
	TopicCompletions = "completions" // Task completion notifications
	TopicEvents      = "events"      // Real-time dashboard/laptop event streaming
	TopicDeadLetter  = "dead-letter" // Failed message sink
)

Topic base names. The full topic name is "{prefix}-{base}".

View Source
const (
	SubMessagesCoordinator    = "messages-coordinator"    // Cloud Run coordinator
	SubMessagesLaptop         = "messages-laptop"         // Developer laptop (pull)
	SubTasksExecutor          = "tasks-executor"          // Eventarc → Cloud Run Job
	SubCompletionsCoordinator = "completions-coordinator" // Coordinator receives results
	SubEventsDashboard        = "events-dashboard"        // Dashboard server
	SubEventsLaptop           = "events-laptop"           // Laptop real-time updates
)

Subscription base names. The full name is "{prefix}-{base}".

View Source
const DefaultTopicPrefix = "ailang"

DefaultTopicPrefix is the default prefix for all AILANG Pub/Sub topics.

Variables

This section is empty.

Functions

func DecodePushEnvelope

func DecodePushEnvelope(body io.Reader) (data []byte, attrs map[string]string, messageID string, err error)

DecodePushEnvelope reads a Pub/Sub push HTTP body, unmarshals the envelope, and base64-decodes the Data field. Returns the decoded data bytes, attributes map, and message ID.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps the Google Cloud Pub/Sub client with AILANG topic naming conventions.

func NewClient

func NewClient(ctx context.Context, projectID, prefix string) (*Client, error)

NewClient creates a new Pub/Sub client using Application Default Credentials.

func (*Client) Close

func (c *Client) Close() error

Close closes the underlying Pub/Sub client.

func (*Client) Prefix

func (c *Client) Prefix() string

Prefix returns the topic name prefix.

func (*Client) ProjectID

func (c *Client) ProjectID() string

ProjectID returns the GCP project ID.

func (*Client) Subscription

func (c *Client) Subscription(baseName string) *pubsub.Subscription

Subscription returns a handle to an existing subscription.

func (*Client) SubscriptionName

func (c *Client) SubscriptionName(baseName string) string

SubscriptionName returns the full subscription name with prefix.

func (*Client) Topic

func (c *Client) Topic(baseName string) *pubsub.Topic

Topic returns a handle to an existing topic. Does NOT create the topic — topics are managed by Terraform in ailang-multivac.

func (*Client) TopicName

func (c *Client) TopicName(baseName string) string

TopicName returns the full topic name with prefix (e.g., "ailang-messages").

type MessageAttributes

type MessageAttributes struct {
	Inbox       string // Target agent inbox (e.g., "design-doc-creator")
	Workspace   string // Project workspace (e.g., "sunholo-data/ailang")
	FromAgent   string // Sender agent ID (e.g., "user", "sprint-planner")
	Category    string // Message category (e.g., "bug", "feature", "general")
	MessageType string // Message type (e.g., "request", "notification")
}

MessageAttributes carries routing metadata as Pub/Sub message attributes. Used for subscription filtering (e.g., filter by inbox or workspace).

func AttributesFromMap

func AttributesFromMap(m map[string]string) MessageAttributes

AttributesFromMap creates MessageAttributes from a Pub/Sub message attributes map.

func (MessageAttributes) ToMap

func (a MessageAttributes) ToMap() map[string]string

ToMap converts attributes to map[string]string for Pub/Sub message publishing. Only non-empty values are included.

type MessageHandler

type MessageHandler func(ctx context.Context, data []byte, attrs map[string]string) error

MessageHandler is called for each received Pub/Sub message. Returning nil acknowledges the message; returning an error causes a nack (retry).

type MessageNotification

type MessageNotification struct {
	MessageID string `json:"message_id"`
}

MessageNotification is published to the messages topic. Intentionally minimal — full message content lives in Firestore.

func DecodeMessageNotification

func DecodeMessageNotification(data []byte) (MessageNotification, error)

DecodeMessageNotification decodes a MessageNotification from raw Pub/Sub data.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher provides high-level publish functions for AILANG Pub/Sub topics.

func NewPublisher

func NewPublisher(client *Client) *Publisher

NewPublisher creates a publisher from a client.

func (*Publisher) PublishCompletion

func (p *Publisher) PublishCompletion(ctx context.Context, completion TaskCompletion, workspace string) error

PublishCompletion publishes a task completion to the completions topic.

func (*Publisher) PublishEvent

func (p *Publisher) PublishEvent(ctx context.Context, eventJSON []byte, eventType, taskID, workspace string) error

PublishEvent publishes a real-time event to the events topic. Events are consumed by the dashboard and laptop for live streaming.

func (*Publisher) PublishMessage

func (p *Publisher) PublishMessage(ctx context.Context, messageID string, attrs MessageAttributes) error

PublishMessage publishes a message notification to the messages topic. The actual message content is already stored in Firestore — this notification tells the coordinator (or laptop) that a new message is available.

func (*Publisher) PublishTask

func (p *Publisher) PublishTask(ctx context.Context, taskID, agentID, workspace, provider string) error

PublishTask publishes a task dispatch to the tasks topic. This triggers a Cloud Run Job via Eventarc to execute the task.

func (*Publisher) Stop

func (p *Publisher) Stop()

Stop flushes all pending messages and releases topic resources.

type PushEnvelope

type PushEnvelope struct {
	Message      PushMessage `json:"message"`
	Subscription string      `json:"subscription"`
}

PushEnvelope is the JSON body POSTed by Pub/Sub to push subscription endpoints. See: https://cloud.google.com/pubsub/docs/push#receive_push

type PushMessage

type PushMessage struct {
	Data        string            `json:"data"` // base64-encoded payload
	MessageID   string            `json:"messageId"`
	Attributes  map[string]string `json:"attributes"`
	PublishTime string            `json:"publishTime"`
}

PushMessage represents the message within a Pub/Sub push envelope.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber provides pull-based subscription for AILANG Pub/Sub topics.

func NewSubscriber

func NewSubscriber(client *Client) *Subscriber

NewSubscriber creates a subscriber from a client.

func (*Subscriber) ReceiveOne

func (s *Subscriber) ReceiveOne(ctx context.Context, subName string) ([]byte, map[string]string, error)

ReceiveOne pulls a single message from the subscription and returns it. Useful for CLI commands like `ailang messages watch --pubsub`. Returns the raw data, attributes, and any error.

func (*Subscriber) Stop

func (s *Subscriber) Stop()

Stop cancels all active subscriptions.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, subName string, handler MessageHandler) error

Subscribe starts a blocking pull subscription. It calls handler for each message received on the named subscription. Subscribe blocks until ctx is cancelled or an unrecoverable error occurs.

Messages are automatically acked when handler returns nil, or nacked on error.

type TaskCompletion

type TaskCompletion struct {
	TaskID     string `json:"task_id"`
	AgentID    string `json:"agent_id"`
	Status     string `json:"status"`                // "completed" or "failed"
	BranchName string `json:"branch_name,omitempty"` // Git branch with changes
	ErrorMsg   string `json:"error_msg,omitempty"`

	// Changed files discovered via git diff after execution.
	// Used by external clients (portal, sidecar) to know which files were created/modified.
	ChangedFiles []string `json:"changed_files,omitempty"`

	// Executor metrics (populated when using full executor infrastructure)
	SessionID     string  `json:"session_id,omitempty"`
	NumTurns      int     `json:"num_turns,omitempty"`
	ToolCallCount int     `json:"tool_call_count,omitempty"`
	InputTokens   int     `json:"input_tokens,omitempty"`
	OutputTokens  int     `json:"output_tokens,omitempty"`
	CostUSD       float64 `json:"cost_usd,omitempty"`
	DurationMS    int     `json:"duration_ms,omitempty"`
}

TaskCompletion is published to the completions topic when a job finishes.

func DecodeTaskCompletion

func DecodeTaskCompletion(data []byte) (TaskCompletion, error)

DecodeTaskCompletion decodes a TaskCompletion from raw Pub/Sub data.

type TaskDispatch

type TaskDispatch struct {
	TaskID  string `json:"task_id"`
	AgentID string `json:"agent_id"`
}

TaskDispatch is published to the tasks topic to trigger a Cloud Run Job.

func DecodeTaskDispatch

func DecodeTaskDispatch(data []byte) (TaskDispatch, error)

DecodeTaskDispatch decodes a TaskDispatch from raw Pub/Sub data.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL