# Databricks notebook source
# MAGIC %md # Azure IoT Hub -> Azure Cosmos DB
# MAGIC This notebook demonstrates reading IoT telemetry events from an Azure IoT Hub and writing the parsed events into an Azure Cosmos DB collection.
# MAGIC To run this notebook successfully, the following connector libraries are required:
# MAGIC - [azure-eventhubs-spark](https://github.com/Azure/azure-event-hubs-spark)
# MAGIC - [azure-cosmosdb-spark (uber jar)](https://github.com/Azure/azure-cosmosdb-spark#using-databricks-notebooks)
# COMMAND ----------
# Spark SQL functions (col, from_json, ...) and types used to parse the event body
from pyspark.sql.functions import *
from pyspark.sql.types import *
# COMMAND ----------
# Event Hub-compatible connection string for the IoT Hub
# (from the "Built-in endpoints" blade of the IoT Hub in the Azure portal)
connectionString = "Endpoint=sb://<EVENTHUB_COMPATIBLE_ENDPOINT>.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=<SHARED_ACCESS_KEY>;EntityPath=<IOTHUB_NAME>"

ehConf = {
  "eventhubs.connectionString": connectionString,
  "eventhubs.consumerGroup": "$Default"
}
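# COMMAND ----------
# MAGIC %md Optional: a minimal sketch of two common additions to `ehConf`. The starting position controls where the stream begins reading, and some newer releases of azure-eventhubs-spark expect the connection string to be encrypted via `EventHubsUtils.encrypt`. Both are assumptions about your connector version and requirements, so adjust or drop them as needed.
# COMMAND ----------
import json

# Start reading from the beginning of the retained event stream (offset "-1").
startingEventPosition = {
  "offset": "-1",
  "seqNo": -1,           # not used when offset is set
  "enqueuedTime": None,  # not used when offset is set
  "isInclusive": True
}
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)

# Some newer connector versions require an encrypted connection string:
# ehConf["eventhubs.connectionString"] = \
#   sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)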
# COMMAND ----------
# Read the event stream from the IoT Hub's Event Hub-compatible endpoint
inputStream = spark.readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

# The event payload arrives as binary; cast it to a JSON string
bodyNoSchema = inputStream.selectExpr("CAST(body as STRING)")
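
# A small optional sketch: the Event Hubs source also exposes metadata columns such as
# "enqueuedTime" and "sequenceNumber", which can help with debugging and ordering.
# The column names assume the standard azure-eventhubs-spark source schema.
bodyWithMetadata = inputStream.selectExpr(
  "CAST(body AS STRING) AS body",
  "enqueuedTime",
  "sequenceNumber"
)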
# COMMAND ----------
# Expected JSON schema of the device telemetry payload
schema = StructType([
  StructField("messageId", IntegerType()),
  StructField("deviceId", StringType()),
  StructField("temperature", DoubleType()),
  StructField("humidity", DoubleType())
])

# Parse the JSON body into typed columns and keep only the parsed fields
bodyWithSchema = bodyNoSchema.select(col("body"), from_json(col("body"), schema).alias("data"))
ehStream = bodyWithSchema.select("data.*")
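# COMMAND ----------
# MAGIC %md Optional sketch: if the incoming events carry no document `id`, retried micro-batches can end up as duplicate documents in Cosmos DB. Deriving a deterministic `id` from the payload makes the upsert idempotent. The choice of `deviceId` + `messageId` below is an assumption about what uniquely identifies an event; you could write `ehStreamWithId` instead of `ehStream` in the sink cell further down.
# COMMAND ----------
ehStreamWithId = ehStream.withColumn(
  "id", concat_ws("-", col("deviceId"), col("messageId").cast("string"))
)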
# COMMAND ----------
# Visualize the parsed stream (display() starts a streaming query; Databricks-only)
display(ehStream)
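# COMMAND ----------
# MAGIC %md If you are running outside Databricks, where `display()` is unavailable, a console sink is a simple substitute for inspecting the parsed stream. This is a debugging sketch only; run it in place of the `display()` cell above.
# COMMAND ----------
debugQuery = ehStream.writeStream \
  .outputMode("append") \
  .format("console") \
  .option("truncate", False) \
  .start()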
# COMMAND ----------
# Cosmos DB (SQL API) sink configuration
cosmosDbConfig = {
  "Endpoint" : "https://<COSMOSDB_ENDPOINT>.documents.azure.com:443/",
  "Masterkey" : "<COSMOSDB_PRIMARYKEY>",
  "Database" : "<DATABASE>",
  "Collection" : "<COLLECTION>",
  "Upsert" : "true"
}

# Continuously upsert the parsed events into the Cosmos DB collection.
# Use a durable checkpoint location (e.g. DBFS/ADLS) for anything beyond experimentation.
cosmosDbStreamWriter = ehStream \
  .writeStream \
  .outputMode("append") \
  .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider") \
  .options(**cosmosDbConfig) \
  .option("checkpointLocation", "/tmp/streamingCheckpoint") \
  .start()
# COMMAND ----------
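# MAGIC %md The handle returned by `start()` is a standard Structured Streaming query object; a minimal sketch of how to monitor and stop it:
# COMMAND ----------
print(cosmosDbStreamWriter.status)        # current state of the streaming query
print(cosmosDbStreamWriter.lastProgress)  # metrics from the most recent micro-batch

# Stop the stream gracefully when finished:
# cosmosDbStreamWriter.stop()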