workerclient

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2025 License: MIT Imports: 13 Imported by: 0

README

Load Test Worker Client

A high-performance distributed load testing worker client for executing load test tasks and communicating with the coordinator.

Features

  • Distributed Architecture: Supports multiple workers collaborating to execute load tests
  • High Concurrency Support: Goroutine-based concurrency model with configurable maximum concurrency
  • RPS Control: Supports requests per second limiting and gradual load ramping
  • Real-time Monitoring: Collects and reports performance metrics like response time and success rate
  • Flexible Configuration: Supports multi-step test flows, parameterized testing, and conditional execution
  • Plugin Architecture: Supports different types of request plugin implementations

Project Structure

/workerclient/
├── case_runner.go          # Test case runner
├── worker_runner.go        # Worker runner  
├── test_case.go           # Test case definition
├── result.go              # Test result processing
├── types.go               # Common data structures and type definitions
├── utils.go               # Utility functions
├── go.mod                 # Go module definition
└── README.md              # Project documentation

Core Modules

1. Worker Management (worker_runner.go)
  • Worker lifecycle management
  • Communication with coordinator
  • Task scheduling and concurrency control
2. Test Case Execution (case_runner.go)
  • Concurrency control and RPS limiting
  • Gradual load ramping
  • Real-time performance monitoring and metrics collection
3. Test Case Definition (test_case.go)
  • Multi-step test flows
  • Parameterized testing support
  • Conditional execution and error handling
4. Result Processing (result.go)
  • Unified request result interface
  • Detailed request/response information collection
  • Sub-request support
5. Data Type Definitions (types.go)
  • API communication data structures
  • Test configuration and monitoring metrics
  • Worker state management

Quick Start

Requirements
  • Go 1.19+
  • Coordinator service
Install Dependencies
go mod tidy
Basic Usage
package main

import (
    "loadtestx/workerclient"
)

func main() {
    // Create Worker Runner
    workerRunner := workerclient.NewWorkerRunner("worker-1", "http://coordinator:8080")
    
    // Create test case
    testCase := workerclient.NewTestCase("api_test")
    
    // Add test step
    testCase.AddStep(&workerclient.TestStep{
        StepName: "login",
        ReqPluginFunc: func(params map[string]string) interface{} {
            // Implement specific request logic
            result := workerclient.AcquireResult("login")
            result.Begin()
            
            // Execute HTTP request...
            result.ResponseCode = 200
            result.End()
            
            return result
        },
        GenReqParamsFunc: func(caseParams *workerclient.CaseParams) map[string]string {
            return map[string]string{
                "username": "test",
                "password": "123456",
            }
        },
    })
    
    // Add test case to worker
    workerRunner.AddTestCase(testCase)
    
    // Start worker
    workerRunner.Run()
}

Configuration

Test Case Configuration
type CaseBaseInfo struct {
    Name                string            `json:"name"`                // Test case name
    GlobalParams        map[string]string `json:"globalParams"`        // Global parameters
    MaxConcurrencyCount uint64            `json:"maxConcurrencyCount"` // Maximum concurrency
    RampingSeconds      uint64            `json:"rampingSeconds"`      // Ramping time (seconds)
    DurationMinutes     uint64            `json:"durationMinutes"`     // Duration (minutes)
    WorkerSize          uint64            `json:"workerSize"`          // Worker size
}
Internal Variables

The system automatically injects the following internal variables into request parameters:

  • __name: Step name
  • __goroutine_id: Goroutine ID
  • __executor_index: Executor index
  • __worker_total: Total number of workers
  • __worker_index: Worker index
  • __worker_size: Worker size

API Interfaces

Communication with Coordinator
Push Status
POST /worker/push_status
Send Metrics
POST /worker/send_step_metrics

Performance Monitoring

The system automatically collects the following performance metrics:

  • Response Time: Uses TDigest algorithm for data compression
  • Success Rate: Based on HTTP status code judgment
  • Throughput: Requests per second and bytes per second
  • Concurrency: Real-time active concurrency count
Metric Types
  • step_call: Step call metrics
  • step_call_integral: Step call cumulative metrics

Dependencies

  • github.com/Narasimha1997/ratelimiter: RPS limiting
  • github.com/caio/go-tdigest/v4: Performance data compression
  • github.com/eapache/queue: Queue management
  • github.com/google/uuid: UUID generation

Development Guide

Implementing Custom Request Plugins
type CustomRequestPlugin struct {
    // Custom fields
}

func (p *CustomRequestPlugin) Execute(params map[string]string) workerclient.IResultV1 {
    result := workerclient.AcquireResult("custom_request")
    result.Begin()
    
    // Implement custom request logic
    
    result.End()
    return result
}
Adding Test Steps
testStep := &workerclient.TestStep{
    StepName: "custom_step",
    ReqPluginFunc: func(params map[string]string) interface{} {
        // Request processing logic
    },
    GenReqParamsFunc: func(caseParams *workerclient.CaseParams) map[string]string {
        // Parameter generation logic
    },
    PreFunc: func(caseParams *workerclient.CaseParams, reqParams map[string]string) {
        // Pre-processing
    },
    PostFunc: func(caseParams *workerclient.CaseParams, reqParams map[string]string, res workerclient.IResultV1) {
        // Post-processing
    },
    ExecWhenFunc: func(caseParams *workerclient.CaseParams, reqParams map[string]string) bool {
        // Execution condition judgment
        return true
    },
    ContinueWhenFailed: false, // Whether to continue on failure
    RpsLimitFunc: func(caseRunnerInfo workerclient.CaseRunnerInfo, globalParams map[string]string) uint64 {
        // RPS limiting
        return 100
    },
}

Architecture Overview

System Components
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Coordinator   │◄──►│  Worker Client  │◄──►│  Target System  │
│                 │    │                 │    │                 │
│ - Task Schedule │    │ - Test Execution│    │ - API Endpoints │
│ - Metrics Collect│   │ - Result Process│    │ - Services      │
│ - Worker Manage │    │ - Status Report │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
Data Flow
  1. Task Assignment: Coordinator assigns test tasks to workers
  2. Test Execution: Workers execute test cases with specified concurrency
  3. Metrics Collection: Real-time collection of performance metrics
  4. Result Reporting: Workers report results back to coordinator
  5. Analysis: Coordinator analyzes and aggregates results

Best Practices

Test Case Design
  • Use meaningful step names for better debugging
  • Implement proper error handling in request plugins
  • Set appropriate RPS limits to avoid overwhelming target systems
  • Use parameterized testing for data-driven scenarios
Performance Optimization
  • Configure appropriate concurrency levels based on target system capacity
  • Use gradual ramping to avoid sudden load spikes
  • Monitor system resources during test execution
  • Implement proper cleanup in tear-down functions
Monitoring and Debugging
  • Check coordinator logs for task assignment issues
  • Monitor worker status and active concurrency
  • Analyze response time distributions using TDigest data
  • Use internal variables for request correlation

Troubleshooting

Common Issues
  1. Worker not receiving tasks: Check coordinator connectivity and worker registration
  2. High failure rates: Verify target system availability and request parameters
  3. Memory issues: Reduce concurrency or optimize request plugins
  4. Network timeouts: Adjust timeout settings and check network connectivity
Debug Mode

Enable debug logging by setting appropriate log levels in your application.

License

MIT License.

Contributing

Issues and Pull Requests are welcome to improve the project.

Contact

For questions, please contact the development team.

Documentation

Index

Constants

View Source
const (
	InnerVarName          = "__name"
	InnerVarGoroutineId   = "__goroutine_id"
	InnerVarExecutorIndex = "__executor_index"
	InnerVarWorkerTotal   = "__worker_total"
	InnerVarWorkerIndex   = "__worker_index"
	InnerVarWorkerSize    = "__worker_size"
)

Variables

This section is empty.

Functions

func UnserializeTDigest

func UnserializeTDigest(nodes []TDNode) *tdigest.TDigest

Types

type CallMonitor

type CallMonitor struct {
	TotalCount uint64 `json:"totalCount"`
	TotalRt    uint64 `json:"totalRt"` // unit: millisecond
	MaxRt      uint64 `json:"maxRt"`
	MinRt      uint64 `json:"minRt"`
	SuccCount  uint64 `json:"succCount"`
	FailCount  uint64 `json:"failCount"`
	BeginTime  uint64 `json:"beginTime"`
	LastTime   uint64 `json:"lastTime"`
}

type CallTimeMapKey

type CallTimeMapKey struct {
	TaskId      string `json:"taskId"`
	MetricName  string `json:"metricName"`
	IsWholeCase bool   `json:"isWholeCase"`
	WorkerName  string `json:"workerName"`
	CaseName    string `json:"caseName"`
	StepName    string `json:"stepName"`
	Success     bool   `json:"success"`
	StatusCode  int    `json:"statusCode"`
	Ts          int    `json:"ts"`
}

type CallTimeMetric

type CallTimeMetric struct {
	Key   CallTimeMapKey `json:"key"`
	Value []TDNode       `json:"value"`
}

type CaseBaseInfo

type CaseBaseInfo struct {
	Name                string            `json:"name" binding:"required"`
	GlobalParams        map[string]string `json:"globalParams" binding:"required"`
	MaxConcurrencyCount uint64            `json:"maxConcurrencyCount" binding:"required"`
	RampingSeconds      uint64            `json:"rampingSeconds" binding:"required"`
	DurationMinutes     uint64            `json:"durationMinutes"  binding:"required"`
	WorkName            string            `json:"workName" binding:"required"`
	WorkerSize          uint64            `json:"workerSize" binding:"required"`
	TaskId              string            `json:"taskId"`
}

type CaseGenFunc

type CaseGenFunc func(caseRunnerInfo CaseRunnerInfo) *TestCase

type CaseParmas

type CaseParmas struct {
	GlobalParams    map[string]string
	CoroutineParams map[string]string
	RuntimeParams   map[string]string
	CaseRunnerInfo  CaseRunnerInfo
}

type CaseRunner

type CaseRunner struct {
	Info                   CaseRunnerInfo
	TestCase               *TestCase
	GlobalParams           map[string]string
	IsRunning              bool
	Output                 *Output
	MetricsChan            chan ([]*CallTimeMetric)
	ActiveConcurrencyCount int64
	CoordinatorApi         string
	// contains filtered or unexported fields
}

func (*CaseRunner) HandleOuput

func (cr *CaseRunner) HandleOuput()

func (*CaseRunner) Run

func (cr *CaseRunner) Run()

func (*CaseRunner) SendMetrics

func (cr *CaseRunner) SendMetrics()

func (*CaseRunner) SetGlobalParams

func (cr *CaseRunner) SetGlobalParams(globalParams map[string]string)

func (*CaseRunner) StopRunChannel

func (cr *CaseRunner) StopRunChannel()

type CaseRunnerInfo

type CaseRunnerInfo struct {
	WorkerName                string
	MaxConcurrencyInThisWoker uint64
	RampingSeconds            uint64
	DurationMinutes           uint64
	WorkerTotal               uint64
	WorkerIndex               uint64
	WorkerSize                uint64
}

type CaseSummary

type CaseSummary struct {
	CallMonitors         map[string]*CallMonitor `json:"callMonitor" binding:"optional"`
	LastConcurrencyCount uint64                  `json:"lastConcurrencyCount"`
}

type HTTPClient

type HTTPClient struct {
	// contains filtered or unexported fields
}

func NewHTTPClient

func NewHTTPClient(timeout time.Duration) *HTTPClient

func (*HTTPClient) PostJSON

func (c *HTTPClient) PostJSON(url string, requestBody interface{}, responseBody interface{}) error

type IResultV1

type IResultV1 interface {
	GetName() string
	GetUrl() string
	GetMethod() string
	GetRequestHeader() map[string]string
	GetRequestBody() string
	GetSentBytes() int
	GetResponseCode() int
	GetResponseHeader() map[string]string
	GetResponseBody() string
	GetReceivedBytes() int
	GetFailureMessage() string
	IsSuccess() bool
	GetBeginTime() int64
	GetEndTime() int64
	GetSubResults() []interface{}
}

type Output

type Output struct {
	ResChans chan IResultV1
}

type ResponseBody

type ResponseBody struct {
	Code int         `json:"code"`
	Data interface{} `json:"data"`
	Msg  string      `json:"msg"`
}

type Result

type Result struct {
	Name           string
	Url            string
	Method         string
	RequestHeader  map[string]string
	RequestBody    string
	SentBytes      int
	ResponseCode   int
	ResponseHeader map[string]string
	ResponseBody   string
	ReceivedBytes  int
	FailureMessage string
	Success        bool
	BeginTime      int64
	EndTime        int64
	SubResults     []interface{}
	SubIndex       int
}

func AcquireResult

func AcquireResult(name string) *Result

func (*Result) AddSub

func (r *Result) AddSub(name string, useNamePrefix bool) *Result

func (*Result) Begin

func (r *Result) Begin()

begin records begin time, do not forget call this function to update

func (*Result) End

func (r *Result) End()

func (*Result) GetBeginTime

func (r *Result) GetBeginTime() int64

func (*Result) GetEndTime

func (r *Result) GetEndTime() int64

func (*Result) GetFailureMessage

func (r *Result) GetFailureMessage() string

func (*Result) GetMethod

func (r *Result) GetMethod() string

func (*Result) GetName

func (r *Result) GetName() string

func (*Result) GetReceivedBytes

func (r *Result) GetReceivedBytes() int

func (*Result) GetRequestBody

func (r *Result) GetRequestBody() string

func (*Result) GetRequestHeader

func (r *Result) GetRequestHeader() map[string]string

func (*Result) GetResponseBody

func (r *Result) GetResponseBody() string

func (*Result) GetResponseCode

func (r *Result) GetResponseCode() int

func (*Result) GetResponseHeader

func (r *Result) GetResponseHeader() map[string]string

func (*Result) GetSentBytes

func (r *Result) GetSentBytes() int

func (*Result) GetSubResults

func (r *Result) GetSubResults() []interface{}

func (*Result) GetUrl

func (r *Result) GetUrl() string

func (*Result) IsSuccess

func (r *Result) IsSuccess() bool

type RpsQLimiter

type RpsQLimiter struct {
	Lock   sync.Mutex
	Limter *ratelimiter.AttributeBasedLimiter
	QMap   map[string]*queue.Queue
}

type RspWorkerPushStatus

type RspWorkerPushStatus struct {
	Worker         *Worker       `json:"worker"`
	ShouldRunCase  bool          `json:"shouldRunCase"`
	ShouldStopCase bool          `json:"shouldStopCase"`
	TestCaseInfo   *TestCaseInfo `json:"testCase"`
}

type RspWorkerPushStatusBody

type RspWorkerPushStatusBody struct {
	Code int                  `json:"code"`
	Data *RspWorkerPushStatus `json:"data"`
	Msg  string               `json:"msg"`
}

type TDNode

type TDNode struct {
	Mean  float64 `json:"mean"`
	Count uint64  `json:"count"`
}

func SerializeTDigest

func SerializeTDigest(td *tdigest.TDigest) []TDNode

type TestCase

type TestCase struct {
	Name      string
	Teststeps []*TestStep
	TearDown  func(coroutineParams map[string]string)
}

func NewTestCase

func NewTestCase(caseName string) *TestCase

func (*TestCase) AddStep

func (tc *TestCase) AddStep(ts *TestStep)

func (*TestCase) Run

func (tc *TestCase) Run(globalParams, coroutineParams map[string]string, rpsQLimiter *RpsQLimiter, output *Output, caseRunner *CaseRunner)

type TestCaseInfo

type TestCaseInfo struct {
	BaseInfo           *CaseBaseInfo `json:"baseInfo"`
	WorkerTotal        uint64        `json:"workerTotal" binding:"optional"`
	RunningWorkerCount uint64        `json:"runningWorkerCount" binding:"optional"`
	RuningWorkerIds    []string      `json:"runningWorkerIds"`
	Status             string        `json:"status" binding:"optional"`
	BeginTime          uint64        `json:"beginTime"`
	LastTime           uint64        `json:"lastTime"`
	Summary            *CaseSummary  `json:"summary" binding:"optional"`
}

type TestCaseSummary

type TestCaseSummary struct {
	Name                   string `json:"name" binding:"required"`
	Status                 string `json:"status" binding:"required"`
	ActiveConcurrencyCount int64  `json:"activeConcurrencyCount"`
	TaskId                 string `json:"taskId"`
}

type TestStep

type TestStep struct {
	StepIndex            string
	StepName             string
	ReqPluginFunc        func(reqPamrams map[string]string) (res interface{})
	SetRuntimeParamsFunc func(caseParmas *CaseParmas)
	GenReqParamsFunc     func(caseParmas *CaseParmas) (p map[string]string)
	ContinueWhenFailed   bool
	ExecWhenFunc         func(caseParmas *CaseParmas, reqPamrams map[string]string) (b bool)
	PreFunc              func(caseParmas *CaseParmas, reqPamrams map[string]string)
	PostFunc             func(caseParmas *CaseParmas, reqPamrams map[string]string, res IResultV1)
	RpsLimitFunc         func(caseRunnerInfo CaseRunnerInfo, globalParams map[string]string) (rps uint64)
}

type Worker

type Worker struct {
	BaseInfo    *WorkerBaseInfo `json:"baseInfo" binding:"required"`
	LastAciveAt int64           `json:"lastAciveAt"`
}

type WorkerBaseInfo

type WorkerBaseInfo struct {
	Name      string             `json:"name" binding:"required"`
	ID        string             `json:"id" binding:"required"`
	Index     int64              `json:"index"`
	Status    string             `json:"status" binding:"required"`
	TestCases []*TestCaseSummary `json:"testCases"`
}

type WorkerPushStatusParams

type WorkerPushStatusParams struct {
	BaseInfo *WorkerBaseInfo `json:"baseInfo" binding:"required"`
}

type WorkerRunner

type WorkerRunner struct {
	Worker            *Worker
	CoordinatorApi    string
	CaseMaps          map[string]*TestCase
	RunningCaseRunner *CaseRunner
	// contains filtered or unexported fields
}

func NewWorkerRunner

func NewWorkerRunner(workerName, coordinatorApi string) *WorkerRunner

func (*WorkerRunner) AddTestCase

func (rw *WorkerRunner) AddTestCase(tc *TestCase)

func (*WorkerRunner) PushStatus

func (rw *WorkerRunner) PushStatus() (rwps *RspWorkerPushStatus)

func (*WorkerRunner) RealRun

func (rw *WorkerRunner) RealRun()

func (*WorkerRunner) Run

func (rw *WorkerRunner) Run()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL