integration

package
v1.1.10 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2026 License: AGPL-3.0 Imports: 10 Imported by: 0

README ¶

Lambda Integration Test Suite

Overview

The Lambda Integration Test Suite provides integration testing for AWS Lambda functions in Lesser, a serverless ActivityPub implementation. The suite integrates with:

  • Lift Framework: Type-safe Lambda testing with HTTP simulation
  • DynamORM: Test data management with proper repository patterns
  • Test Factories: Realistic test data generation
  • Performance Monitoring: Cold start, warm start, and throughput analysis
  • Concurrency Testing: Load testing with concurrent execution scenarios

Key Features

🔧 Comprehensive Infrastructure Integration

  • DynamORM repository pattern integration
  • Lift testing framework compatibility
  • Authentication service testing
  • Real AWS service simulation

📊 Advanced Performance Monitoring

  • Cold start vs warm start analysis
  • Memory usage tracking
  • Percentile-based latency analysis (P50, P95, P99)
  • Throughput and concurrency metrics

🎯 Production-Ready Test Scenarios

  • API Gateway integration testing
  • ActivityPub federation testing
  • Media processing workflows
  • Background job processing (SQS)

🚀 Scalability Testing

  • Concurrent execution testing
  • Load testing with realistic traffic patterns
  • Performance threshold validation

Architecture

Core Components
type LambdaIntegrationTestSuite struct {
    // Core infrastructure
    config *config.Config
    logger *zap.Logger
    repos  core.RepositoryStorage
    db     dynamormCore.DB
    
    // Lift testing integration
    liftTestSuite *testing.IntegrationTestSuite
    testApp       *testing.TestApp
    
    // Test data management
    actorFactory    *factories.ActorFactory
    testDataManager *TestDataManager
    
    // Performance tracking
    performanceTracker *PerformanceTracker
    concurrencyTracker *ConcurrencyTracker
}
Test Data Management

The suite uses DynamORM patterns for test data:

// The suite creates and tracks this test data automatically
DataRequirements: &TestDataRequirements{
    Users:      10,
    Actors:     5,
    Statuses:   50,
    Activities: 25,
    CustomData: map[string]interface{}{
        "federation_enabled": true,
    },
}

Usage Examples

Basic Integration Test
func TestAPILambdaIntegration(t *testing.T) {
    handler := lambda.NewHandler(myLambdaFunction)
    suite := integration.NewLambdaIntegrationTestSuite(t, handler)
    
    testCase := integration.LambdaIntegrationTestCase{
        Name:        "API_Health_Check",
        Description: "Test API health endpoint",
        Event:       integration.BuildAPIGatewayEvent("GET", "/health", nil, nil),
        Timeout:     10 * time.Second,
        PerformanceThresholds: &integration.PerformanceThresholds{
            MaxColdStart:   5 * time.Second,
            MaxWarmStart:   2 * time.Second,
            MaxMemoryMB:    256,
            MinSuccessRate: 99.0,
        },
    }
    
    suite.RunIntegrationTest(testCase)
}
Authenticated API Testing
testCase := integration.LambdaIntegrationTestCase{
    Name:           "Authenticated_User_Timeline",
    RequiredAuth:   true,
    RequiredScopes: []string{"read"},
    DataRequirements: &integration.TestDataRequirements{
        Users:    1,
        Actors:   1,
        Statuses: 10,
    },
    ExecuteFunc: func(suite *integration.LambdaIntegrationTestSuite, event interface{}) (interface{}, error) {
        headers := map[string]string{
            "Authorization": "Bearer " + suite.GetTestToken("standard"),
        }
        authEvent := integration.BuildAPIGatewayEvent("GET", "/api/v1/timelines/home", nil, headers)
        // invokeLambda is unexported, so this call compiles only inside package integration.
        return suite.invokeLambda(context.Background(), authEvent)
    },
}
Concurrency Testing
concurrencyTest := integration.LambdaConcurrencyTest{
    Name:               "High_Load_Timeline_Requests",
    ConcurrentRequests: 50,
    RequestBuilder: func(index int) interface{} {
        return integration.BuildAPIGatewayEvent("GET", "/api/v1/timelines/public", nil, nil)
    },
    MaxDuration: 30 * time.Second,
}

suite.RunConcurrencyTest(concurrencyTest)
ActivityPub Federation Testing
activityPubTest := integration.LambdaIntegrationTestCase{
    Name: "ActivityPub_Inbox_Processing",
    DataRequirements: &integration.TestDataRequirements{
        Actors: 2,
    },
    ExecuteFunc: func(suite *integration.LambdaIntegrationTestSuite, _ interface{}) (interface{}, error) {
        activity := map[string]interface{}{
            "@context": "https://www.w3.org/ns/activitystreams",
            "type":     "Follow",
            "actor":    "https://remote.example.com/users/remote_user",
            "object":   "https://test.example.com/users/local_user",
        }
        event := integration.BuildAPIGatewayEvent("POST", "/inbox", activity, map[string]string{
            "Content-Type": "application/activity+json",
        })
        // invokeLambda is unexported, so this call compiles only inside package integration.
        return suite.invokeLambda(context.Background(), event)
    },
}

Pre-built Test Scenarios

The suite includes pre-built test scenarios for common Lambda patterns:

API Lambda Tests
apiTests := suite.APILambdaTestScenarios()
// Includes: health checks, authentication, CRUD operations
ActivityPub Lambda Tests
federationTests := suite.ActivityPubLambdaTestScenarios()
// Includes: inbox processing, outbox delivery, signature validation
Media Processing Tests
mediaTests := suite.MediaProcessingLambdaTestScenarios()
// Includes: upload processing, transcoding, thumbnail generation
Background Job Tests
jobTests := suite.BackgroundJobLambdaTestScenarios()
// Includes: SQS processing, retry logic, DLQ handling

Performance Analysis

The suite records metrics for each run and derives a quality rating from them:

Metrics Collected
  • Execution Metrics: Invocations, successes, errors, timeouts
  • Performance Metrics: Cold/warm starts, duration percentiles, throughput
  • Memory Metrics: Peak usage, average usage across invocations
  • Concurrency Metrics: Peak concurrency, queue depth, wait times
Quality Assessment

Tests are automatically scored based on:

  • Error rates
  • Performance (P95 latency)
  • Cold start frequency
  • Throughput capabilities

Each run receives one of five ratings: EXCELLENT | GOOD | ACCEPTABLE | NEEDS_IMPROVEMENT | POOR

Configuration Options

Test Suite Options
suite := integration.NewLambdaIntegrationTestSuite(t, handler,
    integration.WithConfig(customConfig),
    integration.WithLogger(customLogger),
    integration.WithRepositories(customRepos),
    integration.WithDynamoDBClient(customDB),
)
Performance Thresholds
thresholds := &integration.PerformanceThresholds{
    MaxColdStart:   5 * time.Second,   // Maximum acceptable cold start
    MaxWarmStart:   2 * time.Second,   // Maximum acceptable warm start
    MaxMemoryMB:    512,               // Maximum memory usage
    MinSuccessRate: 99.0,              // Minimum success rate (%)
    MaxErrorRate:   1.0,               // Maximum error rate (%)
}
Data Requirements
dataReqs := &integration.TestDataRequirements{
    Users:        10,    // Number of test users
    Actors:       5,     // Number of test actors
    Statuses:     50,    // Number of test statuses
    Activities:   25,    // Number of test activities
    PreserveData: false, // false: clean up after the test
    CustomData: map[string]interface{}{
        "feature_flags": []string{"new_timeline"},
    },
}

Integration with Existing Infrastructure

DynamORM Integration
  • Uses proper repository patterns
  • Maintains data consistency
  • Tracks costs and performance
Lift Framework Integration
  • Type-safe HTTP testing
  • Middleware testing
  • Context management
Authentication Integration
  • JWT token generation
  • Scope-based testing
  • Multi-tenant support

Best Practices

1. Test Organization
func TestAPILambdaIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test in short mode")
    }
    
    suite := integration.NewLambdaIntegrationTestSuite(t, handler)
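    // Register cleanup before running tests; deferring the AddCleanup call
    // would delay registration until this function returns.
    suite.AddCleanup(func() error {
        return customCleanup()
    })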
    
    // Group related tests
    apiTests := suite.APILambdaTestScenarios()
    suite.RunIntegrationTests(apiTests)
}
2. Performance Testing
  • Set realistic performance thresholds
  • Test both cold and warm start scenarios
  • Include concurrency testing for user-facing endpoints
3. Data Management
  • Use minimal test data sets
  • Clean up after tests unless debugging
  • Use factories for consistent test data
4. Error Testing
  • Test error conditions explicitly
  • Validate error responses
  • Test timeout scenarios

CI/CD Integration

Environment Variables
INTEGRATION_TESTS=true          # Enable integration tests
AWS_REGION=us-east-1           # AWS region for testing
DYNAMODB_TABLE=lesser-test     # Test table name
Test Commands
# Run all integration tests
go test -v ./pkg/testing/integration/... -tags=integration

# Run only fast integration tests
go test -v ./pkg/testing/integration/... -short

# Run with performance benchmarking
go test -v ./pkg/testing/integration/... -bench=.

Troubleshooting

Common Issues
  1. DynamoDB Connection Errors

    • Ensure AWS credentials are configured
    • Check that DynamoDB Local is running for local tests
    • Verify table permissions
  2. Performance Test Failures

    • Cold starts may be slower in a test environment
    • Adjust thresholds for local testing
    • Check system resources during testing
  3. Authentication Failures

    • Verify test token generation
    • Check required scopes match endpoint requirements
    • Ensure auth middleware is configured
Debug Mode
suite := integration.NewLambdaIntegrationTestSuite(t, handler,
    integration.WithLogger(zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))),
)

Migration from Minimal Testing

The integration test suite replaces minimal Lambda test patterns such as a bare invoke-and-assert:

Before (Minimal)
func TestLambda(t *testing.T) {
    result, err := handler.Invoke(context.Background(), event)
    assert.NoError(t, err)
    assert.NotNil(t, result)
}
After (Comprehensive)
func TestLambda(t *testing.T) {
    suite := integration.NewLambdaIntegrationTestSuite(t, handler)
    
    testCase := integration.LambdaIntegrationTestCase{
        Name: "Comprehensive_Lambda_Test",
        Event: event,
        PerformanceThresholds: &integration.PerformanceThresholds{
            MaxColdStart:   5 * time.Second,
            MinSuccessRate: 99.0,
        },
        DataRequirements: &integration.TestDataRequirements{
            Users: 1,
        },
    }
    
    suite.RunIntegrationTest(testCase)
}

This provides:

  • ✅ Performance monitoring
  • ✅ Test data management
  • ✅ Comprehensive metrics
  • ✅ Quality assessment
  • ✅ Production readiness validation

Documentation ¶

Overview ¶

Package integration provides end-to-end Lambda function testing utilities and test case management.

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

func AssertColdStartTime ¶

func AssertColdStartTime(t *testing.T, duration time.Duration, maxAllowed time.Duration)

AssertColdStartTime verifies cold start performance

func AssertLambdaResponse ¶

func AssertLambdaResponse(t *testing.T, response interface{}, expectedStatus int, expectedBody interface{})

AssertLambdaResponse validates Lambda response structure

func AssertWarmStartTime ¶

func AssertWarmStartTime(t *testing.T, duration time.Duration, maxAllowed time.Duration)

AssertWarmStartTime verifies warm start performance

func BuildAPIGatewayEvent ¶

func BuildAPIGatewayEvent(method, path string, body interface{}, headers map[string]string) events.APIGatewayV2HTTPRequest

BuildAPIGatewayEvent creates an API Gateway event for testing

func BuildDynamoDBStreamEvent ¶

func BuildDynamoDBStreamEvent(eventName string, newImage, oldImage map[string]interface{}) events.DynamoDBEvent

BuildDynamoDBStreamEvent creates a DynamoDB stream event for testing

func BuildEventBridgeEvent ¶

func BuildEventBridgeEvent(source, detailType string, detail interface{}) events.CloudWatchEvent

BuildEventBridgeEvent creates an EventBridge event for testing

func BuildS3Event ¶

func BuildS3Event(bucket, key, eventName string) events.S3Event

BuildS3Event creates an S3 event for testing

func BuildSQSEvent ¶

func BuildSQSEvent(messages ...string) events.SQSEvent

BuildSQSEvent creates an SQS event for testing

func RunBatchStreamTest ¶

func RunBatchStreamTest(t *testing.T, handler func(context.Context, events.DynamoDBEvent) error, test BatchStreamTest)

RunBatchStreamTest executes batch stream processing test

func RunConcurrencyTest ¶

func RunConcurrencyTest(t *testing.T, handler lambda.Handler, test LambdaConcurrencyTest)

RunConcurrencyTest executes Lambda concurrency test

func RunStreamReplay ¶

func RunStreamReplay(t *testing.T, test StreamReplayTest, records []events.DynamoDBEventRecord)

RunStreamReplay executes stream replay test

func TestStreamErrorScenarios ¶

func TestStreamErrorScenarios(t *testing.T, handler func(context.Context, events.DynamoDBEvent) error)

TestStreamErrorScenarios tests error handling in stream processing

func TestStreamIdempotency ¶

func TestStreamIdempotency(t *testing.T, handler func(context.Context, events.DynamoDBEvent) error)

TestStreamIdempotency tests idempotent stream processing
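
An idempotency check usually delivers the same batch twice and asserts that the resulting state is unchanged after the second delivery. The sketch below demonstrates the idea against an in-memory store; the `record` and `counterStore` types and the `isIdempotent` function are hypothetical and are not the mechanism TestStreamIdempotency uses, which is not documented here.

```go
package main

import (
	"fmt"
	"reflect"
)

// record is a simplified stand-in for a stream record.
type record struct {
	EventID string
	Key     string
	Delta   int
}

// counterStore applies deltas per key; with dedupe enabled it skips any
// EventID it has already processed.
type counterStore struct {
	dedupe bool
	seen   map[string]bool
	totals map[string]int
}

func newCounterStore(dedupe bool) *counterStore {
	return &counterStore{dedupe: dedupe, seen: map[string]bool{}, totals: map[string]int{}}
}

func (s *counterStore) handle(batch []record) {
	for _, r := range batch {
		if s.dedupe {
			if s.seen[r.EventID] {
				continue
			}
			s.seen[r.EventID] = true
		}
		s.totals[r.Key] += r.Delta
	}
}

// snapshot returns a copy of the totals so later writes do not alter it.
func (s *counterStore) snapshot() map[string]int {
	out := make(map[string]int, len(s.totals))
	for k, v := range s.totals {
		out[k] = v
	}
	return out
}

// isIdempotent delivers the batch twice and compares the state after each.
func isIdempotent(s *counterStore, batch []record) bool {
	s.handle(batch)
	first := s.snapshot()
	s.handle(batch)
	return reflect.DeepEqual(first, s.snapshot())
}

func main() {
	batch := []record{{"e1", "followers", 1}, {"e2", "followers", 1}}
	fmt.Println("with dedupe:", isIdempotent(newCounterStore(true), batch))
	fmt.Println("without dedupe:", isIdempotent(newCounterStore(false), batch))
}
```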

func TestStreamOrdering ¶

func TestStreamOrdering(t *testing.T, handler func(context.Context, events.DynamoDBEvent) error)

TestStreamOrdering tests stream record ordering

Types ¶

type BatchStreamTest ¶

type BatchStreamTest struct {
	BatchSize      int
	Records        []events.DynamoDBEventRecord
	MaxConcurrency int
	Timeout        time.Duration
}

BatchStreamTest tests batch stream processing

type LagMeasurement ¶

type LagMeasurement struct {
	Timestamp time.Time
	Lag       time.Duration
	RecordID  string
}

LagMeasurement represents a lag measurement

type LambdaConcurrencyTest ¶

type LambdaConcurrencyTest struct {
	Name               string
	ConcurrentRequests int
	RequestBuilder     func(int) interface{}
	ValidateResponse   func(*testing.T, interface{}, error)
	MaxDuration        time.Duration
}

LambdaConcurrencyTest tests Lambda under concurrent load

type LambdaTestCase ¶

type LambdaTestCase struct {
	Name          string
	Event         interface{}
	SetupFunc     func() error
	CleanupFunc   func() error
	ValidateFunc  func(*testing.T, interface{}, error)
	ExpectedError bool
	Timeout       time.Duration
}

LambdaTestCase defines an end-to-end Lambda test case

type LambdaTestSuite ¶

type LambdaTestSuite struct {
	// contains filtered or unexported fields
}

LambdaTestSuite provides utilities for end-to-end Lambda testing

func NewLambdaTestSuite ¶

func NewLambdaTestSuite(t *testing.T, handler lambda.Handler) *LambdaTestSuite

NewLambdaTestSuite creates a new Lambda test suite

func (*LambdaTestSuite) PrintMetrics ¶

func (s *LambdaTestSuite) PrintMetrics()

PrintMetrics prints test metrics summary

func (*LambdaTestSuite) RunTest ¶

func (s *LambdaTestSuite) RunTest(tc LambdaTestCase)

RunTest executes a single Lambda test case

func (*LambdaTestSuite) RunTests ¶

func (s *LambdaTestSuite) RunTests(testCases []LambdaTestCase)

RunTests executes multiple Lambda test cases

type MemoryProfiler ¶

type MemoryProfiler struct {
	// contains filtered or unexported fields
}

MemoryProfiler tracks Lambda memory usage

func NewMemoryProfiler ¶

func NewMemoryProfiler() *MemoryProfiler

NewMemoryProfiler creates a memory profiler

func (*MemoryProfiler) GetPeakMemory ¶

func (m *MemoryProfiler) GetPeakMemory() int

GetPeakMemory returns peak memory usage

func (*MemoryProfiler) Sample ¶

func (m *MemoryProfiler) Sample()

Sample records current memory usage
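
The fields of MemoryProfiler are unexported, so its sampling source is not visible in this documentation. A profiler with a similar surface could be built on `runtime.ReadMemStats`, as in the sketch below; the `memProfiler` type and the choice of `HeapAlloc` as the sampled figure are assumptions.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// memProfiler is a hypothetical profiler that samples the Go heap.
type memProfiler struct {
	mu      sync.Mutex
	samples []uint64 // HeapAlloc in bytes at each sample
	peak    uint64   // largest sample seen so far
}

// sample records the current heap allocation and updates the peak.
func (m *memProfiler) sample() {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)

	m.mu.Lock()
	defer m.mu.Unlock()
	m.samples = append(m.samples, ms.HeapAlloc)
	if ms.HeapAlloc > m.peak {
		m.peak = ms.HeapAlloc
	}
}

// peakBytes returns the largest heap allocation observed.
func (m *memProfiler) peakBytes() uint64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.peak
}

func main() {
	p := &memProfiler{}
	p.sample()
	buf := make([]byte, 32<<20) // allocate 32 MiB between samples
	p.sample()
	runtime.KeepAlive(buf)
	fmt.Printf("peak heap: %d MiB over %d samples\n", p.peakBytes()>>20, len(p.samples))
}
```

Heap allocation inside the test process only approximates the memory a deployed Lambda function reports, so figures from a sketch like this are best compared against each other rather than against a production limit.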

type StreamEvent ¶

type StreamEvent struct {
	Type      string
	Timestamp time.Time
	Data      interface{}
	Metadata  map[string]interface{}
}

StreamEvent represents a processed stream event

type StreamEventCollector ¶

type StreamEventCollector struct {
	// contains filtered or unexported fields
}

StreamEventCollector collects events for validation

func (*StreamEventCollector) AddEvent ¶

func (c *StreamEventCollector) AddEvent(eventType string, data interface{})

AddEvent adds an event to the collector

func (*StreamEventCollector) GetEvents ¶

func (c *StreamEventCollector) GetEvents() []StreamEvent

GetEvents returns collected events

type StreamLagMonitor ¶

type StreamLagMonitor struct {
	// contains filtered or unexported fields
}

StreamLagMonitor monitors stream processing lag

func NewStreamLagMonitor ¶

func NewStreamLagMonitor(alertThreshold time.Duration) *StreamLagMonitor

NewStreamLagMonitor creates a lag monitor

func (*StreamLagMonitor) GetHighLagRecords ¶

func (m *StreamLagMonitor) GetHighLagRecords() []LagMeasurement

GetHighLagRecords returns records with high lag

func (*StreamLagMonitor) RecordLag ¶

func (m *StreamLagMonitor) RecordLag(record events.DynamoDBEventRecord)

RecordLag records stream lag

type StreamMetrics ¶

type StreamMetrics struct {
	TotalRecords     int
	ProcessedRecords int
	FailedRecords    int
	RetryCount       int
	ProcessingTime   map[string]time.Duration
	RecordLag        []time.Duration
	BatchSizes       []int
	// contains filtered or unexported fields
}

StreamMetrics tracks stream processing metrics

type StreamReplayTest ¶

type StreamReplayTest struct {
	Name          string
	StartTime     time.Time
	EndTime       time.Time
	RecordFilter  func(events.DynamoDBEventRecord) bool
	ProcessRecord func(events.DynamoDBEventRecord) error
}

StreamReplayTest replays historical stream records

type StreamTestCase ¶

type StreamTestCase struct {
	Name           string
	Operation      string // INSERT, MODIFY, REMOVE
	OldImage       interface{}
	NewImage       interface{}
	SetupFunc      func() error
	ValidateFunc   func(*testing.T, *StreamTestResult)
	ExpectedEvents []string
	Timeout        time.Duration
}

StreamTestCase defines a DynamoDB stream test case

type StreamTestResult ¶

type StreamTestResult struct {
	ProcessedRecords int
	FailedRecords    int
	Events           []StreamEvent
	Duration         time.Duration
	Errors           []error
}

StreamTestResult captures stream processing results

type StreamTestSuite ¶

type StreamTestSuite struct {
	// contains filtered or unexported fields
}

StreamTestSuite provides utilities for testing DynamoDB streams

func NewStreamTestSuite ¶

func NewStreamTestSuite(t *testing.T, handler func(context.Context, events.DynamoDBEvent) error) *StreamTestSuite

NewStreamTestSuite creates a new stream test suite

func (*StreamTestSuite) PrintMetrics ¶

func (s *StreamTestSuite) PrintMetrics()

PrintMetrics prints stream processing metrics

func (*StreamTestSuite) RunTest ¶

func (s *StreamTestSuite) RunTest(tc StreamTestCase)

RunTest executes a stream test case

type TestMetrics ¶

type TestMetrics struct {
	Invocations     int
	ColdStarts      int
	TotalDuration   time.Duration
	AverageDuration time.Duration
	MaxDuration     time.Duration
	MinDuration     time.Duration
	Errors          int
	Timeouts        int
	MemoryUsed      []int
	ConcurrentExecs int
}

TestMetrics tracks Lambda execution metrics
