From 1e8415fcae2d37baa34770d5e5810ebfd424276e Mon Sep 17 00:00:00 2001 From: Tianjing Li Date: Mon, 11 Dec 2023 15:15:16 -0500 Subject: [PATCH] Calculate total cache size in bytes instead of length --- dropbox/provider/unstructured.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/dropbox/provider/unstructured.py b/dropbox/provider/unstructured.py index 186582089..3b7bf6ca7 100644 --- a/dropbox/provider/unstructured.py +++ b/dropbox/provider/unstructured.py @@ -1,12 +1,15 @@ +import sys import asyncio import aiohttp import logging +import functools from collections import OrderedDict from flask import current_app as app logger = logging.getLogger(__name__) -CACHE_SIZE = 256 +CACHE_LIMIT_BYTES = 20 * 1024 * 1024 # 20 MB to bytes +TIMEOUT_SECONDS = 20 unstructured = None @@ -14,19 +17,29 @@ class UnstructuredRequestSession: def __init__(self, unstructured_base_url, api_key): self.get_content_url = f"{unstructured_base_url}/general/v0/general" - self.headers = {"unstructured-api-key": api_key} + self.api_key = api_key # Manually cache because functools.lru_cache does not support async methods self.cache = OrderedDict() self.start_session() def start_session(self): self.loop = asyncio.new_event_loop() - self.session = aiohttp.ClientSession(loop=self.loop) + # Create ClientTimeout object to apply timeout for every request in the session + client_timeout = aiohttp.ClientTimeout(total=TIMEOUT_SECONDS) + self.session = aiohttp.ClientSession(loop=self.loop, timeout=client_timeout) def close_loop(self): self.loop.stop() self.loop.close() + def cache_size(self): + # Calculate the total size of values in bytes + total_size_bytes = functools.reduce( + lambda a, b: a + b, map(lambda v: sys.getsizeof(v), self.cache.values()), 0 + ) + + return total_size_bytes + def cache_get(self, key): self.cache.move_to_end(key) @@ -35,7 +48,7 @@ def cache_get(self, key): def cache_put(self, key, item): self.cache[key] = item - if len(self.cache) > CACHE_SIZE: + while self.cache_size() > CACHE_LIMIT_BYTES: self.cache.popitem() async def close_session(self): @@ -53,9 +66,12 @@ async def get_unstructured_content(self, file): data = aiohttp.FormData() data.add_field("files", file_data, filename=file_name) + # API key optional if self-hosted + headers = {} if self.api_key is None else {"unstructured-api-key": self.api_key} + async with self.session.post( self.get_content_url, - headers=self.headers, + headers=headers, data=data, ) as response: content = await response.json() @@ -91,13 +107,10 @@ def get_unstructured_client(): if unstructured is not None: return unstructured - # Fetch environment variables assert ( unstructured_base_url := app.config.get("UNSTRUCTURED_BASE_URL") ), "DROPBOX_UNSTRUCTURED_BASE_URL must be set" - assert ( - api_key := app.config.get("UNSTRUCTURED_API_KEY") - ), "DROPBOX_UNSTRUCTURED_API_KEY must be set" + api_key = app.config.get("UNSTRUCTURED_API_KEY", None) unstructured = UnstructuredRequestSession(unstructured_base_url, api_key)