Pipelines: Nextflow
Nextflow is a workflow orchestration engine and domain-specific language (DSL) that makes it easy to write data-intensive computational workflows. Nextflow defines complex program interactions and a high-level parallel computational environment, based on the dataflow programming model. Nextflow’s core features are:
Workflow portability and reproducibility
Scalability of parallelization and deployment
Integration of existing tools, systems, and industry standards
In practice, a Nextflow workflow is made by joining together different processes. Each process can be written in any scripting language that can be executed by the Linux platform (Bash, Perl, Ruby, Python, etc.).
Processes are executed independently and are isolated from each other, i.e., they do not share a common (writable) state. The only way they can communicate is via asynchronous first-in, first-out (FIFO) queues, called channels. In other words, every input and output of a process is represented as a channel. The interaction between these processes, and ultimately the workflow execution flow itself, is implicitly defined by these input and output declarations.
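As a minimal, illustrative sketch (the process and channel names here are hypothetical, not part of the example developed later), a process reads from an input channel and its outputs form a new channel that downstream operators or processes can consume:

#!/usr/bin/env nextflow

// Hypothetical process: emits one greeting per item received on its input channel.
process SAYHELLO {
    input:
    val name

    output:
    stdout

    script:
    """
    echo "Hello, $name"
    """
}

workflow {
    names_ch = Channel.of('alpha', 'beta')   // an asynchronous FIFO queue with two items
    greetings_ch = SAYHELLO(names_ch)        // one task per item; the outputs form a new channel
    greetings_ch.view()                      // consume the output channel downstream
}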
While not every pipeline requires large, scalable compute environments, a typical omics analysis involves processing hundreds of input files, each several gigabytes in size. During pipeline execution, the intermediate files required for the analysis can grow to 30x the size of the input data, generating terabytes of data. At this scale, the compute and data requirements of multi-omics analysis make HPC environments an attractive option, and easy access to the HPC provides many advantages for bioinformaticians.
This section explains the basic concepts for using Nextflow on Garnatxa. For a more complete understanding, it is recommended to visit the official Nextflow documentation and its training guides: https://training.nextflow.io/
To explain how to use Nextflow on Garnatxa we will reuse the same examples as in previous sections: BWA indexing and alignment.
Basic concepts about Nextflow
Nextflow allows you to implement complex pipelines in which one process follows another in a given dependency order. Each process declares its logic together with its data inputs and outputs. When the output of one process is required as input to a downstream process, Nextflow does not start the downstream process until it has obtained the required inputs from the preceding one.
Nextflow can be run on your own computer, but for complex pipelines and large datasets it is necessary to use HPC clusters such as Garnatxa. Nextflow integrates fully with resource management systems such as SLURM, so processes are launched into the Garnatxa queue system. The basic mode of operation is to launch a master Nextflow process on the queue system; this master process is in charge of submitting the remaining subprocesses to the cluster and controlling the dependencies between them until the workflow defined by the user has completely finished.
The best way to understand how it works is to put it into practice with the example already discussed. In our example the pipeline (workflow) is composed of two processes. The first is responsible for indexing a reference genome using the BWA tool. The second process is responsible for aligning a set of short-read samples against the reference genome index obtained in the first process.
We will use three files from the folder /doc/test/:
NextflowJob.config: this file defines global or specific profiles and parameters that will be applied to processes.
NextflowJob.nf: the file implements the logic of the workflow: inputs, outputs, processes, dependencies, etc.
NextflowLauncher.sbatch: the sbatch file used to submit the pipeline to the queue system.
Configuration file: NextflowJob.config
This file defines some common parameters that will be used in the pipelines, so we do not need to rewrite all the requirements for every process each time. Here we can define the resources that each step of the pipeline (each process) will take. Please note that this file does not define the logic of each step, only how the steps are launched into the queue system.
executor {
    queueSize = 200
}

process {
    executor = 'slurm'
    queue = 'global'
    maxRetries = 5
    errorStrategy = 'retry'
}

process {
    withLabel: INDEX_CONFIG {
        memory = { 1.GB * task.attempt }
        time = { 3.minute * task.attempt }
        cpus = 2
        clusterOptions = '--qos=short'
    }

    withLabel: ALIGN_CONFIG {
        memory = { 5.GB * task.attempt }
        time = { 1.day * task.attempt }
        cpus = 2
        clusterOptions = '--qos=long'
        errorStrategy = { task.exitStatus >= 1 ? 'retry' : 'terminate' }
    }
}
queueSize = Indicates the maximum number of processes that can be running concurrently. If this value exceeds the limits assigned by the system, the jobs will remain in the queued state.
executor = Nextflow needs to know how to submit jobs to the system; here we are choosing to launch jobs to the SLURM queue system.
queue = The SLURM partition to which jobs are submitted. Note that this is distinct from the QOS set further below via clusterOptions.
maxRetries = Number of retries when a process fails. After that many retries the task is aborted.
errorStrategy = The default action to be performed when a process fails.
It is also possible to fine-tune the resources per process:
memory, time and cpus = Define the amount of memory, CPUs, or time the process requires. It is possible to set a dynamic amount that increases each time the process is re-executed (via task.attempt).
clusterOptions = Defines extra parameters to pass to SLURM; in this case we are setting the QOS.
errorStrategy = Depending on the exit code returned by the process, it is possible to retry the execution or terminate it.
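For reference, a process opts into one of these withLabel blocks simply by declaring the matching label in its body. A minimal sketch (the process name is hypothetical):

// Hypothetical process: the label makes it inherit the memory, time, cpus and
// clusterOptions defined under withLabel: ALIGN_CONFIG in NextflowJob.config.
process MY_STEP {
    label 'ALIGN_CONFIG'

    output:
    stdout

    script:
    """
    echo "running with the ALIGN_CONFIG resources"
    """
}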
Pipeline implementation: NextflowJob.nf
This file contains the whole workflow definition. To explain its content we can split the file into three parts:
#!/usr/bin/env nextflow

params.reads = "$projectDir/data/reads_*.fq"
params.genome = "$projectDir/ref/chr8.fa"
params.outdir = "$projectDir/out"

println "reads: $params.reads"
The header of the file contains global variables that will be used below. In this example we are defining the input (reference genome and sample reads) and output directories. Below this, we define two processes (similar to functions) that describe the indexing and alignment tasks.
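Note that any params.* value defined in the header can also be overridden from the command line with a double-dash option, without editing the file. For example (the paths here are illustrative):

$ nextflow run NextflowJob.nf --reads '/scratch/user1/data/reads_*.fq' --outdir /scratch/user1/out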
1  /*
2   * define the `INDEX` process that creates a binary index
3   * given the reference genome
4   */
5  process INDEX {
6      label 'INDEX_CONFIG'
7
8      input:
9      path genome
10
11     output:
12     path 'chr8_ref.*'
13
14     script:
15     """
16     module load biotools
17     bwa index $genome -p chr8_ref
18     """
19 }
Line 6: selects the label whose SLURM configuration will be applied when this task is submitted. Remember that the label INDEX_CONFIG was defined in the configuration file.
Lines 8-9: define the input parameters. Indexing requires a path to the reference genome file, so we use the variable genome to capture the input path that will be passed to this process below.
Lines 11-12: define the output parameter. We have to specify the files that the indexing process will produce if all goes well; in this case Nextflow validates that the INDEX process generates files matching the pattern chr8_ref.* (the index files; see the illustrative listing after this explanation).
Lines 14-18: define the code executed by the INDEX process. Here we load the module that contains the bwa tool and then run the indexing command: bwa index $genome -p chr8_ref. Note that we pass bwa the variable genome, which contains the path to the reference genome; this variable is instantiated when the INDEX process is called in the last section of this file.
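As an illustration (assuming the usual bwa index output; this exact listing is not taken from the run), the files matched by the chr8_ref.* pattern in the task directory would look like:

$ ls chr8_ref.*
chr8_ref.amb  chr8_ref.ann  chr8_ref.bwt  chr8_ref.pac  chr8_ref.sa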
Next we define the process ALIGN. This segment of code is defined in a similar way to the process INDEX.
1  /*
2   * define the `ALIGN` process that aligns multiple short reads against
3   * an indexed reference genome.
4   */
5  process ALIGN {
6      label 'ALIGN_CONFIG'
7
8      publishDir params.outdir, mode:'copy'
9      input:
10     path genome
11     path read
12
13     output:
14     path '*_aln.sai'
15
16     script:
17     """
18     module load biotools
19     bwa aln -I -t 1 chr8_ref $read > ${read.baseName}_aln.sai
20     """
21 }
Line 8: tells Nextflow that all the produced output (specified in lines 13-14) will be copied to the shared directory defined in the variable params.outdir. When Nextflow executes a process, a temporary work directory is created and all files produced by the process are saved there; if we need to retrieve these output files, they must be copied out to a stable location, which is what publishDir does (see the example listing after this explanation).
Lines 9-11: we define two input parameters: one for the path to the index files (produced by the previous process) and another for the read sample to be aligned in this process. Note that the variable read is instantiated to a single file (this example contains 10 read sample files): Nextflow generates 10 alignment tasks that run concurrently, so the value of read contains only one file per task.
Lines 16-19: this fragment of code is executed to align the dataset of sequences. The path to the sequence file to process is passed through the variable read, and the aligned output is saved using the base name of the read file that was aligned.
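For illustration, after a successful run the published directory defined by params.outdir would contain one .sai file per input sample (the file names follow the pattern visible in the execution log at the end of this section):

$ ls out/
reads_00_aln.sai  reads_01_aln.sai  reads_02_aln.sai  ...  reads_09_aln.sai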
The last step is to define the sequence of calls to the above processes. Nextflow uses the workflow block to define the input and output channels that bind the processes into a pipeline.
1  workflow {
2      reads_ch = Channel.fromPath(params.reads)
3      index_ch = INDEX(params.genome)
4      align_ch = ALIGN(index_ch, reads_ch)
5      align_ch.view()
6  }
Line 2: defines a channel, reads_ch, associated with the sample reads. This channel contains the list of 10 read files that will later be consumed by the ALIGN process (see the quick check after this list).
Line 3: calls the INDEX process, which receives the path to the reference genome (params.genome) as input. The output of this process is a new channel, index_ch, containing the list of index files. Because INDEX is invoked with a plain value rather than a queue channel, its output behaves as a value channel, which is why it can be read repeatedly by every ALIGN task.
Line 4: calls the ALIGN process, which receives two channels as inputs: the index files (index_ch) and the sequence reads (reads_ch). The output is a channel with the paths of the aligned files.
Line 5: shows the content of a channel; here we print the list of aligned files returned by the ALIGN process.
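If you are unsure what a glob pattern will match, a quick check (a sketch that assumes params.reads is defined, as in NextflowJob.nf) is to count the channel contents before wiring it into the pipeline:

workflow {
    Channel
        .fromPath(params.reads)
        .count()
        .view { n -> "matched $n read files" }
}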
Launcher script: NextflowLauncher.sbatch
Nextflow requires a master process in charge of monitoring the execution of the pipeline. This process is responsible for executing each of the processes defined above in the correct order until the last of the pipeline processes has finished. To run the pipeline on the Garnatxa cluster, it is necessary to create a file in sbatch format that contains the Nextflow execution command along with the configuration file and pipeline file; in other words, the Nextflow master process itself must be launched through the Garnatxa queue system. Keep in mind that this process lasts the entire duration of the pipeline, so you must request a sufficiently long wall time. You do not need more than 1 processor and a couple of gigabytes of memory for this master process.
1  #!/bin/bash
2  #SBATCH --qos short
3  #SBATCH --mem 2G
4  #SBATCH -c 1
5  #SBATCH -t 12:00:00
6
7  WORKFLOW=$1
8  CONFIG=$2
9
10 module load nextflow
11
12 nextflow -C ${CONFIG} run ${WORKFLOW} -resume -with-report -with-dag
Line 12: executes the Nextflow pipeline. You must pass the configuration file and the pipeline file as parameters. The -resume option tells Nextflow that intermediate results can be reused in case the execution was previously stopped by a failure.
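A related tip: the standard nextflow log command lists previous executions and their session identifiers, which is what -resume uses to locate cached task results under the work/ directory:

$ module load nextflow
$ nextflow log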
Launching the above script, we can monitor the execution of our pipeline (the workflow file and the configuration file are passed as arguments to the launcher):
$ sbatch NextflowLauncher.sbatch ./NextflowJob.nf ./NextflowJob.config
Then the master job is submitted to the queue system. You can check that the index and align processes are executed in order: first a single indexing job runs, and afterwards the align jobs run concurrently.
Nextflow also allows you to perform a statistical analysis of the resource usage of each process in the pipeline. To do this, specify the -with-report parameter in the launcher file (NextflowLauncher.sbatch). After the pipeline completes, an HTML file is generated; simply copy this file to your local computer and open it with any browser. You can also retrieve a useful directed acyclic graph (DAG) of your pipeline: use the -with-dag parameter in NextflowLauncher.sbatch and Nextflow will produce a DAG file for the execution.
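Both options also accept an explicit file name if you prefer fixed output paths (the names below are illustrative; for -with-dag the format is inferred from the extension):

$ nextflow -C NextflowJob.config run NextflowJob.nf -resume -with-report report.html -with-dag flowchart.html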
$ squeue -u user1
JOBID NAME PARTITION QOS USER ACCOUNT START_TIME TIME TIME_LEFT NODES CPU MIN_M NODELIST ST REASON
840576 nf-INDEX global short user1 admin 2024-10-28T10:12 2:27 27:33 1 4 10G CG None
840597 nf-ALIGN_(9) global short user1 admin 2024-10-28T10:44 0:00 30:00 1 4 10G PD Resources
840573 NextflowLaunche global short user1 admin 2024-10-28T10:11 2:49 11:57:11 1 2 2G cn11 R None
840588 nf-ALIGN_(1) global short user1 admin 2024-10-28T10:14 0:06 29:54 1 4 10G cn02 R None
840589 nf-ALIGN_(2) global short user1 admin 2024-10-28T10:14 0:06 29:54 1 4 10G cn07 R None
840590 nf-ALIGN_(3) global short user1 admin 2024-10-28T10:14 0:05 29:55 1 4 10G cn09 R None
840591 nf-ALIGN_(4) global short user1 admin 2024-10-28T10:14 0:05 29:55 1 4 10G cn09 R None
840592 nf-ALIGN_(5) global short user1 admin 2024-10-28T10:14 0:05 29:55 1 4 10G cn09 R None
840593 nf-ALIGN_(6) global short user1 admin 2024-10-28T10:14 0:05 29:55 1 4 10G cn09 R None
840594 nf-ALIGN_(7) global short user1 admin 2024-10-28T10:14 0:05 29:55 1 4 10G cn09 R None
840595 nf-ALIGN_(8) global short user1 admin 2024-10-28T10:14 0:05 29:55 1 4 10G cn09 R None
840596 nf-ALIGN_(10) global short user1 admin 2024-10-28T10:14 0:05 29:55 1 4 10G cn09 R None
In the above output, the indexing job (840576) is in state CG (completing) while the rest of the align jobs are executed concurrently. Note that the Nextflow master job (840573) remains in the queue until all jobs in the pipeline have finished. Use the job id of the master job to monitor the output of Nextflow during the execution; in this example the assigned job id is 840573:
$ cat slurm-840573.out
Nextflow 24.10.0 is available - Please consider updating your version to it

N E X T F L O W ~ version 24.04.3

Launching `./NextflowJob.nf` [evil_bartik] DSL2 - revision: 7d1433c154

reads: /home/xxx/nextflow/data/reads_*.fq
[- ] INDEX -
[- ] ALIGN -
executor > slurm (1)
[e1/ff562a] INDEX | 0 of 1
[- ] ALIGN -
executor > slurm (1)
[e1/ff562a] INDEX | 0 of 1
[- ] ALIGN -
executor > slurm (2)
[e1/ff562a] INDEX | 1 of 1 ✔
[33/87d73c] ALIGN (1) | 0 of 4
executor > slurm (7)
[e1/ff562a] INDEX | 1 of 1 ✔
[78/53e984] ALIGN (6) | 0 of 10
executor > slurm (11)
[e1/ff562a] INDEX | 1 of 1 ✔
[8d/54f16f] ALIGN (10) | 10 of 10 ✔
/home/user1/nextflow/work/78/53e98461a5bdcd30a6cb7f8be6d861/reads_01_aln.sai
/home/user1/nextflow/work/78/53e98461a5bdcd30a6cb7f8be6d861/reads_04_aln.sai
/home/user1/nextflow/work/73/72553f0a19ded5c9312c4ca46aaf87/reads_03_aln.sai
/home/user1/nextflow/work/02/e11c7201fee3d65c926ba8a13f133a/reads_05_aln.sai
/home/user1/nextflow/work/1d/8ab26cd35f2897ede9274d10591d97/reads_08_aln.sai
/home/user1/nextflow/work/38/01b66e7f67d305b7453fb358ad92f3/reads_00_aln.sai
/home/user1/nextflow/work/b2/209dde11c4962fe637ededf88c36f4/reads_09_aln.sai
/home/user1/nextflow/work/10/a726717ccde2744367e08e07916607/reads_07_aln.sai
/home/user1/nextflow/work/33/87d73ca2e0bf53c3b36d427a768179/reads_06_aln.sai
/home/user1/nextflow/work/4e/ddc69a23078b190f4a2ceb3b034cb8/reads_02_aln.sai

Completed at: 28-Oct-2024 10:17:21
Duration : 5m 11s
CPU hours : 1.0
Succeeded : 11

Figure 1. Example of statistics generated by Nextflow.

Figure 2. Example of workflow diagram generated by Nextflow.