cost

package
v1.1.16 Latest
Published: Feb 21, 2026 | License: AGPL-3.0 | Imports: 15 | Imported by: 0

README

Cost-Aware Federation

This package implements intelligent cost tracking and management for federation activities in Lesser. It provides budget controls, health monitoring, and tiered service levels for federated instances.

Features

1. Cost Tracking & Budgeting
  • Real-time cost estimation for all federation activities
  • Per-instance and global budget enforcement
  • AWS service cost calculations (Lambda, DynamoDB, S3, Data Transfer)
  • Cost transparency headers in federation requests
2. Instance Health Monitoring
  • Automatic health scoring based on success rate, response time, and failures
  • Instance quarantine for consistently failing servers
  • Intelligent retry policies based on instance health
3. Tiered Federation Service
  • Premium: Unlimited federation, priority processing
  • Standard: Normal limits and processing
  • Limited: Reduced limits, lower priority
  • Blocked: No federation allowed
4. Smart Retry Logic
  • Instance-specific retry policies
  • Exponential backoff with jitter
  • Health-aware retry decisions

Usage

Basic Setup
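import (
    "net/http"
    "time"

    "github.com/equaltoai/lesser/pkg/federation/cost"
)

// Create storage backend. Per the signatures documented below,
// NewDynamoStorage wraps an existing Storage, and NewRepositoryAdapter is
// one way to build it. db (a core.DB), logger (a *zap.Logger), and
// costTracker are assumed to be initialized elsewhere.
repo := cost.NewRepositoryAdapter(db, logger, costTracker)
storage := cost.NewDynamoStorage(repo, logger, costTracker)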

// Define budget limits
budget := &cost.FederationBudget{
    TotalBudgetUSD:       1000.0,  // $1000/month total
    PerInstanceBudgetUSD: 10.0,    // $10/month per instance
    BudgetPeriod:         "monthly",
    InstanceOverrides: map[string]float64{
        "mastodon.social": 50.0,   // Higher budget for large instances
    },
}

// Set thresholds
thresholds := &cost.Thresholds{
    WarnThresholdPercent:  80,  // Warn at 80% budget
    BlockThresholdPercent: 95,  // Block at 95% budget
}

// Create controller
calculator := cost.NewCostCalculator("us-east-1")
controller := cost.NewController(
    storage,
    calculator,
    logger,
    budget,
    thresholds,
)

Integration with Federation Delivery

// Wrap existing delivery function
deliveryMiddleware := cost.NewDeliveryMiddleware(controller, logger)
wrappedDelivery := deliveryMiddleware.WrapDelivery(originalDeliveryFunc)

// Use wrapped delivery
err := wrappedDelivery(ctx, "https://mastodon.social", activityJSON)

HTTP Transport Integration

// Create cost-aware HTTP client
transport := cost.NewHTTPTransportWrapper(
    http.DefaultTransport,
    controller,
    logger,
)

client := &http.Client{
    Transport: transport,
    Timeout:   30 * time.Second,
}

Retry with Instance-Specific Policy

retryMiddleware := cost.NewRetryMiddleware(controller, logger)

err := retryMiddleware.RetryWithPolicy(ctx, "mastodon.social", func() error {
    // Your federation operation here
    return sendActivity(activity)
})

DynamoDB Schema

The cost tracking uses the following DynamoDB patterns:

Primary Table Structure
PK               SK              Type            Purpose
FEDCOST#domain   PERIOD#YYYY-MM  FederationCost  Monthly cost tracking
INSTANCE#domain  HEALTH          InstanceHealth  Health metrics
INSTANCE#domain  CONFIG          InstanceConfig  Instance configuration

Global Secondary Indexes

GSI1: Period-based queries

  • PK: PERIOD#YYYY-MM
  • SK: INSTANCE#domain

GSI2: Unhealthy instances

  • PK: UNHEALTHY
  • SK: SCORE#0.xxxx#domain

GSI3: Tier-based queries

  • PK: TIER#premium/standard/limited
  • SK: domain
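
The key patterns above can be sketched as small builder functions. This is an illustration only: the helper names are hypothetical, the four-decimal score width is inferred from the `0.xxxx` placeholder, and formatting the period in UTC is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// billingPeriod formats a time as YYYY-MM. Using UTC is an assumption.
func billingPeriod(t time.Time) string {
	return t.UTC().Format("2006-01")
}

// costKey builds the primary-table key for a monthly cost record.
func costKey(domain, period string) (pk, sk string) {
	return "FEDCOST#" + domain, "PERIOD#" + period
}

// healthKey builds the primary-table key for an instance health record.
func healthKey(domain string) (pk, sk string) {
	return "INSTANCE#" + domain, "HEALTH"
}

// unhealthySortKey builds a GSI2 sort key. A fixed-width score in [0, 1]
// sorts lexicographically in numeric order, so the lowest scores come first.
func unhealthySortKey(score float64, domain string) string {
	return fmt.Sprintf("SCORE#%.4f#%s", score, domain)
}

func main() {
	pk, sk := costKey("example.social", billingPeriod(time.Now()))
	fmt.Println(pk, sk)
	fmt.Println(healthKey("example.social"))
	fmt.Println(unhealthySortKey(0.25, "example.social"))
}
```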

Cost Calculation

The system tracks costs for:

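  1. Data Transfer: $0.09/GB outbound (with tiered pricing)
  2. Lambda Invocations: $0.20/million requests + compute time ($0.0000166667 per GB-second)
  3. DynamoDB (on-demand): $0.25/million read request units, $1.25/million write request units
  4. S3 Storage: $0.023/GB per month + request costs ($0.0004 per 1,000 GET requests)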

Free tier allowances are automatically applied.
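
As a rough worked example, the rates listed above combine as follows. This is a minimal sketch, not the package's calculator: `estimateDeliveryCost` is a hypothetical helper, the rates are copied from the Constants section, and tiered pricing, free tier allowances, and S3 are left out. Treating a GB as 2^30 bytes is also an assumption.

```go
package main

import "fmt"

// Rates copied from the package's documented pricing constants.
const (
	dataTransferCostPerGB        = 0.09
	lambdaCostPerMillionRequests = 0.20
	lambdaCostPerGBSecond        = 0.0000166667
	dynamoDBReadCostPerMillion   = 0.25
	dynamoDBWriteCostPerMillion  = 1.25
)

// bytesPerGB assumes binary gigabytes; the package may divide by 1e9 instead.
const bytesPerGB = 1 << 30

// estimateDeliveryCost sums data transfer, Lambda, and DynamoDB costs in USD.
// totalDurationMs is the Lambda run time summed over all invocations.
func estimateDeliveryCost(bytes int64, invocations int, totalDurationMs int64, memoryGB float64, reads, writes int) float64 {
	transfer := float64(bytes) / bytesPerGB * dataTransferCostPerGB
	requests := float64(invocations) / 1e6 * lambdaCostPerMillionRequests
	compute := float64(totalDurationMs) / 1000 * memoryGB * lambdaCostPerGBSecond
	dynamo := float64(reads)/1e6*dynamoDBReadCostPerMillion +
		float64(writes)/1e6*dynamoDBWriteCostPerMillion
	return transfer + requests + compute + dynamo
}

func main() {
	// One million 2 KiB deliveries, 100 ms each at 128 MB of memory,
	// with one DynamoDB read and one write per delivery.
	cost := estimateDeliveryCost(2048*1_000_000, 1_000_000, 100*1_000_000, 0.125, 1_000_000, 1_000_000)
	fmt.Printf("estimated cost: $%.4f\n", cost)
}
```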

Monitoring & Alerts

The system provides:

  • Warning logs when instances approach budget limits
  • Automatic instance health checks
  • Cost metrics aggregation by period
  • Instance reputation tracking

Configuration Examples

Premium Instance

premiumBudget := 100.0 // illustrative value; CustomBudgetUSD takes a *float64
config := &cost.InstanceConfig{
    Domain: "important.social",
    Tier:   cost.TierPremium,
    CustomBudgetUSD: &premiumBudget,
    RetryPolicy: &cost.RetryPolicy{
        MaxRetries:     5,
        InitialBackoff: 500 * time.Millisecond,
        MaxBackoff:     30 * time.Second,
        BackoffFactor:  1.5,
    },
}

Limiting Problematic Instance

lowRateLimit := 10 // illustrative value; RateLimitOverride takes a *int
config := &cost.InstanceConfig{
    Domain: "spammy.instance",
    Tier:   cost.TierLimited,
    RateLimitOverride: &lowRateLimit,
    RetryPolicy: &cost.RetryPolicy{
        MaxRetries: 1,  // Minimal retries
    },
}

Best Practices

  1. Set Reasonable Budgets: Start conservative and adjust based on actual usage
  2. Monitor Health Metrics: Use the unhealthy instances query to identify problems
  3. Use Tiering: Assign appropriate tiers based on instance importance
  4. Review Cost Reports: Regularly check cost metrics to optimize spending
  5. Implement Gradual Rollout: Test with a few instances before enabling globally

Future Enhancements

  • Machine learning for cost prediction
  • Automatic tier adjustment based on behavior
  • Cross-region cost optimization
  • Detailed cost attribution by activity type

Documentation

Overview

Package cost provides AWS cost calculation utilities for federation operations.

Constants

const (
	// Data transfer costs
	DataTransferCostPerGB = 0.09 // $0.09 per GB for outbound data transfer

	// Lambda costs
	LambdaCostPerMillionRequests = 0.20         // $0.20 per million requests
	LambdaCostPerGBSecond        = 0.0000166667 // $0.0000166667 per GB-second

	// DynamoDB costs (on-demand capacity)
	DynamoDBReadCostPerMillion  = 0.25 // $0.25 per million read request units
	DynamoDBWriteCostPerMillion = 1.25 // $1.25 per million write request units

	// S3 costs
	S3StorageCostPerGB       = 0.023  // $0.023 per GB per month for standard storage
	S3RequestCostPerThousand = 0.0004 // $0.0004 per 1,000 GET requests
)

AWS pricing constants for the us-east-1 (US East, N. Virginia) region, as of 2024

Variables

var (
	// ErrFederationNotAllowed indicates federation is blocked or limited for a domain
	ErrFederationNotAllowed = errors.NewFederationError(errors.CodeForbidden, "federation not allowed")

	// ErrInstanceUnhealthy indicates an instance is marked as unhealthy and should not receive retries
	ErrInstanceUnhealthy = errors.NewFederationError(errors.CodeExternalServiceUnavailable, "instance unhealthy")

	// ErrOperationFailedAfterRetries indicates an operation failed after all retry attempts
	ErrOperationFailedAfterRetries = errors.ProcessingFailed("operation after retries", stdErrors.New("operation failed after retries"))

	// ErrEmptyInstanceURL indicates an empty or invalid instance URL was provided
	ErrEmptyInstanceURL = errors.NewValidationError("instance_url", "cannot be empty")

	// ErrGetInstanceTier indicates failure to retrieve instance tier
	ErrGetInstanceTier = errors.FailedToGet("instance tier", stdErrors.New("failed to get instance tier"))

	// ErrCheckHealth indicates failure to check instance health
	ErrCheckHealth = errors.ProcessingFailed("health check", stdErrors.New("health check failed"))

	// ErrGetRemainingBudget indicates failure to get remaining budget
	ErrGetRemainingBudget = errors.FailedToGet("remaining budget", stdErrors.New("failed to get remaining budget"))

	// ErrGetInstanceConfig indicates failure to get instance configuration
	ErrGetInstanceConfig = errors.FailedToGet("instance config", stdErrors.New("failed to get instance config"))

	// ErrGetRetryPolicy indicates failure to get retry policy
	ErrGetRetryPolicy = errors.FailedToGet("retry policy", stdErrors.New("failed to get retry policy"))

	// ErrGetInstanceCost indicates failure to get instance cost data
	ErrGetInstanceCost = errors.FailedToGet("instance cost", stdErrors.New("failed to get instance cost"))

	// ErrRecordCost indicates failure to record cost data
	ErrRecordCost = errors.ProcessingFailed("cost recording", stdErrors.New("cost recording failed"))

	// ErrGetInstanceHealth indicates failure to get instance health data
	ErrGetInstanceHealth = errors.FailedToGet("instance health", stdErrors.New("failed to get instance health"))

	// ErrUpdateInstanceHealth indicates failure to update instance health
	ErrUpdateInstanceHealth = errors.FailedToUpdate("instance health", stdErrors.New("failed to update instance health"))

	// ErrDomainExtractionFailed indicates failure to extract domain from URL
	ErrDomainExtractionFailed = errors.ProcessingFailed("domain extraction", stdErrors.New("domain extraction failed"))

	// ErrFederationCheckFailed indicates failure to check federation status
	ErrFederationCheckFailed = errors.ProcessingFailed("federation check", stdErrors.New("federation check failed"))
)

Error values for federation cost integration

var DefaultRetryPolicy = &RetryPolicy{
	MaxRetries:     3,
	InitialBackoff: 1 * time.Second,
	MaxBackoff:     30 * time.Second,
	BackoffFactor:  2.0,
}

DefaultRetryPolicy defines the default retry configuration

Functions

func EstimateTotalActivityCost

func EstimateTotalActivityCost(
	dataTransferBytes int64,
	lambdaInvocations int,
	lambdaDurationMs int64,
	dynamoReads int,
	dynamoWrites int,
	s3StorageGB int64,
	s3Requests int64,
) float64

EstimateTotalActivityCost estimates the total cost of a federation activity

Types

type Controller

type Controller interface {
	// Decision making
	ShouldFederate(ctx context.Context, instance string) (bool, error)
	GetInstanceTier(ctx context.Context, instance string) (FederationTier, error)
	GetRetryPolicy(ctx context.Context, instance string) (*RetryPolicy, error)

	// Cost tracking
	TrackActivity(ctx context.Context, instance string, activityType string, sizeBytes int64) error
	GetRemainingBudget(ctx context.Context, instance string) (float64, error)

	// Health monitoring
	RecordSuccess(ctx context.Context, instance string, responseTimeMs int64) error
	RecordFailure(ctx context.Context, instance string, err error) error
	IsHealthy(ctx context.Context, instance string) (bool, error)
}

Controller interface for cost-aware federation decisions

func NewController

func NewController(
	storage Storage,
	calculator CostCalculator,
	logger *zap.Logger,
	budget *FederationBudget,
	thresholds *Thresholds,
) Controller

NewController creates a new cost-aware federation controller

type CostCalculator

type CostCalculator interface {
	EstimateDataTransferCost(bytes int64, region string) float64
	EstimateLambdaCost(invocations int, durationMs int64) float64
	EstimateDynamoDBCost(readUnits, writeUnits int) float64
	EstimateS3Cost(storageGB, requestCount int64) float64
}

CostCalculator defines the interface for estimating AWS costs

func NewCostCalculator

func NewCostCalculator(region string) CostCalculator

NewCostCalculator creates a new cost calculator for the specified AWS region

type CostMetrics

type CostMetrics struct {
	Period         string             `json:"period" dynamodbav:"Period"` // YYYY-MM-DD
	TotalCostUSD   float64            `json:"total_cost_usd" dynamodbav:"TotalCostUSD"`
	InstanceCosts  map[string]float64 `json:"instance_costs" dynamodbav:"InstanceCosts"`
	ActivityCosts  map[string]float64 `json:"activity_costs" dynamodbav:"ActivityCosts"` // by activity type
	DataTransferGB float64            `json:"data_transfer_gb" dynamodbav:"DataTransferGB"`
	RequestCount   int64              `json:"request_count" dynamodbav:"RequestCount"`
}

CostMetrics holds aggregated cost metrics

type DeliveryMiddleware

type DeliveryMiddleware struct {
	// contains filtered or unexported fields
}

DeliveryMiddleware wraps federation delivery with cost tracking and health monitoring

func NewDeliveryMiddleware

func NewDeliveryMiddleware(controller Controller, logger *zap.Logger) *DeliveryMiddleware

NewDeliveryMiddleware creates a new cost-aware delivery middleware

func (*DeliveryMiddleware) WrapDelivery

func (m *DeliveryMiddleware) WrapDelivery(
	deliveryFunc func(ctx context.Context, instanceURL, activityJSON string) error,
) func(ctx context.Context, instanceURL, activityJSON string) error

WrapDelivery wraps a delivery function with cost-aware checks and tracking
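
To illustrate the kind of flow such a wrapper might follow, here is a minimal sketch built on a local `controller` interface that mirrors a subset of the documented Controller methods. The ordering of checks, the `"delivery"` activity type, and passing the URL straight through as the instance name are all assumptions; the package's own middleware may differ (for example, it appears to extract a domain from the URL first).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// controller mirrors a subset of the documented Controller interface.
type controller interface {
	ShouldFederate(ctx context.Context, instance string) (bool, error)
	TrackActivity(ctx context.Context, instance, activityType string, sizeBytes int64) error
	RecordSuccess(ctx context.Context, instance string, responseTimeMs int64) error
	RecordFailure(ctx context.Context, instance string, err error) error
}

type deliveryFunc func(ctx context.Context, instanceURL, activityJSON string) error

// errNotAllowed stands in for the package's ErrFederationNotAllowed.
var errNotAllowed = errors.New("federation not allowed")

// wrapDelivery checks policy first, then delivers and records the outcome.
func wrapDelivery(c controller, next deliveryFunc) deliveryFunc {
	return func(ctx context.Context, instanceURL, activityJSON string) error {
		ok, err := c.ShouldFederate(ctx, instanceURL)
		if err != nil {
			return fmt.Errorf("federation check: %w", err)
		}
		if !ok {
			return errNotAllowed
		}
		start := time.Now()
		if err := next(ctx, instanceURL, activityJSON); err != nil {
			_ = c.RecordFailure(ctx, instanceURL, err) // best effort
			return err
		}
		_ = c.RecordSuccess(ctx, instanceURL, time.Since(start).Milliseconds())
		return c.TrackActivity(ctx, instanceURL, "delivery", int64(len(activityJSON)))
	}
}

// fakeController is an in-memory stand-in used only for this demonstration.
type fakeController struct {
	allow                        bool
	successes, failures, tracked int
}

func (f *fakeController) ShouldFederate(context.Context, string) (bool, error) { return f.allow, nil }
func (f *fakeController) TrackActivity(context.Context, string, string, int64) error {
	f.tracked++
	return nil
}
func (f *fakeController) RecordSuccess(context.Context, string, int64) error {
	f.successes++
	return nil
}
func (f *fakeController) RecordFailure(context.Context, string, error) error {
	f.failures++
	return nil
}

// runDemo performs one wrapped delivery against the fake controller.
func runDemo(allow, deliveryFails bool) (*fakeController, error) {
	fc := &fakeController{allow: allow}
	deliver := func(ctx context.Context, instanceURL, activityJSON string) error {
		if deliveryFails {
			return errors.New("remote returned 503")
		}
		return nil
	}
	err := wrapDelivery(fc, deliver)(context.Background(), "https://example.social", `{"type":"Create"}`)
	return fc, err
}

func main() {
	fc, err := runDemo(true, false)
	fmt.Println(fc.successes, fc.failures, fc.tracked, err)
}
```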

type FederationBudget

type FederationBudget struct {
	TotalBudgetUSD       float64            `json:"total_budget_usd"`
	PerInstanceBudgetUSD float64            `json:"per_instance_budget_usd"`
	BudgetPeriod         string             `json:"budget_period"` // "monthly", "daily", "hourly"
	InstanceOverrides    map[string]float64 `json:"instance_overrides"`
}

FederationBudget defines spending limits for federation activities
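
One plausible reading of how `InstanceOverrides` interacts with `PerInstanceBudgetUSD` is that an override, when present, replaces the default for that domain. The sketch below shows that lookup on a local copy of the relevant fields; `budgetFor` is a hypothetical helper, and the precedence rule is an assumption rather than documented behavior.

```go
package main

import "fmt"

// federationBudget copies the two fields of FederationBudget used here.
type federationBudget struct {
	PerInstanceBudgetUSD float64
	InstanceOverrides    map[string]float64
}

// budgetFor returns the override for domain if one exists,
// otherwise the default per-instance budget.
func budgetFor(b federationBudget, domain string) float64 {
	if v, ok := b.InstanceOverrides[domain]; ok {
		return v
	}
	return b.PerInstanceBudgetUSD
}

func main() {
	b := federationBudget{
		PerInstanceBudgetUSD: 10.0,
		InstanceOverrides:    map[string]float64{"mastodon.social": 50.0},
	}
	fmt.Println(budgetFor(b, "mastodon.social"))
	fmt.Println(budgetFor(b, "example.social"))
}
```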

type FederationCost

type FederationCost struct {
	InstanceDomain string    `json:"instance_domain" dynamodbav:"InstanceDomain"`
	IngressBytes   int64     `json:"ingress_bytes" dynamodbav:"IngressBytes"`
	EgressBytes    int64     `json:"egress_bytes" dynamodbav:"EgressBytes"`
	RequestCount   int       `json:"request_count" dynamodbav:"RequestCount"`
	ErrorCount     int       `json:"error_count" dynamodbav:"ErrorCount"`
	ErrorRate      float64   `json:"error_rate" dynamodbav:"ErrorRate"`
	AverageCostUSD float64   `json:"average_cost_usd" dynamodbav:"AverageCostUSD"`
	LastUpdated    time.Time `json:"last_updated" dynamodbav:"LastUpdated"`
	BillingPeriod  string    `json:"billing_period" dynamodbav:"BillingPeriod"` // YYYY-MM format
}

FederationCost tracks the cost metrics for a specific federated instance

type FederationTier

type FederationTier string

FederationTier represents different service tiers for federated instances

const (
	TierPremium  FederationTier = "premium"  // Unlimited, priority processing
	TierStandard FederationTier = "standard" // Normal limits
	TierLimited  FederationTier = "limited"  // Reduced limits, lower priority
	TierBlocked  FederationTier = "blocked"  // No federation
)

Federation tier constants

type HTTPTransportWrapper

type HTTPTransportWrapper struct {
	// contains filtered or unexported fields
}

HTTPTransportWrapper wraps HTTP transport with cost-aware headers

func NewHTTPTransportWrapper

func NewHTTPTransportWrapper(
	base http.RoundTripper,
	controller Controller,
	logger *zap.Logger,
) *HTTPTransportWrapper

NewHTTPTransportWrapper creates a new HTTP transport wrapper

func (*HTTPTransportWrapper) RoundTrip

func (t *HTTPTransportWrapper) RoundTrip(req *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper with cost tracking
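
For readers unfamiliar with the pattern, this sketch shows how a wrapping `http.RoundTripper` can attach a header before delegating to a base transport. The header name `X-Federation-Tier` and the type names are hypothetical, and the sketch covers only header injection, not the cost tracking the real wrapper performs. The request is cloned because a RoundTripper should not modify the caller's request.

```go
package main

import (
	"fmt"
	"net/http"
)

// tierHeader is a hypothetical header name used only for this sketch.
const tierHeader = "X-Federation-Tier"

// headerTransport adds a header to each request, then delegates to base.
type headerTransport struct {
	base http.RoundTripper
	tier string
}

func (t *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	clone := req.Clone(req.Context()) // leave the caller's request untouched
	clone.Header.Set(tierHeader, t.tier)
	return t.base.RoundTrip(clone)
}

// roundTripFunc adapts a plain function to http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// headerSeenByBase sends one request through the wrapper using a stub base
// transport and reports the header value the base saw and the value left on
// the original request.
func headerSeenByBase(tier string) (seen, original string, err error) {
	base := roundTripFunc(func(r *http.Request) (*http.Response, error) {
		seen = r.Header.Get(tierHeader)
		return &http.Response{StatusCode: http.StatusAccepted, Body: http.NoBody}, nil
	})
	req, err := http.NewRequest(http.MethodPost, "https://example.social/inbox", nil)
	if err != nil {
		return "", "", err
	}
	resp, err := (&headerTransport{base: base, tier: tier}).RoundTrip(req)
	if err != nil {
		return "", "", err
	}
	resp.Body.Close()
	return seen, req.Header.Get(tierHeader), nil
}

func main() {
	seen, original, err := headerSeenByBase("standard")
	fmt.Printf("base saw %q, original has %q, err=%v\n", seen, original, err)
}
```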

type InstanceConfig

type InstanceConfig struct {
	Domain            string         `json:"domain" dynamodbav:"Domain"`
	Tier              FederationTier `json:"tier" dynamodbav:"Tier"`
	CustomBudgetUSD   *float64       `json:"custom_budget_usd,omitempty" dynamodbav:"CustomBudgetUSD,omitempty"`
	RateLimitOverride *int           `json:"rate_limit_override,omitempty" dynamodbav:"RateLimitOverride,omitempty"`
	RetryPolicy       *RetryPolicy   `json:"retry_policy,omitempty" dynamodbav:"RetryPolicy,omitempty"`
	Created           time.Time      `json:"created" dynamodbav:"Created"`
	LastModified      time.Time      `json:"last_modified" dynamodbav:"LastModified"`
}

InstanceConfig holds per-instance federation configuration

type InstanceHealth

type InstanceHealth struct {
	Domain           string    `json:"domain" dynamodbav:"Domain"`
	HealthScore      float64   `json:"health_score" dynamodbav:"HealthScore"`          // 0.0 to 1.0
	ResponseTimeP95  int64     `json:"response_time_p95" dynamodbav:"ResponseTimeP95"` // milliseconds
	SuccessRate      float64   `json:"success_rate" dynamodbav:"SuccessRate"`          // 0.0 to 1.0
	LastHealthCheck  time.Time `json:"last_health_check" dynamodbav:"LastHealthCheck"`
	ConsecutiveFails int       `json:"consecutive_fails" dynamodbav:"ConsecutiveFails"`
	IsHealthy        bool      `json:"is_healthy" dynamodbav:"IsHealthy"`
}

InstanceHealth represents the health and reliability metrics for a federated instance
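
The package does not document how `HealthScore` is derived from the other fields. Purely as an illustration, a weighted combination of success rate, P95 latency, and consecutive failures could look like the sketch below. Every weight and limit here is an invented assumption, not the package's formula.

```go
package main

import (
	"fmt"
	"math"
)

// All limits and weights below are assumptions chosen for illustration.
const (
	maxAcceptableP95Ms  = 5000.0 // latency at or above this scores zero
	maxConsecutiveFails = 10.0   // this many failures in a row scores zero
	weightSuccess       = 0.6
	weightLatency       = 0.25
	weightFailures      = 0.15
)

// clamp01 restricts v to the range [0, 1].
func clamp01(v float64) float64 {
	return math.Max(0, math.Min(v, 1))
}

// healthScore blends the three signals into a value between 0.0 and 1.0.
func healthScore(successRate float64, p95Ms int64, consecutiveFails int) float64 {
	latency := 1 - clamp01(float64(p95Ms)/maxAcceptableP95Ms)
	failures := 1 - clamp01(float64(consecutiveFails)/maxConsecutiveFails)
	score := weightSuccess*clamp01(successRate) + weightLatency*latency + weightFailures*failures
	return clamp01(score)
}

func main() {
	fmt.Printf("%.3f\n", healthScore(0.98, 400, 0))
	fmt.Printf("%.3f\n", healthScore(0.40, 4500, 6))
}
```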

type RetryMiddleware

type RetryMiddleware struct {
	// contains filtered or unexported fields
}

RetryMiddleware implements intelligent retry logic based on instance health and cost

func NewRetryMiddleware

func NewRetryMiddleware(controller Controller, logger *zap.Logger) *RetryMiddleware

NewRetryMiddleware creates a new retry middleware

func (*RetryMiddleware) RetryWithPolicy

func (m *RetryMiddleware) RetryWithPolicy(
	ctx context.Context,
	domain string,
	operation func() error,
) error

RetryWithPolicy executes a function with instance-specific retry policy

type RetryPolicy

type RetryPolicy struct {
	MaxRetries     int           `json:"max_retries" dynamodbav:"MaxRetries"`
	InitialBackoff time.Duration `json:"initial_backoff" dynamodbav:"InitialBackoff"`
	MaxBackoff     time.Duration `json:"max_backoff" dynamodbav:"MaxBackoff"`
	BackoffFactor  float64       `json:"backoff_factor" dynamodbav:"BackoffFactor"`
}

RetryPolicy defines how to retry failed federation attempts
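
The README mentions exponential backoff with jitter but does not give the formula. A common interpretation of these four fields is sketched below on a local copy of the struct: the delay before retry n is `InitialBackoff * BackoffFactor^n`, capped at `MaxBackoff`, with "full jitter" picking a random duration up to that value. The function names and the choice of full jitter are assumptions.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// retryPolicy mirrors the fields of the documented RetryPolicy.
type retryPolicy struct {
	MaxRetries     int
	InitialBackoff time.Duration
	MaxBackoff     time.Duration
	BackoffFactor  float64
}

// defaultPolicy copies the values of the documented DefaultRetryPolicy.
var defaultPolicy = retryPolicy{
	MaxRetries:     3,
	InitialBackoff: 1 * time.Second,
	MaxBackoff:     30 * time.Second,
	BackoffFactor:  2.0,
}

// baseDelay returns InitialBackoff * BackoffFactor^attempt, capped at
// MaxBackoff. attempt is zero-based: 0 is the delay before the first retry.
func baseDelay(p retryPolicy, attempt int) time.Duration {
	d := float64(p.InitialBackoff) * math.Pow(p.BackoffFactor, float64(attempt))
	if d > float64(p.MaxBackoff) {
		return p.MaxBackoff
	}
	return time.Duration(d)
}

// withJitter applies full jitter: a uniform random duration in [0, d].
func withJitter(d time.Duration) time.Duration {
	if d <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(d) + 1))
}

func main() {
	for attempt := 0; attempt < defaultPolicy.MaxRetries; attempt++ {
		base := baseDelay(defaultPolicy, attempt)
		fmt.Printf("retry %d: base %v, jittered %v\n", attempt+1, base, withJitter(base))
	}
}
```

Jitter spreads retries from many senders over time, which helps avoid synchronized bursts against an instance that has just recovered.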

type Storage

type Storage interface {
	// Cost tracking
	RecordCost(ctx context.Context, cost *FederationCost) error
	GetInstanceCost(ctx context.Context, domain string, period string) (*FederationCost, error)
	GetCostMetrics(ctx context.Context, period string) (*CostMetrics, error)

	// Health tracking
	UpdateInstanceHealth(ctx context.Context, health *InstanceHealth) error
	GetInstanceHealth(ctx context.Context, domain string) (*InstanceHealth, error)
	ListUnhealthyInstances(ctx context.Context) ([]*InstanceHealth, error)

	// Configuration
	SaveInstanceConfig(ctx context.Context, config *InstanceConfig) error
	GetInstanceConfig(ctx context.Context, domain string) (*InstanceConfig, error)
	ListInstanceConfigs(ctx context.Context) ([]*InstanceConfig, error)
}

Storage interface for cost tracking persistence

func NewDynamoStorage

func NewDynamoStorage(
	repo Storage,
	logger *zap.Logger,
	costTracker *cost.Tracker,
) Storage

NewDynamoStorage creates a new DynamoDB-backed storage implementation

func NewRepositoryAdapter

func NewRepositoryAdapter(db core.DB, logger *zap.Logger, costTracker *cost.Tracker) Storage

NewRepositoryAdapter creates a new repository adapter

type Thresholds

type Thresholds struct {
	WarnThresholdPercent  float64 `json:"warn_threshold_percent"`  // e.g., 80
	BlockThresholdPercent float64 `json:"block_threshold_percent"` // e.g., 95
}

Thresholds defines alerting thresholds
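
As a sketch of how the two percentages might be applied, the function below compares spend against a budget and returns a status. The status type and names are hypothetical, and treating a non-positive budget as blocked is an assumption made here only to avoid dividing by zero.

```go
package main

import "fmt"

// budgetStatus is a hypothetical result type for this sketch.
type budgetStatus int

const (
	statusOK budgetStatus = iota
	statusWarn
	statusBlock
)

// evaluate compares the percentage of budget used against the thresholds.
// warnPct and blockPct are percentages such as 80 and 95.
func evaluate(spentUSD, budgetUSD, warnPct, blockPct float64) budgetStatus {
	if budgetUSD <= 0 {
		return statusBlock // assumption: no budget means no federation
	}
	used := spentUSD / budgetUSD * 100
	switch {
	case used >= blockPct:
		return statusBlock
	case used >= warnPct:
		return statusWarn
	default:
		return statusOK
	}
}

func main() {
	for _, spent := range []float64{5.0, 8.5, 9.6} {
		fmt.Printf("spent $%.2f of $10.00: status %d\n", spent, evaluate(spent, 10, 80, 95))
	}
}
```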
