
FastAPI async endpoint: which s3fs client should I use? Needed example #907

Open
LuchiLucs opened this issue Oct 24, 2024 · 4 comments

@LuchiLucs
If I wrap the s3fs client inside a DAO class in order to create an interface that adds features on top of the remote filesystem, which s3fs client should I use?

For instance, assuming I create an async client because I want to use either the example_one or example_two coroutines/methods in FastAPI async endpoints, which example should I use?

import s3fs

class S3Manager:
    def __init__(self, bucket_name: str = None):
        self.bucket_name = bucket_name
        self.s3_fs = s3fs.S3FileSystem(
            anon=False,
            asynchronous=True,
        )

    async def example_one(self):
        session = await self.s3_fs.set_session()
        work = await self.s3_fs._glob(...)
        await session.close()
        return work

    def example_two(self):
        return self.s3_fs.glob(...)

Since FastAPI handles its own loop to manage async coroutines, I believe the best approach would be to create the async client, leveraging FastAPI's own loop somehow, e.g. by delegating loop management to it. But:

  1. the async client and example seem cumbersome: it looks like the s3fs library wants the user to use the sync interface/client, which itself uses the async support under the hood.
  2. is there a performance gap in using the sync blocking client, i.e. calling its methods inside an async coroutine (FastAPI)?
  3. how should I set up a complete example using FastAPI's own loop?
@martindurant
Member

FastAPI handles its own loop to manage async coroutines, I believe the best approach would be to create the async client

This is fair. It is slightly more complicated, but it will allow you not to block FastAPI's event loop waiting on sync calls, which I think is the main benefit you're after.

Note that if you use async mode, you should create your filesystem within a coroutine, not in __init__ (unless __init__ is itself called from within a coroutine).
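To illustrate why construction belongs in a coroutine, here is a stdlib-only sketch (no actual s3fs calls; the filesystem mentioned in the comments is a placeholder): when __init__ runs there is usually no running event loop, whereas inside a coroutine there always is one for the filesystem to bind to.

```python
import asyncio

def fs_in_init() -> bool:
    # Mimics constructing an async filesystem inside __init__: there is
    # normally no running event loop here, so loop-dependent setup
    # (e.g. s3fs.S3FileSystem(asynchronous=True)) cannot bind correctly.
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False

async def fs_in_coroutine() -> bool:
    # Inside a coroutine a loop is guaranteed to be running, so the
    # filesystem would attach to the loop that actually serves requests.
    asyncio.get_running_loop()
    return True

print(fs_in_init())
print(asyncio.run(fs_in_coroutine()))
```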

it seems like the s3fs library wishes to user to exploit the sync interface/client which itself uses the async support

I would say that async mode is more "expert" functionality. But if you are doing async programming with fastAPI already, then you qualify :)

is there a performance gap by using the sync blocking client

You will block at every call to s3fs. That might be a problem in some situations, it depends on your use case.
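If you do stay with the sync client, one common mitigation (not from this thread, just a stdlib pattern) is to offload each blocking call to a worker thread with `asyncio.to_thread`, so the event loop keeps serving other requests. The `blocking_glob` function below is a stand-in for a sync s3fs call such as `fs.glob(...)`:

```python
import asyncio
import time

def blocking_glob(pattern: str) -> list:
    # stand-in for a sync, blocking s3fs call such as fs.glob(pattern)
    time.sleep(0.1)  # simulates network latency
    return [pattern]

async def endpoint() -> list:
    # run the blocking call on a worker thread; the event loop stays free
    return await asyncio.to_thread(blocking_glob, "bucket/prefix/*")

print(asyncio.run(endpoint()))
```

FastAPI applies a similar trick automatically when an endpoint is declared with plain `def` instead of `async def`, running it in a threadpool.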

    async def example_one(self):
        session = await self.s3_fs.set_session()
        work = await self.s3_fs._glob(...)
        await session.close()
        return work

I think this should work fine without the explicit session calls, except you might get a warning when your process finally exits.

My suggestion is that you have a method which sets up and caches the filesystem object:

    async def _fs(self):
        if self.s3_fs is None:
            self.s3_fs = S3FileSystem(asynchronous=True)
            await self.s3_fs.set_session()  # store this, if you want to make a cleanup method
        return self.s3_fs
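A fuller stdlib-only sketch of this lazy-caching pattern (the filesystem construction below is a placeholder; in real code it would be `s3fs.S3FileSystem(asynchronous=True)` followed by `await fs.set_session()`). The lock is an extra assumption on my part, guarding against two concurrent first calls each creating a session:

```python
import asyncio

class S3Manager:
    """Lazily builds and caches one filesystem-like object per instance."""

    def __init__(self, bucket_name: str):
        self.bucket_name = bucket_name
        self._fs = None
        self._lock = asyncio.Lock()  # prevent concurrent first-time setup

    async def _get_fs(self):
        async with self._lock:
            if self._fs is None:
                self._fs = await self._make_filesystem()
        return self._fs

    async def _make_filesystem(self):
        # Placeholder for the real setup:
        #   fs = s3fs.S3FileSystem(asynchronous=True)
        #   await fs.set_session()
        #   return fs
        await asyncio.sleep(0)
        return object()

async def demo() -> bool:
    mgr = S3Manager("my-bucket")
    a = await mgr._get_fs()
    b = await mgr._get_fs()
    return a is b  # both calls return the same cached instance

print(asyncio.run(demo()))
```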

@LuchiLucs
Author

I created this simple DAO class in order to interface with the s3fs coroutines and its cached filesystem. What do you think?
Also, the documentation says to manage the sessions, but I am not sure why that is necessary. Anyway, does that context manager do the job? Should I use a session within each awaited coroutine, or one per async-defined coroutine (for instance, a coroutine which awaits 2 inner coroutines)?

import s3fs
from contextlib import asynccontextmanager
from src.utils.logger import get_logger

logger = get_logger(__name__)

@asynccontextmanager
async def session_manager(fs: s3fs.S3FileSystem):
    """ Context manager for async AWS S3 boto sessions"""
    session = await fs.set_session()
    try:
        yield session
    finally:
        await session.close()

class S3FileSystem:
    """Class to handle the s3 resource once created a default session"""

    def __init__(self, bucket_name: str = None):
        self.bucket_name = bucket_name
        self.fs = s3fs.S3FileSystem(
            anon=False,
            asynchronous=True,
        )

    async def async_list_objects(self, prefix: str = None, **kwargs):
        if not prefix:
            prefix = ""
        async with session_manager(self.fs):
            objects = await self.fs._ls(path=f"{self.bucket_name}/{prefix}", **kwargs)
        return objects

@martindurant
Member

No, I would not create and destroy a session on every call, you will find this expensive. I think the (async) method I suggested which stores the filesystem just once is better.

Also, the documentation says to manage the sessions, but I am not sure why that is necessary.

The code has evolved over time to make this less necessary. It's still useful for you to have control over when the session is made and closed, but the default behaviour may well be fine.

@LuchiLucs
Author

@martindurant Do you mind providing a complete, self-contained example of how to define such a class so that its async coroutines can be used from other async coroutines (i.e. FastAPI async endpoints), including how to manage the boto3/client sessions with respect to the latest aiobotocore versions (i.e. what the library suggests)?

Thank you. I am a bit busy these days so I will come back as soon as possible...
