Skip to content

Commit 84433b6

Browse files
committed
Thread dir creation as well
When directories complete, start writing the files/dirs within that directory that have been decompressed already. Avoids a stat() + create_dir_all() in the main thread, permitting more concurrent IO dispatch in exchange for memory pressure.
1 parent b29c436 commit 84433b6

File tree

4 files changed

+131
-58
lines changed

4 files changed

+131
-58
lines changed

src/diskio/immediate.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ impl Executor for ImmediateUnpacker {
4242
Box::new(ImmediateIterator(Cell::new(IterateOne::Item(item))))
4343
}
4444

45-
fn join(&mut self) -> Option<Box<dyn Iterator<Item = Item>>> {
46-
None
45+
fn join(&mut self) -> Box<dyn Iterator<Item = Item>> {
46+
Box::new(ImmediateIterator(Cell::new(IterateOne::None)))
4747
}
4848

49-
fn completed(&mut self) -> Option<Box<dyn Iterator<Item = Item>>> {
50-
None
49+
fn completed(&mut self) -> Box<dyn Iterator<Item = Item>> {
50+
Box::new(ImmediateIterator(Cell::new(IterateOne::None)))
5151
}
5252
}

src/diskio/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ pub trait Executor {
136136
/// All operations submitted before the join will have been
137137
/// returned either through ready/complete or join once join
138138
/// returns.
139-
fn join(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>>;
139+
fn join(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;
140140

141141
/// Iterate over completed items.
142-
fn completed(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>>;
142+
fn completed(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;
143143
}
144144

145145
/// Trivial single threaded IO to be used from executors.

src/diskio/threaded.rs

+7-9
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl<'a> Executor for Threaded<'a> {
9898
})
9999
}
100100

101-
fn join(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>> {
101+
fn join(&mut self) -> Box<dyn '_ + Iterator<Item = Item>> {
102102
// Some explanation is in order. Even though the tar we are reading from (if
103103
// any) will have had its FileWithProgress download tracking
104104
// completed before we hit drop, that is not true if we are unwinding due to a
@@ -157,26 +157,24 @@ impl<'a> Executor for Threaded<'a> {
157157
self.tx
158158
.send(Task::Sentinel)
159159
.expect("must still be listening");
160-
Some(Box::new(JoinIterator {
160+
Box::new(JoinIterator {
161161
iter: self.rx.iter(),
162162
consume_sentinel: false,
163-
}))
163+
})
164164
}
165165

166-
fn completed(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>> {
167-
Some(Box::new(JoinIterator {
166+
fn completed(&mut self) -> Box<dyn '_ + Iterator<Item = Item>> {
167+
Box::new(JoinIterator {
168168
iter: self.rx.try_iter(),
169169
consume_sentinel: true,
170-
}))
170+
})
171171
}
172172
}
173173

174174
impl<'a> Drop for Threaded<'a> {
175175
fn drop(&mut self) {
176176
// We are not permitted to fail - consume but do not handle the items.
177-
if let Some(iter) = self.join() {
178-
for _ in iter {}
179-
}
177+
self.join().for_each(drop);
180178
}
181179
}
182180

src/dist/component/package.rs

+118-43
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use crate::errors::*;
1010
use crate::utils::notifications::Notification;
1111
use crate::utils::utils;
1212

13-
use std::collections::HashSet;
13+
use std::collections::{HashMap, HashSet};
1414
use std::fmt;
1515
use std::io::{self, ErrorKind as IOErrorKind, Read};
16+
use std::iter::FromIterator;
17+
use std::mem;
1618
use std::path::{Path, PathBuf};
1719

1820
use tar::EntryType;
@@ -155,14 +157,17 @@ impl<'a> TarPackage<'a> {
155157
}
156158
}
157159

158-
// Handle the async result of io operations
159-
fn filter_result(op: Item) -> io::Result<()> {
160-
match op.result {
160+
/// Handle the async result of io operations
161+
/// Replaces op.result with Ok(())
162+
fn filter_result(op: &mut Item) -> io::Result<()> {
163+
let result = mem::replace(&mut op.result, Ok(()));
164+
match result {
161165
Ok(_) => Ok(()),
162166
Err(e) => match e.kind() {
163-
// TODO: the IO execution logic should pass this back rather than
164-
// being the code to ignore it.
165167
IOErrorKind::AlreadyExists => {
168+
// mkdir of e.g. ~/.rustup already existing is just fine;
169+
// for others it would be better to know whether it is
170+
// expected to exist or not - so put a flag in the state.
166171
if let Kind::Directory = op.kind {
167172
Ok(())
168173
} else {
@@ -174,6 +179,47 @@ fn filter_result(op: Item) -> io::Result<()> {
174179
}
175180
}
176181

182+
/// Dequeue the children of directories queued up waiting for the directory to
183+
/// be created.
184+
///
185+
/// Currently the volume of queued items does not count as backpressure against
186+
/// the main tar extraction process.
187+
fn trigger_children(
188+
io_executor: &mut dyn Executor,
189+
directories: &mut HashMap<PathBuf, DirStatus>,
190+
item: Item,
191+
) -> Result<usize> {
192+
let mut result = 0;
193+
if let Kind::Directory = item.kind {
194+
let mut pending = Vec::new();
195+
directories
196+
.entry(item.full_path)
197+
.and_modify(|status| match status {
198+
DirStatus::Exists => unreachable!(),
199+
DirStatus::Pending(pending_inner) => {
200+
pending.append(pending_inner);
201+
*status = DirStatus::Exists;
202+
}
203+
})
204+
.or_insert_with(|| unreachable!());
205+
result += pending.len();
206+
for pending_item in pending.into_iter() {
207+
for mut item in Vec::from_iter(io_executor.execute(pending_item)) {
208+
// TODO capture metrics
209+
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
210+
result += trigger_children(io_executor, directories, item)?;
211+
}
212+
}
213+
};
214+
Ok(result)
215+
}
216+
217+
/// What is the status of this directory?
218+
enum DirStatus {
219+
Exists,
220+
Pending(Vec<Item>),
221+
}
222+
177223
fn unpack_without_first_dir<'a, R: Read>(
178224
archive: &mut tar::Archive<R>,
179225
path: &Path,
@@ -183,9 +229,20 @@ fn unpack_without_first_dir<'a, R: Read>(
183229
let entries = archive
184230
.entries()
185231
.chain_err(|| ErrorKind::ExtractingPackage)?;
186-
let mut checked_parents: HashSet<PathBuf> = HashSet::new();
232+
let mut directories: HashMap<PathBuf, DirStatus> = HashMap::new();
233+
// Path is presumed to exist. Call it a precondition.
234+
directories.insert(path.to_owned(), DirStatus::Exists);
235+
236+
'entries: for entry in entries {
237+
// drain completed results to keep memory pressure low and respond
238+
// rapidly to completed events even if we couldn't submit work (because
239+
// our unpacked item is pending dequeue)
240+
for mut item in Vec::from_iter(io_executor.completed()) {
241+
// TODO capture metrics
242+
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
243+
trigger_children(&mut *io_executor, &mut directories, item)?;
244+
}
187245

188-
for entry in entries {
189246
let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?;
190247
let relpath = {
191248
let path = entry.path();
@@ -200,9 +257,13 @@ fn unpack_without_first_dir<'a, R: Read>(
200257
}
201258
}
202259
let mut components = relpath.components();
203-
// Throw away the first path component: we make our own root
260+
// Throw away the first path component: our root was supplied.
204261
components.next();
205262
let full_path = path.join(&components.as_path());
263+
if full_path == path {
264+
// The tmp dir code makes the root dir for us.
265+
continue;
266+
}
206267

207268
let size = entry.header().size()?;
208269
if size > 100_000_000 {
@@ -236,8 +297,11 @@ fn unpack_without_first_dir<'a, R: Read>(
236297
let o_mode = g_mode >> 3;
237298
let mode = u_mode | g_mode | o_mode;
238299

239-
let item = match kind {
240-
EntryType::Directory => Item::make_dir(full_path, mode),
300+
let mut item = match kind {
301+
EntryType::Directory => {
302+
directories.insert(full_path.to_owned(), DirStatus::Pending(Vec::new()));
303+
Item::make_dir(full_path, mode)
304+
}
241305
EntryType::Regular => {
242306
let mut v = Vec::with_capacity(size as usize);
243307
entry.read_to_end(&mut v)?;
@@ -246,45 +310,56 @@ fn unpack_without_first_dir<'a, R: Read>(
246310
_ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()),
247311
};
248312

249-
// FUTURE: parallelise or delete (surely all distribution tars are well formed in this regard).
250-
// Create the full path to the entry if it does not exist already
251-
if let Some(parent) = item.full_path.parent() {
252-
if !checked_parents.contains(parent) {
253-
checked_parents.insert(parent.to_owned());
254-
// It would be nice to optimise this stat out, but the tar could be like so:
255-
// a/deep/file.txt
256-
// a/file.txt
257-
// which would require tracking the segments rather than a simple hash.
258-
// Until profile shows that one stat per dir is a problem (vs one stat per file)
259-
// leave till later.
260-
261-
if !parent.exists() {
262-
let path_display = format!("{}", parent.display());
263-
trace_scoped!("create_dir_all", "name": path_display);
264-
std::fs::create_dir_all(&parent).chain_err(|| ErrorKind::ExtractingPackage)?
313+
let item = loop {
314+
// Create the full path to the entry if it does not exist already
315+
if let Some(parent) = item.full_path.to_owned().parent() {
316+
match directories.get_mut(parent) {
317+
None => {
318+
// Tar has item before containing directory
319+
// Complain about this so we can see if these exist.
320+
eprintln!(
321+
"Unexpected: missing parent '{}' for '{}'",
322+
parent.display(),
323+
entry.path()?.display()
324+
);
325+
directories.insert(parent.to_owned(), DirStatus::Pending(vec![item]));
326+
item = Item::make_dir(parent.to_owned(), 0o755);
327+
// Check the parent's parent
328+
continue;
329+
}
330+
Some(DirStatus::Exists) => {
331+
break item;
332+
}
333+
Some(DirStatus::Pending(pending)) => {
334+
// Parent dir is being made, take next item from tar
335+
pending.push(item);
336+
continue 'entries;
337+
}
265338
}
339+
} else {
340+
unreachable!();
266341
}
267-
}
342+
};
268343

269-
for item in io_executor.execute(item) {
270-
// TODO capture metrics, add directories to created cache
271-
filter_result(item).chain_err(|| ErrorKind::ExtractingPackage)?;
272-
}
273-
274-
// drain completed results to keep memory pressure low
275-
if let Some(iter) = io_executor.completed() {
276-
for prev_item in iter {
277-
// TODO capture metrics, add directories to created cache
278-
filter_result(prev_item).chain_err(|| ErrorKind::ExtractingPackage)?;
279-
}
344+
for mut item in Vec::from_iter(io_executor.execute(item)) {
345+
// TODO capture metrics
346+
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
347+
trigger_children(&mut *io_executor, &mut directories, item)?;
280348
}
281349
}
282350

283-
if let Some(iter) = io_executor.join() {
284-
for item in iter {
351+
loop {
352+
let mut triggered = 0;
353+
for mut item in Vec::from_iter(io_executor.join()) {
285354
// handle final IOs
286-
// TODO capture metrics, add directories to created cache
287-
filter_result(item).chain_err(|| ErrorKind::ExtractingPackage)?;
355+
// TODO capture metrics
356+
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
357+
triggered += trigger_children(&mut *io_executor, &mut directories, item)?;
358+
}
359+
if triggered == 0 {
360+
// None of the IO submitted before the prior join triggered any new
361+
// submissions
362+
break;
288363
}
289364
}
290365

0 commit comments

Comments
 (0)