11  Viash + Nextflow: A use-case

In this section, we will explore the use of Viash. Viash is a code generation tool that allows you to augment your scripts and Nextflow scripts with code generation.

Use case components:

11.1 Bash

Path: src/load_data

name: load_data
description: Load data from an S3 bucket

arguments:
  - type: string
    name: --url
    description: URL to the data
    example: s3://my-bucket/my-data.csv
    required: true
  - type: file
    name: --output
    description: Path to the output file
    example: /path/to/output.csv
    required: true
    direction: output

resources:
  - type: bash_script
    path: script.sh

test_resources:
  - type: bash_script
    path: test.sh

engines:
  - type: docker
    image: amazon/aws-cli

runners:
  - type: executable
  - type: nextflow
#!/bin/bash

aws s3 cp \
  --no-sign-request \
  "$par_url" \
  "$par_output"
#!/bin/bash

# Run the executable
"$meta_executable" \
  --url s3://openproblems-bio/public/neurips-2023-competition/moa_annotations.csv \
  --output moa_annotations.csv

# Check if the output file exists
if [[ ! -f moa_annotations.csv ]]; then
  echo "File not found!"
  exit 1
fi

# Check if the output file has the correct MD5 sum
if [[ "$(md5sum moa_annotations.csv | cut -d ' ' -f 1)" != "80ebe44ce6b8d73f31dbc653787089f9" ]]; then
  echo "MD5 sum does not match!"
  exit 1
fi

echo "All tests passed!"

11.2 Python

Path: src/subset_obs

name: subset_obs
description: Subset the observations of an AnnData object

argument_groups:
  - name: Inputs
    arguments:
      - type: file
        name: --input
        description: Path to the input h5ad file
        example: /path/to/input.h5ad
        required: true
  - name: Subsetting arguments
    arguments:
      - type: string
        name: --obs_column
        description: Name of the column to subset on
        example: cell_type
        required: true
      - type: string
        name: --obs_values
        description: List of values to subset on. If column is a boolean, do not pass any values to this argument.
        multiple: true
        example: ["B cell", "T cell"]
      - type: boolean_true
        name: --invert
        description: Invert the subset
  - name: Outputs
    arguments:
      - type: file
        name: --output
        description: Path to the output h5ad file
        example: /path/to/output.h5ad
        required: true
        direction: output

resources:
  - type: python_script
    path: script.py

test_resources:
  - type: python_script
    path: test.py

engines:
  - type: docker
    image: python:3.10
    setup:
      - type: python
        pypi:
          - anndata
    test_setup:
      - type: python
        pypi:
          - viashpy

runners:
  - type: executable
  - type: nextflow
import anndata as ad

## VIASH START
par = {"input": "", "obs_column": "", "obs_values": [], "invert": False, "output": ""}
## VIASH END

print("Load data", flush=True)
adata = ad.read_h5ad(par["input"])

print(f"Format of input data: {adata}", flush=True)

print("Subset data", flush=True)
filt = adata.obs[par["obs_column"]]

# if filt is a list of booleans
assert (filt.dtype == bool) == (not par["obs_values"]), \
  f"If column '{par['obs_column']}' is boolean, 'obs_values' must be empty, and vice versa."

if filt.dtype != bool:
  # if filt is a list of strings
  filt = filt.isin(par["obs_values"])

if par["invert"]:
  filt = ~filt

adata = adata[filt].copy()

print(f"Format of output data: {adata}", flush=True)

print("Store to disk", flush=True)
adata.write_h5ad(par["output"], compression="gzip")
import sys
import anndata as ad
import pytest
import numpy as np

def test_subset_var(run_component, tmp_path):
  input_path = tmp_path / "input.h5ad"
  output_path = tmp_path / "output.h5ad"

  # create data
  adata_in = ad.AnnData(
    X=np.random.rand(4, 2),
    obs={"cell_type": ["A", "B", "C", "D"]},
    var={"highly_variable": [True, False]},
  )

  adata_in.write_h5ad(input_path)

  # run component
  run_component([
    "--input", str(input_path),
    "--obs_column", "cell_type",
    "--obs_values", "A;B",
    "--output", str(output_path),
  ])

  # load output
  adata_out = ad.read_h5ad(output_path)

  # check output
  assert adata_out.X.shape == (2, 2)
  assert adata_out.obs["cell_type"].tolist() == ["A", "B"]
  assert adata_out.var["highly_variable"].tolist() == [True, False]


if __name__ == "__main__":
  sys.exit(pytest.main([__file__]))

11.3 R

Path: src/differential_expression

name: differential_expression
description: Compute differential expression between two observation types

argument_groups:
  - name: Inputs
    arguments:
      - type: file
        name: --input
        description: Path to the input h5ad file
        example: /path/to/input.h5ad
        required: true
  - name: Differential expression arguments
    arguments:
      - type: string
        name: --contrast
        description: |
          Contrast to compute. Must be of length 3:

          1. The name of the column to contrast on
          2. The name of the first observation type
          3. The name of the second observation type
        example: ["cell_type", "batch", "sample"]
        multiple: true
        required: true
      - type: string
        name: --design_formula
        description: Design formula for the differential expression model
        example: ~ batch + cell_type
  - name: Outputs
    arguments:
      - type: file
        name: --output
        description: Path to the output h5ad file
        example: /path/to/output.h5ad
        required: true
        direction: output

resources:
  - type: r_script
    path: script.R

test_resources:
  - type: r_script
    path: test.R

engines:
  - type: docker
    image: rocker/r2u:22.04
    setup:
      - type: apt
        packages:
          - python3
          - python3-pip
          - python3-dev
          - python-is-python3
      - type: python
        pypi:
          - anndata
      - type: r
        cran:
          - anndata
          - processx
        bioc:
          - DESeq2

runners:
  - type: executable
  - type: nextflow
library(anndata)
requireNamespace("DESeq2", quietly = TRUE)

## VIASH START
par <- list(input = "", contrast = c(), design_formula = "", output = "")
## VIASH END

cat("Reading data\n")
adata <- read_h5ad(par$input)

cat("Parse formula\n")
formula <- as.formula(par$design_formula)

cat("Create DESeq dataset\n")
# transform counts matrix
count_data <- t(as.matrix(adata$X))
storage.mode(count_data) <- "integer"

# create dataset
dds <- DESeq2::DESeqDataSetFromMatrix(
  countData = count_data,
  colData = adata$obs,
  design = formula
)

cat("Run DESeq2\n")
dds <- DESeq2::DESeq(dds)

res <- DESeq2::results(dds, contrast = par$contrast) |>
  as.data.frame()

cat("Write to disk\n")
contrast_names <- gsub(" ", "_", par$contrast)
contrast_names <- gsub("[^[:alnum:]]", "_", contrast_names)
contrast_names <- gsub("__", "_", contrast_names)
contrast_names <- tolower(contrast_names)

varm_name <- paste0("de_", paste(contrast_names, collapse = "_"))
adata$varm[[varm_name]] <- res

# Save adata
zzz <- adata$write_h5ad(par$output, compression = "gzip")
library(anndata)

cat("Create input data\n")
X <- matrix(runif(100, 10, 100), nrow = 10, ncol = 10)
for (i in 1:10) {
  X[1:5, i] <- X[1:5, i] + i * 10
}
adata_in <- AnnData(
  X = X,
  obs = data.frame(
    row.names = paste0("cell", 1:10),
    sm_name = rep(c("Belinostat", "Dimethyl Sulfoxide"), each = 5),
    plate_name = rep(c("plate1", "plate2"), times = 5)
  ),
  var = data.frame(
    row.names = paste0("gene", 1:10)
  )
)

cat("Write input data to file\n")
input_path <- "input.h5ad"
output_path <- "output.h5ad"
zzz <- adata_in$write_h5ad(input_path)

cat("Run component\n")
zzz <- processx::run(
  command = meta$executable,
  args = c(
    "--input", input_path,
    "--contrast", "sm_name;Dimethyl Sulfoxide;Belinostat",
    "--design_formula", "~ sm_name + plate_name",
    "--output", output_path
  ),
  error_on_status = TRUE,
  echo = TRUE
)

cat("Read output data\n")
adata_out <- read_h5ad(output_path)

cat("Preview output data\n")
print(adata_out)

cat("Check DE results:\n")
de_out <- adata_out$varm$de_sm_name_dimethyl_sulfoxide_belinostat
if (is.null(de_out)) {
  stop("No DE results found")
}

print(de_out)

expected_colnames <- c("baseMean", "log2FoldChange", "lfcSE", "stat", "pvalue", "padj")
if (!all(colnames(de_out) == expected_colnames)) {
  stop(paste0(
    "Column names do not match.\n",
    "Expected: ", paste(expected_colnames, collapse = ", "), "\n",
    "Actual: ", paste(colnames(de_out), collapse = ", ")
  ))
}

cat("Done\n")

11.4 Nextflow

Path: src/workflow

name: workflow
description: |
  A workflow to compute differential expression between two groups of cells in an AnnData object.

argument_groups:
  - name: Inputs
    arguments:
      - type: file
        name: --input
        description: Path to the input h5ad file
        example: s3://my-bucket/my-data.h5ad
        required: true
  - name: Differential expression arguments
    arguments:
      - type: string
        name: --contrast
        description: |
          Contrast to compute. Must be of length 3:

          1. The name of the column to contrast on
          2. The name of the first observation type
          3. The name of the second observation type
        example: ["cell_type", "batch", "sample"]
        multiple: true
        required: true
      - type: string
        name: --design_formula
        description: Design formula for the differential expression model
        example: ~ batch + cell_type
  - name: Outputs
    arguments:
      - type: file
        name: --output
        description: Path to the output h5ad file
        example: /path/to/output.h5ad
        required: true
        direction: output

resources:
  - type: nextflow_script
    path: main.nf
    entrypoint: wf

dependencies:
  - name: subset_obs
  - name: subset_var
  - name: compute_pseudobulk
  - name: differential_expression

runners:
  - type: nextflow
workflow wf {
  take:
  ch_in

  main:
  ch_out = ch_in

    | subset_obs.run(
      key: "subset_sm_name",
      fromState: ["input": "input"],
      args: [
        "obs_column": "sm_name",
        "obs_values": ["Belinostat", "Dimethyl Sulfoxide"]
      ],
      toState: ["data": "output"]
    )

    | subset_obs.run(
      key: "subset_cell_type",
      fromState: ["input": "data"],
      args: [
        "obs_column": "cell_type",
        "obs_values": ["T cells"]
      ],
      toState: ["data": "output"]
    )

    | subset_var.run(
      fromState: ["input": "data"],
      args: [
        "var_column": "highly_variable",
      ],
      toState: ["data": "output"]
    )

    | compute_pseudobulk.run(
      fromState: ["input": "data"],
      args: [
        "obs_column_index": "plate_well_celltype_reannotated",
        "obs_column_values": ["sm_name", "cell_type", "plate_name", "well"],
      ],
      toState: ["data": "output"]
    )

    | differential_expression.run(
      fromState: [
        "input": "data",
        "contrast": "contrast",
        "design_formula": "design_formula"
      ],
      toState: ["data": "output"]
    )

    | setState(["output": "data"])
    
  emit:
  ch_out
}

11.5 Running the workflow

To run the workflow, you must first build the project:

viash ns build --parallel --setup cachedbuild
Exporting load_data =executable=> target/executable/load_data
[notice] Building container 'polygloty_usecase/load_data:0.1.0' with Dockerfile
Exporting load_data =nextflow=> target/nextflow/load_data
Exporting compute_pseudobulk =executable=> target/executable/compute_pseudobulk
[notice] Building container 'polygloty_usecase/compute_pseudobulk:0.1.0' with Dockerfile
Exporting compute_pseudobulk =nextflow=> target/nextflow/compute_pseudobulk
Exporting subset_obs =executable=> target/executable/subset_obs
[notice] Building container 'polygloty_usecase/subset_obs:0.1.0' with Dockerfile
Exporting subset_obs =nextflow=> target/nextflow/subset_obs
Exporting subset_var =executable=> target/executable/subset_var
[notice] Building container 'polygloty_usecase/subset_var:0.1.0' with Dockerfile
Exporting subset_var =nextflow=> target/nextflow/subset_var
Exporting differential_expression =executable=> target/executable/differential_expression
[notice] Building container 'polygloty_usecase/differential_expression:0.1.0' with Dockerfile
Exporting differential_expression =nextflow=> target/nextflow/differential_expression
Exporting workflow =nextflow=> target/nextflow/workflow
All 11 configs built successfully

Then, you can run the workflow:

nextflow run \
  target/nextflow/workflow/main.nf \
  -with-docker \
  --id dataset \
  --input s3://openproblems-bio/public/neurips-2023-competition/sc_counts_reannotated_with_counts.h5ad \
  --contrast 'sm_name;Belinostat;Dimethyl Sulfoxide' \
  --design_formula '~ sm_name + plate_name' \
  --publish_dir output
N E X T F L O W  ~  version 23.10.0
Launching `target/nextflow/workflow/main.nf` [condescending_engelbart] DSL2 - revision: f54b192abd
executor >  local (6)
[e2/da368b] process > workflow:wf:subset_sm_name:processWf:subset_sm_name_process (dataset)                   [100%] 1 of 1 ✔
[d5/fea947] process > workflow:wf:subset_cell_type:processWf:subset_cell_type_process (dataset)               [100%] 1 of 1 ✔
[23/a2b0a7] process > workflow:wf:subset_var:processWf:subset_var_process (dataset)                           [100%] 1 of 1 ✔
[55/a59f07] process > workflow:wf:compute_pseudobulk:processWf:compute_pseudobulk_process (dataset)           [100%] 1 of 1 ✔
[10/5c0650] process > workflow:wf:differential_expression:processWf:differential_expression_process (dataset) [100%] 1 of 1 ✔
[91/5f7431] process > workflow:publishStatesSimpleWf:publishStatesProc (dataset)                              [100%] 1 of 1 ✔
Completed at: 07-Sep-2024 21:33:16
Duration    : 8m 59s
CPU hours   : (a few seconds)
Succeeded   : 6

The workflow will process the dataset, subset the data, compute pseudobulk samples, and perform differential expression analysis. The results will be saved in the output directory.

11.6 Workflow Output

tree output
output/
├── dataset.workflow.output.h5ad
└── dataset.workflow.state.yaml

1 directory, 2 files

The resulting pseudobulk samples and differential expression analysis results are stored in the dataset.workflow.output.h5ad file.

import anndata as ad

adata = ad.read("output/dataset.workflow.output.h5ad")

adata
AnnData object with n_obs × n_vars = 96 × 2000
    obs: 'sm_name', 'cell_type', 'plate_name', 'well'
    var: 'highly_variable', 'means', 'dispersions', 'dispersions_norm'
    varm: 'de_sm_name_belinostat_dimethyl_sulfoxide'
adata.varm["de_sm_name_belinostat_dimethyl_sulfoxide"]
          baseMean  log2FoldChange     lfcSE      stat        pvalue          padj
A2M      12.320857       -0.696714  0.113490 -6.138993  8.304620e-10  2.699001e-09
A2M-AS1  31.563323       -0.652155  0.074559 -8.746780  2.195265e-18  1.065788e-17
A2MP1     1.712214       -0.844991  0.262029 -3.224806  1.260582e-03  2.551988e-03
AARD      0.125909        0.086614  1.234496  0.070161  9.440653e-01           NaN
ABCA1     2.003082       -0.154770  0.240886 -0.642504  5.205457e-01  6.193785e-01
...            ...             ...       ...       ...           ...           ...
ZNF860    0.024027        0.214981  2.915260  0.073743  9.412147e-01           NaN
ZNF876P   0.568178       -0.703739  0.422580 -1.665340  9.584497e-02  1.486826e-01
ZNF891   28.045500        0.515844  0.066416  7.766889  8.043700e-15  3.338454e-14
ZNF92    46.860620        0.076892  0.048781  1.576261  1.149657e-01  1.743887e-01
ZNF99     0.000000             NaN       NaN       NaN           NaN           NaN

[2000 rows x 6 columns]