Skip to content

Commit c49d27c

Browse files
committed
async-multipart: impl Stream for Multipart<Bytes>
1 parent b1d14c1 commit c49d27c

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ url = "2.2"
4949
[dev-dependencies]
5050
# Diff view of test failures
5151
difference = "2.0"
52+
futures = "0.3"
5253
futures-test = "0.3"
5354
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
5455
tokio = { version = "1.0", features = ["macros"] }

src/v1/objects/insert.rs

+24
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
use futures_util::{
99
io::{AsyncRead, Result as FuturesResult},
1010
task::{Context, Poll},
11+
Stream,
1112
};
1213
#[cfg(feature = "async-multipart")]
1314
use pin_utils::unsafe_pinned;
@@ -293,6 +294,29 @@ impl<B: AsyncRead + Unpin> AsyncRead for Multipart<B> {
293294
}
294295
}
295296

297+
#[cfg(feature = "async-multipart")]
298+
impl Stream for Multipart<bytes::Bytes> {
299+
type Item = bytes::Bytes;
300+
301+
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
302+
Poll::Ready(match self.cursor.part {
303+
MultipartPart::Prefix => {
304+
self.cursor.part.next();
305+
Some(self.prefix.clone())
306+
}
307+
MultipartPart::Body => {
308+
self.cursor.part.next();
309+
Some(self.body.clone())
310+
}
311+
MultipartPart::Suffix => {
312+
self.cursor.part.next();
313+
Some(bytes::Bytes::from(MULTI_PART_SUFFIX))
314+
}
315+
MultipartPart::End => None,
316+
})
317+
}
318+
}
319+
296320
impl super::Object {
297321
/// Stores a new object and metadata.
298322
///

tests/objects.rs

+57
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,63 @@ fn insert_multipart_async() {
392392
util::cmp_strings(&exp_body, &act_body);
393393
}
394394

395+
#[cfg(feature = "async-multipart")]
396+
#[test]
397+
fn insert_multipart_stream_bytes() {
398+
use bytes::{BufMut, Bytes, BytesMut};
399+
400+
let metadata = Metadata {
401+
name: Some("good_name".to_owned()),
402+
content_type: Some("text/plain".to_owned()),
403+
content_encoding: Some("gzip".to_owned()),
404+
content_disposition: Some("attachment; filename=\"good name.jpg\"".to_owned()),
405+
metadata: Some(
406+
["akey"]
407+
.iter()
408+
.map(|k| (String::from(*k), format!("{}value", k)))
409+
.collect(),
410+
),
411+
..Default::default()
412+
};
413+
414+
let insert_req = Object::insert_multipart(
415+
&BucketName::non_validated("bucket"),
416+
Bytes::from(TEST_CONTENT),
417+
TEST_CONTENT.len() as u64,
418+
&metadata,
419+
None,
420+
)
421+
.unwrap();
422+
423+
let exp_body = format!(
424+
"--{b}\ncontent-type: application/json; charset=utf-8\n\n{}\n--{b}\ncontent-type: text/plain\n\n{}\n--{b}--",
425+
serde_json::to_string(&metadata).unwrap(),
426+
TEST_CONTENT,
427+
b = "tame_gcs"
428+
);
429+
430+
let expected = http::Request::builder()
431+
.method(http::Method::POST)
432+
.uri("https://www.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=multipart&prettyPrint=false")
433+
.header(http::header::CONTENT_TYPE, "multipart/related; boundary=tame_gcs")
434+
.header(http::header::CONTENT_LENGTH, 5758)
435+
.body(exp_body)
436+
.unwrap();
437+
438+
let (exp_parts, exp_body) = expected.into_parts();
439+
let (act_parts, act_multipart) = insert_req.into_parts();
440+
441+
util::cmp_strings(&format!("{:#?}", exp_parts), &format!("{:#?}", act_parts));
442+
443+
let mut act_body = BytesMut::with_capacity(2 * 1024);
444+
for chunk in futures::executor::block_on_stream(act_multipart) {
445+
act_body.put(chunk);
446+
}
447+
let act_body = String::from_utf8_lossy(&act_body);
448+
449+
util::cmp_strings(&exp_body, &act_body);
450+
}
451+
395452
#[test]
396453
fn patches() {
397454
let mut md = std::collections::BTreeMap::new();

0 commit comments

Comments
 (0)