Skip to content

Commit

Permalink
Expand pollen_data_gen simple (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshumanmohan authored Jun 10, 2023
1 parent 62b336a commit de4f8db
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 48 deletions.
52 changes: 42 additions & 10 deletions mygfa/mygfa/mygfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,17 @@ class Segment:
name: str
seq: Strand

@classmethod
def parse_inner(cls, name: str, seq: str) -> "Segment":
    """Build a GFA segment from a name and a raw sequence string that
    the caller has already extracted from the record."""
    strand = Strand.parse(seq)
    return Segment(name, strand)

@classmethod
def parse(cls, fields: List[str]) -> "Segment":
"""Parse a GFA segment."""
_, name, seq = fields[:3]
return Segment(name, Strand.parse(seq))
return cls.parse_inner(name, seq)

def revcomp(self) -> "Segment":
"""Returns the reverse complement of this segment."""
Expand All @@ -85,7 +91,7 @@ def __str__(self) -> str:
[
"S",
self.name,
self.seq,
str(self.seq),
]
)

Expand Down Expand Up @@ -152,15 +158,24 @@ class Link:
overlap: Alignment

@classmethod
def parse(cls, fields: List[str]) -> "Link":
"""Parse a GFA link."""
_, from_, from_ori, to_, to_ori, overlap = fields[:6]
def parse_inner(
cls, from_: str, from_ori: str, to_: str, to_ori: str, overlap: str
) -> "Link":
"""Parse a GFA link, assuming that the key elements have
already been extracted.
"""
return Link(
Handle.parse(from_, from_ori),
Handle.parse(to_, to_ori),
Alignment.parse(overlap),
)

@classmethod
def parse(cls, fields: List[str]) -> "Link":
    """Parse a GFA link from its split fields.

    The first field is discarded; the following five (source name and
    orientation, target name and orientation, overlap) are handed to
    `parse_inner`.
    """
    _tag, src, src_ori, dst, dst_ori, olap = fields[:6]
    return cls.parse_inner(src, src_ori, dst, dst_ori, olap)

def rev(self) -> "Link":
"""Return the link representing the reverse of this link.
i.e, `AAAA --> GGGG` becomes `TTTT <-- CCCC`
Expand All @@ -187,9 +202,10 @@ class Path:
olaps: Optional[List[Alignment]]

@classmethod
def parse(cls, fields: List[str]) -> "Path":
"""Parse a GFA path."""
_, name, seq, overlaps = fields[:4]
def parse_inner(cls, name: str, seq: str, overlaps: str) -> "Path":
"""Parse a GFA path, assuming that
the name, sequence and overlaps have already been extracted."""

seq_lst = [Handle.parse(s[:-1], s[-1]) for s in seq.split(",")]
olaps_lst = (
None
Expand All @@ -207,6 +223,13 @@ def parse(cls, fields: List[str]) -> "Path":
olaps_lst,
)

@classmethod
def parse(cls, fields: List[str]) -> "Path":
    """Parse a GFA path from its split fields.

    The first field is discarded; the name, sequence, and overlaps that
    follow are passed on to `parse_inner`.
    """
    _tag, path_name, steps, olaps = fields[:4]
    return cls.parse_inner(path_name, steps, olaps)

def drop_overlaps(self) -> "Path":
"""Return a copy of this path without overlaps."""
return Path(self.name, self.segments, None)
Expand All @@ -230,11 +253,20 @@ def nonblanks(file: TextIO) -> Iterator[str]:
yield line


class Header(str):
    """A GFA header, stored verbatim as a string."""

    @classmethod
    def parse(cls, line: str) -> "Header":
        """Parse a GFA header.

        The line is kept as-is; no fields are extracted from it.

        Returns:
            An instance of the class `parse` was called on, wrapping `line`.
        """
        # Construct through `cls` rather than naming `Header` directly so
        # that subclasses calling `parse` get instances of their own type.
        return cls(line)


@dataclass
class Graph:
"""An entire GFA file."""

headers: List[str]
headers: List[Header]
segments: Dict[str, Segment]
links: List[Link]
paths: Dict[str, Path]
Expand All @@ -247,7 +279,7 @@ def parse(cls, infile: TextIO) -> "Graph":
for line in nonblanks(infile):
fields = line.split()
if fields[0] == "H":
graph.headers.append(line) # Parse headers verbatim.
graph.headers.append(Header.parse(line))
elif fields[0] == "S":
segment = Segment.parse(fields)
graph.segments[segment.name] = segment
Expand Down
2 changes: 1 addition & 1 deletion mygfa/mygfa/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def pathseq(graph: mygfa.Graph) -> Dict[str, str]:
ans: Dict[str, str] = {}
for path in graph.paths.keys():
ans[path] = "".join(
handle_seq(graph, handle) for handle in graph.paths[path].segments
str(handle_seq(graph, handle)) for handle in graph.paths[path].segments
)
return ans

Expand Down
36 changes: 33 additions & 3 deletions pollen_data_gen/pollen_data_gen/__main__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sys
import argparse
from mygfa import mygfa

Expand All @@ -12,7 +13,35 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]:
title="pollen-data-gen commands", metavar="COMMAND", dest="command"
)

_ = subparsers.add_parser("simple", help="Produces a simple JSON of the graph.")
simple_parser = subparsers.add_parser(
"simple", help="Produces a simple JSON serialization of the graph."
)
simple_parser.add_argument(
"-n",
nargs="?",
const="d",
help="The max number of nodes.",
required=False,
)
simple_parser.add_argument(
"-e",
nargs="?",
const="d",
help="The max number of steps per node.",
required=False,
)
simple_parser.add_argument(
"-p",
nargs="?",
const="d",
help="The max number of paths.",
required=False,
)

# Register the `roundtrip` command; it takes no extra arguments.
_ = subparsers.add_parser(
    "roundtrip",
    help="Checks that we can serialize then deserialize the graph losslessly.",
)

depth_parser = subparsers.add_parser(
"depth", help="Produces a `depth`-specific JSON of the graph."
Expand Down Expand Up @@ -57,8 +86,9 @@ def dispatch(args: argparse.Namespace) -> None:
then dispatch to the appropriate pollen_data_gen command.
"""
name_to_func = {
"depth": lambda g: depth.depth(g, args.n, args.e, args.p),
"simple": simple.simple,
"depth": lambda g: depth.depth_stdout(g, args.n, args.e, args.p),
"simple": lambda g: simple.dump(g, sys.stdout, args.n, args.e, args.p),
"roundtrip": simple.roundtrip_test,
}
# Close the input file as soon as the graph has been parsed instead of
# leaving the handle open for the rest of the process.
with open(args.graph, "r", encoding="utf-8") as infile:
    graph = mygfa.Graph.parse(infile)
name_to_func[args.command](graph)
Expand Down
34 changes: 24 additions & 10 deletions pollen_data_gen/pollen_data_gen/depth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
from typing import Any, Collection, Dict, Union, Optional
import json
from typing import Any, Collection, Dict, Union
from json import JSONEncoder
from mygfa import mygfa, preprocess

Expand Down Expand Up @@ -59,7 +60,7 @@ def __init__(self, max_n: int, max_e: int, max_p: int, **kwargs: Any) -> None:
self.max_e = max_e
self.max_p = max_p

def default(self, o: Any) -> None:
def default(self, o: Any) -> Dict[str, Dict[str, Collection[object]]]:
answer_field = {
"depth_output": {
"data": list([0] * self.max_n),
Expand All @@ -75,15 +76,14 @@ def default(self, o: Any) -> None:
paths = paths_viewed_from_nodes(
o, self.max_n, self.max_e, self.max_p
) | paths_to_consider(self.max_n, self.max_p)
print(
json.dumps(
answer_field | paths | answer_field_uniq, indent=2, sort_keys=True
)
)

return answer_field | paths | answer_field_uniq

def depth(graph: mygfa.Graph, max_n: int, max_e: int, max_p: int) -> None:
"""Prints a JSON representation of `graph`

def depth_json(
graph: mygfa.Graph, max_n: Optional[int], max_e: Optional[int], max_p: Optional[int]
) -> str:
"""Returns a JSON representation of `graph`
that is specific to the exine command `depth`.
"""
n_tight, e_tight, p_tight = preprocess.get_maxes(graph)
Expand All @@ -96,4 +96,18 @@ def depth(graph: mygfa.Graph, max_n: int, max_e: int, max_p: int) -> None:
if not max_p:
max_p = p_tight

NodeDepthEncoder(max_n=int(max_n), max_e=int(max_e), max_p=int(max_p)).encode(graph)
return NodeDepthEncoder(
max_n=int(max_n), max_e=int(max_e), max_p=int(max_p)
).encode(graph)


def depth_stdout(
    graph: mygfa.Graph,
    max_n: Optional[int],
    max_e: Optional[int],
    max_p: Optional[int],
) -> None:
    """Prints a JSON representation of `graph` to stdout.

    The three limits are forwarded unchanged to `depth_json`, so they
    accept the same values it does (including `None`).
    """
    encoding = depth_json(graph, max_n, max_e, max_p)

    # `depth_json` returns an already-encoded string; re-parse it so the
    # output can be written with indentation and sorted keys.
    json.dump(
        json.loads(encoding),
        sys.stdout,
        indent=2,
        sort_keys=True,
    )
Loading

0 comments on commit de4f8db

Please sign in to comment.