Skip to content

Commit

Permalink
Apply various Clippy recommendations (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Nov 24, 2024
1 parent 0062c29 commit 1bc6ae8
Show file tree
Hide file tree
Showing 41 changed files with 124 additions and 140 deletions.
4 changes: 2 additions & 2 deletions bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub mod arc {
sequestered: self.sequestered.clone(),
};

unsafe { self.ptr = self.ptr.offset(index as isize); }
unsafe { self.ptr = self.ptr.add(index); }
self.len -= index;

result
Expand Down Expand Up @@ -161,7 +161,7 @@ pub mod arc {
/// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
/// ```
pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.offset(self.len as isize) }, other.ptr) {
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.add(self.len) }, other.ptr) {
self.len += other.len;
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl AllocateBuilder for ProcessBuilder {

// Initialize buzzers; send first, then recv.
for worker in self.buzzers_send.iter() {
let buzzer = Buzzer::new();
let buzzer = Buzzer::default();
worker.send(buzzer).expect("Failed to send buzzer");
}
let mut buzzers = Vec::with_capacity(self.buzzers_recv.len());
Expand Down Expand Up @@ -88,8 +88,8 @@ impl Process {

counters_recv
.into_iter()
.zip(buzzers_send.into_iter())
.zip(buzzers_recv.into_iter())
.zip(buzzers_send)
.zip(buzzers_recv)
.enumerate()
.map(|(index, ((recv, bsend), brecv))| {
ProcessBuilder {
Expand Down
10 changes: 2 additions & 8 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ pub struct ThreadBuilder;

impl AllocateBuilder for ThreadBuilder {
type Allocator = Thread;
fn build(self) -> Self::Allocator { Thread::new() }
fn build(self) -> Self::Allocator { Thread::default() }
}


/// An allocator for intra-thread communication.
#[derive(Default)]
pub struct Thread {
/// Shared counts of messages in channels.
events: Rc<RefCell<Vec<usize>>>,
Expand Down Expand Up @@ -53,13 +54,6 @@ pub type ThreadPusher<T> = CountPusher<T, Pusher<T>>;
pub type ThreadPuller<T> = CountPuller<T, Puller<T>>;

impl Thread {
/// Allocates a new thread-local channel allocator.
pub fn new() -> Self {
Thread {
events: Rc::new(RefCell::new(Default::default())),
}
}

/// Creates a new thread-local channel from an identifier and shared counts.
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
-> (ThreadPusher<T>, ThreadPuller<T>)
Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<A: AllocateBuilder> TcpBuilder<A> {
// Fulfill puller obligations.
let mut recvs = Vec::with_capacity(self.peers);
for promise in self.promises.into_iter() {
let buzzer = crate::buzzer::Buzzer::new();
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
promise.send(queue.clone()).expect("Failed to send MergeQueue");
recvs.push(queue.clone());
Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ProcessBuilder {
// Fulfill puller obligations.
let mut recvs = Vec::with_capacity(self.peers);
for puller in self.pullers.into_iter() {
let buzzer = crate::buzzer::Buzzer::new();
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
puller.send(queue.clone()).expect("Failed to send MergeQueue");
recvs.push(queue.clone());
Expand Down
6 changes: 2 additions & 4 deletions communication/src/allocator/zero_copy/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
// Sockets are expected to be blocking,
for socket in sockets.iter_mut() {
if let Some(socket) = socket {
socket.set_nonblocking(false).expect("failed to set socket to blocking");
}
for socket in sockets.iter_mut().flatten() {
socket.set_nonblocking(false).expect("failed to set socket to blocking");
}

let processes = sockets.len();
Expand Down
10 changes: 5 additions & 5 deletions communication/src/allocator/zero_copy/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//!
//! Methods related to reading from and writing to TCP connections
use std::io::{self, Write};
use crossbeam_channel::{Sender, Receiver};
Expand Down Expand Up @@ -67,9 +67,9 @@ where
assert!(!buffer.empty().is_empty());

// Attempt to read some more bytes into self.buffer.
let read = match reader.read(&mut buffer.empty()) {
let read = match reader.read(buffer.empty()) {
Err(x) => tcp_panic("reading data", x),
Ok(n) if n == 0 => {
Ok(0) => {
tcp_panic(
"reading data",
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
Expand Down Expand Up @@ -102,7 +102,7 @@ where
panic!("Clean shutdown followed by data.");
}
buffer.ensure_capacity(1);
if reader.read(&mut buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
panic!("Clean shutdown followed by data.");
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ pub fn send_loop<S: Stream>(
logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));

let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
let buzzer = crate::buzzer::Buzzer::new();
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
x.send(queue.clone()).expect("failed to send MergeQueue");
queue
Expand Down
10 changes: 4 additions & 6 deletions communication/src/buzzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ pub struct Buzzer {
thread: Thread,
}

impl Default for Buzzer {
fn default() -> Self { Self { thread: std::thread::current() } }
}

impl Buzzer {
/// Creates a new buzzer for the current thread.
pub fn new() -> Self {
Self {
thread: std::thread::current()
}
}
/// Unparks the target thread.
pub fn buzz(&self) {
self.thread.unpark()
Expand Down
6 changes: 3 additions & 3 deletions communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,15 @@ impl Config {
Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(())))
},
Config::Process(threads) => {
Ok((Process::new_vector(threads).into_iter().map(|x| GenericBuilder::Process(x)).collect(), Box::new(())))
Ok((Process::new_vector(threads).into_iter().map(GenericBuilder::Process).collect(), Box::new(())))
},
Config::ProcessBinary(threads) => {
Ok((ProcessBuilder::new_vector(threads).into_iter().map(|x| GenericBuilder::ProcessBinary(x)).collect(), Box::new(())))
Ok((ProcessBuilder::new_vector(threads).into_iter().map(GenericBuilder::ProcessBinary).collect(), Box::new(())))
},
Config::Cluster { threads, process, addresses, report, log_fn } => {
match initialize_networking(addresses, process, threads, report, log_fn) {
Ok((stuff, guard)) => {
Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard)))
Ok((stuff.into_iter().map(GenericBuilder::ZeroCopy).collect(), Box::new(guard)))
},
Err(err) => Err(format!("failed to initialize networking: {}", err))
}
Expand Down
8 changes: 4 additions & 4 deletions communication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<V
let mut senders: Vec<_> = (0 .. sends).map(|_| Vec::with_capacity(recvs)).collect();
let mut recvers: Vec<_> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect();

for sender in 0 .. sends {
for recver in 0 .. recvs {
for sender in senders.iter_mut() {
for recver in recvers.iter_mut() {
let (send, recv) = crossbeam_channel::unbounded();
senders[sender].push(send);
recvers[recver].push(recv);
sender.push(send);
recver.push(recv);
}
}

Expand Down
2 changes: 1 addition & 1 deletion communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> R
let mut results = start_task.join().unwrap()?;
results.push(None);
let to_extend = await_task.join().unwrap()?;
results.extend(to_extend.into_iter());
results.extend(to_extend);

if noisy { println!("worker {}:\tinitialization complete", my_index) }

Expand Down
4 changes: 2 additions & 2 deletions container/src/columnation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ impl<T: Columnation> Default for TimelyStack<T> {

impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
let mut iter = iter.into_iter();
let iter = iter.into_iter();
let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
while let Some(element) = iter.next() {
for element in iter {
c.copy(element);
}

Expand Down
4 changes: 2 additions & 2 deletions container/src/flatcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ impl<R: Region + Clone + 'static> Container for FlatStack<R> {

type Iter<'a> = <&'a Self as IntoIterator>::IntoIter;

fn iter<'a>(&'a self) -> Self::Iter<'a> {
fn iter(&self) -> Self::Iter<'_> {
IntoIterator::into_iter(self)
}

type DrainIter<'a> = Self::Iter<'a>;

fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
fn drain(&mut self) -> Self::DrainIter<'_> {
IntoIterator::into_iter(&*self)
}
}
Expand Down
2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ fn main() {

in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {

let mut notificator = FrontierNotificator::new();
let mut notificator = FrontierNotificator::default();
let mut stash = HashMap::new();

move |input1, input2, output| {
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/threadless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use timely::WorkerConfig;
fn main() {

// create a naked single-threaded worker.
let allocator = timely::communication::allocator::Thread::new();
let allocator = timely::communication::allocator::Thread::default();
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);

// create input and probe handles.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct ConsumedGuard<T: Ord + Clone + 'static> {

impl<T:Ord+Clone+'static> ConsumedGuard<T> {
pub(crate) fn time(&self) -> &T {
&self.time.as_ref().unwrap()
self.time.as_ref().unwrap()
}
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
let hash_func = &mut self.hash_func;

// if the number of pushers is a power of two, use a mask
if (self.pushers.len() & (self.pushers.len() - 1)) == 0 {
if self.pushers.len().is_power_of_two() {
let mask = (self.pushers.len() - 1) as u64;
let pushers = &mut self.pushers;
data.push_partitioned(
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/aggregation/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for
hash: H) -> Stream<S, R> where S::Timestamp: Eq {

let mut aggregates = HashMap::new();
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {

// read each input, fold into aggregates
input.for_each(|time, data| {
Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/operators/aggregation/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;

/// Provides the `state_machine` method.
///
/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
/// Events are applied in time-order, but no other promises are made. Each state transition can
/// produce output, which is sent.
Expand All @@ -15,8 +17,6 @@ use crate::dataflow::channels::pact::Exchange;
/// updates for the current time reflected in the notificator, though. In the case of partially
/// ordered times, the only guarantee is that updates are not applied out of order, not that there
/// is some total order on times respecting the total order (updates may be interleaved).
/// Provides the `state_machine` method.
pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
/// Tracks a state for each presented key, using user-supplied state transition logic.
///
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
let mut states = HashMap::new(); // keys -> state

self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {

// go through each time with data, process each (key, val) pair.
notificator.for_each(|time,_,_| {
Expand All @@ -88,7 +88,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f

// stash if not time yet
if notificator.frontier(0).less_than(time.time()) {
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data);
notificator.notify_at(time.retain());
}
else {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
let mut out1 = output1_handle.session(&time);
let mut out2 = output2_handle.session(&time);
for datum in data.drain(..) {
if condition(&time.time(), &datum) {
if condition(time.time(), &datum) {
out2.give(datum);
} else {
out1.give(datum);
Expand Down Expand Up @@ -107,7 +107,7 @@ impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
let mut output2_handle = output2.activate();

input.for_each(|time, data| {
let mut out = if condition(&time.time()) {
let mut out = if condition(time.time()) {
output2_handle.session(&time)
} else {
output1_handle.session(&time)
Expand Down
11 changes: 6 additions & 5 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ pub trait CapabilityTrait<T: Timestamp> {
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool;
}

impl<'a, T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &'a C {
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
fn time(&self) -> &T { (**self).time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
(**self).valid_for_output(query_buffer)
}
}
impl<'a, T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &'a mut C {
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
fn time(&self) -> &T { (**self).time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
(**self).valid_for_output(query_buffer)
Expand Down Expand Up @@ -227,9 +227,10 @@ impl Error for DowngradeError {}
/// A shared list of shared output capability buffers.
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;

/// An capability of an input port. Holding onto this capability will implicitly holds onto a
/// capability for all the outputs ports this input is connected to, after the connection summaries
/// have been applied.
/// An capability of an input port.
///
/// Holding onto this capability will implicitly holds onto a capability for all the outputs
/// ports this input is connected to, after the connection summaries have been applied.
///
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
/// capability and turns it into a [Capability] for a specific output port.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub trait Leave<G: Scope, C: Container> {
fn leave(&self) -> StreamCore<G, C>;
}

impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'a, G, T>, C> {
impl<G: Scope, C: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C> {
fn leave(&self) -> StreamCore<G, C> {

let scope = self.scope();
Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/operators/core/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ pub trait Exchange<C: PushPartitioned> {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn exchange<F: 'static>(&self, route: F) -> Self
fn exchange<F>(&self, route: F) -> Self
where
for<'a> F: FnMut(&C::Item<'a>) -> u64;
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
}

impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
where
C: PushPartitioned + ExchangeData,
{
fn exchange<F: 'static>(&self, route: F) -> StreamCore<G, C>
fn exchange<F>(&self, route: F) -> StreamCore<G, C>
where
for<'a> F: FnMut(&C::Item<'a>) -> u64,
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
{
self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
move |input, output| {
Expand Down
Loading

0 comments on commit 1bc6ae8

Please sign in to comment.