nyx_space/od/msr/trackingdata/
io_ccsds_tdm.rs1use crate::io::ExportCfg;
20use crate::io::watermark::prj_name_ver;
21use crate::io::{InputOutputError, StdIOSnafu};
22use crate::od::msr::{Measurement, MeasurementType};
23use anise::constants::SPEED_OF_LIGHT_KM_S;
24use hifitime::efmt::{Format, Formatter};
25use hifitime::{Duration, Epoch, TimeScale};
26use indexmap::{IndexMap, IndexSet};
27use log::{error, info, warn};
28use snafu::ResultExt;
29use std::collections::{BTreeMap, HashMap};
30use std::fs::File;
31use std::io::Write;
32use std::io::{BufRead, BufReader, BufWriter};
33use std::path::{Path, PathBuf};
34use std::str::FromStr;
35
36use super::TrackingDataArc;
37
38impl TrackingDataArc {
39 pub fn from_tdm<P: AsRef<Path>>(
87 path: P,
88 aliases: Option<HashMap<String, String>>,
89 ) -> Result<Self, InputOutputError> {
90 let file = File::open(&path).context(StdIOSnafu {
91 action: "opening CCSDS TDM file for tracking arc",
92 })?;
93
94 let source = path.as_ref().to_path_buf().display().to_string();
95 info!("parsing CCSDS TDM {source}");
96
97 let mut measurements = BTreeMap::new();
98 let mut metadata = HashMap::new();
99
100 let reader = BufReader::new(file);
101
102 let mut in_data_section = false;
103 let mut current_tracker = String::new();
104 let mut time_system = TimeScale::UTC;
105 let mut has_freq_data = false;
106 let mut msr_divider = 1.0;
107
108 for line in reader.lines() {
109 let line = line.context(StdIOSnafu {
110 action: "reading CCSDS TDM file",
111 })?;
112 let line = line.trim();
113
114 if line == "DATA_START" {
115 in_data_section = true;
116 continue;
117 } else if line == "DATA_STOP" {
118 in_data_section = false;
119 }
120
121 if !in_data_section {
122 if line.starts_with("PARTICIPANT_1") {
123 current_tracker = line.split('=').nth(1).unwrap_or("").trim().to_string();
124 if let Some(aliases) = &aliases
126 && let Some(alias) = aliases.get(¤t_tracker)
127 {
128 current_tracker = alias.clone();
129 }
130 } else if line.starts_with("TIME_SYSTEM") {
131 let ts = line.split('=').nth(1).unwrap_or("UTC").trim();
132 if let Ok(ts) = TimeScale::from_str(ts) {
134 time_system = ts;
135 } else {
136 return Err(InputOutputError::UnsupportedData {
137 which: format!("time scale `{ts}` not supported"),
138 });
139 }
140 } else if line.starts_with("PATH") {
141 match line.split(",").count() {
142 2 => msr_divider = 1.0,
143 3 => msr_divider = 2.0,
144 cnt => {
145 return Err(InputOutputError::UnsupportedData {
146 which: format!(
147 "found {cnt} paths in TDM, only 1 or 2 are supported"
148 ),
149 });
150 }
151 }
152 }
153
154 let mut splt = line.split('=');
155 if let Some(keyword) = splt.nth(0) {
156 if let Some(value) = splt.nth(0) {
158 metadata.insert(keyword.trim().to_string(), value.trim().to_string());
159 }
160 }
161
162 continue;
163 }
164
165 if let Some((mtype, epoch, value)) = parse_measurement_line(line, time_system)? {
166 if [
167 MeasurementType::ReceiveFrequency,
168 MeasurementType::TransmitFrequency,
169 MeasurementType::TransmitFrequencyRate,
170 ]
171 .contains(&mtype)
172 {
173 has_freq_data = true;
174 msr_divider = 1.0;
176 }
177 measurements
178 .entry(epoch)
179 .or_insert_with(|| Measurement {
180 tracker: current_tracker.clone(),
181 epoch,
182 data: IndexMap::new(),
183 rejected: false,
184 })
185 .data
186 .insert(mtype, value / msr_divider);
187 }
188 }
189
190 let mut turnaround_ratio = None;
191 let drop_freq_data;
192 if has_freq_data {
193 if let Some(ta_num_str) = metadata.get("TURNAROUND_NUMERATOR") {
195 if let Some(ta_denom_str) = metadata.get("TURNAROUND_DENOMINATOR") {
196 if let Ok(ta_num) = ta_num_str.parse::<i32>() {
197 if let Ok(ta_denom) = ta_denom_str.parse::<i32>() {
198 turnaround_ratio = Some(f64::from(ta_num) / f64::from(ta_denom));
200 info!("turn-around ratio is {ta_num}/{ta_denom}");
201 drop_freq_data = false;
202 } else {
203 error!(
204 "turn-around denominator `{ta_denom_str}` is not a valid integer"
205 );
206 drop_freq_data = true;
207 }
208 } else {
209 error!("turn-around numerator `{ta_num_str}` is not a valid integer");
210 drop_freq_data = true;
211 }
212 } else {
213 error!(
214 "required turn-around denominator missing from metadata -- dropping ALL RECEIVE/TRANSMIT data"
215 );
216 drop_freq_data = true;
217 }
218 } else {
219 error!(
220 "required turn-around numerator missing from metadata -- dropping ALL RECEIVE/TRANSMIT data"
221 );
222 drop_freq_data = true;
223 }
224 } else {
225 drop_freq_data = true;
226 }
227
228 let corrections_applied = if let Some(corr_flag) = metadata.get("CORRECTIONS_APPLIED") {
229 match corr_flag.trim().to_lowercase().as_str() {
230 "no" => false,
231 "yes" => true,
232 _ => {
233 warn!("invalid CORRECTIONS_APPLIED `{corr_flag}`");
234 true
235 }
236 }
237 } else {
238 true
239 };
240
241 let mut freq_types = IndexSet::new();
244 freq_types.insert(MeasurementType::ReceiveFrequency);
245 freq_types.insert(MeasurementType::TransmitFrequency);
246 freq_types.insert(MeasurementType::TransmitFrequencyRate);
247
248 let mut latest_transmit_freq = None;
249 let mut latest_transmit_epoch = None;
250 let mut latest_transmit_rate = 0.0;
251
252 let mut all_applied_corrections = IndexSet::new();
253
254 for (epoch, measurement) in measurements.iter_mut() {
255 if !corrections_applied {
257 for msr_type in [
258 MeasurementType::Range,
259 MeasurementType::Doppler,
260 MeasurementType::Azimuth,
261 MeasurementType::Elevation,
262 MeasurementType::ReceiveFrequency,
263 MeasurementType::TransmitFrequency,
264 MeasurementType::TransmitFrequencyRate,
265 ] {
266 let kw = format!("CORRECTION_{}", msr_type.ccsds_tdm_name());
267 if let Some(correction_str) = metadata.get(&kw) {
268 if let Ok(correction) = correction_str.parse::<f64>() {
269 measurement.correct(msr_type, correction);
270 all_applied_corrections.insert(msr_type);
271 } else {
272 warn!("invalid correction value for {kw}");
273 }
274 }
275 }
276 }
277
278 if drop_freq_data {
279 for freq in &freq_types {
280 measurement.data.swap_remove(freq);
281 }
282 continue;
283 }
284
285 if let Some(rate) = measurement
287 .data
288 .get(&MeasurementType::TransmitFrequencyRate)
289 {
290 if let (Some(last_f), Some(last_e)) = (latest_transmit_freq, latest_transmit_epoch)
291 {
292 let dt: Duration = *epoch - last_e;
293 latest_transmit_freq = Some(last_f + latest_transmit_rate * dt.to_seconds());
294 }
295 latest_transmit_epoch = Some(*epoch);
296 latest_transmit_rate = *rate;
297 }
298
299 if let Some(freq) = measurement.data.get(&MeasurementType::TransmitFrequency) {
300 latest_transmit_freq = Some(*freq);
301 latest_transmit_epoch = Some(*epoch);
302 }
303
304 if !measurement
305 .data
306 .contains_key(&MeasurementType::ReceiveFrequency)
307 {
308 for freq in &freq_types {
311 measurement.data.swap_remove(freq);
312 }
313 continue;
314 }
315
316 if latest_transmit_freq.is_none() {
318 warn!(
319 "receive frequency found at {epoch} but no transmit frequency was ever set, ignoring"
320 );
321 for freq in &freq_types {
322 measurement.data.swap_remove(freq);
323 }
324 continue;
325 }
326
327 let dt: Duration = *epoch - latest_transmit_epoch.unwrap();
328 let transmit_freq_hz =
329 latest_transmit_freq.unwrap() + latest_transmit_rate * dt.to_seconds();
330
331 let receive_freq_hz = *measurement
332 .data
333 .get(&MeasurementType::ReceiveFrequency)
334 .unwrap();
335
336 let doppler_shift_hz = transmit_freq_hz * turnaround_ratio.unwrap() - receive_freq_hz;
338 let rho_dot_km_s = (doppler_shift_hz * SPEED_OF_LIGHT_KM_S)
340 / (2.0 * transmit_freq_hz * turnaround_ratio.unwrap());
341
342 for freq in &freq_types {
344 measurement.data.swap_remove(freq);
345 }
346 measurement
347 .data
348 .insert(MeasurementType::Doppler, rho_dot_km_s);
349 }
350
351 if !all_applied_corrections.is_empty() {
352 info!("applied corrections for {all_applied_corrections:?}");
353 }
354
355 let moduli = if let Some(range_modulus) = metadata.get("RANGE_MODULUS") {
356 if let Ok(value) = range_modulus.parse::<f64>() {
357 if value > 0.0 {
358 let mut modulos = IndexMap::new();
359 modulos.insert(MeasurementType::Range, value);
360 Some(modulos)
362 } else {
363 None
365 }
366 } else {
367 warn!("could not parse RANGE_MODULUS of `{range_modulus}` as a double");
368 None
369 }
370 } else {
371 None
372 };
373
374 measurements.retain(|_, m| !m.data.is_empty());
376
377 let trk = Self {
378 measurements,
379 source: Some(source),
380 moduli,
381 force_reject: false,
382 };
383
384 if trk.unique_types().is_empty() {
385 Err(InputOutputError::EmptyDataset {
386 action: "CCSDS TDM file",
387 })
388 } else {
389 Ok(trk)
390 }
391 }
392
393 pub fn to_tdm_file<P: AsRef<Path>>(
395 mut self,
396 spacecraft_name: String,
397 aliases: Option<HashMap<String, String>>,
398 path: P,
399 cfg: ExportCfg,
400 ) -> Result<PathBuf, InputOutputError> {
401 if self.is_empty() {
402 return Err(InputOutputError::MissingData {
403 which: " - empty tracking data cannot be exported to TDM".to_string(),
404 });
405 }
406
407 if let Some(start_epoch) = cfg.start_epoch {
409 if let Some(end_epoch) = cfg.end_epoch {
410 self = self.filter_by_epoch(start_epoch..end_epoch);
411 } else {
412 self = self.filter_by_epoch(start_epoch..);
413 }
414 } else if let Some(end_epoch) = cfg.end_epoch {
415 self = self.filter_by_epoch(..end_epoch);
416 }
417
418 let tick = Epoch::now().unwrap();
419 info!("Exporting tracking data to CCSDS TDM file...");
420
421 let path_buf = cfg.actual_path(path);
423
424 let metadata = cfg.metadata.unwrap_or_default();
425
426 let file = File::create(&path_buf).context(StdIOSnafu {
427 action: "creating CCSDS TDM file for tracking arc",
428 })?;
429 let mut writer = BufWriter::new(file);
430
431 let err_hdlr = |source| InputOutputError::StdIOError {
432 source,
433 action: "writing data to TDM file",
434 };
435
436 let iso8601_no_ts = Format::from_str("%Y-%m-%dT%H:%M:%S.%f").unwrap();
438
439 writeln!(writer, "CCSDS_TDM_VERS = 2.0").map_err(err_hdlr)?;
441 writeln!(
442 writer,
443 "\nCOMMENT Build by {} -- https://nyxspace.com",
444 prj_name_ver()
445 )
446 .map_err(err_hdlr)?;
447 writeln!(
448 writer,
449 "COMMENT Nyx Space provided under the AGPL v3 open source license -- https://nyxspace.com/pricing\n"
450 )
451 .map_err(err_hdlr)?;
452 writeln!(
453 writer,
454 "CREATION_DATE = {}",
455 Formatter::new(Epoch::now().unwrap(), iso8601_no_ts)
456 )
457 .map_err(err_hdlr)?;
458 writeln!(
459 writer,
460 "ORIGINATOR = {}\n",
461 metadata
462 .get("originator")
463 .unwrap_or(&"Nyx Space".to_string())
464 )
465 .map_err(err_hdlr)?;
466
467 let trackers = self.unique_aliases();
470
471 for tracker in trackers {
472 let tracker_data = self.clone().filter_by_tracker(tracker.clone());
473
474 let types = tracker_data.unique_types();
475
476 let two_way_types = types
477 .iter()
478 .filter(|msr_type| msr_type.may_be_two_way())
479 .copied()
480 .collect::<Vec<_>>();
481
482 let one_way_types = types
483 .iter()
484 .filter(|msr_type| !msr_type.may_be_two_way())
485 .copied()
486 .collect::<Vec<_>>();
487
488 for (tno, types) in [two_way_types, one_way_types].iter().enumerate() {
490 writeln!(writer, "META_START").map_err(err_hdlr)?;
491 writeln!(writer, "\tTIME_SYSTEM = UTC").map_err(err_hdlr)?;
492 writeln!(
493 writer,
494 "\tSTART_TIME = {}",
495 Formatter::new(tracker_data.start_epoch().unwrap(), iso8601_no_ts)
496 )
497 .map_err(err_hdlr)?;
498 writeln!(
499 writer,
500 "\tSTOP_TIME = {}",
501 Formatter::new(tracker_data.end_epoch().unwrap(), iso8601_no_ts)
502 )
503 .map_err(err_hdlr)?;
504
505 let multiplier = if tno == 0 {
506 writeln!(writer, "\tPATH = 1,2,1").map_err(err_hdlr)?;
507 2.0
508 } else {
509 writeln!(writer, "\tPATH = 1,2").map_err(err_hdlr)?;
510 1.0
511 };
512
513 writeln!(
514 writer,
515 "\tPARTICIPANT_1 = {}",
516 if let Some(aliases) = &aliases {
517 if let Some(alias) = aliases.get(&tracker) {
518 alias
519 } else {
520 &tracker
521 }
522 } else {
523 &tracker
524 }
525 )
526 .map_err(err_hdlr)?;
527
528 writeln!(writer, "\tPARTICIPANT_2 = {spacecraft_name}").map_err(err_hdlr)?;
529
530 writeln!(writer, "\tMODE = SEQUENTIAL").map_err(err_hdlr)?;
531
532 for (k, v) in &metadata {
534 if k != "originator" {
535 writeln!(writer, "\t{k} = {v}").map_err(err_hdlr)?;
536 }
537 }
538
539 if types.contains(&MeasurementType::Range) {
540 writeln!(writer, "\tRANGE_UNITS = km").map_err(err_hdlr)?;
541
542 if let Some(moduli) = &self.moduli
543 && let Some(range_modulus) = moduli.get(&MeasurementType::Range)
544 {
545 writeln!(writer, "\tRANGE_MODULUS = {range_modulus:E}")
546 .map_err(err_hdlr)?;
547 }
548 }
549
550 if types.contains(&MeasurementType::Azimuth)
551 || types.contains(&MeasurementType::Elevation)
552 {
553 writeln!(writer, "\tANGLE_TYPE = AZEL").map_err(err_hdlr)?;
554 }
555
556 writeln!(writer, "META_STOP\n").map_err(err_hdlr)?;
557
558 writeln!(writer, "DATA_START").map_err(err_hdlr)?;
560
561 for (epoch, measurement) in &tracker_data.measurements {
563 for (mtype, value) in &measurement.data {
564 if !types.contains(mtype) {
565 continue;
566 }
567
568 writeln!(
569 writer,
570 "\t{:<20} = {:<23}\t{:.12}",
571 mtype.ccsds_tdm_name(),
572 Formatter::new(*epoch, iso8601_no_ts),
573 value * multiplier
574 )
575 .map_err(err_hdlr)?;
576 }
577 }
578
579 writeln!(writer, "DATA_STOP\n").map_err(err_hdlr)?;
580 }
581 }
582
583 #[allow(clippy::writeln_empty_string)]
584 writeln!(writer, "").map_err(err_hdlr)?;
585
586 let tock_time = Epoch::now().unwrap() - tick;
588 info!("CCSDS TDM written to {} in {tock_time}", path_buf.display());
589 Ok(path_buf)
590 }
591}
592
593fn parse_measurement_line(
594 line: &str,
595 time_system: TimeScale,
596) -> Result<Option<(MeasurementType, Epoch, f64)>, InputOutputError> {
597 let parts: Vec<&str> = line.split('=').collect();
598 if parts.len() != 2 {
599 return Ok(None);
600 }
601
602 let (mtype_str, data) = (parts[0].trim(), parts[1].trim());
603 let mtype = match mtype_str {
604 "RANGE" => MeasurementType::Range,
605 "DOPPLER_INSTANTANEOUS" | "DOPPLER_INTEGRATED" => MeasurementType::Doppler,
606 "ANGLE_1" => MeasurementType::Azimuth,
607 "ANGLE_2" => MeasurementType::Elevation,
608 "RECEIVE_FREQ" | "RECEIVE_FREQ_1" | "RECEIVE_FREQ_2" | "RECEIVE_FREQ_3"
609 | "RECEIVE_FREQ_4" | "RECEIVE_FREQ_5" => MeasurementType::ReceiveFrequency,
610 "TRANSMIT_FREQ" | "TRANSMIT_FREQ_1" | "TRANSMIT_FREQ_2" | "TRANSMIT_FREQ_3"
611 | "TRANSMIT_FREQ_4" | "TRANSMIT_FREQ_5" => MeasurementType::TransmitFrequency,
612 "TRANSMIT_FREQ_RATE"
613 | "TRANSMIT_FREQ_RATE_1"
614 | "TRANSMIT_FREQ_RATE_2"
615 | "TRANSMIT_FREQ_RATE_3"
616 | "TRANSMIT_FREQ_RATE_4"
617 | "TRANSMIT_FREQ_RATE_5" => MeasurementType::TransmitFrequencyRate,
618 _ => {
619 return Err(InputOutputError::UnsupportedData {
620 which: mtype_str.to_string(),
621 });
622 }
623 };
624
625 let data_parts: Vec<&str> = data.split_whitespace().collect();
626 if data_parts.len() != 2 {
627 return Ok(None);
628 }
629
630 let epoch =
631 Epoch::from_gregorian_str(&format!("{} {time_system}", data_parts[0])).map_err(|e| {
632 InputOutputError::Inconsistency {
633 msg: format!("{e} when parsing epoch"),
634 }
635 })?;
636
637 let value = data_parts[1]
638 .parse::<f64>()
639 .map_err(|e| InputOutputError::UnsupportedData {
640 which: format!("`{}` is not a float: {e}", data_parts[1]),
641 })?;
642
643 Ok(Some((mtype, epoch, value)))
644}