Skip to content

Commit 9511fef

Browse files
authored
Merge pull request #1876 from rbtcollins/bug-1866
Enable fully threaded IO for installs
2 parents 04039b0 + 35dd1ed commit 9511fef

File tree

11 files changed

+791
-277
lines changed

11 files changed

+791
-277
lines changed

Cargo.lock

+12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ xz2 = "0.1.3"
5959
version = "0.5"
6060
default-features = false
6161

62+
[dependencies.rs_tracing]
63+
version = "1.0.1"
64+
features = ["rs_tracing"]
65+
6266
[target."cfg(windows)".dependencies]
6367
cc = "1"
6468
winreg = "0.6"

README.md

+19
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,7 @@ Command | Description
591591

592592
## Environment variables
593593

594+
594595
- `RUSTUP_HOME` (default: `~/.rustup` or `%USERPROFILE%/.rustup`)
595596
Sets the root rustup folder, used for storing installed
596597
toolchains and configuration options.
@@ -611,6 +612,21 @@ Command | Description
611612
- `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`)
612613
Sets the root URL for downloading self-updates.
613614

615+
- `RUSTUP_IO_THREADS` *unstable* (defaults to reported cpu count). Sets the
616+
number of threads to perform close IO in. Set to `disabled` to force
617+
single-threaded IO for troubleshooting, or an arbitrary number to
618+
override automatic detection.
619+
620+
- `RUSTUP_TRACE_DIR` *unstable* (default: no tracing)
621+
Enables tracing and determines the directory that traces will be
622+
written too. Traces are of the form PID.trace. Traces can be read
623+
by the Catapult project [tracing viewer][tv].
624+
625+
[tv]: (https://github.com/catapult-project/catapult/blob/master/tracing/README.md)
626+
627+
- `RUSTUP_UNPACK_RAM` *unstable* (default 400M, min 100M)
628+
Caps the amount of RAM rustup will use for IO tasks while unpacking.
629+
614630
## Other installation methods
615631

616632
The primary installation method, as described at https://rustup.rs, differs by platform:
@@ -698,6 +714,9 @@ yet validate signatures of downloads.
698714

699715
[s]: https://github.com/rust-lang/rustup.rs/issues?q=is%3Aopen+is%3Aissue+label%3Asecurity
700716

717+
File modes on installation honor umask as of 1.18.4, use umask if
718+
very tight controls are desired.
719+
701720
## FAQ
702721

703722
### Is this an official Rust project?

src/cli/main.rs

+14
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ mod term2;
3030

3131
use crate::errors::*;
3232
use rustup::env_var::RUST_RECURSION_COUNT_MAX;
33+
3334
use std::env;
3435
use std::path::PathBuf;
3536

37+
use rs_tracing::*;
38+
3639
fn main() {
3740
if let Err(ref e) = run_rustup() {
3841
common::report_error(e);
@@ -41,6 +44,17 @@ fn main() {
4144
}
4245

4346
fn run_rustup() -> Result<()> {
47+
if let Ok(dir) = env::var("RUSTUP_TRACE_DIR") {
48+
open_trace_file!(dir)?;
49+
}
50+
let result = run_rustup_inner();
51+
if let Ok(_) = env::var("RUSTUP_TRACE_DIR") {
52+
close_trace_file!();
53+
}
54+
result
55+
}
56+
57+
fn run_rustup_inner() -> Result<()> {
4458
// Guard against infinite proxy recursion. This mostly happens due to
4559
// bugs in rustup.
4660
do_recursion_guard()?;

src/diskio/immediate.rs

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/// Immediate IO model: performs IO in the current thread.
2+
///
3+
/// Use for diagnosing bugs or working around any unexpected issues with the
4+
/// threaded code paths.
5+
use super::{perform, Executor, Item};
6+
7+
pub struct ImmediateUnpacker {}
8+
impl ImmediateUnpacker {
9+
pub fn new<'a>() -> ImmediateUnpacker {
10+
ImmediateUnpacker {}
11+
}
12+
}
13+
14+
impl Executor for ImmediateUnpacker {
15+
fn dispatch(&mut self, mut item: Item) -> Box<dyn '_ + Iterator<Item = Item>> {
16+
perform(&mut item);
17+
Box::new(Some(item).into_iter())
18+
}
19+
20+
fn join(&mut self) -> Box<dyn Iterator<Item = Item>> {
21+
Box::new(None.into_iter())
22+
}
23+
24+
fn completed(&mut self) -> Box<dyn Iterator<Item = Item>> {
25+
Box::new(None.into_iter())
26+
}
27+
}

src/diskio/mod.rs

+216
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/// Disk IO abstraction for rustup.
2+
///
3+
/// This exists to facilitate high performance extraction even though OS's are
4+
/// imperfect beasts. For detailed design notes see the module source.
5+
//
6+
// When performing IO we have a choice:
7+
// - perform some IO in this thread
8+
// - dispatch some or all IO to another thead
9+
// known tradeoffs:
10+
// NFS: network latency incurred on create, chmod, close calls
11+
// WSLv1: Defender latency incurred on close calls; mutex shared with create calls
12+
// Windows: Defender latency incurred on close calls
13+
// Unix: limited open file count
14+
// Defender : CPU limited, so more service points than cores brings no gain.
15+
// Some machines: IO limited, more service points than cores brings more efficient
16+
// Hello world footprint ~350MB, so around 400MB to install is considered ok.
17+
// IO utilisation.
18+
// All systems: dispatching to a thread has some overhead.
19+
// Basic idea then is a locally measured congestion control problem.
20+
// Underlying system has two
21+
// dimensions - how much work we have queued, and how much work we execute
22+
// at once. Queued work is both memory footprint, and unless each executor
23+
// is performing complex logic, potentially concurrent work.
24+
// Single core machines - thread anyway, they probably don't have SSDs?
25+
// How many service points? Blocking latency due to networks and disks
26+
// is independent of CPU: more threads will garner more throughput up
27+
// to actual resource service capapbility.
28+
// so:
29+
// a) measure time around each IO op from dispatch to completion.
30+
// b) create more threads than CPUs - 2x for now (because threadpool
31+
// doesn't allow creating dynamically), with very shallow stacks
32+
// (say 1MB)
33+
// c) keep adding work while the P95? P80? of completion stays the same
34+
// when pNN starts to increase either (i) we've saturated the system
35+
// or (ii) other work coming in has saturated the system or (iii) this
36+
// sort of work is a lot harder to complete. We use NN<100 to avoid
37+
// having jitter throttle us inappropriately. We use a high NN to
38+
// avoid making the system perform poorly for the user / other users
39+
// on shared components. Perhaps time-to-completion should be scaled by size.
40+
// d) if we have a lot of (iii) we should respond to it the same as (i), so
41+
// lets reduce this to (i) and (ii). Being unable to tell the difference
42+
// between load we created and anothers, we have to throttle back when
43+
// the system saturates. Our most throttled position will be one service
44+
// worker: dispatch IO, extract the next text, wait for IO completion,
45+
// repeat.
46+
// e) scaling up and down: TCP's lessons here are pretty good. So exponential
47+
// up - single thread and measure. two, 4 etc. When Pnn goes bad back off
48+
// for a time and then try again with linear increase (it could be case (ii)
49+
// - lots of room to experiment here; working with a time based approach is important
50+
// as that is the only way we can detect saturation: we are not facing
51+
// loss or errors in this model.
52+
// f) data gathering: record (name, bytes, start, duration)
53+
// write to disk afterwards as a csv file?
54+
pub mod immediate;
55+
pub mod threaded;
56+
57+
use crate::utils::notifications::Notification;
58+
59+
use std::env;
60+
use std::fs::OpenOptions;
61+
use std::io::{self, Write};
62+
use std::path::{Path, PathBuf};
63+
64+
use time::precise_time_s;
65+
66+
#[derive(Debug)]
67+
pub enum Kind {
68+
Directory,
69+
File(Vec<u8>),
70+
}
71+
72+
#[derive(Debug)]
73+
pub struct Item {
74+
/// The path to operate on
75+
pub full_path: PathBuf,
76+
/// The operation to perform
77+
pub kind: Kind,
78+
/// When the operation started
79+
pub start: f64,
80+
/// When the operation ended
81+
pub finish: f64,
82+
/// The length of the file, for files (for stats)
83+
pub size: Option<usize>,
84+
/// The result of the operation
85+
pub result: io::Result<()>,
86+
/// The mode to apply
87+
pub mode: u32,
88+
}
89+
90+
impl Item {
91+
pub fn make_dir(full_path: PathBuf, mode: u32) -> Self {
92+
Item {
93+
full_path,
94+
kind: Kind::Directory,
95+
start: 0.0,
96+
finish: 0.0,
97+
size: None,
98+
result: Ok(()),
99+
mode,
100+
}
101+
}
102+
103+
pub fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
104+
let len = content.len();
105+
Item {
106+
full_path,
107+
kind: Kind::File(content),
108+
start: 0.0,
109+
finish: 0.0,
110+
size: Some(len),
111+
result: Ok(()),
112+
mode,
113+
}
114+
}
115+
}
116+
117+
/// Trait object for performing IO. At this point the overhead
118+
/// of trait invocation is not a bottleneck, but if it becomes
119+
/// one we could consider an enum variant based approach instead.
120+
pub trait Executor {
121+
/// Perform a single operation.
122+
/// During overload situations previously queued items may
123+
/// need to be completed before the item is accepted:
124+
/// consume the returned iterator.
125+
fn execute(&mut self, mut item: Item) -> Box<dyn '_ + Iterator<Item = Item>> {
126+
item.start = precise_time_s();
127+
self.dispatch(item)
128+
}
129+
130+
/// Actually dispatch a operation.
131+
/// This is called by the default execute() implementation and
132+
/// should not be called directly.
133+
fn dispatch(&mut self, item: Item) -> Box<dyn '_ + Iterator<Item = Item>>;
134+
135+
/// Wrap up any pending operations and iterate over them.
136+
/// All operations submitted before the join will have been
137+
/// returned either through ready/complete or join once join
138+
/// returns.
139+
fn join(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;
140+
141+
/// Iterate over completed items.
142+
fn completed(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;
143+
}
144+
145+
/// Trivial single threaded IO to be used from executors.
146+
/// (Crazy sophisticated ones can obviously ignore this)
147+
pub fn perform(item: &mut Item) {
148+
// directories: make them, TODO: register with the dir existence cache.
149+
// Files, write them.
150+
item.result = match item.kind {
151+
Kind::Directory => create_dir(&item.full_path),
152+
Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
153+
};
154+
item.finish = precise_time_s();
155+
}
156+
157+
#[allow(unused_variables)]
158+
pub fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>(
159+
path: P,
160+
contents: C,
161+
mode: u32,
162+
) -> io::Result<()> {
163+
let mut opts = OpenOptions::new();
164+
#[cfg(unix)]
165+
{
166+
use std::os::unix::fs::OpenOptionsExt;
167+
opts.mode(mode);
168+
}
169+
let path = path.as_ref();
170+
let path_display = format!("{}", path.display());
171+
let mut f = {
172+
trace_scoped!("creat", "name": path_display);
173+
opts.write(true).create(true).truncate(true).open(path)?
174+
};
175+
let contents = contents.as_ref();
176+
let len = contents.len();
177+
{
178+
trace_scoped!("write", "name": path_display, "len": len);
179+
f.write_all(contents)?;
180+
}
181+
{
182+
trace_scoped!("close", "name:": path_display);
183+
drop(f);
184+
}
185+
Ok(())
186+
}
187+
188+
pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
189+
let path = path.as_ref();
190+
let path_display = format!("{}", path.display());
191+
trace_scoped!("create_dir", "name": path_display);
192+
std::fs::create_dir(path)
193+
}
194+
195+
/// Get the executor for disk IO.
196+
pub fn get_executor<'a>(
197+
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
198+
) -> Box<dyn Executor + 'a> {
199+
// If this gets lots of use, consider exposing via the config file.
200+
if let Ok(thread_str) = env::var("RUSTUP_IO_THREADS") {
201+
if thread_str == "disabled" {
202+
Box::new(immediate::ImmediateUnpacker::new())
203+
} else {
204+
if let Ok(thread_count) = thread_str.parse::<usize>() {
205+
Box::new(threaded::Threaded::new_with_threads(
206+
notify_handler,
207+
thread_count,
208+
))
209+
} else {
210+
Box::new(threaded::Threaded::new(notify_handler))
211+
}
212+
}
213+
} else {
214+
Box::new(threaded::Threaded::new(notify_handler))
215+
}
216+
}

0 commit comments

Comments
 (0)