object_store/
delimited.rs1use std::collections::VecDeque;
21
22use bytes::Bytes;
23use futures_util::{Stream, StreamExt};
24
25use super::Result;
26
27#[derive(Debug, thiserror::Error)]
28enum Error {
29 #[error("encountered unterminated string")]
30 UnterminatedString,
31
32 #[error("encountered trailing escape character")]
33 TrailingEscape,
34}
35
36impl From<Error> for super::Error {
37 fn from(err: Error) -> Self {
38 Self::Generic {
39 store: "LineDelimiter",
40 source: Box::new(err),
41 }
42 }
43}
44
/// The ASCII encoding of `"`, which opens and closes a quoted string
const QUOTE: u8 = b'"';

/// The ASCII encoding of `\n`, the record delimiter
const NEWLINE: u8 = b'\n';

/// The ASCII encoding of `\`, which causes the following byte to be taken literally
const ESCAPE: u8 = b'\\';
53
54#[derive(Debug, Default)]
57struct LineDelimiter {
58 complete: VecDeque<Bytes>,
60 remainder: Vec<u8>,
62 is_escape: bool,
64 is_quote: bool,
66}
67
68impl LineDelimiter {
69 fn new() -> Self {
71 Self::default()
72 }
73
74 fn push(&mut self, val: impl Into<Bytes>) {
76 let val: Bytes = val.into();
77
78 let is_escape = &mut self.is_escape;
79 let is_quote = &mut self.is_quote;
80 let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
81 if *is_escape {
82 *is_escape = false;
83 None
84 } else if *v == ESCAPE {
85 *is_escape = true;
86 None
87 } else if *v == QUOTE {
88 *is_quote = !*is_quote;
89 None
90 } else if *is_quote {
91 None
92 } else {
93 (*v == NEWLINE).then_some(idx + 1)
94 }
95 });
96
97 let start_offset = match self.remainder.is_empty() {
98 true => 0,
99 false => match record_ends.next() {
100 Some(idx) => {
101 self.remainder.extend_from_slice(&val[0..idx]);
102 self.complete
103 .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
104 idx
105 }
106 None => {
107 self.remainder.extend_from_slice(&val);
108 return;
109 }
110 },
111 };
112 let end_offset = record_ends.next_back().unwrap_or(start_offset);
113 if start_offset != end_offset {
114 self.complete.push_back(val.slice(start_offset..end_offset));
115 }
116
117 if end_offset != val.len() {
118 self.remainder.extend_from_slice(&val[end_offset..])
119 }
120 }
121
122 fn finish(&mut self) -> Result<bool> {
126 if !self.remainder.is_empty() {
127 if self.is_quote {
128 Err(Error::UnterminatedString)?;
129 }
130 if self.is_escape {
131 Err(Error::TrailingEscape)?;
132 }
133
134 self.complete
135 .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
136 }
137 Ok(self.complete.is_empty())
138 }
139}
140
141impl Iterator for LineDelimiter {
142 type Item = Bytes;
143
144 fn next(&mut self) -> Option<Self::Item> {
145 self.complete.pop_front()
146 }
147}
148
149pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
153where
154 S: Stream<Item = Result<Bytes>> + Unpin,
155{
156 let delimiter = LineDelimiter::new();
157
158 futures_util::stream::unfold(
159 (s, delimiter, false),
160 |(mut s, mut delimiter, mut exhausted)| async move {
161 loop {
162 if let Some(next) = delimiter.next() {
163 return Some((Ok(next), (s, delimiter, exhausted)));
164 } else if exhausted {
165 return None;
166 }
167
168 match s.next().await {
169 Some(Ok(bytes)) => delimiter.push(bytes),
170 Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
171 None => {
172 exhausted = true;
173 match delimiter.finish() {
174 Ok(true) => return None,
175 Ok(false) => continue,
176 Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
177 }
178 }
179 }
180 }
181 },
182 )
183}
184
#[cfg(test)]
mod tests {
    use futures_util::stream::{BoxStream, TryStreamExt};

    use super::*;

    /// Input chunks shared by the stream tests; records deliberately straddle chunks
    const CHUNKS: [&str; 3] = ["hello\nworld\nbin", "go\ncup", "cakes"];

    /// The records `CHUNKS` should be split into
    fn expected_records() -> Vec<Bytes> {
        vec!["hello\nworld\n", "bingo\n", "cupcakes"]
            .into_iter()
            .map(Bytes::from)
            .collect()
    }

    #[test]
    fn test_delimiter() {
        let mut delimiter = LineDelimiter::new();
        for chunk in ["hello\nworld", "\n\n"] {
            delimiter.push(chunk);
        }

        let records: Vec<Bytes> = delimiter.collect();
        assert_eq!(
            records,
            vec![
                Bytes::from("hello\n"),
                Bytes::from("world\n"),
                Bytes::from("\n"),
            ]
        );
    }

    #[test]
    fn test_delimiter_escaped() {
        let mut delimiter = LineDelimiter::new();
        for chunk in ["", "fo\\\n\"foo", "bo\n\"bar\n", "\"he", "llo\"\n"] {
            delimiter.push(chunk);
        }

        let records: Vec<Bytes> = delimiter.by_ref().collect();
        assert_eq!(
            records,
            vec![
                Bytes::from("fo\\\n\"foobo\n\"bar\n"),
                Bytes::from("\"hello\"\n"),
            ]
        );

        // A newline inside quotes and escaped quotes inside a quoted string, followed
        // by a final record with no trailing newline that only `finish` can complete
        delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
        assert!(!delimiter.finish().unwrap());

        let records: Vec<Bytes> = delimiter.by_ref().collect();
        assert_eq!(
            records,
            vec![
                Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n"),
                Bytes::from("hello"),
            ]
        );
        assert!(delimiter.finish().unwrap());
        assert!(delimiter.next().is_none());
    }

    #[tokio::test]
    async fn test_delimiter_stream() {
        let chunks = futures_util::stream::iter(CHUNKS.iter().map(|s| Ok(Bytes::from(*s))));

        let records: Vec<Bytes> = newline_delimited_stream(chunks)
            .try_collect()
            .await
            .unwrap();
        assert_eq!(records, expected_records());
    }

    #[tokio::test]
    async fn test_delimiter_unfold_stream() {
        let chunks: BoxStream<'static, Result<Bytes>> = futures_util::stream::unfold(
            VecDeque::from(CHUNKS),
            |mut pending| async move {
                pending
                    .pop_front()
                    .map(|chunk| (Ok(Bytes::from(chunk)), pending))
            },
        )
        .boxed();

        let records: Vec<Bytes> = newline_delimited_stream(chunks)
            .try_collect()
            .await
            .unwrap();
        assert_eq!(records, expected_records());
    }
}