Skip to main content

object_store/
delimited.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utility for streaming newline delimited files from object storage
19
20use std::collections::VecDeque;
21
22use bytes::Bytes;
23use futures_util::{Stream, StreamExt};
24
25use super::Result;
26
27#[derive(Debug, thiserror::Error)]
28enum Error {
29    #[error("encountered unterminated string")]
30    UnterminatedString,
31
32    #[error("encountered trailing escape character")]
33    TrailingEscape,
34}
35
36impl From<Error> for super::Error {
37    fn from(err: Error) -> Self {
38        Self::Generic {
39            store: "LineDelimiter",
40            source: Box::new(err),
41        }
42    }
43}
44
/// Byte that opens and closes a quoted string: ASCII `"`
const QUOTE: u8 = b'"';

/// Byte that terminates a record: ASCII `\n`
const NEWLINE: u8 = b'\n';

/// Byte that causes the byte following it to be taken literally: ASCII `\`
const ESCAPE: u8 = b'\\';
53
54/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
55/// of [`Bytes`] containing a whole number of new line delimited records
56#[derive(Debug, Default)]
57struct LineDelimiter {
58    /// Complete chunks of [`Bytes`]
59    complete: VecDeque<Bytes>,
60    /// Remainder bytes that form the next record
61    remainder: Vec<u8>,
62    /// True if the last character was the escape character
63    is_escape: bool,
64    /// True if currently processing a quoted string
65    is_quote: bool,
66}
67
68impl LineDelimiter {
69    /// Creates a new [`LineDelimiter`] with the provided delimiter
70    fn new() -> Self {
71        Self::default()
72    }
73
74    /// Adds the next set of [`Bytes`]
75    fn push(&mut self, val: impl Into<Bytes>) {
76        let val: Bytes = val.into();
77
78        let is_escape = &mut self.is_escape;
79        let is_quote = &mut self.is_quote;
80        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
81            if *is_escape {
82                *is_escape = false;
83                None
84            } else if *v == ESCAPE {
85                *is_escape = true;
86                None
87            } else if *v == QUOTE {
88                *is_quote = !*is_quote;
89                None
90            } else if *is_quote {
91                None
92            } else {
93                (*v == NEWLINE).then_some(idx + 1)
94            }
95        });
96
97        let start_offset = match self.remainder.is_empty() {
98            true => 0,
99            false => match record_ends.next() {
100                Some(idx) => {
101                    self.remainder.extend_from_slice(&val[0..idx]);
102                    self.complete
103                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
104                    idx
105                }
106                None => {
107                    self.remainder.extend_from_slice(&val);
108                    return;
109                }
110            },
111        };
112        let end_offset = record_ends.next_back().unwrap_or(start_offset);
113        if start_offset != end_offset {
114            self.complete.push_back(val.slice(start_offset..end_offset));
115        }
116
117        if end_offset != val.len() {
118            self.remainder.extend_from_slice(&val[end_offset..])
119        }
120    }
121
122    /// Marks the end of the stream, delimiting any remaining bytes
123    ///
124    /// Returns `true` if there is no remaining data to be read
125    fn finish(&mut self) -> Result<bool> {
126        if !self.remainder.is_empty() {
127            if self.is_quote {
128                Err(Error::UnterminatedString)?;
129            }
130            if self.is_escape {
131                Err(Error::TrailingEscape)?;
132            }
133
134            self.complete
135                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
136        }
137        Ok(self.complete.is_empty())
138    }
139}
140
141impl Iterator for LineDelimiter {
142    type Item = Bytes;
143
144    fn next(&mut self) -> Option<Self::Item> {
145        self.complete.pop_front()
146    }
147}
148
149/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
150/// yielded [`Bytes`] contains a whole number of new line delimited records
151/// accounting for `\` style escapes and `"` quotes
152pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
153where
154    S: Stream<Item = Result<Bytes>> + Unpin,
155{
156    let delimiter = LineDelimiter::new();
157
158    futures_util::stream::unfold(
159        (s, delimiter, false),
160        |(mut s, mut delimiter, mut exhausted)| async move {
161            loop {
162                if let Some(next) = delimiter.next() {
163                    return Some((Ok(next), (s, delimiter, exhausted)));
164                } else if exhausted {
165                    return None;
166                }
167
168                match s.next().await {
169                    Some(Ok(bytes)) => delimiter.push(bytes),
170                    Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
171                    None => {
172                        exhausted = true;
173                        match delimiter.finish() {
174                            Ok(true) => return None,
175                            Ok(false) => continue,
176                            Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
177                        }
178                    }
179                }
180            }
181        },
182    )
183}
184
#[cfg(test)]
mod tests {
    use futures_util::stream::{BoxStream, TryStreamExt};

    use super::*;

    /// Input chunks for the stream tests; records straddle the chunk boundaries
    const STREAM_INPUT: [&str; 3] = ["hello\nworld\nbin", "go\ncup", "cakes"];

    /// Output expected after delimiting [`STREAM_INPUT`]
    fn expected_stream_output() -> Vec<Bytes> {
        vec![
            Bytes::from("hello\nworld\n"),
            Bytes::from("bingo\n"),
            Bytes::from("cupcakes"),
        ]
    }

    #[test]
    fn test_delimiter() {
        let mut delimiter = LineDelimiter::new();
        delimiter.push("hello\nworld");
        delimiter.push("\n\n");

        let drained: Vec<Bytes> = delimiter.by_ref().collect();
        assert_eq!(
            drained,
            vec![
                Bytes::from("hello\n"),
                Bytes::from("world\n"),
                Bytes::from("\n"),
            ]
        );
        assert!(delimiter.next().is_none());
    }

    #[test]
    fn test_delimiter_escaped() {
        let mut delimiter = LineDelimiter::new();
        delimiter.push("");
        delimiter.push("fo\\\n\"foo");
        delimiter.push("bo\n\"bar\n");
        delimiter.push("\"he");
        delimiter.push("llo\"\n");

        // The escaped newline and the quoted newline must not split the record
        let first = delimiter.next().unwrap();
        assert_eq!(first, Bytes::from("fo\\\n\"foobo\n\"bar\n"));
        let second = delimiter.next().unwrap();
        assert_eq!(second, Bytes::from("\"hello\"\n"));
        assert!(delimiter.next().is_none());

        // Verify can push further data
        delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
        let drained = delimiter.finish().unwrap();
        assert!(!drained);

        let third = delimiter.next().unwrap();
        assert_eq!(third, Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n"));
        let fourth = delimiter.next().unwrap();
        assert_eq!(fourth, Bytes::from("hello"));

        let drained = delimiter.finish().unwrap();
        assert!(drained);
        assert!(delimiter.next().is_none());
    }

    #[tokio::test]
    async fn test_delimiter_stream() {
        let chunks = STREAM_INPUT.iter().copied().map(|s| Ok(Bytes::from(s)));
        let stream = newline_delimited_stream(futures_util::stream::iter(chunks));

        let results: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(results, expected_stream_output())
    }

    #[tokio::test]
    async fn test_delimiter_unfold_stream() {
        let input_stream: BoxStream<'static, Result<Bytes>> = futures_util::stream::unfold(
            VecDeque::from(STREAM_INPUT),
            |mut pending| async move {
                let chunk = pending.pop_front()?;
                Some((Ok(Bytes::from(chunk)), pending))
            },
        )
        .boxed();
        let stream = newline_delimited_stream(input_stream);

        let results: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(results, expected_stream_output())
    }
}