diff --git a/.config/.masterconfig.yaml b/.config/masterconfig.yaml similarity index 75% rename from .config/.masterconfig.yaml rename to .config/masterconfig.yaml index b9bb5f7075f3fe5d2be5ecf206fff208aa3638d5..f86dbff61f592ea4473deb34790d44f20296d5a7 100644 --- a/.config/.masterconfig.yaml +++ b/.config/masterconfig.yaml @@ -1,6 +1,8 @@ # Configuration file for genome simulation parameters per sample. # Each sample is defined by a unique name and associated parameters. +# Warning : for now runing multiple samples at the same time may cause issues + samples: # Unique identifier for each sample with specific simulation parameters. test_sample1: @@ -9,3 +11,7 @@ samples: mutation_rate: 1e-7 # Mutation rate (µ) the rate of mutations in the genome. recombination_rate: 1e-9 # Recombination rate (r) the frequency of recombination events. sample_size: 2 # Number of individual samples to simulate (number of FASTA output) + +memory_multiplier: 1.5 # Multiplier for scaling memory across all rules* +container_registry: "docker://registry.forgemia.inra.fr/pangepop/mspangepop" +output_dir: "results/" \ No newline at end of file diff --git a/.config/snakemake/profiles/slurm/config.yaml b/.config/snakemake/profiles/slurm/config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..24dbe1dba702d510e5305e00b5b2b9f2087850d7 --- /dev/null +++ b/.config/snakemake/profiles/slurm/config.yaml @@ -0,0 +1,9 @@ +executor: slurm +jobs: 10 +use-singularity: true +singularity-args: "--bind $(pwd)" + +default-resources: + #slurm_account: add if needed on your hpc + runtime: 60 + cpus_per_task: 1 \ No newline at end of file diff --git a/.config/snakemake_profile/slurm/CookieCutter.py b/.config/snakemake_profile/slurm/CookieCutter.py deleted file mode 100644 index 19d61df791e1daf3c602d245bc683bf815189d93..0000000000000000000000000000000000000000 --- a/.config/snakemake_profile/slurm/CookieCutter.py +++ /dev/null @@ -1,31 +0,0 @@ -# -# Based on lsf CookieCutter.py -# -import os -import json - -d = os.path.dirname(__file__) -with open(os.path.join(d, "settings.json")) as fh: - settings = json.load(fh) - - -class CookieCutter: - - SBATCH_DEFAULTS = settings['SBATCH_DEFAULTS'] - CLUSTER_NAME = settings['CLUSTER_NAME'] - CLUSTER_CONFIG = settings['CLUSTER_CONFIG'] - ADVANCED_ARGUMENT_CONVERSION = settings['ADVANCED_ARGUMENT_CONVERSION'] - - @staticmethod - def get_cluster_option() -> str: - cluster = CookieCutter.CLUSTER_NAME - if cluster != "": - return f"--cluster={cluster}" - return "" - - @staticmethod - def get_advanced_argument_conversion() -> bool: - val = {"yes": True, "no": False}[ - CookieCutter.ADVANCED_ARGUMENT_CONVERSION - ] - return val diff --git a/.config/snakemake_profile/slurm/cluster_config.yml b/.config/snakemake_profile/slurm/cluster_config.yml deleted file mode 100644 index 7bb55b87ec9223956e9b85796daa32878fe1b18b..0000000000000000000000000000000000000000 --- a/.config/snakemake_profile/slurm/cluster_config.yml +++ /dev/null @@ -1,103 +0,0 @@ -### default ressources used by snakemake (applied to all rules) -__default__: - job-name: "{rule}" - time: "96:00:00" # max run time "hours:minutes:seconds" - ntasks: 1 # nb of processes - cpus-per-task: 4 # nb of cores for each process(1 process) - mem: "60G" - nodes: 1 # Requirements nodes/servers (default: 1) - ntasks-per-node: 1 # Requirements cpu/core/task (default: 1) - output: "slurm_logs/{rule}.%N.%j.out" - error: "slurm_logs/{rule}.%N.%j.err" - mail-type: END,FAIL #email notification - mail-user: your.email@here.fr - 
-create_chr_config_file: - output: "/dev/null" - mail-type: NONE - mem: "5G" - cpus-per-task: 1 - -generate_fai_index: - output: "/dev/null" - mem: "100G" - cpus-per-task: 20 - -split_index: - output: "/dev/null" - mail-type: NONE - mem: "5G" - cpus-per-task: 1 - -run_msprime: - error: "/dev/null" - output: "/dev/null" - mail-type: NONE - cpus-per-task: 1 - mem: "50G" - -compress_sim_vcf: - error: "/dev/null" - output: "/dev/null" - mail-type: NONE - -index_sim_vcf: - error: "/dev/null" - output: "/dev/null" - mail-type: NONE - -add_header: - output: "/dev/null" - mail-type: NONE - cpus-per-task: 1 - -sort_header: - output: "/dev/null" - mail-type: NONE - cpus-per-task: 1 - -remove_vcf_header: - output: "/dev/null" - mail-type: NONE - cpus-per-task: 1 - -extract_vcf_header: - output: "/dev/null" - mail-type: NONE - cpus-per-task: 1 - -unzip_simulated_vcf: - output: "/dev/null" - mail-type: NONE - -unzip_input_fasta: - output: "/dev/null" - mail-type: NONE - -compress_vcf_for_griaffe: - mail-type: NONE - -generate_variants: - mem: "200G" - -index_giraffe: - mail-type: NONE - mem: "150G" - -sort_vcf: - mail-type: NONE - mem: "150G" - -construct_graph: - mail-type: NONE - mem: "150G" - -vg_to_gfa: - mem: "150G" - -gbz_to_gfa: - mail-type: NONE - mem: "150G" - -graph_to_fasta: - mem: "150G" \ No newline at end of file diff --git a/.config/snakemake_profile/slurm/config.yaml b/.config/snakemake_profile/slurm/config.yaml deleted file mode 100644 index 01741b3281e659d4170ff514475fec124cae70a7..0000000000000000000000000000000000000000 --- a/.config/snakemake_profile/slurm/config.yaml +++ /dev/null @@ -1,14 +0,0 @@ -restart-times: 0 -jobscript: "slurm-jobscript.sh" -cluster: "slurm-submit.py" -cluster-status: "slurm-status.py" -max-jobs-per-second: 1 -max-status-checks-per-second: 10 -local-cores: 1 -latency-wait: 60 - -## added snakemake settings -keep-going: True # Go on with independent jobs if a job fails -rerun-incomplete: True # Re-run all jobs the output of which is recognized as incomplete. 
-# keep-incomplete: True # keep results even if failed -# unlock: True \ No newline at end of file diff --git a/.config/snakemake_profile/slurm/settings.json b/.config/snakemake_profile/slurm/settings.json deleted file mode 100644 index 5ffbf2bca3ea56cef33ed6cd9998496366dc634f..0000000000000000000000000000000000000000 --- a/.config/snakemake_profile/slurm/settings.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "SBATCH_DEFAULTS": "", - "CLUSTER_NAME": "", - "CLUSTER_CONFIG": "", - "ADVANCED_ARGUMENT_CONVERSION": "no" -} diff --git a/.config/snakemake_profile/slurm/slurm-jobscript.sh b/.config/snakemake_profile/slurm/slurm-jobscript.sh deleted file mode 100755 index 391741ef8824f4b691752e68651f097395d17f70..0000000000000000000000000000000000000000 --- a/.config/snakemake_profile/slurm/slurm-jobscript.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash -# properties = {properties} -{exec_job} diff --git a/.config/snakemake_profile/slurm/slurm-status.py b/.config/snakemake_profile/slurm/slurm-status.py deleted file mode 100755 index 6dc23237c0982578cf013ca0bfd25f787fd04938..0000000000000000000000000000000000000000 --- a/.config/snakemake_profile/slurm/slurm-status.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env python3 -import re -import subprocess as sp -import shlex -import sys -import time -import logging -from CookieCutter import CookieCutter - -logger = logging.getLogger("__name__") - -STATUS_ATTEMPTS = 20 - -jobid = sys.argv[1] - -cluster = CookieCutter.get_cluster_option() - -for i in range(STATUS_ATTEMPTS): - try: - sacct_res = sp.check_output(shlex.split(f"sacct {cluster} -P -b -j {jobid} -n")) - res = { - x.split("|")[0]: x.split("|")[1] - for x in sacct_res.decode().strip().split("\n") - } - break - except sp.CalledProcessError as e: - logger.error("sacct process error") - logger.error(e) - except IndexError as e: - logger.error(e) - pass - # Try getting job with scontrol instead in case sacct is misconfigured - try: - sctrl_res = sp.check_output( - shlex.split(f"scontrol {cluster} -o show job {jobid}") - ) - m = re.search(r"JobState=(\w+)", sctrl_res.decode()) - res = {jobid: m.group(1)} - break - except sp.CalledProcessError as e: - logger.error("scontrol process error") - logger.error(e) - if i >= STATUS_ATTEMPTS - 1: - print("failed") - exit(0) - else: - time.sleep(1) - -status = res[jobid] - -if status == "BOOT_FAIL": - print("failed") -elif status == "OUT_OF_MEMORY": - print("failed") -elif status.startswith("CANCELLED"): - print("failed") -elif status == "COMPLETED": - print("success") -elif status == "DEADLINE": - print("failed") -elif status == "FAILED": - print("failed") -elif status == "NODE_FAIL": - print("failed") -elif status == "PREEMPTED": - print("failed") -elif status == "TIMEOUT": - print("failed") -elif status == "SUSPENDED": - print("running") -else: - print("running") diff --git a/.config/snakemake_profile/slurm/slurm-submit.py b/.config/snakemake_profile/slurm/slurm-submit.py deleted file mode 100755 index 296b75697bbbda971367d00b5c343ee899f0feec..0000000000000000000000000000000000000000 --- a/.config/snakemake_profile/slurm/slurm-submit.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python3 -""" -Snakemake SLURM submit script. 
-""" -from snakemake.utils import read_job_properties - -import slurm_utils -from CookieCutter import CookieCutter - -# cookiecutter arguments -SBATCH_DEFAULTS = CookieCutter.SBATCH_DEFAULTS -CLUSTER = CookieCutter.get_cluster_option() -CLUSTER_CONFIG = "" -ADVANCED_ARGUMENT_CONVERSION = CookieCutter.get_advanced_argument_conversion() - -RESOURCE_MAPPING = { - "time": ("time", "runtime", "walltime"), - "mem": ("mem", "mem_mb", "ram", "memory"), - "mem-per-cpu": ("mem-per-cpu", "mem_per_cpu", "mem_per_thread"), - "nodes": ("nodes", "nnodes"), - "partition": ("partition", "queue"), -} - -# parse job -jobscript = slurm_utils.parse_jobscript() -job_properties = read_job_properties(jobscript) - -sbatch_options = {} -cluster_config = slurm_utils.load_cluster_config(CLUSTER_CONFIG) - -# 1) sbatch default arguments and cluster -sbatch_options.update(slurm_utils.parse_sbatch_defaults(SBATCH_DEFAULTS)) -sbatch_options.update(slurm_utils.parse_sbatch_defaults(CLUSTER)) - -# 2) cluster_config defaults -sbatch_options.update(cluster_config["__default__"]) - -# 3) Convert resources (no unit conversion!) and threads -sbatch_options.update( - slurm_utils.convert_job_properties(job_properties, RESOURCE_MAPPING) -) - -# 4) cluster_config for particular rule -sbatch_options.update(cluster_config.get(job_properties.get("rule"), {})) - -# 5) cluster_config options -sbatch_options.update(job_properties.get("cluster", {})) - -# 6) Advanced conversion of parameters -if ADVANCED_ARGUMENT_CONVERSION: - sbatch_options = slurm_utils.advanced_argument_conversion(sbatch_options) - -# 7) Format pattern in snakemake style -sbatch_options = slurm_utils.format_values(sbatch_options, job_properties) - -# ensure sbatch output dirs exist -for o in ("output", "error"): - slurm_utils.ensure_dirs_exist(sbatch_options[o]) if o in sbatch_options else None - -# submit job and echo id back to Snakemake (must be the only stdout) -print(slurm_utils.submit_job(jobscript, **sbatch_options)) diff --git a/.config/snakemake_profile/slurm/slurm_utils.py b/.config/snakemake_profile/slurm/slurm_utils.py deleted file mode 100644 index d43c0703dad64faac4b5e70cbe3b83636c3aea39..0000000000000000000000000000000000000000 --- a/.config/snakemake_profile/slurm/slurm_utils.py +++ /dev/null @@ -1,345 +0,0 @@ -#!/usr/bin/env python3 -import os -import sys -from os.path import dirname -import re -import math -import argparse -import subprocess as sp -from io import StringIO - -from snakemake import io -from snakemake.io import Wildcards -from snakemake.utils import SequenceFormatter -from snakemake.utils import AlwaysQuotedFormatter -from snakemake.utils import QuotedFormatter -from snakemake.exceptions import WorkflowError -from snakemake.logging import logger - -from CookieCutter import CookieCutter - - -def _convert_units_to_mb(memory): - """If memory is specified with SI unit, convert to MB""" - if isinstance(memory, int) or isinstance(memory, float): - return int(memory) - siunits = {"K": 1e-3, "M": 1, "G": 1e3, "T": 1e6} - regex = re.compile(r"(\d+)({})$".format("|".join(siunits.keys()))) - m = regex.match(memory) - if m is None: - logger.error( - ( - f"unsupported memory specification '{memory}';" - " allowed suffixes: [K|M|G|T]" - ) - ) - sys.exit(1) - factor = siunits[m.group(2)] - return int(int(m.group(1)) * factor) - - -def parse_jobscript(): - """Minimal CLI to require/only accept single positional argument.""" - p = argparse.ArgumentParser(description="SLURM snakemake submit script") - p.add_argument("jobscript", help="Snakemake jobscript 
with job properties.") - return p.parse_args().jobscript - - -def parse_sbatch_defaults(parsed): - """Unpack SBATCH_DEFAULTS.""" - d = parsed.split() if type(parsed) == str else parsed - args = {} - for keyval in [a.split("=") for a in d]: - k = keyval[0].strip().strip("-") - v = keyval[1].strip() if len(keyval) == 2 else None - args[k] = v - return args - - -def load_cluster_config(path): - """Load config to dict - - Load configuration to dict either from absolute path or relative - to profile dir. - """ - if path: - path = os.path.join(dirname(__file__), os.path.expandvars(path)) - dcc = io.load_configfile(path) - else: - dcc = {} - if "__default__" not in dcc: - dcc["__default__"] = {} - return dcc - - -# adapted from format function in snakemake.utils -def format(_pattern, _quote_all=False, **kwargs): # noqa: A001 - """Format a pattern in Snakemake style. - This means that keywords embedded in braces are replaced by any variable - values that are available in the current namespace. - """ - fmt = SequenceFormatter(separator=" ") - if _quote_all: - fmt.element_formatter = AlwaysQuotedFormatter() - else: - fmt.element_formatter = QuotedFormatter() - try: - return fmt.format(_pattern, **kwargs) - except KeyError as ex: - raise NameError( - f"The name {ex} is unknown in this context. Please " - "make sure that you defined that variable. " - "Also note that braces not used for variable access " - "have to be escaped by repeating them " - ) - - -# adapted from Job.format_wildcards in snakemake.jobs -def format_wildcards(string, job_properties): - """ Format a string with variables from the job. """ - - class Job(object): - def __init__(self, job_properties): - for key in job_properties: - setattr(self, key, job_properties[key]) - - job = Job(job_properties) - if "params" in job_properties: - job._format_params = Wildcards(fromdict=job_properties["params"]) - else: - job._format_params = None - if "wildcards" in job_properties: - job._format_wildcards = Wildcards(fromdict=job_properties["wildcards"]) - else: - job._format_wildcards = None - _variables = dict() - _variables.update( - dict(params=job._format_params, wildcards=job._format_wildcards) - ) - if hasattr(job, "rule"): - _variables.update(dict(rule=job.rule)) - try: - return format(string, **_variables) - except NameError as ex: - raise WorkflowError( - "NameError with group job {}: {}".format(job.jobid, str(ex)) - ) - except IndexError as ex: - raise WorkflowError( - "IndexError with group job {}: {}".format(job.jobid, str(ex)) - ) - - -# adapted from ClusterExecutor.cluster_params function in snakemake.executor -def format_values(dictionary, job_properties): - formatted = dictionary.copy() - for key, value in list(formatted.items()): - if key == "mem": - value = str(_convert_units_to_mb(value)) - if isinstance(value, str): - try: - formatted[key] = format_wildcards(value, job_properties) - except NameError as e: - msg = "Failed to format cluster config " "entry for job {}.".format( - job_properties["rule"] - ) - raise WorkflowError(msg, e) - return formatted - - -def convert_job_properties(job_properties, resource_mapping=None): - options = {} - if resource_mapping is None: - resource_mapping = {} - resources = job_properties.get("resources", {}) - for k, v in resource_mapping.items(): - options.update({k: resources[i] for i in v if i in resources}) - - if "threads" in job_properties: - options["cpus-per-task"] = job_properties["threads"] - return options - - -def ensure_dirs_exist(path): - """Ensure output folder for Slurm log files 
exist.""" - di = dirname(path) - if di == "": - return - if not os.path.exists(di): - os.makedirs(di, exist_ok=True) - return - - -def format_sbatch_options(**sbatch_options): - """Format sbatch options""" - options = [] - for k, v in sbatch_options.items(): - val = "" - if v is not None: - val = f"={v}" - options.append(f"--{k}{val}") - return options - - -def submit_job(jobscript, **sbatch_options): - """Submit jobscript and return jobid.""" - options = format_sbatch_options(**sbatch_options) - try: - cmd = ["sbatch"] + ["--parsable"] + options + [jobscript] - res = sp.check_output(cmd) - except sp.CalledProcessError as e: - raise e - # Get jobid - res = res.decode() - try: - jobid = re.search(r"(\d+)", res).group(1) - except Exception as e: - raise e - return jobid - - -def advanced_argument_conversion(arg_dict): - """Experimental adjustment of sbatch arguments to the given or default partition.""" - # Currently not adjusting for multiple node jobs - nodes = int(arg_dict.get("nodes", 1)) - if nodes > 1: - return arg_dict - partition = arg_dict.get("partition", None) or _get_default_partition() - constraint = arg_dict.get("constraint", None) - ncpus = int(arg_dict.get("cpus-per-task", 1)) - runtime = arg_dict.get("time", None) - memory = _convert_units_to_mb(arg_dict.get("mem", 0)) - config = _get_cluster_configuration(partition, constraint, memory) - mem = arg_dict.get("mem", ncpus * min(config["MEMORY_PER_CPU"])) - mem = _convert_units_to_mb(mem) - if mem > max(config["MEMORY"]): - logger.info( - f"requested memory ({mem}) > max memory ({max(config['MEMORY'])}); " - "adjusting memory settings" - ) - mem = max(config["MEMORY"]) - - # Calculate available memory as defined by the number of requested - # cpus times memory per cpu - AVAILABLE_MEM = ncpus * min(config["MEMORY_PER_CPU"]) - # Add additional cpus if memory is larger than AVAILABLE_MEM - if mem > AVAILABLE_MEM: - logger.info( - f"requested memory ({mem}) > " - f"ncpus x MEMORY_PER_CPU ({AVAILABLE_MEM}); " - "trying to adjust number of cpus up" - ) - ncpus = int(math.ceil(mem / min(config["MEMORY_PER_CPU"]))) - if ncpus > max(config["CPUS"]): - logger.info( - f"ncpus ({ncpus}) > available cpus ({max(config['CPUS'])}); " - "adjusting number of cpus down" - ) - ncpus = min(int(max(config["CPUS"])), ncpus) - adjusted_args = {"mem": int(mem), "cpus-per-task": ncpus} - - # Update time. If requested time is larger than maximum allowed time, reset - if runtime: - runtime = time_to_minutes(runtime) - time_limit = max(config["TIMELIMIT_MINUTES"]) - if runtime > time_limit: - logger.info( - f"time (runtime) > time limit {time_limit}; " "adjusting time down" - ) - adjusted_args["time"] = time_limit - - # update and return - arg_dict.update(adjusted_args) - return arg_dict - - -timeformats = [ - re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"), - re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+)$"), - re.compile(r"^(?P<days>\d+)-(?P<hours>\d+)$"), - re.compile(r"^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"), - re.compile(r"^(?P<minutes>\d+):(?P<seconds>\d+)$"), - re.compile(r"^(?P<minutes>\d+)$"), -] - - -def time_to_minutes(time): - """Convert time string to minutes. - - According to slurm: - - Acceptable time formats include "minutes", "minutes:seconds", - "hours:minutes:seconds", "days-hours", "days-hours:minutes" - and "days-hours:minutes:seconds". 
- - """ - if not isinstance(time, str): - time = str(time) - d = {"days": 0, "hours": 0, "minutes": 0, "seconds": 0} - regex = list(filter(lambda regex: regex.match(time) is not None, timeformats)) - if len(regex) == 0: - return - assert len(regex) == 1, "multiple time formats match" - m = regex[0].match(time) - d.update(m.groupdict()) - minutes = ( - int(d["days"]) * 24 * 60 - + int(d["hours"]) * 60 - + int(d["minutes"]) - + math.ceil(int(d["seconds"]) / 60) - ) - assert minutes > 0, "minutes has to be greater than 0" - return minutes - - -def _get_default_partition(): - """Retrieve default partition for cluster""" - cluster = CookieCutter.get_cluster_option() - cmd = f"sinfo -O partition {cluster}" - res = sp.check_output(cmd.split()) - m = re.search(r"(?P<partition>\S+)\*", res.decode(), re.M) - partition = m.group("partition") - return partition - - -def _get_cluster_configuration(partition, constraints=None, memory=0): - """Retrieve cluster configuration. - - Retrieve cluster configuration for a partition filtered by - constraints, memory and cpus - - """ - try: - import pandas as pd - except ImportError: - print( - "Error: currently advanced argument conversion " - "depends on 'pandas'.", file=sys.stderr - ) - sys.exit(1) - - if constraints: - constraint_set = set(constraints.split(",")) - cluster = CookieCutter.get_cluster_option() - cmd = f"sinfo -e -o %all -p {partition} {cluster}".split() - try: - output = sp.Popen(" ".join(cmd), shell=True, stdout=sp.PIPE).communicate() - except Exception as e: - print(e) - raise - data = re.sub("^CLUSTER:.+\n", "", re.sub(" \\|", "|", output[0].decode())) - df = pd.read_csv(StringIO(data), sep="|") - try: - df["TIMELIMIT_MINUTES"] = df["TIMELIMIT"].apply(time_to_minutes) - df["MEMORY_PER_CPU"] = df["MEMORY"] / df["CPUS"] - df["FEATURE_SET"] = df["AVAIL_FEATURES"].str.split(",").apply(set) - except Exception as e: - print(e) - raise - if constraints: - constraint_set = set(constraints.split(",")) - i = df["FEATURE_SET"].apply(lambda x: len(x.intersection(constraint_set)) > 0) - df = df.loc[i] - memory = min(_convert_units_to_mb(memory), max(df["MEMORY"])) - df = df.loc[df["MEMORY"] >= memory] - return df diff --git a/.gitignore b/.gitignore index fadcb32593c9d81357fa1654903a947ea61ab4ea..8a850ebdc617f1a6c63493206845b6cb60637b1a 100644 --- a/.gitignore +++ b/.gitignore @@ -55,10 +55,10 @@ # 3) add a pattern to track the file patterns of section2 even if they are in subdirectories !*/ !**.smk -!dag.svg # 4) specific files or folder to TRACK (the '**' sign means 'any path') !/simulation_data/** !test**fa.gz !**setting.json !slurm_logs/ +!.config/** # 5) specific folders to UNTRACK (wherever it may be in the treeview) diff --git a/README.md b/README.md index ca8336d77ee7276d5c7c5b81a9f4e915019b4629..36759768f2f5e44076ad39a61f808affeda89b1a 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,30 @@ # Warnings / Issues -> **`/!\`:** Act with care; this workflow uses significant memory if you increase the values in `.masterconfig`. We recommend keeping the default settings and running a test first. +> **`/!\`:** Act with care; this workflow uses significant memory if you increase the values in `masterconfig`. We recommend keeping the default settings and running a test first. > **`/!\`:** For now dont run multiple split at once -> **`/!\`:** The Transduplication and Reciprocal Translocation sections in the `visor_sv_type.yaml` config file are placeholders; do not use them yet. - # How to Use -## A. Running on the CBIB +## A. Running on a cluster ### 1. 
Set up -Clone the Git repository and switch to my branch: +Clone the Git repository: ```bash git clone https://forgemia.inra.fr/pangepop/MSpangepop.git cd MSpangepop -git checkout dev_lpiat ``` ### 2. Add your files - Add a `.fasta.gz` file; an example can be found in the repository. ### 3. Configure the pipeline -- Edit the `.masterconfig` file in the `.config/` directory with your sample information. +- Edit the `masterconfig` file in the `.config/` directory with your sample information. - Edit the `visor_sv_type.yaml` file with the mutations you want. -- Edit line 17 of `job.sh` and line 13 of `./config/snakemake_profile/clusterconfig.yaml` with your email. +- Edit `job.sh` with your email and the paths to the required modules (`Singularity/Apptainer`, `Miniconda3`). +- Provide the conda environment that `job.sh` activates with `source activate wf_env`; you can create it with: +```bash +conda create -n wf_env -c conda-forge -c bioconda snakemake=8.4.7 snakemake-executor-plugin-slurm +conda init bash +``` ### 4. Run the WF The workflow has two parts: `split` and `simulate`. Always run the split first and once its done (realy quick) run the simulate. @@ -33,16 +35,14 @@ If no warnings are displayed, run: ```bash sbatch job.sh [split or simulate] ``` -> **Nb 1:** to create a visual representation of the workflow, use `dag` instead of `dry`. Open the generated `.dot` file with a [viewer](https://dreampuf.github.io/GraphvizOnline/) that supports the format. +> **Nb 1:** If your SLURM account name cannot be determined automatically, add it to the `.config/snakemake/profiles/slurm/config.yaml` file. -> **Nb 2:** Frist execution of the workflow will be slow since images need to be pulled. +> **Nb 2:** To create a visual representation of the workflow, use `dag` instead of `dry`. Open the generated `.dot` file with a [viewer](https://dreampuf.github.io/GraphvizOnline/) that supports the format. > **Nb 3:** The workflow is in two parts because we want to execute the simulations chromosome by chromosome. Snakemake cannot retrieve the number of chromosomes in one go and needs to index and split first. -> **Nb 4:** Since the cbib dose not support `python:3.9.7` we cant use cookie cutter config, use the `cbib_job.sh` to run. - ## B. Run localy -- Ensure `snakemake` and `singularity` are installed on your machine, then run the workflow: +- Ensure `snakemake` and `Singularity/Apptainer` are installed on your machine, then run the workflow: ```bash ./local_run [split or simulate] dry ``` @@ -60,12 +60,22 @@ The variants generation is inspired by [VISOR](https://github.com/davidebolo1993... You can extract a VCF from the graph using the `vg deconstruct` command. It is not implemented in the pipeline. +## Helper script You can use the script `workflow/scripts/split_path.sh` to cut the final fasta into chromosome level fasta files. + +Example use: ```bash -./split_fasta.sh input.fasta /path/to/output_directory +./workflow/scripts/split_path.sh results/test_sample1_results/06_graph_paths/test_sample1_paths.fasta ./out ``` + # Dependencies TODO + +Containers: +Miniconda3, Singularity/Apptainer + +Python: pandas, msprime, argparse, os, multiprocessing, yaml, Bio.Seq -singularity, snakemake -vg:1.60.0, bcftools:1.12, bgzip:latest, tabix:1.7. \ No newline at end of file + + +Workflow: +snakemake, snakemake-executor-plugin-slurm, vg 1.60.0, bcftools 1.12, bgzip, tabix 1.7.
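A minimal sketch of the `vg deconstruct` extraction mentioned in the README above, run on the GFA kept in `05_vg_graph`. It is not part of the pipeline; the contig name, output file name and exact options are assumptions to adapt (check `vg deconstruct --help` for the vg version you use).

```bash
# Sketch only (not part of the workflow): derive a VCF back from the finished graph.
# Assumes the default test_sample1 run and that the reference path is named after a
# contig of the input FASTA (replace "chr1"); if your vg build does not read GFA
# directly, convert the graph with `vg convert` first.
singularity exec docker://registry.forgemia.inra.fr/pangepop/mspangepop/vg:1.60.0 \
    vg deconstruct -p chr1 \
    results/test_sample1_results/05_vg_graph/test_sample1_vg_graph.gfa \
    > test_sample1_deconstructed.vcf
```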
\ No newline at end of file diff --git a/cbib_job.sh b/cbib_job.sh deleted file mode 100644 index 3efcf0c7dedd238840ab526666b52051430811c9..0000000000000000000000000000000000000000 --- a/cbib_job.sh +++ /dev/null @@ -1,93 +0,0 @@ -#!/bin/bash -################################ Slurm options ################################# -### prepare_calling_jobs -#SBATCH -J smk_main -### Max run time "hours:minutes:seconds" -#SBATCH --time=95:00:00 -#SBATCH --ntasks=1 #nb of processes -#SBATCH --cpus-per-task=1 # nb of cores for each process(1 process) -#SBATCH --mem=10G # max of memory (-m) -### Requirements nodes/servers (default: 1) -#SBATCH --nodes=1 -### Requirements cpu/core/task (default: 1) -#SBATCH --ntasks-per-node=1 -#SBATCH -o slurm_logs/snakemake.%N.%j.out -#SBATCH -e slurm_logs/snakemake.%N.%j.err -#SBATCH --mail-type=END,FAIL -#SBATCH --mail-user=<your.email@here.fr> -################################################################################ - -# Function to load modules -load_modules() { - module purge # Clear any previously loaded modules - - # Loop through each module and load it - for module_name in "$@"; do - module load "$module_name" - done -} - -load_modules "containers/singularity/3.9.9" "bioinfo/Snakemake/8.3.1" - -SNG_BIND=$(pwd) -MAX_CORES=10 - -echo 'Starting Snakemake workflow' - -run_snakemake() { - local snakefile="$1" # The Snakefile to run - local option="$2" # The option for dry run or DAG - - echo "Starting $snakefile..." - - # Execute the Snakemake command with the specified option - if [[ "$option" == "dry" ]]; then - snakemake -s "$snakefile" -j $MAX_CORES --use-singularity --singularity-args "-B $SNG_BIND" -n - elif [[ "$option" == "dag" ]]; then - snakemake -s "$snakefile" -j $MAX_CORES --use-singularity --singularity-args "-B $SNG_BIND" --dag > dag.dot - echo "DAG has been generated as dag.png" - return - else - snakemake -s "$snakefile" -j $MAX_CORES --use-singularity --singularity-args "-B $SNG_BIND" - fi - - # Check if the Snakemake command was successful - if [ $? -eq 0 ]; then - echo "$snakefile completed successfully." - else - echo "Error: $snakefile failed." 
- exit 1 - fi -} - -if [ $# -eq 0 ]; then - echo "Usage: $0 [split|simulate] [dry|dag|run]" - echo " split - run the split Snakefile" - echo " simulate - run the simulate Snakefile" - echo " dry - run the specified Snakefile in dry-run mode" - echo " dag - generate DAG for the specified Snakefile" - echo " run - run the specified Snakefile normally (default)" - exit 1 -fi - -# Determine the workflow and option based on the arguments -workflow="$1" -option="$2" - -# Run the specified Snakefile based on user input -case "$workflow" in - split) - snakefile="workflow/Snakefile_split.smk" - ;; - simulate) - snakefile="workflow/Snakefile_simulate.smk" - ;; - *) - echo "Invalid workflow: $workflow" - echo "Usage: $0 [split|simulate] [dry|dag|run]" - exit 1 - ;; -esac - -# Run the specified Snakefile with the provided option -run_snakemake "$snakefile" "$option" \ No newline at end of file diff --git a/job.sh b/job.sh index 408d17f581cfcf89397613b92711be2527d9d9df..3454a1d0af9a17982eca472c446805667e8d0a17 100644 --- a/job.sh +++ b/job.sh @@ -1,77 +1,52 @@ #!/bin/bash -################################ Slurm options ################################# -### prepare_calling_jobs -#SBATCH -J smk_main -### Max run time "hours:minutes:seconds" -#SBATCH --time=96:00:00 -#SBATCH --ntasks=1 #nb of processes -#SBATCH --cpus-per-task=1 # nb of cores for each process(1 process) -#SBATCH --mem=10G # max of memory (-m) -### Requirements nodes/servers (default: 1) -#SBATCH --nodes=1 -### Requirements cpu/core/task (default: 1) -#SBATCH --ntasks-per-node=1 -#SBATCH -o slurm_logs/snakemake.%N.%j.out -#SBATCH -e slurm_logs/snakemake.%N.%j.err -#SBATCH --mail-type=END,FAIL -#SBATCH --mail-user=<your.email@here.fr> -################################################################################ +#SBATCH --cpus-per-task=1 +#SBATCH -o slurm_logs/out_job_%j.out +#SBATCH -e slurm_logs/err_job_%j.err +#SBATCH --time=10:00:00 +#SBATCH -J MSpangepop +#SBATCH --mem=10G +#SBATCH --mail-type=BEGIN,END,FAIL +#SBATCH --mail-user=<mail> -# Useful information to print -echo '########################################' -echo 'Date:' $(date --iso-8601=seconds) -echo 'User:' $USER -echo 'Host:' $HOSTNAME -echo 'Job Name:' $SLURM_JOB_NAME -echo 'Job ID:' $SLURM_JOB_ID -echo 'Number of nodes assigned to job:' $SLURM_JOB_NUM_NODES -echo 'Total number of cores for job (?):' $SLURM_NTASKS -echo 'Number of requested cores per node:' $SLURM_NTASKS_PER_NODE -echo 'Nodes assigned to job:' $SLURM_JOB_NODELIST -echo 'Number of CPUs assigned for each task:' $SLURM_CPUS_PER_TASK -echo 'Directory:' $(pwd) -# Detail Information: -echo 'scontrol show job:' -scontrol show job $SLURM_JOB_ID -echo '########################################' - -# Function to load modules -load_modules() { - module purge # Clear any previously loaded modules - - # Loop through each module and load it - for module_name in "$@"; do - module load "$module_name" - done +# Function to display usage instructions +usage() { + echo "Usage: $0 [split|simulate] [dry|dag|run]" + echo " [split|simulate]: The workflow you want to run" + echo " [dry|dag|run]: The option to execute (dry-run, generate DAG, or actual run)" + echo "Examples:" + echo " $0 split dry # Run the split workflow in dry-run mode" + echo " $0 simulate run # Run the simulate workflow" + exit 1 } -# Here specify the modules to load and their path -load_modules "python/3.9.7" "snakemake/6.5.1" +# Update this with the path to your images +echo 'Loading modules' +module purge +module load containers/Apptainer/1.2.5 +module 
load devel/Miniconda/Miniconda3 -### variables -SNG_BIND=$(pwd) -CLUSTER_CONFIG=".config/snakemake_profile/slurm/cluster_config.yml" -MAX_CORES=10 -PROFILE=".config/snakemake_profile/slurm" +echo 'Activating environment' +source activate wf_env echo 'Starting Snakemake workflow' - run_snakemake() { - local snakefile="$1" # The Snakefile to run - local option="$2" # The option for dry run or DAG - + local snakefile="$1" + local option="$2" echo "Starting $snakefile..." # Execute the Snakemake command with the specified option if [[ "$option" == "dry" ]]; then - snakemake -s "$snakefile" --profile $PROFILE -j $MAX_CORES --use-singularity --singularity-args "-B $SNG_BIND" --cluster-config $CLUSTER_CONFIG -n -r + snakemake -s "$snakefile" -c $(nproc) --dry-run elif [[ "$option" == "dag" ]]; then - snakemake -s "$snakefile" --profile $PROFILE -j $MAX_CORES --use-singularity --singularity-args "-B $SNG_BIND" --cluster-config $CLUSTER_CONFIG --dag > dag.dot - echo "DAG has been generated as dag.png" + snakemake -s "$snakefile" -c $(nproc) --dag > workflow.dot + echo "DAG has been generated as workflow.dot" return + elif [[ "$option" == "run" || -z "$option" ]]; then + snakemake -s "$snakefile" --workflow-profile ./.config/snakemake/profiles/slurm else - snakemake -s "$snakefile" --profile $PROFILE -j $MAX_CORES --use-singularity --singularity-args "-B $SNG_BIND" --cluster-config $CLUSTER_CONFIG + echo "Invalid option: $option" + usage # Display usage if option is invalid fi # Check if the Snakemake command was successful @@ -79,24 +54,19 @@ run_snakemake() { echo "$snakefile completed successfully." else echo "Error: $snakefile failed." - exit 1 fi } -if [ $# -eq 0 ]; then - echo "Usage: $0 [split|simulate] [dry|dag|run]" - echo " split - run the split Snakefile" - echo " simulate - run the simulate Snakefile" - echo " dry - run the specified Snakefile in dry-run mode" - echo " dag - generate DAG for the specified Snakefile" - echo " run - run the specified Snakefile normally (default)" - exit 1 -fi - # Determine the workflow and option based on the arguments workflow="$1" option="$2" +# Validate arguments and call the usage function if invalid +if [[ -z "$workflow" ]]; then + echo "Error: Missing required workflow argument."
+ usage # Display usage if workflow is missing +fi + # Run the specified Snakefile based on user input case "$workflow" in split) @@ -107,11 +77,9 @@ case "$workflow" in ;; *) echo "Invalid workflow: $workflow" - echo "Usage: $0 [split|simulate] [dry|dag|run]" - exit 1 + usage # Display usage if workflow is invalid ;; esac # Run the specified Snakefile with the provided option run_snakemake "$snakefile" "$option" -squeue -u $USER \ No newline at end of file diff --git a/workflow/Snakefile_simulate.smk b/workflow/Snakefile_simulate.smk index cc4a2cb9e63d04665166ed574eba821ae0c99366..6dd877f7c912f61084d26f848c5d8e9f88e9d0cf 100644 --- a/workflow/Snakefile_simulate.smk +++ b/workflow/Snakefile_simulate.smk @@ -1,11 +1,15 @@ # Load the configuration file -configfile: ".config/.masterconfig.yaml" +configfile: ".config/masterconfig.yaml" import os import yaml -# Set the output directory for results -output_dir = "results/" +# Retrieve variables from the config file +container_registry = config.get("container_registry", "docker://registry.forgemia.inra.fr/pangepop/mspangepop") +output_dir = config.get("output_dir", "results/") + +# Retrieve memory multiplier from config +memory_multiplier = config.get("memory_multiplier", 1) # Function to load chromosomes dynamically from chr_config.yaml for each sample def load_chromosomes(sample): @@ -17,14 +21,13 @@ def load_chromosomes(sample): return [] # Rule to define all final outputs - rule all: input: expand(os.path.join(output_dir, "{sample}_results", "04_generated_variants", "{sample}_simulated_variants.vcf.gz"), sample=config["samples"].keys()) + expand(os.path.join(output_dir, "{sample}_results", "05_vg_graph", "{sample}_vg_graph.gfa"), sample=config["samples"].keys()) + - expand(os.path.join(output_dir, "{sample}_results", "06_graph_paths", "{sample}_paths.fasta"), + expand(os.path.join(output_dir, "{sample}_results", "06_graph_paths", "{sample}_paths.fasta.gz"), sample=config["samples"].keys()) # Define a function to get the path of the FAI file for each sample and chromosome @@ -33,7 +36,7 @@ def get_fai(wildcards): chromosome = wildcards.chromosome return os.path.join(output_dir, f"{sample}_results", "02_splited_index", f"{chromosome}.fai") -# Rule to generate trees for each chromosome of each sample +# Rule to run msprime simulation rule run_msprime: input: fai=get_fai @@ -45,38 +48,50 @@ rule run_msprime: reco_rate=lambda wildcards: config["samples"][wildcards.sample]["recombination_rate"], n=lambda wildcards: config["samples"][wildcards.sample]["sample_size"], out=lambda wildcards: os.path.join(output_dir, f"{wildcards.sample}_results", "temp") + resources: + mem_mb=lambda wildcards: int(8000 * memory_multiplier), + time="02:00:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/mspangepop_dep:0.0.1" + f"{container_registry}/mspangepop_dep:0.0.1" shell: """ mkdir -p {params.out} && python3 workflow/scripts/tree_generation.py -fai {input.fai} -p {params.pop_size} -m {params.mut_rate} -r {params.reco_rate} -n {params.n} -o {params.out} -c {wildcards.chromosome} """ +# Rule to compress the msprime simulation VCF file rule compress_sim_vcf: input: vcf=rules.run_msprime.output output: temp(os.path.join(output_dir, "{sample}_results", "temp", "{chromosome}_msprime_simulation.vcf.gz")) + resources: + mem_mb=lambda wildcards: int(2000 * memory_multiplier), + time="01:00:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/bgzip:latest" + f"{container_registry}/bgzip:latest" shell: """ bgzip -c {input.vcf} > 
{output} """ +# Rule to index the compressed VCF file rule index_sim_vcf: input: vcf_gz=rules.compress_sim_vcf.output output: temp(os.path.join(output_dir, "{sample}_results", "temp", "{chromosome}_msprime_simulation.vcf.gz.tbi")) + resources: + mem_mb=lambda wildcards: int(1000 * memory_multiplier), + time="00:30:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/tabix:1.7" + f"{container_registry}/tabix:1.7" shell: """ tabix -p vcf {input.vcf_gz} """ + # Rule to merge VCF files for each sample by combining VCFs from all chromosomes rule merge_simulations: input: @@ -92,11 +107,14 @@ rule merge_simulations: ) output: merged_vcf=os.path.join(output_dir, "{sample}_results", "03_msprime_simulation", "msprime_simulation.vcf.gz") + resources: + mem_mb=lambda wildcards: int(5000 * memory_multiplier), + time="01:30:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/bcftools:1.12" + f"{container_registry}/bcftools:1.12" shell: """ - bcftools concat -a -O z -o{output.merged_vcf} -O z {input.vcf_files} + bcftools concat -a -O z -o {output.merged_vcf} {input.vcf_files} """ # Rule to unzip the FASTA file for each sample @@ -105,26 +123,37 @@ rule unzip_input_fasta: fasta_gz=lambda wildcards: config["samples"][wildcards.sample]["fasta_gz"] output: temp(output_dir + "{sample}_results/temp/{sample}.fasta") + resources: + mem_mb=lambda wildcards: int(1000 * memory_multiplier), + time="00:30:00" shell: """ gunzip -c {input.fasta_gz} > {output} """ +# Rule to unzip the simulated VCF rule unzip_simulated_vcf: input: vcf_gz=rules.merge_simulations.output.merged_vcf output: temp(output_dir + "{sample}_results/temp/{sample}_msprime.vcf") + resources: + mem_mb=lambda wildcards: int(1000 * memory_multiplier), + time="00:30:00" shell: """ gunzip -c {input.vcf_gz} > {output} """ +# Rule to remove the header from the VCF file rule remove_vcf_header: input: vcf=rules.unzip_simulated_vcf.output output: temp(output_dir + "{sample}_results/temp/{sample}_msprime_no_header.vcf") + resources: + mem_mb=lambda wildcards: int(500 * memory_multiplier), + time="00:15:00" shell: """ awk '!/^#/' {input.vcf} > {output} @@ -136,12 +165,15 @@ rule extract_vcf_header: vcf=rules.unzip_simulated_vcf.output output: temp(output_dir + "{sample}_results/temp/{sample}_msprime_header.vcf") + resources: + mem_mb=lambda wildcards: int(500 * memory_multiplier), + time="00:15:00" shell: """ awk '/^#/' {input.vcf} > {output} """ -# Rule to generate structural variants using variants_generation.py +# Rule to generate structural variants using the variants_generation.py script rule generate_variants: input: fai=os.path.join(output_dir, "{sample}_results", "01_chromosome_index", "{sample}_full.fai"), @@ -150,115 +182,191 @@ rule generate_variants: yaml=".config/visor_sv_type.yaml" output: temp(os.path.join(output_dir, "{sample}_results", "temp", "simulated_variants.vcf")) + resources: + mem_mb=lambda wildcards: int(8000 * memory_multiplier), + time="02:00:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/mspangepop_dep:0.0.1" + f"{container_registry}/mspangepop_dep:0.0.1" shell: """ python3 workflow/scripts/generate_variant.py --fai {input.fai} --fasta {input.fasta} --vcf {input.vcf} --yaml {input.yaml} --output {output} """ -# Rule to sort the extracted VCF header using the external script +# Rule to sort the VCF header rule sort_header: input: header=rules.extract_vcf_header.output output: temp(os.path.join(output_dir, "{sample}_results", "temp", "sorted_header.vcf")) + resources: 
+ mem_mb=lambda wildcards: int(1000 * memory_multiplier), + time="00:20:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/mspangepop_dep:0.0.1" + f"{container_registry}/mspangepop_dep:0.0.1" shell: """ python3 workflow/scripts/sort_vcf_header.py {input.header} {output} """ -# Rule to add the header to the generated VCF output +# Rule to add the header to the VCF output rule add_header: input: header=rules.sort_header.output, vcf=rules.generate_variants.output output: temp(os.path.join(output_dir, "{sample}_results", "temp", "final_simulated_variants.vcf")) + resources: + mem_mb=lambda wildcards: int(1000 * memory_multiplier), + time="00:20:00" shell: """ cat {input.header} {input.vcf} > {output} """ -# Workflow to sort, compress, index VCF files and construct graphs +# Rule to sort and compress the final VCF rule sort_vcf: input: - vcf = rules.add_header.output + vcf=rules.add_header.output output: temp(os.path.join(output_dir, "{sample}_results", "temp", "sorted_simulated_variants.vcf")) + resources: + mem_mb=lambda wildcards: int(2000 * memory_multiplier), + time="00:30:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/bcftools:1.12" + f"{container_registry}/bcftools:1.12" shell: - "bcftools sort {input.vcf} -Oz -o {output}" - + """ + bcftools sort {input.vcf} -Oz -o {output} + """ +# Rule to construct the graph rule construct_graph: input: - fasta = rules.unzip_input_fasta.output, - vcf = rules.sort_vcf.output + fasta=rules.unzip_input_fasta.output, + vcf=rules.sort_vcf.output output: temp(os.path.join(output_dir, "{sample}_results", "05_vg_graph", "graph.vg")) + resources: + mem_mb=lambda wildcards: int(8000 * memory_multiplier), + time="02:00:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/vg:1.60.0" + f"{container_registry}/vg:1.60.0" + params: + memory = lambda wildcards: int(7000 * memory_multiplier), shell: - "vg construct -m 2000000000 -r {input.fasta} -v {input.vcf} -f -p > {output}" - + """ + vg construct -m {params.memory} -r {input.fasta} -v {input.vcf} -f -p > {output} + """ +# Rule to convert VG graph to GFA format rule vg_to_gfa: input: - vg = rules.construct_graph.output + vg=rules.construct_graph.output output: - outfile = os.path.join(output_dir, "{sample}_results", "05_vg_graph", "{sample}_vg_graph.gfa") + outfile=os.path.join(output_dir, "{sample}_results", "05_vg_graph", "{sample}_vg_graph.gfa") + resources: + mem_mb=lambda wildcards: int(4000 * memory_multiplier), + time="01:00:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/vg:1.60.0" + f"{container_registry}/vg:1.60.0" shell: - "vg convert -f {input.vg} > {output.outfile}" + """ + vg convert -f {input.vg} > {output.outfile} + """ +# Rule to compress VCF for Giraffe rule compress_vcf_for_griaffe: input: - vcf = rules.sort_vcf.output + vcf=rules.sort_vcf.output output: os.path.join(output_dir, "{sample}_results", "04_generated_variants", "{sample}_simulated_variants.vcf.gz") + resources: + mem_mb=lambda wildcards: int(2000 * memory_multiplier), + time="00:30:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/bgzip:latest" + f"{container_registry}/bgzip:latest" shell: """ bgzip -c {input.vcf} > {output} """ +# Rule to create Giraffe index rule index_giraffe: input: - fasta = rules.unzip_input_fasta.output, - vcf_gz = rules.compress_vcf_for_griaffe.output + fasta=rules.unzip_input_fasta.output, + vcf_gz=rules.compress_vcf_for_griaffe.output output: temp(os.path.join(output_dir, "{sample}_results", 
"temp", "index.giraffe.gbz")) params: - out = os.path.join(output_dir, "{sample}_results", "temp", "index") + out=os.path.join(output_dir, "{sample}_results", "temp", "index") + resources: + mem_mb=lambda wildcards: int(12000 * memory_multiplier), + time="03:00:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/vg:1.60.0" + f"{container_registry}/vg:1.60.0" shell: - "vg autoindex -r {input.fasta} -v {input.vcf_gz} -w giraffe -p {params.out}" - + """ + vg autoindex -r {input.fasta} -v {input.vcf_gz} -w giraffe -p {params.out} + """ +# Rule to convert GBZ to GFA format rule gbz_to_gfa: input: - gbz = rules.index_giraffe.output + gbz=rules.index_giraffe.output output: temp(os.path.join(output_dir, "{sample}_results", "temp", "giraffe_graph.gfa")) + resources: + mem_mb=lambda wildcards: int(4000 * memory_multiplier), + time="01:30:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/vg:1.60.0" + f"{container_registry}/vg:1.60.0" shell: - "vg convert -f {input.gbz} > {output}" - + """ + vg convert -f {input.gbz} > {output} + """ +# Rule to extract paths to a FASTA file rule graph_to_fasta: input: - gbz = rules.gbz_to_gfa.output + gbz=rules.gbz_to_gfa.output output: - os.path.join(output_dir, "{sample}_results", "06_graph_paths", "{sample}_paths.fasta") + temp(os.path.join(output_dir, "{sample}_results", "temp", "{sample}_paths_dirty.fasta")) + resources: + mem_mb=lambda wildcards: int(2000 * memory_multiplier), + time="01:00:00" container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/vg:1.60.0" + f"{container_registry}/vg:1.60.0" + shell: + """ + vg paths -F -x {input.gbz} > {output} + """ + +# Rule to remove the reference paths +rule remove_reference: + input: + fasta=rules.graph_to_fasta.output + output: + temp(os.path.join(output_dir, "{sample}_results", "temp", "{sample}_paths_.fasta")) + resources: + mem_mb=lambda wildcards: int(1000 * memory_multiplier), + time="00:20:00" shell: - "vg paths -F -x {input.gbz} > {output}" \ No newline at end of file + """ + ./workflow/scripts/remove_reference.sh {input.fasta} {output} + """ + +# Rule to compress the filtered FASTA file +rule compress_filtered_fasta: + input: + fasta=rules.remove_reference.output + output: + os.path.join(output_dir, "{sample}_results", "06_graph_paths", "{sample}_paths.fasta.gz") + resources: + mem_mb=lambda wildcards: int(2000 * memory_multiplier), + time="00:30:00" + container: + f"{container_registry}/bgzip:latest" + shell: + """ + bgzip -c {input.fasta} > {output} + """ \ No newline at end of file diff --git a/workflow/Snakefile_split.smk b/workflow/Snakefile_split.smk index 928a0600bcf2296f34d4e46f68682b2fd824de3d..ebf65aae1befababd4744a4eb57b6c8a4634639b 100644 --- a/workflow/Snakefile_split.smk +++ b/workflow/Snakefile_split.smk @@ -1,11 +1,15 @@ # Load the configuration file -configfile: ".config/.masterconfig.yaml" +configfile: ".config/masterconfig.yaml" import os import yaml -# Set the output directory for results -output_dir = "results/" +# Retrieve variables from the config file +container_registry = config.get("container_registry", "docker://registry.forgemia.inra.fr/pangepop/mspangepop") +output_dir = config.get("output_dir", "results/") + +# Retrieve memory multiplier from config +memory_multiplier = config.get("memory_multiplier", 1) # Rule all ensures final files are generated rule all: @@ -18,16 +22,20 @@ rule generate_fai_index: input: fasta=lambda wildcards: config["samples"][wildcards.sample]["fasta_gz"] output: - fai=os.path.join(output_dir, 
"{sample}_results", "01_chromosome_index", "{sample}_full.fai") + fai=os.path.join(output_dir, "{sample}_results", "01_chromosome_index", "{sample}_full.fai") + threads: 1 + resources: + mem_mb=lambda wildcards: int(10000 * memory_multiplier), + time="01:00:00" params: out=os.path.join(output_dir, "{sample}_results", "01_chromosome_index") container: - "docker://registry.forgemia.inra.fr/pangepop/mspangepop/samtool:1.21" + f"{container_registry}/samtool:1.21" shell: """ samtools faidx {input.fasta} && - mv {input.fasta}.fai {output.fai} && - rm {input.fasta}.gzi || true + mv {input.fasta}.fai {output.fai} && echo 'indexing successful' && + rm {input.fasta}.gzi || true """ # Rule to split the FAI file into separate chromosome files @@ -36,6 +44,9 @@ rule split_index: fai=rules.generate_fai_index.output.fai output: directory(os.path.join(output_dir, "{sample}_results", "02_splited_index")) + resources: + mem_mb=lambda wildcards: int(5000 * memory_multiplier), + time="01:00:00" params: out=os.path.join(output_dir, "{sample}_results", "02_splited_index") shell: @@ -50,6 +61,9 @@ rule create_chr_config_file: fai=rules.generate_fai_index.output.fai output: yaml=os.path.join(output_dir, "{sample}_results", "01_chromosome_index", "chr_config.yaml") + resources: + mem_mb=lambda wildcards: int(2000 * memory_multiplier), + time="01:00:00" shell: """ awk '{{print $1}}' {input.fai} | \ diff --git a/workflow/scripts/remove_reference.sh b/workflow/scripts/remove_reference.sh new file mode 100755 index 0000000000000000000000000000000000000000..cfca91e31ac7186ce8ab24f88bc50a6c1ab42e92 --- /dev/null +++ b/workflow/scripts/remove_reference.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Author: Lucien Piat +# Date: November 24, 2024 +# Project: PangenOak at INRAE + +# Usage: ./remove_reference.sh input_fasta output_fasta +input_fasta=$1 +output_fasta=$2 + +# Filter the input FASTA file to include only sequences starting with 'tsk' +awk 'BEGIN {RS=">"; ORS=""} /^tsk/ {print ">" $0}' "$input_fasta" > "$output_fasta" diff --git a/workflow/scripts/split_path.sh b/workflow/scripts/split_path.sh index 8fba85991ea566a13da879c3b279328b09e940dd..98aebfcd52c8b9bbb407e7c48dd195638cbbad7f 100755 --- a/workflow/scripts/split_path.sh +++ b/workflow/scripts/split_path.sh @@ -1,18 +1,60 @@ #!/bin/bash +# This script takes a FASTA file as input, splits each contig into individual smaller FASTA files. +# +# Created by Lucien Piat on behalf of INRAe as part of the PangenOak project. +# +# Usage: +# ./split_fasta.sh <input_fasta> <output_directory> [--help] +# +# Each output file is named after the contig header (without the ">"), with a `.fasta` extension. + +# Display help message +show_help() { + echo "Usage: $0 <input_fasta> <output_directory> [--help]" + echo + echo "This script splits a FASTA file into individual contig files." + echo "Each output file is named after the contig header and saved in the specified output directory." + echo + echo "Arguments:" + echo " <input_fasta> Path to the input FASTA file." + echo " <output_directory> Directory to save the individual FASTA files." + echo + echo "Options:" + echo " --help Display this help message and exit." + echo + exit 0 +} + +if [[ "$1" == "--help" ]]; then + show_help +fi + +if [[ "$#" -ne 2 ]]; then + echo "Error: Missing arguments." 
+ show_help +fi + in="$1" out_dir="$2" -out="" +mkdir -p "$out_dir" + +out="" while read -r line; do if [[ "$line" == \>* ]]; then + # Close previous file descriptor if open [ -n "$out" ] && exec 3>&- - out="${line##>}.fasta" + # Prepare the new output file + out="${line#>}.fasta" exec 3> "$out_dir/$out" echo "$line" >&3 else + # Write sequence data to the current output file echo "$line" >&3 fi done < "$in" [ -n "$out" ] && exec 3>&- + +echo "FASTA file splitting complete. Files saved in '$out_dir'." \ No newline at end of file
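A possible follow-up to the helper scripts above, assuming the default `test_sample1` run and the default `output_dir` from `masterconfig`: decompress the final paths FASTA written by `compress_filtered_fasta`, then split it into one FASTA per contig with `split_path.sh`. Each output file is named after its contig header, and the output directory is created if missing.

```bash
# Sketch; paths assume the default test_sample1 run.
gunzip -k results/test_sample1_results/06_graph_paths/test_sample1_paths.fasta.gz   # -k keeps the .gz
./workflow/scripts/split_path.sh \
    results/test_sample1_results/06_graph_paths/test_sample1_paths.fasta \
    per_chromosome_fastas
```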