Commit e92fabe

Improve sync times (#185)
* Improve tracing output

* Improve sync times

  This is a bit uglier than I would like, but unfortunately we can't ship the async task join off to a tokio task, because we spawn so many tasks that tokio won't schedule it. Instead, we spawn the rayon scope on a separate thread first. This cuts sync times by ~25%+.

* Add sanity check

  This test is somewhat redundant with diff_cargo, but I was using it to sanity check that we are 100% not giving cargo any work to do.

* Fix lint

* Ignore when using git

* Update CHANGELOG
1 parent 6ff8b72 commit e92fabe
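For context, below is a minimal, self-contained sketch of the pattern the commit message describes: the async side only awaits remote I/O and forwards each finished payload over a channel, while a dedicated OS thread owns a `rayon::scope` that does the CPU-heavy work, so the blocking fan-out never runs on a tokio worker. This is not the cargo-fetcher code itself; `do_io` and `do_cpu_work` are hypothetical stand-ins, and only the tokio, rayon, and crossbeam-channel crates are assumed as dependencies.

```rust
use std::thread;

// Stand-in for the CPU-heavy work (decompression/unpacking in the real code).
fn do_cpu_work(payload: Vec<u8>) -> usize {
    payload.iter().map(|b| *b as usize).sum()
}

// Stand-in for a remote fetch.
async fn do_io(i: u32) -> Vec<u8> {
    vec![i as u8; 1024]
}

#[tokio::main]
async fn main() {
    let (tx, rx) = crossbeam_channel::unbounded::<Vec<u8>>();

    // Spawn the rayon scope on its own thread *before* driving the async tasks,
    // so the blocking `rx.recv()` loop never occupies a tokio worker.
    let cpu_thread = thread::spawn(move || {
        rayon::scope(|s| {
            while let Ok(payload) = rx.recv() {
                s.spawn(move |_| {
                    let n = do_cpu_work(payload);
                    println!("processed payload, result: {n}");
                });
            }
        });
    });

    // Async side: await each I/O completion and hand the result off immediately.
    let mut tasks = tokio::task::JoinSet::new();
    for i in 0..32u32 {
        tasks.spawn(do_io(i));
    }
    while let Some(res) = tasks.join_next().await {
        if let Ok(payload) = res {
            let _ = tx.send(payload);
        }
    }

    // Dropping the sender makes `rx.recv()` return Err, ending the rayon scope.
    drop(tx);
    cpu_thread.join().expect("cpu thread panicked");
}
```

Dropping the sender is what lets the receive loop (and therefore the scope and the thread) terminate, which mirrors the `drop(tx)` followed by `fs_thread.join()` in the src/sync.rs diff below.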

File tree

8 files changed: +234 -79 lines

CHANGELOG.md (+3)

@@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 <!-- next-header -->
 ## [Unreleased] - ReleaseDate
+### Fixed
+- [PR#185](https://github.com/EmbarkStudios/cargo-fetcher/pull/185) significantly improved the speed of the sync subcommand.
+
 ## [0.14.3] - 2023-08-15
 ### Fixed
 - [PR#184](https://github.com/EmbarkStudios/cargo-fetcher/pull/184) fixed submodule checkout even better this time. For real this time.

src/lib.rs (+2, -2)

@@ -112,8 +112,8 @@ impl<'a> fmt::Display for CloudId<'a> {
 
 #[allow(dead_code)]
 pub struct GcsLocation<'a> {
-    bucket: &'a str,
-    prefix: &'a str,
+    pub bucket: &'a str,
+    pub prefix: &'a str,
 }
 
 #[allow(dead_code)]

src/sync.rs (+82, -60)

@@ -108,7 +108,7 @@ pub async fn registry_index(
     Ok(())
 }
 
-#[tracing::instrument(skip(pkg))]
+#[tracing::instrument(level = "debug", skip_all, fields(name = krate.name, version = krate.version, rev = %rev.id))]
 fn sync_git(
     db_dir: &Path,
     co_dir: &Path,
@@ -126,7 +126,13 @@
     let crate::git::GitPackage { db, checkout } = pkg;
 
     let unpack_path = db_path.clone();
-    util::unpack_tar(db, util::Encoding::Zstd, &unpack_path)?;
+    let compressed = db.len();
+    let uncompressed = util::unpack_tar(db, util::Encoding::Zstd, &unpack_path)?;
+    debug!(
+        compressed = compressed,
+        uncompressed = uncompressed,
+        "unpacked db dir"
+    );
 
     let co_path = co_dir.join(format!("{}/{}", krate.local_id(), rev.short()));
 
@@ -143,7 +149,13 @@
     // otherwise do a checkout
     match checkout {
         Some(checkout) => {
-            util::unpack_tar(checkout, util::Encoding::Zstd, &co_path)?;
+            let compressed = checkout.len();
+            let uncompressed = util::unpack_tar(checkout, util::Encoding::Zstd, &co_path)?;
+            debug!(
+                compressed = compressed,
+                uncompressed = uncompressed,
+                "unpacked checkout dir"
+            );
         }
         None => {
             // Do a checkout of the bare clone if we didn't/couldn't unpack the
@@ -158,7 +170,7 @@
     Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(data))]
+#[tracing::instrument(level = "debug", skip_all, fields(name = krate.name, version = krate.version))]
 fn sync_package(
     cache_dir: &Path,
     src_dir: &Path,
@@ -185,7 +197,7 @@
             f.write_all(&pack_data)?;
             f.sync_all()?;
 
-            debug!(bytes = pack_data.len(), path = ?packed_path, "wrote pack file to disk");
+            debug!(bytes = pack_data.len(), "wrote pack file to disk");
             Ok(())
         },
         || -> anyhow::Result<()> {
@@ -410,15 +422,71 @@ pub async fn crates(ctx: &crate::Ctx) -> anyhow::Result<Summary> {
         });
     }
 
-    let summary = std::sync::Mutex::new(Summary {
+    let summary = std::sync::Arc::new(std::sync::Mutex::new(Summary {
         total_bytes: 0,
         bad: 0,
         good: 0,
-    });
+    }));
+
+    let (tx, rx) = crossbeam_channel::unbounded::<(Krate, Pkg)>();
+    let fs_thread = {
+        let summary = summary.clone();
+        let root_dir = root_dir.clone();
+
+        std::thread::spawn(move || {
+            let db_dir = &git_db_dir;
+            let co_dir = &git_co_dir;
+            let root_dir = &root_dir;
+            let summary = &summary;
+            rayon::scope(|s| {
+                while let Ok((krate, pkg)) = rx.recv() {
+                    s.spawn(move |_s| {
+                        let synced = match (&krate.source, pkg) {
+                            (Source::Registry(rs), Pkg::Registry(krate_data)) => {
+                                let len = krate_data.len();
+                                let (cache_dir, src_dir) = rs.registry.sync_dirs(root_dir);
+                                if let Err(err) = sync_package(
+                                    &cache_dir, &src_dir, &krate, krate_data, &rs.chksum,
+                                ) {
+                                    error!(krate = %krate, "failed to splat package: {err:#}");
+                                    None
+                                } else {
+                                    Some(len)
+                                }
+                            }
+                            (Source::Git(gs), Pkg::Git(pkg)) => {
+                                let mut len = pkg.db.len();
+
+                                if let Some(co) = &pkg.checkout {
+                                    len += co.len();
+                                }
+
+                                match sync_git(db_dir, co_dir, &krate, pkg, &gs.rev) {
+                                    Ok(_) => Some(len),
+                                    Err(err) => {
+                                        error!(krate = %krate, "failed to splat git repo: {err:#}");
+                                        None
+                                    }
+                                }
+                            }
+                            _ => unreachable!(),
+                        };
+
+                        let mut sum = summary.lock().unwrap();
+                        if let Some(synced) = synced {
+                            sum.good += 1;
+                            sum.total_bytes += synced;
+                        } else {
+                            sum.bad += 1;
+                        }
+                    });
+                }
+            });
+        })
+    };
 
     // As each remote I/O op completes, pass it off to the thread pool to do
     // the more CPU intensive work of decompression, etc
-    let (tx, rx) = crossbeam_channel::unbounded();
     while let Some(res) = tasks.join_next().await {
        let Ok(res) = res else { continue; };
 
@@ -429,59 +497,13 @@ pub async fn crates(ctx: &crate::Ctx) -> anyhow::Result<Summary> {
         }
     }
 
-    // Need to drop the sender otherwise we'll deadlock waiting for the scope
-    // to finish on the receiver
+    // Drop the sender otherwise we'll deadlock
    drop(tx);
 
-    {
-        let db_dir = &git_db_dir;
-        let co_dir = &git_co_dir;
-        let summary = &summary;
-        rayon::scope(|s| {
-            while let Ok((krate, pkg)) = rx.recv() {
-                s.spawn(move |_s| {
-                    let synced = match (&krate.source, pkg) {
-                        (Source::Registry(rs), Pkg::Registry(krate_data)) => {
-                            let len = krate_data.len();
-                            let (cache_dir, src_dir) = rs.registry.sync_dirs(root_dir);
-                            if let Err(err) =
-                                sync_package(&cache_dir, &src_dir, &krate, krate_data, &rs.chksum)
-                            {
-                                error!(krate = %krate, "failed to splat package: {err:#}");
-                                None
-                            } else {
-                                Some(len)
-                            }
-                        }
-                        (Source::Git(gs), Pkg::Git(pkg)) => {
-                            let mut len = pkg.db.len();
-
-                            if let Some(co) = &pkg.checkout {
-                                len += co.len();
-                            }
-
-                            match sync_git(db_dir, co_dir, &krate, pkg, &gs.rev) {
-                                Ok(_) => Some(len),
-                                Err(err) => {
-                                    error!("failed to splat git repo: {err:#}");
-                                    None
-                                }
-                            }
-                        }
-                        _ => unreachable!(),
-                    };
-
-                    let mut sum = summary.lock().unwrap();
-                    if let Some(synced) = synced {
-                        sum.good += 1;
-                        sum.total_bytes += synced;
-                    } else {
-                        sum.bad += 1;
-                    }
-                });
-            }
-        });
-    }
+    fs_thread.join().expect("failed to join thread");
 
-    Ok(summary.into_inner().unwrap())
+    Ok(std::sync::Arc::into_inner(summary)
+        .unwrap()
+        .into_inner()
+        .unwrap())
 }

src/util.rs (+21, -8)

@@ -58,22 +58,32 @@ use bytes::Bytes;
 use std::io;
 
 #[tracing::instrument(level = "debug")]
-pub(crate) fn unpack_tar(buffer: Bytes, encoding: Encoding, dir: &Path) -> anyhow::Result<()> {
+pub(crate) fn unpack_tar(buffer: Bytes, encoding: Encoding, dir: &Path) -> anyhow::Result<u64> {
+    struct DecoderWrapper<'z, R: io::Read + io::BufRead> {
+        /// The total bytes read from the compressed stream
+        total: u64,
+        inner: Decoder<'z, R>,
+    }
+
     #[allow(clippy::large_enum_variant)]
     enum Decoder<'z, R: io::Read + io::BufRead> {
         Gzip(flate2::read::GzDecoder<R>),
         Zstd(zstd::Decoder<'z, R>),
     }
 
-    impl<'z, R> io::Read for Decoder<'z, R>
+    impl<'z, R> io::Read for DecoderWrapper<'z, R>
     where
         R: io::Read + io::BufRead,
     {
         fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-            match self {
-                Self::Gzip(gz) => gz.read(buf),
-                Self::Zstd(zstd) => zstd.read(buf),
-            }
+            let read = match &mut self.inner {
+                Decoder::Gzip(gz) => gz.read(buf),
+                Decoder::Zstd(zstd) => zstd.read(buf),
+            };
+
+            let read = read?;
+            self.total += read as u64;
+            Ok(read)
         }
     }
 
@@ -90,7 +100,10 @@ pub(crate) fn unpack_tar(buffer: Bytes, encoding: Encoding, dir: &Path) -> anyho
         Encoding::Zstd => Decoder::Zstd(zstd::Decoder::new(buf_reader)?),
     };
 
-    let mut archive_reader = tar::Archive::new(decoder);
+    let mut archive_reader = tar::Archive::new(DecoderWrapper {
+        total: 0,
+        inner: decoder,
+    });
 
     #[cfg(unix)]
     #[allow(clippy::unnecessary_cast)]
@@ -125,7 +138,7 @@ pub(crate) fn unpack_tar(buffer: Bytes, encoding: Encoding, dir: &Path) -> anyho
         return Err(e).context("failed to unpack");
     }
 
-    Ok(())
+    Ok(archive_reader.into_inner().total)
 }
 
 #[tracing::instrument(level = "debug")]

tests/diff_cargo.rs (+64)

@@ -200,3 +200,67 @@ async fn diff_cargo() {
         assert_diff(fetcher_root, cargo_home);
     }
 }
+
+/// Validates that a cargo sync following a fetcher sync should do nothing
+#[tokio::test]
+async fn nothing_to_do() {
+    if std::env::var("CARGO_FETCHER_CRATES_IO_PROTOCOL")
+        .ok()
+        .as_deref()
+        == Some("git")
+    {
+        // Git registry is too unstable for diffing as the index changes too often
+        return;
+    }
+
+    util::hook_logger();
+
+    let sync_dir = util::tempdir();
+    let fs_root = util::tempdir();
+
+    {
+        let (the_krates, registries) = cf::cargo::read_lock_files(
+            vec!["tests/full/Cargo.lock".into()],
+            vec![util::crates_io_registry()],
+        )
+        .unwrap();
+
+        let mut fs_ctx = util::fs_ctx(fs_root.pb(), registries);
+        fs_ctx.krates = the_krates;
+        fs_ctx.root_dir = sync_dir.pb();
+
+        let registry_sets = fs_ctx.registry_sets();
+        let the_registry = fs_ctx.registries[0].clone();
+
+        cf::mirror::registry_indices(&fs_ctx, std::time::Duration::new(10, 0), registry_sets).await;
+        cf::mirror::crates(&fs_ctx)
+            .await
+            .expect("failed to mirror crates");
+
+        fs_ctx.prep_sync_dirs().expect("create base dirs");
+        cf::sync::crates(&fs_ctx).await.expect("synced crates");
+        cf::sync::registry_index(&fs_ctx.root_dir, fs_ctx.backend.clone(), the_registry)
+            .await
+            .expect("failed to sync index");
+    }
+
+    let output = std::process::Command::new("cargo")
+        .env("CARGO_HOME", sync_dir.path())
+        .args([
+            "fetch",
+            "--locked",
+            "--manifest-path",
+            "tests/full/Cargo.toml",
+        ])
+        .stdout(std::process::Stdio::piped())
+        .stderr(std::process::Stdio::piped())
+        .output()
+        .unwrap();
+
+    let stdout = String::from_utf8(output.stdout).unwrap();
+    let stderr = String::from_utf8(output.stderr).unwrap();
+
+    if !stdout.is_empty() || !stderr.is_empty() {
+        panic!("expected no output from cargo, got:\nstdout:\n{stdout}\nstderr:{stderr}\n");
+    }
+}
