Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up AsyncAdapter #277

Merged
merged 3 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions async_checklist.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
* [x] Ensure sqlite works in async
* [x] Fully support sync too. Using async should not be required
* [ ] Clean up miscellaneous TODOs
* [ ] Establish soundness for unsafe sections of AsyncAdapter
* [ ] Consider publishing `AsyncAdapter` into its own crate
* [x] Establish soundness for unsafe sections of AsyncAdapter
* [ ] Should async and/or async_adapter be under a separate feature?
* [ ] Integrate deadpool or bb8 for async connection pool
184 changes: 113 additions & 71 deletions butane_core/src/db/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::query::Order;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use tokio::sync::oneshot;

enum Command {
Func(Box<dyn FnOnce() + Send>),
Expand All @@ -32,8 +31,7 @@ impl AsyncAdapterEnv {
match cmd {
Command::Func(func) => func(),
Command::Shutdown => {
// TODO should connection support an explicit close?
return;
return; // break out of the loop
}
}
}
Expand All @@ -44,32 +42,37 @@ impl AsyncAdapterEnv {
}
}

/// Invokes a blocking function `func` as if it were async. This
/// is implemented by running it on the special thread created when the `AsyncAdapterEnv` was created.
async fn invoke<'c, 's, 'result, F, T, U>(
&'s self,
context: &SyncSendPtrMut<T>,
func: F,
) -> Result<U>
// todo can this just be result
where
F: FnOnce(&'c T) -> Result<U> + Send,
F: 'result,
U: Send + 'result,
T: ?Sized + 'c, // TODO should this be Send
T: ?Sized + 'c,
's: 'result,
'c: 'result,
{
// todo parts of this can be shared with the other two invoke functions
let (tx, rx) = oneshot::channel();
let context_ptr = SendPtr::new(context.inner);
let func_taking_ptr = |ctx: SendPtr<T>| func(unsafe { ctx.inner.as_ref() }.unwrap());
let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr));
let boxed_func: Box<dyn FnOnce() + Send + 'result> = Box::new(wrapped_func);
let static_func: Box<dyn FnOnce() + Send + 'static> =
unsafe { std::mem::transmute(boxed_func) };
self.sender.send(Command::Func(static_func))?;
// https://stackoverflow.com/questions/52424449/
// https://docs.rs/crossbeam/0.8.2/crossbeam/fn.scope.html
// TODO ensure soundness and document why
// func itself must be `Send`, but we do not require &T to be
// Send (and thus don't reuire T to be Sync). We do this by
// basically unsafely sending our raw context pointer over to
// the worker thread anyway. The key observation on why we
// believe this to be sound is that we actually created the
// context over on the worker thread in the first place (see
// [AsyncAdapter::new]) and we do not allow direct access to
// it. So despite fact that we pass the context pointer back
// and forth, it's essentially owned by the worker thread -- all operations
// with context occur on that worker thread.
let (tx, rx) = tokio::sync::oneshot::channel();
let func_taking_ptr = |ctx: SyncSendPtrMut<T>| func(unsafe { ctx.inner.as_ref() }.unwrap());
unsafe {
let wrapped_func = move || _ = tx.send(func_taking_ptr(context.clone_unsafe()));
self.invoke_internal_unsafe(wrapped_func)?;
}
rx.await?
}

Expand All @@ -82,21 +85,16 @@ impl AsyncAdapterEnv {
F: FnOnce(&'c mut T) -> Result<U> + Send,
F: 'result,
U: Send + 'result,
T: ?Sized + 'c, // TODO should this be Send
T: ?Sized + 'c,
's: 'result,
'c: 'result,
{
let (tx, rx) = oneshot::channel();
let context_ptr = SendPtrMut::new(context.inner);
let func_taking_ptr = |ctx: SendPtrMut<T>| func(unsafe { ctx.inner.as_mut().unwrap() });
let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr));
let boxed_func: Box<dyn FnOnce() + Send + 'result> = Box::new(wrapped_func);
let static_func: Box<dyn FnOnce() + Send + 'static> =
unsafe { std::mem::transmute(boxed_func) };
self.sender.send(Command::Func(static_func))?;
// https://stackoverflow.com/questions/52424449/
// https://docs.rs/crossbeam/0.8.2/crossbeam/fn.scope.html
// TODO ensure soundness and document why
let (tx, rx) = tokio::sync::oneshot::channel();
let func_taking_ptr = |ctx: SyncSendPtrMut<T>| func(unsafe { ctx.inner.as_mut().unwrap() });
unsafe {
let wrapped_func = move || _ = tx.send(func_taking_ptr(context.clone_unsafe()));
self.invoke_internal_unsafe(wrapped_func)?;
}
rx.await?
}

Expand All @@ -110,71 +108,92 @@ impl AsyncAdapterEnv {
'c: 'result,
{
let (tx, rx) = crossbeam_channel::unbounded();
let context_ptr = SendPtr::new(context);
let context_ptr = unsafe { SendPtr::new(context) };
let func_taking_ptr = |ctx: SendPtr<T>| func(unsafe { ctx.inner.as_ref() }.unwrap());
let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr));
unsafe {
let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr));
self.invoke_internal_unsafe(wrapped_func)?;
}
rx.recv()?
}

unsafe fn invoke_internal_unsafe<'s, 'result>(
&'s self,
// wrapped_func is a complete encapsulation of the function we
// want to invoke without any parameters left to provide
wrapped_func: impl FnOnce() + Send + 'result,
) -> Result<()> {
// We transmute wrapped_func (with an intermediate boxing
// step) solely to transform it's lifetime. The lifetime is an
// issue here because Rust itself has no way of knowing how
// long our sync worker thread is going to use it for. But
// *we* know that our worker thread will immediately execute
// the function and the caller to this method will wait to
// hear from the worker thread before proceeding (and thus
// before letting the lifetime lapse)
// https://stackoverflow.com/questions/52424449/
let boxed_func: Box<dyn FnOnce() + Send + 'result> = Box::new(wrapped_func);
let static_func: Box<dyn FnOnce() + Send + 'static> =
unsafe { std::mem::transmute(boxed_func) };
self.sender.send(Command::Func(static_func))?;
// TODO ensure soundness and document why
rx.recv()?
Ok(())
}
}

impl Drop for AsyncAdapterEnv {
fn drop(&mut self) {
self.sender
.send(Command::Shutdown)
.expect("Cannot send async adapter env shutdown command, cannot join thread");
let r = self.sender.send(Command::Shutdown);
if r.is_err() {
// editorconfig-checker-disable
crate::error!("Cannot send async adapter env shutdown command because channel is disconnected.\
Assuming this means thread died and is joinable. If it is not, join may hang indefinitely");
// editorconfig-checker-enable
}
self.thread_handle.take().map(|h| h.join());
}
}

/// Wrapper around a raw pointer that we assert is [Send]. Needless to
/// say, this requires care. See comments on `AsyncAdapterEnv::invoke`
/// for why we believe this to be sound.
struct SendPtr<T: ?Sized> {
inner: *const T,
}
impl<T: ?Sized> SendPtr<T> {
fn new(inner: *const T) -> Self {
unsafe fn new(inner: *const T) -> Self {
Self { inner }
}
}
unsafe impl<T: ?Sized> Send for SendPtr<T> {}

struct SendPtrMut<T: ?Sized> {
inner: *mut T,
}
impl<T: ?Sized> SendPtrMut<T> {
fn new(inner: *mut T) -> Self {
Self { inner }
}
}
unsafe impl<T: ?Sized> Send for SendPtrMut<T> {}

/// Like [SendPtrMut] but we also assert that it is [Sync]
struct SyncSendPtrMut<T: ?Sized> {
inner: *mut T,
}
impl<T: ?Sized> SyncSendPtrMut<T> {
fn new(inner: *mut T) -> Self {
// todo should this be unsafe
unsafe fn new(inner: *mut T) -> Self {
Self { inner }
}
unsafe fn clone_unsafe(&self) -> Self {
Self { inner: self.inner }
}
}
impl<T> From<T> for SyncSendPtrMut<T>
where
T: Debug + Sized,
T: Sized,
{
fn from(val: T) -> Self {
Self {
inner: Box::into_raw(Box::new(val)),
} // todo should this be unsafe
}
}
}
unsafe impl<T: Debug + ?Sized> Send for SyncSendPtrMut<T> {}
unsafe impl<T: ?Sized> Send for SyncSendPtrMut<T> {}
unsafe impl<T: ?Sized> Sync for SyncSendPtrMut<T> {}

impl<T: Debug + ?Sized> Debug for SyncSendPtrMut<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
// We enforce that inner is non-null and valid.
unsafe { (*self.inner).fmt(f) }
}
}
Expand All @@ -186,8 +205,8 @@ pub(super) struct AsyncAdapter<T: ?Sized> {
}

impl<T: ?Sized> AsyncAdapter<T> {
//todo document what this is for
fn new_internal<U: ?Sized>(&self, context_ptr: SyncSendPtrMut<U>) -> AsyncAdapter<U> {
/// Create a new AsyncAdapter with the given context and using the same `env` as self. Not a public method.
fn create_with_same_env<U: ?Sized>(&self, context_ptr: SyncSendPtrMut<U>) -> AsyncAdapter<U> {
AsyncAdapter {
env: self.env.clone(),
context: context_ptr,
Expand All @@ -204,7 +223,6 @@ impl<T: ?Sized> AsyncAdapter<T> {
'c: 'result,
's: 'c,
{
// todo verify the interior mutability won't panic here
self.env.invoke(&self.context, func).await
}

Expand All @@ -215,7 +233,6 @@ impl<T: ?Sized> AsyncAdapter<T> {
U: Send + 'result,
'c: 'result,
{
// todo verify the interior mutability won't panic here
self.env.invoke_mut(&self.context, func).await
}

Expand All @@ -226,22 +243,31 @@ impl<T: ?Sized> AsyncAdapter<T> {
U: Send + 'result,
'c: 'result,
{
// todo verify the interior mutability won't panic here
self.env.invoke_blocking(self.context.inner, func)
}
}

impl<T> AsyncAdapter<T> {
/// Create a new async adapter using `create_context` to create an instance of the inner type `T`.
pub(super) fn new<F>(create_context: F) -> Result<Self>
where
Self: Sized,
F: FnOnce() -> Result<T> + Send,
{
// TODO execute the create context function on the thread
let context = create_context()?;
let env = AsyncAdapterEnv::new();

// Execute the context creation function on our worker thread.
let dummy = (); // because we have to pass a context pointer to env.invoke
let context = env.invoke_blocking(&dummy, |_ctx: &()| {
let concrete_context = create_context()?;
// See comments about soundness on AsyncAdapterEnv::invoke
let context = unsafe { SyncSendPtrMut::new(Box::into_raw(Box::new(concrete_context))) };
Ok(context)
})?;

Ok(Self {
env: Arc::new(AsyncAdapterEnv::new()),
context: SyncSendPtrMut::new(Box::into_raw(Box::new(context))),
env: Arc::new(env),
context,
})
}
}
Expand Down Expand Up @@ -348,27 +374,44 @@ where
.invoke_mut(|conn| {
let transaction: Transaction = conn.transaction()?;
let transaction_ptr: *mut dyn BackendTransaction = Box::into_raw(transaction.trans);
Ok(SyncSendPtrMut::new(transaction_ptr))
Ok(unsafe { SyncSendPtrMut::new(transaction_ptr) })
})
.await?;
let transaction_adapter = self.new_internal(transaction_ptr);
let transaction_adapter = self.create_with_same_env(transaction_ptr);
Ok(TransactionAsync::new(Box::new(transaction_adapter)))
}

fn backend(&self) -> Box<dyn Backend> {
// todo clean up unwrap
self.invoke_blocking(|conn| Ok(conn.backend())).unwrap()
ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.backend())))
}

fn backend_name(&self) -> &'static str {
// todo clean up unwrap
self.invoke_blocking(|conn| Ok(conn.backend_name()))
.unwrap()
ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.backend_name())))
}

/// Tests if the connection has been closed. Backends which do not
/// support this check should return false.
fn is_closed(&self) -> bool {
// todo clean up unwrap
self.invoke_blocking(|conn| Ok(conn.is_closed())).unwrap()
ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.is_closed())))
}
}

fn ok_or_panic_with_adapter_error<T>(r: Result<T>) -> T {
match r {
Ok(ret) => ret,
// This is unfortunate, but should be rare. We never use it
// when invoking functions that can fail in their own right,
// so it indicates that the channel operation failed, which
// should only be possible if the other thread died
// unexpectedly.
Err(e) => panic!(
// editorconfig-checker-disable
"Internal error occurred within the sync->async adapter invoked when wrapping a function\
which does not permit error returns.\n\
Error: {}",
// editorconfig-checker-enable
e
)
}
}

Expand Down Expand Up @@ -401,7 +444,6 @@ where

/// Create an async connection using the synchronous `connect` method of `backend`. Use this when authoring
/// a backend which doesn't natively support async.
#[cfg(feature = "sqlite")] // todo expose this publicly for out-of-tree backends
pub async fn connect_async_via_sync<B>(backend: &B, conn_str: &str) -> Result<ConnectionAsync>
where
B: Backend + Clone + 'static,
Expand Down
2 changes: 1 addition & 1 deletion butane_core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::query::{BoolExpr, Order};
use crate::{migrations::adb, Error, Result, SqlVal, SqlValRef};

mod adapter;
pub use adapter::connect_async_via_sync;
pub(crate) mod dummy;
use dummy::DummyConnection;
mod sync_adapter;
Expand Down Expand Up @@ -81,7 +82,6 @@ mod internal {
/// Database connection.
#[maybe_async_cfg::maybe(
idents(
AsyncRequiresSend,
ConnectionMethods(sync = "ConnectionMethods", async = "ConnectionMethodsAsync"),
Transaction(sync = "Transaction", async = "TransactionAsync"),
),
Expand Down
9 changes: 9 additions & 0 deletions butane_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ impl std::fmt::Display for SqlType {
#[cfg(feature = "log")]
pub use log::debug;
#[cfg(feature = "log")]
pub use log::error;
#[cfg(feature = "log")]
pub use log::info;
#[cfg(feature = "log")]
pub use log::warn;
Expand Down Expand Up @@ -411,4 +413,11 @@ mod btlog {
(target: $target:expr, $($arg:tt)+) => {};
($($arg:tt)+) => {};
}

/// Noop for when feature log is not enabled.
#[macro_export]
macro_rules! error {
(target: $target:expr, $($arg:tt)+) => {};
($($arg:tt)+) => {};
}
}
Loading