processing/

directory
v0.35.5
Published: Jan 9, 2026 License: Apache-2.0

README

GNSS Processing Pipeline Architecture

This package implements a modular, configurable, and testable architecture for GNSS signal processing. It follows Go best practices and is inspired by Rob Pike's philosophy of simplicity and composition.

Overview

The architecture is based on a pipeline pattern where satellite data flows through a series of independent processors. Each processor does one specific task, making the system:

  • Modular: Add new corrections or models without changing existing code
  • Testable: Each processor can be tested independently
  • Configurable: Toggle features on/off to test their impact
  • Extensible: Easy to support new GNSS systems or processing modes
  • Concurrent: Process multiple satellites in parallel

Core Concepts

1. SatelliteState

The SatelliteState struct holds all information about a satellite at a specific epoch:

type SatelliteState struct {
    System      gnss.System              // GPS, GALILEO, etc.
    SatelliteID int                      // Satellite PRN/SVID
    Time        time.Time                // Observation epoch
    
    Observations []observation.Observation  // Raw measurements
    Ephemeris    ephemeris.TypeSpecificEphemeris
    Position     *ephemeris.SatPosition   // Satellite position
    Geometry     *Geometry                // Azimuth, elevation, range
    Corrections  Corrections              // All atmospheric/relativistic corrections
    Combinations Combinations             // Linear combinations (IF, NL, WL, etc.)
    Quality      QualityIndicators        // Health, slips, SNR, etc.
}

Computed results such as Position, Geometry, and the individual correction values are pointers: nil means "not computed yet", which keeps that case distinct from "computed as zero".
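The nil-versus-zero distinction can be sketched as follows. The corrections type and the describe helper are hypothetical stand-ins for illustration, not the package's actual definitions.

```go
package main

import "fmt"

// corrections is a hypothetical stand-in for the package's Corrections type.
type corrections struct {
	Troposphere *float64 // nil until a processor computes it
}

// describe reports whether a correction has been computed yet.
func describe(v *float64) string {
	if v == nil {
		return "not computed"
	}
	return fmt.Sprintf("computed: %.3f", *v)
}

func main() {
	var c corrections
	fmt.Println(describe(c.Troposphere)) // nil pointer: nothing computed yet

	zero := 0.0
	c.Troposphere = &zero
	fmt.Println(describe(c.Troposphere)) // non-nil pointer to 0: computed as zero
}
```

With a plain float64 field, both cases would read as 0 and a downstream processor could not tell them apart.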

2. Processor Interface

Every processing step implements this simple interface:

type Processor interface {
    Process(ctx context.Context, state *SatelliteState) error
    Name() string
}

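Processors keep no per-satellite state and modify the SatelliteState in place. They can be chained freely, but a processor that reads another processor's output must run after it (for example, geometry before corrections).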
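As an illustration of the interface, here is a minimal sketch of a processor that flags low-elevation satellites. The trimmed-down SatelliteState, the maskProcessor type, and the runMask helper are assumptions made for this example; the package's real ElevationMaskProcessor may look different.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// SatelliteState is a trimmed-down, hypothetical stand-in for the real struct.
type SatelliteState struct {
	Elevation *float64 // degrees; nil means geometry has not been computed
	Masked    bool
}

// Processor mirrors the interface shown above.
type Processor interface {
	Process(ctx context.Context, state *SatelliteState) error
	Name() string
}

// maskProcessor is an illustrative processor that flags low satellites.
// It holds only configuration, never per-satellite data.
type maskProcessor struct {
	minElevation float64 // degrees
}

func (p maskProcessor) Name() string { return "elevation_mask" }

func (p maskProcessor) Process(ctx context.Context, state *SatelliteState) error {
	if err := ctx.Err(); err != nil {
		return err // stop early if the caller cancelled
	}
	if state.Elevation == nil {
		return errors.New("elevation_mask: geometry not computed")
	}
	state.Masked = *state.Elevation < p.minElevation
	return nil
}

// runMask applies a mask processor to one state with a background context.
func runMask(state *SatelliteState, maskDeg float64) error {
	var p Processor = maskProcessor{minElevation: maskDeg}
	return p.Process(context.Background(), state)
}

func main() {
	el := 5.0
	s := &SatelliteState{Elevation: &el}
	err := runMask(s, 7)
	fmt.Println(s.Masked, err)
}
```

Returning an error when the elevation is nil makes a wrongly ordered pipeline fail loudly instead of silently skipping the mask.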

3. Pipeline

A Pipeline chains processors together and executes them sequentially:

// Named pipe rather than pipeline so the variable does not shadow the package.
pipe := pipeline.NewPipeline(
    processors.NewEphemerisProcessor(ephStore),
    processors.NewPositionProcessor(true),
    processors.NewGeometryProcessor(receiverPos),
    processors.NewTroposphereProcessor("niell", receiverPos, true),
    processors.NewIonosphereFreeProcessor(true),
)

err := pipe.Process(ctx, state)
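Sequential execution could be implemented roughly as below. This is a sketch under stated assumptions, not the package's source: the Pipeline struct, the step processor, and the runSteps helper are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// SatelliteState here only records which processors ran (illustration only).
type SatelliteState struct {
	Steps []string
}

type Processor interface {
	Process(ctx context.Context, state *SatelliteState) error
	Name() string
}

// Pipeline is a hypothetical minimal version: an ordered slice of processors.
type Pipeline struct {
	procs []Processor
}

func NewPipeline(procs ...Processor) *Pipeline { return &Pipeline{procs: procs} }

// Process runs each processor in order and stops at the first error,
// wrapping it with the name of the processor that failed.
func (p *Pipeline) Process(ctx context.Context, state *SatelliteState) error {
	for _, proc := range p.procs {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := proc.Process(ctx, state); err != nil {
			return fmt.Errorf("%s: %w", proc.Name(), err)
		}
	}
	return nil
}

// step is a trivial processor that appends its own name to the state.
type step string

func (s step) Name() string { return string(s) }

func (s step) Process(_ context.Context, st *SatelliteState) error {
	st.Steps = append(st.Steps, string(s))
	return nil
}

// runSteps builds a pipeline from the given names and returns the run order.
func runSteps(names ...string) ([]string, error) {
	procs := make([]Processor, len(names))
	for i, n := range names {
		procs[i] = step(n)
	}
	st := &SatelliteState{}
	err := NewPipeline(procs...).Process(context.Background(), st)
	return st.Steps, err
}

func main() {
	order, err := runSteps("ephemeris", "position", "geometry")
	fmt.Println(order, err)
}
```

Wrapping the error with Name() is one reason the interface carries a name at all: a failure can be traced to a specific stage.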

Available Processors

Basic Processors

  • EphemerisProcessor: Attaches ephemeris data from the store
  • PositionProcessor: Calculates satellite position and clock
  • GeometryProcessor: Calculates azimuth, elevation, range
  • ElevationMaskProcessor: Marks satellites below elevation mask

Correction Processors

  • TroposphereProcessor: Tropospheric delay (Niell, Saastamoinen, STANAG)
  • SagnacProcessor: Sagnac effect correction
  • PhaseWindupProcessor: Carrier phase windup (placeholder)
  • (More to come: ionosphere, tidal loading, PCV/PCO, etc.)

Linear Combination Processors

  • IonosphereFreeProcessor: Ionosphere-free combination (L1/L2)
  • NarrowLaneProcessor: Narrow-lane combination
  • WideLaneProcessor: Wide-lane combination
  • GeometryFreeProcessor: Geometry-free combination
  • MelbourneWubbenaProcessor: Melbourne-Wübbena combination
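To make the linear combinations concrete, the sketch below computes the standard dual-frequency ionosphere-free pseudorange, (f1²·P1 − f2²·P2) / (f1² − f2²), using the GPS L1 and L2 carrier frequencies. The ionosphereFree function and the sample range and delay values are made up for illustration and are not taken from IonosphereFreeProcessor.

```go
package main

import "fmt"

const (
	fL1 = 1575.42e6 // GPS L1 carrier frequency, Hz
	fL2 = 1227.60e6 // GPS L2 carrier frequency, Hz
)

// ionosphereFree combines two pseudoranges (metres) measured on
// frequencies f1 and f2 (Hz) so the first-order ionospheric delay cancels.
func ionosphereFree(p1, p2, f1, f2 float64) float64 {
	f1sq, f2sq := f1*f1, f2*f2
	return (f1sq*p1 - f2sq*p2) / (f1sq - f2sq)
}

func main() {
	const trueRange = 2.2e7 // metres, made-up value
	const ionoL1 = 5.0      // metres of delay on L1, made-up value

	// The first-order ionospheric delay scales with 1/f^2.
	ionoL2 := ionoL1 * (fL1 * fL1) / (fL2 * fL2)

	p1 := trueRange + ionoL1
	p2 := trueRange + ionoL2
	residual := ionosphereFree(p1, p2, fL1, fL2) - trueRange
	fmt.Printf("residual after combination: %.6f m\n", residual)
}
```

The combination removes the first-order delay at the cost of amplified measurement noise, which is why it is a toggle rather than always on.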

Configuration

Use ProcessingConfig to control which processors are enabled:

cfg := config.ProcessingConfig{
    ReceiverPosition: coordinates.Vector3D{x, y, z},
    Systems: []gnss.System{gnss.GPS, gnss.GALILEO},
    
    ElevationMasks: map[gnss.System]float64{
        gnss.GPS:     7.0,
        gnss.GALILEO: 7.0,
    },
    
    Models: config.ModelConfig{
        Troposphere: "niell",  // or "saastamoinen", "stanag"
    },
    
    Corrections: config.CorrectionConfig{
        Troposphere: true,   // Enable/disable troposphere
        Sagnac:      true,   // Enable/disable Sagnac
        PhaseWindup: false,  // Toggle to test impact!
    },
    
    Combinations: config.CombinationConfig{
        IonosphereFree: true,
        NarrowLane:     false,
    },
}

Default Configurations

Predefined configs for common use cases:

// Single Point Positioning
sppCfg := config.DefaultConfig()

// Precise Point Positioning
pppCfg := config.DefaultPPPConfig()

// Time-Differenced Carrier Phase (velocity)
tdcpCfg := config.DefaultTDCPConfig()

Pipeline Builder

The PipelineBuilder constructs pipelines from configuration:

builder := config.NewPipelineBuilder(cfg, ephemerisStore)
pipe, err := builder.Build() // pipe, not pipeline, to avoid shadowing the package
if err != nil {
    log.Fatal(err)
}

The builder automatically orders processors correctly (e.g., geometry before corrections).
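A builder that enforces ordering might work along these lines. The stage order used here (ephemeris, position, geometry, then corrections, then combinations) is an assumption inferred from the pipeline example earlier in this README, and the config type and buildOrder function are hypothetical.

```go
package main

import "fmt"

// config is a hypothetical, flattened version of ProcessingConfig.
type config struct {
	Troposphere    bool
	Sagnac         bool
	IonosphereFree bool
}

// buildOrder returns processor names in dependency order, regardless of
// the order in which the caller set the flags.
func buildOrder(c config) []string {
	// Stages every later processor depends on always come first.
	order := []string{"ephemeris", "position", "geometry"}

	// Corrections need geometry, so they follow it.
	if c.Sagnac {
		order = append(order, "sagnac")
	}
	if c.Troposphere {
		order = append(order, "troposphere")
	}

	// Combinations come last in this sketch.
	if c.IonosphereFree {
		order = append(order, "ionosphere_free")
	}
	return order
}

func main() {
	fmt.Println(buildOrder(config{Troposphere: true, IonosphereFree: true}))
}
```

Because the order is fixed inside the builder, callers only flip booleans and cannot accidentally place a correction before the geometry it reads.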

Processing Epochs

Use EpochProcessor to process entire epochs:

epochProc := pipeline.NewEpochProcessor(pipe, true) // pipe from builder.Build(); true = concurrent
states, err := epochProc.Process(ctx, epoch)

// Filter ready satellites
readyStates := pipeline.FilterReadyStates(states)

// Group by system
bySystem := pipeline.GroupBySystem(readyStates)

Filtering Utilities

// Only satellites ready for positioning
ready := pipeline.FilterReadyStates(states)

// Only GPS and Galileo
gpsGal := pipeline.FilterBySystem(states, gnss.GPS, gnss.GALILEO)

// Only above 15 degrees elevation
highElev := pipeline.FilterByElevation(states, 15.0)

// Custom filter
filtered := pipeline.FilterStates(states, func(s *pipeline.SatelliteState) bool {
    return s.Quality.SignalStrengthL1 > 35.0
})
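The filters above are all variations on a predicate over states. A minimal sketch of how such helpers could be written follows; the simplified SatelliteState and both function bodies are assumptions for illustration, not the package's implementation.

```go
package main

import "fmt"

// SatelliteState is a simplified stand-in with only the fields used here.
type SatelliteState struct {
	SatelliteID int
	Elevation   float64 // degrees
}

// FilterStates returns the states for which keep returns true.
// Nil entries are skipped so keep never receives a nil pointer.
func FilterStates(states []*SatelliteState, keep func(*SatelliteState) bool) []*SatelliteState {
	var out []*SatelliteState
	for _, s := range states {
		if s != nil && keep(s) {
			out = append(out, s)
		}
	}
	return out
}

// FilterByElevation is FilterStates with a fixed elevation predicate.
func FilterByElevation(states []*SatelliteState, minDeg float64) []*SatelliteState {
	return FilterStates(states, func(s *SatelliteState) bool {
		return s.Elevation >= minDeg
	})
}

func main() {
	states := []*SatelliteState{
		{SatelliteID: 1, Elevation: 5},
		{SatelliteID: 2, Elevation: 40},
	}
	for _, s := range FilterByElevation(states, 15) {
		fmt.Println(s.SatelliteID)
	}
}
```

Building the named filters on top of one generic function keeps their nil handling and ordering behavior identical.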

Example: Simple SPP

func SimpleSPP(epoch observation.Epoch, ephStore *ephemeris.EphemerisStore) error {
    // 1. Configure
    cfg := config.DefaultConfig()
    cfg.ReceiverPosition = coordinates.Vector3D{x, y, z} // x, y, z: receiver coordinates supplied by the caller
    cfg.Corrections.Troposphere = true
    cfg.Combinations.IonosphereFree = true

    // 2. Build pipeline
    builder := config.NewPipelineBuilder(cfg, ephStore)
    pipe, err := builder.Build()
    if err != nil {
        return fmt.Errorf("build pipeline: %w", err)
    }

    // 3. Process epoch
    epochProc := pipeline.NewEpochProcessor(pipe, true)
    states, err := epochProc.Process(context.Background(), epoch)
    if err != nil {
        return fmt.Errorf("process epoch: %w", err)
    }

    // 4. Filter and solve
    ready := pipeline.FilterReadyStates(states)
    position := solveBancroft(ready) // solveBancroft stands in for your own solver
    fmt.Printf("position = %v\n", position)
    return nil
}

Example: Testing Correction Impact

One of the key design goals is making it easy to test the impact of corrections:

// Test with and without troposphere correction
for _, enableTropo := range []bool{false, true} {
    cfg := config.DefaultConfig()
    cfg.Corrections.Troposphere = enableTropo

    builder := config.NewPipelineBuilder(cfg, ephStore)
    pipe, err := builder.Build()
    if err != nil {
        log.Fatal(err)
    }

    // processEpoch and solve are placeholders for your own helpers
    states, err := processEpoch(pipe, epoch)
    if err != nil {
        log.Fatal(err)
    }
    position := solve(states)

    fmt.Printf("Troposphere %v: position = %v\n", enableTropo, position)
}

Extending the System

Adding a New Correction
  1. Create a new processor:
type MyNewCorrectionProcessor struct {
    enabled bool
}

func (p *MyNewCorrectionProcessor) Process(ctx context.Context, state *SatelliteState) error {
    if !p.enabled {
        return nil
    }
    
    // Calculate correction
    correction := calculateMyCorrection(state)
    state.Corrections.MyNewCorrection = &correction
    return nil
}

func (p *MyNewCorrectionProcessor) Name() string {
    return "my_new_correction"
}
  2. Add to configuration:
type CorrectionConfig struct {
    // ... existing fields
    MyNewCorrection bool
}
  3. Add to builder:
func (b *PipelineBuilder) buildCorrectionProcessors() []Processor {
    var procs []Processor
    // ... existing processors

    if b.config.Corrections.MyNewCorrection {
        // NewMyNewCorrectionProcessor: constructor for the processor from step 1
        procs = append(procs, NewMyNewCorrectionProcessor(true))
    }
    return procs
}

Adding a New GNSS System

The architecture already supports all systems defined in gnss.System. To add observations for a new system:

  1. Define signal mappings in your configuration
  2. Add ephemeris message types (if new)
  3. Leave the processors unchanged; they work with any system

Concurrent Processing

Process satellites in parallel for better performance:

epochProc := pipeline.NewEpochProcessor(pipe, true) // pipe from builder.Build(); concurrent = true
epochProc.SetMaxRoutines(8) // limit to 8 concurrent goroutines
states, err := epochProc.Process(ctx, epoch)
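A cap such as SetMaxRoutines(8) can be implemented with a buffered channel used as a counting semaphore. The sketch below shows that general technique; processAll and fakeRanges are hypothetical names, and the package's EpochProcessor may use a different mechanism.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// processAll calls fn once per satellite ID, running at most maxRoutines
// calls at the same time. results[i] and errs[i] belong to ids[i].
func processAll(ctx context.Context, ids []int, maxRoutines int,
	fn func(context.Context, int) (float64, error)) ([]float64, []error) {
	if maxRoutines < 1 {
		maxRoutines = 1 // an unbuffered semaphore would block forever
	}
	results := make([]float64, len(ids))
	errs := make([]error, len(ids))
	sem := make(chan struct{}, maxRoutines) // counting semaphore
	var wg sync.WaitGroup

	for i, id := range ids {
		sem <- struct{}{} // blocks while maxRoutines workers are busy
		wg.Add(1)
		go func(i, id int) {
			defer wg.Done()
			defer func() { <-sem }()
			// Each goroutine writes only its own index, so no mutex is needed.
			results[i], errs[i] = fn(ctx, id)
		}(i, id)
	}
	wg.Wait()
	return results, errs
}

// fakeRanges stands in for real per-satellite work: it returns id * 1000.
func fakeRanges(ids []int, maxRoutines int) []float64 {
	res, _ := processAll(context.Background(), ids, maxRoutines,
		func(_ context.Context, id int) (float64, error) {
			return float64(id) * 1000, nil
		})
	return res
}

func main() {
	fmt.Println(fakeRanges([]int{3, 7, 12, 19}, 2))
}
```

Writing results by index keeps the output order equal to the input order even though goroutines finish in arbitrary order.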

Context and Cancellation

All processors support context for cancellation and timeouts:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := pipe.Process(ctx, state) // pipe is the pipeline value built earlier
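For cancellation to take effect, long-running work has to watch the context. The following sketch shows one common way to do that with select; slowStep and timesOut are hypothetical helpers written for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowStep simulates a processor doing long work. It returns early with
// the context's error if the context is cancelled or its deadline passes.
func slowStep(ctx context.Context, work time.Duration) error {
	timer := time.NewTimer(work)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil // work finished in time
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timesOut reports whether work lasting workMs milliseconds hits a
// timeout of timeoutMs milliseconds.
func timesOut(timeoutMs, workMs int) bool {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	err := slowStep(ctx, time.Duration(workMs)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("short timeout, long work:", timesOut(10, 500))
	fmt.Println("long timeout, short work:", timesOut(500, 10))
}
```

Checking with errors.Is lets callers distinguish a timeout from a genuine processing failure even when the error has been wrapped.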

Testing

Each processor can be tested independently:

func TestTroposphereProcessor(t *testing.T) {
    proc := processors.NewTroposphereProcessor("niell", receiverPos, true)
    
    state := &pipeline.SatelliteState{
        Geometry: &pipeline.Geometry{Elevation: 45.0},
    }
    
    err := proc.Process(context.Background(), state)
    require.NoError(t, err)
    require.NotNil(t, state.Corrections.Troposphere)
    require.Greater(t, *state.Corrections.Troposphere, 0.0)
}

Comparison with Old Architecture

Old (satellite/state)

  • Monolithic Prepare() method with 10+ calculations
  • Hard to test individual components
  • No way to toggle corrections on/off
  • Tight coupling to configuration
  • Sequential processing only

New (processing/pipeline)

  • Each calculation is a separate processor
  • Each processor independently testable
  • Easy to enable/disable any feature
  • Loose coupling via interfaces
  • Concurrent processing support
  • Clear data flow and dependencies

Future Enhancements

Planned additions to the pipeline architecture:

  • Ionosphere correction processors (Klobuchar, NeQuick)
  • Phase windup implementation
  • Tidal loading corrections
  • Ocean loading corrections
  • Antenna PCV/PCO corrections
  • Precise orbit/clock support (SP3)
  • Kalman filter processor for PPP
  • Ambiguity resolution processors
  • Cycle slip detection processors

Architecture Principles

This design follows Rob Pike's Go philosophy:

  1. Simplicity: Each processor does ONE thing
  2. Composition: Complex behavior from simple parts
  3. Interfaces: Small, focused interfaces
  4. Data flow: Clear pipeline pattern
  5. Concurrency: First-class support via goroutines
  6. Testability: Every component independently testable

Performance Considerations

  • Concurrent processing: Use for epochs with many satellites (10+)
  • Sequential processing: Better for small epochs (< 10 satellites)
  • Memory: States are small; safe to create thousands
  • Cloning: Use state.Clone() if you need to preserve state
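Because several state fields are pointers, a plain struct copy would share them with the original. The sketch below shows what a deep Clone could look like; whether the package's state.Clone() copies this deeply is an assumption, and the two-field SatelliteState is a simplification.

```go
package main

import "fmt"

type Geometry struct {
	Azimuth, Elevation float64
}

// SatelliteState is a simplified stand-in with one pointer field.
type SatelliteState struct {
	SatelliteID int
	Geometry    *Geometry
}

// Clone returns a deep copy: pointer fields are duplicated, so a processor
// that mutates the copy cannot affect the original.
func (s *SatelliteState) Clone() *SatelliteState {
	if s == nil {
		return nil
	}
	c := *s // copies value fields; pointers still alias at this point
	if s.Geometry != nil {
		g := *s.Geometry
		c.Geometry = &g
	}
	return &c
}

func main() {
	orig := &SatelliteState{SatelliteID: 5, Geometry: &Geometry{Elevation: 30}}
	cp := orig.Clone()
	cp.Geometry.Elevation = 60
	fmt.Println(orig.Geometry.Elevation, cp.Geometry.Elevation)
}
```

This matters for experiments such as comparing corrections on and off: each run can start from its own clone of the same input state.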

Questions?

See pkg/processing/examples/ for complete working examples.
