Skip to content

Commit 5281b53

Browse files
committed
Enable threaded closing on all platforms.
Set RUSTUP_CLOSE_THREADS=disabled to force single threaded IO, or to a specific number if desired for testing/tuning. This may improve #1867, but has no impact on #1866 due to the coarse lock around the fd-handle table inside WSL.
1 parent a540515 commit 5281b53

File tree

2 files changed

+54
-8
lines changed

2 files changed

+54
-8
lines changed

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,11 @@ Command | Description
611611
- `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`)
612612
Sets the root URL for downloading self-updates.
613613

614+
- `RUSTUP_CLOSE_THREADS` (defaults to reported cpu count). Sets the
615+
number of threads to perform close IO in. Set to `disabled` to force
616+
single-threaded IO for troubleshooting, or an arbitrary number to
617+
override automatic detection.
618+
614619
## Other installation methods
615620

616621
The primary installation method, as described at https://rustup.rs, differs by platform:

src/dist/component/package.rs

+49-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::utils::notifications::Notification;
1111
use crate::utils::utils;
1212

1313
use std::collections::HashSet;
14+
use std::env;
1415
use std::fmt;
1516
use std::io::Read;
1617
use std::path::{Path, PathBuf};
@@ -214,8 +215,11 @@ impl<'a> TarPackage<'a> {
214215
}
215216
}
216217

217-
#[cfg(windows)]
218-
mod unpacker {
218+
trait Unpacker {
219+
fn handle(&mut self, unpacked: tar::Unpacked);
220+
}
221+
222+
mod threadedunpacker {
219223
use std::sync::atomic::{AtomicUsize, Ordering};
220224
use std::sync::Arc;
221225
use threadpool;
@@ -239,12 +243,33 @@ mod unpacker {
239243
.build();
240244
Unpacker {
241245
n_files: Arc::new(AtomicUsize::new(0)),
242-
pool: pool,
243-
notify_handler: notify_handler,
246+
pool,
247+
notify_handler,
244248
}
245249
}
246250

247-
pub fn handle(&mut self, unpacked: tar::Unpacked) {
251+
pub fn new_with_threads(
252+
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
253+
thread_count: usize,
254+
) -> Self {
255+
// Defaults to hardware thread count threads; this is suitable for
256+
// our needs as IO bound operations tend to show up as write latencies
257+
// rather than close latencies, so we don't need to look at
258+
// more threads to get more IO dispatched at this stage in the process.
259+
let pool = threadpool::Builder::new()
260+
.thread_name("CloseHandle".into())
261+
.num_threads(thread_count)
262+
.build();
263+
Unpacker {
264+
n_files: Arc::new(AtomicUsize::new(0)),
265+
pool,
266+
notify_handler,
267+
}
268+
}
269+
}
270+
271+
impl<'a> super::Unpacker for Unpacker<'a> {
272+
fn handle(&mut self, unpacked: tar::Unpacked) {
248273
if let tar::Unpacked::File(f) = unpacked {
249274
self.n_files.fetch_add(1, Ordering::Relaxed);
250275
let n_files = self.n_files.clone();
@@ -307,15 +332,16 @@ mod unpacker {
307332
}
308333
}
309334

310-
#[cfg(not(windows))]
311335
mod unpacker {
312336
use crate::utils::notifications::Notification;
313337
pub struct Unpacker {}
314338
impl Unpacker {
315339
pub fn new<'a>(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker {
316340
Unpacker {}
317341
}
318-
pub fn handle(&mut self, _unpacked: tar::Unpacked) {}
342+
}
343+
impl super::Unpacker for Unpacker {
344+
fn handle(&mut self, _unpacked: tar::Unpacked) {}
319345
}
320346
}
321347

@@ -324,7 +350,22 @@ fn unpack_without_first_dir<'a, R: Read>(
324350
path: &Path,
325351
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
326352
) -> Result<()> {
327-
let mut unpacker = unpacker::Unpacker::new(notify_handler);
353+
let mut unpacker : Box<dyn Unpacker> =
354+
// If this gets lots of use, consider exposing via the config file.
355+
if let Ok(thread_str) = env::var("RUSTUP_CLOSE_THREADS") {
356+
if thread_str == "disabled" {
357+
Box::new(unpacker::Unpacker::new(notify_handler))
358+
} else {
359+
if let Ok(thread_count) = thread_str.parse::<usize>() {
360+
Box::new(threadedunpacker::Unpacker::new_with_threads(notify_handler, thread_count))
361+
} else {
362+
Box::new(threadedunpacker::Unpacker::new(notify_handler))
363+
}
364+
}
365+
} else {
366+
Box::new(threadedunpacker::Unpacker::new(notify_handler))
367+
}
368+
;
328369
let entries = archive
329370
.entries()
330371
.chain_err(|| ErrorKind::ExtractingPackage)?;

0 commit comments

Comments
 (0)