11 Viash + Nextflow: A use-case
In this section, we will explore the use of Viash. Viash is a code generation tool that allows you to augment your scripts and Nextflow scripts with code generation.
Use case components:
11.1 Bash
Path: src/load_data
name: load_data
description: Load data from an S3 bucket
arguments:
- type: string
name: --url
description: URL to the data
example: s3://my-bucket/my-data.csv
required: true
- type: file
name: --output
description: Path to the output file
example: /path/to/output.csv
required: true
direction: output
resources:
- type: bash_script
path: script.sh
test_resources:
- type: bash_script
path: test.sh
engines:
- type: docker
image: amazon/aws-cli
runners:
- type: executable
- type: nextflow
#!/bin/bash
aws s3 cp \
--no-sign-request \
"$par_url" \
"$par_output"
#!/bin/bash
# Run the executable
"$meta_executable" \
--url s3://openproblems-bio/public/neurips-2023-competition/moa_annotations.csv \
--output moa_annotations.csv
# Check if the output file exists
if [[ ! -f moa_annotations.csv ]]; then
echo "File not found!"
exit 1
fi
# Check if the output file has the correct MD5 sum
if [[ "$(md5sum moa_annotations.csv | cut -d ' ' -f 1)" != "80ebe44ce6b8d73f31dbc653787089f9" ]]; then
echo "MD5 sum does not match!"
exit 1
fi
echo "All tests passed!"
11.2 Python
Path: src/subset_obs
name: subset_obs
description: Subset the observations of an AnnData object
argument_groups:
- name: Inputs
arguments:
- type: file
name: --input
description: Path to the input h5ad file
example: /path/to/input.h5ad
required: true
- name: Subsetting arguments
arguments:
- type: string
name: --obs_column
description: Name of the column to subset on
example: cell_type
required: true
- type: string
name: --obs_values
description: List of values to subset on. If column is a boolean, do not pass any values to this argument.
multiple: true
example: ["B cell", "T cell"]
- type: boolean_true
name: --invert
description: Invert the subset
- name: Outputs
arguments:
- type: file
name: --output
description: Path to the output h5ad file
example: /path/to/output.h5ad
required: true
direction: output
resources:
- type: python_script
path: script.py
test_resources:
- type: python_script
path: test.py
engines:
- type: docker
image: python:3.10
setup:
- type: python
pypi:
- anndata
test_setup:
- type: python
pypi:
- viashpy
runners:
- type: executable
- type: nextflow
import anndata as ad
## VIASH START
= {"input": "", "obs_column": "", "obs_values": [], "invert": False, "output": ""}
par ## VIASH END
print("Load data", flush=True)
= ad.read_h5ad(par["input"])
adata
print(f"Format of input data: {adata}", flush=True)
print("Subset data", flush=True)
= adata.obs[par["obs_column"]]
filt
# if filt is a list of booleans
assert (filt.dtype == bool) == (not par["obs_values"]), \
f"If column '{par['obs_column']}' is boolean, 'obs_values' must be empty, and vice versa."
if filt.dtype != bool:
# if filt is a list of strings
= filt.isin(par["obs_values"])
filt
if par["invert"]:
= ~filt
filt
= adata[filt].copy()
adata
print(f"Format of output data: {adata}", flush=True)
print("Store to disk", flush=True)
"output"], compression="gzip") adata.write_h5ad(par[
import sys
import anndata as ad
import pytest
import numpy as np
def test_subset_var(run_component, tmp_path):
= tmp_path / "input.h5ad"
input_path = tmp_path / "output.h5ad"
output_path
# create data
= ad.AnnData(
adata_in =np.random.rand(4, 2),
X={"cell_type": ["A", "B", "C", "D"]},
obs={"highly_variable": [True, False]},
var
)
adata_in.write_h5ad(input_path)
# run component
run_component(["--input", str(input_path),
"--obs_column", "cell_type",
"--obs_values", "A;B",
"--output", str(output_path),
])
# load output
= ad.read_h5ad(output_path)
adata_out
# check output
assert adata_out.X.shape == (2, 2)
assert adata_out.obs["cell_type"].tolist() == ["A", "B"]
assert adata_out.var["highly_variable"].tolist() == [True, False]
if __name__ == "__main__":
__file__])) sys.exit(pytest.main([
11.3 R
Path: src/differential_expression
name: differential_expression
description: Compute differential expression between two observation types
argument_groups:
- name: Inputs
arguments:
- type: file
name: --input
description: Path to the input h5ad file
example: /path/to/input.h5ad
required: true
- name: Differential expression arguments
arguments:
- type: string
name: --contrast
description: |
Contrast to compute. Must be of length 3:
1. The name of the column to contrast on
2. The name of the first observation type
3. The name of the second observation type example: ["cell_type", "batch", "sample"]
multiple: true
required: true
- type: string
name: --design_formula
description: Design formula for the differential expression model
example: ~ batch + cell_type
- name: Outputs
arguments:
- type: file
name: --output
description: Path to the output h5ad file
example: /path/to/output.h5ad
required: true
direction: output
resources:
- type: r_script
path: script.R
test_resources:
- type: r_script
path: test.R
engines:
- type: docker
image: rocker/r2u:22.04
setup:
- type: apt
packages:
- python3
- python3-pip
- python3-dev
- python-is-python3
- type: python
pypi:
- anndata
- type: r
cran:
- anndata
- processx
bioc:
- DESeq2
runners:
- type: executable
- type: nextflow
library(anndata)
requireNamespace("DESeq2", quietly = TRUE)
## VIASH START
<- list(input = "", contrast = c(), design_formula = "", output = "")
par ## VIASH END
cat("Reading data\n")
<- read_h5ad(par$input)
adata
cat("Parse formula\n")
<- as.formula(par$design_formula)
formula
cat("Create DESeq dataset\n")
# transform counts matrix
<- t(as.matrix(adata$X))
count_data storage.mode(count_data) <- "integer"
# create dataset
<- DESeq2::DESeqDataSetFromMatrix(
dds countData = count_data,
colData = adata$obs,
design = formula
)
cat("Run DESeq2\n")
<- DESeq2::DESeq(dds)
dds
<- DESeq2::results(dds, contrast = par$contrast) |>
res as.data.frame()
cat("Write to disk\n")
<- gsub(" ", "_", par$contrast)
contrast_names <- gsub("[^[:alnum:]]", "_", contrast_names)
contrast_names <- gsub("__", "_", contrast_names)
contrast_names <- tolower(contrast_names)
contrast_names
<- paste0("de_", paste(contrast_names, collapse = "_"))
varm_name $varm[[varm_name]] <- res
adata
# Save adata
<- adata$write_h5ad(par$output, compression = "gzip") zzz
library(anndata)
cat("Create input data\n")
<- matrix(runif(100, 10, 100), nrow = 10, ncol = 10)
X for (i in 1:10) {
1:5, i] <- X[1:5, i] + i * 10
X[
}<- AnnData(
adata_in X = X,
obs = data.frame(
row.names = paste0("cell", 1:10),
sm_name = rep(c("Belinostat", "Dimethyl Sulfoxide"), each = 5),
plate_name = rep(c("plate1", "plate2"), times = 5)
),var = data.frame(
row.names = paste0("gene", 1:10)
)
)
cat("Write input data to file\n")
<- "input.h5ad"
input_path <- "output.h5ad"
output_path <- adata_in$write_h5ad(input_path)
zzz
cat("Run component\n")
<- processx::run(
zzz command = meta$executable,
args = c(
"--input", input_path,
"--contrast", "sm_name;Dimethyl Sulfoxide;Belinostat",
"--design_formula", "~ sm_name + plate_name",
"--output", output_path
),error_on_status = TRUE,
echo = TRUE
)
cat("Read output data\n")
<- read_h5ad(output_path)
adata_out
cat("Preview output data\n")
print(adata_out)
cat("Check DE results:\n")
<- adata_out$varm$de_sm_name_dimethyl_sulfoxide_belinostat
de_out if (is.null(de_out)) {
stop("No DE results found")
}
print(de_out)
<- c("baseMean", "log2FoldChange", "lfcSE", "stat", "pvalue", "padj")
expected_colnames if (!all(colnames(de_out) == expected_colnames)) {
stop(paste0(
"Column names do not match.\n",
"Expected: ", paste(expected_colnames, collapse = ", "), "\n",
"Actual: ", paste(colnames(de_out), collapse = ", ")
))
}
cat("Done\n")
11.4 Nextflow
Path: src/workflow
name: workflow
description: |
A workflow to compute differential expression between two groups of cells in an AnnData object.
argument_groups:
- name: Inputs
arguments:
- type: file
name: --input
description: Path to the input h5ad file
example: s3://my-bucket/my-data.h5ad
required: true
- name: Differential expression arguments
arguments:
- type: string
name: --contrast
description: |
Contrast to compute. Must be of length 3:
1. The name of the column to contrast on
2. The name of the first observation type
3. The name of the second observation type example: ["cell_type", "batch", "sample"]
multiple: true
required: true
- type: string
name: --design_formula
description: Design formula for the differential expression model
example: ~ batch + cell_type
- name: Outputs
arguments:
- type: file
name: --output
description: Path to the output h5ad file
example: /path/to/output.h5ad
required: true
direction: output
resources:
- type: nextflow_script
path: main.nf
entrypoint: wf
dependencies:
- name: subset_obs
- name: subset_var
- name: compute_pseudobulk
- name: differential_expression
runners:
- type: nextflow
{
workflow wf :
take
ch_in
:
main= ch_in
ch_out
| subset_obs.run(
: "subset_sm_name",
key: ["input": "input"],
fromState: [
args"obs_column": "sm_name",
"obs_values": ["Belinostat", "Dimethyl Sulfoxide"]
],
: ["data": "output"]
toState)
| subset_obs.run(
: "subset_cell_type",
key: ["input": "data"],
fromState: [
args"obs_column": "cell_type",
"obs_values": ["T cells"]
],
: ["data": "output"]
toState)
| subset_var.run(
: ["input": "data"],
fromState: [
args"var_column": "highly_variable",
],
: ["data": "output"]
toState)
| compute_pseudobulk.run(
: ["input": "data"],
fromState: [
args"obs_column_index": "plate_well_celltype_reannotated",
"obs_column_values": ["sm_name", "cell_type", "plate_name", "well"],
],
: ["data": "output"]
toState)
| differential_expression.run(
: [
fromState"input": "data",
"contrast": "contrast",
"design_formula": "design_formula"
],
: ["data": "output"]
toState)
| setState(["output": "data"])
:
emit
ch_out}
11.5 Running the workflow
To run the workflow, you must first build the project:
viash ns build --parallel --setup cachedbuild
Exporting load_data =executable=> target/executable/load_data
[notice] Building container 'polygloty_usecase/load_data:0.1.0' with Dockerfile
Exporting load_data =nextflow=> target/nextflow/load_data
Exporting compute_pseudobulk =executable=> target/executable/compute_pseudobulk
[notice] Building container 'polygloty_usecase/compute_pseudobulk:0.1.0' with Dockerfile
Exporting compute_pseudobulk =nextflow=> target/nextflow/compute_pseudobulk
Exporting subset_obs =executable=> target/executable/subset_obs
[notice] Building container 'polygloty_usecase/subset_obs:0.1.0' with Dockerfile
Exporting subset_obs =nextflow=> target/nextflow/subset_obs
Exporting subset_var =executable=> target/executable/subset_var
[notice] Building container 'polygloty_usecase/subset_var:0.1.0' with Dockerfile
Exporting subset_var =nextflow=> target/nextflow/subset_var
Exporting differential_expression =executable=> target/executable/differential_expression
[notice] Building container 'polygloty_usecase/differential_expression:0.1.0' with Dockerfile
Exporting differential_expression =nextflow=> target/nextflow/differential_expression
Exporting workflow =nextflow=> target/nextflow/workflow
All 11 configs built successfully
Then, you can run the workflow:
nextflow run \
\
target/nextflow/workflow/main.nf -with-docker \
--id dataset \
--input s3://openproblems-bio/public/neurips-2023-competition/sc_counts_reannotated_with_counts.h5ad \
--contrast 'sm_name;Belinostat;Dimethyl Sulfoxide' \
--design_formula '~ sm_name + plate_name' \
--publish_dir output
N E X T F L O W ~ version 23.10.0
Launching `target/nextflow/workflow/main.nf` [condescending_engelbart] DSL2 - revision: f54b192abd
executor > local (6)
[e2/da368b] process > workflow:wf:subset_sm_name:processWf:subset_sm_name_process (dataset) [100%] 1 of 1 ✔
[d5/fea947] process > workflow:wf:subset_cell_type:processWf:subset_cell_type_process (dataset) [100%] 1 of 1 ✔
[23/a2b0a7] process > workflow:wf:subset_var:processWf:subset_var_process (dataset) [100%] 1 of 1 ✔
[55/a59f07] process > workflow:wf:compute_pseudobulk:processWf:compute_pseudobulk_process (dataset) [100%] 1 of 1 ✔
[10/5c0650] process > workflow:wf:differential_expression:processWf:differential_expression_process (dataset) [100%] 1 of 1 ✔
[91/5f7431] process > workflow:publishStatesSimpleWf:publishStatesProc (dataset) [100%] 1 of 1 ✔
Completed at: 07-Sep-2024 21:33:16
Duration : 8m 59s
CPU hours : (a few seconds)
Succeeded : 6
The workflow will process the dataset, subset the data, compute pseudobulk samples, and perform differential expression analysis. The results will be saved in the output
directory.
11.6 Workflow Output
tree output
output/
├── dataset.workflow.output.h5ad
└── dataset.workflow.state.yaml
1 directory, 2 files
The resulting pseudobulk samples and differential expression analysis results are stored in the dataset.workflow.output.h5ad
file.
import anndata as ad
= ad.read("output/dataset.workflow.output.h5ad")
adata
adata
AnnData object with n_obs × n_vars = 96 × 2000
obs: 'sm_name', 'cell_type', 'plate_name', 'well'
var: 'highly_variable', 'means', 'dispersions', 'dispersions_norm'
varm: 'de_sm_name_belinostat_dimethyl_sulfoxide'
"de_sm_name_belinostat_dimethyl_sulfoxide"] adata.varm[
baseMean log2FoldChange lfcSE stat pvalue padj
A2M 12.320857 -0.696714 0.113490 -6.138993 8.304620e-10 2.699001e-09
A2M-AS1 31.563323 -0.652155 0.074559 -8.746780 2.195265e-18 1.065788e-17
A2MP1 1.712214 -0.844991 0.262029 -3.224806 1.260582e-03 2.551988e-03
AARD 0.125909 0.086614 1.234496 0.070161 9.440653e-01 NaN
ABCA1 2.003082 -0.154770 0.240886 -0.642504 5.205457e-01 6.193785e-01
... ... ... ... ... ... ...
ZNF860 0.024027 0.214981 2.915260 0.073743 9.412147e-01 NaN
ZNF876P 0.568178 -0.703739 0.422580 -1.665340 9.584497e-02 1.486826e-01
ZNF891 28.045500 0.515844 0.066416 7.766889 8.043700e-15 3.338454e-14
ZNF92 46.860620 0.076892 0.048781 1.576261 1.149657e-01 1.743887e-01
ZNF99 0.000000 NaN NaN NaN NaN NaN
[2000 rows x 6 columns]