Skip to content

Commit

Permalink
Added Location #8
Browse files Browse the repository at this point in the history
  • Loading branch information
joekendal committed Mar 18, 2021
1 parent fdab614 commit c1da9c5
Showing 1 changed file with 355 additions and 0 deletions.
355 changes: 355 additions & 0 deletions Location.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,355 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "public-strengthening",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from pyspark.context import SparkContext, SparkConf\n",
"from awsglue.dynamicframe import DynamicFrame"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "revised-specific",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('spark.executor.extraClassPath', '/Users/joe/aws-glue-libs/jarsv1/*'),\n",
" ('spark.rdd.compress', 'True'),\n",
" ('spark.driver.host', '192.168.0.14'),\n",
" ('spark.serializer.objectStreamReset', '100'),\n",
" ('spark.driver.port', '65079'),\n",
" ('spark.master', 'local[*]'),\n",
" ('spark.executor.id', 'driver'),\n",
" ('spark.submit.deployMode', 'client'),\n",
" ('spark.app.id', 'local-1616068294858'),\n",
" ('spark.ui.showConsoleProgress', 'true'),\n",
" ('spark.app.name', 'pyspark-shell'),\n",
" ('spark.driver.extraClassPath', '/Users/joe/aws-glue-libs/jarsv1/*')]"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Location of the AWS Glue jars. Set the GLUE_JARS environment variable to\n",
"# override this machine-specific default without editing the notebook.\n",
"jars = os.environ.get('GLUE_JARS', '/Users/joe/aws-glue-libs/jarsv1/*')\n",
"\n",
"# Put the Glue jars on both the driver and executor class paths.\n",
"conf = SparkConf().setAll([\n",
"    ('spark.executor.extraClassPath', jars),\n",
"    ('spark.driver.extraClassPath', jars),\n",
"])\n",
"\n",
"# getOrCreate returns the already-running context when this cell is re-run,\n",
"# whereas constructing SparkContext a second time raises an error.\n",
"sc = SparkContext.getOrCreate(conf=conf)\n",
"\n",
"sc.getConf().getAll()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "overall-lafayette",
"metadata": {},
"outputs": [],
"source": [
"from awsglue.context import GlueContext\n",
"\n",
"glueContext = GlueContext(sc)\n",
"\n",
"spark = glueContext.spark_session"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "loving-fusion",
"metadata": {},
"outputs": [],
"source": [
"data = spark.read.parquet('data/catalog.parquet')"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "forward-cable",
"metadata": {},
"outputs": [],
"source": [
"datasource = DynamicFrame.fromDF(data, glueContext, 'datasource')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "governing-april",
"metadata": {},
"outputs": [],
"source": [
"# Keep only records whose resourceType is 'Location', then project the\n",
"# fields that the following cells work with.\n",
"locations = (\n",
"    datasource\n",
"    .filter(lambda record: record['resourceType'] == 'Location')\n",
"    .select_fields(['identifier', 'name', 'type', 'address', 'position'])\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "european-texture",
"metadata": {},
"outputs": [],
"source": [
"df = locations.toDF()\n",
"\n",
"# care_sites = df.na.drop(subset=[\"type\"])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aggressive-delicious",
"metadata": {},
"outputs": [],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "round-sterling",
"metadata": {},
"outputs": [],
"source": [
"import pyspark.sql.functions as F"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "opposed-surname",
"metadata": {},
"outputs": [],
"source": [
"# One output row per element of the identifier column.\n",
"# NOTE(review): explode drops rows whose collection is null or empty;\n",
"# explode_outer would keep them. Confirm that dropping is intended.\n",
"df = df.withColumn('exploded', F.explode(F.col('identifier')))"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "portable-multimedia",
"metadata": {},
"outputs": [],
"source": [
"# Use the 'value' entry of the exploded identifier as the row id.\n",
"df = df.withColumn('id', F.col('exploded')['value'])"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "chemical-sword",
"metadata": {},
"outputs": [],
"source": [
"# The id has been extracted, so the helper and source columns can go.\n",
"df = df.drop('exploded', 'identifier')"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "level-psychology",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+----+--------------------+--------------------+--------------------+\n",
"| name|type| address| position| id|\n",
"+--------------------+----+--------------------+--------------------+--------------------+\n",
"|[FALMOUTH HOSPITAL,]|null|[[02540, FALMOUTH...|[41.57072, -70.55...|9cf3cd22-2eec-34e...|\n",
"| [PCP142036,]|null|[[02536-5671, TEA...|[41.562579, -70.5...|830716da-523f-3ca...|\n",
"+--------------------+----+--------------------+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "given-frequency",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- name: struct (nullable = true)\n",
" | |-- string: string (nullable = true)\n",
" | |-- array: null (nullable = true)\n",
" |-- type: null (nullable = true)\n",
" |-- address: struct (nullable = true)\n",
" | |-- struct: struct (nullable = true)\n",
" | | |-- postalCode: string (nullable = true)\n",
" | | |-- city: string (nullable = true)\n",
" | | |-- country: string (nullable = true)\n",
" | | |-- state: string (nullable = true)\n",
" | | |-- line: array (nullable = true)\n",
" | | | |-- element: string (containsNull = true)\n",
" | |-- array: null (nullable = true)\n",
" |-- position: struct (nullable = true)\n",
" | |-- latitude: double (nullable = true)\n",
" | |-- longitude: double (nullable = true)\n",
" |-- id: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"df.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "productive-narrative",
"metadata": {},
"outputs": [],
"source": [
"# Flatten the nested address struct into top-level columns.\n",
"df = (\n",
"    df\n",
"    .withColumn('city', F.col('address.struct.city'))\n",
"    .withColumn('state', F.col('address.struct.state'))\n",
"    .withColumn('zip', F.col('address.struct.postalCode'))\n",
"    .withColumn('country', F.col('address.struct.country'))\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "unexpected-watershed",
"metadata": {},
"outputs": [],
"source": [
"# df = df.withColumn('exploded', F.explode('address.struct.line'))"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "clean-bradley",
"metadata": {},
"outputs": [],
"source": [
"# Split the address line array into its first two entries.\n",
"df = (\n",
"    df\n",
"    .withColumn('address_1', F.col('address.struct.line').getItem(0))\n",
"    .withColumn('address_2', F.col('address.struct.line').getItem(1))\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "elect-guide",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+---------+\n",
"| address_1|address_2|\n",
"+--------------------+---------+\n",
"|67 & 100 TER HEUN...| null|\n",
"|270 TEATICKET HWY 1A| null|\n",
"+--------------------+---------+\n",
"\n"
]
}
],
"source": [
"df.select(['address_1', 'address_2']).show()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "dressed-closer",
"metadata": {},
"outputs": [],
"source": [
"df = df.withColumnRenamed('id', 'location_id')"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "alien-enemy",
"metadata": {},
"outputs": [],
"source": [
"# Drop the nested source columns now that their values have been flattened.\n",
"# 'exploded' is not listed: it was already dropped together with\n",
"# 'identifier', so naming it here was a no-op.\n",
"df = df.drop('address', 'position', 'name', 'type')"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "western-ethnic",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+---------+-----+----------+-------+--------------------+---------+\n",
"| location_id| city|state| zip|country| address_1|address_2|\n",
"+--------------------+---------+-----+----------+-------+--------------------+---------+\n",
"|9cf3cd22-2eec-34e...| FALMOUTH| MA| 02540| US|67 & 100 TER HEUN...| null|\n",
"|830716da-523f-3ca...|TEATICKET| MA|02536-5671| US|270 TEATICKET HWY 1A| null|\n",
"+--------------------+---------+-----+----------+-------+--------------------+---------+\n",
"\n"
]
}
],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aggressive-chester",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "etl",
"language": "python",
"name": "etl"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

0 comments on commit c1da9c5

Please sign in to comment.