From aa5604284bae2a7aff7fe3d085168c9d37d5a4f0 Mon Sep 17 00:00:00 2001 From: sttk Date: Sun, 5 Jan 2025 07:20:06 +0900 Subject: [PATCH] new: added implementations about DaxConn --- src/dax_conn.rs | 507 ++++++++++++++++++++++++++++++++++++++++++++++++ src/dax_src.rs | 40 +--- src/errors.rs | 13 ++ src/lib.rs | 3 +- 4 files changed, 524 insertions(+), 39 deletions(-) create mode 100644 src/dax_conn.rs diff --git a/src/dax_conn.rs b/src/dax_conn.rs new file mode 100644 index 0000000..e10f4bc --- /dev/null +++ b/src/dax_conn.rs @@ -0,0 +1,507 @@ +// Copyright (C) 2024 Takayuki Sato. All Rights Reserved. +// This program is free software under MIT License. +// See the file LICENSE in this distribution for more details. + +use std::any; +use std::collections::HashMap; +use std::ptr; + +use crate::async_group::{AsyncGroup, AsyncGroupAsync}; +use crate::errors; +use crate::errs::Err; + +/// Represents a connection to a data store. +/// +/// This trait declares methods: `commit`, `rollback`, `close`, etc. to work in a transaction +/// process. +pub trait DaxConn { + /// Commits the updates in a transaction. + fn commit(&mut self, ag: &mut dyn AsyncGroup) -> Result<(), Err>; + + /// Checks whether updates are already committed. + fn is_committed(&self) -> bool; + + /// Rollbacks updates in a transaction. + fn rollback(&mut self, ag: &mut dyn AsyncGroup); + + /// Reverts updates forcely even if updates are already committed or this connection does not + /// have rollback mechanism. + fn force_back(&mut self, ag: &mut dyn AsyncGroup); + + /// Closes this connection. + fn close(&mut self); +} + +pub(crate) struct NoopDaxConn {} + +impl DaxConn for NoopDaxConn { + fn commit(&mut self, _ag: &mut dyn AsyncGroup) -> Result<(), Err> { + Ok(()) + } + fn is_committed(&self) -> bool { + false + } + fn rollback(&mut self, _ag: &mut dyn AsyncGroup) {} + fn force_back(&mut self, _ag: &mut dyn AsyncGroup) {} + fn close(&mut self) {} +} + +#[repr(C)] +struct DaxConnContainer<'a, C = NoopDaxConn> +where + C: DaxConn + 'static, +{ + drop_fn: fn(*const DaxConnContainer), + is_fn: fn(any::TypeId) -> bool, + commit_fn: fn(*const DaxConnContainer, &mut dyn AsyncGroup) -> Result<(), Err>, + is_committed_fn: fn(*const DaxConnContainer) -> bool, + rollback_fn: fn(*const DaxConnContainer, &mut dyn AsyncGroup), + force_back_fn: fn(*const DaxConnContainer, &mut dyn AsyncGroup), + close_fn: fn(*const DaxConnContainer), + prev: *mut DaxConnContainer<'a>, + next: *mut DaxConnContainer<'a>, + name: &'a str, + dax_conn: Box, +} + +impl<'a, C> DaxConnContainer<'a, C> +where + C: DaxConn + 'static, +{ + fn new(name: &'a str, dax_conn: Box) -> Self { + Self { + drop_fn: drop_dax_conn::, + is_fn: is_dax_conn::, + commit_fn: commit_dax_conn::, + is_committed_fn: is_committed_dax_conn::, + rollback_fn: rollback_dax_conn::, + force_back_fn: force_back_dax_conn::, + close_fn: close_dax_conn::, + prev: ptr::null_mut(), + next: ptr::null_mut(), + name, + dax_conn, + } + } +} + +fn drop_dax_conn(ptr: *const DaxConnContainer) +where + C: DaxConn + 'static, +{ + let typed_ptr = ptr as *mut DaxConnContainer; + unsafe { + drop(Box::from_raw(typed_ptr)); + } +} + +fn is_dax_conn(type_id: any::TypeId) -> bool +where + C: DaxConn + 'static, +{ + any::TypeId::of::() == type_id +} + +fn commit_dax_conn(ptr: *const DaxConnContainer, ag: &mut dyn AsyncGroup) -> Result<(), Err> +where + C: DaxConn + 'static, +{ + let typed_ptr = ptr as *mut DaxConnContainer; + unsafe { (*typed_ptr).dax_conn.commit(ag) } +} + +fn is_committed_dax_conn(ptr: *const DaxConnContainer) -> bool +where + C: DaxConn + 'static, +{ + let typed_ptr = ptr as *mut DaxConnContainer; + unsafe { (*typed_ptr).dax_conn.is_committed() } +} + +fn rollback_dax_conn(ptr: *const DaxConnContainer, ag: &mut dyn AsyncGroup) +where + C: DaxConn + 'static, +{ + let typed_ptr = ptr as *mut DaxConnContainer; + unsafe { (*typed_ptr).dax_conn.rollback(ag) } +} + +fn force_back_dax_conn(ptr: *const DaxConnContainer, ag: &mut dyn AsyncGroup) +where + C: DaxConn + 'static, +{ + let typed_ptr = ptr as *mut DaxConnContainer; + unsafe { (*typed_ptr).dax_conn.force_back(ag) }; +} + +fn close_dax_conn(ptr: *const DaxConnContainer) +where + C: DaxConn + 'static, +{ + let typed_ptr = ptr as *mut DaxConnContainer; + unsafe { (*typed_ptr).dax_conn.close() }; +} + +pub(crate) struct DaxConnMap<'a> { + head: *mut DaxConnContainer<'a>, + last: *mut DaxConnContainer<'a>, + map: HashMap<&'a str, *mut DaxConnContainer<'a>>, +} + +impl<'a> DaxConnMap<'a> { + pub(crate) fn new() -> Self { + Self { + head: ptr::null_mut(), + last: ptr::null_mut(), + map: HashMap::new(), + } + } + + pub(crate) fn insert(&mut self, name: &'a str, conn: Box) + where + C: DaxConn + 'static, + { + let boxed = Box::new(DaxConnContainer::::new(name, conn)); + let typed_ptr = Box::into_raw(boxed); + let ptr = typed_ptr.cast::(); + if self.last.is_null() { + self.head = ptr; + self.last = ptr; + } else { + unsafe { + (*self.last).next = ptr; + (*typed_ptr).prev = self.last; + } + self.last = ptr; + } + + self.map.insert(name, ptr); + } + + pub(crate) fn commit(&self) -> Result<(), Err> { + if self.last.is_null() { + return Ok(()); + } + + let mut err_map = HashMap::::new(); + let mut ag = AsyncGroupAsync::new(); + + let mut ptr = self.head; + while !ptr.is_null() { + let commit_fn = unsafe { (*ptr).commit_fn }; + let name = unsafe { (*ptr).name }; + let next = unsafe { (*ptr).next }; + if let Err(err) = commit_fn(ptr, &mut ag) { + err_map.insert(name.to_string(), err); + } + ptr = next; + } + + ag.wait(&mut err_map); + + if err_map.is_empty() { + return Ok(()); + } + + Err(Err::new(errors::DaxConn::FailToCommit { errors: err_map })) + } + + pub(crate) fn rollback(&self) { + if self.last.is_null() { + return; + } + + let mut ag = AsyncGroupAsync::new(); + + let mut ptr = self.head; + while !ptr.is_null() { + let is_committed_fn = unsafe { (*ptr).is_committed_fn }; + let rollback_fn = unsafe { (*ptr).rollback_fn }; + let force_back_fn = unsafe { (*ptr).force_back_fn }; + let next = unsafe { (*ptr).next }; + if is_committed_fn(ptr) { + force_back_fn(ptr, &mut ag); + } else { + rollback_fn(ptr, &mut ag); + } + ptr = next; + } + + let mut err_map = HashMap::::new(); + ag.wait(&mut err_map); + } + + pub(crate) fn close(&self) { + if self.last.is_null() { + return; + } + + let mut ptr = self.last; + while !ptr.is_null() { + let close_fn = unsafe { (*ptr).close_fn }; + let prev = unsafe { (*ptr).prev }; + close_fn(ptr); + ptr = prev; + } + } +} + +impl Drop for DaxConnMap<'_> { + fn drop(&mut self) { + let mut ptr = self.last; + while !ptr.is_null() { + let drop_fn = unsafe { (*ptr).drop_fn }; + let prev = unsafe { (*ptr).prev }; + drop_fn(ptr); + ptr = prev; + } + } +} + +#[cfg(test)] +mod tests_of_dax_conn { + use super::*; + use std::sync::{LazyLock, Mutex}; + + struct Logger { + log_vec: Vec, + } + + impl Logger { + fn new() -> Self { + Self { + log_vec: Vec::::new(), + } + } + fn log(&mut self, s: &str) { + self.log_vec.push(s.to_string()); + } + fn assert_logs(&self, logs: &[&str]) { + assert_eq!(self.log_vec.len(), logs.len()); + for i in 0..self.log_vec.len() { + assert_eq!(self.log_vec[i], logs[i]); + } + } + fn clear(&mut self) { + self.log_vec.clear(); + } + } + + mod test_of_dax_conn_map { + use super::*; + + static LOGGER: LazyLock> = LazyLock::new(|| Mutex::new(Logger::new())); + + struct DaxConnA { + committed: bool, + } + + impl DaxConnA { + fn new() -> Self { + Self { committed: false } + } + } + + impl DaxConn for DaxConnA { + fn commit(&mut self, _ag: &mut dyn AsyncGroup) -> Result<(), Err> { + self.committed = true; + LOGGER.lock().unwrap().log("DaxConnA commit"); + Ok(()) + } + fn is_committed(&self) -> bool { + self.committed + } + fn rollback(&mut self, _ag: &mut dyn AsyncGroup) { + LOGGER.lock().unwrap().log("DaxConnA rollback"); + } + fn force_back(&mut self, _ag: &mut dyn AsyncGroup) { + LOGGER.lock().unwrap().log("DaxConnA force back"); + } + fn close(&mut self) { + LOGGER.lock().unwrap().log("DaxConnA close"); + } + } + + impl Drop for DaxConnA { + fn drop(&mut self) { + LOGGER.lock().unwrap().log("DaxConnA drop"); + } + } + + struct DaxConnB { + committed: bool, + } + + impl DaxConnB { + fn new() -> Self { + Self { committed: false } + } + } + + impl DaxConn for DaxConnB { + fn commit(&mut self, _ag: &mut dyn AsyncGroup) -> Result<(), Err> { + self.committed = true; + LOGGER.lock().unwrap().log("DaxConnB commit"); + Ok(()) + } + fn is_committed(&self) -> bool { + self.committed + } + fn rollback(&mut self, _ag: &mut dyn AsyncGroup) { + LOGGER.lock().unwrap().log("DaxConnB rollback"); + } + fn force_back(&mut self, _ag: &mut dyn AsyncGroup) { + LOGGER.lock().unwrap().log("DaxConnB force back"); + } + fn close(&mut self) { + LOGGER.lock().unwrap().log("DaxConnB close"); + } + } + + impl Drop for DaxConnB { + fn drop(&mut self) { + LOGGER.lock().unwrap().log("DaxConnB drop"); + } + } + + fn create_dax_conn_a() -> Result, Err> { + Ok(Box::new(DaxConnA::new())) + } + + fn create_dax_conn_b() -> Result, Err> { + Ok(Box::new(DaxConnB::new())) + } + + fn commits() { + let mut dax_conn_map = DaxConnMap::new(); + + match create_dax_conn_a() { + Ok(dax_conn) => { + let ptr = Box::into_raw(dax_conn); + let typed_ptr = ptr as *mut DaxConnA; + let boxed = unsafe { Box::from_raw(typed_ptr) }; + dax_conn_map.insert("foo", boxed); + } + Err(_) => panic!(), + } + + match create_dax_conn_b() { + Ok(dax_conn) => { + let ptr = Box::into_raw(dax_conn); + let typed_ptr = ptr as *mut DaxConnB; + let boxed = unsafe { Box::from_raw(typed_ptr) }; + dax_conn_map.insert("bar", boxed); + } + Err(_) => panic!(), + } + + match dax_conn_map.commit() { + Ok(_) => {} + Err(_) => panic!(), + } + + dax_conn_map.close(); + } + + fn rollbacks() { + let mut dax_conn_map = DaxConnMap::new(); + + match create_dax_conn_a() { + Ok(dax_conn) => { + let ptr = Box::into_raw(dax_conn); + let typed_ptr = ptr as *mut DaxConnA; + let boxed = unsafe { Box::from_raw(typed_ptr) }; + dax_conn_map.insert("foo", boxed); + } + Err(_) => panic!(), + } + + match create_dax_conn_b() { + Ok(dax_conn) => { + let ptr = Box::into_raw(dax_conn); + let typed_ptr = ptr as *mut DaxConnB; + let boxed = unsafe { Box::from_raw(typed_ptr) }; + dax_conn_map.insert("bar", boxed); + } + Err(_) => panic!(), + } + + dax_conn_map.rollback(); + dax_conn_map.close(); + } + + fn force_backs() { + let mut dax_conn_map = DaxConnMap::new(); + + match create_dax_conn_a() { + Ok(dax_conn) => { + let ptr = Box::into_raw(dax_conn); + let typed_ptr = ptr as *mut DaxConnA; + let boxed = unsafe { Box::from_raw(typed_ptr) }; + dax_conn_map.insert("foo", boxed); + } + Err(_) => panic!(), + } + + match create_dax_conn_b() { + Ok(dax_conn) => { + let ptr = Box::into_raw(dax_conn); + let typed_ptr = ptr as *mut DaxConnB; + let boxed = unsafe { Box::from_raw(typed_ptr) }; + dax_conn_map.insert("bar", boxed); + } + Err(_) => panic!(), + } + + match dax_conn_map.commit() { + Ok(_) => {} + Err(_) => panic!(), + } + + dax_conn_map.rollback(); + dax_conn_map.close(); + } + + #[test] + fn test() { + commits(); + + LOGGER.lock().unwrap().assert_logs(&[ + "DaxConnA commit", + "DaxConnB commit", + "DaxConnB close", + "DaxConnA close", + "DaxConnB drop", + "DaxConnA drop", + ]); + + LOGGER.lock().unwrap().clear(); + + rollbacks(); + + LOGGER.lock().unwrap().assert_logs(&[ + "DaxConnA rollback", + "DaxConnB rollback", + "DaxConnB close", + "DaxConnA close", + "DaxConnB drop", + "DaxConnA drop", + ]); + + LOGGER.lock().unwrap().clear(); + + force_backs(); + + LOGGER.lock().unwrap().assert_logs(&[ + "DaxConnA commit", + "DaxConnB commit", + "DaxConnA force back", + "DaxConnB force back", + "DaxConnB close", + "DaxConnA close", + "DaxConnB drop", + "DaxConnA drop", + ]); + } + } +} diff --git a/src/dax_src.rs b/src/dax_src.rs index a937bc6..8af618c 100644 --- a/src/dax_src.rs +++ b/src/dax_src.rs @@ -5,33 +5,11 @@ use std::collections::HashMap; use std::ptr; -use crate::async_group::AsyncGroupAsync; +use crate::async_group::{AsyncGroup, AsyncGroupAsync}; +use crate::dax_conn::{DaxConn, NoopDaxConn}; use crate::errors; -use crate::AsyncGroup; use crate::Err; -/// Represents a connection to a data store. -/// -/// This trait declares methods: `commit`, `rollback`, `close`, etc. to work in a transaction -/// process. -pub trait DaxConn { - /// Commits the updates in a transaction. - fn commit(&mut self, ag: &mut dyn AsyncGroup) -> Result<(), Err>; - - /// Checks whether updates are already committed. - fn is_committed(&self) -> bool; - - /// Rollbacks updates in a transaction. - fn rollback(&mut self, ag: &mut dyn AsyncGroup); - - /// Reverts updates forcely even if updates are already committed or this connection does not - /// have rollback mechanism. - fn force_back(&mut self, ag: &mut dyn AsyncGroup); - - /// Closes this connection. - fn close(&mut self); -} - /// Represents a data source which creates connections to a data store like database, etc. /// /// This trait declares methods: `setup`, `close`, and `create_dax_conn`. @@ -52,20 +30,6 @@ pub trait DaxSrc { fn create_dax_conn(&self) -> Result, Err>; } -struct NoopDaxConn {} - -impl DaxConn for NoopDaxConn { - fn commit(&mut self, _ag: &mut dyn AsyncGroup) -> Result<(), Err> { - Ok(()) - } - fn is_committed(&self) -> bool { - true - } - fn rollback(&mut self, _ag: &mut dyn AsyncGroup) {} - fn force_back(&mut self, _ag: &mut dyn AsyncGroup) {} - fn close(&mut self) {} -} - struct NoopDaxSrc {} impl DaxSrc for NoopDaxSrc { diff --git a/src/errors.rs b/src/errors.rs index 9dcfdfe..d4b7389 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -31,3 +31,16 @@ pub enum DaxSrc { errors: HashMap, }, } + +/// The enum type for errors by `DaxConn`. +/// +/// The variants of this enum indicates the possible errors that may occur with `DaxConn` +#[derive(Debug)] +pub enum DaxConn { + /// The error reason which indicates that some `DaxConn`(s) failed to commit. + FailToCommit { + /// The map of which keys are the registered names of `DaxConn`(s) that failed, and of + /// which values are `Err`(s) having their error reasons. + errors: HashMap, + }, +} diff --git a/src/lib.rs b/src/lib.rs index c5a0ac0..0ec35f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ // See the file LICENSE in this distribution for more details. mod async_group; +mod dax_conn; mod dax_src; mod errs; @@ -13,7 +14,7 @@ pub use errs::Err; pub use async_group::AsyncGroup; -pub use dax_src::DaxConn; +pub use dax_conn::DaxConn; pub use dax_src::DaxSrc; pub use dax_src::close;