diff --git a/NAMESPACE b/NAMESPACE
index c2bbb1a0..adc5c55a 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -1,5 +1,6 @@
# Generated by roxygen2: do not edit by hand
+export(TaskWorkflowManager)
export(createOutputFile)
export(deleteJob)
export(deleteStorageContainer)
diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R
index 4302b76e..2194a5be 100644
--- a/R/doAzureParallel.R
+++ b/R/doAzureParallel.R
@@ -331,7 +331,12 @@ setHttpTraffic <- function(value = FALSE) {
if (!is.null(obj$options$azure$chunksize)) {
chunkSize <- obj$options$azure$chunksize
}
-
+
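+  # Optional foreach option: number of threads to use for bulk task submission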
+ threads <- 1
+ if (!is.null(obj$options$azure$threads)) {
+ threads <- obj$options$azure$threads
+ }
+
chunkSizeKeyValuePair <-
list(name = "chunkSize", value = as.character(chunkSize))
@@ -626,7 +631,7 @@ setHttpTraffic <- function(value = FALSE) {
)
mergeOutput <- append(obj$options$azure$outputFiles, mergeOutput)
- BatchUtilitiesOperations$addTask(
+ task <- TaskWorkflowManager$createTask(
jobId = id,
taskId = taskId,
rCommand = sprintf(
@@ -644,12 +649,15 @@ setHttpTraffic <- function(value = FALSE) {
maxRetryCount = maxTaskRetryCount
)
- cat("\r", sprintf("Submitting tasks (%s/%s)", i, length(endIndices)), sep = "")
+ cat("\r", sprintf("Creating tasks (%s/%s)", i, length(endIndices)), sep = "")
flush.console()
- return(taskId)
+ return(task)
})
+ # Submit collection of tasks
+ TaskWorkflowManager$handleTaskCollection(id, tasks, threads)
+
if (enableCloudCombine) {
cat("\nSubmitting merge task")
taskDependencies <- list(taskIdRanges = list(list(
@@ -722,7 +730,7 @@ setHttpTraffic <- function(value = FALSE) {
numberOfFailedTasks <- sum(unlist(failTasks))
if (numberOfFailedTasks > 0 && autoDeleteJob == FALSE) {
- .createErrorViewerPane(id, failTasks)
+ viewErrors(id, failTasks)
}
if (!identical(function(a, ...) c(a, list(...)),
@@ -785,94 +793,3 @@ setHttpTraffic <- function(value = FALSE) {
return(id)
}
}
-
-.createErrorViewerPane <- function(id, failTasks) {
- config <- getConfiguration()
- storageClient <- config$storageClient
-
- sasToken <- storageClient$generateSasToken("r", "c", id)
- queryParameterUrl <- "?"
-
- for (query in names(sasToken)) {
- queryParameterUrl <-
- paste0(queryParameterUrl,
- query,
- "=",
- RCurl::curlEscape(sasToken[[query]]),
- "&")
- }
-
- queryParameterUrl <-
- substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
-
- tempDir <- tempfile()
- dir.create(tempDir)
- htmlFile <- file.path(tempDir, paste0(id, ".html"))
- azureStorageUrl <-
- paste0("http://",
- storageCredentials$name,
- sprintf(".blob.%s/", storageCredentials$endpointSuffix),
- id)
-
- staticHtml <- "
Errors:
"
- for (i in 1:length(failTasks)) {
- if (failTasks[i] == 1) {
- stdoutFile <- paste0(azureStorageUrl, "/", "stdout")
- stderrFile <- paste0(azureStorageUrl, "/", "stderr")
- rlogFile <- paste0(azureStorageUrl, "/", "logs")
-
- stdoutFile <-
- paste0(stdoutFile,
- "/",
- id,
- "-task",
- i,
- "-stdout.txt",
- queryParameterUrl)
- stderrFile <-
- paste0(stderrFile,
- "/",
- id,
- "-task",
- i,
- "-stderr.txt",
- queryParameterUrl)
- rlogFile <-
- paste0(rlogFile,
- "/",
- id,
- "-task",
- i,
- ".txt",
- queryParameterUrl)
-
-      staticHtml <-
-        paste0(
-          staticHtml,
-          "Task ",
-          i,
-          " | <a href='",
-          stdoutFile,
-          "'>stdout.txt</a>",
-          " |",
-          " <a href='",
-          stderrFile,
-          "'>stderr.txt</a>",
-          " | <a href='",
-          rlogFile,
-          "'>R output</a>",
-          "<br/>"
-        )
- }
- }
-
- write(staticHtml, htmlFile)
-
- viewer <- getOption("viewer")
- if (!is.null(viewer)) {
- viewer(htmlFile)
- }
-}
diff --git a/R/task-manager.R b/R/task-manager.R
new file mode 100644
index 00000000..dda7f33f
--- /dev/null
+++ b/R/task-manager.R
@@ -0,0 +1,270 @@
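+#' TaskWorkflowManager
+#'
+#' Creates Azure Batch task definitions and submits them to a job in bulk,
+#' splitting oversized requests and retrying transient failures.
+#'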
+#' @export
+TaskWorkflowManager <- R6::R6Class(
+ "TaskManager",
+ public = list(
+    initialize = function() {
+    },
+ originalTaskCollection = NULL,
+ tasksToAdd = NULL,
+ results = NULL,
+ failedTasks = NULL,
+ errors = NULL,
+ threads = 1,
+ maxTasksPerRequest = 100,
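+    # Builds the request body for a single Batch task: uploads the serialized
+    # arguments to blob storage and assembles resource files, output file
+    # upload rules, and container settings.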
+ createTask = function(jobId, taskId, rCommand, ...) {
+ config <- getConfiguration()
+ storageClient <- config$storageClient
+
+ args <- list(...)
+ .doAzureBatchGlobals <- args$envir
+ dependsOn <- args$dependsOn
+ argsList <- args$args
+ cloudCombine <- args$cloudCombine
+ userOutputFiles <- args$outputFiles
+ containerImage <- args$containerImage
+      resourceFiles <- args$resourceFiles
+      maxRetryCount <- args$maxRetryCount
+      if (is.null(maxRetryCount)) {
+        # Default retry count when the caller does not supply one
+        maxRetryCount <- 3
+      }
+ accountName <- storageClient$authentication$name
+
+ if (!is.null(argsList)) {
+ envFile <- paste0(taskId, ".rds")
+ saveRDS(argsList, file = envFile)
+ storageClient$blobOperations$uploadBlob(
+ jobId,
+ file.path(getwd(), envFile)
+ )
+ file.remove(envFile)
+
+ readToken <- storageClient$generateSasToken("r", "c", jobId)
+ envFileUrl <-
+ rAzureBatch::createBlobUrl(
+ storageClient$authentication$name,
+ jobId,
+ envFile,
+ readToken,
+ config$endpointSuffix)
+ resourceFiles <-
+ list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
+ }
+
+ # Only use the download command if cloudCombine is enabled
+ # Otherwise just leave it empty
+ commands <- c()
+
+ containerSettings <- list(
+ imageName = containerImage,
+ containerRunOptions = "--rm"
+ )
+
+ if (!is.null(cloudCombine)) {
+ assign("cloudCombine", cloudCombine, .doAzureBatchGlobals)
+ containerSettings$imageName <- "brianlovedocker/doazureparallel-merge-dockerfile:0.12.1"
+
+ copyCommand <- sprintf(
+ "%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include results/*.rds --endpoint %s",
+ accountName,
+ jobId,
+ "$AZ_BATCH_TASK_WORKING_DIR",
+ config$endpointSuffix
+ )
+
+ commands <- c(paste("blobxfer", copyCommand))
+ }
+
+ exitConditions <- NULL
+ if (!is.null(args$dependsOn)) {
+ dependsOn <- args$dependsOn
+ }
+ else {
+ exitConditions <- list(default = list(dependencyAction = "satisfy"))
+ }
+
+ containerUrl <-
+ rAzureBatch::createBlobUrl(
+ storageAccount = storageClient$authentication$name,
+ containerName = jobId,
+ sasToken = storageClient$generateSasToken("w", "c", jobId),
+ storageEndpointSuffix = config$endpointSuffix
+ )
+
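+      # Upload the task's R log, stdout, and stderr to the job's storage
+      # container once the task completes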
+ outputFiles <- list(
+ list(
+ filePattern = paste0(taskId, ".txt"),
+ destination = list(container = list(
+ path = paste0("logs/", taskId, ".txt"),
+ containerUrl = containerUrl
+ )),
+ uploadOptions = list(uploadCondition = "taskCompletion")
+ ),
+ list(
+ filePattern = "../stdout.txt",
+ destination = list(container = list(
+ path = paste0("stdout/", taskId, "-stdout.txt"),
+ containerUrl = containerUrl
+ )),
+ uploadOptions = list(uploadCondition = "taskCompletion")
+ ),
+ list(
+ filePattern = "../stderr.txt",
+ destination = list(container = list(
+ path = paste0("stderr/", taskId, "-stderr.txt"),
+ containerUrl = containerUrl
+ )),
+ uploadOptions = list(uploadCondition = "taskCompletion")
+ )
+ )
+
+ outputFiles <- append(outputFiles, userOutputFiles)
+
+ commands <-
+ c(commands,
+ rCommand)
+
+ commands <- linuxWrapCommands(commands)
+
+ sasToken <- storageClient$generateSasToken("rwcl", "c", jobId)
+ queryParameterUrl <- "?"
+
+ for (query in names(sasToken)) {
+ queryParameterUrl <-
+ paste0(queryParameterUrl,
+ query,
+ "=",
+ RCurl::curlEscape(sasToken[[query]]),
+ "&")
+ }
+
+ queryParameterUrl <-
+ substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
+
+ body <- list(id = taskId,
+ commandLine = commands,
+ userIdentity = list(
+ autoUser = list(
+ scope = "pool",
+ elevationLevel = "admin"
+ )
+ ),
+ resourceFiles = resourceFiles,
+ dependsOn = dependsOn,
+ outputFiles = outputFiles,
+                   constraints = list(
+                     maxTaskRetryCount = maxRetryCount
+                   ),
+ exitConditions = exitConditions,
+ containerSettings = containerSettings)
+
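+      # Drop NULL entries so they are not serialized into the request body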
+ body <- Filter(length, body)
+
+ body
+ },
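+    # Queues the prepared task definitions and submits them in chunks of at
+    # most maxTasksPerRequest, stopping early once an error has been recorded.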
+ handleTaskCollection = function(
+ jobId,
+ tasks,
+ threads = 1
+ ){
+ size <- length(tasks)
+      self$originalTaskCollection <- tasks
+      # Name the saved collection by task ID so failed tasks can be looked up
+      # and requeued by ID in addBulkTasks
+      names(self$originalTaskCollection) <-
+        vapply(tasks, function(t) t$id, character(1))
+
+ self$tasksToAdd <- datastructures::queue()
+ self$tasksToAdd <- datastructures::insert(self$tasksToAdd, tasks)
+
+ self$results <- datastructures::queue()
+ self$failedTasks <- datastructures::queue()
+ self$errors <- datastructures::queue()
+
+ config <- getConfiguration()
+ batchClient <- config$batchClient
+
+ tryCatch({
+ while (datastructures::size(self$tasksToAdd) > 0 &&
+ datastructures::size(self$errors) == 0) {
+ maxTasks <- self$maxTasksPerRequest
+ if (datastructures::size(self$tasksToAdd) < maxTasks) {
+ maxTasks <- datastructures::size(self$tasksToAdd)
+ }
+
+ chunkTasksToAdd <- vector("list", maxTasks)
+ index <- 1
+
+ while (index <= maxTasks &&
+ datastructures::size(self$tasksToAdd) > 0){
+          chunkTasksToAdd[[index]] <- datastructures::pop(self$tasksToAdd)
+ index <- index + 1
+ }
+
+ report <- self$addBulkTasks(
+ jobId,
+ chunkTasksToAdd
+ )
+ }
+ },
+      error = function(e) {
+        # Record the unexpected failure so callers can inspect it afterwards
+        self$errors <- datastructures::insert(self$errors, e)
+      })
+ },
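+    # Submits one chunk via the Batch add-task-collection API. A 413 response
+    # splits the chunk in half and resubmits; 5xx responses requeue the chunk;
+    # a 200 response is inspected per task.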
+ addBulkTasks = function(
+ jobId,
+ chunkTasksToAdd
+ ){
+ config <- getConfiguration()
+ batchClient <- config$batchClient
+
+ response <- batchClient$taskOperations$addCollection(
+ jobId,
+ list(value = chunkTasksToAdd),
+ content = "response"
+ )
+
+ # In case of a chunk exceeding the MaxMessageSize split chunk in half
+ # and resubmit smaller chunk requests
+ if (response$status_code == 413) {
+      if (length(chunkTasksToAdd) == 1) {
+        self$errors <- datastructures::insert(self$errors, response)
+
+        stop(sprintf(
+          paste0("Failed to add task with ID %s because the request body ",
+                 "exceeds the maximum request size"),
+          chunkTasksToAdd[[1]]$id))
+ }
+
+ upperBound <- length(chunkTasksToAdd)
+      midBound <- floor(upperBound / 2)
+
+      self$addBulkTasks(
+        jobId,
+        chunkTasksToAdd[1:midBound])
+
+      self$addBulkTasks(
+        jobId,
+        chunkTasksToAdd[(midBound + 1):upperBound])
+ }
+ else if (500 <= response$status_code &&
+ response$status_code <= 599) {
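+        # Transient server error: requeue the whole chunk for another attempt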
+ self$tasksToAdd <- datastructures::insert(self$tasksToAdd, chunkTasksToAdd)
+ }
+      else if (response$status_code == 200) {
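+        # Request accepted: inspect each task's individual add status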
+ values <- httr::content(response)$value
+
+ for (i in 1:length(values)) {
+ taskId <- values[[i]]$id
+
+ if (compare(values[[i]]$status, "servererror")) {
+          self$tasksToAdd <- datastructures::insert(
+            self$tasksToAdd, self$originalTaskCollection[[taskId]])
+ }
+ else if (compare(values[[i]]$status, "clienterror") &&
+ values[[i]]$error$code != "TaskExists") {
+ self$failedTasks <- datastructures::insert(self$failedTasks, values[[i]])
+ }
+ else {
+ self$results <- datastructures::insert(self$results, values[[i]])
+ }
+ }
+ }
+ else {
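+        # Unexpected status: requeue the chunk and record the response so the
+        # submission loop stops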
+ self$tasksToAdd <- datastructures::insert(self$tasksToAdd, chunkTasksToAdd)
+ self$errors <- datastructures::insert(self$errors, response)
+ }
+ }
+ )
+)
+
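+# The exported TaskWorkflowManager is a package-level singleton instance,
+# not the R6 generator itself.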
+TaskWorkflowManager <- TaskWorkflowManager$new()
diff --git a/R/utility-job.R b/R/utility-job.R
index dfe0398d..e2e41548 100644
--- a/R/utility-job.R
+++ b/R/utility-job.R
@@ -499,9 +499,7 @@ waitForTasksToComplete <-
for (i in 1:length(failedTasks$value)) {
if (!is.null(failedTasks$value[[i]]$executionInfo$result) &&
- grepl(failedTasks$value[[i]]$executionInfo$result,
- "failure",
- ignore.case = TRUE)) {
+ compare(failedTasks$value[[i]]$executionInfo$result, "failure")) {
tasksFailureWarningLabel <-
paste0(tasksFailureWarningLabel,
sprintf("%s\n", failedTasks$value[[i]]$id))
@@ -559,7 +557,7 @@ waitForTasksToComplete <-
next
}
- if (grepl(mergeTask$executionInfo$result, "success", ignore.case = TRUE)) {
+ if (compare(mergeTask$executionInfo$result, "success")) {
cat(" Completed.")
break
}
diff --git a/R/utility-string.R b/R/utility-string.R
index 95fda84a..09e86992 100644
--- a/R/utility-string.R
+++ b/R/utility-string.R
@@ -115,3 +115,7 @@ printCluster <- function(cluster, resourceFiles = list()) {
}
cat(strrep('=', options("width")), fill = TRUE)
}
+
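+# Case-insensitive match: TRUE when the pattern `a` is found in the string `b`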
+compare <- function(a, b) {
+ return(grepl(a, b, ignore.case = TRUE))
+}
diff --git a/R/utility.R b/R/utility.R
index 0bf689d4..7f0b63ad 100644
--- a/R/utility.R
+++ b/R/utility.R
@@ -304,3 +304,95 @@ getHttpErrorMessage <- function(responseObj) {
detailMessage <- paste0(detailMessage, "\r\nodata.metadata: ", responseObj$odata.metadata)
return(detailMessage)
}
+
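+# Builds a temporary HTML page linking to each failed task's stdout, stderr,
+# and R log blobs and opens it in the viewer when one is available.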
+viewErrors <- function(id, failTasks) {
+ config <- getConfiguration()
+ storageClient <- config$storageClient
+
+ sasToken <- storageClient$generateSasToken("r", "c", id)
+ queryParameterUrl <- "?"
+
+ for (query in names(sasToken)) {
+ queryParameterUrl <-
+ paste0(queryParameterUrl,
+ query,
+ "=",
+ RCurl::curlEscape(sasToken[[query]]),
+ "&")
+ }
+
+ queryParameterUrl <-
+ substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
+
+ tempDir <- tempfile()
+ dir.create(tempDir)
+ htmlFile <- file.path(tempDir, paste0(id, ".html"))
+ azureStorageUrl <-
+ paste0("http://",
+ storageCredentials$name,
+ sprintf(".blob.%s/", storageCredentials$endpointSuffix),
+ id)
+
+ staticHtml <- "Errors:
"
+ for (i in 1:length(failTasks)) {
+ if (failTasks[i] == 1) {
+ stdoutFile <- paste0(azureStorageUrl, "/", "stdout")
+ stderrFile <- paste0(azureStorageUrl, "/", "stderr")
+ rlogFile <- paste0(azureStorageUrl, "/", "logs")
+
+ stdoutFile <-
+ paste0(stdoutFile,
+ "/",
+ id,
+ "-task",
+ i,
+ "-stdout.txt",
+ queryParameterUrl)
+ stderrFile <-
+ paste0(stderrFile,
+ "/",
+ id,
+ "-task",
+ i,
+ "-stderr.txt",
+ queryParameterUrl)
+ rlogFile <-
+ paste0(rlogFile,
+ "/",
+ id,
+ "-task",
+ i,
+ ".txt",
+ queryParameterUrl)
+
+      staticHtml <-
+        paste0(
+          staticHtml,
+          "Task ",
+          i,
+          " | <a href='",
+          stdoutFile,
+          "'>stdout.txt</a>",
+          " |",
+          " <a href='",
+          stderrFile,
+          "'>stderr.txt</a>",
+          " | <a href='",
+          rlogFile,
+          "'>R output</a>",
+          "<br/>"
+        )
+ }
+ }
+
+ write(staticHtml, htmlFile)
+
+ viewer <- getOption("viewer")
+ if (!is.null(viewer)) {
+ viewer(htmlFile)
+ }
+}
+
diff --git a/inst/startup/install_custom.R b/inst/startup/install_custom.R
index 7d39d60d..02b732ff 100644
--- a/inst/startup/install_custom.R
+++ b/inst/startup/install_custom.R
@@ -18,6 +18,13 @@ if (length(args) > 1) {
}
}
+if (length(args) < 1) {
+  stop("Required arguments are missing. ",
+       "Usage: install_custom.R file_share_directory pattern")
+}
+
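+# First positional argument: the file share directory to search for packages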
+directory <- args[1]
+
devtoolsPackage <- "devtools"
if (!require(devtoolsPackage, character.only = TRUE)) {
install.packages(devtoolsPackage)
@@ -25,7 +32,8 @@ if (!require(devtoolsPackage, character.only = TRUE)) {
}
packageDirs <- list.files(
- path = tempDir,
+ path = directory,
+ pattern = pattern,
full.names = TRUE,
recursive = FALSE)