From 99fa10b03be2bcf2cb4d313de78220a0518d3a4c Mon Sep 17 00:00:00 2001 From: "sukanya.denni" <sukanya.denni@inrae.fr> Date: Wed, 20 Dec 2023 15:51:43 +0100 Subject: [PATCH 1/6] fix insertion length --- bed2vcf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bed2vcf.py b/bed2vcf.py index 3d3172c..1b36616 100644 --- a/bed2vcf.py +++ b/bed2vcf.py @@ -89,7 +89,7 @@ def get_seq(vcf_df, bed_df, fa_dict, output_file): elif sv_type == "insertion": end = start + get_random_len("INS") - alt_seq = DNA(end) + alt_seq = DNA(get_random_len("INS")) ref = str(fasta_seq.seq[start]) alt = ref + str(alt_seq) -- GitLab From 690ad76fc4d5ec3e37859b08616724ec70079f74 Mon Sep 17 00:00:00 2001 From: "sukanya.denni" <sukanya.denni@inrae.fr> Date: Fri, 19 Jan 2024 09:58:32 +0100 Subject: [PATCH 2/6] update README --- README.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 67ded0c..d399e76 100644 --- a/README.md +++ b/README.md @@ -33,4 +33,13 @@ The distributions are used to randomly sample each structural variant size. - [ ] tandem repeat contraction - [ ] tandem repeat expansion - [ ] approximate tandem repetition -- [x] Add VCF merging when there are multiple chromosomes \ No newline at end of file +- [x] Add VCF merging when there are multiple chromosomes + +## 3. Create exact data with vg +In the `vg_exact_data` folder. + +Snakemake/Singularity pipeline to get a pangenome in GFA format and a FASTA with all individuals from the VCF. It starts from a reference FASTA and a VCF, both specified in the `config.yaml` file along with a name for the output. + +On a SLURM cluster, run `sbatch job.sh dry` for a dry run or `sbatch job.sh` directly. Adjust the `SNG_BIND` variable if files are not found, and adapt the snakemake profile as necessary for performance. + +You can extract a VCF from the graph using the `vg deconstruct` command. It is not implemented in the pipeline.
\ No newline at end of file -- GitLab From 82976f4cd14cc081a38acf83913db82f740878d1 Mon Sep 17 00:00:00 2001 From: "sukanya.denni" <sukanya.denni@inrae.fr> Date: Fri, 19 Jan 2024 09:58:50 +0100 Subject: [PATCH 3/6] added data generation with vg --- vg_exact_data/Snakefile | 75 ++++ vg_exact_data/config.yaml | 3 + vg_exact_data/job.sh | 61 ++++ .../snakemake_profile/slurm/CookieCutter.py | 31 ++ .../slurm/cluster_config.yml | 15 + .../snakemake_profile/slurm/config.yaml | 14 + .../slurm/slurm-jobscript.sh | 3 + .../snakemake_profile/slurm/slurm-status.py | 72 ++++ .../snakemake_profile/slurm/slurm-submit.py | 61 ++++ .../snakemake_profile/slurm/slurm_utils.py | 345 ++++++++++++++++++ 10 files changed, 680 insertions(+) create mode 100644 vg_exact_data/Snakefile create mode 100644 vg_exact_data/config.yaml create mode 100644 vg_exact_data/job.sh create mode 100755 vg_exact_data/snakemake_profile/slurm/CookieCutter.py create mode 100644 vg_exact_data/snakemake_profile/slurm/cluster_config.yml create mode 100644 vg_exact_data/snakemake_profile/slurm/config.yaml create mode 100644 vg_exact_data/snakemake_profile/slurm/slurm-jobscript.sh create mode 100755 vg_exact_data/snakemake_profile/slurm/slurm-status.py create mode 100755 vg_exact_data/snakemake_profile/slurm/slurm-submit.py create mode 100755 vg_exact_data/snakemake_profile/slurm/slurm_utils.py diff --git a/vg_exact_data/Snakefile b/vg_exact_data/Snakefile new file mode 100644 index 0000000..4739017 --- /dev/null +++ b/vg_exact_data/Snakefile @@ -0,0 +1,75 @@ +configfile: "config.yaml" +SAMPLES = config["name"] + + +rule all: + input: + expand("results/{sample}/{sample}.gfa", sample=SAMPLES), + # expand("results/{sample}/{sample}.fa", sample=SAMPLES), + expand("results/{sample}/{sample}_from_vg.gfa", sample=SAMPLES) + +rule sort: + input: + vcf = config["vcf"] + output: + sorted_vcf = "results/{sample}/{sample}.vcf.gz" + container: + "docker://mgibio/bcftools:1.12" + shell: + "bcftools sort {input.vcf} -Oz -o {output}" + +rule construct: + input: + ref = config["fa"], + vcf = rules.sort.output + output: + "results/{sample}/graph.vg" + container: + "docker://quay.io/vgteam/vg:v1.53.0" + shell: + "vg construct -m 2000000000 -r {input.ref} -v {input.vcf} -f -p > {output}" + +rule convert: + input: + vg = rules.construct.output + output: + gfa = "results/{sample}/{sample}_from_vg.gfa" + container: + "docker://quay.io/vgteam/vg:v1.53.0" + shell: + "vg convert -f {input.vg} > {output.gfa}" + +rule graph: + input: + ref = config["fa"], + vcf = rules.sort.output + # vcf = config["vcf"] + output: + gbz = "results/" + "{sample}/index.giraffe.gbz" + params: + prefix = "results/" + "{sample}/index" + container: + "docker://quay.io/vgteam/vg:v1.53.0" + shell: + "vg autoindex -r {input.ref} -v {input.vcf} -w giraffe -p {params.prefix}" + +rule graph_gfa: + input: + gbz = rules.graph.output + output: + gfa = "results/{sample}/{sample}.gfa" + container: + "docker://quay.io/vgteam/vg:v1.53.0" + shell: + "vg convert -f {input.gbz} > {output.gfa}" + +rule fa: + input: + gbz = rules.graph.output + output: + fa = "results/{sample}/{sample}.fa" + container: + "docker://quay.io/vgteam/vg:v1.53.0" + shell: + "vg paths -F -x {input.gbz} > {output.fa}" + diff --git a/vg_exact_data/config.yaml b/vg_exact_data/config.yaml new file mode 100644 index 0000000..a233aa7 --- /dev/null +++ b/vg_exact_data/config.yaml @@ -0,0 +1,3 @@ +fa: "../Sibirica_v1.0.fa" +vcf: "../with_singularity/results/sib_10.vcf" +name: "sibirica_10k" \ No newline at end of file diff --git 
a/vg_exact_data/job.sh b/vg_exact_data/job.sh new file mode 100644 index 0000000..1598450 --- /dev/null +++ b/vg_exact_data/job.sh @@ -0,0 +1,61 @@ +#!/bin/bash +################################ Slurm options ################################# +### prepare_calling_jobs +#SBATCH -J vg_smk +### Max run time "hours:minutes:seconds" +#SBATCH --time=120:00:00 +#SBATCH --ntasks=1 #nb of processes +#SBATCH --cpus-per-task=1 # nb of cores for each process(1 process) +#SBATCH --mem=10G # max of memory (-m) +### Requirements nodes/servers (default: 1) +#SBATCH --nodes=1 +### Requirements cpu/core/task (default: 1) +#SBATCH --ntasks-per-node=1 +#SBATCH -o slurm_logs/snakemake.%N.%j.out +#SBATCH -e slurm_logs/snakemake.%N.%j.err +#SBATCH --mail-type=END,FAIL +#SBATCH --mail-user=sukanya.denni@univ-rouen.fr +################################################################################ + +# Useful information to print +echo '########################################' +echo 'Date:' $(date --iso-8601=seconds) +echo 'User:' $USER +echo 'Host:' $HOSTNAME +echo 'Job Name:' $SLURM_JOB_NAME +echo 'Job ID:' $SLURM_JOB_ID +echo 'Number of nodes assigned to job:' $SLURM_JOB_NUM_NODES +echo 'Total number of cores for job (?):' $SLURM_NTASKS +echo 'Number of requested cores per node:' $SLURM_NTASKS_PER_NODE +echo 'Nodes assigned to job:' $SLURM_JOB_NODELIST +echo 'Number of CPUs assigned for each task:' $SLURM_CPUS_PER_TASK +echo 'Directory:' $(pwd) +# Detail Information: +echo 'scontrol show job:' +scontrol show job $SLURM_JOB_ID +echo '########################################' + + +### variables +CLUSTER_CONFIG="snakemake_profile/slurm/cluster_config.yml" +MAX_CORES=10 +PROFILE="snakemake_profile/slurm" +SNG_BIND=".,/gpfs" + +### Module Loading: +module purge +module load snakemake/6.5.1 +# module load singularity + +echo 'Starting Snakemake workflow' + +### Snakemake commands + +if [ "$1" = "dry" ] +then + # dry run + snakemake --profile $PROFILE -j $MAX_CORES --use-singularity --singularity-args "-B $SNG_BIND" --cluster-config $CLUSTER_CONFIG -n -r +else + # run + snakemake --profile $PROFILE -j $MAX_CORES --use-singularity --singularity-args "-B $SNG_BIND" --cluster-config $CLUSTER_CONFIG +fi \ No newline at end of file diff --git a/vg_exact_data/snakemake_profile/slurm/CookieCutter.py b/vg_exact_data/snakemake_profile/slurm/CookieCutter.py new file mode 100755 index 0000000..19d61df --- /dev/null +++ b/vg_exact_data/snakemake_profile/slurm/CookieCutter.py @@ -0,0 +1,31 @@ +# +# Based on lsf CookieCutter.py +# +import os +import json + +d = os.path.dirname(__file__) +with open(os.path.join(d, "settings.json")) as fh: + settings = json.load(fh) + + +class CookieCutter: + + SBATCH_DEFAULTS = settings['SBATCH_DEFAULTS'] + CLUSTER_NAME = settings['CLUSTER_NAME'] + CLUSTER_CONFIG = settings['CLUSTER_CONFIG'] + ADVANCED_ARGUMENT_CONVERSION = settings['ADVANCED_ARGUMENT_CONVERSION'] + + @staticmethod + def get_cluster_option() -> str: + cluster = CookieCutter.CLUSTER_NAME + if cluster != "": + return f"--cluster={cluster}" + return "" + + @staticmethod + def get_advanced_argument_conversion() -> bool: + val = {"yes": True, "no": False}[ + CookieCutter.ADVANCED_ARGUMENT_CONVERSION + ] + return val diff --git a/vg_exact_data/snakemake_profile/slurm/cluster_config.yml b/vg_exact_data/snakemake_profile/slurm/cluster_config.yml new file mode 100644 index 0000000..91e92ba --- /dev/null +++ b/vg_exact_data/snakemake_profile/slurm/cluster_config.yml @@ -0,0 +1,15 @@ +### default ressources used by snakemake (applied to all 
rules) +__default__: + job-name: "{rule}" + time: "120:00:00" # max run time "hours:minutes:seconds" + ntasks: 1 # nb of processes + cpus-per-task: 10 # nb of cores for each process(1 process) + mem: "60G" + nodes: 1 # Requirements nodes/servers (default: 1) + ntasks-per-node: 1 # Requirements cpu/core/task (default: 1) + output: "slurm_logs/{rule}.%N.%j.out" + error: "slurm_logs/{rule}.%N.%j.err" + mail-type: FAIL #email notification + mail-user: sukanya.denni@univ-rouen.fr + +###Â rule resources \ No newline at end of file diff --git a/vg_exact_data/snakemake_profile/slurm/config.yaml b/vg_exact_data/snakemake_profile/slurm/config.yaml new file mode 100644 index 0000000..01741b3 --- /dev/null +++ b/vg_exact_data/snakemake_profile/slurm/config.yaml @@ -0,0 +1,14 @@ +restart-times: 0 +jobscript: "slurm-jobscript.sh" +cluster: "slurm-submit.py" +cluster-status: "slurm-status.py" +max-jobs-per-second: 1 +max-status-checks-per-second: 10 +local-cores: 1 +latency-wait: 60 + +## added snakemake settings +keep-going: True # Go on with independent jobs if a job fails +rerun-incomplete: True # Re-run all jobs the output of which is recognized as incomplete. +# keep-incomplete: True # keep results even if failed +# unlock: True \ No newline at end of file diff --git a/vg_exact_data/snakemake_profile/slurm/slurm-jobscript.sh b/vg_exact_data/snakemake_profile/slurm/slurm-jobscript.sh new file mode 100644 index 0000000..391741e --- /dev/null +++ b/vg_exact_data/snakemake_profile/slurm/slurm-jobscript.sh @@ -0,0 +1,3 @@ +#!/bin/bash +# properties = {properties} +{exec_job} diff --git a/vg_exact_data/snakemake_profile/slurm/slurm-status.py b/vg_exact_data/snakemake_profile/slurm/slurm-status.py new file mode 100755 index 0000000..6dc2323 --- /dev/null +++ b/vg_exact_data/snakemake_profile/slurm/slurm-status.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +import re +import subprocess as sp +import shlex +import sys +import time +import logging +from CookieCutter import CookieCutter + +logger = logging.getLogger("__name__") + +STATUS_ATTEMPTS = 20 + +jobid = sys.argv[1] + +cluster = CookieCutter.get_cluster_option() + +for i in range(STATUS_ATTEMPTS): + try: + sacct_res = sp.check_output(shlex.split(f"sacct {cluster} -P -b -j {jobid} -n")) + res = { + x.split("|")[0]: x.split("|")[1] + for x in sacct_res.decode().strip().split("\n") + } + break + except sp.CalledProcessError as e: + logger.error("sacct process error") + logger.error(e) + except IndexError as e: + logger.error(e) + pass + # Try getting job with scontrol instead in case sacct is misconfigured + try: + sctrl_res = sp.check_output( + shlex.split(f"scontrol {cluster} -o show job {jobid}") + ) + m = re.search(r"JobState=(\w+)", sctrl_res.decode()) + res = {jobid: m.group(1)} + break + except sp.CalledProcessError as e: + logger.error("scontrol process error") + logger.error(e) + if i >= STATUS_ATTEMPTS - 1: + print("failed") + exit(0) + else: + time.sleep(1) + +status = res[jobid] + +if status == "BOOT_FAIL": + print("failed") +elif status == "OUT_OF_MEMORY": + print("failed") +elif status.startswith("CANCELLED"): + print("failed") +elif status == "COMPLETED": + print("success") +elif status == "DEADLINE": + print("failed") +elif status == "FAILED": + print("failed") +elif status == "NODE_FAIL": + print("failed") +elif status == "PREEMPTED": + print("failed") +elif status == "TIMEOUT": + print("failed") +elif status == "SUSPENDED": + print("running") +else: + print("running") diff --git a/vg_exact_data/snakemake_profile/slurm/slurm-submit.py 
b/vg_exact_data/snakemake_profile/slurm/slurm-submit.py new file mode 100755 index 0000000..296b756 --- /dev/null +++ b/vg_exact_data/snakemake_profile/slurm/slurm-submit.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +""" +Snakemake SLURM submit script. +""" +from snakemake.utils import read_job_properties + +import slurm_utils +from CookieCutter import CookieCutter + +# cookiecutter arguments +SBATCH_DEFAULTS = CookieCutter.SBATCH_DEFAULTS +CLUSTER = CookieCutter.get_cluster_option() +CLUSTER_CONFIG = "" +ADVANCED_ARGUMENT_CONVERSION = CookieCutter.get_advanced_argument_conversion() + +RESOURCE_MAPPING = { + "time": ("time", "runtime", "walltime"), + "mem": ("mem", "mem_mb", "ram", "memory"), + "mem-per-cpu": ("mem-per-cpu", "mem_per_cpu", "mem_per_thread"), + "nodes": ("nodes", "nnodes"), + "partition": ("partition", "queue"), +} + +# parse job +jobscript = slurm_utils.parse_jobscript() +job_properties = read_job_properties(jobscript) + +sbatch_options = {} +cluster_config = slurm_utils.load_cluster_config(CLUSTER_CONFIG) + +# 1) sbatch default arguments and cluster +sbatch_options.update(slurm_utils.parse_sbatch_defaults(SBATCH_DEFAULTS)) +sbatch_options.update(slurm_utils.parse_sbatch_defaults(CLUSTER)) + +# 2) cluster_config defaults +sbatch_options.update(cluster_config["__default__"]) + +# 3) Convert resources (no unit conversion!) and threads +sbatch_options.update( + slurm_utils.convert_job_properties(job_properties, RESOURCE_MAPPING) +) + +# 4) cluster_config for particular rule +sbatch_options.update(cluster_config.get(job_properties.get("rule"), {})) + +# 5) cluster_config options +sbatch_options.update(job_properties.get("cluster", {})) + +# 6) Advanced conversion of parameters +if ADVANCED_ARGUMENT_CONVERSION: + sbatch_options = slurm_utils.advanced_argument_conversion(sbatch_options) + +# 7) Format pattern in snakemake style +sbatch_options = slurm_utils.format_values(sbatch_options, job_properties) + +# ensure sbatch output dirs exist +for o in ("output", "error"): + slurm_utils.ensure_dirs_exist(sbatch_options[o]) if o in sbatch_options else None + +# submit job and echo id back to Snakemake (must be the only stdout) +print(slurm_utils.submit_job(jobscript, **sbatch_options)) diff --git a/vg_exact_data/snakemake_profile/slurm/slurm_utils.py b/vg_exact_data/snakemake_profile/slurm/slurm_utils.py new file mode 100755 index 0000000..d43c070 --- /dev/null +++ b/vg_exact_data/snakemake_profile/slurm/slurm_utils.py @@ -0,0 +1,345 @@ +#!/usr/bin/env python3 +import os +import sys +from os.path import dirname +import re +import math +import argparse +import subprocess as sp +from io import StringIO + +from snakemake import io +from snakemake.io import Wildcards +from snakemake.utils import SequenceFormatter +from snakemake.utils import AlwaysQuotedFormatter +from snakemake.utils import QuotedFormatter +from snakemake.exceptions import WorkflowError +from snakemake.logging import logger + +from CookieCutter import CookieCutter + + +def _convert_units_to_mb(memory): + """If memory is specified with SI unit, convert to MB""" + if isinstance(memory, int) or isinstance(memory, float): + return int(memory) + siunits = {"K": 1e-3, "M": 1, "G": 1e3, "T": 1e6} + regex = re.compile(r"(\d+)({})$".format("|".join(siunits.keys()))) + m = regex.match(memory) + if m is None: + logger.error( + ( + f"unsupported memory specification '{memory}';" + " allowed suffixes: [K|M|G|T]" + ) + ) + sys.exit(1) + factor = siunits[m.group(2)] + return int(int(m.group(1)) * factor) + + +def parse_jobscript(): + 
"""Minimal CLI to require/only accept single positional argument.""" + p = argparse.ArgumentParser(description="SLURM snakemake submit script") + p.add_argument("jobscript", help="Snakemake jobscript with job properties.") + return p.parse_args().jobscript + + +def parse_sbatch_defaults(parsed): + """Unpack SBATCH_DEFAULTS.""" + d = parsed.split() if type(parsed) == str else parsed + args = {} + for keyval in [a.split("=") for a in d]: + k = keyval[0].strip().strip("-") + v = keyval[1].strip() if len(keyval) == 2 else None + args[k] = v + return args + + +def load_cluster_config(path): + """Load config to dict + + Load configuration to dict either from absolute path or relative + to profile dir. + """ + if path: + path = os.path.join(dirname(__file__), os.path.expandvars(path)) + dcc = io.load_configfile(path) + else: + dcc = {} + if "__default__" not in dcc: + dcc["__default__"] = {} + return dcc + + +# adapted from format function in snakemake.utils +def format(_pattern, _quote_all=False, **kwargs): # noqa: A001 + """Format a pattern in Snakemake style. + This means that keywords embedded in braces are replaced by any variable + values that are available in the current namespace. + """ + fmt = SequenceFormatter(separator=" ") + if _quote_all: + fmt.element_formatter = AlwaysQuotedFormatter() + else: + fmt.element_formatter = QuotedFormatter() + try: + return fmt.format(_pattern, **kwargs) + except KeyError as ex: + raise NameError( + f"The name {ex} is unknown in this context. Please " + "make sure that you defined that variable. " + "Also note that braces not used for variable access " + "have to be escaped by repeating them " + ) + + +# adapted from Job.format_wildcards in snakemake.jobs +def format_wildcards(string, job_properties): + """ Format a string with variables from the job. 
""" + + class Job(object): + def __init__(self, job_properties): + for key in job_properties: + setattr(self, key, job_properties[key]) + + job = Job(job_properties) + if "params" in job_properties: + job._format_params = Wildcards(fromdict=job_properties["params"]) + else: + job._format_params = None + if "wildcards" in job_properties: + job._format_wildcards = Wildcards(fromdict=job_properties["wildcards"]) + else: + job._format_wildcards = None + _variables = dict() + _variables.update( + dict(params=job._format_params, wildcards=job._format_wildcards) + ) + if hasattr(job, "rule"): + _variables.update(dict(rule=job.rule)) + try: + return format(string, **_variables) + except NameError as ex: + raise WorkflowError( + "NameError with group job {}: {}".format(job.jobid, str(ex)) + ) + except IndexError as ex: + raise WorkflowError( + "IndexError with group job {}: {}".format(job.jobid, str(ex)) + ) + + +# adapted from ClusterExecutor.cluster_params function in snakemake.executor +def format_values(dictionary, job_properties): + formatted = dictionary.copy() + for key, value in list(formatted.items()): + if key == "mem": + value = str(_convert_units_to_mb(value)) + if isinstance(value, str): + try: + formatted[key] = format_wildcards(value, job_properties) + except NameError as e: + msg = "Failed to format cluster config " "entry for job {}.".format( + job_properties["rule"] + ) + raise WorkflowError(msg, e) + return formatted + + +def convert_job_properties(job_properties, resource_mapping=None): + options = {} + if resource_mapping is None: + resource_mapping = {} + resources = job_properties.get("resources", {}) + for k, v in resource_mapping.items(): + options.update({k: resources[i] for i in v if i in resources}) + + if "threads" in job_properties: + options["cpus-per-task"] = job_properties["threads"] + return options + + +def ensure_dirs_exist(path): + """Ensure output folder for Slurm log files exist.""" + di = dirname(path) + if di == "": + return + if not os.path.exists(di): + os.makedirs(di, exist_ok=True) + return + + +def format_sbatch_options(**sbatch_options): + """Format sbatch options""" + options = [] + for k, v in sbatch_options.items(): + val = "" + if v is not None: + val = f"={v}" + options.append(f"--{k}{val}") + return options + + +def submit_job(jobscript, **sbatch_options): + """Submit jobscript and return jobid.""" + options = format_sbatch_options(**sbatch_options) + try: + cmd = ["sbatch"] + ["--parsable"] + options + [jobscript] + res = sp.check_output(cmd) + except sp.CalledProcessError as e: + raise e + # Get jobid + res = res.decode() + try: + jobid = re.search(r"(\d+)", res).group(1) + except Exception as e: + raise e + return jobid + + +def advanced_argument_conversion(arg_dict): + """Experimental adjustment of sbatch arguments to the given or default partition.""" + # Currently not adjusting for multiple node jobs + nodes = int(arg_dict.get("nodes", 1)) + if nodes > 1: + return arg_dict + partition = arg_dict.get("partition", None) or _get_default_partition() + constraint = arg_dict.get("constraint", None) + ncpus = int(arg_dict.get("cpus-per-task", 1)) + runtime = arg_dict.get("time", None) + memory = _convert_units_to_mb(arg_dict.get("mem", 0)) + config = _get_cluster_configuration(partition, constraint, memory) + mem = arg_dict.get("mem", ncpus * min(config["MEMORY_PER_CPU"])) + mem = _convert_units_to_mb(mem) + if mem > max(config["MEMORY"]): + logger.info( + f"requested memory ({mem}) > max memory ({max(config['MEMORY'])}); " + "adjusting memory 
settings" + ) + mem = max(config["MEMORY"]) + + # Calculate available memory as defined by the number of requested + # cpus times memory per cpu + AVAILABLE_MEM = ncpus * min(config["MEMORY_PER_CPU"]) + # Add additional cpus if memory is larger than AVAILABLE_MEM + if mem > AVAILABLE_MEM: + logger.info( + f"requested memory ({mem}) > " + f"ncpus x MEMORY_PER_CPU ({AVAILABLE_MEM}); " + "trying to adjust number of cpus up" + ) + ncpus = int(math.ceil(mem / min(config["MEMORY_PER_CPU"]))) + if ncpus > max(config["CPUS"]): + logger.info( + f"ncpus ({ncpus}) > available cpus ({max(config['CPUS'])}); " + "adjusting number of cpus down" + ) + ncpus = min(int(max(config["CPUS"])), ncpus) + adjusted_args = {"mem": int(mem), "cpus-per-task": ncpus} + + # Update time. If requested time is larger than maximum allowed time, reset + if runtime: + runtime = time_to_minutes(runtime) + time_limit = max(config["TIMELIMIT_MINUTES"]) + if runtime > time_limit: + logger.info( + f"time (runtime) > time limit {time_limit}; " "adjusting time down" + ) + adjusted_args["time"] = time_limit + + # update and return + arg_dict.update(adjusted_args) + return arg_dict + + +timeformats = [ + re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"), + re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+)$"), + re.compile(r"^(?P<days>\d+)-(?P<hours>\d+)$"), + re.compile(r"^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"), + re.compile(r"^(?P<minutes>\d+):(?P<seconds>\d+)$"), + re.compile(r"^(?P<minutes>\d+)$"), +] + + +def time_to_minutes(time): + """Convert time string to minutes. + + According to slurm: + + Acceptable time formats include "minutes", "minutes:seconds", + "hours:minutes:seconds", "days-hours", "days-hours:minutes" + and "days-hours:minutes:seconds". + + """ + if not isinstance(time, str): + time = str(time) + d = {"days": 0, "hours": 0, "minutes": 0, "seconds": 0} + regex = list(filter(lambda regex: regex.match(time) is not None, timeformats)) + if len(regex) == 0: + return + assert len(regex) == 1, "multiple time formats match" + m = regex[0].match(time) + d.update(m.groupdict()) + minutes = ( + int(d["days"]) * 24 * 60 + + int(d["hours"]) * 60 + + int(d["minutes"]) + + math.ceil(int(d["seconds"]) / 60) + ) + assert minutes > 0, "minutes has to be greater than 0" + return minutes + + +def _get_default_partition(): + """Retrieve default partition for cluster""" + cluster = CookieCutter.get_cluster_option() + cmd = f"sinfo -O partition {cluster}" + res = sp.check_output(cmd.split()) + m = re.search(r"(?P<partition>\S+)\*", res.decode(), re.M) + partition = m.group("partition") + return partition + + +def _get_cluster_configuration(partition, constraints=None, memory=0): + """Retrieve cluster configuration. 
+ + Retrieve cluster configuration for a partition filtered by + constraints, memory and cpus + + """ + try: + import pandas as pd + except ImportError: + print( + "Error: currently advanced argument conversion " + "depends on 'pandas'.", file=sys.stderr + ) + sys.exit(1) + + if constraints: + constraint_set = set(constraints.split(",")) + cluster = CookieCutter.get_cluster_option() + cmd = f"sinfo -e -o %all -p {partition} {cluster}".split() + try: + output = sp.Popen(" ".join(cmd), shell=True, stdout=sp.PIPE).communicate() + except Exception as e: + print(e) + raise + data = re.sub("^CLUSTER:.+\n", "", re.sub(" \\|", "|", output[0].decode())) + df = pd.read_csv(StringIO(data), sep="|") + try: + df["TIMELIMIT_MINUTES"] = df["TIMELIMIT"].apply(time_to_minutes) + df["MEMORY_PER_CPU"] = df["MEMORY"] / df["CPUS"] + df["FEATURE_SET"] = df["AVAIL_FEATURES"].str.split(",").apply(set) + except Exception as e: + print(e) + raise + if constraints: + constraint_set = set(constraints.split(",")) + i = df["FEATURE_SET"].apply(lambda x: len(x.intersection(constraint_set)) > 0) + df = df.loc[i] + memory = min(_convert_units_to_mb(memory), max(df["MEMORY"])) + df = df.loc[df["MEMORY"] >= memory] + return df -- GitLab From cb1a3ae19f9dc8451806b4b67ad44802cb28cf4e Mon Sep 17 00:00:00 2001 From: "sukanya.denni" <sukanya.denni@inrae.fr> Date: Fri, 19 Jan 2024 10:05:51 +0100 Subject: [PATCH 4/6] clean up --- vg_exact_data/job.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vg_exact_data/job.sh b/vg_exact_data/job.sh index 1598450..3ad2949 100644 --- a/vg_exact_data/job.sh +++ b/vg_exact_data/job.sh @@ -1,7 +1,7 @@ #!/bin/bash ################################ Slurm options ################################# ### prepare_calling_jobs -#SBATCH -J vg_smk +#SBATCH -J vg_data ### Max run time "hours:minutes:seconds" #SBATCH --time=120:00:00 #SBATCH --ntasks=1 #nb of processes @@ -14,7 +14,6 @@ #SBATCH -o slurm_logs/snakemake.%N.%j.out #SBATCH -e slurm_logs/snakemake.%N.%j.err #SBATCH --mail-type=END,FAIL -#SBATCH --mail-user=sukanya.denni@univ-rouen.fr ################################################################################ # Useful information to print -- GitLab From 6c5ff9ae0481f9ba066e3eeb700f9207380527de Mon Sep 17 00:00:00 2001 From: "sukanya.denni" <sukanya.denni@inrae.fr> Date: Fri, 19 Jan 2024 10:19:22 +0100 Subject: [PATCH 5/6] add singularity --- .gitignore | 1 + singularity_img/build_singularity.sh | 6 ++++++ singularity_img/to_singularity.def | 19 +++++++++++++++++++ 3 files changed, 26 insertions(+) create mode 100644 singularity_img/build_singularity.sh create mode 100644 singularity_img/to_singularity.def diff --git a/.gitignore b/.gitignore index 103f54c..73777e9 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,7 @@ !*.yml !*.yaml !Snakefile +!*.def # 3) add a pattern to track the file patterns of section2 even if they are in # subdirectories diff --git a/singularity_img/build_singularity.sh b/singularity_img/build_singularity.sh new file mode 100644 index 0000000..1e15b3d --- /dev/null +++ b/singularity_img/build_singularity.sh @@ -0,0 +1,6 @@ + +### outside of msprime_VISOR_fusionVCF folder +sudo singularity build mspvisor.sif msprime_VISOR_fusionVCF/singularity_img/to_singularity.def + +### use singularity image +#Â singularity exec mspvisor.sif python3 /mspv/tree_generation.py --help diff --git a/singularity_img/to_singularity.def b/singularity_img/to_singularity.def new file mode 100644 index 0000000..8525ee5 --- /dev/null +++ 
b/singularity_img/to_singularity.def @@ -0,0 +1,19 @@ +Bootstrap: docker +# From: ubuntu:focal-20220415 +From: ubuntu:jammy-20231004 +IncludeCmd: yes + +%files + msprime_VISOR_fusionVCF mspv + +%environment + export PATH="/mspv/tree_generation.py":$PATH + export PATH="/mspv/variants_generation.py":$PATH + +%post + ##Python installation + apt update -y + apt install python3 -y + python3 --version + apt install python3-pip -y + pip install msprime pandas defopt numpy biopython pyyaml -- GitLab From 466849df2319691d234d03bd1e1d29dbd30af6ac Mon Sep 17 00:00:00 2001 From: "sukanya.denni" <sukanya.denni@inrae.fr> Date: Fri, 19 Jan 2024 10:19:31 +0100 Subject: [PATCH 6/6] update README --- README.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index d399e76..0be7bbd 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ python tree_generation.py -h python variants_generation.py -h ``` -# ! DOC IS NOT UPDATED +The code may be slow and should be run on a cluster if you wish to generate a large population. A Singularity image can be created (`singularity_img` folder). ## 1. Use a VCF as a base A VCF is used as a base for genotypes and the position and number of variants. This VCF can be created with `msprime` to generate a population with: @@ -21,14 +21,15 @@ This command outputs as many VCFs as there are chromosomes in the reference FAI. python variants_generation.py -fai <FAI> -fa <FASTA> -v <VCF> -y <YAML> ``` The VCF is the one created with the previous command using `msprime`. -YAML template is available in `VISOR_random_bed` folder. Modify the YAML input to set the percentage of each variant you want to simulate (must equal 100). + +The variant generation is inspired by [VISOR](https://github.com/davidebolo1993/VISOR). A YAML template is available in the `VISOR_random_bed` folder. Modify the YAML input to set the percentage of each variant you want to simulate (must equal 100). Each variant type has a size distribution file (bins = 100 bp) in folder `sv_distributions`. The data was extracted from [An integrated map of structural variation in 2,504 human genomes (Sudmant, et al. 2015)](https://www.nature.com/articles/nature15394). The distributions are used to randomly sample each structural variant size. # TODO -- [ ] Add non supported VISOR variant types +- [ ] Add not-yet-supported VISOR variant types - [ ] MNP - [ ] tandem repeat contraction - [ ] tandem repeat expansion - [ ] approximate tandem repetition @@ -40,6 +41,10 @@ In the `vg_exact_data` folder. Snakemake/Singularity pipeline to get a pangenome in GFA format and a FASTA with all individuals from the VCF. It starts from a reference FASTA and a VCF, both specified in the `config.yaml` file along with a name for the output. +Create the Slurm log directory used by the SBATCH options in `job.sh` (or modify those options): +``` +mkdir -p slurm_logs +``` On a SLURM cluster, run `sbatch job.sh dry` for a dry run or `sbatch job.sh` directly. Adjust the `SNG_BIND` variable if files are not found, and adapt the snakemake profile as necessary for performance. You can extract a VCF from the graph using the `vg deconstruct` command. It is not implemented in the pipeline. \ No newline at end of file -- GitLab
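The README above notes that extracting a VCF back from the graph with `vg deconstruct` is not implemented in the pipeline. A minimal sketch of how it could be wired into `vg_exact_data/Snakefile`, mirroring the existing rules, is shown below; the rule name, the output path and the `refpath` value are assumptions and not part of the patches. The reference path passed to `-p` must match a sequence name from the input FASTA (they can be listed with `vg paths -L -x results/<name>/index.giraffe.gbz`).

```
# Hypothetical rule, not part of the patches above: extract a VCF from the
# giraffe index built by rule "graph". Adjust "refpath" to a real sequence
# name from the reference FASTA before use.
rule deconstruct:
    input:
        gbz = rules.graph.output
    output:
        vcf = "results/{sample}/{sample}_deconstruct.vcf"
    params:
        refpath = "chr1"  # assumed reference path name
    container:
        "docker://quay.io/vgteam/vg:v1.53.0"
    shell:
        "vg deconstruct -p {params.refpath} {input.gbz} > {output.vcf}"
```

For Snakemake to build it, the VCF would also have to be requested in `rule all`, e.g. by adding `expand("results/{sample}/{sample}_deconstruct.vcf", sample=SAMPLES)` to its inputs. If the installed vg version cannot read the GBZ index directly, convert the graph first with `vg convert` (as rule `graph_gfa` does) and deconstruct the converted graph instead.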