-
Notifications
You must be signed in to change notification settings - Fork 4
/
workers.py
executable file
·64 lines (56 loc) · 2.4 KB
/
workers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import time
import pandas as pd
from copy import deepcopy
from multiprocessing import Queue
from subgroup import Subgroup
def _wait_for_room(queue: Queue, max_size: int = 10, poll_interval: float = .1):
    """Block until *queue* reports fewer than *max_size* items, re-checking every *poll_interval* seconds."""
    # Reasonable size to keep in the beam: the producer backs off instead of flooding the queue.
    # NOTE: multiprocessing.Queue.qsize() may raise NotImplementedError on macOS.
    while queue.qsize() >= max_size:
        time.sleep(poll_interval)


def create_subgroups(subgroup: Subgroup, column: str, queue: Queue, settings: dict):
    """Split *subgroup* on *column* and put every resulting child ``Subgroup`` on *queue*.

    Columns listed in ``settings['object_cols']``, or with fewer distinct values than
    ``settings['n_bins']``, are split on each distinct value. Any other column is
    discretised with ``pd.qcut`` into at most ``settings['n_bins']`` equal-frequency bins;
    only ``settings['bin_strategy'] == 'equidepth'`` is supported.

    Nothing is produced when the subgroup's description already mentions *column*, or
    when *column* holds a single distinct value. Before each child is built, the
    function waits until ``queue.qsize()`` is below 10 (polling every 0.1 s).

    Args:
        subgroup: parent subgroup; its ``data`` frame is split and its ``description``
            is deep-copied and extended for each child.
        column: name of the column to split on.
        queue: destination for the child subgroups.
        settings: must provide ``'object_cols'`` and ``'n_bins'``, plus
            ``'bin_strategy'`` when a numeric (binned) split is made.

    Raises:
        ValueError: if a binned split is needed and ``settings['bin_strategy']`` is not
            ``'equidepth'``.
    """
    if column in subgroup.description:
        return
    data = subgroup.data
    values = list(data[column].unique())
    if len(values) == 1:  # No need to make a split for a single value
        return

    if column in settings['object_cols'] or len(values) < settings['n_bins']:
        # One child per distinct value, emitted in order of first appearance.
        for value in values:
            _wait_for_room(queue)
            subset = data[data[column] == value]
            # NOTE(review): assumes Description.extend returns the extended description
            # (not None) — confirm against the Description class.
            queue.put(Subgroup(subset, deepcopy(subgroup.description).extend(column, value)))
        return

    # Float or Int
    if settings['bin_strategy'] != 'equidepth':
        raise ValueError(f"Invalid bin strategy `{settings['bin_strategy']}`")
    _, edges = pd.qcut(data[column].tolist(), q=min(settings['n_bins'], len(values)),
                       duplicates='drop', retbins=True)
    edges = list(edges)
    column_values = data[column]
    for index, (lower_bound, upper_bound) in enumerate(zip(edges, edges[1:])):
        _wait_for_room(queue)
        # qcut's first edge equals the column minimum; a strict '>' there would drop every
        # row holding that minimum, so (like qcut's include_lowest) the first bin is closed.
        if index == 0:
            above_lower = column_values >= lower_bound
        else:
            above_lower = column_values > lower_bound
        subset = data[above_lower & (column_values <= upper_bound)]
        queue.put(Subgroup(subset, deepcopy(subgroup.description).extend(column, [lower_bound, upper_bound])))
def evaluate_subgroups(queue_from, queue_to, target_columns, dataset_target, score):
    """Score subgroups taken from *queue_from* and forward them to *queue_to*.

    Runs until the string sentinel ``'done'`` is received. The sentinel is then passed
    on to *queue_to* so the downstream consumer can count finished workers.

    Args:
        queue_from: source of subgroup objects followed by the ``'done'`` sentinel.
        queue_to: destination for scored subgroups and the forwarded sentinel.
        target_columns: column selector applied to ``item.data`` to get the target data.
        dataset_target: target data of the whole dataset, passed through to *score*.
        score: callable ``score(subgroup_target, dataset_target)`` returning a pair that
            is stored on the item as ``item.score`` and ``item.target``.

    Subgroups whose target selection has no rows are dropped without being scored,
    printed or forwarded.
    """
    while True:
        item = queue_from.get()
        # Check the type first so a subgroup object's own __eq__ is never run
        # against the sentinel string.
        if isinstance(item, str) and item == 'done':
            queue_to.put('done')
            break
        subgroup_target = item.data[target_columns]  # select once, reused for scoring
        if len(subgroup_target) == 0:  # empty subgroup: nothing to score
            continue
        item.score, item.target = score(subgroup_target, dataset_target)
        item.print()
        queue_to.put(item)
def beam_adder(queue, beam, n_jobs):
    """Move evaluated subgroups from *queue* into *beam* until all workers have finished.

    Each of the *n_jobs* upstream workers signals completion by putting the string
    ``'done'`` on *queue*; every other item is passed to ``beam.add``. Returns as soon
    as *n_jobs* sentinels have been consumed (immediately when *n_jobs* <= 0), leaving
    any later items on the queue untouched.
    """
    finished = 0
    while finished < n_jobs:
        item = queue.get()
        if item == 'done':
            finished += 1
        else:
            beam.add(item)