diff --git a/async_checklist.md b/async_checklist.md index 08abb24..6f37645 100644 --- a/async_checklist.md +++ b/async_checklist.md @@ -7,7 +7,6 @@ * [x] Ensure sqlite works in async * [x] Fully support sync too. Using async should not be required * [ ] Clean up miscellaneous TODOs -* [ ] Establish soundness for unsafe sections of AsyncAdapter -* [ ] Consider publishing `AsyncAdapter` into its own crate +* [x] Establish soundness for unsafe sections of AsyncAdapter * [ ] Should async and/or async_adapter be under a separate feature? * [ ] Integrate deadpool or bb8 for async connection pool diff --git a/butane_core/src/db/adapter.rs b/butane_core/src/db/adapter.rs index 6108c60..3cb7e8e 100644 --- a/butane_core/src/db/adapter.rs +++ b/butane_core/src/db/adapter.rs @@ -9,7 +9,6 @@ use crate::query::Order; use std::sync::Arc; use std::thread; use std::thread::JoinHandle; -use tokio::sync::oneshot; enum Command { Func(Box), @@ -32,8 +31,7 @@ impl AsyncAdapterEnv { match cmd { Command::Func(func) => func(), Command::Shutdown => { - // TODO should connection support an explicit close? - return; + return; // break out of the loop } } } @@ -44,32 +42,37 @@ impl AsyncAdapterEnv { } } + /// Invokes a blocking function `func` as if it were async. This + /// is implemented by running it on the special thread created when the `AsyncAdapterEnv` was created. async fn invoke<'c, 's, 'result, F, T, U>( &'s self, context: &SyncSendPtrMut, func: F, ) -> Result - // todo can this just be result where F: FnOnce(&'c T) -> Result + Send, F: 'result, U: Send + 'result, - T: ?Sized + 'c, // TODO should this be Send + T: ?Sized + 'c, 's: 'result, 'c: 'result, { - // todo parts of this can be shared with the other two invoke functions - let (tx, rx) = oneshot::channel(); - let context_ptr = SendPtr::new(context.inner); - let func_taking_ptr = |ctx: SendPtr| func(unsafe { ctx.inner.as_ref() }.unwrap()); - let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr)); - let boxed_func: Box = Box::new(wrapped_func); - let static_func: Box = - unsafe { std::mem::transmute(boxed_func) }; - self.sender.send(Command::Func(static_func))?; - // https://stackoverflow.com/questions/52424449/ - // https://docs.rs/crossbeam/0.8.2/crossbeam/fn.scope.html - // TODO ensure soundness and document why + // func itself must be `Send`, but we do not require &T to be + // Send (and thus don't reuire T to be Sync). We do this by + // basically unsafely sending our raw context pointer over to + // the worker thread anyway. The key observation on why we + // believe this to be sound is that we actually created the + // context over on the worker thread in the first place (see + // [AsyncAdapter::new]) and we do not allow direct access to + // it. So despite fact that we pass the context pointer back + // and forth, it's essentially owned by the worker thread -- all operations + // with context occur on that worker thread. + let (tx, rx) = tokio::sync::oneshot::channel(); + let func_taking_ptr = |ctx: SyncSendPtrMut| func(unsafe { ctx.inner.as_ref() }.unwrap()); + unsafe { + let wrapped_func = move || _ = tx.send(func_taking_ptr(context.clone_unsafe())); + self.invoke_internal_unsafe(wrapped_func)?; + } rx.await? } @@ -82,21 +85,16 @@ impl AsyncAdapterEnv { F: FnOnce(&'c mut T) -> Result + Send, F: 'result, U: Send + 'result, - T: ?Sized + 'c, // TODO should this be Send + T: ?Sized + 'c, 's: 'result, 'c: 'result, { - let (tx, rx) = oneshot::channel(); - let context_ptr = SendPtrMut::new(context.inner); - let func_taking_ptr = |ctx: SendPtrMut| func(unsafe { ctx.inner.as_mut().unwrap() }); - let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr)); - let boxed_func: Box = Box::new(wrapped_func); - let static_func: Box = - unsafe { std::mem::transmute(boxed_func) }; - self.sender.send(Command::Func(static_func))?; - // https://stackoverflow.com/questions/52424449/ - // https://docs.rs/crossbeam/0.8.2/crossbeam/fn.scope.html - // TODO ensure soundness and document why + let (tx, rx) = tokio::sync::oneshot::channel(); + let func_taking_ptr = |ctx: SyncSendPtrMut| func(unsafe { ctx.inner.as_mut().unwrap() }); + unsafe { + let wrapped_func = move || _ = tx.send(func_taking_ptr(context.clone_unsafe())); + self.invoke_internal_unsafe(wrapped_func)?; + } rx.await? } @@ -110,71 +108,92 @@ impl AsyncAdapterEnv { 'c: 'result, { let (tx, rx) = crossbeam_channel::unbounded(); - let context_ptr = SendPtr::new(context); + let context_ptr = unsafe { SendPtr::new(context) }; let func_taking_ptr = |ctx: SendPtr| func(unsafe { ctx.inner.as_ref() }.unwrap()); - let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr)); + unsafe { + let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr)); + self.invoke_internal_unsafe(wrapped_func)?; + } + rx.recv()? + } + + unsafe fn invoke_internal_unsafe<'s, 'result>( + &'s self, + // wrapped_func is a complete encapsulation of the function we + // want to invoke without any parameters left to provide + wrapped_func: impl FnOnce() + Send + 'result, + ) -> Result<()> { + // We transmute wrapped_func (with an intermediate boxing + // step) solely to transform it's lifetime. The lifetime is an + // issue here because Rust itself has no way of knowing how + // long our sync worker thread is going to use it for. But + // *we* know that our worker thread will immediately execute + // the function and the caller to this method will wait to + // hear from the worker thread before proceeding (and thus + // before letting the lifetime lapse) + // https://stackoverflow.com/questions/52424449/ let boxed_func: Box = Box::new(wrapped_func); let static_func: Box = unsafe { std::mem::transmute(boxed_func) }; self.sender.send(Command::Func(static_func))?; - // TODO ensure soundness and document why - rx.recv()? + Ok(()) } } impl Drop for AsyncAdapterEnv { fn drop(&mut self) { - self.sender - .send(Command::Shutdown) - .expect("Cannot send async adapter env shutdown command, cannot join thread"); + let r = self.sender.send(Command::Shutdown); + if r.is_err() { + // editorconfig-checker-disable + crate::error!("Cannot send async adapter env shutdown command because channel is disconnected.\ + Assuming this means thread died and is joinable. If it is not, join may hang indefinitely"); + // editorconfig-checker-enable + } self.thread_handle.take().map(|h| h.join()); } } +/// Wrapper around a raw pointer that we assert is [Send]. Needless to +/// say, this requires care. See comments on `AsyncAdapterEnv::invoke` +/// for why we believe this to be sound. struct SendPtr { inner: *const T, } impl SendPtr { - fn new(inner: *const T) -> Self { + unsafe fn new(inner: *const T) -> Self { Self { inner } } } unsafe impl Send for SendPtr {} -struct SendPtrMut { - inner: *mut T, -} -impl SendPtrMut { - fn new(inner: *mut T) -> Self { - Self { inner } - } -} -unsafe impl Send for SendPtrMut {} - +/// Like [SendPtrMut] but we also assert that it is [Sync] struct SyncSendPtrMut { inner: *mut T, } impl SyncSendPtrMut { - fn new(inner: *mut T) -> Self { - // todo should this be unsafe + unsafe fn new(inner: *mut T) -> Self { Self { inner } } + unsafe fn clone_unsafe(&self) -> Self { + Self { inner: self.inner } + } } impl From for SyncSendPtrMut where - T: Debug + Sized, + T: Sized, { fn from(val: T) -> Self { Self { inner: Box::into_raw(Box::new(val)), - } // todo should this be unsafe + } } } -unsafe impl Send for SyncSendPtrMut {} +unsafe impl Send for SyncSendPtrMut {} unsafe impl Sync for SyncSendPtrMut {} impl Debug for SyncSendPtrMut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + // We enforce that inner is non-null and valid. unsafe { (*self.inner).fmt(f) } } } @@ -186,8 +205,8 @@ pub(super) struct AsyncAdapter { } impl AsyncAdapter { - //todo document what this is for - fn new_internal(&self, context_ptr: SyncSendPtrMut) -> AsyncAdapter { + /// Create a new AsyncAdapter with the given context and using the same `env` as self. Not a public method. + fn create_with_same_env(&self, context_ptr: SyncSendPtrMut) -> AsyncAdapter { AsyncAdapter { env: self.env.clone(), context: context_ptr, @@ -204,7 +223,6 @@ impl AsyncAdapter { 'c: 'result, 's: 'c, { - // todo verify the interior mutability won't panic here self.env.invoke(&self.context, func).await } @@ -215,7 +233,6 @@ impl AsyncAdapter { U: Send + 'result, 'c: 'result, { - // todo verify the interior mutability won't panic here self.env.invoke_mut(&self.context, func).await } @@ -226,22 +243,31 @@ impl AsyncAdapter { U: Send + 'result, 'c: 'result, { - // todo verify the interior mutability won't panic here self.env.invoke_blocking(self.context.inner, func) } } impl AsyncAdapter { + /// Create a new async adapter using `create_context` to create an instance of the inner type `T`. pub(super) fn new(create_context: F) -> Result where Self: Sized, F: FnOnce() -> Result + Send, { - // TODO execute the create context function on the thread - let context = create_context()?; + let env = AsyncAdapterEnv::new(); + + // Execute the context creation function on our worker thread. + let dummy = (); // because we have to pass a context pointer to env.invoke + let context = env.invoke_blocking(&dummy, |_ctx: &()| { + let concrete_context = create_context()?; + // See comments about soundness on AsyncAdapterEnv::invoke + let context = unsafe { SyncSendPtrMut::new(Box::into_raw(Box::new(concrete_context))) }; + Ok(context) + })?; + Ok(Self { - env: Arc::new(AsyncAdapterEnv::new()), - context: SyncSendPtrMut::new(Box::into_raw(Box::new(context))), + env: Arc::new(env), + context, }) } } @@ -348,27 +374,44 @@ where .invoke_mut(|conn| { let transaction: Transaction = conn.transaction()?; let transaction_ptr: *mut dyn BackendTransaction = Box::into_raw(transaction.trans); - Ok(SyncSendPtrMut::new(transaction_ptr)) + Ok(unsafe { SyncSendPtrMut::new(transaction_ptr) }) }) .await?; - let transaction_adapter = self.new_internal(transaction_ptr); + let transaction_adapter = self.create_with_same_env(transaction_ptr); Ok(TransactionAsync::new(Box::new(transaction_adapter))) } fn backend(&self) -> Box { - // todo clean up unwrap - self.invoke_blocking(|conn| Ok(conn.backend())).unwrap() + ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.backend()))) } + fn backend_name(&self) -> &'static str { - // todo clean up unwrap - self.invoke_blocking(|conn| Ok(conn.backend_name())) - .unwrap() + ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.backend_name()))) } + /// Tests if the connection has been closed. Backends which do not /// support this check should return false. fn is_closed(&self) -> bool { - // todo clean up unwrap - self.invoke_blocking(|conn| Ok(conn.is_closed())).unwrap() + ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.is_closed()))) + } +} + +fn ok_or_panic_with_adapter_error(r: Result) -> T { + match r { + Ok(ret) => ret, + // This is unfortunate, but should be rare. We never use it + // when invoking functions that can fail in their own right, + // so it indicates that the channel operation failed, which + // should only be possible if the other thread died + // unexpectedly. + Err(e) => panic!( + // editorconfig-checker-disable + "Internal error occurred within the sync->async adapter invoked when wrapping a function\ + which does not permit error returns.\n\ + Error: {}", + // editorconfig-checker-enable + e + ) } } @@ -401,7 +444,6 @@ where /// Create an async connection using the synchronous `connect` method of `backend`. Use this when authoring /// a backend which doesn't natively support async. -#[cfg(feature = "sqlite")] // todo expose this publicly for out-of-tree backends pub async fn connect_async_via_sync(backend: &B, conn_str: &str) -> Result where B: Backend + Clone + 'static, diff --git a/butane_core/src/db/mod.rs b/butane_core/src/db/mod.rs index 356d9a8..40efd14 100644 --- a/butane_core/src/db/mod.rs +++ b/butane_core/src/db/mod.rs @@ -29,6 +29,7 @@ use crate::query::{BoolExpr, Order}; use crate::{migrations::adb, Error, Result, SqlVal, SqlValRef}; mod adapter; +pub use adapter::connect_async_via_sync; pub(crate) mod dummy; use dummy::DummyConnection; mod sync_adapter; @@ -81,7 +82,6 @@ mod internal { /// Database connection. #[maybe_async_cfg::maybe( idents( - AsyncRequiresSend, ConnectionMethods(sync = "ConnectionMethods", async = "ConnectionMethodsAsync"), Transaction(sync = "Transaction", async = "TransactionAsync"), ), diff --git a/butane_core/src/lib.rs b/butane_core/src/lib.rs index c4bbeab..6573dc1 100644 --- a/butane_core/src/lib.rs +++ b/butane_core/src/lib.rs @@ -383,6 +383,8 @@ impl std::fmt::Display for SqlType { #[cfg(feature = "log")] pub use log::debug; #[cfg(feature = "log")] +pub use log::error; +#[cfg(feature = "log")] pub use log::info; #[cfg(feature = "log")] pub use log::warn; @@ -411,4 +413,11 @@ mod btlog { (target: $target:expr, $($arg:tt)+) => {}; ($($arg:tt)+) => {}; } + + /// Noop for when feature log is not enabled. + #[macro_export] + macro_rules! error { + (target: $target:expr, $($arg:tt)+) => {}; + ($($arg:tt)+) => {}; + } }