event_count.py
import json
import ConfigParser
from operator import add

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run on YARN in client mode with a 20-second micro-batch interval.
conf = SparkConf().setMaster("yarn-client").setAppName("Count Event Word 20 Sec")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 20)

# The Kafka broker list and Kudu master address come from particlespark.conf.
Config = ConfigParser.ConfigParser()
Config.read('particlespark.conf')
kafka_broker = Config.get('Kafka', 'KafkaBrokers')
kudu_master = Config.get('Kudu', 'KuduMaster')

kudu_table = "particle_counts_last_20"
topic = "particle"

def getSqlContextInstance(sparkContext):
    # Lazily create a single SQLContext so that RDD.toDF() is available in each batch.
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def insert_into_kudu(time, rdd):
    # Skip empty batches: toDF() cannot infer a schema from an empty RDD.
    if rdd.isEmpty():
        return
    sqc = getSqlContextInstance(rdd.context)
    kudu_df = rdd.toDF(['event_word', 'count'])
    kudu_df.show()
    # Append this batch's counts to the Kudu table via the kudu-spark connector.
    kudu_df.write.format('org.apache.kudu.spark.kudu') \
        .option('kudu.master', kudu_master).option('kudu.table', kudu_table) \
        .mode("append").save()

def get_event(payload):
    # Return the event name; log the payload and return None when it has no 'event' field.
    if 'event' not in payload:
        print str(payload)
        return None
    return payload["event"]

# Consume the "particle" topic from Kafka with a receiver-less direct stream.
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafka_broker})

# Pull the event name out of each JSON message, drop records without one,
# then split the name into words and count each word within the batch.
events_kafka_stream = kafkaStream.map(lambda x: get_event(json.loads(x[1]))) \
    .filter(lambda event: event is not None)
counts = events_kafka_stream.flatMap(lambda event: event.split(' ')) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)

# Write each batch's word counts to Kudu.
counts.foreachRDD(insert_into_kudu)

ssc.start()
ssc.awaitTermination()
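
The script expects a particlespark.conf file in the working directory with the [Kafka] and [Kudu] sections read above. A minimal sketch of that file, with placeholder host names (the actual broker and master addresses are deployment-specific):

[Kafka]
KafkaBrokers = kafka-broker-1:9092,kafka-broker-2:9092

[Kudu]
KuduMaster = kudu-master-1:7051

It would typically be launched with spark-submit, pulling in the Kafka 0.8 streaming integration and the kudu-spark connector; the package coordinates and versions below are assumptions that depend on the Spark and Kudu versions in the cluster:

spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0,org.apache.kudu:kudu-spark2_2.11:1.7.0 \
  event_count.py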