diff --git a/DESCRIPTION b/DESCRIPTION index 361ec4a..632f32f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: CyteTypeR Title: CyteType for R -Version: 0.3.2 +Version: 0.4.0 Description: CyteTypeR is the R version of CyteType python package. Authors@R: person("Nygen Analytics AB", , ,"contact@nygen.io", role = c("aut", "cre")) diff --git a/NAMESPACE b/NAMESPACE index aa4a4df..c109489 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,6 +1,7 @@ # Generated by roxygen2: do not edit by hand S3method(print,cytetype_api_error) +export(CleanUpArtifacts) export(CyteTypeR) export(GetResults) export(PrepareCyteTypeR) diff --git a/R/api.R b/R/api.R index fe791b1..6e9beda 100644 --- a/R/api.R +++ b/R/api.R @@ -100,20 +100,19 @@ # Process based on job status if (job_status == "completed") { # Try to get results - tryCatch({ - results_resp <- .api_response_helper(job_id, api_url, 'results', auth_token) - }, error = function(e) { - # If results fetch fails, treat as failed job - return(make_response( - "failed", - message = paste("Job completed but results unavailable:", e$message), - raw = status_data - )) - }) - - # Check if we got an error response above - if (is.list(results_resp) && (results_resp$status == "failed")) { - return(results_resp) # Return the error response + results_resp <- tryCatch( + .api_response_helper(job_id, api_url, 'results', auth_token), + error = function(e) { + make_response( + "failed", + message = paste("Job completed but results unavailable:", e$message), + raw = status_data + ) + } + ) + + if (!is.null(results_resp$status) && results_resp$status == "failed") { + return(results_resp) } if (results_resp$status_code == 404) { diff --git a/R/artifacts.R b/R/artifacts.R index 59c5aab..47eeb2e 100644 --- a/R/artifacts.R +++ b/R/artifacts.R @@ -22,6 +22,15 @@ return(y) } +# Map R class to a pandas-compatible dtype string for the source_dtype attribute. +.r_to_source_dtype <- function(vec) { + if (is.factor(vec)) return("category") + if (is.logical(vec)) return("bool") + if (is.integer(vec)) return("int32") + if (is.numeric(vec)) return("float64") + return("object") +} + # Write optional var (feature) metadata under info/var (mirrors Python _write_var_metadata). .write_var_metadata <- function(fid, n_cols, feature_df, feature_names) { if (nrow(feature_df) != n_cols) { @@ -43,6 +52,7 @@ existing <- c(existing, dataset_name) col_path <- paste0("info/var/columns/", dataset_name) vec <- feature_df[[i]] + source_dtype <- .r_to_source_dtype(vec) if (is.factor(vec)) vec <- as.character(vec) if (is.logical(vec)) { storage.mode(vec) <- "integer" @@ -53,15 +63,75 @@ } else { rhdf5::h5writeDataset(.as_string_values(vec), h5loc = fid, name = col_path) } + did <- rhdf5::H5Dopen(fid, col_path) + rhdf5::h5writeAttribute(col_name, h5obj = did, name = "source_name") + rhdf5::h5writeAttribute(source_dtype, h5obj = did, name = "source_dtype") + rhdf5::H5Dclose(did) + } + invisible(NULL) +} + +# Write a sparse matrix under a named HDF5 group. +# csr = FALSE (default): CSC — indptr over columns (genes), indices are row (cell) indices. +# Input m must be cells × genes (n_obs × n_vars). +# csr = TRUE: CSR — indptr over rows (cells), indices are column (gene) indices. +# Input m must be genes × cells (n_vars × n_obs) as returned by Seurat GetAssayData. +# Stored via CSC(genes × cells) ≡ CSR(cells × genes); no transpose needed. +.write_sparse_group <- function(fid, group, m, n_obs, col_batch, chunk_size, + csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT") { + if (csr) { + m <- as(m, "CsparseMatrix") + n_vars <- nrow(m) + } else { + n_vars <- ncol(m) + } + n_cols <- ncol(m) + rhdf5::h5createGroup(fid, group) + gid <- rhdf5::H5Gopen(fid, group) + on.exit(rhdf5::H5Gclose(gid), add = TRUE) + rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = gid, name = "n_obs") + rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = gid, name = "n_vars") + + rhdf5::h5createDataset(fid, paste0(group, "/indices"), dims = 0L, + maxdims = rhdf5::H5Sunlimited(), chunk = chunk_size, + H5type = "H5T_NATIVE_INT32", filter = "BLOSC_LZ4") + rhdf5::h5createDataset(fid, paste0(group, "/data"), dims = 0L, + maxdims = rhdf5::H5Sunlimited(), chunk = chunk_size, + H5type = data_h5type, filter = "BLOSC_LZ4") + + indptr <- 0 + current_size <- 0L + starts <- seq(1L, n_cols, by = col_batch) + for (start in starts) { + end <- min(start + col_batch - 1L, n_cols) + chunk <- as(m[, start:end, drop = FALSE], "CsparseMatrix") + chunk_indices <- as.integer(chunk@i) + chunk_data <- if (data_h5type == "H5T_NATIVE_INT32") as.integer(chunk@x) else as.numeric(chunk@x) + chunk_nnz <- length(chunk_indices) + if (chunk_nnz > 0L) { + rhdf5::h5set_extent(fid, paste0(group, "/indices"), current_size + chunk_nnz) + rhdf5::h5writeDataset(chunk_indices, h5loc = fid, name = paste0(group, "/indices"), + index = list((current_size + 1L):(current_size + chunk_nnz))) + rhdf5::h5set_extent(fid, paste0(group, "/data"), current_size + chunk_nnz) + rhdf5::h5writeDataset(chunk_data, h5loc = fid, name = paste0(group, "/data"), + index = list((current_size + 1L):(current_size + chunk_nnz))) + current_size <- current_size + chunk_nnz + } + new_indptr <- as.numeric(chunk@p[-1L]) + indptr[length(indptr)] + indptr <- c(indptr, new_indptr) } + + rhdf5::h5createDataset(fid, paste0(group, "/indptr"), dims = length(indptr), + H5type = "H5T_NATIVE_INT64", chunk = min(chunk_size, length(indptr)), + filter = "BLOSC_LZ4") + rhdf5::h5writeDataset(indptr, h5loc = fid, name = paste0(group, "/indptr")) invisible(NULL) } -.save_vars_h5 <- function(out_file, mat, feature_df = NULL, feature_names = NULL, +.save_vars_h5 <- function(out_file, mat, raw_mat = NULL, feature_df = NULL, feature_names = NULL, col_batch = NULL, min_chunk_size = 10000L) { - m <- as(mat, "CsparseMatrix") - n_obs <- nrow(m) - n_vars <- ncol(m) + n_obs <- nrow(mat) + n_vars <- ncol(mat) if (!requireNamespace("rhdf5filters", quietly = TRUE)) { stop("Package 'rhdf5filters' is required to write vars.h5 with LZ4 compression.") @@ -80,71 +150,14 @@ fid <- rhdf5::H5Fopen(out_file, flags = "H5F_ACC_RDWR") on.exit(rhdf5::H5Fclose(fid), add = TRUE) - rhdf5::h5createGroup(fid, "vars") - rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = out_file, name = "n_obs", h5loc = "vars", asScalar = TRUE) - rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = out_file, name = "n_vars", h5loc = "vars", asScalar = TRUE) - - # Create extensible datasets (equivalent to maxshape=(None,) in h5py) - max_nnz <- n_obs * n_vars # upper bound - rhdf5::h5createDataset( - fid, "vars/indices", - dims = 0L, - maxdims = rhdf5::H5Sunlimited(), - chunk = chunk_size, - H5type = "H5T_NATIVE_INT32", - filter = "BLOSC_LZ4" - ) - rhdf5::h5createDataset( - fid, "vars/data", - dims = 0L, - maxdims = rhdf5::H5Sunlimited(), - chunk = chunk_size, - H5type = "H5T_NATIVE_FLOAT", - filter = "BLOSC_LZ4" - ) - - indptr <- 0L - current_size <- 0L + .write_sparse_group(fid, "vars", mat, n_obs, col_batch, chunk_size) - starts <- seq(1L, n_vars, by = col_batch) - for (start in starts) { - end <- min(start + col_batch - 1L, n_vars) - chunk <- as(m[, start:end, drop = FALSE], "CsparseMatrix") - - chunk_indices <- as.integer(chunk@i) - chunk_data <- as.numeric(chunk@x) - chunk_nnz <- length(chunk_indices) - - if (chunk_nnz > 0L) { - # Extend and write indices - rhdf5::h5set_extent(fid, "vars/indices", current_size + chunk_nnz) - rhdf5::h5writeDataset( - chunk_indices, h5loc = fid, name = "vars/indices", - index = list((current_size + 1L):(current_size + chunk_nnz)) - ) - # Extend and write data - rhdf5::h5set_extent(fid, "vars/data", current_size + chunk_nnz) - rhdf5::h5writeDataset( - chunk_data, h5loc = fid, name = "vars/data", - index = list((current_size + 1L):(current_size + chunk_nnz)) - ) - current_size <- current_size + chunk_nnz - } - - # Accumulate indptr (skip first element after first chunk) - new_indptr <- as.integer(chunk@p[-1L] + indptr[length(indptr)]) - indptr <- c(indptr, new_indptr) + if (!is.null(raw_mat)) { + raw_col_batch <- max(1L, as.integer(100000000 / max(nrow(raw_mat), 1))) + .write_sparse_group(fid, "raw", raw_mat, n_obs, raw_col_batch, chunk_size, + csr = TRUE, data_h5type = "H5T_NATIVE_INT32") } - rhdf5::h5createDataset( - fid, "vars/indptr", - dims = length(indptr), - H5type = "H5T_NATIVE_INT32", - chunk = min(chunk_size, length(indptr)), - filter = "BLOSC_LZ4" - ) - rhdf5::h5writeDataset(as.integer(indptr), h5loc = fid, name = "vars/indptr") - if (!is.null(feature_df)) { .write_var_metadata(fid, n_cols = n_vars, feature_df = feature_df, feature_names = feature_names) } @@ -152,7 +165,8 @@ invisible(out_file) } -.save_obs_duckdb <- function(out_file, obs_df, table_name = "obs", +.save_obs_duckdb <- function(out_file, obs_df, coordinates = NULL, coordinates_key = NULL, + table_name = "obs", threads = "4", memory_limit = "4GB", temp_directory = "tmp/duckdb") { if (!requireNamespace("duckdb", quietly = TRUE)) { stop("Package 'duckdb' is required to build obs.duckdb. Install with: install.packages('duckdb')") @@ -160,10 +174,23 @@ if (!grepl("^[A-Za-z_][A-Za-z0-9_]*$", table_name)) { stop("Invalid table_name. Use letters, numbers, and underscores only.") } + + df <- as.data.frame(obs_df) + + if (!is.null(coordinates) && !is.null(coordinates_key)) { + coords <- as.matrix(coordinates) + if (ncol(coords) >= 2 && nrow(coords) == nrow(df)) { + col1 <- paste0("__vis_coordinates_", coordinates_key, "_1") + col2 <- paste0("__vis_coordinates_", coordinates_key, "_2") + df[[col1]] <- as.numeric(coords[, 1]) + df[[col2]] <- as.numeric(coords[, 2]) + } + } + if (file.exists(out_file)) file.remove(out_file) config <- list(threads = as.character(threads), memory_limit = memory_limit, temp_directory = temp_directory) con <- duckdb::dbConnect(duckdb::duckdb(), out_file, config = config) on.exit(duckdb::dbDisconnect(con, shutdown = TRUE), add = TRUE) - duckdb::dbWriteTable(con, table_name, as.data.frame(obs_df), overwrite = TRUE) + duckdb::dbWriteTable(con, table_name, df, overwrite = TRUE) invisible(out_file) } diff --git a/R/client.R b/R/client.R index 4a549b6..b03f80c 100644 --- a/R/client.R +++ b/R/client.R @@ -1,5 +1,5 @@ -# Upload size limits (bytes): match Python API (vars_h5 10GB uses numeric to avoid integer overflow) -.MAX_UPLOAD_BYTES <- list(obs_duckdb = 100L * 1024L * 1024L, vars_h5 = 10 * 1024 * 1024 * 1024) +# Upload size limits: 100MB and 50GB respectively (vars_h5 uses numeric to avoid integer overflow) +.MAX_UPLOAD_BYTES <- list(obs_duckdb = 100L * 1024L * 1024L, vars_h5 = 50 * 1024 * 1024 * 1024) # Chunked upload retry: delays (sec) after 1st, 2nd, 3rd failure; status codes treated as transient (incl. network/gateway) .CHUNK_UPLOAD_BACKOFF_SECS <- c(1L, 5L, 20L) @@ -38,7 +38,7 @@ stop(file_kind, " exceeds upload limit: ", size, " bytes (max ", max_bytes, ")") } - connection_timeout <- 30L + connection_timeout <- 72L # Step 1 – Initiate (empty POST; explicit empty body for compatibility) init_resp <- tryCatch( @@ -52,7 +52,7 @@ ) upload_id <- init_resp$upload_id - chunk_size <- as.integer(init_resp$chunk_size_bytes %||% (5L * 1024L * 1024L)) + chunk_size <- as.integer(init_resp$chunk_size_bytes %||% (50L * 1024L * 1024L)) ## default 50MB server_max <- init_resp$max_size_bytes if (!is.null(server_max) && length(server_max) > 0) { server_max_n <- as.numeric(server_max)[1] @@ -145,7 +145,7 @@ req_method("POST") |> req_body_json(payload, na = "string") |> req_headers("Content-Type" = "application/json") |> - req_timeout(60) + req_timeout(180) # Add auth token if provided if (!is.null(auth_token)) { @@ -211,6 +211,8 @@ last_cluster_status <- list() spinner_frame = 0 consecutive_not_found <- 0 + consecutive_errors <- 0 + max_consecutive_errors <- 5 # Main polling loop repeat { @@ -225,8 +227,9 @@ status_response <- .make_results_request(job_id, api_url, auth_token) status <- status_response$status - # Reset not found counter on valid response - if (status != "not_found"){ consecutive_not_found <- 0 } + # Reset counters on valid server response + if (status != "not_found") consecutive_not_found <- 0 + if (status != "error") consecutive_errors <- 0 # Extract cluster status current_cluster_status <- status_response$raw_response$clusterStatus %||% list() @@ -274,31 +277,40 @@ "pending" = { log_debug("Job {job_id} status: {status}. Waiting {poll_interval}s...") if (show_progress && length(current_cluster_status) > 0){ - - # Sleep with spinner updates - .sleep_with_spinner(poll_interval,current_cluster_status,show_progress) + .sleep_with_spinner(poll_interval, current_cluster_status, show_progress) + } else { + Sys.sleep(poll_interval) } last_cluster_status <- current_cluster_status }, "not_found" = { consecutive_not_found <- consecutive_not_found + 1 + log_debug("Job {job_id} not found (404, attempt {consecutive_not_found}). Waiting {poll_interval}s...") - if (!is.null(auth_token) && consecutive_not_found >= 3) { - log_warn("\u26a0\ufe0f Getting consecutive 404 responses with auth token. This might indicate authentication issues.") - log_warn("Please verify your auth_token is valid and has proper permissions") - log_warn("If you're using a shared server, contact your administrator.") - consecutive_not_found <- 0 # Reset to avoid spam + if (consecutive_not_found >= 5) { + cat("\n") + stop(paste0("Job '", job_id, "' not found after ", consecutive_not_found, + " attempts. Verify the job_id is correct.")) } - - log_debug("Results endpoint not ready yet for job {job_id} (404). Waiting {poll_interval}s...") - .sleep_with_spinner(poll_interval,current_cluster_status,show_progress) + .sleep_with_spinner(poll_interval, current_cluster_status, show_progress) last_cluster_status <- current_cluster_status }, + "error" = { + consecutive_errors <- consecutive_errors + 1 + error_msg <- status_response$message %||% "Unknown error" + log_warn("Error checking job {job_id} ({consecutive_errors}/{max_consecutive_errors}): {error_msg}") + if (consecutive_errors >= max_consecutive_errors) { + cat("\n") + stop(paste("Stopping after", max_consecutive_errors, "consecutive errors:", error_msg)) + } + .sleep_with_spinner(poll_interval, current_cluster_status, show_progress) + }, + { - log_warn("Job {job_id} has unknown status: '{status}'. Continuing...") - .sleep_with_spinner(poll_interval,current_cluster_status,show_progress) + cat("\n") + stop(paste("Job", job_id, "returned unexpected status:", status)) } ) diff --git a/R/cytetype.R b/R/cytetype.R index 798feba..ca4248a 100644 --- a/R/cytetype.R +++ b/R/cytetype.R @@ -24,6 +24,10 @@ #' to use for visualization coordinates (e.g., "umap", "tsne"). Default is "umap". #' @param max_cells_per_group Integer specifying maximum cells per cluster for #' subsampling (currently unused). Default is 1000. +#' @param vars_h5_path Character string specifying the local file path for the +#' generated vars.h5 artifact (feature expression). Default is `"vars.h5"`. +#' @param obs_duckdb_path Character string specifying the local file path for the +#' generated obs.duckdb artifact (cell metadata). Default is `"obs.duckdb"`. #' #' @return Named list containing formatted data for CyteType analysis: #' \itemize{ @@ -32,6 +36,10 @@ #' \item `markerGenes`: List of top marker genes per cluster #' \item `visualizationData`: Coordinates and cluster assignments for plotting #' \item `expressionData`: Expression percentages by cluster +#' \item `group_key`: The metadata column name used for cluster assignments +#' \item `build_succeeded`: Logical indicating whether artifact files were built successfully +#' \item `vars_h5_path`: Path to the generated vars.h5 file +#' \item `obs_duckdb_path`: Path to the generated obs.duckdb file #' } #' #' @details @@ -42,6 +50,8 @@ #' 4. Filters and selects top marker genes per cluster #' 5. Extracts dimensional reduction coordinates for visualization #' 6. Calculates expression percentages across clusters +#' 7. Builds a vars.h5 artifact from the normalized expression matrix +#' 8. Builds an obs.duckdb artifact from the cell metadata #' #' Clusters are renumbered sequentially (1, 2, 3, ...) to ensure consistent #' formatting regardless of original cluster naming. @@ -74,7 +84,9 @@ PrepareCyteTypeR <- function(obj, min_percentage = 10, pcent_batch_size = 2000, coordinates_key = "umap", - max_cells_per_group = 1000 + max_cells_per_group = 1000, + vars_h5_path = "vars.h5", + obs_duckdb_path = "obs.duckdb" ){ .validate_seurat(obj, group_key, gene_symbols, coordinates_key) @@ -98,17 +110,23 @@ PrepareCyteTypeR <- function(obj, arrange(desc(avg_log2FC)) %>% slice_head(n = n_top_genes) %>% ungroup() %>% - {split(.$gene, .$cluster)} + {split(.$gene, as.character(.$cluster))} names(marker_genes) <- cluster_map[names(marker_genes)] if (any(sapply(marker_genes, function(x) !is.vector(x) || length(x) < 5))) { stop("Invalid marker genes, some clusters have fewer than 5 markers") } - print("Preparing visualisation data...") - visualization_data <- .sample_visualization_data( - obj, group_key, coordinates_key, cluster_map, max_cells_per_group - ) + coords <- NULL + visualization_data <- NULL + tryCatch({ + coords <- Seurat::Embeddings(obj, reduction = coordinates_key) + print("Preparing visualisation data...") + visualization_data <- .sample_visualization_data(obj, group_key, coordinates_key, cluster_map, max_cells_per_group) + }, error = function(e) { + log_warn(paste("Could not extract coordinates for reduction '", coordinates_key, + "'. Continuing without visualization data. Error:", conditionMessage(e))) + }) print("Calculating expression percentages...") expression_percentages <- .calculate_pcent(obj, group_key, cluster_map, pcent_batch_size) @@ -117,13 +135,58 @@ PrepareCyteTypeR <- function(obj, cluster_map <- as.list(as.character(sorted_clusters)) names(cluster_map) <- as.character(1:length(sorted_clusters)) + # build artefacts for upload + build_succeeded <- FALSE + + tryCatch({ + log_info("Building vars.h5 from normalized counts (cells x features)...") + default_assay <- .resolve_seurat_assay_rna(obj) + # Seurat stores expression as features x cells; API expects cells x features (n_obs x n_vars). + expr_mat <- tryCatch( + Seurat::GetAssayData(obj, assay = default_assay, layer = "data"), + error = function(e) Seurat::GetAssayData(obj, assay = default_assay, slot = "data") + ) + mat <- Matrix::t(expr_mat) + + feature_df <- tryCatch( + as.data.frame(Seurat::GetAssay(obj, default_assay)@meta.features), + error = function(e) tryCatch( + as.data.frame(Seurat::GetAssay(obj, default_assay)@meta.data), + error = function(e2) NULL + ) + ) + feature_names <- tryCatch(rownames(obj), error = function(e) NULL) + raw_mat <- tryCatch( + tryCatch( + Seurat::GetAssayData(obj, assay = default_assay, layer = "counts"), + error = function(e) Seurat::GetAssayData(obj, assay = default_assay, slot = "counts") + ), + error = function(e) NULL + ) + .save_vars_h5(vars_h5_path, mat, raw_mat = raw_mat, feature_df = feature_df, feature_names = feature_names) + log_info("Built vars.h5 successfully.") + + log_info("Building obs.duckdb (API) from cell metadata (Seurat obj@meta.data)...") + + .save_obs_duckdb(obs_duckdb_path, obj@meta.data, + coordinates = coords, coordinates_key = coordinates_key) + log_info("Built obs.duckdb successfully.") + + build_succeeded <- TRUE + }, error = function(e) { + log_error("Error building artifacts: {conditionMessage(e)}") + }) + prepped_data <- list( clusterLabels = cluster_map, clusterMetadata = group_metadata, markerGenes = marker_genes, visualizationData = visualization_data, expressionData = expression_percentages, - group_key = group_key + group_key = group_key, + build_succeeded = build_succeeded, + vars_h5_path = vars_h5_path, + obs_duckdb_path = obs_duckdb_path ) # Store query obj@misc$query <- prepped_data @@ -152,10 +215,7 @@ PrepareCyteTypeR <- function(obj, #' @param auth_token Optional character. Bearer token for API auth. If `NULL`, uses none. Default is `NULL`. #' @param save_query Logical. Whether to save the request payload to a JSON file. Default is `TRUE`. #' @param query_filename Character. Filename for the saved query when `save_query` is `TRUE`. Default is `"query.json"`. -#' @param vars_h5_path Character. Local path for the generated vars.h5 artifact. Default is `"vars.h5"`. -#' @param obs_duckdb_path Character. Local path for the generated obs.duckdb artifact. Default is `"obs.duckdb"`. #' @param upload_timeout_seconds Integer. Socket read timeout (seconds) for each artifact upload. Default is 3600. -#' @param cleanup_artifacts Logical. If `TRUE`, delete generated artifact files after the run (success or failure). Default is `FALSE`. #' @param require_artifacts Logical. If `TRUE`, an error during artifact build or upload stops the run; if `FALSE`, failures are skipped and annotation continues without artifacts. Default is `TRUE`. #' @param show_progress Logical. Whether to show progress (spinner and cluster status). Set `FALSE` to disable. Default is `TRUE`. #' @param override_existing_results Logical. If `TRUE`, allow overwriting existing results with the same `results_prefix`. If `FALSE` and results exist, an error is raised. Default is `FALSE`. @@ -212,10 +272,7 @@ CyteTypeR <- function(obj, auth_token = NULL, save_query = TRUE, query_filename = "query.json", - vars_h5_path = "vars.h5", - obs_duckdb_path = "obs.duckdb", upload_timeout_seconds = 3600L, - cleanup_artifacts = FALSE, require_artifacts = TRUE, show_progress = TRUE, override_existing_results = FALSE @@ -271,100 +328,53 @@ CyteTypeR <- function(obj, llm_configs = llm_configs ) - artifact_paths <- c(vars_h5_path, obs_duckdb_path) - build_succeeded <- FALSE + build_succeeded <- isTRUE(prepped_data$build_succeeded) - tryCatch({ + if (!build_succeeded && require_artifacts) { + stop("Artifact build did not succeed. Set require_artifacts = FALSE to continue without artifacts.") + } + + if (build_succeeded) { tryCatch({ - log_info("Building vars.h5 from normalized counts (cells x features)...") - default_assay <- .resolve_seurat_assay_rna(obj) - # Seurat stores expression as features x cells; API expects cells x features (n_obs x n_vars). - expr_mat <- tryCatch( - Seurat::GetAssayData(obj, assay = default_assay, layer = "data"), - error = function(e) Seurat::GetAssayData(obj, assay = default_assay, slot = "data") - ) - mat <- Matrix::t(expr_mat) - - feature_df <- tryCatch( - as.data.frame(Seurat::GetAssay(obj, default_assay)@meta.features), - error = function(e) tryCatch( - as.data.frame(Seurat::GetAssay(obj, default_assay)@meta.data), - error = function(e2) NULL - ) - ) - feature_names <- tryCatch(rownames(obj), error = function(e) NULL) - .save_vars_h5(vars_h5_path, mat, feature_df = feature_df, feature_names = feature_names) - log_info("Built vars.h5 successfully.") + vars_h5_path <- prepped_data$vars_h5_path + obs_duckdb_path <- prepped_data$obs_duckdb_path + + log_info("Uploading obs.duckdb (cell metadata)...") + cell_metadata_upload <- .upload_obs_duckdb(api_url, auth_token, obs_duckdb_path, upload_timeout_seconds) + log_info("Uploaded obs.duckdb successfully.") - log_info("Building obs.duckdb (API) from cell metadata (Seurat obj@meta.data)...") - .save_obs_duckdb(obs_duckdb_path, obj@meta.data) - log_info("Built obs.duckdb successfully.") + log_info("Uploading vars.h5 (feature expression)...") + feature_expression_upload <- .upload_vars_h5(api_url, auth_token, vars_h5_path, upload_timeout_seconds) + log_info("Uploaded vars.h5 successfully.") - build_succeeded <- TRUE + query_list$uploaded_files <- list( + obs_duckdb = cell_metadata_upload$upload_id, + vars_h5 = feature_expression_upload$upload_id + ) }, error = function(e) { if (require_artifacts) { - log_error("Building artifacts failed: {conditionMessage(e)}") + log_error("Uploading artifacts failed: {conditionMessage(e)}") stop(e) } else { log_warn(paste( - "Building artifacts failed. Continuing without artifacts.", + "Uploading artifacts failed. Continuing without artifacts.", "Set `require_artifacts=TRUE` to raise this as an error.", "Original error:", conditionMessage(e) )) } }) - - if (build_succeeded) { - tryCatch({ - log_info("Uploading obs.duckdb (cell metadata)...") - cell_metadata_upload <- .upload_obs_duckdb(api_url, auth_token, obs_duckdb_path, upload_timeout_seconds) - log_info("Uploaded obs.duckdb successfully.") - - log_info("Uploading vars.h5 (feature expression)...") - feature_expression_upload <- .upload_vars_h5(api_url, auth_token, vars_h5_path, upload_timeout_seconds) - log_info("Uploaded vars.h5 successfully.") - - query_list$uploaded_files <- list( - obs_duckdb = cell_metadata_upload$upload_id, - vars_h5 = feature_expression_upload$upload_id - ) - }, error = function(e) { - if (require_artifacts) { - log_error("Uploading artifacts failed: {conditionMessage(e)}") - stop(e) - } else { - log_warn(paste( - "Uploading artifacts failed. Continuing without artifacts.", - "Set `require_artifacts=TRUE` to raise this as an error.", - "Original error:", conditionMessage(e) - )) - } - }) - } - }, error = function(e) { - stop(e) - }, finally = { - if (cleanup_artifacts && length(artifact_paths) > 0) { - on.exit({ - for (f in artifact_paths) tryCatch(file.remove(f), error = function(e) NULL) - }, add = TRUE) - } - }) - + } + query_for_json <- .prepare_query_for_json(query_list) - if (save_query){ - write_json(query_for_json, path = query_filename, auto_unbox = TRUE, pretty = TRUE) -} + if (save_query) { + write_json(query_for_json, path = query_filename, auto_unbox = TRUE, pretty = TRUE) + } job_id <- .submit_job(query_for_json, api_url, auth_token) - report_url <- file.path(api_url, 'report', job_id) - job_details <- list( - job_id = job_id, - report_url = report_url, - api_url = api_url, - auth_token = auth_token - ) + if (is.na(job_id)) { + stop("Job submission failed.") + } obj <- .store_job_details_seurat(obj, job_id, api_url, results_prefix, group_key, prepped_data$clusterLabels) @@ -399,6 +409,8 @@ CyteTypeR <- function(obj, return(obj) } + + return(obj) } #' Retrieve CyteType Analysis Results @@ -459,9 +471,26 @@ GetResults <- function(obj = NULL, job_id = NULL, results_prefix = "cytetype", a } - - - +#' Clean Up Artifacts +#' +#' @description +#' Cleans up the artifact files after the run. +#' +#' @param prepped_data Named list containing prepared data from `PrepareCyteTypeR()`. +#' @importFrom logger log_info log_warn +#' @export +CleanUpArtifacts <- function(prepped_data) { + if (isTRUE(prepped_data$build_succeeded)) { + log_info("Cleaning up artifact files...") + tryCatch(file.remove(prepped_data$vars_h5_path), error = function(e) NULL) + log_info("Removed vars.h5 file.") + tryCatch(file.remove(prepped_data$obs_duckdb_path), error = function(e) NULL) + log_info("Removed obs.duckdb file.") + log_info("Artifact files cleaned up successfully.") + } else { + log_warn("Artifact files not built. No cleanup performed.") + } +} diff --git a/R/errors.R b/R/errors.R index 0ca1e70..ed31e25 100644 --- a/R/errors.R +++ b/R/errors.R @@ -31,7 +31,7 @@ print.cytetype_api_error <- function(x, ...) { parsed <- .parse_server_error(e$resp) if (!is.null(parsed) && parsed$error_code == "RATE_LIMIT_EXCEEDED") { stop( - "Rate limit exceeded: ", parsed$message, + parsed$message, "\nUse your own LLM API key via llm_configs to bypass free-tier limits, ", "or wait before retrying.", call. = FALSE diff --git a/R/schema.R b/R/schema.R index ddc5d8d..4eea68b 100644 --- a/R/schema.R +++ b/R/schema.R @@ -117,6 +117,11 @@ LLMModelConfig <- function(provider, } } + query_list$input_data$build_succeeded <- NULL + query_list$input_data$vars_h5_path <- NULL + query_list$input_data$obs_duckdb_path <- NULL + query_list$input_data$group_key <- NULL + return(query_list) } diff --git a/R/seurat_helpers.R b/R/seurat_helpers.R index 1022f1d..79036fa 100644 --- a/R/seurat_helpers.R +++ b/R/seurat_helpers.R @@ -95,7 +95,7 @@ group_key, min_percentage = 10) { - metadata <- seurat_obj@meta.data + metadata <- droplevels(seurat_obj@meta.data) # Get unique groups and initialize result structure unique_groups <- unique(metadata[[group_key]]) unique_groups <- unique_groups[!is.na(unique_groups)] @@ -124,7 +124,7 @@ colnames(percentage_df) <- c("value", "group", "percentage") # Filter for significant values (> min_percentage) - significant_df <- percentage_df[percentage_df$percentage > min_percentage, ] + significant_df <- percentage_df[which(percentage_df$percentage > min_percentage), ] if (nrow(significant_df) > 0) { # Organize results by group @@ -210,8 +210,13 @@ if (!inherits(seurat_obj, "Seurat")){ stop("Please provide a Seurat Object") } + .validate_gene_symbols(seurat_obj,gene_symbols) + if (!(group_key %in% names(seurat_obj@meta.data))){ + stop("Please provide a valid group key that exists in the Seurat object's meta.data.") + } + if (!(coordinates_key %in% names(seurat_obj@reductions))){ log_info("Coordinates key {coordinates_key} not found in reductions.") } @@ -250,9 +255,8 @@ log_info(paste("Correct cluster labels", cli::symbol$tick)) } else{ - log_error("Please check if cluster labels are consistent between marker table and seurat obj!") + stop("Please check if cluster labels are consistent between marker table and seurat obj!") } - log_info(paste("Markers check: done", cli::symbol$tick)) } diff --git a/man/CleanUpArtifacts.Rd b/man/CleanUpArtifacts.Rd new file mode 100644 index 0000000..d4a4939 --- /dev/null +++ b/man/CleanUpArtifacts.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/cytetype.R +\name{CleanUpArtifacts} +\alias{CleanUpArtifacts} +\title{Clean Up Artifacts} +\usage{ +CleanUpArtifacts(prepped_data) +} +\arguments{ +\item{prepped_data}{Named list containing prepared data from \code{PrepareCyteTypeR()}.} +} +\description{ +Cleans up the artifact files after the run. +} diff --git a/man/CyteTypeR.Rd b/man/CyteTypeR.Rd index 0fc2861..0ced50e 100644 --- a/man/CyteTypeR.Rd +++ b/man/CyteTypeR.Rd @@ -18,10 +18,7 @@ CyteTypeR( auth_token = NULL, save_query = TRUE, query_filename = "query.json", - vars_h5_path = "vars.h5", - obs_duckdb_path = "obs.duckdb", upload_timeout_seconds = 3600L, - cleanup_artifacts = FALSE, require_artifacts = TRUE, show_progress = TRUE, override_existing_results = FALSE @@ -54,14 +51,8 @@ CyteTypeR( \item{query_filename}{Character. Filename for the saved query when \code{save_query} is \code{TRUE}. Default is \code{"query.json"}.} -\item{vars_h5_path}{Character. Local path for the generated vars.h5 artifact. Default is \code{"vars.h5"}.} - -\item{obs_duckdb_path}{Character. Local path for the generated obs.duckdb artifact. Default is \code{"obs.duckdb"}.} - \item{upload_timeout_seconds}{Integer. Socket read timeout (seconds) for each artifact upload. Default is 3600.} -\item{cleanup_artifacts}{Logical. If \code{TRUE}, delete generated artifact files after the run (success or failure). Default is \code{FALSE}.} - \item{require_artifacts}{Logical. If \code{TRUE}, an error during artifact build or upload stops the run; if \code{FALSE}, failures are skipped and annotation continues without artifacts. Default is \code{TRUE}.} \item{show_progress}{Logical. Whether to show progress (spinner and cluster status). Set \code{FALSE} to disable. Default is \code{TRUE}.} diff --git a/man/PrepareCyteTypeR.Rd b/man/PrepareCyteTypeR.Rd index 9497a54..1c56a7c 100644 --- a/man/PrepareCyteTypeR.Rd +++ b/man/PrepareCyteTypeR.Rd @@ -14,7 +14,9 @@ PrepareCyteTypeR( min_percentage = 10, pcent_batch_size = 2000, coordinates_key = "umap", - max_cells_per_group = 1000 + max_cells_per_group = 1000, + vars_h5_path = "vars.h5", + obs_duckdb_path = "obs.duckdb" ) } \arguments{ @@ -46,6 +48,12 @@ to use for visualization coordinates (e.g., "umap", "tsne"). Default is "umap".} \item{max_cells_per_group}{Integer specifying maximum cells per cluster for subsampling (currently unused). Default is 1000.} + +\item{vars_h5_path}{Character string specifying the local file path for the +generated vars.h5 artifact (feature expression). Default is \code{"vars.h5"}.} + +\item{obs_duckdb_path}{Character string specifying the local file path for the +generated obs.duckdb artifact (cell metadata). Default is \code{"obs.duckdb"}.} } \value{ Named list containing formatted data for CyteType analysis: @@ -55,6 +63,10 @@ Named list containing formatted data for CyteType analysis: \item \code{markerGenes}: List of top marker genes per cluster \item \code{visualizationData}: Coordinates and cluster assignments for plotting \item \code{expressionData}: Expression percentages by cluster +\item \code{group_key}: The metadata column name used for cluster assignments +\item \code{build_succeeded}: Logical indicating whether artifact files were built successfully +\item \code{vars_h5_path}: Path to the generated vars.h5 file +\item \code{obs_duckdb_path}: Path to the generated obs.duckdb file } } \description{ @@ -71,6 +83,8 @@ The function performs the following steps: \item Filters and selects top marker genes per cluster \item Extracts dimensional reduction coordinates for visualization \item Calculates expression percentages across clusters +\item Builds a vars.h5 artifact from the normalized expression matrix +\item Builds an obs.duckdb artifact from the cell metadata } Clusters are renumbered sequentially (1, 2, 3, ...) to ensure consistent diff --git a/tests/testthat/test-client.R b/tests/testthat/test-client.R index 80bb978..b828488 100644 --- a/tests/testthat/test-client.R +++ b/tests/testthat/test-client.R @@ -1,7 +1,7 @@ test_that(".MAX_UPLOAD_BYTES has expected limits", { lim <- CyteTypeR:::.MAX_UPLOAD_BYTES expect_identical(lim$obs_duckdb, 100L * 1024L * 1024L) - expect_equal(lim$vars_h5, 10 * 1024 * 1024 * 1024) + expect_equal(lim$vars_h5, 50 * 1024 * 1024 * 1024) expect_true(is.numeric(lim$vars_h5), info = "vars_h5 must be numeric to avoid integer overflow") }) @@ -26,3 +26,138 @@ test_that(".upload_file_chunked errors for unknown file_kind (no upload limit)", "exceeds upload limit" ) }) + +# --- .make_results_request tests --- + +test_that(".make_results_request returns 'failed' when results endpoint errors", { + call_count <- 0L + testthat::local_mocked_bindings( + .api_response_helper = function(job_id, api_url, req_item, auth_token = NULL) { + call_count <<- call_count + 1L + if (req_item == "status") { + return(list(status_code = 200L, data = list(jobStatus = "completed"))) + } + if (req_item == "results") { + stop("HTTP 409 error: job still processing") + } + }, + .package = "CyteTypeR" + ) + resp <- CyteTypeR:::.make_results_request("job1", "https://example.com") + expect_identical(resp$status, "failed") + expect_true(grepl("results unavailable", resp$message)) +}) + +test_that(".make_results_request returns 'not_found' on 404 status", { + testthat::local_mocked_bindings( + .api_response_helper = function(job_id, api_url, req_item, auth_token = NULL) { + list(status_code = 404L, data = list()) + }, + .package = "CyteTypeR" + ) + resp <- CyteTypeR:::.make_results_request("job1", "https://example.com") + expect_identical(resp$status, "not_found") +}) + +test_that(".make_results_request returns 'error' on unexpected exception", { + testthat::local_mocked_bindings( + .api_response_helper = function(job_id, api_url, req_item, auth_token = NULL) { + stop("connection refused") + }, + .package = "CyteTypeR" + ) + resp <- CyteTypeR:::.make_results_request("job1", "https://example.com") + expect_identical(resp$status, "error") + expect_true(grepl("connection refused", resp$message)) +}) + +# --- .poll_for_results tests --- + +test_that("poll stops after consecutive 'error' statuses", { + testthat::local_mocked_bindings( + .make_results_request = function(...) { + list(status = "error", result = NULL, + message = "Error checking job status: connection refused", + raw_response = NULL) + }, + .sleep_with_spinner = function(...) invisible(NULL), + .package = "CyteTypeR" + ) + expect_error( + CyteTypeR:::.poll_for_results("job1", "https://example.com", + poll_interval = 0, timeout = 600, + show_progress = FALSE), + "consecutive errors" + ) +}) + +test_that("poll stops after consecutive 'not_found' statuses", { + testthat::local_mocked_bindings( + .make_results_request = function(...) { + list(status = "not_found", result = NULL, + message = "Job not found", + raw_response = NULL) + }, + .sleep_with_spinner = function(...) invisible(NULL), + .package = "CyteTypeR" + ) + expect_error( + CyteTypeR:::.poll_for_results("job1", "https://example.com", + poll_interval = 0, timeout = 600, + show_progress = FALSE), + "not found after" + ) +}) + +test_that("poll stops immediately on truly unknown status", { + testthat::local_mocked_bindings( + .make_results_request = function(...) { + list(status = "something_new", result = NULL, + message = "unknown", + raw_response = NULL) + }, + .sleep_with_spinner = function(...) invisible(NULL), + .package = "CyteTypeR" + ) + expect_error( + CyteTypeR:::.poll_for_results("job1", "https://example.com", + poll_interval = 0, timeout = 600, + show_progress = FALSE), + "unexpected status" + ) +}) + +test_that("poll returns result on 'completed' status", { + testthat::local_mocked_bindings( + .make_results_request = function(...) { + list(status = "completed", + result = list(annotations = list()), + message = "Job completed successfully", + raw_response = list(clusterStatus = list("1" = "completed"))) + }, + .sleep_with_spinner = function(...) invisible(NULL), + .package = "CyteTypeR" + ) + result <- CyteTypeR:::.poll_for_results("job1", "https://example.com", + poll_interval = 0, timeout = 600, + show_progress = FALSE) + expect_true(is.list(result)) +}) + +test_that("poll stops on 'failed' status with server message", { + testthat::local_mocked_bindings( + .make_results_request = function(...) { + list(status = "failed", result = NULL, + message = "Job failed", + raw_response = list(clusterStatus = list())) + }, + .sleep_with_spinner = function(...) invisible(NULL), + .package = "CyteTypeR" + ) + expect_error( + CyteTypeR:::.poll_for_results("job1", "https://example.com", + poll_interval = 0, timeout = 600, + show_progress = FALSE), + "Server error.*Job failed" + ) +}) diff --git a/tests/testthat/test-cytetype-build-upload.R b/tests/testthat/test-cytetype-build-upload.R index 8819579..ec167c4 100644 --- a/tests/testthat/test-cytetype-build-upload.R +++ b/tests/testthat/test-cytetype-build-upload.R @@ -1,5 +1,5 @@ # Helper: minimal Seurat and prepped_data for build/upload path -.local_seurat_and_prepped <- function() { +.local_seurat_and_prepped <- function(build_succeeded = FALSE) { obj <- Seurat::CreateSeuratObject(Matrix::Matrix(1, 2, 2)) obj$cluster <- "1" prepped_data <- list( @@ -11,16 +11,16 @@ visualizationData = NULL, expressionData = list(), nParallelClusters = 2L, - group_key = "cluster" + group_key = "cluster", + build_succeeded = build_succeeded, + vars_h5_path = "vars.h5", + obs_duckdb_path = "obs.duckdb" ) list(obj = obj, prepped_data = prepped_data) } -test_that("build failure with require_artifacts TRUE stops with build error", { - testthat::local_mocked_bindings( - .save_vars_h5 = function(...) stop("build error") - ) - x <- .local_seurat_and_prepped() +test_that("build_succeeded FALSE with require_artifacts TRUE stops", { + x <- .local_seurat_and_prepped(build_succeeded = FALSE) expect_error( CyteTypeR::CyteTypeR( x$obj, x$prepped_data, @@ -28,33 +28,30 @@ test_that("build failure with require_artifacts TRUE stops with build error", { save_query = FALSE, require_artifacts = TRUE ), - "build error" + "Artifact build did not succeed" ) }) -test_that("build failure with require_artifacts FALSE continues and completes", { +test_that("build_succeeded FALSE with require_artifacts FALSE skips uploads and continues", { testthat::local_mocked_bindings( - .save_vars_h5 = function(...) stop("build error"), .submit_job = function(...) "job1", .poll_for_results = function(...) NULL ) - x <- .local_seurat_and_prepped() + x <- .local_seurat_and_prepped(build_succeeded = FALSE) out <- CyteTypeR::CyteTypeR( x$obj, x$prepped_data, api_url = "https://example.com", save_query = FALSE, require_artifacts = FALSE ) - expect_null(out) + expect_s4_class(out, "Seurat") }) test_that("upload failure with require_artifacts TRUE stops with upload error", { testthat::local_mocked_bindings( - .save_vars_h5 = function(...) invisible(NULL), - .save_obs_duckdb = function(...) invisible(NULL), .upload_obs_duckdb = function(...) stop("upload error") ) - x <- .local_seurat_and_prepped() + x <- .local_seurat_and_prepped(build_succeeded = TRUE) expect_error( CyteTypeR::CyteTypeR( x$obj, x$prepped_data, @@ -68,18 +65,16 @@ test_that("upload failure with require_artifacts TRUE stops with upload error", test_that("upload failure with require_artifacts FALSE continues and completes", { testthat::local_mocked_bindings( - .save_vars_h5 = function(...) invisible(NULL), - .save_obs_duckdb = function(...) invisible(NULL), .upload_obs_duckdb = function(...) stop("upload error"), .submit_job = function(...) "job1", .poll_for_results = function(...) NULL ) - x <- .local_seurat_and_prepped() + x <- .local_seurat_and_prepped(build_succeeded = TRUE) out <- CyteTypeR::CyteTypeR( x$obj, x$prepped_data, api_url = "https://example.com", save_query = FALSE, require_artifacts = FALSE ) - expect_null(out) + expect_s4_class(out, "Seurat") }) diff --git a/tests/testthat/test-save-vars-h5.R b/tests/testthat/test-save-vars-h5.R index fc4eb64..29447ec 100644 --- a/tests/testthat/test-save-vars-h5.R +++ b/tests/testthat/test-save-vars-h5.R @@ -34,7 +34,7 @@ test_that(".save_vars_h5 round-trip preserves sparse structure", { data <- rhdf5::h5read(out, "vars/data") indptr <- rhdf5::h5read(out, "vars/indptr") expect_length(indptr, 4L) - expect_identical(indptr[1], 0L) + expect_equal(as.numeric(indptr[1]), 0) expect_equal(sum(m_csc@x), sum(data)) }) @@ -149,6 +149,51 @@ test_that(".save_vars_h5 indices are in [0, n_obs)", { expect_true(all(indices < n_obs)) }) +test_that(".save_vars_h5 writes raw group in CSR format when raw_mat provided", { + skip_if_not_installed("rhdf5") + skip_if_not_installed("rhdf5filters") + mat <- Matrix::sparseMatrix( + i = c(1L, 2L, 3L), + j = c(1L, 2L, 3L), + x = c(1.0, 2.0, 3.0), + dims = c(4L, 3L) + ) + raw <- Matrix::sparseMatrix( + i = c(1L, 2L, 3L, 1L), + j = c(1L, 2L, 3L, 4L), + x = c(10, 20, 30, 5), + dims = c(3L, 4L) + ) + out <- tempfile(fileext = ".h5") + on.exit(if (file.exists(out)) unlink(out)) + CyteTypeR:::.save_vars_h5(out, mat, raw_mat = raw) + + raw_attrs <- rhdf5::h5readAttributes(out, "raw") + expect_equal(as.integer(raw_attrs[["n_obs"]]), 4L) + expect_equal(as.integer(raw_attrs[["n_vars"]]), 3L) + + raw_indptr <- rhdf5::h5read(out, "raw/indptr") + expect_length(raw_indptr, 4L + 1L) + expect_equal(as.numeric(raw_indptr[1]), 0) + + raw_indices <- rhdf5::h5read(out, "raw/indices") + expect_true(all(raw_indices >= 0L)) + raw_data <- rhdf5::h5read(out, "raw/data") + expect_equal(length(raw_indices), length(raw_data)) + expect_true(is.integer(raw_data)) +}) + +test_that(".save_vars_h5 omits raw group when raw_mat is NULL", { + skip_if_not_installed("rhdf5") + skip_if_not_installed("rhdf5filters") + mat <- Matrix::sparseMatrix(i = 1L, j = 1L, x = 1.0, dims = c(2L, 2L)) + out <- tempfile(fileext = ".h5") + on.exit(if (file.exists(out)) unlink(out)) + CyteTypeR:::.save_vars_h5(out, mat) + contents <- rhdf5::h5ls(out) + expect_false("raw" %in% contents$name) +}) + test_that(".save_vars_h5 fails gracefully when rhdf5filters is missing", { skip_if_not_installed("rhdf5") skip("Cannot mock base::requireNamespace; rhdf5filters guard is validated manually")