Azure Storage in R

Code Properties

Overview

Sources:

  • Source URL or reference

Utility functions for working with Azure Storage from R: a wrapper around the Azure CLI, interactive helpers for selecting subscriptions, resource groups, and resources via AzureRMR, cached blob endpoint/container accessors built on AzureStor, a "smart" URL-to-blob transfer routine that chooses between server-side copy and download-upload based on file size, and an example of publishing data to Azure Blob Storage with the pins package.

Code

Azure CLI Wrapper

# azure cli ----------------------------------------------------------------------
 
#' Run an Azure CLI command
#'
#' Thin wrapper around the `az` command-line tool. Locates the binary on the
#' `PATH`, runs it with the supplied arguments, and aborts with the captured
#' stderr on a non-zero exit status.
#'
#' @param cmd Character. The top-level `az` command (e.g. `"account"`).
#' @param ... Additional arguments passed to the CLI, in order.
#' @param .timeout Numeric. Seconds before the command is killed. Default 60.
#' @param .echo Logical. Print captured stdout to the console? Default TRUE.
#'
#' @return Invisibly, a list with `stdout` (character lines), `stderr`
#'   (character lines), and `status` (integer exit code, always 0 on return
#'   since non-zero aborts).
#'
#' @export
#' @importFrom sys exec_internal as_text
#' @importFrom cli cli_abort
az <- function(cmd, ..., .timeout = 60, .echo = TRUE) {

  # Fail fast when the Azure CLI is not installed / not on the PATH.
  az_path <- Sys.which("az")
  if (az_path == "") {
    cli::cli_abort(
    "{.code az} is not installed or not found in the system {.envvar PATH}."
    )
  }

  args <- c(cmd, ...)

  # error = FALSE: capture the exit status ourselves so we can surface
  # stderr in a formatted message below instead of a raw sys error.
  res <- sys::exec_internal(
    cmd = az_path,
    args = args,
    timeout = .timeout,
    error = FALSE
  )

  if (res$status != 0) {
    # as_text() returns character(0) for empty stderr and one element per
    # line otherwise; collapse to a single string so the "x" bullet is
    # always well-formed.
    err_txt <- paste(sys::as_text(res$stderr), collapse = "\n")
    if (!nzchar(err_txt)) err_txt <- "No error output captured."
    cli::cli_abort(
      c(
        "Failed to run {.code az {cmd}} command (exit code: {res$status}).",
        "x" = err_txt
      )
    )
  }

  output <- sys::as_text(res$stdout)
  # sep = "\n": output is a vector of lines; plain cat() would join them
  # with spaces and mangle multi-line (e.g. JSON) CLI output.
  if (.echo) cat(output, sep = "\n")

  # Return invisibly so interactive calls don't double-print the result.
  invisible(
    list(
      stdout = output,
      stderr = sys::as_text(res$stderr),
      status = res$status
    )
  )

}

Login

# login
#' Log in to Azure
#'
#' Verifies the Azure CLI session (`az account show`) and then creates an
#' `AzureRMR` login object.
#'
#' @return An `AzureRMR` login object from [AzureRMR::create_azure_login()].
az_login <- function() {
  # az() aborts via cli_abort() on any non-zero exit status, so reaching
  # this point means the CLI call succeeded — the former
  # `if (res$status != 0)` branch was unreachable dead code and is removed.
  res <- az("account", "show", "--output", "json", .echo = FALSE)

  # res$stdout is a character vector of lines; collapse to a single JSON
  # string before parsing, as jsonlite::fromJSON() expects length-1 input.
  account_info <- jsonlite::fromJSON(paste(res$stdout, collapse = "\n"))
  cli::cli_alert_success("Logged in to Azure as {.field {account_info$user$name}} (Subscription: {.field {account_info$name}}).")

  # Named `login` (not `az`) to avoid shadowing the az() CLI wrapper.
  login <- AzureRMR::create_azure_login()
  if (is.null(login)) {
    cli::cli_abort("Failed to create Azure login object. Please check your Azure CLI configuration.")
  }
  return(login)
}

Subscription

#' Select an Azure subscription
#'
#' Lists the subscriptions visible to the login and, when more than one is
#' available, prompts interactively for a choice.
#'
#' @param az An `AzureRMR` login object, e.g. from [az_login()].
#' @return The selected subscription object.
az_get_subscription <- function(az) {
  subs <- az$list_subscriptions()

  # Guard the empty case: subs[[1L]] on a zero-length list would fail with
  # an uninformative "subscript out of bounds" error.
  if (length(subs) == 0L) {
    cli::cli_abort("No Azure subscriptions found for this login.")
  }

  sub_names <- names(subs)
  if (length(subs) > 1L) {
    cli::cli_alert_info("Multiple Azure subscriptions found: {.field {sub_names}}.")
    # utils::menu() returns 0 when the user cancels the selection.
    res <- utils::menu(choices = sub_names, title = "Select an Azure subscription to use:")
    if (res == 0) {
      cli::cli_abort("No subscription selected. Exiting.")
    }
    selected_sub <- subs[[res]]
  } else {
    selected_sub <- subs[[1L]]
    cli::cli_alert_info("Using Azure subscription: {.field {selected_sub$name}}.")
  }
  return(selected_sub)
}

Resource Group

#' Select an Azure resource group
#'
#' Lists the resource groups in a subscription and, when more than one is
#' available, prompts interactively for a choice.
#'
#' @param az An `AzureRMR` login object, e.g. from [az_login()].
#' @param sub A subscription object; defaults to an interactive selection.
#' @return The selected resource group object.
az_get_resource_group <- function(az, sub = az_get_subscription(az)) {
  rgs <- sub$list_resource_groups()

  # Guard the empty case: rgs[[1L]] on a zero-length list would fail with
  # an uninformative "subscript out of bounds" error.
  if (length(rgs) == 0L) {
    cli::cli_abort("No resource groups found in this subscription.")
  }

  rg_names <- names(rgs)
  if (length(rgs) > 1L) {
    cli::cli_alert_info("Multiple resource groups found: {.field {rg_names}}.")
    # utils::menu() returns 0 when the user cancels the selection.
    res <- utils::menu(choices = rg_names, title = "Select a resource group to use:")
    if (res == 0) {
      cli::cli_abort("No resource group selected. Exiting.")
    }
    selected_rg <- rgs[[res]]
  } else {
    selected_rg <- rgs[[1L]]
    cli::cli_alert_info("Using resource group: {.field {selected_rg$name}}.")
  }
  return(selected_rg)
}

Resource

#' Select an Azure resource
#'
#' Lists the resources in a resource group and, when more than one is
#' available, prompts interactively for a choice.
#'
#' @param az An `AzureRMR` login object, e.g. from [az_login()].
#' @param rg A resource group object; defaults to an interactive selection.
#' @return The selected resource object.
az_get_resource <- function(az, rg = az_get_resource_group(az)) {
  resources <- rg$list_resources()

  # Guard the empty case: resources[[1L]] on a zero-length list would fail
  # with an uninformative "subscript out of bounds" error.
  if (length(resources) == 0L) {
    cli::cli_abort("No resources found in this resource group.")
  }

  res_names <- names(resources)
  if (length(resources) > 1L) {
    cli::cli_alert_info("Multiple resources found: {.field {res_names}}.")
    # utils::menu() returns 0 when the user cancels the selection.
    res <- utils::menu(choices = res_names, title = "Select a resource to use:")
    if (res == 0) {
      cli::cli_abort("No resource selected. Exiting.")
    }
    selected_res <- resources[[res]]
  } else {
    selected_res <- resources[[1L]]
    cli::cli_alert_info("Using resource: {.field {selected_res$name}}.")
  }
  return(selected_res)
}

Blob Storage Endpoint

#' Get Azure Blob Storage Endpoint
#'
#' Creates an authenticated connection to Azure Blob Storage using cached configuration.
#'
#' @param .config Azure config object. If NULL, uses cached config from package environment.
#' @param cache Logical. Cache the endpoint in package environment? Default: TRUE
#'
#' @return An Azure blob endpoint object
#' @export
az_storage_endpoint <- function(.config = NULL, cache = TRUE) {

  # Fall back to the package-level cached configuration when none supplied.
  if (is.null(.config)) .config <- get_az_config()

  check_az_config(require_gdal = FALSE)

  # Endpoints are cached per storage account name.
  account <- .config$storage_account

  # Serve a previously created endpoint from the session cache, if any.
  if (cache && rlang::env_has(.pkg_env$azure$endpoints, account)) {
    return(rlang::env_get(.pkg_env$azure$endpoints, account))
  }

  # No cache hit: authenticate a fresh endpoint, translating connection
  # failures into a formatted abort with context.
  ep <- tryCatch(
    AzureStor::blob_endpoint(.config$endpoint_url, key = .config$account_key),
    error = function(e) {
      cli::cli_abort(
        c(
          "x" = "Failed to connect to Azure Storage",
          "i" = "Storage Account: {.val {.config$storage_account}}",
          "i" = "Error: {e$message}"
        )
      )
    }
  )

  # Store the new endpoint for subsequent calls in this session.
  if (cache) {
    rlang::env_bind(.pkg_env$azure$endpoints, !!account := ep)
    cli::cli_alert_info("Cached Azure endpoint for {.val {.config$storage_account}}")
  }

  ep
}

Blob Storage Container

#' Get Azure Blob Container
#'
#' Returns a reference to a specific blob container, with session caching.
#'
#' @param container_name Container name. If NULL, uses default from config.
#' @param .endpoint Azure blob endpoint. If NULL, creates using cached config.
#' @param cache Logical. Cache the container reference? Default: TRUE
#'
#' @return An Azure blob container object
#' @export
az_storage_container <- function(container_name = NULL, .endpoint = NULL, cache = TRUE) {

  # Default to the container named in the package configuration.
  if (is.null(container_name)) container_name <- get_az_config("default_container")

  # Serve a previously resolved container from the session cache, if any.
  if (cache && rlang::env_has(.pkg_env$azure$containers, container_name)) {
    return(rlang::env_get(.pkg_env$azure$containers, container_name))
  }

  # Lazily create the endpoint only when we actually need to hit Azure.
  if (is.null(.endpoint)) .endpoint <- az_storage_endpoint()

  # Resolve the container, translating failures into a formatted abort.
  ctr <- tryCatch(
    AzureStor::blob_container(.endpoint, container_name),
    error = function(e) {
      cli::cli_abort(
        c(
          "x" = "Failed to access container {.val {container_name}}",
          "i" = "Ensure the container exists and you have appropriate permissions",
          "i" = "Error: {e$message}"
        )
      )
    }
  )

  # Store the container reference for subsequent calls in this session.
  if (cache) {
    rlang::env_bind(.pkg_env$azure$containers, !!container_name := ctr)
  }

  ctr
}

Blob Storage Smart Transfer

#' Smart Transfer from URL to Azure Blob
#'
#' Intelligently transfers files from HTTP/HTTPS/FTP URLs to Azure Blob Storage.
#'
#' @details
#' For each URL, a HEAD request determines the file size. Files under 256 MB
#' use a server-side copy (`AzureStor::copy_url_to_blob()`); larger files —
#' or files whose size cannot be determined — are downloaded to a tempfile
#' and uploaded. Existing blobs are skipped unless `overwrite = TRUE`.
#'
#' @param urls Character vector of source URLs
#' @param blob_paths Character vector of destination blob paths. If NULL, uses basename of URLs.
#' @param .container Azure container object. If NULL, uses default container.
#' @param overwrite Logical. Overwrite existing blobs? Default: FALSE
#' @param progress Logical. Show progress? Default: TRUE
#'
#' @return Invisible tibble with transfer results
#' @export
az_smart_transfer <- function(
    urls,
    blob_paths = NULL,
    .container = NULL,
    overwrite = FALSE,
    progress = TRUE
) {

  # Resolve the target container from config/cache when not supplied.
  if (is.null(.container)) {
    .container <- az_storage_container()
  }

  check_az_container(.container)

  # Default destination: mirror the filename portion of each URL.
  if (is.null(blob_paths)) {
    blob_paths <- basename(urls)
  }

  # URLs and destinations must pair up one-to-one.
  if (length(urls) != length(blob_paths)) {
    cli::cli_abort(
      c(
        "x" = "{.arg urls} and {.arg blob_paths} must have same length",
        "i" = "urls: {length(urls)}, blob_paths: {length(blob_paths)}"
      )
    )
  }

  # One row of results per (url, destination) pair.
  results <- purrr::map2_dfr(urls, blob_paths, function(url, dest_path) {

    cli::cli_h2("Processing: {basename(url)}")

    # Skip blobs that already exist unless overwriting was requested.
    # blob_exists errors (e.g. auth/network) are treated as "not present".
    if (!overwrite) {
      exists <- tryCatch(
        AzureStor::blob_exists(.container, dest_path),
        error = function(e) FALSE
      )

      if (exists) {
        cli::cli_alert_info("Blob {.file {dest_path}} already exists, skipping")
        return(tibble::tibble(
          url = url,
          blob_path = dest_path,
          status = "skipped",
          size_mb = NA_real_,
          method = NA_character_
        ))
      }
    }

    # Determine file size via a HEAD request (Content-Length, in MB).
    # On failure, a sentinel of 999999 MB forces the download-upload path.
    size_mb <- tryCatch({
      req <- httr2::request(url) |>
        httr2::req_method("HEAD") |>
        httr2::req_retry(max_tries = 3) |>
        httr2::req_timeout(30)

      resp <- httr2::req_perform(req)
      as.numeric(httr2::resp_header(resp, "Content-Length")) / (1024^2)
    }, error = function(e) {
      cli::cli_alert_warning("Could not determine file size, assuming > 256MB")
      999999
    })

    cli::cli_alert_info("File size: {round(size_mb, 2)} MB")

    result <- tryCatch({
      if (size_mb < 256) {
        # Fast path: Server-side copy
        # Azure copies directly from the source URL; no local bandwidth used.
        cli::cli_alert_info("Using server-side copy...")
        AzureStor::copy_url_to_blob(.container, url, dest_path)

        tibble::tibble(
          url = url,
          blob_path = dest_path,
          status = "success",
          size_mb = size_mb,
          method = "server_copy"
        )

      } else {
        # Slow path: Download then upload
        cli::cli_alert_warning("File exceeds 256MB, using download-upload method...")

        # Tempfile keeps the source extension; cleaned up on exit.
        tmp <- tempfile(fileext = paste0(".", tools::file_ext(url)))
        on.exit(unlink(tmp), add = TRUE)

        # Raise the download timeout for large files, restoring the old
        # value on exit.
        old_timeout <- getOption("timeout")
        on.exit(options(timeout = old_timeout), add = TRUE)
        options(timeout = max(3600, old_timeout))

        cli::cli_progress_step("Downloading...")
        utils::download.file(url, tmp, mode = "wb", quiet = !progress)
        cli::cli_progress_done()

        cli::cli_progress_step("Uploading to Azure...")
        AzureStor::upload_blob(.container, tmp, dest_path)
        cli::cli_progress_done()

        tibble::tibble(
          url = url,
          blob_path = dest_path,
          status = "success",
          size_mb = size_mb,
          method = "download_upload"
        )
      }
    }, error = function(e) {
      # Any failure in either path yields a "failed" row rather than
      # aborting the whole batch.
      cli::cli_alert_danger("Transfer failed: {e$message}")
      tibble::tibble(
        url = url,
        blob_path = dest_path,
        status = "failed",
        size_mb = size_mb,
        method = NA_character_
      )
    })

    if (result$status == "success") {
      cli::cli_alert_success("Transferred: {.file {dest_path}}")
    }

    result
  })

  # Summary
  cli::cli_rule()
  cli::cli_alert_success("Completed {sum(results$status == 'success')}/{nrow(results)} transfers")

  if (any(results$status == "failed")) {
    cli::cli_alert_warning("{sum(results$status == 'failed')} transfers failed")
  }

  if (any(results$status == "skipped")) {
    cli::cli_alert_info("{sum(results$status == 'skipped')} transfers skipped")
  }

  invisible(results)
}

Pins Example

# NOTE(review): this example helper was originally also named
# `az_storage_container`, which silently overwrote the exported cached
# accessor defined earlier in this file with an incompatible signature.
# Renamed so both definitions can coexist.

#' Look up a blob container on the landrise storage account (example helper)
#'
#' @param name Container name; must match one of the containers that exist
#'   on the account (validated with `rlang::arg_match()`).
#' @return An `AzureStor` blob container object.
az_example_storage_container <- function(name) {
  # Account key is read from the environment; assumes AZURE_STORAGE_KEY is
  # set — TODO confirm in the deployment environment.
  storage_endpoint <- AzureStor::blob_endpoint("https://landrisestorage.blob.core.windows.net/", key = Sys.getenv("AZURE_STORAGE_KEY"))
  containers <- AzureStor::list_blob_containers(storage_endpoint)
  # arg_match aborts with a helpful message if `name` is not a real container.
  rlang::arg_match(name, names(containers))
  AzureStor::blob_container(storage_endpoint, name)
}

# Register a pins board backed by the Azure container, with a local cache.
az_board <- pins::board_azure(
  container = az_example_storage_container("landrise-geospatial"),
  path = "temp",
  cache = "data-raw/cache"
)

# Write a data set to the board as CSV.
# NOTE(review): `tiger_field_dictionary` must exist in the session before
# running this example.
pins::pin_write(
  board = az_board,
  x = tiger_field_dictionary,
  name = "tiger_field_dictionary",
  description = "Field dictionary for TIGER/Line shapefiles from the US Census Bureau",
  type = "csv"
)

az_board |> pins::pin_list()

az_board |> pins::write_board_manifest()

# URL-backed board pointing at Census TIGER/Line downloads, cached locally.
tiger_data <- pins::board_url(
  urls = c(
    "cb_2024_us_all_20m_gpkg" = "https://www2.census.gov/geo/tiger/GENZ2024/gpkg/cb_2024_us_all_20m.zip",
    "cb_2024_us_tract_5m_shp" = "https://www2.census.gov/geo/tiger/GENZ2024/shp/cb_2024_us_tract_5m.zip"
  ),
  cache = "data-raw/cache/tiger",
  use_cache_on_failure = TRUE,
  headers = NULL
)

pins::write_board_manifest(tiger_data)

tiger_data |>
  pins::pin_download("cb_2024_us_all_20m_gpkg")

# Alternative: stow the archive with dpkg, caching under data-raw/cache/tiger.
Sys.setenv("R_USER_DATA_DIR" = "data-raw/cache/tiger")
dpkg::stow(uri = "https://www2.census.gov/geo/tiger/GENZ2024/gpkg/cb_2024_us_all_20m.zip")
 

Usage

How to use this code:

# usage example

Notes

Additional notes about the code.


Appendix

Note created on 2026-01-05 and last modified on 2026-01-05.

See Also


(c) No Clocks, LLC | 2026