Bug fixes for 0.5.1 (#106)
* Remove Stable declaration from delete_items function

* Python dehydrate marks top-level base_keys as DNM

Fixes dehydration so that it is aware of top-level keys on the base
item, which should be marked as do-not-merge on the dehydrated item, and adds tests for that behavior.

* Fix test class to use hydration module

* update changelog


* update tests for changes in the fields extension, add simpler testing for SQL queries

* update version to 0.6.0, update changelog

* add triggers to allow deleting collections and cleaning up partitions

Co-authored-by: Rob Emanuele <rdemanuele@gmail.com>
Co-authored-by: Matt McFarland <mmcfarland@microsoft.com>
3 people authored May 12, 2022
1 parent 98ac920 commit ed77040
Showing 34 changed files with 4,998 additions and 846 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,19 @@
# Changelog
## [v0.6.0]

### Fixed
- Fix function signatures for transactional functions (delete_item, etc.) to make sure that they are marked as volatile
- Fix function for getting start/end dates from a STAC item
### Changed
- Update hydration/dehydration logic to make sure that it matches hydration/dehydration in pypgstac
- Update fields logic in pgstac to only use full paths and to match logic in stac-fastapi
- Always include id and collection on features regardless of fields setting
### Added
- Add tests to ensure that pgstac and pypgstac hydration logic is equivalent
- Add conf item to search to allow returning results without hydrating. This allows an application using pgstac to shift the CPU load of rehydrating items from the database onto the application server.
- Add "--dehydrated" option to loader to be able to load a dehydrated file (or iterable) of items such as would be output using pg_dump or postgresql copy.
- Add "--chunksize" option to loader that can split the processing of an iterable or file into chunks of n records at a time

## [v0.5.1]

### Fixed
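Aside (not part of the committed diff): a minimal sketch of exercising the new v0.6.0 loader options noted above through the Python API changed later in this commit. The DSN and file name are placeholders; `Methods.insert` is the default insert mode visible in `load.py` below.

```python
# Sketch only: "items.copy" is a hypothetical tab-separated dehydrated dump,
# e.g. the output of a PostgreSQL COPY of the items table.
from pypgstac.db import PgstacDB
from pypgstac.load import Loader, Methods

db = PgstacDB(dsn="postgresql://username:password@localhost:5432/pgstac")
loader = Loader(db=db)

# Python-API equivalent of the new "--dehydrated" and "--chunksize" options:
# rows are read already dehydrated and loaded 10,000 records at a time,
# grouped by partition within each chunk.
loader.load_items(
    "items.copy",
    insert_mode=Methods.insert,
    dehydrated=True,
    chunksize=10000,
)
```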
10 changes: 7 additions & 3 deletions pypgstac/pypgstac/db.py
@@ -251,11 +251,15 @@ def func(self, function_name: str, *args: Any) -> Generator:
"""Call a database function."""
placeholders = sql.SQL(", ").join(sql.Placeholder() * len(args))
func = sql.Identifier(function_name)
cleaned_args = []
for arg in args:
if isinstance(arg, dict):
cleaned_args.append(psycopg.types.json.Jsonb(arg))
else:
cleaned_args.append(arg)
base_query = sql.SQL("SELECT * FROM {}({});").format(func, placeholders)
return self.query(base_query, *args)
return self.query(base_query, cleaned_args)

def search(self, query: Union[dict, str, psycopg.types.json.Jsonb] = "{}") -> str:
"""Search PgStac."""
if isinstance(query, dict):
query = psycopg.types.json.Jsonb(query)
return dumps(next(self.func("search", query))[0])
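Aside (not part of the diff): a minimal usage sketch of the updated `func`/`search` helpers, assuming a reachable pgstac database; the DSN is a placeholder.

```python
from pypgstac.db import PgstacDB

db = PgstacDB(dsn="postgresql://username:password@localhost:5432/pgstac")

# `search` wraps the dict in psycopg's Jsonb adapter and delegates to `func`,
# which now does the same for any dict argument, so this runs roughly:
#   SELECT * FROM search('{"limit": 1}'::jsonb);
feature_collection = db.search({"limit": 1})
print(feature_collection)
```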
18 changes: 13 additions & 5 deletions pypgstac/pypgstac/hydration.py
@@ -1,3 +1,4 @@
"""Hydrate data in pypgstac rather than on the database."""
from copy import deepcopy
from typing import Any, Dict

@@ -11,7 +12,6 @@ def hydrate(base_item: Dict[str, Any], item: Dict[str, Any]) -> Dict[str, Any]:
This will not perform a deep copy; values of the original item will be referenced
in the return item.
"""

# Merge will mutate i, but create deep copies of values in the base item
# This will prevent the base item values from being mutated, e.g. by
# filtering out fields in `filter_fields`.
@@ -103,6 +103,10 @@ def strip(base_value: Dict[str, Any], item_value: Dict[str, Any]) -> Dict[str, Any]:
else:
# Unequal non-dict values are copied over from the incoming item
out[key] = value

# Mark any top-level keys from the base_item that are not in the incoming item
apply_marked_keys(base_value, item_value, out)

return out

return strip(base_item, full_item)
@@ -113,13 +117,17 @@ def apply_marked_keys(
full_item: Dict[str, Any],
dehydrated: Dict[str, Any],
) -> None:
"""
"""Mark keys.
Mark any keys that are present on the base item but not in the incoming item
as `do-not-merge` on the dehydrated item. This will prevent the key from
being rehydrated.
This modifies the dehydrated item in-place.
"""
marked_keys = [key for key in base_item if key not in full_item.keys()]
marked_dict = {k: DO_NOT_MERGE_MARKER for k in marked_keys}
dehydrated.update(marked_dict)
try:
marked_keys = [key for key in base_item if key not in full_item.keys()]
marked_dict = {k: DO_NOT_MERGE_MARKER for k in marked_keys}
dehydrated.update(marked_dict)
except TypeError:
pass
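Aside (not part of the diff): an illustrative round trip of the behavior this change adds. A top-level key that exists only on the base item is marked do-not-merge on the dehydrated item, so hydration does not re-add it; the item values here are made up.

```python
from pypgstac.hydration import DO_NOT_MERGE_MARKER, dehydrate, hydrate

base_item = {"type": "Feature", "properties": {"gsd": 10}}
full_item = {"properties": {"gsd": 10, "datetime": "2022-05-12T00:00:00Z"}}

dehydrated = dehydrate(base_item, full_item)

# "type" exists only on the base item, so it is marked do-not-merge ...
assert dehydrated["type"] == DO_NOT_MERGE_MARKER
# ... while values equal to the base item are stripped.
assert dehydrated["properties"] == {"datetime": "2022-05-12T00:00:00Z"}

# Hydrating against the same base item restores the original full item.
assert hydrate(base_item, dehydrated) == full_item
```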
168 changes: 119 additions & 49 deletions pypgstac/pypgstac/load.py
@@ -13,14 +13,13 @@
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
Union,
Generator,
TextIO,
)

import csv
import orjson
import psycopg
from orjson import JSONDecodeError
@@ -42,6 +41,16 @@
logger = logging.getLogger(__name__)


def chunked_iterable(iterable: Iterable, size: Optional[int] = 10000) -> Iterable:
"""Chunk an iterable."""
it = iter(iterable)
while True:
chunk = tuple(itertools.islice(it, size))
if not chunk:
break
yield chunk


class Tables(str, Enum):
"""Available tables for loading."""

@@ -133,6 +142,7 @@ class Loader:
"""Utilities for loading data."""

db: PgstacDB
_partition_cache: Optional[dict] = None

@lru_cache
def collection_json(self, collection_id: str) -> Tuple[dict, int, str]:
@@ -149,6 +159,7 @@ def collection_json(self, collection_id: str) -> Tuple[dict, int, str]:
raise Exception(
f"Collection {collection_id} is not present in the database"
)
logger.debug(f"Found {collection_id} with base_item {base_item}")
return base_item, key, partition_trunc

def load_collections(
@@ -270,7 +281,16 @@ def load_partition(
) as copy:
for item in items:
item.pop("partition")
copy.write_row(list(item.values()))
copy.write_row(
(
item["id"],
item["collection"],
item["datetime"],
item["end_datetime"],
item["geometry"],
item["content"],
)
)
logger.debug(cur.statusmessage)
logger.debug(f"Rows affected: {cur.rowcount}")
elif insert_mode in (
@@ -295,7 +315,16 @@
) as copy:
for item in items:
item.pop("partition")
copy.write_row(list(item.values()))
copy.write_row(
(
item["id"],
item["collection"],
item["datetime"],
item["end_datetime"],
item["geometry"],
item["content"],
)
)
logger.debug(cur.statusmessage)
logger.debug(f"Copied rows: {cur.rowcount}")

@@ -369,61 +398,102 @@ def load_partition(
f"Copying data for {partition} took {time.perf_counter() - t} seconds"
)

def _partition_update(self, item: dict) -> str:

p = item.get("partition", None)
if p is None:
_, key, partition_trunc = self.collection_json(item["collection"])
if partition_trunc == "year":
pd = item["datetime"].replace("-", "")[:4]
p = f"_items_{key}_{pd}"
elif partition_trunc == "month":
pd = item["datetime"].replace("-", "")[:6]
p = f"_items_{key}_{pd}"
else:
p = f"_items_{key}"
item["partition"] = p

if self._partition_cache is None:
self._partition_cache = {}

partition = self._partition_cache.get(
item["partition"],
{
"partition": None,
"collection": None,
"mindt": None,
"maxdt": None,
"minedt": None,
"maxedt": None,
},
)

partition["partition"] = item["partition"]
partition["collection"] = item["collection"]
if partition["mindt"] is None or item["datetime"] < partition["mindt"]:
partition["mindt"] = item["datetime"]

if partition["maxdt"] is None or item["datetime"] > partition["maxdt"]:
partition["maxdt"] = item["datetime"]

if partition["minedt"] is None or item["end_datetime"] < partition["minedt"]:
partition["minedt"] = item["end_datetime"]

if partition["maxedt"] is None or item["end_datetime"] > partition["maxedt"]:
partition["maxedt"] = item["end_datetime"]
self._partition_cache[item["partition"]] = partition

return p

def read_dehydrated(self, file: Union[Path, str] = "stdin") -> Generator:
if file is None:
file = "stdin"
if isinstance(file, str):
open_file: Any = open_std(file, "r")
with open_file as f:
fields = [
"id",
"geometry",
"collection",
"datetime",
"end_datetime",
"content",
]
csvreader = csv.DictReader(f, fields, delimiter="\t")
for item in csvreader:
item["partition"] = self._partition_update(item)
yield item

def read_hydrated(
self, file: Union[Path, str, Iterator[Any]] = "stdin"
) -> Generator:
for line in read_json(file):
item = self.format_item(line)
item["partition"] = self._partition_update(item)
yield item

def load_items(
self,
file: Union[Path, str, Iterator[Any]] = "stdin",
insert_mode: Optional[Methods] = Methods.insert,
dehydrated: Optional[bool] = False,
chunksize: Optional[int] = 10000,
) -> None:
"""Load items json records."""
if file is None:
file = "stdin"
t = time.perf_counter()
items: List = []
partitions: dict = {}
for line in read_json(file):
item = self.format_item(line)
items.append(item)
partition = partitions.get(
item["partition"],
{
"partition": None,
"collection": None,
"mindt": None,
"maxdt": None,
"minedt": None,
"maxedt": None,
},
)
partition["partition"] = item["partition"]
partition["collection"] = item["collection"]
if partition["mindt"] is None or item["datetime"] < partition["mindt"]:
partition["mindt"] = item["datetime"]

if partition["maxdt"] is None or item["datetime"] > partition["maxdt"]:
partition["maxdt"] = item["datetime"]

if (
partition["minedt"] is None
or item["end_datetime"] < partition["minedt"]
):
partition["minedt"] = item["end_datetime"]

if (
partition["maxedt"] is None
or item["end_datetime"] > partition["maxedt"]
):
partition["maxedt"] = item["end_datetime"]
partitions[item["partition"]] = partition
logger.debug(
f"Loading and parsing data took {time.perf_counter() - t} seconds."
)
t = time.perf_counter()
items.sort(key=lambda x: x["partition"])
logger.debug(f"Sorting data took {time.perf_counter() - t} seconds.")
t = time.perf_counter()
self._partition_cache = {}

if dehydrated and isinstance(file, str):
items = self.read_dehydrated(file)
else:
items = self.read_hydrated(file)

for k, g in itertools.groupby(items, lambda x: x["partition"]):
self.load_partition(partitions[k], g, insert_mode)
for chunk in chunked_iterable(items, chunksize):
chunk = sorted(chunk, key=lambda x: x["partition"])
for k, g in itertools.groupby(chunk, lambda x: x["partition"]):
self.load_partition(self._partition_cache[k], g, insert_mode)

logger.debug(f"Adding data to database took {time.perf_counter() - t} seconds.")
