diff --git a/NAMESPACE b/NAMESPACE
index c2bbb1a0..adc5c55a 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -1,5 +1,6 @@
 # Generated by roxygen2: do not edit by hand
 
+export(TaskWorkflowManager)
 export(createOutputFile)
 export(deleteJob)
 export(deleteStorageContainer)
diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R
index 4302b76e..2194a5be 100644
--- a/R/doAzureParallel.R
+++ b/R/doAzureParallel.R
@@ -331,7 +331,12 @@ setHttpTraffic <- function(value = FALSE) {
   if (!is.null(obj$options$azure$chunksize)) {
     chunkSize <- obj$options$azure$chunksize
   }
-
+
+  threads <- 1
+  if (!is.null(obj$options$azure$threads)) {
+    threads <- obj$options$azure$threads
+  }
+
   chunkSizeKeyValuePair <-
     list(name = "chunkSize",
          value = as.character(chunkSize))
@@ -626,7 +631,7 @@ setHttpTraffic <- function(value = FALSE) {
         )
 
       mergeOutput <- append(obj$options$azure$outputFiles, mergeOutput)
-      BatchUtilitiesOperations$addTask(
+      task <- TaskWorkflowManager$createTask(
         jobId = id,
         taskId = taskId,
         rCommand = sprintf(
@@ -644,12 +649,15 @@ setHttpTraffic <- function(value = FALSE) {
         maxRetryCount = maxTaskRetryCount
       )
 
-      cat("\r", sprintf("Submitting tasks (%s/%s)", i, length(endIndices)), sep = "")
+      cat("\r", sprintf("Creating tasks (%s/%s)", i, length(endIndices)), sep = "")
       flush.console()
 
-      return(taskId)
+      return(task)
     })
 
+  # Submit collection of tasks
+  TaskWorkflowManager$handleTaskCollection(id, tasks, threads)
+
   if (enableCloudCombine) {
     cat("\nSubmitting merge task")
     taskDependencies <- list(taskIdRanges = list(list(
@@ -722,7 +730,7 @@ setHttpTraffic <- function(value = FALSE) {
   numberOfFailedTasks <- sum(unlist(failTasks))
 
   if (numberOfFailedTasks > 0 && autoDeleteJob == FALSE) {
-    .createErrorViewerPane(id, failTasks)
+    viewErrors(id, failTasks)
   }
 
   if (!identical(function(a, ...) c(a, list(...)),
@@ -785,94 +793,3 @@ setHttpTraffic <- function(value = FALSE) {
     return(id)
   }
 }
-
-.createErrorViewerPane <- function(id, failTasks) {
-  config <- getConfiguration()
-  storageClient <- config$storageClient
-
-  sasToken <- storageClient$generateSasToken("r", "c", id)
-  queryParameterUrl <- "?"
-
-  for (query in names(sasToken)) {
-    queryParameterUrl <-
-      paste0(queryParameterUrl,
-             query,
-             "=",
-             RCurl::curlEscape(sasToken[[query]]),
-             "&")
-  }
-
-  queryParameterUrl <-
-    substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
-
-  tempDir <- tempfile()
-  dir.create(tempDir)
-  htmlFile <- file.path(tempDir, paste0(id, ".html"))
-  azureStorageUrl <-
-    paste0("http://",
-           storageCredentials$name,
-           sprintf(".blob.%s/", storageCredentials$endpointSuffix),
-           id)
-
-  staticHtml <- "<h1>Errors:</h1>"
-  for (i in 1:length(failTasks)) {
-    if (failTasks[i] == 1) {
-      stdoutFile <- paste0(azureStorageUrl, "/", "stdout")
-      stderrFile <- paste0(azureStorageUrl, "/", "stderr")
-      rlogFile <- paste0(azureStorageUrl, "/", "logs")
-
-      stdoutFile <-
-        paste0(stdoutFile,
-               "/",
-               id,
-               "-task",
-               i,
-               "-stdout.txt",
-               queryParameterUrl)
-      stderrFile <-
-        paste0(stderrFile,
-               "/",
-               id,
-               "-task",
-               i,
-               "-stderr.txt",
-               queryParameterUrl)
-      rlogFile <-
-        paste0(rlogFile,
-               "/",
-               id,
-               "-task",
-               i,
-               ".txt",
-               queryParameterUrl)
-
-      staticHtml <-
-        paste0(
-          staticHtml,
-          "Task ",
-          i,
-          " | ",
-          "<a href='", stdoutFile, "'>stdout.txt</a>",
-          " |",
-          " ",
-          "<a href='", stderrFile, "'>stderr.txt</a>",
-          " | ",
-          "<a href='", rlogFile, "'>R output</a>",
-          "<br/>"
-        )
-    }
-  }
-
-  write(staticHtml, htmlFile)
-
-  viewer <- getOption("viewer")
-  if (!is.null(viewer)) {
-    viewer(htmlFile)
-  }
-}
diff --git a/R/task-manager.R b/R/task-manager.R
new file mode 100644
index 00000000..dda7f33f
--- /dev/null
+++ b/R/task-manager.R
@@ -0,0 +1,270 @@
+#' @export
+TaskWorkflowManager <- R6::R6Class(
+  "TaskManager",
+  public = list(
+    initialize = function() {
+    },
+    originalTaskCollection = NULL,
+    tasksToAdd = NULL,
+    results = NULL,
+    failedTasks = NULL,
+    errors = NULL,
+    threads = 1,
+    maxTasksPerRequest = 100,
+    createTask = function(jobId, taskId, rCommand, ...) {
+      config <- getConfiguration()
+      storageClient <- config$storageClient
+
+      args <- list(...)
+      .doAzureBatchGlobals <- args$envir
+      dependsOn <- args$dependsOn
+      argsList <- args$args
+      cloudCombine <- args$cloudCombine
+      userOutputFiles <- args$outputFiles
+      containerImage <- args$containerImage
+      resourceFiles <- args$resourceFiles
+      accountName <- storageClient$authentication$name
+
+      if (!is.null(argsList)) {
+        envFile <- paste0(taskId, ".rds")
+        saveRDS(argsList, file = envFile)
+        storageClient$blobOperations$uploadBlob(
+          jobId,
+          file.path(getwd(), envFile)
+        )
+        file.remove(envFile)
+
+        readToken <- storageClient$generateSasToken("r", "c", jobId)
+        envFileUrl <-
+          rAzureBatch::createBlobUrl(
+            storageClient$authentication$name,
+            jobId,
+            envFile,
+            readToken,
+            config$endpointSuffix)
+        resourceFiles <-
+          list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
+      }
+
+      # Only use the download command if cloudCombine is enabled
+      # Otherwise just leave it empty
+      commands <- c()
+
+      containerSettings <- list(
+        imageName = containerImage,
+        containerRunOptions = "--rm"
+      )
+
+      if (!is.null(cloudCombine)) {
+        assign("cloudCombine", cloudCombine, .doAzureBatchGlobals)
+        containerSettings$imageName <- "brianlovedocker/doazureparallel-merge-dockerfile:0.12.1"
+
+        copyCommand <- sprintf(
+          "%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include results/*.rds --endpoint %s",
+          accountName,
+          jobId,
+          "$AZ_BATCH_TASK_WORKING_DIR",
+          config$endpointSuffix
+        )
+
+        commands <- c(paste("blobxfer", copyCommand))
+      }
+
+      exitConditions <- NULL
+      if (!is.null(args$dependsOn)) {
+        dependsOn <- args$dependsOn
+      }
+      else {
+        exitConditions <- list(default = list(dependencyAction = "satisfy"))
+      }
+
+      containerUrl <-
+        rAzureBatch::createBlobUrl(
+          storageAccount = storageClient$authentication$name,
+          containerName = jobId,
+          sasToken = storageClient$generateSasToken("w", "c", jobId),
+          storageEndpointSuffix = config$endpointSuffix
+        )
+
+      outputFiles <- list(
+        list(
+          filePattern = paste0(taskId, ".txt"),
+          destination = list(container = list(
+            path = paste0("logs/", taskId, ".txt"),
+            containerUrl = containerUrl
+          )),
+          uploadOptions = list(uploadCondition = "taskCompletion")
+        ),
+        list(
+          filePattern = "../stdout.txt",
+          destination = list(container = list(
+            path = paste0("stdout/", taskId, "-stdout.txt"),
+            containerUrl = containerUrl
+          )),
+          uploadOptions = list(uploadCondition = "taskCompletion")
+        ),
+        list(
+          filePattern = "../stderr.txt",
+          destination = list(container = list(
+            path = paste0("stderr/", taskId, "-stderr.txt"),
+            containerUrl = containerUrl
+          )),
+          uploadOptions = list(uploadCondition = "taskCompletion")
+        )
+      )
+
+      outputFiles <- append(outputFiles, userOutputFiles)
+
+      commands <-
+        c(commands,
+          rCommand)
+
+      commands <- linuxWrapCommands(commands)
+
+      sasToken <- storageClient$generateSasToken("rwcl", "c", jobId)
+      queryParameterUrl <- "?"
+
+      for (query in names(sasToken)) {
+        queryParameterUrl <-
+          paste0(queryParameterUrl,
+                 query,
+                 "=",
+                 RCurl::curlEscape(sasToken[[query]]),
+                 "&")
+      }
+
+      queryParameterUrl <-
+        substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
+
+      body <- list(id = taskId,
+                   commandLine = commands,
+                   userIdentity = list(
+                     autoUser = list(
+                       scope = "pool",
+                       elevationLevel = "admin"
+                     )
+                   ),
+                   resourceFiles = resourceFiles,
+                   dependsOn = dependsOn,
+                   outputFiles = outputFiles,
+                   constraints = list(
+                     maxTaskRetryCount = 3
+                   ),
+                   exitConditions = exitConditions,
+                   containerSettings = containerSettings)
+
+      body <- Filter(length, body)
+
+      body
+    },
+    handleTaskCollection = function(
+      jobId,
+      tasks,
+      threads = 1
+    ) {
+      size <- length(tasks)
+      self$originalTaskCollection <- tasks
+      # Name the collection by task id so failed tasks can be looked up
+      # for resubmission in addBulkTasks
+      names(self$originalTaskCollection) <-
+        vapply(tasks, function(task) task$id, character(1))
+
+      self$tasksToAdd <- datastructures::queue()
+      self$tasksToAdd <- datastructures::insert(self$tasksToAdd, tasks)
+
+      self$results <- datastructures::queue()
+      self$failedTasks <- datastructures::queue()
+      self$errors <- datastructures::queue()
+
+      config <- getConfiguration()
+      batchClient <- config$batchClient
+
+      tryCatch({
+        while (datastructures::size(self$tasksToAdd) > 0 &&
+               datastructures::size(self$errors) == 0) {
+          maxTasks <- self$maxTasksPerRequest
+          if (datastructures::size(self$tasksToAdd) < maxTasks) {
+            maxTasks <- datastructures::size(self$tasksToAdd)
+          }
+
+          chunkTasksToAdd <- vector("list", maxTasks)
+          index <- 1
+
+          while (index <= maxTasks &&
+                 datastructures::size(self$tasksToAdd) > 0) {
+            chunkTasksToAdd[[index]] <- datastructures::pop(self$tasksToAdd)
+            index <- index + 1
+          }
+
+          self$addBulkTasks(
+            jobId,
+            chunkTasksToAdd
+          )
+        }
+      },
+      error = function(e) {
+        # Stop submitting; terminal failures are recorded in self$errors
+      })
+    },
+    addBulkTasks = function(
+      jobId,
+      chunkTasksToAdd
+    ) {
+      config <- getConfiguration()
+      batchClient <- config$batchClient
+
+      response <- batchClient$taskOperations$addCollection(
+        jobId,
+        list(value = chunkTasksToAdd),
+        content = "response"
+      )
+
+      # In case of a chunk exceeding the MaxMessageSize, split the chunk in half
+      # and resubmit smaller chunk requests
+      if (response$status_code == 413) {
+        if (length(chunkTasksToAdd) == 1) {
+          self$errors <- datastructures::insert(self$errors, response)
+
+          stop(sprintf(
+            "Failed to add task with ID %s: the request body exceeds the maximum request size",
+            chunkTasksToAdd[[1]]$id))
+        }
+
+        upperBound <- length(chunkTasksToAdd)
+        midBound <- floor(upperBound / 2)
+
+        self$addBulkTasks(
+          jobId,
+          chunkTasksToAdd[1:midBound])
+
+        self$addBulkTasks(
+          jobId,
+          chunkTasksToAdd[(midBound + 1):upperBound])
+      }
+      else if (500 <= response$status_code &&
+               response$status_code <= 599) {
+        self$tasksToAdd <- datastructures::insert(self$tasksToAdd, chunkTasksToAdd)
+      }
+      else if (response$status_code == 200) {
+        values <- httr::content(response)$value
+
+        for (i in 1:length(values)) {
+          taskId <- values[[i]]$id
+
+          if (compare(values[[i]]$status, "servererror")) {
+            self$tasksToAdd <- datastructures::insert(self$tasksToAdd, self$originalTaskCollection[[taskId]])
+          }
+          else if (compare(values[[i]]$status, "clienterror") &&
+                   values[[i]]$error$code != "TaskExists") {
+            self$failedTasks <- datastructures::insert(self$failedTasks, values[[i]])
+          }
+          else {
+            self$results <- datastructures::insert(self$results, values[[i]])
+          }
+        }
+      }
+      else {
+        self$tasksToAdd <- datastructures::insert(self$tasksToAdd, chunkTasksToAdd)
+        self$errors <- datastructures::insert(self$errors, response)
+      }
+    }
+  )
+)
+
+TaskWorkflowManager <- TaskWorkflowManager$new()
diff --git a/R/utility-job.R b/R/utility-job.R
index dfe0398d..e2e41548 100644
--- a/R/utility-job.R
+++ b/R/utility-job.R
@@ -499,9 +499,7 @@ waitForTasksToComplete <-
     for (i in 1:length(failedTasks$value)) {
       if (!is.null(failedTasks$value[[i]]$executionInfo$result) &&
-          grepl(failedTasks$value[[i]]$executionInfo$result,
-                "failure",
-                ignore.case = TRUE)) {
+          compare(failedTasks$value[[i]]$executionInfo$result, "failure")) {
         tasksFailureWarningLabel <-
           paste0(tasksFailureWarningLabel,
                  sprintf("%s\n", failedTasks$value[[i]]$id))
@@ -559,7 +557,7 @@ waitForTasksToComplete <-
         next
       }
 
-      if (grepl(mergeTask$executionInfo$result, "success", ignore.case = TRUE)) {
+      if (compare(mergeTask$executionInfo$result, "success")) {
         cat(" Completed.")
         break
       }
diff --git a/R/utility-string.R b/R/utility-string.R
index 95fda84a..09e86992 100644
--- a/R/utility-string.R
+++ b/R/utility-string.R
@@ -115,3 +115,7 @@ printCluster <- function(cluster, resourceFiles = list()) {
   }
   cat(strrep('=', options("width")), fill = TRUE)
 }
+
+compare <- function(a, b) {
+  return(grepl(a, b, ignore.case = TRUE))
+}
diff --git a/R/utility.R b/R/utility.R
index 0bf689d4..7f0b63ad 100644
--- a/R/utility.R
+++ b/R/utility.R
@@ -304,3 +304,95 @@ getHttpErrorMessage <- function(responseObj) {
   detailMessage <- paste0(detailMessage, "\r\nodata.metadata: ", responseObj$odata.metadata)
   return(detailMessage)
 }
+
+viewErrors <- function(id, failTasks) {
+  config <- getConfiguration()
+  storageClient <- config$storageClient
+
+  sasToken <- storageClient$generateSasToken("r", "c", id)
+  queryParameterUrl <- "?"
+
+  for (query in names(sasToken)) {
+    queryParameterUrl <-
+      paste0(queryParameterUrl,
+             query,
+             "=",
+             RCurl::curlEscape(sasToken[[query]]),
+             "&")
+  }
+
+  queryParameterUrl <-
+    substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
+
+  tempDir <- tempfile()
+  dir.create(tempDir)
+  htmlFile <- file.path(tempDir, paste0(id, ".html"))
+  azureStorageUrl <-
+    paste0("http://",
+           storageClient$authentication$name,
+           sprintf(".blob.%s/", config$endpointSuffix),
+           id)
+
+  staticHtml <- "<h1>Errors:</h1>"
+  for (i in 1:length(failTasks)) {
+    if (failTasks[i] == 1) {
+      stdoutFile <- paste0(azureStorageUrl, "/", "stdout")
+      stderrFile <- paste0(azureStorageUrl, "/", "stderr")
+      rlogFile <- paste0(azureStorageUrl, "/", "logs")
+
+      stdoutFile <-
+        paste0(stdoutFile,
+               "/",
+               id,
+               "-task",
+               i,
+               "-stdout.txt",
+               queryParameterUrl)
+      stderrFile <-
+        paste0(stderrFile,
+               "/",
+               id,
+               "-task",
+               i,
+               "-stderr.txt",
+               queryParameterUrl)
+      rlogFile <-
+        paste0(rlogFile,
+               "/",
+               id,
+               "-task",
+               i,
+               ".txt",
+               queryParameterUrl)
+
+      staticHtml <-
+        paste0(
+          staticHtml,
+          "Task ",
+          i,
+          " | ",
+          "<a href='", stdoutFile, "'>stdout.txt</a>",
+          " |",
+          " ",
+          "<a href='", stderrFile, "'>stderr.txt</a>",
+          " | ",
+          "<a href='", rlogFile, "'>R output</a>",
+          "<br/>"
" + ) + } + } + + write(staticHtml, htmlFile) + + viewer <- getOption("viewer") + if (!is.null(viewer)) { + viewer(htmlFile) + } +} + diff --git a/inst/startup/install_custom.R b/inst/startup/install_custom.R index 7d39d60d..02b732ff 100644 --- a/inst/startup/install_custom.R +++ b/inst/startup/install_custom.R @@ -18,6 +18,13 @@ if (length(args) > 1) { } } +if (length(args) < 1) { + stop("Given arguments were not passed,", + "install_custom.R file_share_directory pattern") +} + +directory <- args[1] + devtoolsPackage <- "devtools" if (!require(devtoolsPackage, character.only = TRUE)) { install.packages(devtoolsPackage) @@ -25,7 +32,8 @@ if (!require(devtoolsPackage, character.only = TRUE)) { } packageDirs <- list.files( - path = tempDir, + path = directory, + pattern = pattern, full.names = TRUE, recursive = FALSE)