-
Notifications
You must be signed in to change notification settings - Fork 0
/
gmmall.py
384 lines (316 loc) · 12.5 KB
/
gmmall.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# -*- coding: utf-8 -*-
"""gmmAll.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1HJP3xWZ3_DiltbkQiS7P_Fd_jWNpl3ve
"""
import pandas as pd
import librosa
import glob
import numpy as np
from joblib import dump, load
from sklearn.mixture import GaussianMixture
from sklearn.metrics import roc_curve
class voice_processor():
    """Feature-extraction helpers for speaker-verification audio.

    Provides functions for loading wav files and extracting MFCCs:

    getMFCCsDF: method
        Loads the audio in the filename and extracts the MFCCs of the audio.
    getMFCCsDF_batch: method
        Loads all audio in a folder and extracts the MFCCs of each audio.
    getFeature_batch: method
        Generic feature hook; currently delegates to getMFCCsDF_batch.
    """

    def __init__(self):
        """Inits voice processor."""
        self.author = 'Ching Pui WAN'
        self.author_contact = 'cpwan@ust.hk'

    def getMFCCsDF(self, filename, speakerId):
        """Loads the audio in the filename and extracts the MFCCs of the audio.

        The MFCCs are returned as a dataframe.

        Args:
            filename: string
                The path to the audio.
                For example, 'wav/id10277/tbh20gz_KRA/00007.wav'
            speakerId: string
                The speaker id to be included in the output dataframe.
                For example, 'id10277'

        Returns:
            A dataframe of the MFCCs with the metadata. Each row of the
            dataframe corresponds to a speech segment, and consists of 20
            MFCCs, the filename, and the speaker id w.r.t the audio. For
            example:
                columns: 0,1,2,...18,19,filename,speakerId
        """
        y, sr = librosa.load(filename)
        # librosa >= 0.10 made y/sr keyword-only; keywords also work on
        # older versions, so this is backward-compatible.
        mfccs = librosa.feature.mfcc(y=y, sr=sr)
        # mfccs is (n_mfcc, n_frames); transpose so each row is one frame.
        tempDF = pd.DataFrame(mfccs.T)
        tempDF['filename'] = filename
        tempDF['speakerId'] = speakerId
        return tempDF

    def getMFCCsDF_batch(self, folder, speakerId):
        """Loads all audio in a folder and extracts the MFCCs of each audio.

        The MFCCs are concatenated as a dataframe.

        Args:
            folder: string
                The path to the folder containing subfolders of audios.
                For example, folder='wav/id10270'. Then all the wav files in
                'wav/id10270/**/' will be loaded.
            speakerId: string
                The speaker id to be included in the output dataframe.
                For example, 'id10270'

        Returns:
            A dataframe of the MFCCs with the metadata. Each row of the
            dataframe corresponds to a speech segment, and consists of 20
            MFCCs, the filename, and the speaker id w.r.t the audio. For
            example:
                columns: 0,1,2,...18,19,filename,speakerId
            If the folder contains no wav files, an empty dataframe is
            returned.
        """
        print('Processing speaker %s' % speakerId)
        files = glob.glob('%s/**/*.wav' % folder, recursive=True)
        frames = []
        for count, filename in enumerate(files):
            if count % 50 == 0:
                print('Extracted %d / %d files' % (count, len(files)))
            frames.append(self.getMFCCsDF(filename, speakerId))
        print('Finished extracting features for %s' % speakerId)
        # DataFrame.append was removed in pandas 2.0 (and was O(n^2));
        # accumulate in a list and concatenate once instead.
        DF = pd.concat(frames) if frames else pd.DataFrame([])
        DF.reset_index(drop=True, inplace=True)
        return DF

    def getFeature_batch(self, folder, speakerId):
        """Extracts the feature dataframe for a speaker's folder.

        Currently the feature is the MFCC dataframe from getMFCCsDF_batch;
        this indirection is the single place to swap in another feature.
        """
        return self.getMFCCsDF_batch(folder, speakerId)
class GMM_UBM():
    """GMM-UBM speaker-verification pipeline.

    Extracts MFCC features per speaker, trains a Gaussian mixture universal
    background model (UBM) on a pool of training speakers, adapts a
    per-speaker GMM from the UBM at enrollment, and scores test audio by the
    log likelihood ratio between the speaker model and the UBM.

    Attributes:
        speaker_Ids:
            A list of strings to index the speakers.
        train_speaker_Ids:
            Speaker ids whose features are pooled to train the UBM.
        test_speaker_Ids:
            Speaker ids reserved for evaluation.
        audio_folders:
            A dict, in the form of {speaker_id: 'path/to/folder'}.
        feature_dict:
            A dict of dataframes w.r.t. each speaker,
            in the form of {speaker_id: dataframe}.
        processor:
            A class where the feature extraction methods are.
        feature_out_format:
            A string to indicate the path to store the extracted features.
            For example, feature_out_format='feature_%s.joblib', for which
            %s would correspond to the speaker id.
        ubm:
            The fitted (or None if uninitialized) GaussianMixture UBM.
        ubm_out_name:
            Path where the UBM is dumped/loaded.
        speaker_models:
            A dict {speaker_id: GaussianMixture} of enrolled speaker models.
        speaker_model_out_format:
            Path pattern (with %s for the speaker id) for speaker models.
    """

    def __init__(self):
        """Inits GMM_UBM."""
        self.path = ''
        self.speaker_Ids = []
        self.train_speaker_Ids = []
        self.test_speaker_Ids = []
        self.audio_folders = {}
        self.feature_dict = {}
        self.processor = voice_processor()
        self.feature_out_format = 'feature/feature_%s.joblib'
        self.ubm = None
        self.ubm_out_name = 'ubm/ubm.joblib'
        self.speaker_models = {}
        self.speaker_model_out_format = 'speaker_model/speaker_model_%s.joblib'

    def init_ubm(self, n_components=64):
        """Creates a fresh diagonal-covariance GMM to serve as the UBM."""
        self.ubm = GaussianMixture(n_components=n_components, covariance_type='diag')
        print('Initialized the Gaussian Mixture Model for UBM with %d components' % n_components)

    def process(self):
        """
        Extracts features for each speaker.

        This method extracts all the wav files in self.audio_folders for each
        speaker using the method in self.processor. The features are then
        dumped to disk with the filename specified by self.feature_out_format.

        Remarks: this method overwrites the existing files.
        """
        speaker_Ids = self.speaker_Ids
        folders = self.audio_folders
        feature_out_format = self.feature_out_format
        for speaker_Id in speaker_Ids:
            folder = folders[speaker_Id]
            df = self.processor.getFeature_batch(folder, speaker_Id)
            # save the feature dataframe in memory
            self.feature_dict[speaker_Id] = df
            # save the features to disk
            output_name = feature_out_format % speaker_Id
            dump(df, output_name)
            print('Saved feature to %s.' % output_name)

    def process_new(self):
        """
        Extracts features for each speaker not yet processed.

        Same as process(), but skips speakers already present in
        self.feature_dict instead of re-extracting and overwriting them.
        """
        speaker_Ids = self.speaker_Ids
        folders = self.audio_folders
        feature_out_format = self.feature_out_format
        feature_dict = self.feature_dict
        for speaker_Id in speaker_Ids:
            if speaker_Id in feature_dict:
                print('Speaker %s already processed. Check feature_dict.' % speaker_Id)
                continue
            folder = folders[speaker_Id]
            df = self.processor.getFeature_batch(folder, speaker_Id)
            # save the feature dataframe in memory
            self.feature_dict[speaker_Id] = df
            # save the features to disk
            output_name = feature_out_format % speaker_Id
            dump(df, output_name)
            print('Saved feature to %s.' % output_name)

    def load_feature_from_disk(self):
        """
        Loads features for each speaker from disk.

        This method loads the features for all speakers in speaker_Ids from
        the path specified by self.feature_out_format. Missing files are
        reported and skipped (best-effort).
        """
        feature_out_format = self.feature_out_format
        speaker_Ids = self.speaker_Ids
        for speaker_Id in speaker_Ids:
            try:
                df = load(feature_out_format % speaker_Id)
                self.feature_dict[speaker_Id] = df
            # narrow from a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate; any load failure is treated as "not found"
            except Exception:
                print('Feature of Speaker %s not found in disk.' % speaker_Id)

    def get_balanced_data(self, X, y):
        """
        Oversamples the minority to balance the classes.

        Dependency: imbalanced-learn
        """
        from imblearn.over_sampling import RandomOverSampler
        ros = RandomOverSampler(random_state=0)
        X_resampled, y_resampled = ros.fit_resample(X, y)
        return X_resampled, y_resampled

    def train_ubm(self):
        """
        Trains the gaussian mixture model for the universal background model
        by concatenating the features of each speech segment across all
        training speakers (class-balanced by oversampling).
        """
        train_speaker_Ids = self.train_speaker_Ids
        feature_dict = self.feature_dict
        # prepare the dataframe feeding to gmm
        listOfDF = [feature_dict[speaker_Id] for speaker_Id in train_speaker_Ids]
        DF = pd.concat(listOfDF)
        DF.reset_index(drop=True, inplace=True)
        X = DF[[i for i in DF.columns if i not in ['filename', 'speakerId']]]
        y = DF['speakerId']
        X, y = self.get_balanced_data(X, y)
        # identity check: `ubm == None` would invoke GaussianMixture.__eq__
        if self.ubm is None:
            self.init_ubm()
        self.ubm.fit(X)

    def save_ubm_to_disk(self):
        """Dumps the trained UBM to self.ubm_out_name."""
        dump(self.ubm, self.ubm_out_name)

    def load_ubm_from_disk(self):
        """Loads the UBM from self.ubm_out_name into self.ubm."""
        self.ubm = load(self.ubm_out_name)

    def enroll_speaker(self, speakerId):
        """
        Enrolls the speaker by building the speaker model.

        1.) Must run self.process or self.process_new in advance to extract
            feature and store feature in self.feature_dict.
        2.) Must train the ubm in advance. Run self.train_ubm or
            self.load_ubm_from_disk.
        """
        feature_dict = self.feature_dict
        ubm = self.ubm
        n_components = ubm.n_components
        weights = ubm.weights_
        means = ubm.means_
        precisions = ubm.precisions_  # this is the inverse of the covariance matrix
        if speakerId in feature_dict:
            # warm-start the speaker GMM from the UBM parameters (MAP-style
            # adaptation via EM initialization)
            gmm = GaussianMixture(n_components=n_components,
                                  covariance_type='diag',
                                  weights_init=weights,
                                  means_init=means,
                                  precisions_init=precisions)
            DF = feature_dict[speakerId]
            X = DF[[i for i in DF.columns if i not in ['filename', 'speakerId']]]
            gmm.fit(X.values)
            self.speaker_models[speakerId] = gmm
        else:
            print('Please first extract the feature of speaker %s using process() or process_new()' % speakerId)

    def save_speaker_model(self):
        """Dumps every enrolled speaker model to disk."""
        speaker_models = self.speaker_models
        speaker_model_out_format = self.speaker_model_out_format
        for (speakerId, model) in speaker_models.items():
            dump(model, speaker_model_out_format % speakerId)

    def load_speaker_model(self, speakerId):
        """Loads one speaker model from disk; returns it, or None if absent."""
        speaker_model_out_format = self.speaker_model_out_format
        out = None
        try:
            out = load(speaker_model_out_format % speakerId)
            self.speaker_models[speakerId] = out
        # narrow from a bare `except:`; any load failure is reported
        except Exception:
            print('Speaker model not found in disk')
        finally:
            return out

    def score_pairwise(self, speakerId, speakerId_test):
        """
        Tests if the speech in speakerId_test is spoken by speakerId.

        Args:
            speakerId: string
                The speaker to be tested on.
            speakerId_test: string
                Speaker id w.r.t the test set.

        Returns:
            A dataframe of the mean log likelihood ratio score of test samples
            w.r.t the speaker specified by speakerId. It has the following
            columns:
                filename, speakerId, score

        Raises:
            KeyError: if the test speaker's features or the target speaker's
                model have not been initialized.
        """
        if speakerId_test not in self.feature_dict:
            raise KeyError('Feature of speaker %s not initialized' % speakerId_test)
        if speakerId not in self.speaker_models:
            raise KeyError('Speaker model for speaker %s not initialized' % speakerId)
        testDF = self.feature_dict[speakerId_test]
        speaker_model = self.speaker_models[speakerId]
        ubm = self.ubm
        X = testDF[[i for i in testDF.columns if i not in ['filename', 'speakerId']]]
        metaData = testDF[['filename', 'speakerId']]
        scoreDF = metaData.copy()
        # gmm.score_samples gives the per-frame log likelihood; the score is
        # the log likelihood ratio speaker-model vs UBM
        scoreDF['score'] = speaker_model.score_samples(X) - ubm.score_samples(X)
        # average the frame-level ratios per audio file
        out = scoreDF.groupby(['filename', 'speakerId']).mean()
        return out

    def test_wrt_speaker(self, speakerId, positive_speaker_id, negative_speaker_ids):
        """
        Tests the performance of the verification system.

        W.r.t the speaker specified by speakerId, evaluates the log likelihood
        ratio for positive and negative samples.

        Args:
            speakerId: string
                The speaker to be tested on.
            positive_speaker_id: string
                Speaker id w.r.t the positive test set of the same speaker.
            negative_speaker_ids: a list of string
                Speaker ids w.r.t the negative test set containing speeches
                of other speakers.

        Returns:
            A tuple of (DF, fpr, fnr, eer),
            DF is a dataframe of the log likelihood ratio score for each audio
            file; it has columns: filename, speakerId, score, y_true. 'y_true'
            indicates whether the audio is a positive or negative sample.
            fpr, fnr are the false positive rate and false negative rate under
            different thresholds of the log likelihood ratio score.
            eer is the equal error rate.
        """
        score = self.score_pairwise
        listDF = []
        pos_df = score(speakerId, positive_speaker_id)
        pos_df['y_true'] = np.ones(len(pos_df))
        listDF.append(pos_df)
        for neg_speaker_id in negative_speaker_ids:
            neg_df = score(speakerId, neg_speaker_id)
            neg_df['y_true'] = np.zeros(len(neg_df))
            listDF.append(neg_df)
        DF = pd.concat(listDF)
        DF.reset_index(drop=True, inplace=True)
        y_true = DF['y_true']
        y_score = DF['score']
        fpr, tpr, threshold = roc_curve(y_true, y_score, pos_label=1)
        fnr = 1 - tpr
        # EER: point where false positive rate equals false negative rate;
        # hoist the argmin instead of computing it twice
        idx = np.argmin(np.abs(fpr - fnr))
        eer = (fpr[idx] + fnr[idx]) / 2
        print('EER for speaker %s = %.4f' % (speakerId, eer))
        return DF, fpr, fnr, eer