README
¶
GNSS Processing Pipeline Architecture
This package implements a modular, configurable, and testable architecture for GNSS signal processing. It's designed following Go best practices and inspired by Rob Pike's philosophy of simplicity and composition.
Overview
The architecture is based on a pipeline pattern where satellite data flows through a series of independent processors. Each processor does one specific task, making the system:
- Modular: Add new corrections or models without changing existing code
- Testable: Each processor can be tested independently
- Configurable: Toggle features on/off to test their impact
- Extensible: Easy to support new GNSS systems or processing modes
- Concurrent: Process multiple satellites in parallel
Core Concepts
1. SatelliteState
The SatelliteState struct holds all information about a satellite at a specific epoch:
type SatelliteState struct {
System gnss.System // GPS, GALILEO, etc.
SatelliteID int // Satellite PRN/SVID
Time time.Time // Observation epoch
Observations []observation.Observation // Raw measurements
Ephemeris ephemeris.TypeSpecificEphemeris
Position *ephemeris.SatPosition // Satellite position
Geometry *Geometry // Azimuth, elevation, range
Corrections Corrections // All atmospheric/relativistic corrections
Combinations Combinations // Linear combinations (IF, NL, WL, etc.)
Quality QualityIndicators // Health, slips, SNR, etc.
}
Fields are pointers (nil = not computed yet) to distinguish "not computed" from "computed as zero".
2. Processor Interface
Every processing step implements this simple interface:
type Processor interface {
Process(ctx context.Context, state *SatelliteState) error
Name() string
}
Processors are stateless and modify the SatelliteState in-place. They can be chained together in any order.
3. Pipeline
A Pipeline chains processors together and executes them sequentially:
pipeline := pipeline.NewPipeline(
processors.NewEphemerisProcessor(ephStore),
processors.NewPositionProcessor(true),
processors.NewGeometryProcessor(receiverPos),
processors.NewTroposphereProcessor("niell", receiverPos, true),
processors.NewIonosphereFreeProcessor(true),
)
err := pipeline.Process(ctx, state)
Available Processors
Basic Processors
- EphemerisProcessor: Attaches ephemeris data from the store
- PositionProcessor: Calculates satellite position and clock
- GeometryProcessor: Calculates azimuth, elevation, range
- ElevationMaskProcessor: Marks satellites below elevation mask
Correction Processors
- TroposphereProcessor: Tropospheric delay (Niell, Saastamoinen, STANAG)
- SagnacProcessor: Sagnac effect correction
- PhaseWindupProcessor: Carrier phase windup (placeholder)
- (More to come: ionosphere, tidal loading, PCV/PCO, etc.)
Linear Combination Processors
- IonosphereFreeProcessor: Ionosphere-free combination (L1/L2)
- NarrowLaneProcessor: Narrow-lane combination
- WideLaneProcessor: Wide-lane combination
- GeometryFreeProcessor: Geometry-free combination
- MelbourneWubbenaProcessor: Melbourne-Wübbena combination
Configuration
Use ProcessingConfig to control which processors are enabled:
cfg := config.ProcessingConfig{
ReceiverPosition: coordinates.Vector3D{x, y, z},
Systems: []gnss.System{gnss.GPS, gnss.GALILEO},
ElevationMasks: map[gnss.System]float64{
gnss.GPS: 7.0,
gnss.GALILEO: 7.0,
},
Models: config.ModelConfig{
Troposphere: "niell", // or "sasstamoinen", "stanag"
},
Corrections: config.CorrectionConfig{
Troposphere: true, // Enable/disable troposphere
Sagnac: true, // Enable/disable Sagnac
PhaseWindup: false, // Toggle to test impact!
},
Combinations: config.CombinationConfig{
IonosphereFree: true,
NarrowLane: false,
},
}
Default Configurations
Predefined configs for common use cases:
// Single Point Positioning
cfg := config.DefaultConfig()
// Precise Point Positioning
cfg := config.DefaultPPPConfig()
// Time-Differenced Carrier Phase (velocity)
cfg := config.DefaultTDCPConfig()
Pipeline Builder
The PipelineBuilder constructs pipelines from configuration:
builder := config.NewPipelineBuilder(cfg, ephemerisStore)
pipeline, err := builder.Build()
if err != nil {
log.Fatal(err)
}
The builder automatically orders processors correctly (e.g., geometry before corrections).
Processing Epochs
Use EpochProcessor to process entire epochs:
epochProc := pipeline.NewEpochProcessor(pipeline, true) // true = concurrent
states, err := epochProc.Process(ctx, epoch)
// Filter ready satellites
readyStates := pipeline.FilterReadyStates(states)
// Group by system
bySystem := pipeline.GroupBySystem(readyStates)
Filtering Utilities
// Only satellites ready for positioning
ready := pipeline.FilterReadyStates(states)
// Only GPS and Galileo
gpsGal := pipeline.FilterBySystem(states, gnss.GPS, gnss.GALILEO)
// Only above 15 degrees elevation
highElev := pipeline.FilterByElevation(states, 15.0)
// Custom filter
filtered := pipeline.FilterStates(states, func(s *SatelliteState) bool {
return s.Quality.SignalStrengthL1 > 35.0
})
Example: Simple SPP
func SimpleSPP(epoch observation.Epoch, ephStore *ephemeris.EphemerisStore) {
// 1. Configure
cfg := config.DefaultConfig()
cfg.ReceiverPosition = coordinates.Vector3D{x, y, z}
cfg.Corrections.Troposphere = true
cfg.Combinations.IonosphereFree = true
// 2. Build pipeline
builder := config.NewPipelineBuilder(cfg, ephStore)
pipe, _ := builder.Build()
// 3. Process epoch
epochProc := pipeline.NewEpochProcessor(pipe, true)
states, _ := epochProc.Process(context.Background(), epoch)
// 4. Filter and solve
ready := pipeline.FilterReadyStates(states)
position := solveBancroft(ready)
}
Example: Testing Correction Impact
One of the key design goals is making it easy to test the impact of corrections:
// Test with and without troposphere correction
for _, enableTropo := range []bool{false, true} {
cfg := config.DefaultConfig()
cfg.Corrections.Troposphere = enableTropo
builder := config.NewPipelineBuilder(cfg, ephStore)
pipe, _ := builder.Build()
states, _ := processEpoch(pipe, epoch)
position := solve(states)
fmt.Printf("Troposphere %v: position = %v\n", enableTropo, position)
}
Extending the System
Adding a New Correction
- Create a new processor:
type MyNewCorrectionProcessor struct {
enabled bool
}
func (p *MyNewCorrectionProcessor) Process(ctx context.Context, state *SatelliteState) error {
if !p.enabled {
return nil
}
// Calculate correction
correction := calculateMyCorrection(state)
state.Corrections.MyNewCorrection = &correction
return nil
}
func (p *MyNewCorrectionProcessor) Name() string {
return "my_new_correction"
}
- Add to configuration:
type CorrectionConfig struct {
// ... existing fields
MyNewCorrection bool
}
- Add to builder:
func (b *PipelineBuilder) buildCorrectionProcessors() []Processor {
// ... existing processors
if b.config.Corrections.MyNewCorrection {
procs = append(procs, NewMyNewCorrectionProcessor(true))
}
}
Adding a New GNSS System
The architecture already supports all systems defined in gnss.System. To add observations:
- Define signal mappings in your configuration
- Add ephemeris message types (if new)
- Processors work automatically with any system!
Concurrent Processing
Process satellites in parallel for better performance:
epochProc := pipeline.NewEpochProcessor(pipeline, true) // concurrent = true
epochProc.SetMaxRoutines(8) // limit to 8 concurrent goroutines
states, err := epochProc.Process(ctx, epoch)
Context and Cancellation
All processors support context for cancellation and timeouts:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := pipeline.Process(ctx, state)
Testing
Each processor can be tested independently:
func TestTroposphereProcessor(t *testing.T) {
proc := processors.NewTroposphereProcessor("niell", receiverPos, true)
state := &pipeline.SatelliteState{
Geometry: &pipeline.Geometry{Elevation: 45.0},
}
err := proc.Process(context.Background(), state)
require.NoError(t, err)
require.NotNil(t, state.Corrections.Troposphere)
require.Greater(t, *state.Corrections.Troposphere, 0.0)
}
Comparison with Old Architecture
Old (satellite/state)
- Monolithic
Prepare()method with 10+ calculations - Hard to test individual components
- No way to toggle corrections on/off
- Tight coupling to configuration
- Sequential processing only
New (processing/pipeline)
- Each calculation is a separate processor
- Each processor independently testable
- Easy to enable/disable any feature
- Loose coupling via interfaces
- Concurrent processing support
- Clear data flow and dependencies
Future Enhancements
Planned additions to the pipeline architecture:
- Ionosphere correction processors (Klobuchar, NeQuick)
- Phase windup implementation
- Tidal loading corrections
- Ocean loading corrections
- Antenna PCV/PCO corrections
- Precise orbit/clock support (SP3)
- Kalman filter processor for PPP
- Ambiguity resolution processors
- Cycle slip detection processors
Architecture Principles
This design follows Rob Pike's Go philosophy:
- Simplicity: Each processor does ONE thing
- Composition: Complex behavior from simple parts
- Interfaces: Small, focused interfaces
- Data flow: Clear pipeline pattern
- Concurrency: First-class support via goroutines
- Testability: Every component independently testable
Performance Considerations
- Concurrent processing: Use for epochs with many satellites (10+)
- Sequential processing: Better for small epochs (< 10 satellites)
- Memory: States are small; safe to create thousands
- Cloning: Use
state.Clone()if you need to preserve state
Questions?
See pkg/processing/examples/ for complete working examples.