From 4cd6c05f4a6831757397577d74da41c8378317d4 Mon Sep 17 00:00:00 2001 From: priyappillai <34429433+priyappillai@users.noreply.github.com> Date: Thu, 10 Sep 2020 03:36:13 -0400 Subject: [PATCH 1/4] Added diversity metric to design_naively; changed design_naively to get best n guides within a window Allows design_naively to output designs based on a diversity metric (currently, only works with entropy). Added command line arguments to pick which methods are used, to design based on a reference sequence, and to set the number of guides to design for in each window. Modified constuct_guide_naively and find_guide_in_each_window in design_naively.py for the above and wrote a function to calculate entropy at each position (position_entropy) in alignment.py --- adapt/alignment.py | 30 ++++++- bin/design_naively.py | 194 +++++++++++++++++++++++++++++------------- 2 files changed, 166 insertions(+), 58 deletions(-) diff --git a/adapt/alignment.py b/adapt/alignment.py index ec4daee..1c541b0 100644 --- a/adapt/alignment.py +++ b/adapt/alignment.py @@ -6,6 +6,7 @@ import statistics import numpy as np +from math import log2 from adapt.utils import guide from adapt.utils import lsh @@ -801,7 +802,7 @@ def sequences_bound_by_guide(self, gd_seq, gd_start, mismatches, """Determine the sequences to which a guide hybridizes. Args: - gd_seq: seequence of the guide + gd_seq: sequence of the guide gd_start: start position of the guide in the alignment mismatches: threshold on number of mismatches for determining whether a guide would hybridize to a target sequence @@ -832,6 +833,33 @@ def sequences_bound_by_guide(self, gd_seq, gd_start, mismatches, binding_seqs += [seq_idx] return binding_seqs + def position_entropy(self): + """Determine the entropy at each position in the alignment. + + Returns: + list with the entropy of position i listed at index i + """ + + position_entropy = [] + for i in range(self.seq_length): + counts = {'A': 0, 'T': 0, 'C': 0, 'G': 0, '-': 0} + for b in [self.seqs[i][j] for j in range(self.num_sequences)]: + if b in counts: + counts[b] += 1 + elif b in guide.FASTA_CODES: + for c in guide.FASTA_CODES[b]: + counts[c] += 1.0 / len(guide.FASTA_CODES[b]) + else: + raise ValueError("Unknown base call %s" % b) + + # Calculate entropy + probabilities = [counts[base]/self.num_sequences for base in counts] + this_position_entropy = sum([-p*log2(p) for p in probabilities if p > 0]) + + position_entropy.append(this_position_entropy) + + return position_entropy + @staticmethod def from_list_of_seqs(seqs): """Construct a Alignment from aligned list of sequences. diff --git a/bin/design_naively.py b/bin/design_naively.py index f29acd6..012fc89 100644 --- a/bin/design_naively.py +++ b/bin/design_naively.py @@ -6,6 +6,7 @@ import argparse import logging +import heapq from adapt import alignment from adapt.utils import log @@ -16,7 +17,7 @@ logger = logging.getLogger(__name__) -def construct_guide_naively_at_each_pos(aln, args): +def construct_guide_naively_at_each_pos(aln, args, ref_seq=None): """Naively construct a guide sequence at each position of an alignment. This constructs a guide sequence at each position. It does so in two @@ -26,6 +27,7 @@ def construct_guide_naively_at_each_pos(aln, args): Args: aln: alignment.Alignment object args: arguments to program + ref_seq: reference sequence to base diversity guides on Returns: list x where x[i] is a dict, giving a guide sequence at position @@ -53,12 +55,19 @@ def construct_guide_naively_at_each_pos(aln, args): # too many sequences with a gap consensus_guide = None mode_guide = None + diversity_guide = None else: # Construct guides consensus_guide = aln_for_guide.determine_consensus_sequence( - seqs_to_consider=seqs_to_consider) + seqs_to_consider=seqs_to_consider) \ + if args.consensus else None + mode_guide = aln_for_guide.determine_most_common_sequence( - seqs_to_consider=seqs_to_consider, skip_ambiguity=True) + seqs_to_consider=seqs_to_consider, skip_ambiguity=True) \ + if args.mode else None + + diversity_guide = ref_seq[pos_start:pos_end] \ + if args.diversity else None # Determine the fraction of the sequences that each guide binds to if consensus_guide is not None: @@ -79,64 +88,100 @@ def construct_guide_naively_at_each_pos(aln, args): mode_guide = 'None' mode_guide_frac = 0 - d = {'consensus': (consensus_guide, consensus_guide_frac), - 'mode': (mode_guide, mode_guide_frac)} + if diversity_guide is not None: + if args.diversity == 'entropy': + all_entropy = aln_for_guide.position_entropy() + diversity_metric = sum(all_entropy)/args.guide_length + else: + raise ValueError("Invalid diversity method '%s'; use one of ['entropy']" %args.diversity_metric) + else: + diversity_guide = 'None' + diversity_metric = float('inf') + + d = {} + if args.consensus: + d['consensus'] = (consensus_guide, consensus_guide_frac) + if args.mode: + d['mode'] = (mode_guide, mode_guide_frac) + if args.diversity: + d[args.diversity] = (diversity_guide, diversity_metric) guides[i] = d return guides -def find_guide_in_each_window(guides, aln_length, args): +def find_guide_in_each_window(guides, aln_length, args, obj_type='max'): """Determine a guide for each window of an alignment. For each window, this selects the guide within it (given one - guide per position) that covers the highest fraction of sequences. + guide per position) that has the best metric score. To break ties, this selects the first guide (in terms of - position) among ones with a tied fraction covered. + position) among ones with a tied metric. Args: - guides: list such that guides[i] is a tuple (guide, frac) + guides: list such that guides[i] is a tuple (guide, metric) giving a guide sequence (guide) at position i of - an alignment and the fraction of sequences in the - alignment (frac) covered by the guide + an alignment and a metric (metric) that can be used to + compare guide quality aln_length: length of alignment args: arguments to program + obj_type: if 'max', consider the largest value the best; else + if 'min', consider the smallest value the best. Must be + either 'min' or 'max' Returns: - list x where x[i] gives a tuple (guide, frac) representing a - guide in the window that starts at position i + list x where x[i] gives a list of the args.best_n tuples (guide, metric) + representing guides in the window that starts at position i """ window_start_positions = range(aln_length - args.window_size + 1) - guide_in_window = [None for _ in window_start_positions] - best_guide_seq, best_guide_frac, best_guide_pos = None, -1, -1 + guide_in_window = [[] for _ in window_start_positions] for i in window_start_positions: window_start, window_end = i, i + args.window_size last_guide_pos = window_end - args.guide_length logger.info("Searching for a guide within window [%d, %d)" % (window_start, window_end)) - - if best_guide_pos < window_start: + + # Check if any of the guides are no longer within the window + # Keep track of which positions still have guides in the heap + positions = set() + for guide in guide_in_window[i-1]: + if guide[2] >= window_start: + guide_in_window[i].append(guide) + positions.add(guide[2]) + heapq.heapify(guide_in_window[i]) + + if len(guide_in_window[i]) < args.best_n: # The best guide is no longer in the window; find # a new one - best_guide_seq, best_guide_frac, best_guide_pos = None, -1, -1 for j in range(window_start, last_guide_pos + 1): - guide, frac = guides[j] - if frac > best_guide_frac: - best_guide_seq = guide - best_guide_frac = frac - best_guide_pos = j + # Skip if guide is already in the heap + if j in positions: + continue + + guide, metric = guides[j] + # Reverse order for minimizing + if obj_type == 'min': + metric = -metric + + if len(guide_in_window[i]) < args.best_n: + heapq.heappush(guide_in_window[i], (metric, guide, j)) + elif metric > guide_in_window[i][0][0]: + heapq.heappushpop(guide_in_window[i], (metric, guide, j)) else: - # The last best guide is still within the window, but now + # All args.best_n guides are still within the window, but now # check if the new guide at the very end of the window # does better - guide, frac = guides[last_guide_pos] - if frac > best_guide_frac: - best_guide_seq = guide - best_guide_frac = frac - best_guide_pos = last_guide_pos - - # Save the best guide for the current window - guide_in_window[i] = (best_guide_seq, best_guide_frac) + guide, metric = guides[last_guide_pos] + # Reverse order for minimizing + if obj_type == 'min': + metric = -metric + if metric > guide_in_window[i][0][0]: + heapq.heappushpop(guide_in_window[i], (metric, guide, last_guide_pos)) + + # Undo reverse order for minimizing and sort + fix = 1 if obj_type == 'max' else -1 + guide_in_window = [[(guide, fix*metric) for metric, guide, _ in sorted(guide_in_window_i, reverse=True)] \ + for guide_in_window_i in guide_in_window] return guide_in_window @@ -147,37 +192,51 @@ def main(args): # Read the input alignment seqs = seq_io.read_fasta(args.in_fasta) aln = alignment.Alignment.from_list_of_seqs(list(seqs.values())) + ref_seq = seqs[args.ref_seq] if args.ref_seq else None # Construct a guide at each position of the alignment logger.info("Constructing guides naively at each position of alignment") - guides = construct_guide_naively_at_each_pos(aln, args) - - # Find the best guide in each window (for both the - # consensus and mode approach) - logger.info("Searching for consensus guides") - consensus_guides_in_window = find_guide_in_each_window( - [guides[i]['consensus'] for i in range(len(guides))], - aln.seq_length, args) - logger.info("Searching for mode guides") - mode_guides_in_window = find_guide_in_each_window( - [guides[i]['mode'] for i in range(len(guides))], - aln.seq_length, args) + guides = construct_guide_naively_at_each_pos(aln, args, ref_seq=ref_seq) + + # Find the best guide in each window (for the + # consensus, mode, and diversity approaches) + if args.consensus: + logger.info("Searching for consensus guides") + consensus_guides_in_window = find_guide_in_each_window( + [guides[i]['consensus'] for i in range(len(guides))], + aln.seq_length, args) + if args.mode: + logger.info("Searching for mode guides") + mode_guides_in_window = find_guide_in_each_window( + [guides[i]['mode'] for i in range(len(guides))], + aln.seq_length, args) + if args.diversity: + logger.info("Searching for %s guides" %args.diversity) + diversity_guides_in_window = find_guide_in_each_window( + [guides[i][args.diversity] for i in range(len(guides))], + aln.seq_length, args, obj_type='min') # Write the guides to a TSV file with open(args.out_tsv, 'w') as outf: - header = ['window-start', 'window-end', - 'target-sequence-by-consensus', 'frac-bound-by-consensus', - 'target-sequence-by-mode', 'frac-bound-by-mode'] + header = ['window-start', 'window-end', 'rank'] + if args.consensus: + header.extend(['target-sequence-by-consensus', 'frac-bound-by-consensus']) + if args.mode: + header.extend(['target-sequence-by-mode', 'frac-bound-by-mode']) + if args.diversity: + header.extend(['target-sequence-by-%s' %args.diversity, args.diversity]) + outf.write('\t'.join(header) + '\n') - for i in range(len(consensus_guides_in_window)): - window_start, window_end = i, i + args.window_size - consensus_guide_seq, consensus_guide_frac = \ - consensus_guides_in_window[i] - mode_guide_seq, mode_guide_frac = mode_guides_in_window[i] - line = [window_start, window_end, - consensus_guide_seq, consensus_guide_frac, - mode_guide_seq, mode_guide_frac] - outf.write('\t'.join([str(x) for x in line]) + '\n') + for i in range(aln.seq_length - args.window_size + 1): + for j in range(args.best_n): + line = [i, i + args.window_size, j+1] + if args.consensus: + line.extend(consensus_guides_in_window[i][j]) + if args.mode: + line.extend(mode_guides_in_window[i][j]) + if args.diversity: + line.extend(diversity_guides_in_window[i][j]) + outf.write('\t'.join([str(x) for x in line]) + '\n') if __name__ == "__main__": @@ -191,7 +250,7 @@ def main(args): # Window size parser.add_argument('-w', '--window-size', type=int, default=200, - help=("Output a guide within each window (sliding along " + help=("Output guide(s) within each window (sliding along " "the alignment) of this length")) # Parameters on guide length and mismatches @@ -201,6 +260,10 @@ def main(args): help=("Allow for this number of mismatches when " "determining whether a guide covers a sequence")) + # Best n guides per window + parser.add_argument('--best-n', type=int, default=1, + help=("Find the best BEST_N guides in each window")) + # G-U pairing options parser.add_argument('--do-not-allow-gu-pairing', action='store_true', help=("When determining whether a guide binds to a region of " @@ -216,6 +279,23 @@ def main(args): help=("If this fraction or more of sequences at a position contain " "a gap character, do not design a guide there")) + # Reference sequence + parser.add_argument('--ref-seq', type=str, default=None, + help=("The accession number of the reference sequence to design " + "guides based on sequence diversity; required for diversity " + "method")) + + # Guide sequence methods + parser.add_argument('--consensus', type=bool, default=True, + help=("True (default) to use the consensus method to determine guides; " + "False otherwise")) + parser.add_argument('--mode', type=bool, default=True, + help=("True (default) to use the mode method to determine guides; " + "False otherwise")) + parser.add_argument('--diversity', type=str, default=None, + help=("A string of which diversity method to use to determine guides " + "('entropy'); None (default) to not use a diversity method")) + # Log levels parser.add_argument("--debug", dest="log_level", From 59a0f41d9a3614cf263281e15147345d87c1a8b6 Mon Sep 17 00:00:00 2001 From: priyappillai <34429433+priyappillai@users.noreply.github.com> Date: Sun, 13 Sep 2020 23:47:30 -0400 Subject: [PATCH 2/4] Fixed design_naively base on PR Added a check if the loop is not on the first round in find_guide_in_each_window, modified comments and documentation to be clearer, added error messages to help with debugging, changed input types for consensus and mode (previous were not functional), fixed some typos --- adapt/guide_search.py | 2 +- adapt/specificity/alignment_query.py | 2 +- bin/design_naively.py | 53 ++++++++++++++++++---------- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/adapt/guide_search.py b/adapt/guide_search.py index 2865e3a..8031f55 100644 --- a/adapt/guide_search.py +++ b/adapt/guide_search.py @@ -1504,7 +1504,7 @@ def _analyze_guides(self, start, curr_activities): def _analyze_guides_memoized(self, start, curr_activities, use_last=False): - """Make a memoized call to self._analyze_gudes(). + """Make a memoized call to self._analyze_guides(). Args: start: start position in alignment at which to target diff --git a/adapt/specificity/alignment_query.py b/adapt/specificity/alignment_query.py index 6c6ab0d..e46d029 100644 --- a/adapt/specificity/alignment_query.py +++ b/adapt/specificity/alignment_query.py @@ -325,7 +325,7 @@ def setup(self): (up to ambiguity) to be non-specific. However, this alternative option does not make sense in the extreme case: for example, if a k-mer is all Ns, then every sequence would be deemed non-specific. Nevertheless, - ignoring k-mers with amguity entirely might not be the best choice so + ignoring k-mers with ambiguity entirely might not be the best choice so this should be revisited (TODO). """ unambig_chars = set(['a', 'c', 'g', 't']) diff --git a/bin/design_naively.py b/bin/design_naively.py index 012fc89..3d18764 100644 --- a/bin/design_naively.py +++ b/bin/design_naively.py @@ -144,15 +144,16 @@ def find_guide_in_each_window(guides, aln_length, args, obj_type='max'): # Check if any of the guides are no longer within the window # Keep track of which positions still have guides in the heap positions = set() - for guide in guide_in_window[i-1]: - if guide[2] >= window_start: - guide_in_window[i].append(guide) - positions.add(guide[2]) + if i > 0: + for guide in guide_in_window[i-1]: + if guide[2] >= window_start: + guide_in_window[i].append(guide) + positions.add(guide[2]) heapq.heapify(guide_in_window[i]) if len(guide_in_window[i]) < args.best_n: - # The best guide is no longer in the window; find - # a new one + # One of the previous args.best_n best guides is no longer in the + # window; find a new one for j in range(window_start, last_guide_pos + 1): # Skip if guide is already in the heap if j in positions: @@ -188,11 +189,23 @@ def find_guide_in_each_window(guides, aln_length, args, obj_type='max'): def main(args): # Allow G-U base pairing, unless it is explicitly disallowed args.allow_gu_pairs = not args.do_not_allow_gu_pairing + # Run the consensus method, unless it is explicitly disallowed + args.consensus = not args.no_consensus + # Run the mode method, unless it is explicitly disallowed + args.mode = not args.no_mode + + if args.diversity: + if not args.ref_seq: + raise Exception('Must include a reference sequence label to run any diversity method') # Read the input alignment seqs = seq_io.read_fasta(args.in_fasta) aln = alignment.Alignment.from_list_of_seqs(list(seqs.values())) - ref_seq = seqs[args.ref_seq] if args.ref_seq else None + try: + ref_seq = seqs[args.ref_seq] if args.ref_seq else None + except KeyError: + raise Exception('Reference sequence %s does not match any label of the sequences in the given FASTA' \ + %args.ref_seq) # Construct a guide at each position of the alignment logger.info("Constructing guides naively at each position of alignment") @@ -281,20 +294,24 @@ def main(args): # Reference sequence parser.add_argument('--ref-seq', type=str, default=None, - help=("The accession number of the reference sequence to design " - "guides based on sequence diversity; required for diversity " - "method")) + help=("The label used in the FASTA file of the reference sequence " + "to design guides based on sequence diversity; required " + "for diversity method")) # Guide sequence methods - parser.add_argument('--consensus', type=bool, default=True, - help=("True (default) to use the consensus method to determine guides; " - "False otherwise")) - parser.add_argument('--mode', type=bool, default=True, - help=("True (default) to use the mode method to determine guides; " - "False otherwise")) - parser.add_argument('--diversity', type=str, default=None, + parser.add_argument('--no-consensus', action='store_true', + help=("If set, do not use the consensus method to determine guides; " + "otherwise, will use the consensus method")) + parser.add_argument('--no-mode', action='store_true', + help=("If set, do not use the mode method to determine guides; " + "otherwise, will use the mode method")) + parser.add_argument('--diversity', type=str, default=None, choices=["entropy"], help=("A string of which diversity method to use to determine guides " - "('entropy'); None (default) to not use a diversity method")) + "('entropy'); None (default) to not use a diversity method. " + "'entropy' will calculate the average per position entropy of " + "each potential guide, then return the guides at the positions " + "with the lowest entropy; nucleotides are determined by the " + "reference sequence")) # Log levels parser.add_argument("--debug", From ad905a355767b40472611e9843640183c0beceea Mon Sep 17 00:00:00 2001 From: priyappillai <34429433+priyappillai@users.noreply.github.com> Date: Thu, 17 Sep 2020 06:19:26 -0400 Subject: [PATCH 3/4] Added design_naively flanking seqs; testing design_naively now accounts for flanking sequences; automated testing for design_naively functions (and entropy functions in alignment.py) now exists, fixed tie-breaker of position in heap --- adapt/tests/test_alignment.py | 35 ++++++++ bin/design.py | 2 +- bin/design_naively.py | 118 ++++++++++++++++++--------- tests/__init__.py | 0 tests/test_design_naively.py | 149 ++++++++++++++++++++++++++++++++++ 5 files changed, 264 insertions(+), 40 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/test_design_naively.py diff --git a/adapt/tests/test_alignment.py b/adapt/tests/test_alignment.py index bab1cac..656d788 100644 --- a/adapt/tests/test_alignment.py +++ b/adapt/tests/test_alignment.py @@ -3,6 +3,7 @@ import random import unittest +from math import log2 from adapt import alignment from adapt.utils import lsh @@ -564,6 +565,40 @@ def test_most_common_sequence_with_ambiguity(self): self.assertEqual(aln.determine_most_common_sequence(skip_ambiguity=True), 'GGGCCC') + def test_position_entropy_simple(self): + seqs = ['ACCCC', + 'AAGGC', + 'AAATA', + 'AAAAA'] + aln = alignment.Alignment.from_list_of_seqs(seqs) + all_ps = [[1], + [0.25, 0.75], + [0.25, 0.25, .5], + [0.25, 0.25, 0.25, 0.25], + [0.5, 0.5]] + + entropy = [sum([-p*log2(p) for p in ps]) for ps in all_ps] + self.assertEqual(aln.position_entropy(), + entropy) + + def test_position_entropy_with_ambiguity(self): + seqs = ['MRWSYKVHDBN'] + aln = alignment.Alignment.from_list_of_seqs(seqs) + all_ps = [[0.5, 0.5], + [0.5, 0.5], + [0.5, 0.5], + [0.5, 0.5], + [0.5, 0.5], + [0.5, 0.5], + [1/3.0, 1/3.0, 1/3.0], + [1/3.0, 1/3.0, 1/3.0], + [1/3.0, 1/3.0, 1/3.0], + [1/3.0, 1/3.0, 1/3.0], + [0.25, 0.25, 0.25, 0.25]] + entropy = [sum([-p*log2(p) for p in ps]) for ps in all_ps] + self.assertEqual(aln.position_entropy(), + entropy) + def test_construct_from_0_seqs(self): with self.assertRaises(Exception): seqs = [] diff --git a/bin/design.py b/bin/design.py index 688296d..5741e09 100644 --- a/bin/design.py +++ b/bin/design.py @@ -1128,7 +1128,7 @@ def __call__(self, parser, namespace, values, option_string=None): input_auto_common_subparser.add_argument('--ncbi-api-key', help=("API key to use for NCBI e-utils. Using this increases the " "limit on requests/second and may prevent an IP address " - "from being block due to too many requests")) + "from being blocked due to too many requests")) # Auto prepare from file input_autofile_subparser = argparse.ArgumentParser(add_help=False) diff --git a/bin/design_naively.py b/bin/design_naively.py index 3d18764..c084ec0 100644 --- a/bin/design_naively.py +++ b/bin/design_naively.py @@ -39,63 +39,76 @@ def construct_guide_naively_at_each_pos(aln, args, ref_seq=None): """ start_positions = range(aln.seq_length - args.guide_length + 1) guides = [None for _ in start_positions] + ref_seq_aln = alignment.Alignment.from_list_of_seqs([ref_seq]) for i in start_positions: # Extract the portion of the alignment that starts at i pos_start, pos_end = i, i + args.guide_length aln_for_guide = aln.extract_range(pos_start, pos_end) # When constructing guides, ignore any sequences in the alignment - # that have a gap in this region + # that have a gap in this region. seqs_with_gap = set(aln_for_guide.seqs_with_gap()) seqs_to_consider = set(range(aln.num_sequences)) - seqs_with_gap + # Only look at sequences with valid flanking regions + seqs_to_consider = aln.seqs_with_required_flanking(i, args.guide_length, + args.required_flanking_seqs, seqs_to_consider=seqs_to_consider) + ref_seqs_to_consider = ref_seq_aln.seqs_with_required_flanking( + i, args.guide_length, args.required_flanking_seqs) + + consensus_guide = None + mode_guide = None + diversity_guide = None frac_with_gap = float(len(seqs_with_gap)) / aln.num_sequences - if frac_with_gap >= args.skip_gaps: - # Do not bother designing a guide here; there are - # too many sequences with a gap - consensus_guide = None - mode_guide = None - diversity_guide = None - else: + # Do not bother designing a guide here; there are + # too many sequences with a gap + if frac_with_gap < args.skip_gaps: # Construct guides - consensus_guide = aln_for_guide.determine_consensus_sequence( - seqs_to_consider=seqs_to_consider) \ - if args.consensus else None - - mode_guide = aln_for_guide.determine_most_common_sequence( - seqs_to_consider=seqs_to_consider, skip_ambiguity=True) \ - if args.mode else None + # Only design guides if there exist sequences to consider + if len(seqs_to_consider) > 0: + if args.consensus: + consensus_guide = aln_for_guide.determine_consensus_sequence( + seqs_to_consider=seqs_to_consider) + if args.mode: + mode_guide = aln_for_guide.determine_most_common_sequence( + seqs_to_consider=seqs_to_consider, skip_ambiguity=True) \ - diversity_guide = ref_seq[pos_start:pos_end] \ - if args.diversity else None + if len(ref_seqs_to_consider) > 0: + if args.diversity: + diversity_guide = ref_seq[pos_start:pos_end] # Determine the fraction of the sequences that each guide binds to if consensus_guide is not None: - consensus_guide_bound = aln_for_guide.sequences_bound_by_guide( - consensus_guide, 0, args.guide_mismatches, - args.allow_gu_pairs) + consensus_guide_bound = aln.sequences_bound_by_guide( + consensus_guide, i, args.guide_mismatches, + args.allow_gu_pairs, required_flanking_seqs=args.required_flanking_seqs) consensus_guide_frac = float(len(consensus_guide_bound)) / aln.num_sequences else: consensus_guide = 'None' consensus_guide_frac = 0 if mode_guide is not None: - mode_guide_bound = aln_for_guide.sequences_bound_by_guide( - mode_guide, 0, args.guide_mismatches, - args.allow_gu_pairs) + mode_guide_bound = aln.sequences_bound_by_guide( + mode_guide, i, args.guide_mismatches, + args.allow_gu_pairs, required_flanking_seqs=args.required_flanking_seqs) mode_guide_frac = float(len(mode_guide_bound)) / aln.num_sequences else: mode_guide = 'None' mode_guide_frac = 0 if diversity_guide is not None: + diversity_guide_bound = aln.sequences_bound_by_guide( + diversity_guide, i, args.guide_mismatches, + args.allow_gu_pairs, required_flanking_seqs=args.required_flanking_seqs) + diversity_guide_frac = float(len(diversity_guide_bound)) / aln.num_sequences if args.diversity == 'entropy': all_entropy = aln_for_guide.position_entropy() diversity_metric = sum(all_entropy)/args.guide_length else: - raise ValueError("Invalid diversity method '%s'; use one of ['entropy']" %args.diversity_metric) + raise ValueError("Invalid diversity method '%s'; use one of ['entropy']" %args.diversity) else: diversity_guide = 'None' + diversity_guide_frac = 0 diversity_metric = float('inf') d = {} @@ -104,7 +117,7 @@ def construct_guide_naively_at_each_pos(aln, args, ref_seq=None): if args.mode: d['mode'] = (mode_guide, mode_guide_frac) if args.diversity: - d[args.diversity] = (diversity_guide, diversity_metric) + d[args.diversity] = ((diversity_guide, diversity_guide_frac), diversity_metric) guides[i] = d return guides @@ -112,8 +125,8 @@ def construct_guide_naively_at_each_pos(aln, args, ref_seq=None): def find_guide_in_each_window(guides, aln_length, args, obj_type='max'): """Determine a guide for each window of an alignment. - For each window, this selects the guide within it (given one - guide per position) that has the best metric score. + For each window, this selects the args.best_n guides within it + (given one guide per position) that has the best metric score. To break ties, this selects the first guide (in terms of position) among ones with a tied metric. @@ -146,9 +159,9 @@ def find_guide_in_each_window(guides, aln_length, args, obj_type='max'): positions = set() if i > 0: for guide in guide_in_window[i-1]: - if guide[2] >= window_start: + if guide[1] >= window_start: guide_in_window[i].append(guide) - positions.add(guide[2]) + positions.add(guide[1]) heapq.heapify(guide_in_window[i]) if len(guide_in_window[i]) < args.best_n: @@ -156,7 +169,7 @@ def find_guide_in_each_window(guides, aln_length, args, obj_type='max'): # window; find a new one for j in range(window_start, last_guide_pos + 1): # Skip if guide is already in the heap - if j in positions: + if -j in positions: continue guide, metric = guides[j] @@ -164,24 +177,30 @@ def find_guide_in_each_window(guides, aln_length, args, obj_type='max'): if obj_type == 'min': metric = -metric + # If there aren't args.best_n guides on the heap yet, add the + # guide to the heap if len(guide_in_window[i]) < args.best_n: - heapq.heappush(guide_in_window[i], (metric, guide, j)) + # Use negative position to break ties + heapq.heappush(guide_in_window[i], (metric, -j, guide)) + # If the new guide has a better metric than the worst guide on the + # heap, remove the worst guide and add the new one elif metric > guide_in_window[i][0][0]: - heapq.heappushpop(guide_in_window[i], (metric, guide, j)) + # Use negative position to break ties + heapq.heappushpop(guide_in_window[i], (metric, -j, guide)) else: # All args.best_n guides are still within the window, but now - # check if the new guide at the very end of the window - # does better + # check if the new guide at the very end of the window does better guide, metric = guides[last_guide_pos] # Reverse order for minimizing if obj_type == 'min': metric = -metric if metric > guide_in_window[i][0][0]: - heapq.heappushpop(guide_in_window[i], (metric, guide, last_guide_pos)) + heapq.heappushpop(guide_in_window[i], (metric, -last_guide_pos, guide)) # Undo reverse order for minimizing and sort fix = 1 if obj_type == 'max' else -1 - guide_in_window = [[(guide, fix*metric) for metric, guide, _ in sorted(guide_in_window_i, reverse=True)] \ + guide_in_window = [[(guide, fix*metric) for metric, _, guide \ + in sorted(guide_in_window_i, reverse=True)] \ for guide_in_window_i in guide_in_window] return guide_in_window @@ -193,11 +212,13 @@ def main(args): args.consensus = not args.no_consensus # Run the mode method, unless it is explicitly disallowed args.mode = not args.no_mode - + # Check if there is a reference sequence if there is a diversity method if args.diversity: if not args.ref_seq: raise Exception('Must include a reference sequence label to run any diversity method') + args.required_flanking_seqs = (args.require_flanking5, args.require_flanking3) + # Read the input alignment seqs = seq_io.read_fasta(args.in_fasta) aln = alignment.Alignment.from_list_of_seqs(list(seqs.values())) @@ -237,7 +258,7 @@ def main(args): if args.mode: header.extend(['target-sequence-by-mode', 'frac-bound-by-mode']) if args.diversity: - header.extend(['target-sequence-by-%s' %args.diversity, args.diversity]) + header.extend(['target-sequence-by-%s' %args.diversity, args.diversity, 'frac-bound-by-%s' %args.diversity]) outf.write('\t'.join(header) + '\n') for i in range(aln.seq_length - args.window_size + 1): @@ -248,7 +269,10 @@ def main(args): if args.mode: line.extend(mode_guides_in_window[i][j]) if args.diversity: - line.extend(diversity_guides_in_window[i][j]) + diversity_line = (diversity_guides_in_window[i][j][0][0], + diversity_guides_in_window[i][j][1], + diversity_guides_in_window[i][j][0][1]) + line.extend(diversity_line) outf.write('\t'.join([str(x) for x in line]) + '\n') @@ -313,6 +337,22 @@ def main(args): "with the lowest entropy; nucleotides are determined by the " "reference sequence")) + # Requiring flanking sequence (PFS) + parser.add_argument('--require-flanking5', + help=("Require the given sequence on the 5' protospacer flanking " + "site (PFS) of each designed guide; this tolerates ambiguity " + "in the sequence (e.g., 'H' requires 'A', 'C', or 'T', or, " + "equivalently, avoids guides flanked by 'G'). Note that " + "this is the 5' end in the target sequence (not the spacer " + "sequence).")) + parser.add_argument('--require-flanking3', + help=("Require the given sequence on the 3' protospacer flanking " + "site (PFS) of each designed guide; this tolerates ambiguity " + "in the sequence (e.g., 'H' requires 'A', 'C', or 'T', or, " + "equivalently, avoids guides flanked by 'G'). Note that " + "this is the 3' end in the target sequence (not the spacer " + "sequence).")) + # Log levels parser.add_argument("--debug", dest="log_level", diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_design_naively.py b/tests/test_design_naively.py new file mode 100644 index 0000000..09d47b5 --- /dev/null +++ b/tests/test_design_naively.py @@ -0,0 +1,149 @@ +"""Tests for design_naively.py +""" + +import random +import copy +import unittest + +from argparse import Namespace +from adapt import alignment +from importlib.machinery import SourceFileLoader +from importlib.util import spec_from_loader, module_from_spec +import os.path +import types + +file_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "bin", "design_naively.py")) + +loader = SourceFileLoader("design_naively", file_path) +spec = spec_from_loader(loader.name, loader) +design_naively = module_from_spec(spec) +loader.exec_module(design_naively) + +__author__ = 'Priya Pillai ' + + +class TestDesignNaively(unittest.TestCase): + """Test design_naively.py + """ + + def test_construct_guide_naively_at_each_pos_simple(self): + ref_seq = 'ACAAA' + seqs = [ref_seq, + 'AAAAA', + 'AAAAA', + 'AAAAA', + 'GGATA', + 'GGATA', + 'GGTAA', + 'GGTAA'] + + def test_seqs(seqs, args): + aln = alignment.Alignment.from_list_of_seqs(seqs) + guides = design_naively.construct_guide_naively_at_each_pos(aln, + args, ref_seq=ref_seq) + guides = [list(guide.values())[0] for guide in guides] + return guides + + args = baseArgs() + args.consensus = True + self.assertEqual(test_seqs(seqs, args), [('AG', 0.0), ('GA', 0.25), ('AA', 0.5), ('AA', 0.75)]) + + args.consensus = False + args.mode = True + self.assertEqual(test_seqs(seqs, args), [('GG', 0.5), ('AA', 0.375), ('AA', 0.5), ('AA', 0.75)]) + + args.mode = False + args.diversity = 'entropy' + test_guides = test_seqs(seqs, args) + test_guides = [guide[0] for guide in test_guides] + self.assertEqual(test_guides, [('AC', 0.125), ('CA', 0.125), ('AA', 0.5), ('AA', 0.75)]) + + def test_construct_guide_naively_at_each_pos_flanking(self): + ref_seq = 'ACAAA' + seqs = [ref_seq, + 'AAAAA', + 'AAAAA', + 'AAAAA', + 'GGATA', + 'GGATA', + 'GGTAA', + 'GGTAA'] + + def test_seqs(seqs, args): + aln = alignment.Alignment.from_list_of_seqs(seqs) + guides = design_naively.construct_guide_naively_at_each_pos(aln, + args, ref_seq=ref_seq) + guides = [list(guide.values())[0] for guide in guides] + return guides + + args = baseArgs() + args.required_flanking_seqs = ('B', 'V') + args.consensus = True + self.assertEqual(test_seqs(seqs, args), [('None', 0), ('GT', 0.25), ('AA', 0.125), ('None', 0)]) + + args.consensus = False + args.mode = True + self.assertEqual(test_seqs(seqs, args), [('None', 0), ('GT', 0.25), ('AT', 0.25), ('None', 0)]) + + args.mode = False + args.diversity = 'entropy' + test_guides = test_seqs(seqs, args) + test_guides = [guide[0] for guide in test_guides] + self.assertEqual(test_guides, [('None', 0), ('None', 0), ('AA', 0.125), ('None', 0)]) + + def test_find_guide_in_each_window_simple(self): + guides = [('TT', 50), + ('GT', 30), + ('GG', 10), + ('CG', 40), + ('CC', 60), + ('CA', 70), + ('AA', 20)] + + args = baseArgs() + max_windows = design_naively.find_guide_in_each_window(guides, len(guides) + 1, args) + min_windows = design_naively.find_guide_in_each_window(guides, len(guides) + 1, args, obj_type='min') + + def test_find_guide_in_each_window_ties(self): + guides = [('TT', 30), + ('GT', 30), + ('GG', 10), + ('CG', 10), + ('CC', 10), + ('CA', 10), + ('AA', 30)] + + args = baseArgs() + max_windows = design_naively.find_guide_in_each_window(guides, len(guides) + 1, args) + min_windows = design_naively.find_guide_in_each_window(guides, len(guides) + 1, args, obj_type='min') + + def test_find_guide_in_each_window_negative(self): + guides = [('TT', 30), + ('GT', 0), + ('GG', -20), + ('CG', 10), + ('CC', 20), + ('CA', -30), + ('AA', -10)] + + args = baseArgs() + max_windows = design_naively.find_guide_in_each_window(guides, len(guides) + 1, args) + min_windows = design_naively.find_guide_in_each_window(guides, len(guides) + 1, args, obj_type='min') + + +def baseArgs(): + args = Namespace() + + args.window_size = 6 + args.guide_length = 2 + args.skip_gaps = 0.5 + args.consensus = False + args.mode = False + args.diversity = None + args.guide_mismatches = 0 + args.allow_gu_pairs = False + args.best_n = 3 + args.required_flanking_seqs = (None, None) + + return args + From 358222cbeea0d2de96b4e183f6b3049d0512f50f Mon Sep 17 00:00:00 2001 From: priyappillai <34429433+priyappillai@users.noreply.github.com> Date: Thu, 17 Sep 2020 12:03:03 -0400 Subject: [PATCH 4/4] Moved test_design_naively to `bin/tests` Moved test_design_naively to a tests subdirectory of the file it is testing, and added __init__.py file to `bin` so that unittest can find it; since `bin` is now a package, simplified the import of design_naively. --- {tests => bin}/__init__.py | 0 bin/tests/__init__.py | 0 {tests => bin/tests}/test_design_naively.py | 12 +----------- 3 files changed, 1 insertion(+), 11 deletions(-) rename {tests => bin}/__init__.py (100%) create mode 100644 bin/tests/__init__.py rename {tests => bin/tests}/test_design_naively.py (91%) diff --git a/tests/__init__.py b/bin/__init__.py similarity index 100% rename from tests/__init__.py rename to bin/__init__.py diff --git a/bin/tests/__init__.py b/bin/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_design_naively.py b/bin/tests/test_design_naively.py similarity index 91% rename from tests/test_design_naively.py rename to bin/tests/test_design_naively.py index 09d47b5..ee30a65 100644 --- a/tests/test_design_naively.py +++ b/bin/tests/test_design_naively.py @@ -7,17 +7,7 @@ from argparse import Namespace from adapt import alignment -from importlib.machinery import SourceFileLoader -from importlib.util import spec_from_loader, module_from_spec -import os.path -import types - -file_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "bin", "design_naively.py")) - -loader = SourceFileLoader("design_naively", file_path) -spec = spec_from_loader(loader.name, loader) -design_naively = module_from_spec(spec) -loader.exec_module(design_naively) +from bin import design_naively __author__ = 'Priya Pillai '