From fca9bf5abd6beda0effd05520ceac740307e44d2 Mon Sep 17 00:00:00 2001 From: James Oakley Date: Sat, 7 Dec 2024 20:40:33 -0500 Subject: [PATCH] Add deadpool support (#284) Add deadpool support and refactor r2d2 support for sharing. --- Cargo.lock | 29 ++++++++- Cargo.toml | 1 + README.md | 11 +++- async_checklist.md | 2 +- butane/Cargo.toml | 10 ++-- butane/src/db/deadpool.rs | 21 +++++++ butane/src/db/mod.rs | 37 ++++++++++++ butane/src/db/r2.rs | 23 ++++++++ butane/src/lib.rs | 5 +- butane/tests/{r2d2.rs => pool.rs} | 29 ++++++--- butane_core/Cargo.toml | 1 - butane_core/src/db/mod.rs | 3 - butane_core/src/db/r2.rs | 97 ------------------------------- 13 files changed, 149 insertions(+), 120 deletions(-) create mode 100644 butane/src/db/deadpool.rs create mode 100644 butane/src/db/mod.rs create mode 100644 butane/src/db/r2.rs rename butane/tests/{r2d2.rs => pool.rs} (64%) delete mode 100644 butane_core/src/db/r2.rs diff --git a/Cargo.lock b/Cargo.lock index a4ddd02..be74108 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,6 +261,7 @@ dependencies = [ "butane_test_macros", "cfg-if", "chrono", + "deadpool", "env_logger", "fake", "geo-types", @@ -343,7 +344,6 @@ dependencies = [ "postgres-native-tls", "proc-macro2", "quote", - "r2d2", "rand", "regex", "rusqlite", @@ -626,6 +626,23 @@ dependencies = [ "syn 2.0.89", ] +[[package]] +name = "deadpool" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "deunicode" version = "1.6.0" @@ -1236,6 +1253,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.36.5" diff --git a/Cargo.toml b/Cargo.toml index 2df2ea7..2e0b875 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ chrono = { version = "0.4.25", default-features = false, features = [ "std", ] } crossbeam-channel = "0.5" +deadpool = "0.12" env_logger = "0.11" fake = "2.6" log = "0.4" diff --git a/README.md b/README.md index b9f733e..efb5bae 100644 --- a/README.md +++ b/README.md @@ -72,12 +72,13 @@ enabled: you will want to enable `sqlite` and/or `pg`: * `async`: Turns on async support. This is automatically enabled for the `pg` backend, which is implemented on the `tokio-postgres` crate. * `async-adapter`: Enables the use of `async` with the `sqlite` backend, which is not natively async. * `debug`: Used in developing Butane, not expected to be enabled by consumers. +* `deadpool`: Connection pooling using [`deadpool`](https://crates.io/crates/deadpool). * `datetime`: Support for timestamps (using [`chrono`](https://crates.io/crates/chrono) crate). * `fake`: Support for the [`fake`](https://crates.io/crates/fake) crate's generation of fake data. * `json`: Support for storing structs as JSON, including using postgres' `JSONB` field type. * `log`: Log certain warnings to the [`log`](https://crates.io/crates/log) crate facade (target "butane"). * `pg`: Support for PostgreSQL using [`postgres`](https://crates.io/crates/postgres) crate. -* `r2d2`: Connection pooling using [`r2d2`](https://crates.io/crates/r2d2) support +* `r2d2`: Connection pooling using [`r2d2`](https://crates.io/crates/r2d2). (See `butane::db::ConnectionManager`). * `sqlite`: Support for SQLite using [`rusqlite`](https://crates.io/crates/rusqlite) crate. * `sqlite-bundled`: Bundles sqlite instead of using the system version. @@ -98,7 +99,7 @@ enabled: you will want to enable `sqlite` and/or `pg`: ## Migration of Breaking Changes ### 0.8 (not yet released) - +#### Async This is a major release which adds Async support. Effort has been made to keep the sync experience as unchanged as possible. Async versions of many types have been added, but the sync ones generally retain @@ -125,6 +126,12 @@ The Ops traits are: * `ForeignKeyOpsSync` / `ForeignKeyOpsAsync` (for use with [`ForeignKey`](https://docs.rs/butane/latest/butane/struct.ForeignKey.html)) * `ManyOpsSync` / `ManyOpsAsync` (for use with [`Many`](https://docs.rs/butane/latest/butane/struct.Many.html)) +#### ConnectionManager +The `ConnectionManager` struct has moved from `butane::db::r2` to +`butane::db`. It no longer implements `ConnectionMethods` as this was +unnecessary due to `Deref`. The `butane::db::r2` module is no longer +public. + ### 0.7 #### `AutoPk` Replace model fields like diff --git a/async_checklist.md b/async_checklist.md index 0e9d5ef..664d48c 100644 --- a/async_checklist.md +++ b/async_checklist.md @@ -9,4 +9,4 @@ * [ ] Clean up miscellaneous TODOs * [x] Establish soundness for unsafe sections of AsyncAdapter * [x] Should async and/or async_adapter be under a separate feature? -* [ ] Integrate deadpool or bb8 for async connection pool +* [x] Integrate deadpool or bb8 for async connection pool diff --git a/butane/Cargo.toml b/butane/Cargo.toml index 0b92711..e41635e 100644 --- a/butane/Cargo.toml +++ b/butane/Cargo.toml @@ -15,6 +15,7 @@ build = "build.rs" [features] async = ["butane_core/async", "butane_codegen/async"] async-adapter = ["butane_core/async-adapter"] +deadpool = ["dep:deadpool", "async"] default = ["datetime", "json", "uuid"] fake = ["butane_core/fake"] json = ["butane_codegen/json", "butane_core/json"] @@ -24,13 +25,15 @@ pg = ["async", "butane_core/pg"] datetime = ["butane_codegen/datetime", "butane_core/datetime"] debug = ["butane_core/debug"] log = ["butane_core/log"] -r2d2 = ["butane_core/r2d2"] +r2d2 = ["dep:r2d2"] tls = ["butane_core/tls"] uuid = ["butane_codegen/uuid", "butane_core/uuid"] [dependencies] butane_codegen = { workspace = true } butane_core = { workspace = true } +r2d2 = { optional = true, workspace = true } +deadpool = { optional = true, workspace = true } [dev-dependencies] butane_test_helper = { workspace = true } @@ -51,7 +54,6 @@ tokio = { workspace = true, features = ["macros"] } tokio-postgres = { features = ["with-geo-types-0_7"], workspace = true } tokio-test = { workspace = true } rand = { workspace = true } -r2d2_for_test = { package = "r2d2", version = "0.8" } rusqlite = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -74,8 +76,8 @@ name = "json" required-features = ["json"] [[test]] -name = "r2d2" -required-features = ["r2d2"] +name = "pool" +required-features = ["r2d2", "deadpool"] [[test]] name = "uuid" diff --git a/butane/src/db/deadpool.rs b/butane/src/db/deadpool.rs new file mode 100644 index 0000000..0d0a822 --- /dev/null +++ b/butane/src/db/deadpool.rs @@ -0,0 +1,21 @@ +//! Deadpool support for Butane. +use super::ConnectionManager; +use crate::db::{BackendConnectionAsync, ConnectionAsync}; +use crate::Result; +use deadpool::managed::{Manager, Metrics, RecycleError, RecycleResult}; + +impl Manager for ConnectionManager { + type Type = ConnectionAsync; + type Error = crate::Error; + + async fn create(&self) -> Result { + crate::db::connect_async(&self.spec).await + } + + async fn recycle(&self, conn: &mut ConnectionAsync, _: &Metrics) -> RecycleResult { + if conn.is_closed() { + return Err(RecycleError::message("Connection is closed")); + } + Ok(()) + } +} diff --git a/butane/src/db/mod.rs b/butane/src/db/mod.rs new file mode 100644 index 0000000..bba983b --- /dev/null +++ b/butane/src/db/mod.rs @@ -0,0 +1,37 @@ +//! Types, traits, and methods for interacting with a database. +//! +//! The different ways of referring to a database handle may present +//! some initial confusion. +//! * `ConnectionMethods` is a trait containing the methods available on a database connection or a transaction. +//! Most methods on a [DataObject][crate::DataObject] or a [Query][crate::query::Query] require an +//! implementation of `ConnectionMethods`. +//! * `BackendConnection` is a trait representing a direct connection to a database backend. It is a superset +//! of `ConnectionMethods` and also includes the ability to create a transaction. +//! * `Transaction` is a struct representing a database transaction. It implements `ConnectionMethods`. +//! * `Connection` is a convenience struct containing a boxed `BackendConnection`. It cannot do anything other than +//! what a `BackendConnection` can do, but allows using a single concrete type that is not tied to a particular +//! database backend. It is returned by the `connect` method. + +pub use butane_core::db::*; + +#[cfg(feature = "r2d2")] +mod r2; + +#[cfg(feature = "deadpool")] +mod deadpool; + +/// Connection manager used with connection pooling systems such as r2d2 or deadpool. +/// With the `r2d2` feature enabled, it implements `r2d2::ManageConnection`. +/// With the `deadpool` feature enabled, it implements `deadpool::managed::Manager`. +#[cfg(any(feature = "deadpool", feature = "r2d2"))] +#[derive(Clone, Debug)] +pub struct ConnectionManager { + spec: ConnectionSpec, +} +#[cfg(any(feature = "deadpool", feature = "r2d2"))] +impl ConnectionManager { + /// Create a new ConnectionManager from a [ConnectionSpec]. + pub fn new(spec: ConnectionSpec) -> Self { + ConnectionManager { spec } + } +} diff --git a/butane/src/db/r2.rs b/butane/src/db/r2.rs new file mode 100644 index 0000000..7ba2b49 --- /dev/null +++ b/butane/src/db/r2.rs @@ -0,0 +1,23 @@ +//! R2D2 support for Butane. +use r2d2::ManageConnection; + +use super::ConnectionManager; +use crate::db::{BackendConnection, Connection}; +use crate::Result; + +impl ManageConnection for ConnectionManager { + type Connection = Connection; + type Error = crate::Error; + + fn connect(&self) -> Result { + crate::db::connect(&self.spec) + } + + fn is_valid(&self, conn: &mut Connection) -> Result<()> { + conn.execute("SELECT 1") + } + + fn has_broken(&self, conn: &mut Connection) -> bool { + conn.is_closed() + } +} diff --git a/butane/src/lib.rs b/butane/src/lib.rs index 5c1ed7b..1833a91 100644 --- a/butane/src/lib.rs +++ b/butane/src/lib.rs @@ -19,10 +19,7 @@ pub use butane_core::{ PrimaryKeyType, Result, SqlType, SqlVal, SqlValRef, ToSql, }; -pub mod db { - //! Database helpers - pub use butane_core::db::*; -} +pub mod db; /// Macro to construct a [`BoolExpr`] (for use with a [`Query`]) from /// an expression with Rust syntax. diff --git a/butane/tests/r2d2.rs b/butane/tests/pool.rs similarity index 64% rename from butane/tests/r2d2.rs rename to butane/tests/pool.rs index b2b91ca..7164630 100644 --- a/butane/tests/r2d2.rs +++ b/butane/tests/pool.rs @@ -1,13 +1,10 @@ #[cfg(any(feature = "pg", feature = "sqlite"))] -use butane::db::r2::ConnectionManager; -#[cfg(feature = "pg")] -use butane_test_helper::pg_connspec; +use butane::db::ConnectionManager; +use butane_test_helper::*; #[cfg(any(feature = "pg", feature = "sqlite"))] -use butane_test_helper::setup_db; -#[cfg(feature = "sqlite")] -use butane_test_helper::sqlite_connspec; #[cfg(any(feature = "pg", feature = "sqlite"))] -use r2d2_for_test as r2d2; +use r2d2; +use std::ops::DerefMut; #[cfg(feature = "sqlite")] #[test] @@ -49,3 +46,21 @@ fn r2d2_pq() { } assert_eq!(pool.state().idle_connections, 3); } + +#[tokio::test] +async fn deadpool_test_pg_async() { + let (connspec, _data) = pg_connspec().await; + let manager = ConnectionManager::new(connspec); + let pool = deadpool::managed::Pool::builder(manager).build().unwrap(); + assert_eq!(pool.status().size, 0); + assert_eq!(pool.status().available, 0); + { + let mut conn: deadpool::managed::Object = pool.get().await.unwrap(); + assert_eq!(pool.status().size, 1); + assert_eq!(pool.status().available, 0); + + setup_db_async(conn.deref_mut()).await; + } + assert_eq!(pool.status().size, 1); + assert_eq!(pool.status().available, 1); +} diff --git a/butane_core/Cargo.toml b/butane_core/Cargo.toml index 2396542..104108d 100644 --- a/butane_core/Cargo.toml +++ b/butane_core/Cargo.toml @@ -47,7 +47,6 @@ tokio-postgres = { optional = true, workspace = true } postgres-native-tls = { version = "0.5", optional = true } proc-macro2 = { workspace = true } quote = { workspace = true } -r2d2 = { optional = true, workspace = true } rand = { optional = true, workspace = true } regex = { version = "1.5", features = ["std"] } rusqlite = { workspace = true, optional = true } diff --git a/butane_core/src/db/mod.rs b/butane_core/src/db/mod.rs index be05a1e..c1b345e 100644 --- a/butane_core/src/db/mod.rs +++ b/butane_core/src/db/mod.rs @@ -54,9 +54,6 @@ pub mod pg; #[cfg(feature = "sqlite")] pub mod sqlite; -#[cfg(feature = "r2d2")] -pub mod r2; - // Macros are always exported at the root of the crate use crate::connection_method_wrapper; diff --git a/butane_core/src/db/r2.rs b/butane_core/src/db/r2.rs deleted file mode 100644 index c7d27bf..0000000 --- a/butane_core/src/db/r2.rs +++ /dev/null @@ -1,97 +0,0 @@ -//! R2D2 support for Butane. - -use std::ops::Deref; - -pub use r2d2::ManageConnection; - -use crate::db::{ - BackendConnection, Column, Connection, ConnectionMethods, ConnectionSpec, RawQueryResult, -}; -use crate::{query::BoolExpr, query::Order, Result, SqlVal, SqlValRef}; - -/// R2D2 support for Butane. Implements [`r2d2::ManageConnection`]. -#[derive(Clone, Debug)] -pub struct ConnectionManager { - spec: ConnectionSpec, -} -impl ConnectionManager { - pub fn new(spec: ConnectionSpec) -> Self { - ConnectionManager { spec } - } -} - -impl ManageConnection for ConnectionManager { - type Connection = Connection; - type Error = crate::Error; - - fn connect(&self) -> Result { - crate::db::connect(&self.spec) - } - - fn is_valid(&self, conn: &mut Self::Connection) -> Result<()> { - conn.execute("SELECT 1") - } - - fn has_broken(&self, conn: &mut Self::Connection) -> bool { - conn.is_closed() - } -} - -impl ConnectionMethods for r2d2::PooledConnection { - fn execute(&self, sql: &str) -> Result<()> { - self.deref().execute(sql) - } - fn query<'c>( - &'c self, - table: &str, - columns: &[Column], - expr: Option, - limit: Option, - offset: Option, - sort: Option<&[Order]>, - ) -> Result> { - self.deref() - .query(table, columns, expr, limit, offset, sort) - } - fn insert_returning_pk( - &self, - table: &str, - columns: &[Column], - pkcol: &Column, - values: &[SqlValRef<'_>], - ) -> Result { - self.deref() - .insert_returning_pk(table, columns, pkcol, values) - } - /// Like `insert_returning_pk` but with no return value. - fn insert_only(&self, table: &str, columns: &[Column], values: &[SqlValRef<'_>]) -> Result<()> { - self.deref().insert_only(table, columns, values) - } - /// Insert unless there's a conflict on the primary key column, in which case update. - fn insert_or_replace( - &self, - table: &str, - columns: &[Column], - pkcol: &Column, - values: &[SqlValRef<'_>], - ) -> Result<()> { - self.deref() - .insert_or_replace(table, columns, pkcol, values) - } - fn update( - &self, - table: &str, - pkcol: Column, - pk: SqlValRef<'_>, - columns: &[Column], - values: &[SqlValRef<'_>], - ) -> Result<()> { - self.deref().update(table, pkcol, pk, columns, values) - } - fn delete_where(&self, table: &str, expr: BoolExpr) -> Result { - self.deref().delete_where(table, expr) - } - fn has_table(&self, table: &str) -> Result { - self.deref().has_table(table) - } -}