nyx_space/od/msr/trackingdata/
mod.rs1use super::{MeasurementType, measurement::Measurement};
19use core::fmt;
20use hifitime::prelude::{Duration, Epoch};
21use indexmap::{IndexMap, IndexSet};
22use log::{error, info, warn};
23use std::collections::BTreeMap;
24use std::collections::btree_map::Entry;
25use std::ops::Bound::{self, Excluded, Included, Unbounded};
26use std::ops::{Add, AddAssign, RangeBounds};
27
28mod io_ccsds_tdm;
29mod io_parquet;
30
31#[cfg(feature = "python")]
32use pyo3::prelude::*;
33#[cfg(feature = "python")]
34mod python;
35
36#[derive(Clone, Default)]
77#[cfg_attr(feature = "python", pyclass(from_py_object))]
78pub struct TrackingDataArc {
79 pub measurements: BTreeMap<Epoch, Measurement>, pub source: Option<String>,
83 pub moduli: Option<IndexMap<MeasurementType, f64>>,
85 pub force_reject: bool,
87}
88
89#[cfg_attr(feature = "python", pymethods)]
90impl TrackingDataArc {
91 pub fn start_epoch(&self) -> Option<Epoch> {
93 self.measurements.first_key_value().map(|(k, _)| *k)
94 }
95
96 pub fn end_epoch(&self) -> Option<Epoch> {
98 self.measurements.last_key_value().map(|(k, _)| *k)
99 }
100
101 pub fn duration(&self) -> Option<Duration> {
103 match self.start_epoch() {
104 Some(start) => self.end_epoch().map(|end| end - start),
105 None => None,
106 }
107 }
108
109 pub fn len(&self) -> usize {
111 self.measurements.len()
112 }
113
114 pub fn is_empty(&self) -> bool {
116 self.measurements.is_empty()
117 }
118
119 pub fn min_duration_sep(&self) -> Option<Duration> {
121 if self.is_empty() {
122 None
123 } else {
124 let mut min_sep = Duration::MAX;
125 let mut prev_epoch = self.start_epoch().unwrap();
126 for (epoch, _) in self.measurements.iter().skip(1) {
127 let this_sep = *epoch - prev_epoch;
128 min_sep = min_sep.min(this_sep);
129 prev_epoch = *epoch;
130 }
131 Some(min_sep)
132 }
133 }
134 pub fn set_moduli(&mut self, msr_type: MeasurementType, modulus: f64) {
136 if modulus.is_nan() || modulus.abs() < f64::EPSILON {
137 warn!("cannot set modulus for {msr_type:?} to {modulus}");
138 return;
139 }
140 if self.moduli.is_none() {
141 self.moduli = Some(IndexMap::new());
142 }
143
144 self.moduli.as_mut().unwrap().insert(msr_type, modulus);
145 }
146
147 pub fn apply_moduli(&mut self) {
149 if let Some(moduli) = &self.moduli {
150 for msr in self.measurements.values_mut() {
151 for (msr_type, modulus) in moduli {
152 if let Some(msr_value) = msr.data.get_mut(msr_type) {
153 *msr_value %= *modulus;
154 }
155 }
156 }
157 }
158 }
159
160 pub fn downsample(&self, target_step: Duration) -> Self {
190 if self.is_empty() {
191 return self.clone();
192 }
193 let current_step = self.min_duration_sep().unwrap();
194
195 if current_step >= target_step {
196 warn!(
197 "cannot downsample tracking data from {current_step} to {target_step} (that would be upsampling)"
198 );
199 return self.clone();
200 }
201
202 let current_hz = 1.0 / current_step.to_seconds();
203 let target_hz = 1.0 / target_step.to_seconds();
204
205 let window_size = (current_hz / target_hz).round() as usize;
207
208 info!(
209 "downsampling tracking data from {current_step} ({current_hz:.6} Hz) to {target_step} ({target_hz:.6} Hz) (N = {window_size})"
210 );
211
212 let mut result = TrackingDataArc {
213 source: self.source.clone(),
214 ..Default::default()
215 };
216
217 let measurements: Vec<_> = self.measurements.iter().collect();
218
219 for (i, (epoch, _)) in measurements.iter().enumerate().step_by(window_size) {
220 let start = i.saturating_sub(window_size / 2);
221 let end = (i + window_size / 2 + 1).min(measurements.len());
222 let window = &measurements[start..end];
223
224 let mut filtered_measurement = Measurement {
225 tracker: window[0].1.tracker.clone(),
226 epoch: **epoch,
227 data: IndexMap::new(),
228 rejected: false,
229 };
230
231 for mtype in self.unique_types() {
233 let sum: f64 = window.iter().filter_map(|(_, m)| m.data.get(&mtype)).sum();
234 let count = window
235 .iter()
236 .filter(|(_, m)| m.data.contains_key(&mtype))
237 .count();
238
239 if count > 0 {
240 filtered_measurement.data.insert(mtype, sum / count as f64);
241 }
242 }
243
244 result.measurements.insert(**epoch, filtered_measurement);
245 }
246 result
247 }
248
249 pub fn chunk(&self, max_duration: Duration) -> Vec<TrackingDataArc> {
252 let mut chunks = Vec::new();
253 if self.is_empty() {
254 return chunks;
255 }
256
257 let end_epoch = self.end_epoch().unwrap();
258 let mut chunk_start = self.start_epoch().unwrap();
259
260 while chunk_start <= end_epoch {
261 let chunk_end = chunk_start + max_duration;
262 let sub_arc = self.clone().filter_by_epoch(chunk_start..chunk_end);
263 if !sub_arc.is_empty() {
264 chunks.push(sub_arc);
265 }
266 let mut found_next = false;
268 for (epoch, _) in self.measurements.range(chunk_end..) {
269 if *epoch >= chunk_end {
270 chunk_start = *epoch;
271 found_next = true;
272 break;
273 }
274 }
275 if !found_next {
276 break;
277 }
278 }
279
280 chunks
281 }
282}
283
284impl TrackingDataArc {
285 pub fn unique_aliases(&self) -> IndexSet<String> {
287 self.unique().0
288 }
289
290 pub fn unique_types(&self) -> IndexSet<MeasurementType> {
292 self.unique().1
293 }
294
295 pub fn unique(&self) -> (IndexSet<String>, IndexSet<MeasurementType>) {
297 let mut aliases = IndexSet::new();
298 let mut types = IndexSet::new();
299 for msr in self.measurements.values() {
300 aliases.insert(msr.tracker.clone());
301 for k in msr.data.keys() {
302 types.insert(*k);
303 }
304 }
305 (aliases, types)
306 }
307
308 pub fn filter_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
310 self.measurements = self
311 .measurements
312 .range(bound)
313 .map(|(epoch, msr)| (*epoch, msr.clone()))
314 .collect::<BTreeMap<Epoch, Measurement>>();
315 self
316 }
317
318 pub fn filter_by_offset<R: RangeBounds<Duration>>(self, bound: R) -> Self {
321 if self.is_empty() {
322 return self;
323 }
324 let start = match bound.start_bound() {
326 Unbounded => self.start_epoch().unwrap(),
327 Included(offset) | Excluded(offset) => self.start_epoch().unwrap() + *offset,
328 };
329
330 let end = match bound.end_bound() {
331 Unbounded => self.end_epoch().unwrap(),
332 Included(offset) | Excluded(offset) => self.start_epoch().unwrap() + *offset,
333 };
334
335 self.filter_by_epoch(start..end)
336 }
337
338 pub fn filter_by_tracker(mut self, tracker: String) -> Self {
340 self.measurements = self
341 .measurements
342 .iter()
343 .filter_map(|(epoch, msr)| {
344 if msr.tracker == tracker {
345 Some((*epoch, msr.clone()))
346 } else {
347 None
348 }
349 })
350 .collect::<BTreeMap<Epoch, Measurement>>();
351 self
352 }
353
354 pub fn filter_by_measurement_type(mut self, included_type: MeasurementType) -> Self {
356 self.measurements.retain(|_epoch, msr| {
357 msr.data.retain(|msr_type, _| *msr_type == included_type);
358 !msr.data.is_empty()
359 });
360 self
361 }
362
363 pub fn exclude_tracker(mut self, excluded_tracker: String) -> Self {
365 self.measurements = self
366 .measurements
367 .iter()
368 .filter_map(|(epoch, msr)| {
369 if msr.tracker != excluded_tracker {
370 Some((*epoch, msr.clone()))
371 } else {
372 None
373 }
374 })
375 .collect::<BTreeMap<Epoch, Measurement>>();
376 self
377 }
378
379 pub fn exclude_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
381 info!(
382 "Excluding measurements from {:?} to {:?}",
383 bound.start_bound(),
384 bound.end_bound()
385 );
386
387 let mut new_measurements = BTreeMap::new();
388
389 new_measurements.extend(
391 self.measurements
392 .range((Bound::Unbounded, bound.start_bound()))
393 .map(|(epoch, msr)| (*epoch, msr.clone())),
394 );
395
396 new_measurements.extend(
398 self.measurements
399 .range((bound.end_bound(), Bound::Unbounded))
400 .map(|(epoch, msr)| (*epoch, msr.clone())),
401 );
402
403 self.measurements = new_measurements;
404 self
405 }
406
407 pub fn exclude_measurement_type(mut self, excluded_type: MeasurementType) -> Self {
409 self.measurements = self
410 .measurements
411 .iter_mut()
412 .map(|(epoch, msr)| {
413 msr.data.retain(|msr_type, _| *msr_type != excluded_type);
414
415 (*epoch, msr.clone())
416 })
417 .collect::<BTreeMap<Epoch, Measurement>>();
418 self
419 }
420
421 pub fn reject_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
423 for (_epoch, msr) in self.measurements.range_mut(bound) {
424 msr.rejected = true;
425 }
426 self
427 }
428
429 pub fn reject_by_tracker(mut self, tracker: String) -> Self {
431 for msr in self.measurements.values_mut() {
432 if msr.tracker == tracker {
433 msr.rejected = true;
434 }
435 }
436 self
437 }
438
439 pub fn resid_vs_ref_check(mut self) -> Self {
440 self.force_reject = true;
441 self
442 }
443}
444
445impl fmt::Display for TrackingDataArc {
446 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447 if self.is_empty() {
448 write!(f, "Empty tracking arc")
449 } else {
450 let start = self.start_epoch().unwrap();
451 let end = self.end_epoch().unwrap();
452 let src = match &self.source {
453 Some(src) => format!(" (source: {src})"),
454 None => String::new(),
455 };
456 write!(
457 f,
458 "Tracking arc with {} measurements of type {:?} over {} (from {start} to {end}) with trackers {:?}{src}",
459 self.len(),
460 self.unique_types(),
461 end - start,
462 self.unique_aliases()
463 )
464 }
465 }
466}
467
468impl fmt::Debug for TrackingDataArc {
469 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470 write!(f, "{self} @ {self:p}")
471 }
472}
473
474impl PartialEq for TrackingDataArc {
475 fn eq(&self, other: &Self) -> bool {
476 self.measurements == other.measurements
477 }
478}
479
480impl Add for TrackingDataArc {
481 type Output = Self;
482
483 fn add(mut self, rhs: Self) -> Self::Output {
484 self.force_reject = false;
485 for (epoch, msr) in rhs.measurements {
486 if let Entry::Vacant(e) = self.measurements.entry(epoch) {
487 e.insert(msr);
488 } else {
489 error!("merging tracking data with overlapping epoch is not supported");
490 }
491 }
492
493 self
494 }
495}
496
497impl AddAssign for TrackingDataArc {
498 fn add_assign(&mut self, rhs: Self) {
499 *self = self.clone() + rhs;
500 }
501}