Skip to content

Commit

Permalink
Merge pull request #155 from Metaswitch/trait-composite-service
Browse files Browse the repository at this point in the history
Use a trait to define CompositeService
  • Loading branch information
richardwhiuk authored Jun 27, 2022
2 parents 23a8ed6 + 0e42da9 commit 9e9f5b4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 98 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed

## [6.2.0] - 2022-06-25
### Added
- Use a trait for connection on CompositeService to allow users to define
their own supported connection types

## [6.1.1] - 2022-02-01
### Fixed
- Remove private, unused dependency on `chrono`
Expand Down Expand Up @@ -185,7 +190,8 @@ No changes. We now think we've got enough to declare this crate stable.
## [0.5.0] - 2017-09-18
- Start of changelog.

[Unreleased]: https://github.com/Metaswitch/swagger-rs/compare/6.1.1...HEAD
[Unreleased]: https://github.com/Metaswitch/swagger-rs/compare/6.2.0...HEAD
[6.2.0]: https://github.com/Metaswitch/swagger-rs/compare/6.1.1...6.2.0
[6.1.1]: https://github.com/Metaswitch/swagger-rs/compare/6.1.0...6.1.1
[6.1.0]: https://github.com/Metaswitch/swagger-rs/compare/6.0.0...6.1.0
[6.0.0]: https://github.com/Metaswitch/swagger-rs/compare/5.1.0...6.0.0
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "swagger"
version = "6.1.1"
version = "6.2.0"
authors = ["Metaswitch Networks Ltd"]
license = "Apache-2.0"
description = "A set of common utilities for Rust code generated by swagger-codegen"
Expand Down
128 changes: 32 additions & 96 deletions src/composites.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use hyper::service::Service;
use hyper::{Request, Response, StatusCode};
use std::fmt;
use std::future::Future;
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::task::{Context, Poll};

Expand All @@ -27,6 +28,32 @@ impl<B: Default> NotFound<B> for B {
}
}

/// Connection which has a remote address, which can thus be composited.
pub trait HasRemoteAddr {
/// Get the remote address for the connection to pass
/// to the composited service
fn remote_addr(&self) -> Option<SocketAddr>;
}

impl<'a> HasRemoteAddr for &'a Option<SocketAddr> {
fn remote_addr(&self) -> Option<SocketAddr> {
**self
}
}

impl<'a> HasRemoteAddr for &'a hyper::server::conn::AddrStream {
fn remote_addr(&self) -> Option<SocketAddr> {
Some(hyper::server::conn::AddrStream::remote_addr(self))
}
}

#[cfg(feature = "uds")]
impl HasRemoteAddr for &'a tokio::net::UnixStream {
fn remote_addr(&self) -> Option<SocketAddr> {
None
}
}

/// Trait implemented by services which can be composited.
///
/// Wraps tower_service::Service
Expand Down Expand Up @@ -151,98 +178,10 @@ where
}
}

use std::net::SocketAddr;

impl<'a, ReqBody, ResBody, Error, MakeError> Service<&'a Option<SocketAddr>>
for CompositeMakeService<Option<SocketAddr>, ReqBody, ResBody, Error, MakeError>
where
ReqBody: 'static,
ResBody: NotFound<ResBody> + 'static,
MakeError: Send + 'static,
Error: 'static,
{
type Error = MakeError;
type Response = CompositeService<ReqBody, ResBody, Error>;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
for service in &mut self.0 {
match service.1.poll_ready(cx) {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(e)) => {
return Poll::Ready(Err(e));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Poll::Ready(Ok(()))
}

fn call(&mut self, target: &'a Option<SocketAddr>) -> Self::Future {
let mut services = Vec::with_capacity(self.0.len());
for (path, service) in &mut self.0 {
let path: &'static str = path;
services.push(service.call(*target).map_ok(move |s| (path, s)));
}
Box::pin(futures::future::join_all(services).map(|results| {
let services: Result<Vec<_>, MakeError> = results.into_iter().collect();

Ok(CompositeService(services?))
}))
}
}

impl<'a, ReqBody, ResBody, Error, MakeError> Service<&'a hyper::server::conn::AddrStream>
for CompositeMakeService<Option<SocketAddr>, ReqBody, ResBody, Error, MakeError>
where
ReqBody: 'static,
ResBody: NotFound<ResBody> + 'static,
MakeError: Send + 'static,
Error: 'static,
{
type Error = MakeError;
type Response = CompositeService<ReqBody, ResBody, Error>;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
for service in &mut self.0 {
match service.1.poll_ready(cx) {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(e)) => {
return Poll::Ready(Err(e));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Poll::Ready(Ok(()))
}

fn call(&mut self, target: &'a hyper::server::conn::AddrStream) -> Self::Future {
let mut services = Vec::with_capacity(self.0.len());
for (path, service) in &mut self.0 {
let path: &'static str = path;
services.push(
service
.call(Some(target.remote_addr()))
.map_ok(move |s| (path, s)),
);
}
Box::pin(futures::future::join_all(services).map(|results| {
let services: Result<Vec<_>, MakeError> = results.into_iter().collect();

Ok(CompositeService(services?))
}))
}
}

#[cfg(feature = "uds")]
impl<'a, ReqBody, ResBody, Error, MakeError> Service<&'a tokio::net::UnixStream>
impl<ReqBody, ResBody, Error, MakeError, Connection> Service<Connection>
for CompositeMakeService<Option<SocketAddr>, ReqBody, ResBody, Error, MakeError>
where
Connection: HasRemoteAddr,
ReqBody: 'static,
ResBody: NotFound<ResBody> + 'static,
MakeError: Send + 'static,
Expand All @@ -267,15 +206,12 @@ where
Poll::Ready(Ok(()))
}

fn call(&mut self, _target: &'a tokio::net::UnixStream) -> Self::Future {
fn call(&mut self, target: Connection) -> Self::Future {
let mut services = Vec::with_capacity(self.0.len());
let addr = target.remote_addr();
for (path, service) in &mut self.0 {
let path: &'static str = path;
services.push(
service
.call(None)
.map_ok(move |s| (path, s)),
);
services.push(service.call(addr).map_ok(move |s| (path, s)));
}
Box::pin(futures::future::join_all(services).map(|results| {
let services: Result<Vec<_>, MakeError> = results.into_iter().collect();
Expand Down

0 comments on commit 9e9f5b4

Please sign in to comment.