Skip to main content

nyx_space/od/msr/trackingdata/
mod.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18use super::{MeasurementType, measurement::Measurement};
19use core::fmt;
20use hifitime::prelude::{Duration, Epoch};
21use indexmap::{IndexMap, IndexSet};
22use log::{error, info, warn};
23use std::collections::BTreeMap;
24use std::collections::btree_map::Entry;
25use std::ops::Bound::{self, Excluded, Included, Unbounded};
26use std::ops::{Add, AddAssign, RangeBounds};
27
28mod io_ccsds_tdm;
29mod io_parquet;
30
31#[cfg(feature = "python")]
32use pyo3::prelude::*;
33#[cfg(feature = "python")]
34mod python;
35
36/// Tracking data storing all of measurements as a B-Tree.
37/// It inherently does NOT support multiple concurrent measurements from several trackers.
38///
39/// # Measurement Moduli, e.g. range modulus
40///
41/// In the case of ranging, and possibly other data types, a code is used to measure the range to the spacecraft. The length of this code
42/// determines the ambiguity resolution, as per equation 9 in section 2.2.2.2 of the JPL DESCANSO, document 214, _Pseudo-Noise and Regenerative Ranging_.
43/// For example, using the JPL Range Code and a frequency range clock of 1 MHz, the range ambiguity is 75,660 km. In other words,
44/// as soon as the spacecraft is at a range of 75,660 + 1 km the JPL Range Code will report the vehicle to be at a range of 1 km.
45/// This is simply because the range code overlaps with itself, effectively loosing track of its own reference:
46/// it's due to the phase shift of the signal "lapping" the original signal length.
47///
48/// ```text
49///             (Spacecraft)
50///             ^
51///             |    Actual Distance = 75,661 km
52///             |
53/// 0 km                                         75,660 km (Wrap-Around)
54/// |-----------------------------------------------|
55///   When the "code length" is exceeded,
56///   measurements wrap back to 0.
57///
58/// So effectively:
59///     Observed code range = Actual range (mod 75,660 km)
60///     75,661 km → 1 km
61///
62/// ```
63///
64/// Nyx can only resolve the range ambiguity if the tracking data specifies a modulus for this specific measurement type.
65/// For example, in the case of the JPL Range Code and a 1 MHz range clock, the ambiguity interval is 75,660 km.
66///
67/// The measurement used in the Orbit Determination Process then becomes the following, where `//` represents the [Euclidian division](https://doc.rust-lang.org/std/primitive.f64.html#method.div_euclid).
68///
69/// ```text
70/// k = computed_obs // ambiguity_interval
71/// real_obs = measured_obs + k * modulus
72/// ```
73///
74/// Reference: JPL DESCANSO, document 214, _Pseudo-Noise and Regenerative Ranging_.
75///
76#[derive(Clone, Default)]
77#[cfg_attr(feature = "python", pyclass(from_py_object))]
78pub struct TrackingDataArc {
79    /// All measurements in this data arc
80    pub measurements: BTreeMap<Epoch, Measurement>, // BUG: Consider a map of tracking to epoch!
81    /// Source file if loaded from a file or saved to a file.
82    pub source: Option<String>,
83    /// Optionally provide a map of modulos (e.g. the RANGE_MODULO of CCSDS TDM).
84    pub moduli: Option<IndexMap<MeasurementType, f64>>,
85    /// Reject all of the measurements, useful for debugging passes.
86    pub force_reject: bool,
87}
88
89#[cfg_attr(feature = "python", pymethods)]
90impl TrackingDataArc {
91    /// Returns the start epoch of this tracking arc
92    pub fn start_epoch(&self) -> Option<Epoch> {
93        self.measurements.first_key_value().map(|(k, _)| *k)
94    }
95
96    /// Returns the end epoch of this tracking arc
97    pub fn end_epoch(&self) -> Option<Epoch> {
98        self.measurements.last_key_value().map(|(k, _)| *k)
99    }
100
101    /// Returns the duration this tracking arc
102    pub fn duration(&self) -> Option<Duration> {
103        match self.start_epoch() {
104            Some(start) => self.end_epoch().map(|end| end - start),
105            None => None,
106        }
107    }
108
109    /// Returns the number of measurements in this data arc
110    pub fn len(&self) -> usize {
111        self.measurements.len()
112    }
113
114    /// Returns whether this arc has no measurements.
115    pub fn is_empty(&self) -> bool {
116        self.measurements.is_empty()
117    }
118
119    /// Returns the minimum duration between two subsequent measurements.
120    pub fn min_duration_sep(&self) -> Option<Duration> {
121        if self.is_empty() {
122            None
123        } else {
124            let mut min_sep = Duration::MAX;
125            let mut prev_epoch = self.start_epoch().unwrap();
126            for (epoch, _) in self.measurements.iter().skip(1) {
127                let this_sep = *epoch - prev_epoch;
128                min_sep = min_sep.min(this_sep);
129                prev_epoch = *epoch;
130            }
131            Some(min_sep)
132        }
133    }
134    /// Set (or overwrites) the modulus of the provided measurement type.
135    pub fn set_moduli(&mut self, msr_type: MeasurementType, modulus: f64) {
136        if modulus.is_nan() || modulus.abs() < f64::EPSILON {
137            warn!("cannot set modulus for {msr_type:?} to {modulus}");
138            return;
139        }
140        if self.moduli.is_none() {
141            self.moduli = Some(IndexMap::new());
142        }
143
144        self.moduli.as_mut().unwrap().insert(msr_type, modulus);
145    }
146
147    /// Applies the moduli to each measurement, if defined.
148    pub fn apply_moduli(&mut self) {
149        if let Some(moduli) = &self.moduli {
150            for msr in self.measurements.values_mut() {
151                for (msr_type, modulus) in moduli {
152                    if let Some(msr_value) = msr.data.get_mut(msr_type) {
153                        *msr_value %= *modulus;
154                    }
155                }
156            }
157        }
158    }
159
160    /// Downsamples the tracking data to a lower frequency using a simple moving average low-pass filter followed by decimation,
161    /// returning new `TrackingDataArc` with downsampled measurements.
162    ///
163    /// It provides a computationally efficient approach to reduce the sampling rate while mitigating aliasing effects.
164    ///
165    /// # Algorithm
166    ///
167    /// 1. A simple moving average filter is applied as a low-pass filter.
168    /// 2. Decimation is performed by selecting every Nth sample after filtering.
169    ///
170    /// # Advantages
171    ///
172    /// - Computationally efficient, suitable for large datasets common in spaceflight applications.
173    /// - Provides basic anti-aliasing, crucial for preserving signal integrity in orbit determination and tracking.
174    /// - Maintains phase information, important for accurate timing in spacecraft state estimation.
175    ///
176    /// # Limitations
177    ///
178    /// - The frequency response is not as sharp as more sophisticated filters (e.g., FIR, IIR).
179    /// - May not provide optimal stopband attenuation for high-precision applications.
180    ///
181    /// ## Considerations for Spaceflight Applications
182    ///
183    /// - Suitable for initial data reduction in ground station tracking pipelines.
184    /// - Adequate for many orbit determination and tracking tasks where computational speed is prioritized.
185    /// - For high-precision applications (e.g., interplanetary navigation), consider using more advanced filtering techniques.
186    ///
187    /// :type target_step: Duration
188    /// :rtype: Self
189    pub fn downsample(&self, target_step: Duration) -> Self {
190        if self.is_empty() {
191            return self.clone();
192        }
193        let current_step = self.min_duration_sep().unwrap();
194
195        if current_step >= target_step {
196            warn!(
197                "cannot downsample tracking data from {current_step} to {target_step} (that would be upsampling)"
198            );
199            return self.clone();
200        }
201
202        let current_hz = 1.0 / current_step.to_seconds();
203        let target_hz = 1.0 / target_step.to_seconds();
204
205        // Simple moving average as low-pass filter
206        let window_size = (current_hz / target_hz).round() as usize;
207
208        info!(
209            "downsampling tracking data from {current_step} ({current_hz:.6} Hz) to {target_step} ({target_hz:.6} Hz) (N = {window_size})"
210        );
211
212        let mut result = TrackingDataArc {
213            source: self.source.clone(),
214            ..Default::default()
215        };
216
217        let measurements: Vec<_> = self.measurements.iter().collect();
218
219        for (i, (epoch, _)) in measurements.iter().enumerate().step_by(window_size) {
220            let start = i.saturating_sub(window_size / 2);
221            let end = (i + window_size / 2 + 1).min(measurements.len());
222            let window = &measurements[start..end];
223
224            let mut filtered_measurement = Measurement {
225                tracker: window[0].1.tracker.clone(),
226                epoch: **epoch,
227                data: IndexMap::new(),
228                rejected: false,
229            };
230
231            // Apply moving average filter for each measurement type
232            for mtype in self.unique_types() {
233                let sum: f64 = window.iter().filter_map(|(_, m)| m.data.get(&mtype)).sum();
234                let count = window
235                    .iter()
236                    .filter(|(_, m)| m.data.contains_key(&mtype))
237                    .count();
238
239                if count > 0 {
240                    filtered_measurement.data.insert(mtype, sum / count as f64);
241                }
242            }
243
244            result.measurements.insert(**epoch, filtered_measurement);
245        }
246        result
247    }
248
249    /// Splits a long tracking data arc into smaller chunks, each up to `max_duration` long.
250    /// This is inspired by JPL MONTE's long arc setup to ensure BLSE convergence on manageable chunks.
251    pub fn chunk(&self, max_duration: Duration) -> Vec<TrackingDataArc> {
252        let mut chunks = Vec::new();
253        if self.is_empty() {
254            return chunks;
255        }
256
257        let end_epoch = self.end_epoch().unwrap();
258        let mut chunk_start = self.start_epoch().unwrap();
259
260        while chunk_start <= end_epoch {
261            let chunk_end = chunk_start + max_duration;
262            let sub_arc = self.clone().filter_by_epoch(chunk_start..chunk_end);
263            if !sub_arc.is_empty() {
264                chunks.push(sub_arc);
265            }
266            // Move start to the next measurement's epoch after chunk_end
267            let mut found_next = false;
268            for (epoch, _) in self.measurements.range(chunk_end..) {
269                if *epoch >= chunk_end {
270                    chunk_start = *epoch;
271                    found_next = true;
272                    break;
273                }
274            }
275            if !found_next {
276                break;
277            }
278        }
279
280        chunks
281    }
282}
283
284impl TrackingDataArc {
285    /// Returns the unique list of aliases in this tracking data arc
286    pub fn unique_aliases(&self) -> IndexSet<String> {
287        self.unique().0
288    }
289
290    /// Returns the unique measurement types in this tracking data arc
291    pub fn unique_types(&self) -> IndexSet<MeasurementType> {
292        self.unique().1
293    }
294
295    /// Returns the unique trackers and unique measurement types in this data arc
296    pub fn unique(&self) -> (IndexSet<String>, IndexSet<MeasurementType>) {
297        let mut aliases = IndexSet::new();
298        let mut types = IndexSet::new();
299        for msr in self.measurements.values() {
300            aliases.insert(msr.tracker.clone());
301            for k in msr.data.keys() {
302                types.insert(*k);
303            }
304        }
305        (aliases, types)
306    }
307
308    /// Returns a new tracking arc that only contains measurements that fall within the given epoch range.
309    pub fn filter_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
310        self.measurements = self
311            .measurements
312            .range(bound)
313            .map(|(epoch, msr)| (*epoch, msr.clone()))
314            .collect::<BTreeMap<Epoch, Measurement>>();
315        self
316    }
317
318    /// Returns a new tracking arc that only contains measurements that fall within the given offset from the first epoch.
319    /// For example, a bound of 30.minutes()..90.minutes() will only read measurements from the start of the arc + 30 minutes until start + 90 minutes.
320    pub fn filter_by_offset<R: RangeBounds<Duration>>(self, bound: R) -> Self {
321        if self.is_empty() {
322            return self;
323        }
324        // Rebuild an epoch bound.
325        let start = match bound.start_bound() {
326            Unbounded => self.start_epoch().unwrap(),
327            Included(offset) | Excluded(offset) => self.start_epoch().unwrap() + *offset,
328        };
329
330        let end = match bound.end_bound() {
331            Unbounded => self.end_epoch().unwrap(),
332            Included(offset) | Excluded(offset) => self.start_epoch().unwrap() + *offset,
333        };
334
335        self.filter_by_epoch(start..end)
336    }
337
338    /// Returns a new tracking arc that only contains measurements from the desired tracker.
339    pub fn filter_by_tracker(mut self, tracker: String) -> Self {
340        self.measurements = self
341            .measurements
342            .iter()
343            .filter_map(|(epoch, msr)| {
344                if msr.tracker == tracker {
345                    Some((*epoch, msr.clone()))
346                } else {
347                    None
348                }
349            })
350            .collect::<BTreeMap<Epoch, Measurement>>();
351        self
352    }
353
354    /// Returns a new tracking arc that only contains measurements of the provided type.
355    pub fn filter_by_measurement_type(mut self, included_type: MeasurementType) -> Self {
356        self.measurements.retain(|_epoch, msr| {
357            msr.data.retain(|msr_type, _| *msr_type == included_type);
358            !msr.data.is_empty()
359        });
360        self
361    }
362
363    /// Returns a new tracking arc that contains measurements from all trackers except the one provided
364    pub fn exclude_tracker(mut self, excluded_tracker: String) -> Self {
365        self.measurements = self
366            .measurements
367            .iter()
368            .filter_map(|(epoch, msr)| {
369                if msr.tracker != excluded_tracker {
370                    Some((*epoch, msr.clone()))
371                } else {
372                    None
373                }
374            })
375            .collect::<BTreeMap<Epoch, Measurement>>();
376        self
377    }
378
379    /// Returns a new tracking arc that excludes measurements within the given epoch range.
380    pub fn exclude_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
381        info!(
382            "Excluding measurements from {:?} to {:?}",
383            bound.start_bound(),
384            bound.end_bound()
385        );
386
387        let mut new_measurements = BTreeMap::new();
388
389        // Include entries before the excluded range
390        new_measurements.extend(
391            self.measurements
392                .range((Bound::Unbounded, bound.start_bound()))
393                .map(|(epoch, msr)| (*epoch, msr.clone())),
394        );
395
396        // Include entries after the excluded range
397        new_measurements.extend(
398            self.measurements
399                .range((bound.end_bound(), Bound::Unbounded))
400                .map(|(epoch, msr)| (*epoch, msr.clone())),
401        );
402
403        self.measurements = new_measurements;
404        self
405    }
406
407    /// Returns a new tracking arc that contains measurements from all trackers except the one provided
408    pub fn exclude_measurement_type(mut self, excluded_type: MeasurementType) -> Self {
409        self.measurements = self
410            .measurements
411            .iter_mut()
412            .map(|(epoch, msr)| {
413                msr.data.retain(|msr_type, _| *msr_type != excluded_type);
414
415                (*epoch, msr.clone())
416            })
417            .collect::<BTreeMap<Epoch, Measurement>>();
418        self
419    }
420
421    /// Marks measurements within the given epoch range as rejected.
422    pub fn reject_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
423        for (_epoch, msr) in self.measurements.range_mut(bound) {
424            msr.rejected = true;
425        }
426        self
427    }
428
429    /// Marks measurements from the provided tracker as rejected.
430    pub fn reject_by_tracker(mut self, tracker: String) -> Self {
431        for msr in self.measurements.values_mut() {
432            if msr.tracker == tracker {
433                msr.rejected = true;
434            }
435        }
436        self
437    }
438
439    pub fn resid_vs_ref_check(mut self) -> Self {
440        self.force_reject = true;
441        self
442    }
443}
444
445impl fmt::Display for TrackingDataArc {
446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447        if self.is_empty() {
448            write!(f, "Empty tracking arc")
449        } else {
450            let start = self.start_epoch().unwrap();
451            let end = self.end_epoch().unwrap();
452            let src = match &self.source {
453                Some(src) => format!(" (source: {src})"),
454                None => String::new(),
455            };
456            write!(
457                f,
458                "Tracking arc with {} measurements of type {:?} over {} (from {start} to {end}) with trackers {:?}{src}",
459                self.len(),
460                self.unique_types(),
461                end - start,
462                self.unique_aliases()
463            )
464        }
465    }
466}
467
468impl fmt::Debug for TrackingDataArc {
469    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470        write!(f, "{self} @ {self:p}")
471    }
472}
473
474impl PartialEq for TrackingDataArc {
475    fn eq(&self, other: &Self) -> bool {
476        self.measurements == other.measurements
477    }
478}
479
480impl Add for TrackingDataArc {
481    type Output = Self;
482
483    fn add(mut self, rhs: Self) -> Self::Output {
484        self.force_reject = false;
485        for (epoch, msr) in rhs.measurements {
486            if let Entry::Vacant(e) = self.measurements.entry(epoch) {
487                e.insert(msr);
488            } else {
489                error!("merging tracking data with overlapping epoch is not supported");
490            }
491        }
492
493        self
494    }
495}
496
497impl AddAssign for TrackingDataArc {
498    fn add_assign(&mut self, rhs: Self) {
499        *self = self.clone() + rhs;
500    }
501}