# utils.py (forked from EnsemblGSOC/Ensembl-Repeat-Identification)
# standard library
import gzip
import hashlib
import logging
import pathlib
import shutil
import sys
from typing import Union

# third party
import pandas as pd
import requests
from tqdm import tqdm


data_directory = pathlib.Path("data")
data_directory.mkdir(exist_ok=True)


hits_column_dtypes = {
"seq_name": "string",
"family_acc": "string",
"family_name": "string",
"bits": "float",
"e-value": "float",
"bias": "float",
"hmm-st": "int",
"hmm-en": "int",
"strand": "string",
"ali-st": "int",
"ali-en": "int",
"env-st": "int",
"env-en": "int",
"sq-len": "int",
"kimura_div": "float",
}


def download_file(
    source_url: str, file_save_path: Union[pathlib.Path, str], chunk_size: int = 10240
):
"""
Download a file in chunks, show progress bar while downloading.
Args:
source_url: URL of the file to be downloaded
file_save_path: path to save the downloaded file
chunk_size: chunk size in bytes, defaults to 10 kibibytes
"""
if not isinstance(file_save_path, pathlib.Path):
file_save_path = pathlib.Path(file_save_path)
response = requests.get(source_url, stream=True)
response.raise_for_status()
file_size = int(response.headers.get("content-length", 0))
with open(file_save_path, "wb+") as file, tqdm(
desc=file_save_path.name,
total=file_size,
unit="iB",
unit_scale=True,
) as progress_bar:
for stream_data in response.iter_content(chunk_size=chunk_size):
current_size = file.write(stream_data)
progress_bar.update(current_size)
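# Example usage (a hedged sketch; the URL and filename below are placeholders,
# not values used by this repository):
#
#     download_file(
#         "https://example.org/datasets/hg38.fa.gz",
#         data_directory / "hg38.fa.gz",
#     )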


def download_and_extract(
    target_directory: Union[pathlib.Path, str],
    extracted_filename: str,
    source_url: str,
    checksum: str,
):
    """
    Check whether the extracted file already exists, and if not, download the
    compressed file, verify its data integrity, and extract it.

    Args:
        target_directory: path to the directory to download and extract the file to
        extracted_filename: name of the extracted file
        source_url: URL of the file to be downloaded
        checksum: MD5 hash of the file
    """
if not isinstance(target_directory, pathlib.Path):
target_directory = pathlib.Path(target_directory)
extracted_file_path = target_directory / extracted_filename
if not extracted_file_path.is_file():
compressed_file_path = target_directory / f"{extracted_filename}.gz"
# check if the compressed file exists and verify its data integrity
if not compressed_file_path.is_file() or not check_file_integrity(
compressed_file_path, checksum
):
download_file(source_url, compressed_file_path)
un_gz(compressed_file_path, extracted_file_path)
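# Example usage (a hedged sketch; the URL and MD5 checksum below are made-up
# placeholders, not values used by this repository):
#
#     download_and_extract(
#         target_directory=data_directory,
#         extracted_filename="hg38.fa",
#         source_url="https://example.org/datasets/hg38.fa.gz",
#         checksum="d41d8cd98f00b204e9800998ecf8427e",
#     )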


def check_file_integrity(file_path: Union[pathlib.Path, str], checksum: str) -> bool:
    """
    Check the data integrity of a file. Returns False if the file's MD5 hash
    doesn't match the expected checksum, indicating it should be downloaded again.

    Args:
        file_path: file path
        checksum: MD5 hash of the file
    """
print("Checking file integrity \U0001FAF6\U0001F913")
content_sum = hashlib.md5(open(file_path, "rb").read()).hexdigest()
# check the fasta.gz size
res = checksum == content_sum
if not res:
print("Oops \U0001F928, the file have been attack by Iron Man")
return res
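# Example usage (a hedged sketch; `expected_md5` is a hypothetical variable holding
# the published MD5 hash of the file):
#
#     if not check_file_integrity(data_directory / "hg38.fa.gz", expected_md5):
#         print("checksum mismatch, the file should be downloaded again")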


def un_gz(zipped: Union[pathlib.Path, str], unzipped: Union[pathlib.Path, str]):
    """
    Decompress a gzip file.

    Args:
        zipped: path of the compressed file, e.g. hg38.fa.gz
        unzipped: path of the decompressed output file, e.g. hg38.fa
    """
    print("Unzipping... \U0001F600\U0001F63C\U0001F9B2\U0001F349\U0001F34A")
    with gzip.open(zipped, "rb") as f_in, open(unzipped, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
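# Example usage (a hedged sketch; the filenames are placeholders):
#
#     un_gz(data_directory / "hg38.fa.gz", data_directory / "hg38.fa")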


def hits_to_dataframe(hits_path: Union[pathlib.Path, str]) -> pd.DataFrame:
    """
    Read an annotation *.hits file into a pandas DataFrame.

    Args:
        hits_path: *.hits file path
    """
    columns = hits_column_dtypes.keys()
    hits = pd.read_csv(hits_path, sep="\t", names=columns)
    # drop the last row (it contains the CSV header, i.e. the column names)
    hits.drop(hits.tail(1).index, inplace=True)
    hits = hits.astype(hits_column_dtypes)
    return hits
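# Example usage (a hedged sketch; the filename is a placeholder):
#
#     hits = hits_to_dataframe(data_directory / "chr1.hits")
#     print(hits[["seq_name", "family_name", "ali-st", "ali-en"]].head())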


# logging formats
logging_formatter_time_message = logging.Formatter(
    fmt="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging_formatter_message = logging.Formatter(fmt="%(message)s")

# set up base logger
logger = logging.getLogger("main_logger")
logger.setLevel(logging.DEBUG)
logger.propagate = False

# create console handler and add to logger
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging_formatter_time_message)
logger.addHandler(console_handler)
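# Example usage (a hedged sketch; the log filename is a placeholder): other modules
# can reuse this logger, or attach a file handler with the plain message formatter.
#
#     logger.info("download complete")
#
#     file_handler = logging.FileHandler(data_directory / "pipeline.log")
#     file_handler.setFormatter(logging_formatter_message)
#     logger.addHandler(file_handler)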