// Databricks notebook source
// MAGIC %md ### Sink to Power BI Streaming Dataset from an IoT Hub Stream using [Power BI REST API](https://docs.microsoft.com/en-us/power-bi/service-real-time-streaming#pushing-data-to-datasets)
// MAGIC **Note:** the [azure-event-hubs-spark](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md#iot-hub) library must be attached to the cluster.
// MAGIC The simulated telemetry was sent using [this C# sample](https://docs.microsoft.com/en-us/azure/iot-hub/quickstart-send-telemetry-dotnet#send-simulated-telemetry).
// COMMAND ----------
import org.apache.spark.eventhubs._
val connectionString = ConnectionStringBuilder("<EVENTHUB_COMPATIBLE_ENDPOINT>")
  .setEventHubName("<EVENTHUB_COMPATIBLE_NAME>")
  .build
val ehConf = EventHubsConf(connectionString)
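// COMMAND ----------
// MAGIC %md Rather than hardcoding the endpoint in the notebook, it can be read from a secret scope. A minimal sketch using Databricks secrets; the `iot` scope and key names are assumptions, not part of the original sample.
// COMMAND ----------
// Hypothetical: pull the Event Hub-compatible endpoint and name from a secret scope.
val secretConnectionString = ConnectionStringBuilder(dbutils.secrets.get(scope = "iot", key = "eventhub-endpoint"))
  .setEventHubName(dbutils.secrets.get(scope = "iot", key = "eventhub-name"))
  .build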
// COMMAND ----------
val dfRaw = spark
  .readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()
display(dfRaw)
// COMMAND ----------
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = StructType(List(
  StructField("temperature", DoubleType),
  StructField("humidity", DoubleType)
))

val dfStream = dfRaw
  .select($"enqueuedTime", from_json($"body".cast("string"), schema).alias("data"))
  .select($"enqueuedTime", $"data.*")
display(dfStream)
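// COMMAND ----------
// MAGIC %md The two-field schema above matches the telemetry body sent by the linked C# quickstart; a representative body (values illustrative) looks like `{"temperature": 27.12, "humidity": 66.43}`.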
// COMMAND ----------
val dfJsonStream = dfStream.select(
  to_json(
    array(
      struct($"enqueuedTime", $"temperature", $"humidity")
    )
  ).alias("jsonArray")
)
display(dfJsonStream)
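// COMMAND ----------
// MAGIC %md Each row of `dfJsonStream` now holds a single-element JSON array such as `[{"enqueuedTime":"2019-01-01T00:00:00.000Z","temperature":27.12,"humidity":66.43}]` (values illustrative), which matches the array-of-rows payload the Power BI push API expects.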
// COMMAND ----------
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}
import org.apache.http.entity.StringEntity
import org.apache.http.util.EntityUtils

// Posts each row's pre-serialized JSON array to a Power BI streaming dataset endpoint.
class PowerBISink(var powerBIEndpoint: String, var jsonArrayColumn: String) extends ForeachWriter[Row] {

  var httpClient: CloseableHttpClient = _
  var post: HttpPost = _

  def open(partitionId: Long, version: Long): Boolean = {
    // One client and request object per partition/epoch.
    httpClient = HttpClientBuilder.create().build()
    post = new HttpPost(powerBIEndpoint)
    post.setHeader("Content-type", "application/json")
    true
  }

  def process(row: Row): Unit = {
    val jsonArray = row.getAs[String](jsonArrayColumn)
    post.setEntity(new StringEntity(jsonArray))
    val response = httpClient.execute(post)
    // Consume the entity so the connection can be reused; non-2xx responses are not handled here.
    EntityUtils.consume(response.getEntity)
  }

  def close(errorOrNull: Throwable): Unit = {
    if (httpClient != null) httpClient.close()
  }
}
// COMMAND ----------
// MAGIC %md #### Aggregate requests due to [Power BI REST API limitations](https://docs.microsoft.com/en-us/power-bi/developer/api-rest-api-limitations)
// COMMAND ----------
import org.apache.spark.sql.streaming.Trigger
val powerBIEndpoint = "https://api.powerbi.com/beta/<WORKSPACE_ID>/datasets/<DATASET_ID>/rows?key=<AUTH_KEY>"
val powerBIWriter = new PowerBISink(powerBIEndpoint, "jsonArray")
dfJsonStream.writeStream
  .foreach(powerBIWriter)
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()
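// COMMAND ----------
// MAGIC %md With `foreach`, each row still becomes one POST; the trigger only paces micro-batches. A minimal sketch of batching several rows into a single request per event-time window (window and watermark durations are assumptions, not part of the original sample) could then be written through the same sink:
// COMMAND ----------
import org.apache.spark.sql.functions._

// Hypothetical alternative: collect all rows in a 5-second event-time window into one JSON array,
// so each window yields a single POST. A watermark is required for append-mode streaming aggregation.
val dfBatchedJson = dfStream
  .withWatermark("enqueuedTime", "10 seconds")
  .groupBy(window($"enqueuedTime", "5 seconds"))
  .agg(collect_list(struct($"enqueuedTime", $"temperature", $"humidity")).alias("rows"))
  .select(to_json($"rows").alias("jsonArray"))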