Source code for xyalign.reftools

# Part of XYalign
# Functions, etc. related to reference fasta processing
from __future__ import division
import logging
import os
import subprocess
import time
import pybedtools
import pysam


# Create logger for reftools submodule
reftools_logger = logging.getLogger("xyalign.reftools")


[docs]class RefFasta(): """ A class for working with external reference fasta files Attributes ---------- filepath : str Full path to external bam file. samtools : str Full path to samtools. Default = 'samtools' bwa : str Full path to bwa. Default = 'bwa' """ def __init__( self, filepath, samtools="samtools", bwa="bwa", no_initial_index=False): self.filepath = filepath self.samtools = samtools self.bwa = bwa self.logger = logging.getLogger("xyalign.reftools.RefFasta") self.logger.info("Creating a RefFasta instance for {}".format( self.filepath)) if no_initial_index is False: if self.is_faidxed() is False: self.index_fai()
[docs] def is_faidxed(self): """ Checks that fai index exists, is not empty and is newer than reference. Returns ------- bool True if fai index exists and is newer than fasta, False otherwise. """ self.logger.info("Checking fai indexing of {}".format(self.filepath)) if os.path.exists("{}.fai".format(self.filepath)): if os.stat("{}.fai".format(self.filepath)).st_size != 0: idx_stamp = os.path.getmtime("{}.fai".format(self.filepath)) else: self.logger.info("Fai index empty") return False else: self.logger.info("No fai index detected") return False ref_stamp = os.path.getmtime(self.filepath) if ref_stamp < idx_stamp: self.logger.info("Fai index present and newer than ref file") return True else: self.logger.info("Fai index is older than ref file") return False
[docs] def index_fai(self): """ Create fai index for reference using samtools ('samtools faidx ref.fa') Returns ------- bool True if successful Raises ------ RuntimeError If return code from external call is not 0 """ self.logger.info("Creating fai index for: {}".format(self.filepath)) idx_start = time.time() rc = subprocess.call([self.samtools, "faidx", self.filepath]) if rc == 0: self.logger.info( "Fai indexing complete. Elapsed time: {} seconds".format( time.time() - idx_start)) return True else: self.logger.error("Unable to create fai index for {}. Exiting".format( self.filepath)) logging.shutdown() raise RuntimeError("Unable to create faidx. Exiting")
[docs] def index_bwa(self): """ Index reference using bwa Returns ------- bool True if successful Raises ------ RuntimeError If return code from external call is not 0 """ self.logger.info("Creating bwa indices for: {}".format( self.filepath)) bwa_idx_start = time.time() rc = subprocess.call([self.bwa, "index", self.filepath]) if rc == 0: self.logger.info( "BWA indexing complete. Elapsed time: {} seconds".format( time.time() - bwa_idx_start)) return True else: self.logger.error( "Unable to create bwa indices for {}. Exiting".format( self.filepath)) logging.shutdown() raise RuntimeError("Unable to create bwa indicies. Exiting")
[docs] def check_bwa_index(self): """ Checks to see if bwa indices are newer than fasta. Returns ------- bool True if indices exist and are newer than fasta. False otherwise. """ self.logger.info("Checking bwa indexing of {}".format(self.filepath)) if ( os.path.exists("{}.amb".format(self.filepath)) and os.path.exists("{}.ann".format(self.filepath)) and os.path.exists("{}.bwt".format(self.filepath)) and os.path.exists("{}.pac".format(self.filepath)) and os.path.exists("{}.sa".format(self.filepath))): if ( os.stat("{}.amb".format(self.filepath)).st_size != 0 and os.stat("{}.ann".format(self.filepath)).st_size != 0 and os.stat("{}.bwt".format(self.filepath)).st_size != 0 and os.stat("{}.pac".format(self.filepath)).st_size != 0 and os.stat("{}.sa".format(self.filepath)).st_size != 0): bwa_idx_stamp = [ os.path.getmtime("{}.amb".format(self.filepath)), os.path.getmtime("{}.ann".format(self.filepath)), os.path.getmtime("{}.bwt".format(self.filepath)), os.path.getmtime("{}.pac".format(self.filepath)), os.path.getmtime("{}.sa".format(self.filepath))] else: self.logger.info("One or more BWA indices empty") return False else: self.logger.info("Bwa index incomplete or absent") return False ref_stamp = os.path.getmtime(self.filepath) if all(x > ref_stamp for x in bwa_idx_stamp) is True: self.logger.info("BWA index present and newer than ref file") return True else: self.logger.info("BWA index is older than ref file") return False
[docs] def conditional_index_bwa(self, bwa="bwa"): """ Indexes if indices are the same age or older than the fasta. Use RefFasta.index_bwa() to force indexing. Parameters ---------- bwa : str Path to bwa program (default is 'bwa') """ if self.check_bwa_index() is False: self.index_bwa()
[docs] def check_seq_dict(self): """ Checks that sequence dictionary exists, is not empty and is newer than reference. Returns ------- bool True if seq dict exists and is newer than fasta, False otherwise. """ self.logger.info( "Checking for sequence dictionary of {}".format(self.filepath)) filename, file_extension = os.path.splitext(self.filepath) if os.path.exists("{}.dict".format(self.filepath)): if os.stat("{}.dict".format(self.filepath)).st_size != 0: idx_stamp = os.path.getmtime("{}.dict".format(self.filepath)) else: self.logger.info("Seq dict empty") return False elif os.path.exists("{}.dict".format(filename)): if os.stat("{}.dict".format(filename)).st_size != 0: idx_stamp = os.path.getmtime("{}.dict".format(filename)) else: self.logger.info("Seq dict empty") return False else: self.logger.info("No seq dict detected") return False ref_stamp = os.path.getmtime(self.filepath) if ref_stamp < idx_stamp: self.logger.info("Sequence dictionary present and newer than ref file") return True else: self.logger.info("Sequence dictionary is older than ref file") return False
[docs] def seq_dict(self, out_dict=None): """ Create sequence dictionary .dict file using samtools Parameters ---------- out_dict : str The desired file name for the sequence dictionary - defaults to adding '.dict' to the fasta name Returns ------- bool True if exit code of external call is 0. Raises ------ RuntimeError If external call exit code is not 0. """ self.logger.info("Creating sequence dictionary for: {}".format( self.filepath)) if out_dict is None: out_dict = "{}.dict".format(self.filepath) dict_start = time.time() rc = subprocess.call( [self.samtools, "dict", "-o", out_dict, self.filepath]) if rc == 0: self.logger.info( "Sequence dictionary complete. " "Elapsed time: {} seconds".format( time.time() - dict_start)) return True else: self.logger.error( "Unable to create sequence dictionary for {}. " "Exiting".format( self.filepath)) logging.shutdown() raise RuntimeError( "Unable to create sequence dictionary. Exiting")
[docs] def conditional_seq_dict(self): """ Creates sequence dictionary if .dict the same age or older than the fasta, or doesn't exist. Use RefFasta.seq_dict() to force creation. """ if self.check_seq_dict() is False: self.seq_dict()
[docs] def mask_reference(self, bed_mask, output_fasta): """ Creates a new masked references by hardmasking regions included in the bed_mask Parameters ---------- bed_mask : str Bed file of regions to mask (as N) in the new reference output_fasta : str The full path to and filename of the output fasta Returns ------- str Path to new (indexed and masked) fasta """ mask_start = time.time() reftools_logger.info("Masking {} using regions in {}".format( self.filepath, bed_mask)) maskedpath = output_fasta b_fasta = pybedtools.BedTool(self.filepath) b_tool = pybedtools.BedTool(bed_mask) b = b_tool.mask_fasta(fi=b_fasta, fo=maskedpath) reftools_logger.info("Creating fai index for {}".format(maskedpath)) subprocess.call( [self.samtools, "faidx", "{}".format(maskedpath)]) reftools_logger.info( "Masking complete. Masked fasta output to {}. " "Elapsed time: {} seconds".format( maskedpath, time.time() - mask_start)) return maskedpath
[docs] def isolate_chroms(self, new_ref_prefix, chroms, bed_mask=None): """ Takes a reference fasta file and a list of chromosomes to include and outputs a new, indexed (and optionally masked) reference fasta. Parameters ---------- new_ref_prefix : str The desired path to and prefix of the output files chroms : list Chromosomes to include in the output fasta bed_mask : str Bed file of regions to mask (as N) in the new reference Returns ------- str Path to new, indexed (optionally masked) fasta """ iso_start = time.time() reftools_logger.info("Isolating chromosomes ({}) from {}.".format( " ".join(chroms), self.filepath)) outpath = "{}.fa".format(new_ref_prefix) if type(chroms) != list: chroms = list(chroms) if bed_mask is not None: maskedpath = "{}.masked.fa".format(new_ref_prefix) with open(outpath, "w") as f: subprocess.call( [self.samtools, "faidx", self.filepath, "{}".format( " ".join(chroms))], stdout=f) subprocess.call( [self.samtools, "faidx", outpath]) b_fasta = pybedtools.BedTool(outpath) b_tool = pybedtools.BedTool(bed_mask) b = b_tool.mask_fasta(fi=b_fasta, fo=maskedpath) subprocess.call( [self.samtools, "faidx", "{}".format(maskedpath)]) reftools_logger.info( "Isolating (masked) chromosomes complete. " "Elapsed time: {} seconds".format(time.time() - iso_start)) return maskedpath else: with open(outpath, "w") as f: subprocess.call( [self.samtools, "faidx", self.filepath, "{}".format( " ".join(chroms))], stdout=f) subprocess.call([self.samtools, "faidx", outpath]) reftools_logger.info( "Isolating (un-masked) chromosomes complete. " "Elapsed time: {} seconds".format(time.time() - iso_start)) return outpath
[docs] def get_chrom_length(self, chrom): """ Extract chromosome length from fasta. Parameters ---------- chrom : str The name of the chromosome or scaffold. Returns ------- length : int The length (integer) of the chromsome/scaffold Raises ------ RuntimeError If chromosome name not present in bam header """ fastafile = pysam.FastaFile(self.filepath) lengths = dict(zip(fastafile.references, fastafile.lengths)) try: lens = lengths[chrom] fastafile.close() return lens except: self.logger.error( "{} not present in {}. Exiting.".format( chrom, self.filepath)) logging.shutdown() raise RuntimeError( "Chromosome name not present in fasta. Exiting")
[docs] def chromosome_bed(self, output_file, chromosome_list): """ Takes list of chromosomes and outputs a bed file with the length of each chromosome on each line (e.g., chr1 0 247249719). Parameters ---------- output_file : str Name of (including full path to) desired output file chromosome_list : list Chromosome/scaffolds to include Returns ------- str output_file Raises ------ RuntimeError If chromosome name is not in fasta. """ c_bed_start = time.time() self.logger.info("Creating bed file with chromosome lengths for {}".format( " ".join(chromosome_list))) with open(output_file, "w") as f: for i in chromosome_list: try: lengths = self.get_chrom_length(i) f.write("{}\t{}\t{}\n".format(i, "0", lengths)) except: self.logger.error( "Error finding chromosome length in {} " "(for bed file)".format(self.filepath)) logging.shutdown() raise RuntimeError( "Error finding chromosome length in {}. Check " "chromosome names and fasta IDs.".format( self.filepath)) self.logger.info( "Bed file ({}) created. Elapsed time: {} seconds".format( output_file, time.time() - c_bed_start)) return output_file
[docs] def chromosome_lengths(self): """ Returns ------- tuple Chromosome lengths ordered by sequence order in fasta """ fastafile = pysam.FastaFile(self.filepath) lengths = fastafile.lengths fastafile.close() # pysam's .lengths does not return a tuple (despite what is in the docs), # so, convert to tuple before returning. return tuple(lengths)
[docs] def chromosome_names(self): """ Returns ------- tuple Chromosome names ordered by sequence order in fasta """ fastafile = pysam.FastaFile(self.filepath) names = fastafile.references fastafile.close() # pysam's .lengths does not return a tuple (despite what is in the docs), # so, convert to tuple before returning. return tuple(names)