Skip to content

Commit

Permalink
🔄 Add minimal retry logic; 🕰 Increase timeout to 15min per epoch (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
hf-kklein authored Feb 19, 2023
1 parent 47ff72b commit a80a0b9
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 18 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ This repository helps you with the latter. It allows you to create an up-to-date
computer. Unlike when you mirror the files using `wget` or `curl`, you'll get a clean and intuitive directory
structure.

From there you can e.g. commit the files into a VCS, scrape the PDF/Word files for later use...

From there you can e.g. commit the files into a VCS (such as our [edi_energy_mirror](https://github.com/Hochfrequenz/edi_energy_mirror)), scrape the PDF/Word files for later use...

We're all hoping for the day of true digitization on which this repository will become obsolete.

Expand All @@ -43,18 +44,23 @@ Then import it and start the download:

```python
import asyncio

from edi_energy_scraper import EdiEnergyScraper

# add the following lines to enable debug logging to stdout (CLI)
# import logging
# import sys
# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

async def mirror():
scraper = EdiEnergyScraper(path_to_mirror_directory="edi_energy_de")
await scraper.mirror()


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(mirror())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(mirror())

```

This creates a directory structure:
Expand Down
5 changes: 3 additions & 2 deletions mwe.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ async def mirror():


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(mirror())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(mirror())
48 changes: 37 additions & 11 deletions src/edi_energy_scraper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
# https://github.com/Hochfrequenz/edi_energy_scraper/issues/28
import datetime
import io
import itertools
import logging
import os
import re
from enum import Enum
from pathlib import Path
from random import randint
from typing import Awaitable, Dict, Optional, Set, Union

import aiohttp
from aiohttp import ServerDisconnectedError
from aiohttp_requests import Requests # type:ignore[import]
from bs4 import BeautifulSoup, Comment # type:ignore[import]
from pypdf import PdfReader
Expand Down Expand Up @@ -64,7 +67,7 @@ def __init__(
self._root_dir = Path(path_to_mirror_directory)
else:
self._root_dir = path_to_mirror_directory
self.timeout = aiohttp.ClientTimeout(total=60 * 3) # 3min
self.timeout = aiohttp.ClientTimeout(total=60 * 15) # 15min per epoch (1 asyncio.gather)
self.tcp_connector = aiohttp.TCPConnector(
limit_per_host=connection_limit,
)
Expand All @@ -87,27 +90,36 @@ async def _download_and_save_pdf(self, epoch: Epoch, file_basename: str, link: s
as the directory, if the pdf does not exist yet or if the metadata has changed since the last download.
Returns the path to the downloaded pdf.
"""

if not link.startswith("http"):
link = f"{self._root_url}/{link.strip('/')}" # remove trailing slashes from relative link

_logger.debug("Download %s", link)
for number_of_tries in range(4, 0, -1):
for number_of_tries in range(5, 0, -1):
try:
response = await self.requests.get(link, timeout=self.timeout)
break
except asyncio.TimeoutError:
_logger.exception("Timeout while downloading '%s'", link, exc_info=True)
except (asyncio.TimeoutError, ServerDisconnectedError):
_logger.warning("Timeout while downloading '%s' (%s)", link, file_basename)
if number_of_tries <= 0:
_logger.exception(
"Too many timeouts while downloading '%s' (%s)", link, file_basename, exc_info=True
)
raise
await asyncio.sleep(delay=10) # cool down...
await asyncio.sleep(delay=randint(8, 16)) # cool down...
file_name = EdiEnergyScraper._add_file_extension_to_file_basename(
headers=response.headers, file_basename=file_basename
)

file_path = self._get_file_path(file_name=file_name, epoch=epoch)

response_content = await response.content.read()
for number_of_tries in range(4, 0, -1):
try:
response_content = await response.content.read()
break
except asyncio.TimeoutError:
_logger.exception("Timeout while reading content of '%s'", file_name, exc_info=True)
if number_of_tries <= 0:
raise
await asyncio.sleep(delay=randint(5, 10)) # cool down...
# Save file if it does not exist yet
if not os.path.isfile(file_path):
with open(file_path, "wb+") as outfile: # pdfs are written as binaries
Expand Down Expand Up @@ -279,17 +291,22 @@ def remove_no_longer_online_files(self, online_files: Set[Path]) -> Set[Path]:
:param online_files: set, all the paths to the pdfs that were being downloaded and compared.
:return: Set[Path], Set of Paths that were removed
"""
all_files_in_mirror_dir: Set = set((self._root_dir).glob("**/*.*[!html]"))
_logger.info("Removing outdated files")
all_files_in_mirror_dir: Set = set(self._root_dir.glob("**/*.*[!html]"))
no_longer_online_files = all_files_in_mirror_dir.symmetric_difference(online_files)
for path in no_longer_online_files:
_logger.debug("Removing %s which has been removed online", path)
os.remove(path)

return no_longer_online_files

async def _download(self, epoch: Epoch, file_basename: str, link: str) -> Optional[Path]:
async def _download(
self, epoch: Epoch, file_basename: str, link: str, optional_success_msg: Optional[str] = None
) -> Optional[Path]:
try:
file_path = await self._download_and_save_pdf(epoch=epoch, file_basename=file_basename, link=link)
if optional_success_msg is not None:
_logger.debug(optional_success_msg)
except KeyError as key_error:
if key_error.args[0].lower() == "content-disposition":
_logger.exception("Failed to download '%s'", file_basename, exc_info=True)
Expand Down Expand Up @@ -326,10 +343,19 @@ async def mirror(self):
outfile.write(epoch_soup.prettify())
file_map = EdiEnergyScraper.get_epoch_file_map(epoch_soup)
download_tasks: list[Awaitable[Optional[Path]]] = []
file_counter = itertools.count()
for file_basename, link in file_map.items():
download_tasks.append(self._download(epoch, file_basename, link))
download_tasks.append(
self._download(
epoch,
file_basename,
link,
f"Successfully downloaded {epoch} file {next(file_counter)}/{len(file_map)}",
)
)
download_results: list[Optional[Path]] = await asyncio.gather(*download_tasks)
for download_result in download_results:
if download_result is not None:
new_file_paths.add(download_result)
self.remove_no_longer_online_files(new_file_paths)
_logger.info("Finished mirroring")
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ setenv = PYTHONPATH = {toxinidir}/src
commands =
coverage run -m pytest --basetemp={envtmpdir} {posargs}
coverage html --omit .tox/*,unittests/*
coverage report --fail-under 88 --omit .tox/*,unittests/*
coverage report --fail-under 86 --omit .tox/*,unittests/*


[testenv:dev]
Expand Down

0 comments on commit a80a0b9

Please sign in to comment.