spooled

package
v1.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package spooled provides the official Go SDK for Spooled Cloud.

Package spooled provides the official Go SDK for Spooled Cloud.

Spooled Cloud is a modern, scalable job queue and task scheduler service. This SDK provides a complete interface for interacting with all Spooled API endpoints, including job management, queue operations, worker registration, real-time events, scheduling, and workflows.

Basic Usage

Create a client with your API key:

client, err := spooled.NewClient(
	spooled.WithAPIKey("sp_live_your_api_key"),
)
if err != nil {
	log.Fatal(err)
}
defer client.Close()

Creating Jobs

ctx := context.Background()
resp, err := client.Jobs().Create(ctx, &types.CreateJobRequest{
	QueueName: "emails",
	Payload: map[string]any{
		"to":      "user@example.com",
		"subject": "Hello!",
	},
})
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Created job: %s\n", resp.ID)

Configuration Options

The client supports various configuration options:

client, err := spooled.NewClient(
	spooled.WithAPIKey("sp_live_..."),
	spooled.WithBaseURL("https://api.spooled.cloud"),
	spooled.WithTimeout(30 * time.Second),
	spooled.WithRetry(spooled.RetryConfig{
		MaxRetries: 3,
		BaseDelay:  time.Second,
	}),
	spooled.WithDebug(true),
)

Error Handling

All SDK errors implement the error interface and can be inspected:

job, err := client.Jobs().Get(ctx, "job-id")
if err != nil {
	if spooled.IsNotFoundError(err) {
		fmt.Println("Job not found")
	} else if spooled.IsRateLimitError(err) {
		var rateLimitErr *spooled.RateLimitError
		if errors.As(err, &rateLimitErr) {
			fmt.Printf("Rate limited, retry after %d seconds\n", rateLimitErr.GetRetryAfter())
		}
	} else {
		log.Fatal(err)
	}
}

Resources

The client provides access to various resources:

  • Jobs: Create, list, cancel, retry jobs and manage the dead letter queue
  • Queues: List, configure, pause/resume queues
  • Workers: Register, heartbeat, and deregister workers
  • Schedules: Create and manage scheduled jobs
  • Workflows: Create and manage job workflows with dependencies
  • Webhooks: Configure outgoing webhooks for events
  • Organizations: Manage organizations and usage
  • API Keys: Manage API keys
  • Billing: Access billing status and portal
  • Auth: Authentication and token management

Package spooled provides top-level convenience functions for common operations.

These functions provide shortcuts for the most common SDK operations.

Index

Constants

View Source
const (
	DefaultBaseURL     = "https://api.spooled.cloud"
	DefaultWSURL       = "wss://api.spooled.cloud"
	DefaultGRPCAddress = "grpc.spooled.cloud:443"
	DefaultTimeout     = 30 * time.Second
	DefaultAPIVersion  = "v1"
	DefaultAPIBasePath = "/api/v1"
)

Default configuration values

Variables

View Source
var (
	// ErrNoAuth is returned when no authentication is configured.
	ErrNoAuth = errors.New("no authentication configured: set APIKey or AccessToken")

	// ErrInvalidAPIKey is returned when the API key format is invalid.
	ErrInvalidAPIKey = errors.New("invalid API key format: must start with sk_live_, sk_test_, sp_live_, or sp_test_")

	// ErrCircuitOpen is returned when the circuit breaker is open.
	ErrCircuitOpen = errors.New("circuit breaker is open")
)

Sentinel errors for common conditions

Functions

func CancelJob

func CancelJob(client *Client, jobID string) error

CancelJob cancels a job by ID.

Example:

err := spooled.CancelJob(client, "job_123")
if err != nil {
	log.Fatal(err)
}

func CreateJob

func CreateJob(client *Client, queueName string, payload map[string]any) (string, error)

CreateJob creates a job and returns the job ID.

This is a convenience function for simple job creation.

Example:

jobID, err := spooled.CreateJob(client, "emails", map[string]any{
	"to": "user@example.com",
	"subject": "Hello!",
})
if err != nil {
	log.Fatal(err)
}

func CreateJobWithOptions

func CreateJobWithOptions(client *Client, req *resources.CreateJobRequest) (string, error)

CreateJobWithOptions creates a job with full options.

Example:

jobID, err := spooled.CreateJobWithOptions(client, &resources.CreateJobRequest{
	QueueName:  "emails",
	Payload:    map[string]any{"to": "user@example.com"},
	Priority:   ptr(10),
	ScheduledAt: &futureTime,
})

func CreateScheduledJob

func CreateScheduledJob(client *Client, queueName string, payload map[string]any, scheduledAt time.Time) (string, error)

CreateScheduledJob creates a job scheduled for the future.

Example:

future := time.Now().Add(1 * time.Hour)
jobID, err := spooled.CreateScheduledJob(client, "emails", map[string]any{
	"to": "user@example.com",
}, future)

func CreateWebhook

CreateWebhook creates an outgoing webhook.

Example:

webhook, err := spooled.CreateWebhook(client, &resources.CreateOutgoingWebhookRequest{
	Name:   "email-events",
	URL:    "https://api.example.com/webhooks",
	Events: []resources.WebhookEvent{resources.WebhookEventJobCompleted},
})
if err != nil {
	log.Fatal(err)
}

func GetDashboard

func GetDashboard(client *Client) (*resources.DashboardData, error)

GetDashboard retrieves dashboard data.

Example:

dashboard, err := spooled.GetDashboard(client)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Total jobs: %d\n", dashboard.Jobs.Total)

func GetHealth

func GetHealth(client *Client) (*resources.HealthResponse, error)

GetHealth checks the health of the Spooled service.

Example:

health, err := spooled.GetHealth(client)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Status: %s\n", health.Status)

func GetJob

func GetJob(client *Client, jobID string) (*resources.Job, error)

GetJob retrieves a job by ID.

Example:

job, err := spooled.GetJob(client, "job_123")
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Job status: %s\n", job.Status)

func GetQueueStats

func GetQueueStats(client *Client, queueName string) (*resources.QueueStats, error)

GetQueueStats retrieves statistics for a queue.

Example:

stats, err := spooled.GetQueueStats(client, "emails")
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Pending jobs: %d\n", stats.PendingJobs)

func IsAuthenticationError

func IsAuthenticationError(err error) bool

IsAuthenticationError returns true if the error is an authentication error.

func IsNotFoundError

func IsNotFoundError(err error) bool

IsNotFoundError returns true if the error is a not found error.

func IsRateLimitError

func IsRateLimitError(err error) bool

IsRateLimitError returns true if the error is a rate limit error.

func IsRetryable

func IsRetryable(err error) bool

IsRetryable returns true if the error is retryable.

func IsSpooledError

func IsSpooledError(err error) bool

IsSpooledError returns true if the error is a Spooled SDK error.

func IsValidationError

func IsValidationError(err error) bool

IsValidationError returns true if the error is a validation error.

func ListJobs

func ListJobs(client *Client, params *resources.ListJobsParams) ([]resources.Job, error)

ListJobs lists jobs with optional filters.

Example:

jobs, err := spooled.ListJobs(client, &resources.ListJobsParams{
	QueueName: &queueName,
	Status:    &status,
	Limit:     ptr(50),
})

func ListWorkers

func ListWorkers(client *Client) ([]resources.Worker, error)

ListWorkers lists all workers.

Example:

workers, err := spooled.ListWorkers(client)
if err != nil {
	log.Fatal(err)
}
for _, w := range workers {
	fmt.Printf("Worker: %s (%s)\n", w.ID, w.Status)
}

func RegisterWorker

RegisterWorker registers a new worker.

Example:

resp, err := spooled.RegisterWorker(client, &resources.RegisterWorkerRequest{
	QueueName: "emails",
	Hostname:  "worker-01",
})
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Registered worker: %s\n", resp.ID)

func ValidateAPIKey

func ValidateAPIKey(key string) error

ValidateAPIKey validates the format of an API key. Accepts both sk_ (production keys) and sp_ (documentation examples) prefixes.

Types

type APIError

type APIError struct {
	// StatusCode is the HTTP status code.
	StatusCode int `json:"status_code,omitempty"`
	// Code is the error code from the API.
	Code string `json:"code,omitempty"`
	// Message is the human-readable error message.
	Message string `json:"message,omitempty"`
	// Details contains additional error details.
	Details map[string]any `json:"details,omitempty"`
	// RequestID is the request ID for debugging.
	RequestID string `json:"request_id,omitempty"`
	// RawBody is the raw response body.
	RawBody []byte `json:"-"`
	// Err is the underlying error, if any.
	Err error `json:"-"`
}

APIError is the base error type for all Spooled SDK errors.

func AsSpooledError

func AsSpooledError(err error) (*APIError, bool)

AsSpooledError attempts to convert an error to a Spooled APIError.

func (*APIError) Error

func (e *APIError) Error() string

Error implements the error interface.

func (*APIError) IsRetryable

func (e *APIError) IsRetryable() bool

IsRetryable returns true if the error is retryable.

func (*APIError) Unwrap

func (e *APIError) Unwrap() error

Unwrap returns the underlying error.

type AuthenticationError

type AuthenticationError struct {
	*APIError
}

AuthenticationError represents a 401 error.

type AuthorizationError

type AuthorizationError struct {
	*APIError
}

AuthorizationError represents a 403 error.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	// Enabled determines if circuit breaker is active.
	Enabled bool
	// FailureThreshold is the number of failures before opening the circuit.
	FailureThreshold int
	// SuccessThreshold is the number of successes needed to close the circuit.
	SuccessThreshold int
	// Timeout is the duration the circuit stays open before allowing a test request.
	Timeout time.Duration
}

CircuitBreakerConfig configures the circuit breaker.

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() CircuitBreakerConfig

DefaultCircuitBreakerConfig returns the default circuit breaker configuration.

type CircuitBreakerOpenError

type CircuitBreakerOpenError struct {
	*APIError
}

CircuitBreakerOpenError represents a circuit breaker open error.

func (*CircuitBreakerOpenError) IsRetryable

func (e *CircuitBreakerOpenError) IsRetryable() bool

IsRetryable always returns false for circuit breaker errors.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the main Spooled SDK client.

func CreateClient

func CreateClient(apiKey string) (*Client, error)

CreateClient creates a new Spooled client with default configuration.

This is a convenience function equivalent to:

client, err := spooled.NewClient(spooled.WithAPIKey(apiKey))

Example:

client, err := spooled.CreateClient("sp_live_your_api_key")
if err != nil {
	log.Fatal(err)
}
defer client.Close()

func NewClient

func NewClient(opts ...Option) (*Client, error)

NewClient creates a new Spooled client with the given options.

func (*Client) APIKeys

func (c *Client) APIKeys() *resources.APIKeysResource

APIKeys returns the API Keys resource.

func (*Client) Admin

func (c *Client) Admin() *resources.AdminResource

Admin returns the Admin resource.

func (*Client) Auth

func (c *Client) Auth() *resources.AuthResource

Auth returns the Auth resource.

func (*Client) Billing

func (c *Client) Billing() *resources.BillingResource

Billing returns the Billing resource.

func (*Client) Close

func (c *Client) Close() error

Close closes the client and releases any resources.

func (*Client) Dashboard

func (c *Client) Dashboard() *resources.DashboardResource

Dashboard returns the Dashboard resource.

func (*Client) GRPC

func (c *Client) GRPC() (*grpc.Client, error)

GRPC returns the gRPC client for high-performance operations.

Note: This method dials the gRPC server the first time it is called. Callers should handle connection errors (e.g., local dev without gRPC running).

func (*Client) GetConfig

func (c *Client) GetConfig() Config

GetConfig returns a copy of the client configuration.

func (*Client) Health

func (c *Client) Health() *resources.HealthResource

Health returns the Health resource.

func (*Client) Ingest

func (c *Client) Ingest() *resources.IngestResource

Ingest returns the Ingest resource.

func (*Client) Jobs

func (c *Client) Jobs() *resources.JobsResource

Jobs returns the Jobs resource.

func (*Client) Metrics

func (c *Client) Metrics() *resources.MetricsResource

Metrics returns the Metrics resource.

func (*Client) Organizations

func (c *Client) Organizations() *resources.OrganizationsResource

Organizations returns the Organizations resource.

func (*Client) Queues

func (c *Client) Queues() *resources.QueuesResource

Queues returns the Queues resource.

func (*Client) Schedules

func (c *Client) Schedules() *resources.SchedulesResource

Schedules returns the Schedules resource.

func (*Client) Webhooks

func (c *Client) Webhooks() *resources.WebhooksResource

Webhooks returns the Webhooks resource.

func (*Client) Workers

func (c *Client) Workers() *resources.WorkersResource

Workers returns the Workers resource.

func (*Client) Workflows

func (c *Client) Workflows() *resources.WorkflowsResource

Workflows returns the Workflows resource.

type Config

type Config struct {
	// APIKey is the API key for authentication (production keys start with sk_live_, sk_test_).
	APIKey string
	// AccessToken is a JWT access token (alternative to API key).
	AccessToken string
	// RefreshToken is a JWT refresh token for automatic token renewal.
	RefreshToken string
	// AdminKey is the admin API key (for /api/v1/admin/* endpoints; uses X-Admin-Key header).
	AdminKey string

	// BaseURL is the base URL for the REST API.
	BaseURL string
	// WSURL is the WebSocket URL for realtime events.
	WSURL string
	// GRPCAddress is the gRPC server address.
	GRPCAddress string

	// Timeout is the request timeout.
	Timeout time.Duration
	// Retry is the retry configuration.
	Retry RetryConfig
	// CircuitBreaker is the circuit breaker configuration.
	CircuitBreaker CircuitBreakerConfig

	// Headers are additional headers to include in all requests.
	Headers map[string]string
	// UserAgent is the custom user agent string.
	UserAgent string
	// Logger is the debug logger.
	Logger Logger
	// AutoRefreshToken enables automatic token refresh.
	AutoRefreshToken bool
}

Config holds the SDK configuration.

type ConflictError

type ConflictError struct {
	*APIError
}

ConflictError represents a 409 error.

type Logger

type Logger interface {
	Debug(msg string, keysAndValues ...any)
}

Logger is the interface for debug logging.

type LoggerFunc

type LoggerFunc func(msg string, keysAndValues ...any)

LoggerFunc is a function adapter for Logger.

func (LoggerFunc) Debug

func (f LoggerFunc) Debug(msg string, keysAndValues ...any)

Debug implements Logger.

type NetworkError

type NetworkError struct {
	*APIError
}

NetworkError represents a network-level error.

func (*NetworkError) IsRetryable

func (e *NetworkError) IsRetryable() bool

IsRetryable always returns true for network errors.

type NotFoundError

type NotFoundError struct {
	*APIError
}

NotFoundError represents a 404 error.

type Option

type Option func(*Config)

Option is a functional option for configuring the client.

func WithAPIKey

func WithAPIKey(key string) Option

WithAPIKey sets the API key for authentication.

func WithAccessToken

func WithAccessToken(token string) Option

WithAccessToken sets the JWT access token.

func WithAdminKey

func WithAdminKey(key string) Option

WithAdminKey sets the admin API key.

func WithAutoRefreshToken

func WithAutoRefreshToken(enabled bool) Option

WithAutoRefreshToken enables or disables automatic token refresh.

func WithBaseURL

func WithBaseURL(url string) Option

WithBaseURL sets the base URL for the REST API.

func WithCircuitBreaker

func WithCircuitBreaker(cfg CircuitBreakerConfig) Option

WithCircuitBreaker sets the circuit breaker configuration.

func WithDebug

func WithDebug(enabled bool) Option

WithDebug enables debug logging to stdout.

func WithGRPCAddress

func WithGRPCAddress(addr string) Option

WithGRPCAddress sets the gRPC server address.

func WithHeaders

func WithHeaders(headers map[string]string) Option

WithHeaders sets additional headers for all requests.

func WithLogger

func WithLogger(l Logger) Option

WithLogger sets the debug logger.

func WithRefreshToken

func WithRefreshToken(token string) Option

WithRefreshToken sets the JWT refresh token.

func WithRetry

func WithRetry(cfg RetryConfig) Option

WithRetry sets the retry configuration.

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout sets the request timeout.

func WithUserAgent

func WithUserAgent(ua string) Option

WithUserAgent sets a custom user agent string.

func WithWSURL

func WithWSURL(url string) Option

WithWSURL sets the WebSocket URL.

type PayloadTooLargeError

type PayloadTooLargeError struct {
	*APIError
}

PayloadTooLargeError represents a 413 error.

type RateLimitError

type RateLimitError struct {
	*APIError
	// RetryAfter is the duration to wait before retrying.
	RetryAfter time.Duration
	// Limit is the rate limit.
	Limit int
	// Remaining is the remaining requests.
	Remaining int
	// Reset is the time when the rate limit resets.
	Reset time.Time
}

RateLimitError represents a 429 error.

func (*RateLimitError) GetRetryAfter

func (e *RateLimitError) GetRetryAfter() int

GetRetryAfter returns the retry-after duration in seconds.

type RetryConfig

type RetryConfig struct {
	// MaxRetries is the maximum number of retry attempts.
	MaxRetries int
	// BaseDelay is the initial delay before the first retry.
	BaseDelay time.Duration
	// MaxDelay is the maximum delay between retries.
	MaxDelay time.Duration
	// Factor is the exponential backoff multiplier.
	Factor float64
	// Jitter enables randomized jitter on retry delays.
	Jitter bool
}

RetryConfig configures retry behavior for failed requests.

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns the default retry configuration.

type ServerError

type ServerError struct {
	*APIError
}

ServerError represents a 5xx error.

func (*ServerError) IsRetryable

func (e *ServerError) IsRetryable() bool

IsRetryable always returns true for server errors.

type SpooledWorker

type SpooledWorker struct {
	// contains filtered or unexported fields
}

SpooledWorker is a high-level worker for processing jobs.

func CreateWorker

func CreateWorker(client *Client, queueName string, handler func(context.Context, *resources.Job) (any, error)) (*SpooledWorker, error)

CreateWorker creates and starts a Spooled worker.

This is a convenience function that creates, configures, and starts a worker.

Example:

worker, err := spooled.CreateWorker(client, "emails", func(ctx context.Context, job *resources.Job) (any, error) {
	fmt.Printf("Processing email to: %v\n", job.Payload["to"])
	return map[string]any{"sent": true}, nil
})
if err != nil {
	log.Fatal(err)
}
defer worker.Stop()

// Worker runs until stopped
select {}

func NewSpooledWorker

func NewSpooledWorker(c *Client, opts SpooledWorkerOptions) *SpooledWorker

NewSpooledWorker creates a new Spooled worker for processing jobs.

Example:

w := spooled.NewSpooledWorker(client, spooled.SpooledWorkerOptions{
	QueueName:   "emails",
	Concurrency: 10,
})
defer w.Stop()

w.Process(func(ctx context.Context, job *resources.Job) (any, error) {
	// Process job
	return map[string]any{"processed": true}, nil
})

if err := w.Start(); err != nil {
	log.Fatal(err)
}

func (*SpooledWorker) Process

func (w *SpooledWorker) Process(handler func(context.Context, *resources.Job) (any, error))

Process registers a job handler function.

func (*SpooledWorker) Start

func (w *SpooledWorker) Start() error

Start starts the worker.

func (*SpooledWorker) Stop

func (w *SpooledWorker) Stop() error

Stop stops the worker.

type SpooledWorkerOptions

type SpooledWorkerOptions struct {
	// QueueName is the name of the queue to process jobs from.
	QueueName string
	// Concurrency is the maximum number of jobs to process concurrently (default: 5).
	Concurrency int
	// PollInterval is how often to poll for new jobs (default: 1s).
	PollInterval time.Duration
	// LeaseDuration is the job lease duration in seconds (default: 30).
	LeaseDuration int
	// Hostname is the worker hostname (default: auto-detected).
	Hostname string
	// WorkerType identifies the type of worker (default: "go").
	WorkerType string
	// Version is the worker version (default: SDK version).
	Version string
	// Metadata contains additional worker metadata.
	Metadata map[string]string
	// contains filtered or unexported fields
}

SpooledWorkerOptions configures a Spooled worker.

type TimeoutError

type TimeoutError struct {
	*APIError
	// TimeoutSeconds is the timeout duration in seconds.
	TimeoutSeconds float64
}

TimeoutError represents a timeout error.

func (*TimeoutError) IsRetryable

func (e *TimeoutError) IsRetryable() bool

IsRetryable always returns true for timeout errors.

type ValidationError

type ValidationError struct {
	*APIError
}

ValidationError represents a 400/422 error.

Directories

Path Synopsis
Package grpc provides a gRPC client for the Spooled API.
Package grpc provides a gRPC client for the Spooled API.
pb
Package realtime provides SSE and WebSocket clients for real-time Spooled events.
Package realtime provides SSE and WebSocket clients for real-time Spooled events.
Package resources provides REST resource implementations for the Spooled API.
Package resources provides REST resource implementations for the Spooled API.
Package types contains request and response types for the Spooled API.
Package types contains request and response types for the Spooled API.
Package worker provides job processing runtime for Spooled queues.
Package worker provides job processing runtime for Spooled queues.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL