1use crate::md::StateParameter;
20use crate::time::Epoch;
21use arrow::error::ArrowError;
22use log::debug;
23use parquet::errors::ParquetError;
24use snafu::prelude::*;
25pub(crate) mod watermark;
26use hifitime::Duration;
27use hifitime::prelude::{Format, Formatter};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Deserializer};
30use serde::{Serialize, Serializer};
31use std::collections::{BTreeMap, HashMap};
32use std::fmt::Debug;
33use std::fs::File;
34use std::io::BufReader;
35use std::path::{Path, PathBuf};
36use std::str::FromStr;
37use typed_builder::TypedBuilder;
38
39pub mod gravity;
41
42use std::io;
43
44#[cfg(feature = "python")]
45use pyo3::prelude::*;
46#[cfg(feature = "python")]
47mod python;
48
/// Options controlling how data is exported to a file.
///
/// Every setting is optional: the derived `Default` leaves all `Option` fields at `None` and
/// `timestamp` at `false`. A builder is also generated through `TypedBuilder`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TypedBuilder, PartialEq)]
#[builder(doc)]
#[cfg_attr(feature = "python", pyclass(from_py_object, eq))]
pub struct ExportCfg {
    /// State parameters selected for the export.
    /// NOTE(review): how `None` is interpreted is decided by the exporters, not visible here.
    #[builder(default, setter(strip_option))]
    pub fields: Option<Vec<StateParameter>>,
    /// Optional start epoch — presumably the lower bound of the exported time span; confirm
    /// against the exporters.
    #[builder(default, setter(strip_option))]
    pub start_epoch: Option<Epoch>,
    /// Optional end epoch — presumably the upper bound of the exported time span; confirm
    /// against the exporters.
    #[builder(default, setter(strip_option))]
    pub end_epoch: Option<Epoch>,
    /// Optional step — presumably the interval between exported entries; confirm against the
    /// exporters.
    #[builder(default, setter(strip_option))]
    pub step: Option<Duration>,
    /// Optional free-form metadata stored as string key/value pairs.
    #[builder(default, setter(strip_option))]
    pub metadata: Option<HashMap<String, String>>,
    /// When `true`, `ExportCfg::actual_path` appends the current date and time to the file
    /// name, just before its extension.
    #[builder(default)]
    pub timestamp: bool,
}
73
74impl ExportCfg {
75 pub fn from_metadata(metadata: Vec<(String, String)>) -> Self {
77 let mut me = ExportCfg {
78 metadata: Some(HashMap::new()),
79 ..Default::default()
80 };
81 for (k, v) in metadata {
82 me.metadata.as_mut().unwrap().insert(k, v);
83 }
84 me
85 }
86
87 pub fn timestamped() -> Self {
89 Self {
90 timestamp: true,
91 ..Default::default()
92 }
93 }
94
95 pub fn append_field(&mut self, field: StateParameter) {
96 if let Some(fields) = self.fields.as_mut() {
97 fields.push(field);
98 } else {
99 self.fields = Some(vec![field]);
100 }
101 }
102
103 pub(crate) fn actual_path<P: AsRef<Path>>(&self, path: P) -> PathBuf {
105 let mut path_buf = path.as_ref().to_path_buf();
106 if self.timestamp
107 && let Some(file_name) = path_buf.file_name()
108 && let Some(file_name_str) = file_name.to_str()
109 && let Some(extension) = path_buf.extension()
110 {
111 let stamp = Formatter::new(
112 Epoch::now().unwrap(),
113 Format::from_str("%Y-%m-%dT%H-%M-%S").unwrap(),
114 );
115 let ext = extension.to_str().unwrap();
116 let file_name = file_name_str.replace(&format!(".{ext}"), "");
117 let new_file_name = format!("{file_name}-{stamp}.{ext}");
118 path_buf.set_file_name(new_file_name);
119 };
120 path_buf
121 }
122}
123
/// Errors raised while reading or validating a configuration.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum ConfigError {
    /// The configuration file could not be read; wraps the underlying I/O error.
    #[snafu(display("failed to read configuration file: {source}"))]
    ReadError { source: io::Error },

    /// The YAML content could not be parsed; wraps the underlying `serde_yml` error.
    #[snafu(display("failed to parse YAML configuration file: {source}"))]
    ParseError { source: serde_yml::Error },

    /// The configuration is invalid; `msg` describes the problem.
    #[snafu(display("of invalid configuration: {msg}"))]
    InvalidConfig { msg: String },
}
136
impl PartialEq for ConfigError {
    /// Always returns `false`: two configuration errors never compare equal, not even an
    /// error compared with itself.
    fn eq(&self, _other: &Self) -> bool {
        // Both operands are ignored.
        false
    }
}
143
/// Errors raised by input/output operations.
///
/// Variants carrying an `action` field use it in their message to say which operation failed.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum InputOutputError {
    /// A standard library I/O error occurred during `action`.
    #[snafu(display("{action} encountered i/o error: {source}"))]
    StdIOError {
        source: io::Error,
        action: &'static str,
    },
    /// Required data is missing; `which` names it.
    #[snafu(display("missing required data {which}"))]
    MissingData { which: String },
    /// The data named by `which` is not known.
    #[snafu(display("unknown data `{which}`"))]
    UnsupportedData { which: String },
    /// A Parquet error occurred during `action`.
    #[snafu(display("{action} encountered a Parquet error: {source}"))]
    ParquetError {
        source: ParquetError,
        action: &'static str,
    },
    /// An inconsistency was detected; `msg` describes it.
    #[snafu(display("inconsistency detected: {msg}"))]
    Inconsistency { msg: String },
    /// An Arrow error occurred during `action`.
    #[snafu(display("{action} encountered an Arrow error: {source}"))]
    ArrowError {
        source: ArrowError,
        action: &'static str,
    },
    /// `data` could not be parsed as a Dhall configuration; `err` holds the error text.
    #[snafu(display("error parsing `{data}` as Dhall config: {err}"))]
    ParseDhall { data: String, err: String },
    /// `what` could not be serialized to Dhall; `err` holds the error text.
    #[snafu(display("error serializing {what} to Dhall: {err}"))]
    SerializeDhall { what: String, err: String },
    /// An empty dataset was encountered while (de)serializing `action`.
    #[snafu(display("empty dataset error when (de)serializing {action}"))]
    EmptyDataset { action: &'static str },
}
175
impl PartialEq for InputOutputError {
    /// Always returns `false`: two I/O errors never compare equal, not even an error compared
    /// with itself.
    fn eq(&self, _other: &Self) -> bool {
        // Both operands are ignored.
        false
    }
}
181
182pub trait ConfigRepr: Debug + Sized + Serialize + DeserializeOwned {
183 fn load<P>(path: P) -> Result<Self, ConfigError>
185 where
186 P: AsRef<Path>,
187 {
188 let file = File::open(path).context(ReadSnafu)?;
189 let reader = BufReader::new(file);
190
191 serde_yml::from_reader(reader).context(ParseSnafu)
192 }
193
194 fn load_many<P>(path: P) -> Result<Vec<Self>, ConfigError>
196 where
197 P: AsRef<Path>,
198 {
199 let file = File::open(path).context(ReadSnafu)?;
200 let reader = BufReader::new(file);
201
202 serde_yml::from_reader(reader).context(ParseSnafu)
203 }
204
205 fn load_named<P>(path: P) -> Result<BTreeMap<String, Self>, ConfigError>
207 where
208 P: AsRef<Path>,
209 {
210 let file = File::open(path).context(ReadSnafu)?;
211 let reader = BufReader::new(file);
212
213 serde_yml::from_reader(reader).context(ParseSnafu)
214 }
215
216 fn loads_many(data: &str) -> Result<Vec<Self>, ConfigError> {
218 debug!("Loading YAML:\n{data}");
219 serde_yml::from_str(data).context(ParseSnafu)
220 }
221
222 fn loads_named(data: &str) -> Result<BTreeMap<String, Self>, ConfigError> {
224 debug!("Loading YAML:\n{data}");
225 serde_yml::from_str(data).context(ParseSnafu)
226 }
227}
228
229pub(crate) fn epoch_to_str<S>(epoch: &Epoch, serializer: S) -> Result<S::Ok, S::Error>
230where
231 S: Serializer,
232{
233 serializer.serialize_str(&format!("{epoch}"))
234}
235
236pub(crate) fn epoch_from_str<'de, D>(deserializer: D) -> Result<Epoch, D::Error>
238where
239 D: Deserializer<'de>,
240{
241 let s = String::deserialize(deserializer)?;
243 Epoch::from_str(&s).map_err(serde::de::Error::custom)
244}
245
246pub(crate) fn duration_to_str<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
247where
248 S: Serializer,
249{
250 serializer.serialize_str(&format!("{duration}"))
251}
252
253pub(crate) fn duration_from_str<'de, D>(deserializer: D) -> Result<Duration, D::Error>
255where
256 D: Deserializer<'de>,
257{
258 let s = String::deserialize(deserializer)?;
260 Duration::from_str(&s).map_err(serde::de::Error::custom)
261}
262
263pub(crate) fn maybe_duration_to_str<S>(
264 duration: &Option<Duration>,
265 serializer: S,
266) -> Result<S::Ok, S::Error>
267where
268 S: Serializer,
269{
270 if let Some(duration) = duration {
271 duration_to_str(duration, serializer)
272 } else {
273 serializer.serialize_none()
274 }
275}
276
277pub(crate) fn maybe_duration_from_str<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
278where
279 D: Deserializer<'de>,
280{
281 if let Ok(s) = String::deserialize(deserializer) {
282 if let Ok(duration) = Duration::from_str(&s) {
283 Ok(Some(duration))
284 } else {
285 Ok(None)
286 }
287 } else {
288 Ok(None)
289 }
290}