-
Notifications
You must be signed in to change notification settings - Fork 5
/
generator.py
97 lines (84 loc) · 3.79 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import numpy as np
import os
import pandas as pd
from tensorflow.keras.utils import Sequence
from PIL import Image
from skimage.transform import resize
class AugmentedImageSequence(Sequence):
"""
Thread-safe image generator with imgaug support
For more information of imgaug see: https://github.com/aleju/imgaug
"""
def __init__(self, dataset_csv_file, class_names, source_image_dir,tokenizer_wrapper, batch_size=16,
target_size=(224, 224), augmenter=None, verbose=0, steps=None,
shuffle_on_epoch_end=True, random_state=1):
"""
:param dataset_csv_file: str, path of dataset csv file
:param class_names: list of str
:param batch_size: int
:param target_size: tuple(int, int)
:param augmenter: imgaug object. Do not specify resize in augmenter.
It will be done automatically according to input_shape of the model.
:param verbose: int
"""
self.dataset_df = pd.read_csv(dataset_csv_file)
self.source_image_dir = source_image_dir
self.batch_size = batch_size
self.target_size = target_size
self.augmenter = augmenter
self.tokenizer_wrapper = tokenizer_wrapper
self.verbose = verbose
self.shuffle = shuffle_on_epoch_end
self.random_state = random_state
self.class_names = class_names
self.prepare_dataset()
if steps is None:
self.steps = int(np.ceil(len(self.x_path) / float(self.batch_size)))
else:
self.steps = int(steps)
def __bool__(self):
return True
def __len__(self):
return self.steps
def __getitem__(self, idx):
batch_x_path = self.x_path[idx * self.batch_size:(idx + 1) * self.batch_size]
batch_x = np.asarray([self.load_image(x_path) for x_path in batch_x_path])
batch_x = self.transform_batch_images(batch_x)
batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
if batch_x.shape[0] < self.batch_size:
remaining = self.batch_size - batch_x.shape[0]
batch_x = np.concatenate((batch_x,batch_x[0:remaining]), axis = 0)
batch_x_path = np.concatenate((batch_x_path,batch_x_path[0:remaining]), axis = 0)
batch_y = np.concatenate((batch_y,batch_y[0:remaining]), axis = 0)
return batch_x, batch_y, batch_x_path
def load_image(self, image_file):
image_path = os.path.join(self.source_image_dir, image_file)
image = Image.open(image_path)
image_array = np.asarray(image.convert("RGB"))
image_array = image_array / 255.
image_array = resize(image_array, self.target_size)
return image_array
def transform_batch_images(self, batch_x):
if self.augmenter is not None:
batch_x = self.augmenter.augment_images(batch_x)
imagenet_mean = np.array([0.485, 0.456, 0.406])
imagenet_std = np.array([0.229, 0.224, 0.225])
batch_x = (batch_x - imagenet_mean) / imagenet_std
return batch_x
def get_y_true(self):
"""
Use this function to get y_true for predict_generator
In order to get correct y, you have to set shuffle_on_epoch_end=False.
"""
if self.shuffle:
raise ValueError("""
You're trying run get_y_true() when generator option 'shuffle_on_epoch_end' is True.
""")
return self.y[:self.steps * self.batch_size, :]
def prepare_dataset(self):
df = self.dataset_df.sample(frac=1., random_state=self.random_state)
self.x_path, self.y = df["Image Index"].values, self.tokenizer_wrapper.tokenize_sentences(df[self.class_names].values)
def on_epoch_end(self):
if self.shuffle:
self.random_state += 1
self.prepare_dataset()