-
Notifications
You must be signed in to change notification settings - Fork 0
/
bam_split.py
109 lines (92 loc) · 3.54 KB
/
bam_split.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import sys
import pickle
import subprocess
import numpy as np
def format_command(job_name, bam_path, contig, res_dir, err_dir, memory):
idx_cmd = [
"samtools",
"view",
"-h",
"-b",
"-o",
os.path.join(res_dir, f"{job_name}_{contig}.bam"),
bam_path,
contig
]
err_name = os.path.join(err_dir, "partition_%j.out")
cmd = [
"sbatch",
"--mem={0}".format(memory),
"-J",
job_name,
"-o",
err_name,
# "-x", "node03,node06,node07,node11,node13",
"--wrap='{0}'".format(" ".join(idx_cmd))
]
print(" ".join(cmd))
return cmd
def dispatch(bam_map, out_path_base, contigs, memory, selection=None):
# if selection is not None:
# bam_map = {k: v for k, v in bam_map.items() if k in selection}
# print(contigs) ####
jobs = []
for k, v in bam_map.items():
res_dir = os.path.join(out_path_base, k, "raw")
err_dir = os.path.join(out_path_base, k, "status")
os.makedirs(res_dir, exist_ok=True)
os.makedirs(err_dir, exist_ok=True)
for c in contigs:
if selection is None or (k, c) in selection:
cmd = format_command(k, v, c, res_dir, err_dir, memory)
jobs.append(cmd)
# print(" & ".join([" ".join(cmd) for cmd in jobs])) ####
with open("exec.sh", "w") as script_file:
script_file.write("#!/bin/bash\n") ####
script_file.writelines([(" ".join(cmd) + "\n") for cmd in jobs]) ####
subprocess.run("./exec.sh", stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# timeout = "sbatch: error: Batch job submission failed: Socket timed out on send/recv operation"
# for i in jobs:
# while True:
# try:
# submission = subprocess.run(i, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# print(str(submission.stdout, 'utf-8').rstrip())
# break
# except subprocess.CalledProcessError as e:
# # print(e.stdout) ####
# err = str(e.stderr, 'utf-8').rstrip()
# print(err)
# if err == timeout:
# print("Retrying Submit")
# continue
# else:
# raise e
def get_failed_jobs(bam_map, contigs, out_path_base):
res_dir = os.path.join(out_path_base, "raw")
fails = set()
for k in bam_map.keys():
for c in contigs:
if not os.path.isfile(os.path.join(res_dir, f"{k}_{v}.bam")):
fails.add((k, c),)
return fails
if __name__ == '__main__':
# Kellis 429
kellis_path_base = "/agusevlab/awang/sc_kellis"
bam_path_kellis = os.path.join(kellis_path_base, "PFC_bam_files")
bam_map_kellis_429 = {}
with open(os.path.join(kellis_path_base, "Bam_paths_432_PFC_HM_Austin.csv")) as bam_data:
next(bam_data)
for line in bam_data:
cols = line.strip().split(",")
bam_map_kellis_429[cols[1]] = os.path.join(bam_path_kellis, cols[2].lstrip("/"), cols[3].lstrip("/"))
out_path_base_kellis_429 = os.path.join(kellis_path_base, "partitioned_429")
contigs = [str(i) for i in range(1, 23)]
# print(bam_map_kellis_429) ####
dispatch(
bam_map_kellis_429, out_path_base_kellis_429, contigs, 5000
)
# fail_kellis_429 = get_failed_jobs(bam_map_kellis_429, contigs, out_path_base_kellis_429)
# dispatch_star(
# bam_map_kellis_429, out_path_base_kellis_429, 260000, selection=fail_kellis_429
# )