util to generate ingest template from library #1258

Merged · 4 commits · Nov 21, 2024
11 changes: 11 additions & 0 deletions dcpy/connectors/edm/recipes.py
@@ -4,6 +4,7 @@
import os
import pandas as pd
from pathlib import Path
from pyarrow import parquet
import shutil
from tempfile import TemporaryDirectory
from typing import Callable
@@ -137,6 +138,16 @@
    return ingest.Config(**config)


def get_parquet_metadata(id: str, version="latest") -> parquet.FileMetaData:
    s3_fs = s3.pyarrow_fs()
    ds = parquet.ParquetDataset(
        f"{BUCKET}/{DATASET_FOLDER}/{id}/{version}/{id}.parquet", filesystem=s3_fs
    )

    assert len(ds.fragments) == 1, "recipes does not support multi-fragment datasets"
    return ds.fragments[0].metadata


def get_latest_version(name: str) -> str:
    """Retrieve the latest version of a recipe dataset from s3."""
    return get_config(name).dataset.version
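A quick way to exercise the new helper (a sketch, not part of the diff; assumes AWS credentials are set in the environment and that the named dataset has been archived to the recipes bucket):

from dcpy.connectors.edm import recipes

# Parquet footer metadata is cheap to fetch: row count and schema come back
# without downloading the data itself.
meta = recipes.get_parquet_metadata("dcp_pop_acs2010_housing")
print(meta.num_rows)
print(meta.schema.to_arrow_schema())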
13 changes: 11 additions & 2 deletions dcpy/data/compare.py
@@ -150,7 +150,9 @@
        SELECT
            {left_keys},
            st_orderingequals({lc}, {rc}) AS "ordering_equal",
-           st_equals({lc}, {rc}) AS "spatially_equal"
+           st_equals({lc}, {rc}) AS "spatially_equal",
+           st_geometrytype({lc}) AS "left_geom_type",
+           st_geometrytype({rc}) AS "right_geom_type"
        FROM {left} AS "left"
        INNER JOIN {right} AS "right"
        ON {on}
@@ -165,7 +167,14 @@
    if (column in left_geom_columns) and (column in right_geom_columns):
        comp_df = client.execute_select_query(spatial_query(column))
        comp_df = comp_df.set_index(key_columns)
-       comp_df.columns = pd.Index(["ordering_equal", "spatially_equal"])
+       comp_df.columns = pd.Index(
+           [
+               "ordering_equal",
+               "spatially_equal",
+               "left_geom_type",
+               "right_geom_type",
+           ]
+       )

    elif (column not in left_geom_columns) and (column not in right_geom_columns):
        comp_df = client.execute_select_query(query(column))
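With the two st_geometrytype columns added, a comparison can now distinguish a true spatial change from a type-only change (e.g. Polygon recoded as MultiPolygon). A hypothetical downstream check on the resulting comp_df:

# Rows whose geometries match spatially but are stored as different types,
# a common artifact of round-tripping data through different file formats.
type_only = comp_df[
    comp_df["spatially_equal"]
    & (comp_df["left_geom_type"] != comp_df["right_geom_type"])
]
print(type_only[["left_geom_type", "right_geom_type"]].drop_duplicates())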
30 changes: 0 additions & 30 deletions dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml
@@ -27,33 +27,3 @@ ingestion:
insert_behavior: error
missing_key_behavior: error
mode: update_column

library_dataset:
name: dcp_pop_acs2010_housing
version: ""
acl: public-read
source:
script:
name: excel
path: https://nyc3.digitaloceanspaces.com/edm-recipes/inbox/dcp_pop_acs2010/{{ version }}/dcp_pop_acs.xlsx
sheet_name: Housing0610
geometry:
SRS: null
type: NONE

destination:
geometry:
SRS: null
type: NONE
fields: []
sql: null

info:
description: |
## 2010 ACS file from Population
This file is produced internally by the Population division. 2010 version is used as a reference dataset
for the latest ACS data, and occasionally is modified so these different subsections are archived as their
own recipe datasets so that they can easily be updated individually

url: null
dependents: []
68 changes: 68 additions & 0 deletions dcpy/lifecycle/scripts/validate_ingest.py
@@ -3,10 +3,12 @@
import shutil
import typer
from typing import Literal
import yaml


from dcpy.utils import postgres
from dcpy.utils.collections import indented_report
from dcpy.models.data import comparison
from dcpy.models.base import SortedSerializedBase, YamlWriter

from dcpy.data import compare
from dcpy.connectors.edm import recipes
from dcpy.lifecycle.ingest.run import TMP_DIR, run as run_ingest
@@ -17,6 +19,67 @@
SCHEMA = build_metadata.build_name(os.environ.get("BUILD_NAME"))


class Converter(SortedSerializedBase, YamlWriter):
    _exclude_falsey_values: bool = False
    _exclude_none: bool = False
    _head_sort_order: list[str] = [
        "id",
        "acl",
        "ingestion",
        "columns",
        "library_dataset",
    ]

    id: str
    acl: str
    ingestion: dict
    columns: list = []
    library_dataset: dict


def convert_template(dataset: str):
    library_path = (
        Path(__file__).parent.parent.parent / "library" / "templates" / f"{dataset}.yml"
    )
    ingest_path = (
        Path(__file__).parent.parent.parent
        / "lifecycle"
        / "ingest"
        / "templates"
        / f"{dataset}.yml"
    )
    with open(library_path) as library_file:
        library_template = yaml.safe_load(library_file)
    converter = Converter(
        id=library_template["dataset"]["name"],
        acl=library_template["dataset"]["acl"],
        ingestion={
            "source": {
                "one_of": [
                    {"type": "local_file", "path": ""},
                    {"type": "file_download", "url": ""},
                    {"type": "api", "endpoint": "", "format": ""},
                    {"type": "s3", "bucket": "", "key": ""},
                    {
                        "type": "socrata",
                        "org": "",
                        "uid": "",
                        "format": "",
                    },
                    {"type": "edm_publishing_gis_dataset", "name": ""},
                ]
            },
            "file_format": {
                "type": "csv, json, xlsx, shapefile, geojson, geodatabase",
                "crs": "EPSG:2263",
            },
            "processing_steps": [],
        },
        library_dataset=library_template["dataset"],
    )
    converter.write_to_yaml(ingest_path)


def library_archive(dataset: str, version: str | None = None, file_type="pgdump"):
    # BEWARE: once you import library, parquet file writing fails
    # Something to do with gdal's interaction with parquet file driver
@@ -104,6 +167,11 @@
app = typer.Typer()


@app.command("convert")
def _convert(dataset: str = typer.Argument()):
    convert_template(dataset)


@app.command("run_single")
def run_single(
    tool: str = typer.Argument(),
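Putting the new pieces together, migrating one dataset looks roughly like this (dataset name illustrative; the exact CLI wiring of the typer app may differ):

from dcpy.lifecycle.scripts.validate_ingest import convert_template

# Reads dcpy/library/templates/dcp_pop_acs2010_housing.yml and writes an
# ingest-style stub to dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml,
# carrying the original definition along under `library_dataset` for reference.
convert_template("dcp_pop_acs2010_housing")

The generated stub lists every source type in a one_of block, presumably so the author can delete all but the applicable one and fill in its fields.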
4 changes: 3 additions & 1 deletion dcpy/models/base.py
@@ -80,6 +80,8 @@ def _model_dump_ordered(self, handler):


class YamlWriter(BaseModel):
    _exclude_none: bool = True

    class _YamlTopLevelSpacesDumper(yaml.SafeDumper):
        """YAML serializer that will insert lines between top-level entries,
        which is nice in longer files."""
@@ -106,7 +108,7 @@ def str_presenter(dumper, data):
        with open(path, "w", encoding="utf8") as f:
            f.write(
                yaml.dump(
-                   self.model_dump(exclude_none=True),
+                   self.model_dump(exclude_none=self._exclude_none),
                    sort_keys=False,
                    default_flow_style=False,
                    Dumper=YamlWriter._YamlTopLevelSpacesDumper,
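Making _exclude_none a class attribute lets subclasses keep None-valued fields in their YAML output; Converter above sets it to False so placeholder keys survive into the generated template. A minimal sketch of the mechanism (hypothetical model, not from this PR):

from dcpy.models.base import SortedSerializedBase, YamlWriter


class Sketch(SortedSerializedBase, YamlWriter):
    _exclude_none: bool = False  # emit `url: null` rather than dropping the key

    url: str | None = None


Sketch().write_to_yaml("sketch.yml")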
5 changes: 1 addition & 4 deletions dcpy/models/lifecycle/ingest.py
@@ -7,7 +7,7 @@
from dcpy.utils.metadata import RunDetails
from dcpy.models.connectors.edm import recipes, publishing
from dcpy.models.connectors import web, socrata
-from dcpy.models import library, file
+from dcpy.models import file
from dcpy.models.base import SortedSerializedBase


@@ -88,9 +88,6 @@ class Template(BaseModel, extra="forbid"):
    ingestion: Ingestion
    columns: list[Column] = []

-   ## this is the original library template, included just for reference while we build out our new templates
-   library_dataset: library.DatasetDefinition | None = None
-
    @property
    def has_geom(self):
        match self.ingestion.file_format:
9 changes: 9 additions & 0 deletions dcpy/utils/s3.py
@@ -6,6 +6,7 @@
from io import BytesIO
import os
from pathlib import Path
from pyarrow import fs
import pytz
from rich.progress import (
    BarColumn,
@@ -502,6 +503,14 @@
    raise Exception(f"No body found for file '{path}' in bucket '{bucket}'.")


def pyarrow_fs() -> fs.S3FileSystem:
    return fs.S3FileSystem(
        access_key=os.environ["AWS_ACCESS_KEY_ID"],
        secret_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        endpoint_override=os.environ.get("AWS_S3_ENDPOINT"),
    )


app = typer.Typer(add_completion=False)


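The filesystem returned here can be handed straight to pyarrow readers, which is how get_parquet_metadata uses it. A minimal sketch (bucket and key are placeholders):

from pyarrow import parquet

from dcpy.utils import s3

# Read a parquet file directly from object storage, no local copy needed.
table = parquet.read_table(
    "edm-recipes/datasets/foo/latest/foo.parquet", filesystem=s3.pyarrow_fs()
)
print(table.num_rows)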