-
Notifications
You must be signed in to change notification settings - Fork 0
/
ERG_Transform.py
292 lines (219 loc) · 8.88 KB
/
ERG_Transform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import collections
import os
import collections
from pathlib import Path
import pandas as pd
import os
import optparse
import sys
"""
Algorithm:
Parse and collect data from .txt file of PCP raw data, output a .csv file.
1. Group files based on their names; for example, 1-P.txt should be grouped with 1-S.txt
2. Extract data from a tuple/group of files(use different approach based on S/P file)
3. Merge all extracted data into one big excel/csv file
Usage eg: python ERG_Transform.py inputFile1,inputFile2 outputfilename.csv
"""
"""Utility functions"""
def get_file_extension(filename: str):
    """Return the lowercased file extension of *filename*, dot included ('' if none)."""
    suffix = Path(filename).suffix
    return suffix.lower()
def get_field_prefix(fileName: str):
    """Return the part of *fileName* before the first '-' (the grouping key)."""
    prefix, _, _ = fileName.partition("-")
    return prefix
def get_filed_sub(fileName: str) -> str:
    """Return the type letter of a file, e.g. 'S' or 'P' from '1-S.txt'.

    The original implementation returned the fixed slice fileName[2:3],
    which is wrong for multi-character prefixes (e.g. '12-S.txt' gave '-').
    Since grouping keys are taken from the text before the first '-'
    (see get_field_prefix), the type letter is the character right after
    that separator, regardless of prefix length.
    """
    sep = fileName.find("-")
    if sep == -1:
        # No separator: keep the historical fixed-position behavior.
        return fileName[2:3]
    return fileName[sep + 1:sep + 2]
def strip_lines(lines):
    """Strip surrounding whitespace from each raw line and split it on tabs."""
    cleaned = []
    for raw in lines:
        cleaned.append(raw.strip().split('\t'))
    return cleaned
def tuple_to_dict(list_of_tuple, d):
    """Fold (key, value) pairs into dict *d* as key -> list of values.

    *d* is mutated in place and also returned for convenience.
    """
    for key, value in list_of_tuple:
        if key not in d:
            d[key] = []
        d[key].append(value)
    return d
def get_first_five_cols(lines, num_rows: int):
    """Build the five general-information columns, each repeated *num_rows* times.

    The export keeps its metadata at fixed line positions; each such line is
    a tab-separated "label / value" pair and becomes one DataFrame column,
    for example:

        Protocol,Steps,Channels,Animal # ,Study Name
        PCP Photopic Adapted Long Protocol06 [...],5,4,6137-B,OM-243_C2
    """
    info = collections.defaultdict(list)
    # Fixed positions of: protocol name, steps, channels, animal #, study name.
    for line_no in (14, 18, 19, 20, 21):
        fields = lines[line_no].strip().split('\t')
        info[fields[0]].extend([fields[1]] * num_rows)
    return pd.DataFrame.from_dict(info)
def organize_files(path: str):
    """Group the .txt files inside *path* by their name prefix.

    The prefix is the text before the first '-', so 1-P.txt and 1-S.txt both
    land under key '1'. Non-.txt entries and subdirectories are skipped.
    Returns an empty dict (after printing a message) when *path* is not a
    directory.
    """
    if not os.path.isdir(path):
        print(f"{path} is not a directory")
        return {}
    grouped = collections.defaultdict(list)
    for entry in os.listdir(path):
        if not os.path.isfile(os.path.join(path, entry)):
            continue
        if get_file_extension(entry) != ".txt":
            continue
        grouped[get_field_prefix(entry)].append(entry)
    return grouped
# Function to read data from files
def parse_file(filename, file_type, workspace):
    """Parse one raw .txt export located in *workspace* into a DataFrame.

    file_type 'P' reads 20 marker rows (lines 28-47 of the file); 'S' reads
    36 (lines 28-63). The marker table is prefixed with the five metadata
    columns from get_first_five_cols. Any other file_type returns None
    (preserving the original implicit-None behavior).
    """
    # Open relative to the workspace instead of os.chdir(): chdir is a
    # process-wide side effect, and calling it repeatedly with a relative
    # workspace path would descend one level deeper on every call.
    with open(os.path.join(workspace, filename), "r", encoding='utf-8',
              errors='ignore') as f:
        lines = f.readlines()
    columns = lines[26].strip().split()
    # Reformat the column name of 'Cage #' (header splits '#' into its own token)
    columns.remove('#')
    columns[1] = 'Mouse Name'
    columns[2] = 'Cage #'
    # End line of the marker table differs between P and S exports.
    row_end = {"P": 47, "S": 63}.get(file_type)
    if row_end is None:
        return None
    data_lines = lines[27:row_end]
    num_rows = len(data_lines)
    first_five_cols = get_first_five_cols(lines, num_rows)
    # get data in marker's table section
    marker_table_data = pd.DataFrame.from_records(strip_lines(data_lines),
                                                  columns=columns)
    return pd.concat([first_five_cols, marker_table_data], axis=1)
def transform_files(file_list, workspace, outputFileName) -> None:
    """Parse every recognized P/S file in *file_list* and write one CSV.

    Files that are neither P nor S format are skipped. Returns [] (without
    writing anything) when *file_list* is empty or no file could be parsed;
    the empty-result guard avoids the ValueError pd.concat raises on an
    empty list.
    """
    if not file_list:
        return []
    frames = []
    for file in file_list:
        # Read and aggregate data; file type is sniffed from the content.
        if isPfile(file):
            frames.append(parse_file(file, "P", workspace))
        elif isSfile(file):
            frames.append(parse_file(file, "S", workspace))
    if not frames:
        return []
    final_data = pd.concat(frames, ignore_index=True)
    # Drop unwanted columns in one call; errors='ignore' keeps this from
    # raising KeyError when an input file happens to lack one of them.
    final_data.drop(columns=['Animal # ', 'Cage #', 'Age',
                             'Comment', 'C', 'Group'],
                    errors='ignore', inplace=True)
    # Change some column names
    final_data.rename(columns={'ms': 'Latency (ms)', 'uV': 'Amplitude (uV)',
                               'R': 'Result', 'S': 'Stimulation',
                               'Name': 'Waveform', 'Channels': 'Total Channels'},
                      inplace=True)
    # Re-sort. Note that dataframe sorting is case sensitive: uppercase
    # sorts before lowercase.
    sorted_data = final_data.sort_values(
        by=['Mouse Name', 'Protocol', 'Stimulation', 'Waveform', 'Eye'])
    sorted_data.to_csv(outputFileName, sep=',')
def transform(file_groups: dict, workspace, outputFileName) -> None:
    """Parse grouped P/S file pairs and write the merged data to one CSV.

    Each group is expected to hold a P file and an S file; sorting by the
    type letter puts P before S ('P' < 'S'). Groups with fewer than two
    files are skipped instead of raising IndexError, and nothing is written
    when no group could be processed (avoids pd.concat([]) ValueError).
    Returns [] on empty/unusable input.
    """
    if not file_groups:
        return []
    result = []
    for prefix, files in file_groups.items():
        if len(files) < 2:
            # Incomplete group (missing its P or S counterpart): skip it.
            continue
        ordered = sorted(files, key=get_filed_sub)
        p_file, s_file = ordered[0], ordered[1]
        # Read and aggregate data
        df_1 = parse_file(p_file, "P", workspace)
        df_2 = parse_file(s_file, "S", workspace)
        result.append(pd.concat([df_1, df_2], ignore_index=True))
    if not result:
        return []
    # Write data to the file
    final_data = pd.concat(result, ignore_index=True)
    final_data.to_csv(outputFileName, sep=',')
def isPfile(filename):
    """Return True when line 49 of *filename* starts with "Summary Table".

    Any error (missing file, short file, empty line 49) yields False.
    """
    try:
        with open(filename, "r", encoding='utf-8',
                  errors='ignore') as handle:
            content = handle.readlines()
        tokens = content[48].strip().split()
        return tokens[:2] == ["Summary", "Table"]
    except Exception:
        return False
def isSfile(filename):
    """Return True when line 65 of *filename* starts with "Summary Table".

    Any error (missing file, short file, empty line 65) yields False.
    """
    try:
        with open(filename, "r", encoding='utf-8',
                  errors='ignore') as handle:
            content = handle.readlines()
        tokens = content[64].strip().split()
        return tokens[:2] == ["Summary", "Table"]
    except Exception:
        return False
def validateFiles(inputFile1, inputFile2):
    """Check that both input files exist and look like P or S exports.

    Prints a diagnostic and returns False on the first failure; True when
    both files pass. Any unexpected exception is printed and treated as
    failure.
    """
    labelled = (("First", inputFile1), ("Second", inputFile2))
    try:
        # Existence of both files is checked before any format sniffing.
        for label, candidate in labelled:
            if not Path(candidate).exists():
                print(f"{label} file in list does not exist.")
                return False
        for label, candidate in labelled:
            # isSfile is tried first, matching the original check order.
            if not (isSfile(candidate) or isPfile(candidate)):
                print(f"{label} file has an invalid format")
                return False
    except Exception as e:
        print(e)
        return False
    return True
def main():
    """Command-line entry point.

    Usage: python ERG_Transform.py inputFile1,inputFile2 outputFile

    The first argument is a comma-separated list of input .txt files; the
    second is the CSV file to write. (An earlier optparse-based -d/-f
    interface was removed as dead code.)
    """
    if len(sys.argv) < 3:
        print("Usage: inputfile1,inputFile2 outputFile")
        print(len(sys.argv))
        print(sys.argv)
        # Exit nonzero so shell callers can detect the usage error.
        sys.exit(1)
    inputFiles = sys.argv[1].split(',')
    outputFile = sys.argv[2]
    # Input paths are resolved relative to the current directory.
    transform_files(inputFiles, '.', outputFile)
if __name__ == "__main__":
    main()