Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement read trait for assets for full reader functionality #24

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ thiserror = "1.0.30"
wapc = "1.0.0"
wapc-guest = "1.0"

[build-dependencies]
git2 = "0.13"

[workspace]
members = [
"samples",
Expand Down
14 changes: 11 additions & 3 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,27 @@ pub fn execute(exe: &str, args: &[&str]) {
}
}

fn get_git_hash() -> String {
let repo = git2::Repository::open(".").unwrap();
let head = repo.head().unwrap();
let head_oid = head.target().unwrap();
let head_commit = repo.find_commit(head_oid).unwrap();
head_commit.id().to_string()
}

fn setup_version() {
// Retrieve the Cargo version from the environment variable
let cargo_version = env::var("CARGO_PKG_VERSION").unwrap_or_else(|_| "unknown".to_string());

let commit_hash = get_git_hash();
// Write the version to a file
let out_dir = env::var("OUT_DIR").unwrap();
let dest_path = format!("{}/version.rs", out_dir);
let mut file = File::create(dest_path).unwrap();

write!(
&mut file,
"pub const CARGO_VERSION: &str = \"{}\";\n\n",
cargo_version
"pub const CARGO_VERSION: &str = \"{}\";\n\n pub const COMMIT_HASH: &str = \"{}\";\n",
cargo_version, commit_hash
)
.unwrap();
}
Expand Down
89 changes: 33 additions & 56 deletions samples/external-img/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ extern crate serde_derive;
extern crate serde_json;

use elvwasm::{
bccontext_fabric_io::FabricStreamReader, bccontext_fabric_io::FabricStreamWriter,
implement_bitcode_module, jpc, register_handler, BitcodeContext, FetchResult, SystemTimeResult,
};

use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::io::{BufWriter, ErrorKind, SeekFrom, Write};
use std::io::{BufWriter, Write};
use std::path::Path;

use elvwasm::ErrorKinds;
Expand Down Expand Up @@ -201,15 +203,17 @@ fn do_bulk_download(bcc: &mut BitcodeContext) -> CallResult {
}
None => DEF_CAP,
};
let total_size = 0;
let mut fw = FabricWriter::new(bcc, total_size);
let mut fw = FabricStreamWriter::new(bcc, "fos".to_string(), 0);
{
let bw = BufWriter::with_capacity(buf_cap, &mut fw);

//let zip = GzEncoder::new(bw, flate2::Compression::default());
let mut a = tar::Builder::new(bw);
let time_cur: SystemTimeResult = bcc.q_system_time().try_into()?;
let rsr = bcc.read_stream_chunked("fis".to_string(), 10000000)?;
let mut fir = FabricStreamReader::new("fis".to_string(), bcc);
let mut buffer = Vec::new();
std::io::copy(&mut fir, &mut buffer)?;
let rsr: Vec<u8> = buffer;

let params: Vec<String> = if !rsr.is_empty() {
let p: serde_json::Value = serde_json::from_slice(&rsr)?;
Expand Down Expand Up @@ -249,11 +253,21 @@ fn do_bulk_download(bcc: &mut BitcodeContext) -> CallResult {
};

let mut header = tar::Header::new_gnu();
let read_bytes = bcc.read_stream_chunked(exr.body, 10000000)?;
header.set_size(read_bytes.len() as u64);
header.set_cksum();
header.set_mtime(time_cur.time);
header.set_mode(0o644);
let sz_file: i32 = exr
.headers
.get("Content-Length")
.unwrap_or(&vec!["0".to_string()])[0]
.parse::<i32>()
.unwrap_or(0);
bcc.log_debug(&std::format!("Bulk download asset {p} size = {sz_file}"))?;
if sz_file < 0 {
v_file_status.push(SummaryElement {
asset: format!("{0} Error=Size is negative", p),
status: "failed".to_string(),
});
continue;
}

let filename: String = exr
.headers
.get("Content-Disposition")
Expand All @@ -266,7 +280,13 @@ fn do_bulk_download(bcc: &mut BitcodeContext) -> CallResult {
.map(|s| s.trim_matches(|c| c == '"' || c == '\''))
.ok_or(ErrorKinds::NotExist("filename= not found".to_string()))?
.to_string();
a.append_data(&mut header, &filename, read_bytes.as_slice())?;
let fsr = FabricStreamReader::new(exr.body.clone(), bcc);
header.set_size(sz_file as u64);
header.set_mtime(time_cur.time);
header.set_mode(0o644);
header.set_path(&filename)?;
header.set_cksum();
a.append(&header, fsr)?;
v_file_status.push(SummaryElement {
asset: filename.to_string(),
status: "success".to_string(),
Expand Down Expand Up @@ -315,10 +335,10 @@ fn do_single_asset(

let exr: FetchResult = get_single_offering_image(bcc, &result.url, is_video).try_into()?;

let body_size = 0;
let sid = exr.body;
let input = bcc.read_stream_chunked(sid.to_string(), 1000000)?;
bcc.write_stream("fos", &input)?;
let mut fsr = FabricStreamReader::new(sid.clone(), bcc);
let mut fsw = FabricStreamWriter::new(bcc, "fos".to_string(), 0);
let body_size = std::io::copy(&mut fsr, &mut fsw)? as usize;
let mut filename = meta
.get("title")
.ok_or(ErrorKinds::NotExist("title not found in meta".to_string()))?
Expand Down Expand Up @@ -406,49 +426,6 @@ impl TryFrom<CallResult> for ComputeCallResult {
}
}

//FabricWriter is a struct that implements the Write trait
//The struct is used to write the image bits to the qfab based stream
// The is no buffer in the struct as the BufWriter will write immediately to "fos" of qfab's context
#[derive(Debug)]
struct FabricWriter<'a> {
bcc: &'a BitcodeContext,
size: usize,
}

impl FabricWriter<'_> {
fn new(bcc: &BitcodeContext, sz: usize) -> FabricWriter<'_> {
FabricWriter { bcc, size: sz }
}
}
impl std::io::Write for FabricWriter<'_> {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
match self.bcc.write_stream("fos", buf) {
Ok(s) => {
let w: elvwasm::WritePartResult = serde_json::from_slice(&s)?;
self.size += w.written;
Ok(w.written)
}
Err(e) => Err(std::io::Error::new(ErrorKind::Other, e)),
}
}

fn flush(&mut self) -> Result<(), std::io::Error> {
// Nothing to flush. The BufWriter will handle its buffer independant using writes
Ok(())
}
}

impl std::io::Seek for FabricWriter<'_> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
match pos {
SeekFrom::Start(_s) => {}
SeekFrom::Current(_s) => {}
SeekFrom::End(_s) => {}
}
Ok(self.size as u64)
}
}

#[test]
fn test_image_url_generation() {
let meta = json!({
Expand Down
15 changes: 9 additions & 6 deletions samples/external/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ extern crate serde_json;
extern crate scopeguard;

use base64::{engine::general_purpose, Engine as _};
use elvwasm::bccontext_fabric_io::FabricStreamWriter;
use elvwasm::BitcodeContext;
use elvwasm::{
implement_bitcode_module, jpc, register_handler, CreatePartResult, ErrorKinds,
ExternalCallResult, FinalizeCallResult, NewStreamResult,
};
use serde_json::json;
use std::convert::TryInto;
use std::io::Write;

implement_bitcode_module!("external", do_external, "failme", do_external_fail);

Expand All @@ -29,7 +31,7 @@ fn do_external_fail(bcc: &mut BitcodeContext) -> CallResult {
let tar_hash = &qp
.get("tar_hash")
.ok_or(ErrorKinds::Invalid("tar_hash not present".to_string()))?[0];
bcc.log_info(&format!("img_hash ={img_hash:?} tar_hash = {tar_hash:?}"))?;
bcc.log_debug(&format!("img_hash ={img_hash:?} tar_hash = {tar_hash:?}"))?;
let params = json!({
"http" : {
"verb" : "some",
Expand Down Expand Up @@ -59,7 +61,8 @@ fn do_external_fail(bcc: &mut BitcodeContext) -> CallResult {
bcc.log_debug(&format!("Closing part stream {}", &stream_img.stream_id)).unwrap_or_default();
let _ = bcc.close_stream(stream_img.stream_id.clone());
}
bcc.write_stream(&stream_img.stream_id, imgbits)?;
let mut fsw = FabricStreamWriter::new(bcc, stream_img.stream_id.clone(), imgbits.len());
fsw.write_all(&imgbits)?;
let imgpart: CreatePartResult = bcc
.q_create_part_from_stream(&bcc.request.q_info.write_token, &stream_img.stream_id)
.try_into()?;
Expand Down Expand Up @@ -110,7 +113,7 @@ fn do_external(bcc: &mut BitcodeContext) -> CallResult {
let tar_hash = &qp
.get("tar_hash")
.ok_or(ErrorKinds::Invalid("tar_hash not present".to_string()))?[0];
bcc.log_info(&format!("img_hash ={img_hash:?} tar_hash = {tar_hash:?}"))?;
bcc.log_debug(&format!("img_hash ={img_hash:?} tar_hash = {tar_hash:?}"))?;
let params = json!({
"http" : {
"verb" : "GET",
Expand All @@ -128,7 +131,7 @@ fn do_external(bcc: &mut BitcodeContext) -> CallResult {
let exr: ExternalCallResult = bcc
.call_external_bitcode("image", &params, img_obj, "builtin")
.try_into()?;
bcc.log_info("here")?;
bcc.log_debug("do external")?;
let imgbits = &general_purpose::STANDARD.decode(&exr.fout)?;
console_log(&format!(
"imgbits decoded size = {} fout size = {}",
Expand Down Expand Up @@ -170,12 +173,12 @@ fn do_external(bcc: &mut BitcodeContext) -> CallResult {
.call_external_bitcode("tar", &tar_params, &fc.qhash, tar_hash)
.try_into()?;
let tarbits = &general_purpose::STANDARD.decode(&exr_tar.fout)?;
bcc.log_info(&format!(
bcc.log_debug(&format!(
"fout size = {} tar_ bit len = {}",
&exr_tar.fout.len(),
tarbits.len()
))?;
bcc.callback(200, "application/zip", tarbits.len())?;
bcc.write_stream("fos", tarbits)?;
FabricStreamWriter::new(bcc, "fos".to_string(), tarbits.len()).write_all(tarbits)?;
bcc.make_success_json(&json!({}))
}
4 changes: 2 additions & 2 deletions samples/lro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ fn do_lro(bcc: &mut elvwasm::BitcodeContext) -> CallResult {
fn do_lro_callback(bcc: &mut elvwasm::BitcodeContext) -> CallResult {
let http_p = &bcc.request.params.http;
let _qp = &http_p.query;
bcc.log_info("IN CALLBACK!!!!!!!")?;
bcc.log_debug("lro callback")?;
let mr: ModifyResult = bcc.q_modify_content().try_into()?;
bcc.log_info(&format!("write token = {}", mr.qwtoken))?;
bcc.log_debug(&format!("write token = {}", mr.qwtoken))?;
bcc.make_success_json(&json!({}))
}
71 changes: 8 additions & 63 deletions samples/objtar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,15 @@ extern crate serde_json;
extern crate scopeguard;

use elvwasm::{
implement_bitcode_module, jpc, register_handler, BitcodeContext, NewStreamResult, QPartList,
SystemTimeResult,
bccontext_fabric_io::FabricStreamReader, bccontext_fabric_io::FabricStreamWriter,
implement_bitcode_module, jpc, register_handler, NewStreamResult, QPartList, SystemTimeResult,
};
use flate2::write::GzEncoder;
use serde_json::json;
use std::io::{BufWriter, ErrorKind, SeekFrom, Write};
use std::io::{BufWriter, Write};

implement_bitcode_module!("tar", do_tar_from_obj, "content", do_tar_from_obj);

#[derive(Debug)]
struct FabricWriter<'a> {
bcc: &'a BitcodeContext,
size: usize,
}

impl FabricWriter<'_> {
fn new(bcc: &BitcodeContext, sz: usize) -> FabricWriter<'_> {
FabricWriter { bcc, size: sz }
}
}
impl std::io::Write for FabricWriter<'_> {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
match self.bcc.write_stream("fos", buf) {
Ok(s) => {
self.bcc
.log_debug(&format!("Wrote {} bytes", buf.len()))
.unwrap_or_default(); // to gobble the log result
let w: elvwasm::WritePartResult = serde_json::from_slice(&s)?;
self.size += w.written;
Ok(w.written)
}
Err(e) => Err(std::io::Error::new(ErrorKind::Other, e)),
}
}

fn flush(&mut self) -> Result<(), std::io::Error> {
// Nothing to flush. The BufWriter will handle its buffer independant using writes
Ok(())
}
}

impl std::io::Seek for FabricWriter<'_> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
match pos {
SeekFrom::Start(s) => {
self.bcc
.log_debug(&format!("SEEK from START {s}"))
.unwrap_or_default();
}
SeekFrom::Current(s) => {
self.bcc
.log_debug(&format!("SEEK from CURRENT {s}"))
.unwrap_or_default();
}
SeekFrom::End(s) => {
self.bcc
.log_debug(&format!("SEEK from END {s}"))
.unwrap_or_default();
}
}
Ok(self.size as u64)
}
}

#[no_mangle]
fn do_tar_from_obj(bcc: &mut elvwasm::BitcodeContext) -> CallResult {
let http_p = &bcc.request.params.http;
Expand All @@ -90,7 +35,7 @@ fn do_tar_from_obj(bcc: &mut elvwasm::BitcodeContext) -> CallResult {
None => DEF_CAP,
};
let total_size = 0;
let mut fw = FabricWriter::new(bcc, total_size);
let mut fw = FabricStreamWriter::new(bcc, "fos".to_string(), total_size);
{
let bw = BufWriter::with_capacity(buf_cap, &mut fw);

Expand All @@ -113,13 +58,13 @@ fn do_tar_from_obj(bcc: &mut elvwasm::BitcodeContext) -> CallResult {
-1,
false,
)?;
let usz = part.size.try_into()?;
let data = bcc.read_stream(stream_wm.stream_id.clone(), usz)?;
let usz = part.size as u64;
let fsr = FabricStreamReader::new(stream_wm.stream_id.clone(), bcc);
let mut header = tar::Header::new_gnu();
header.set_size(usz as u64);
header.set_size(usz);
header.set_cksum();
header.set_mtime(time_cur.time);
a.append_data(&mut header, part.hash.clone(), data.as_slice())?;
a.append_data(&mut header, part.hash.clone(), fsr)?;
}
a.finish()?;
let mut finished_writer = a.into_inner()?;
Expand Down
Loading
Loading