From b29a3cad5c1d3959e1980f107873190f5272beee Mon Sep 17 00:00:00 2001 From: Luke Benson Date: Wed, 17 Mar 2021 12:30:45 +0000 Subject: [PATCH] Added Encounter table and changed to reading from parquet --- FHIR to OMOP.ipynb | 398 +++++++---------------------------------- VISIT_OCCURRENCE.ipynb | 272 ++++++++++++++++++++++++++++ 2 files changed, 336 insertions(+), 334 deletions(-) create mode 100644 VISIT_OCCURRENCE.ipynb diff --git a/FHIR to OMOP.ipynb b/FHIR to OMOP.ipynb index f3f4cbd..3576af2 100644 --- a/FHIR to OMOP.ipynb +++ b/FHIR to OMOP.ipynb @@ -22,38 +22,14 @@ }, { "cell_type": "code", - "execution_count": 132, + "execution_count": 1, "id": "contrary-alabama", - "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "An error was encountered:\n", - "'SparkSession' object has no attribute 'getConf'\n", - "Traceback (most recent call last):\n", - "AttributeError: 'SparkSession' object has no attribute 'getConf'\n", - "\n" - ] - } - ], + "metadata": { + "scrolled": false + }, + "outputs": [], "source": [ - "spark.sparkContext.getConf().get('spark.driver.memory')" + "#spark.sparkContext.getConf().get('spark.driver.memory')" ] }, { @@ -61,65 +37,32 @@ "id": "convertible-tradition", "metadata": {}, "source": [ - "### Load DynamicFrame from Glue Catalog\n", - "\n", - "This is similar to DataFrame but allows for dynamic schema changes which is what we want" + "### Load Data Frame from Parquet Catalog File" ] }, { "cell_type": "code", - "execution_count": 160, + "execution_count": 2, "id": "dynamic-rabbit", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ - "from awsglue.context import GlueContext\n", "from pyspark.sql import SparkSession\n", - "from awsglue.dynamicframe import DynamicFrame\n", + "from pyspark.sql.functions import dayofmonth,month,year,to_date,trunc,split,explode,array\n", "\n", - "glueContext = GlueContext(SparkSession.builder.enableHiveSupport().getOrCreate())\n", - "spark = glueContext.spark_session\n" + "# Create a local Spark session\n", + "spark = SparkSession.builder.appName('etl').getOrCreate()\n" ] }, { "cell_type": "code", - "execution_count": 161, + "execution_count": 3, "id": "fluid-edwards", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ - "df = glueContext.create_dynamic_frame.from_catalog(\n", - " database=\"fhir-catalog\", table_name=\"resource_db_dev\")" + "# Read in our data\n", + "df = spark.read.parquet('data/catalog.parquet')" ] }, { @@ -132,92 +75,30 @@ }, { "cell_type": "code", - "execution_count": 162, + "execution_count": 4, "id": "italian-intranet", "metadata": { "scrolled": false }, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "#df.printSchema()" ] }, - { - "cell_type": "markdown", - "id": "delayed-complaint", - "metadata": {}, - "source": [ - "### Start with Patient" - ] - }, - { - "cell_type": "code", - "execution_count": 163, - "id": "equivalent-grant", - "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "df = df.toDF()" - ] - }, { "cell_type": "markdown", "id": "little-cylinder", "metadata": {}, "source": [ - "Other resource types " + "List of different resource types " ] }, { "cell_type": "code", - "execution_count": 164, + "execution_count": 5, "id": "celtic-transfer", "metadata": {}, "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, { "name": "stdout", "output_type": "stream", @@ -244,7 +125,8 @@ "| CareTeam|\n", "| Encounter|\n", "| Practitioner|\n", - "+--------------------+" + "+--------------------+\n", + "\n" ] } ], @@ -252,6 +134,14 @@ "df.select('resourceType').distinct().show()" ] }, + { + "cell_type": "markdown", + "id": "delayed-complaint", + "metadata": {}, + "source": [ + "### Patient Mapping" + ] + }, { "cell_type": "markdown", "id": "material-country", @@ -262,50 +152,20 @@ }, { "cell_type": "code", - "execution_count": 165, + "execution_count": 6, "id": "looking-greensboro", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "patients = df.filter(df['resourceType'] == 'Patient')" ] }, { "cell_type": "code", - "execution_count": 177, + "execution_count": 7, "id": "virgin-alaska", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "#patients.printSchema()" ] @@ -320,25 +180,10 @@ }, { "cell_type": "code", - "execution_count": 178, + "execution_count": 8, "id": "fabulous-marijuana", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "persons = patients.select(['id','gender','birthDate'])" ] @@ -353,50 +198,20 @@ }, { "cell_type": "code", - "execution_count": 179, + "execution_count": 9, "id": "olympic-occupation", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "from pyspark.sql.functions import dayofmonth,month,year,to_date" ] }, { "cell_type": "code", - "execution_count": 180, + "execution_count": 10, "id": "weird-currency", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "stage_persons = persons\\\n", " .withColumn(\"year_of_birth\",year(persons['birthDate']))\\\n", @@ -407,25 +222,10 @@ }, { "cell_type": "code", - "execution_count": 181, + "execution_count": 11, "id": "compact-workstation", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "#stage_persons.select([\n", "# \"year_of_birth\",\"month_of_birth\",\"day_of_birth\",\"birth_datetime\"\n", @@ -442,25 +242,10 @@ }, { "cell_type": "code", - "execution_count": 182, + "execution_count": 12, "id": "copyrighted-delaware", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "patient_dataframe = stage_persons.withColumnRenamed(\"identifier\",\"person_id\")\\\n", " .withColumnRenamed(\"gender\",\"gender_concept_id\")" @@ -477,26 +262,12 @@ }, { "cell_type": "code", - "execution_count": 183, + "execution_count": 13, "id": "executed-player", "metadata": { "scrolled": true }, "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, { "name": "stdout", "output_type": "stream", @@ -504,13 +275,14 @@ "+--------------------+-----------------+----------+-------------+--------------+------------+--------------+\n", "| id|gender_concept_id| birthDate|year_of_birth|month_of_birth|day_of_birth|birth_datetime|\n", "+--------------------+-----------------+----------+-------------+--------------+------------+--------------+\n", + "|892799c4-760c-445...| female|2008-04-18| 2008| 4| 18| 2008-04-18|\n", + "|394cbec0-93ce-4a9...| female|1943-08-30| 1943| 8| 30| 1943-08-30|\n", + "|96f58e83-0237-4a8...| female|2009-01-01| 2009| 1| 1| 2009-01-01|\n", "|b1a91dd8-27d9-439...| male|1991-10-11| 1991| 10| 11| 1991-10-11|\n", - "|5697c724-a5cd-479...| female|2001-08-03| 2001| 8| 3| 2001-08-03|\n", - "|81bfb1ae-323f-43a...| female|2018-05-11| 2018| 5| 11| 2018-05-11|\n", - "|e3b2af8e-24ce-493...| female|1980-06-16| 1980| 6| 16| 1980-06-16|\n", - "|1eb90da7-fff7-46a...| male|1988-05-06| 1988| 5| 6| 1988-05-06|\n", + "|b9d2e182-6859-402...| male|2005-11-17| 2005| 11| 17| 2005-11-17|\n", "+--------------------+-----------------+----------+-------------+--------------+------------+--------------+\n", - "only showing top 5 rows" + "only showing top 5 rows\n", + "\n" ] } ], @@ -528,84 +300,42 @@ }, { "cell_type": "code", - "execution_count": 184, + "execution_count": 14, "id": "primary-lebanon", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ - "patient_dynamicframe = DynamicFrame.fromDF(patient_dataframe,glueContext,\"patient_dynamicframe\")" + "#patient_dynamicframe = DynamicFrame.fromDF(patient_dataframe,glueContext,\"patient_dynamicframe\")" ] }, { "cell_type": "code", - "execution_count": 185, + "execution_count": 15, "id": "clinical-custom", "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "root\n", - "|-- id: string\n", - "|-- gender_concept_id: string\n", - "|-- birthDate: string\n", - "|-- year_of_birth: int\n", - "|-- month_of_birth: int\n", - "|-- day_of_birth: int\n", - "|-- birth_datetime: date" - ] - } - ], + "outputs": [], "source": [ - "patient_dynamicframe.printSchema()" + "#patient_dynamicframe.printSchema()" ] } ], "metadata": { "kernelspec": { - "display_name": "PySpark", + "display_name": "etl", "language": "python", - "name": "pysparkkernel" + "name": "etl" }, "language_info": { "codemirror_mode": { - "name": "python", + "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", - "name": "pyspark", - "pygments_lexer": "python3" + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.2" } }, "nbformat": 4, diff --git a/VISIT_OCCURRENCE.ipynb b/VISIT_OCCURRENCE.ipynb new file mode 100644 index 0000000..47c5960 --- /dev/null +++ b/VISIT_OCCURRENCE.ipynb @@ -0,0 +1,272 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "facial-wyoming", + "metadata": {}, + "source": [ + "## Vist Occurance Table Mapping\n", + "\n", + "This is an attempt at mapping FHIR to OMOP using the following guide: https://build.fhir.org/ig/HL7/cdmh/profiles.html#omop-to-fhir-mappings\n", + "
In this notebook we are mapping FHIR to the OMOP Encounter Table" + ] + }, + { + "cell_type": "markdown", + "id": "express-citizen", + "metadata": {}, + "source": [ + "### Load Data Frame from Parquet Catalog File" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "id": "automotive-dubai", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import dayofmonth,month,year,to_date,trunc,split,explode,array\n", + "\n", + "# Create a local Spark session\n", + "spark = SparkSession.builder.appName('etl').getOrCreate()" + ] + }, + { + "cell_type": "code", + "execution_count": 46, + "id": "further-equivalent", + "metadata": {}, + "outputs": [], + "source": [ + "# Reads file \n", + "df = spark.read.parquet('data/catalog.parquet')" + ] + }, + { + "cell_type": "markdown", + "id": "muslim-avatar", + "metadata": {}, + "source": [ + "Data Frame schema " + ] + }, + { + "cell_type": "code", + "execution_count": 47, + "id": "furnished-residence", + "metadata": {}, + "outputs": [], + "source": [ + "#df.printSchema()" + ] + }, + { + "cell_type": "markdown", + "id": "extra-channels", + "metadata": {}, + "source": [ + "### Encounter Mapping " + ] + }, + { + "cell_type": "markdown", + "id": "fatal-confirmation", + "metadata": {}, + "source": [ + "Filter By Encounter Resource type " + ] + }, + { + "cell_type": "code", + "execution_count": 48, + "id": "undefined-building", + "metadata": {}, + "outputs": [], + "source": [ + "filtered = df.filter(df['resourceType'] == 'Encounter')" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "id": "qualified-principal", + "metadata": {}, + "outputs": [], + "source": [ + "#filtered.show(20)" + ] + }, + { + "cell_type": "markdown", + "id": "distinct-hydrogen", + "metadata": {}, + "source": [ + "Selects relevant fields " + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "id": "sublime-reach", + "metadata": {}, + "outputs": [], + "source": [ + "Encounter = filtered.select(['id','subject','type',\n", + " 'location','hospitalization.admitSource',\n", + " 'period','extension'])\n", + "#Encounter.printSchema()" + ] + }, + { + "cell_type": "markdown", + "id": "marine-editing", + "metadata": {}, + "source": [ + "Explode the location structure to form the fields \"care_site_id\" and \"discharge_to_concept_id\"
TODO: Find the correct origin field" + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "id": "racial-functionality", + "metadata": {}, + "outputs": [], + "source": [ + "#Encounter.withColumn(\"care_site_id\", \"location.location\")))\\\n", + "# .withColumn(\"discharge_to_concept_id\", explode(array(\"location.physicalType\")))\\\n", + "# .show(10)" + ] + }, + { + "cell_type": "markdown", + "id": "crude-royal", + "metadata": {}, + "source": [ + "Extract the start and end date along with the time from the period field." + ] + }, + { + "cell_type": "code", + "execution_count": 53, + "id": "ultimate-investigator", + "metadata": {}, + "outputs": [], + "source": [ + "#splits the date and time\n", + "split_start = split(Encounter['period.start'], 'T')\n", + "split_end = split(Encounter['period.end'], 'T') \n", + "\n", + "#assigns each to a column \n", + "vist_date_time = Encounter\\\n", + " .withColumn(\"visit_start_date\",split_start.getItem(0))\\\n", + " .withColumn(\"visit_start_datetime\",split_start.getItem(1))\\\n", + " .withColumn(\"visit_end_date\",split_end.getItem(0))\\\n", + " .withColumn(\"visit_end_datetime\",split_end.getItem(1))" + ] + }, + { + "cell_type": "markdown", + "id": "million-transition", + "metadata": {}, + "source": [ + "Drop columns no longer needed" + ] + }, + { + "cell_type": "code", + "execution_count": 54, + "id": "under-blanket", + "metadata": {}, + "outputs": [], + "source": [ + "dropped = vist_date_time.drop(\"period\")" + ] + }, + { + "cell_type": "markdown", + "id": "potential-consciousness", + "metadata": {}, + "source": [ + "Rename the columns " + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "id": "beginning-lighting", + "metadata": {}, + "outputs": [], + "source": [ + "visit_occurnace = dropped\\\n", + " .withColumnRenamed(\"type\",\"preceding_visit_occurence\")\\\n", + " .withColumnRenamed(\"id\",\"visit_occurence_id\")\\\n", + " .withColumnRenamed(\"admitSource\",\"admitting_source_concept_id\")\\\n", + " .withColumnRenamed(\"subject\",\"person_id\")\\\n", + " .withColumnRenamed(\"type\",\"preceding_visit_occurence\")\\\n", + " .withColumnRenamed(\"extension\",\"visit_type_concept_id\")\n", + "\n", + "#.withColumnRenamed(\"location.location.id\",\"care_site_id\")\\ \n", + "#.withColumnRenamed(\"location.location.type\",\"discharge_to_concept_id\")\\" + ] + }, + { + "cell_type": "markdown", + "id": "female-hardware", + "metadata": {}, + "source": [ + "Shows mapped output table" + ] + }, + { + "cell_type": "code", + "execution_count": 56, + "id": "liable-petite", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+--------------------+-------------------------+--------------------+---------------------------+---------------------+----------------+--------------------+--------------+------------------+\n", + "| visit_occurence_id| person_id|preceding_visit_occurence| location|admitting_source_concept_id|visit_type_concept_id|visit_start_date|visit_start_datetime|visit_end_date|visit_end_datetime|\n", + "+--------------------+--------------------+-------------------------+--------------------+---------------------------+---------------------+----------------+--------------------+--------------+------------------+\n", + "|a3d6098b-0af4-4d9...|{urn:uuid:f245361...| {[{[{http://snome...| null| null| null| 2016-09-04| 02:36:34-12:00| 2016-09-04| 02:51:34-12:00|\n", + "|c420368d-3336-44a...|{urn:uuid:46c9970...| {[{[{http://snome...|{[{{Location?iden...| null| null| 2011-03-23| 20:41:57+02:00| 2011-03-23| 21:33:57+02:00|\n", + "|5fe41249-3193-4f4...|{urn:uuid:3ba0e01...| {[{[{http://snome...| null| null| null| 2020-03-22| 19:32:05+02:00| 2020-03-22| 19:47:05+02:00|\n", + "|aa911c12-6950-40e...|{urn:uuid:193682a...| {[{[{http://snome...| null| null| null| 2018-06-12| 02:43:25-12:00| 2018-06-12| 02:58:25-12:00|\n", + "|df0107b5-e588-49f...|{urn:uuid:ce95a00...| {[{[{http://snome...| null| null| null| 2010-10-13| 22:42:53-12:00| 2010-10-13| 22:57:53-12:00|\n", + "+--------------------+--------------------+-------------------------+--------------------+---------------------------+---------------------+----------------+--------------------+--------------+------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "visit_occurnace.show(5) " + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "etl", + "language": "python", + "name": "etl" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}