-
Notifications
You must be signed in to change notification settings - Fork 318
/
Copy pathBasic.cs
134 lines (103 loc) · 4.72 KB
/
Basic.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
namespace Microsoft.Spark.Examples.Sql.Batch
{
/// <summary>
/// A simple example demonstrating basic Spark SQL features.
/// </summary>
internal sealed class Basic : IExample
{
public void Run(string[] args)
{
if (args.Length != 1)
{
Console.Error.WriteLine(
"Usage: Basic <path to SPARK_HOME/examples/src/main/resources/people.json>");
Environment.Exit(1);
}
SparkSession spark = SparkSession
.Builder()
.AppName("SQL basic example using .NET for Apache Spark")
.Config("spark.some.config.option", "some-value")
.GetOrCreate();
// Need to explicitly specify the schema since pickling vs. arrow formatting
// will return different types. Pickling will turn longs into ints if the values fit.
// Same as the "age INT, name STRING" DDL-format string.
var inputSchema = new StructType(new[]
{
new StructField("age", new IntegerType()),
new StructField("name", new StringType())
});
DataFrame df = spark.Read().Schema(inputSchema).Json(args[0]);
Spark.Sql.Types.StructType schema = df.Schema();
Console.WriteLine(schema.SimpleString);
IEnumerable<Row> rows = df.Collect();
foreach (Row row in rows)
{
Console.WriteLine(row);
}
df.Show();
df.PrintSchema();
df.Select("name", "age", "age", "name").Show();
df.Select(df["name"], df["age"] + 1).Show();
df.Filter(df["age"] > 21).Show();
df.GroupBy("age")
.Agg(Avg(df["age"]), Avg(df["age"]), CountDistinct(df["age"], df["age"]))
.Show();
df.CreateOrReplaceTempView("people");
// Registering Udf for SQL expression.
DataFrame sqlDf = spark.Sql("SELECT * FROM people");
sqlDf.Show();
spark.Udf().Register<int?, string, string>(
"my_udf",
(age, name) => name + " with " + ((age.HasValue) ? age.Value.ToString() : "null"));
sqlDf = spark.Sql("SELECT my_udf(*) FROM people");
sqlDf.Show();
// Using UDF via data frames.
Func<Column, Column, Column> addition = Udf<int?, string, string>(
(age, name) => name + " is " + (age.HasValue ? age.Value + 10 : 0));
df.Select(addition(df["age"], df["name"])).Show();
// Chaining example:
Func<Column, Column> addition2 = Udf<string, string>(str => $"hello {str}!");
df.Select(addition2(addition(df["age"], df["name"]))).Show();
// Multiple UDF example:
df.Select(addition(df["age"], df["name"]), addition2(df["name"])).Show();
// UDF return type as array.
Func<Column, Column> udfArray =
Udf<string, string[]>((str) => new[] { str, str + str });
df.Select(Explode(udfArray(df["name"]))).Show();
// UDF return type as map.
Func<Column, Column> udfMap =
Udf<string, IDictionary<string, string[]>>(
(str) => new Dictionary<string, string[]> { { str, new[] { str, str } } });
df.Select(udfMap(df["name"]).As("UdfMap")).Show(truncate: 50);
// Joins.
DataFrame joinedDf = df.Join(df, "name");
joinedDf.Show();
DataFrame joinedDf2 = df.Join(df, new[] { "name", "age" });
joinedDf2.Show();
DataFrame joinedDf3 = df.Join(df, df["name"] == df["name"], "outer");
joinedDf3.Show();
// Union of two data frames
DataFrame unionDf = df.Union(df);
unionDf.Show();
// Add new column to data frame
df.WithColumn("location", Lit("Seattle")).Show();
// Rename existing column
df.WithColumnRenamed("name", "fullname").Show();
// Filter rows with null age
df.Filter(Col("age").IsNull()).Show();
// Fill null values in age column with -1
df.Na().Fill(-1, new[] { "age" }).Show();
// Drop age column
df.Drop(new[] { "age" }).Show();
spark.Stop();
}
}
}