-
Notifications
You must be signed in to change notification settings - Fork 0
/
Código
163 lines (143 loc) · 4.86 KB
/
Código
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
import re
pipeline_options = PipelineOptions(argv=None)
pipeline = beam.Pipeline(options=pipeline_options)
colunas_dengue = [
'id',
'data_iniSE',
'casos',
'ibge_code',
'cidade',
'uf',
'cep',
'latitude',
'longitude']
def texto_para_lista(elemento, delimitador='|'):
"""
Recebe um texto e um delimitador
Retorna uma lista com os elementos separados pelo delimitador
"""
return elemento.split(delimitador)
def lista_para_dicionario(colunas, elemento):
"""
Recebe duas listas
Retorna um dicionário
"""
return dict(zip(elemento, colunas))
def trata_datas(elemento):
"""
Recebe uma data em string
Quebra em uma list com apenas mês e ano
Retorna uma string com mês e ano
"""
elemento['ano_mes'] = '-'.join(elemento['data_iniSE'].split('-')[:2])
return elemento
def chave_uf(elemento):
"""
Entra com a chave (uf)
Retorna uma tupla com o estado e o elemento (UF, valor)
"""
chave = elemento['uf']
return (chave, elemento)
def retorna_data_e_estado(elemento):
"""
Recebe uma lista
Retorna uma tupla por registro no formato ('uf-YY-mm')
"""
uf, registros = elemento
for registro in registros:
if bool(re.search(r'\d', registro['casos'])):
yield (f"{uf}-{registro['ano_mes']}", float(registro['casos']))
def texto_para_lista_chuva(elemento, delimitador=','):
"""
Recebe um texto e um delimitador
Retorna uma lista com os elementos separados pelo delimitador
"""
return elemento.split(delimitador)
def chave_uf_ano_mes_de_lista(elemento):
"""
Recebe uma lista
Retorna uma tupla contendo do tipo ('uf-ano-mes', 5.6)
"""
data, mm, uf = elemento
ano_mes = '-'.join(data.split('-')[:2])
chave = f'{uf}-{ano_mes}'
mm = float(mm)
if mm < 0.0:
mm = 0.0
return chave, mm
def arredonda(elemento):
"""
Arredonda o valor de precipitação para apenas
uma casa decimal
"""
chave, mm = elemento
mm = round(mm, 1)
return chave, mm
def filtra_campos_vazios(elemento):
"""
Recebe uma lista
Retorna a lista apenas com as linhas que possuem valores não nulos
"""
chave, dicionario = elemento
if all([
dicionario['chuvas'],
dicionario['dengue']]
):
return True
return False
def desagrupamento(elemento):
"""
Recebe lista com string (data e uf) e dicionário (casos e precipitação)
Retorna tupla do tipo (uf, ano, mes, precipitação, casos)
"""
chave, valor = elemento
uf, ano, mes = chave.split('-')
chuva = valor['chuvas'][0]
dengue = valor['dengue'][0]
return uf, ano, mes, str(chuva), str(dengue)
def preparacao_csv(elemento, separador=';'):
"""
Recebe uma tupla
Retorna string com ponto e vírgula entre os termos
"""
return f'{separador}'.join(elemento)
dengue = (
pipeline
| "Leitura dos casos de dengue" >>
ReadFromText('sample_casos_dengue.txt', skip_header_lines=1)
| "De texto para lista casos" >> beam.Map(texto_para_lista)
| "Lista para dicionário" >> beam.Map(lista_para_dicionario, colunas_dengue)
| "Criar campo ano_mes" >> beam.Map(trata_datas)
| "Criar chave pelo estado" >> beam.Map(chave_uf)
| "Agrupar pelo estado" >> beam.GroupByKey()
| "descompactar casos de dengue" >> beam.FlatMap(retorna_data_e_estado)
| "Somas dos casos pela chave" >> beam.CombinePerKey(sum)
#| "Imprime" >> beam.Map(print)
)
chuvas = (
pipeline
| "Leitura das chuvas" >>
ReadFromText('sample_chuvas.csv', skip_header_lines=1)
| "De texto para lista chuvas" >> beam.Map(texto_para_lista_chuva)
| "Lista para tupla de chuva sem negativos" >> beam.Map(chave_uf_ano_mes_de_lista)
| "Somas das precipitações pela chave" >> beam.CombinePerKey(sum)
| "Arredonda os valores de precipitação" >> beam.Map(arredonda)
#| "Imprime" >> beam.Map(print)
)
resultado = (
({'chuvas': chuvas, 'dengue': dengue})
# | "Empilha as colunas" >> beam.Flatten()
# | "Agrupa as p collections" >> beam.GroupByKey()
| "Une e agrupa os p collections por chave" >> beam.CoGroupByKey()
| "Filtra os campos com valores faltantes" >> beam.Filter(filtra_campos_vazios)
| "Descompacta, converte para numérico" >> beam.Map(desagrupamento)
| "Prepara para tranformar em CSV" >> beam.Map(preparacao_csv)
| "Imprime o conjunto unido" >> beam.Map(print)
)
header = 'UF, ANO, MES, CHUVA, DENGUE'
resultado | "Salva os dados em um arquivo CSV" >> WriteToText('resultado', file_name_suffix='.csv', header=header)
pipeline.run()