Commit 0ddcf1c

Move unpacking into rustup entirely.
Generalises the threaded IO closing to be fully threaded disk IO.

1 parent: 6e6f44e

File tree: 6 files changed, +578 −152 lines changed

src/diskio/immediate.rs

Lines changed: 52 additions & 0 deletions
```rust
/// Immediate IO model: performs IO in the current thread.
///
/// Use for diagnosing bugs or working around any unexpected issues with the
/// threaded code paths.
use super::{perform, Executor, Item};

use std::cell::Cell;

pub struct ImmediateUnpacker {}

impl ImmediateUnpacker {
    pub fn new<'a>() -> ImmediateUnpacker {
        ImmediateUnpacker {}
    }
}

enum IterateOne {
    Item(Item),
    None,
}

impl Default for IterateOne {
    fn default() -> Self {
        IterateOne::None
    }
}

struct ImmediateIterator(Cell<IterateOne>);

impl Iterator for ImmediateIterator {
    type Item = Item;
    fn next(&mut self) -> Option<Item> {
        match self.0.take() {
            IterateOne::Item(item) => Some(item),
            IterateOne::None => None,
        }
    }
}

impl Executor for ImmediateUnpacker {
    fn dispatch(&mut self, mut item: Item) -> Box<dyn '_ + Iterator<Item = Item>> {
        perform(&mut item);
        Box::new(ImmediateIterator(Cell::new(IterateOne::Item(item))))
    }

    fn join(&mut self) -> Option<Box<dyn Iterator<Item = Item>>> {
        None
    }

    fn completed(&mut self) -> Option<Box<dyn Iterator<Item = Item>>> {
        None
    }
}
```
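The single-shot iterator above leans on `Cell::take`, which swaps the cell's contents for `Default::default()`, so the first `next()` yields the item and every later call yields nothing. A minimal self-contained sketch of the same pattern, with a hypothetical `OneShot` enum standing in for `IterateOne`:

```rust
use std::cell::Cell;

// Hypothetical single-shot enum: Default is the empty state, so
// Cell::take leaves the cell empty after the first read.
enum OneShot {
    Value(i32),
    Empty,
}

impl Default for OneShot {
    fn default() -> Self {
        OneShot::Empty
    }
}

struct OneShotIter(Cell<OneShot>);

impl Iterator for OneShotIter {
    type Item = i32;
    fn next(&mut self) -> Option<i32> {
        // take() replaces the contents with OneShot::Empty.
        match self.0.take() {
            OneShot::Value(v) => Some(v),
            OneShot::Empty => None,
        }
    }
}

fn main() {
    let mut it = OneShotIter(Cell::new(OneShot::Value(42)));
    assert_eq!(it.next(), Some(42));
    assert_eq!(it.next(), None); // exhausted after one item
    println!("ok");
}
```

The `Default` impl is what makes `Cell::take` available here; without it the item would have to be wrapped in an `Option` instead.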

src/diskio/mod.rs

Lines changed: 201 additions & 0 deletions
```rust
/// Disk IO abstraction for rustup.
///
/// This exists to facilitate high performance extraction even though OS's are
/// imperfect beasts. For detailed design notes see the module source.
//
// When performing IO we have a choice:
// - perform some IO in this thread
// - dispatch some or all IO to another thread
// known tradeoffs:
// NFS: network latency incurred on create, chmod, close calls
// WSLv1: Defender latency incurred on close calls; mutex shared with create calls
// Windows: Defender latency incurred on close calls
// Unix: limited open file count
// Defender: CPU limited, so more service points than cores brings no gain.
// Some machines: IO limited, more service points than cores brings more efficient
// IO utilisation.
// Hello world footprint ~350MB, so around 400MB to install is considered ok.
// All systems: dispatching to a thread has some overhead.
// Basic idea then is a locally measured congestion control problem.
// Underlying system has two
// dimensions - how much work we have queued, and how much work we execute
// at once. Queued work is both memory footprint, and unless each executor
// is performing complex logic, potentially concurrent work.
// Single core machines - thread anyway, they probably don't have SSDs?
// How many service points? Blocking latency due to networks and disks
// is independent of CPU: more threads will garner more throughput up
// to actual resource service capability.
// so:
// a) measure time around each IO op from dispatch to completion.
// b) create more threads than CPUs - 2x for now (because threadpool
//    doesn't allow creating dynamically), with very shallow stacks
//    (say 1MB)
// c) keep adding work while the P95? P80? of completion stays the same
//    when pNN starts to increase either (i) we've saturated the system
//    or (ii) other work coming in has saturated the system or (iii) this
//    sort of work is a lot harder to complete. We use NN<100 to avoid
//    having jitter throttle us inappropriately. We use a high NN to
//    avoid making the system perform poorly for the user / other users
//    on shared components. Perhaps time-to-completion should be scaled by size.
// d) if we have a lot of (iii) we should respond to it the same as (i), so
//    let's reduce this to (i) and (ii). Being unable to tell the difference
//    between load we created and another's, we have to throttle back when
//    the system saturates. Our most throttled position will be one service
//    worker: dispatch IO, extract the next item, wait for IO completion,
//    repeat.
// e) scaling up and down: TCP's lessons here are pretty good. So exponential
//    up - single thread and measure. two, 4 etc. When Pnn goes bad back off
//    for a time and then try again with linear increase (it could be case (ii)
//    - lots of room to experiment here; working with a time based approach is
//    important as that is the only way we can detect saturation: we are not
//    facing loss or errors in this model.
// f) data gathering: record (name, bytes, start, duration)
//    write to disk afterwards as a csv file?
```
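The measurement side of points (a), (c), and (f) above reduces to recording per-operation durations and watching a high percentile across rounds. A minimal illustration, assuming a simple sorted-vector percentile (the commit itself does not yet implement this loop; the `percentile` helper is hypothetical):

```rust
// Nearest-rank percentile over a sample of operation durations.
// Hypothetical helper: the real congestion controller may choose
// a different estimator.
fn percentile(mut samples: Vec<f64>, p: f64) -> Option<f64> {
    if samples.is_empty() {
        return None;
    }
    samples.sort_by(|a, b| a.partial_cmp(b).unwrap());
    let idx = ((samples.len() - 1) as f64 * p).round() as usize;
    Some(samples[idx])
}

fn main() {
    // Durations (seconds) from one round of dispatched IO ops.
    let durations = vec![0.010, 0.012, 0.011, 0.013, 0.200];
    let p80 = percentile(durations, 0.80).unwrap();
    // If pNN rises between rounds, back off; otherwise keep adding work.
    println!("p80 = {}", p80);
}
```

Using pNN with NN < 100 (as the notes suggest) keeps a single outlier like the 0.200s sample from triggering a spurious back-off.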
```rust
pub mod immediate;
pub mod threaded;

use crate::utils::notifications::Notification;

use std::env;
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

use time::precise_time_s;

#[derive(Debug)]
pub enum Kind {
    Directory,
    File(Vec<u8>),
}

#[derive(Debug)]
pub struct Item {
    /// The path to operate on
    pub full_path: PathBuf,
    /// The operation to perform
    pub kind: Kind,
    /// When the operation started
    pub start: f64,
    /// When the operation ended
    pub finish: f64,
    /// The length of the file, for files (for stats)
    pub size: Option<usize>,
    /// The result of the operation
    pub result: io::Result<()>,
    /// The mode to apply
    pub mode: u32,
}

impl Item {
    pub fn make_dir(full_path: PathBuf, mode: u32) -> Self {
        Item {
            full_path,
            kind: Kind::Directory,
            start: 0.0,
            finish: 0.0,
            size: None,
            result: Ok(()),
            mode,
        }
    }

    pub fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
        let len = content.len();
        Item {
            full_path,
            kind: Kind::File(content),
            start: 0.0,
            finish: 0.0,
            size: Some(len),
            result: Ok(()),
            mode,
        }
    }
}
```
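Callers build work items through the two constructors rather than filling the struct by hand, so timing and result fields always start zeroed. A sketch of that usage; the `Item` here is a cut-down stand-in (only `full_path`, `size`, `mode`) so the example is self-contained, and the path is illustrative:

```rust
use std::path::PathBuf;

// Simplified stand-in for the diff's Item, for illustration only.
struct Item {
    full_path: PathBuf,
    size: Option<usize>,
    mode: u32,
}

impl Item {
    // Mirrors Item::write_file: size is derived from the content length.
    fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
        let len = content.len();
        Item {
            full_path,
            size: Some(len),
            mode,
        }
    }
}

fn main() {
    let item = Item::write_file(PathBuf::from("bin/rustc"), b"#!/bin/sh\n".to_vec(), 0o755);
    assert_eq!(item.size, Some(10));
    println!(
        "{}: {} bytes, mode {:o}",
        item.full_path.display(),
        item.size.unwrap(),
        item.mode
    );
}
```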
```rust
/// Trait object for performing IO. At this point the overhead
/// of trait invocation is not a bottleneck, but if it becomes
/// one we could consider an enum variant based approach instead.
pub trait Executor {
    /// Perform a single operation.
    /// During overload situations previously queued items may
    /// need to be completed before the item is accepted:
    /// consume the returned iterator.
    fn execute(&mut self, mut item: Item) -> Box<dyn '_ + Iterator<Item = Item>> {
        item.start = precise_time_s();
        self.dispatch(item)
    }

    /// Actually dispatch an operation.
    /// This is called by the default execute() implementation and
    /// should not be called directly.
    fn dispatch(&mut self, item: Item) -> Box<dyn '_ + Iterator<Item = Item>>;

    /// Wrap up any pending operations and iterate over them.
    /// All operations submitted before the join will have been
    /// returned either through ready/complete or join once join
    /// returns.
    fn join(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>>;

    /// Iterate over completed items.
    fn completed(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>>;
}

/// Trivial single threaded IO to be used from executors.
/// (Crazy sophisticated ones can obviously ignore this)
pub fn perform(item: &mut Item) {
    // Directories: make them. TODO: register with the dir existence cache.
    // Files: write them.
    item.result = match item.kind {
        Kind::Directory => create_dir(&item.full_path),
        Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
    };
    item.finish = precise_time_s();
}

#[allow(unused_variables)]
pub fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>(
    path: P,
    contents: C,
    mode: u32,
) -> io::Result<()> {
    let mut opts = OpenOptions::new();
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        opts.mode(mode);
    }
    opts.write(true)
        .create(true)
        .truncate(true)
        .open(path.as_ref())?
        .write_all(contents.as_ref())
}

pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
    std::fs::create_dir(path.as_ref())
}
```
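The `write_file` helper applies `mode` at file-creation time on Unix (via `OpenOptionsExt::mode`) and ignores it elsewhere. A self-contained equivalent using only `std`; the `write_file_with_mode` name and temp-file path are illustrative, not from the diff:

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

// Create-then-write with a unix mode, mirroring the diff's write_file.
fn write_file_with_mode(path: &Path, contents: &[u8], mode: u32) -> io::Result<()> {
    let mut opts = OpenOptions::new();
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        // Applied only when the file is created; a no-op on non-unix targets.
        opts.mode(mode);
    }
    opts.write(true)
        .create(true)
        .truncate(true)
        .open(path)?
        .write_all(contents)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("diskio_demo.txt");
    write_file_with_mode(&path, b"hello", 0o644)?;
    assert_eq!(std::fs::read(&path)?, b"hello".to_vec());
    std::fs::remove_file(&path)?;
    println!("ok");
    Ok(())
}
```

Setting the mode at open time avoids a separate `chmod` call, which matters given the per-call latency tradeoffs (NFS, Defender) listed in the design notes.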
```rust
/// Get the executor for disk IO.
pub fn get_executor<'a>(
    notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Box<dyn Executor + 'a> {
    // If this gets lots of use, consider exposing via the config file.
    if let Ok(thread_str) = env::var("RUSTUP_IO_THREADS") {
        if thread_str == "disabled" {
            Box::new(immediate::ImmediateUnpacker::new())
        } else if let Ok(thread_count) = thread_str.parse::<usize>() {
            Box::new(threaded::Threaded::new_with_threads(
                notify_handler,
                thread_count,
            ))
        } else {
            Box::new(threaded::Threaded::new(notify_handler))
        }
    } else {
        Box::new(threaded::Threaded::new(notify_handler))
    }
}
```
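The `RUSTUP_IO_THREADS` decision tree above reduces to three outcomes: "disabled" selects the immediate executor, a parseable count selects a threaded executor with that many threads, and anything else (including an unset variable) falls back to the threaded default. A self-contained sketch with the executor choice replaced by a string; the `choose` helper is hypothetical, for illustration only:

```rust
use std::env;

// Maps the env-var lookup result to an executor choice, mirroring
// the branching in get_executor.
fn choose(var: Result<String, env::VarError>) -> String {
    match var {
        Ok(s) if s == "disabled" => "immediate".to_string(),
        Ok(s) => match s.parse::<usize>() {
            Ok(n) => format!("threaded({})", n),
            Err(_) => "threaded(default)".to_string(), // unparseable value
        },
        Err(_) => "threaded(default)".to_string(), // variable not set
    }
}

fn main() {
    assert_eq!(choose(Ok("disabled".into())), "immediate");
    assert_eq!(choose(Ok("4".into())), "threaded(4)");
    assert_eq!(choose(Ok("lots".into())), "threaded(default)");
    assert_eq!(choose(Err(env::VarError::NotPresent)), "threaded(default)");
    println!("ok");
}
```

Passing the lookup result in as a parameter (rather than reading the environment inside) keeps the branching logic testable without mutating process state.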
