Skip to content

Commit

Permalink
Use Intel Threading Building Blocks (oneTBB) (#25)
Browse files Browse the repository at this point in the history
* tbb

* wip

* wip

* update tiles + oneTBB: no stack check for ASan

* wip

* Valgrind suppression for Intel TBB
  • Loading branch information
felixguendling authored Nov 12, 2024
1 parent 736ee98 commit 05ea709
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 115 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ jobs:
- preset: clang-tidy
- preset: linux-sanitizer
- preset: linux-debug
emulator: valgrind --leak-check=full --error-exitcode=1
emulator: valgrind --suppressions=./docs/mmap.supp --suppressions=./docs/tbb.supp --suppressions=./docs/pthread.supp --leak-check=full --error-exitcode=1
env:
BUILDCACHE_DIR: /buildcache
BUILDCACHE_COMPRESS: true
Expand Down
6 changes: 3 additions & 3 deletions .pkg
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
[geo]
url=git@github.com:motis-project/geo.git
branch=master
commit=de5a3586871b8b76a487d42fcc673c9487a1e233
commit=5d99aeb10674a41a82d7c78f850abfd9605bf6e1
[cista]
url=git@github.com:felixguendling/cista.git
branch=master
commit=52577def055e4bdf90eaa461872fc1f7b5b1131d
commit=847b27100b7e730370b810ce62206a66b0bf2d79
[googletest]
url=git@github.com:motis-project/googletest.git
branch=master
Expand All @@ -37,7 +37,7 @@
[tiles]
url=git@github.com:motis-project/tiles.git
branch=master
commit=f7208d4e05661abbf09e4f284de0112a445f6b0a
commit=6513820682dda589594f705b0ac7007d7bc7cdad
[mimalloc]
url=git@github.com:motis-project/mimalloc.git
branch=master
Expand Down
13 changes: 7 additions & 6 deletions .pkg.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
15309325614644894217
cista 52577def055e4bdf90eaa461872fc1f7b5b1131d
13925851286882046445
cista 847b27100b7e730370b810ce62206a66b0bf2d79
zlib-ng 68ab3e2d80253ec5dc3c83691d9ff70477b32cd3
boost 73549ebca677fe6214202a1ab580362b4f80e653
boost 930f38eb0365ceb7853273e03da4d9e7787abfb9
conf f9bf4bd83bf55a2170725707e526cbacc45dcc66
expat 636c9861e8e7c119f3626d1e6c260603ab624516
fmt edb385ac526c24bc917ec4a41bb0edb28f0ca59e
Expand All @@ -20,10 +20,11 @@ clipper 904f0e6644c7f01c176443613be8f7788d59c658
concurrentqueue 54fdce755d3e52c785d6d9d7d91c94615495868c
lmdb 39d8127e5697b1323a67e61c3ad8f087384c7429
miniz 1edbdece9d71dc65c6ff405572ee37cbdcef7af4
res 7d97784ba785ce8a2677ea77164040fde484fb04
oneTBB cda5d37ce8303c58ac506259387e75de4ffcf26b
res b759b93316afeb529b6cb5b2548b24c41e382fb0
pbf-sdf-fonts 91b369e4eb8a618e0a83b0c04b1b08632ea872c4
sol2 3427b4412516c776226a751c13bfb95bb57bc61a
sol2 40c7cbc7c5cfed1e8c7f1bbe6fcbe23d7a67fc75
variant 5aa73631dc969087c77433a5cdef246303051f69
tiles f7208d4e05661abbf09e4f284de0112a445f6b0a
tiles 6513820682dda589594f705b0ac7007d7bc7cdad
unordered_dense b33b037377ca966bbdd9cccc3417e46e88f83bfb
rtree.c 6ed73a7dc4f1184f2b5b2acd8ac1c2b28a273057
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ target_link_libraries(osr-benchmark osr)

file(GLOB_RECURSE osr-backend-src exe/backend/*.cc)
add_executable(osr-backend ${osr-backend-src})
target_link_libraries(osr-backend osr web-server conf boost-json)
target_link_libraries(osr-backend osr web-server conf boost-json TBB::tbb)
target_include_directories(osr-backend PRIVATE exe/backend/include)

# --- TEST ---
Expand Down
7 changes: 7 additions & 0 deletions docs/mmap.supp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
uninitialized_values_written_to_memory_map
Memcheck:Param
msync(start)
fun:msync
fun:_ZN5cista4mmap4syncEv
}
11 changes: 11 additions & 0 deletions docs/pthread.supp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
pthread thread creation leak
Memcheck:Leak
match-leak-kinds: possible
fun:calloc
fun:calloc
fun:allocate_dtv
fun:_dl_allocate_tls
fun:allocate_stack
fun:pthread_create@@GLIBC_2.34
}
8 changes: 8 additions & 0 deletions docs/tbb.supp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
Intel oneTBB thread initialization leak
Memcheck:Leak
match-leak-kinds: possible
fun:_ZN3tbb6detail2r13rml14private_worker14thread_routineEPv
fun:start_thread
fun:clone
}
2 changes: 1 addition & 1 deletion include/osr/ways.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ struct ways {
}

static cista::wrapped<routing> read(std::filesystem::path const&);
void write(std::filesystem::path const&);
void write(std::filesystem::path const&) const;

vec_map<node_idx_t, node_properties> node_properties_;
vec_map<way_idx_t, way_properties> way_properties_;
Expand Down
142 changes: 40 additions & 102 deletions src/extract.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "fmt/core.h"
#include "fmt/std.h"

#include "oneapi/tbb/parallel_pipeline.h"

#include "osmium/area/assembler.hpp"
#include "osmium/area/multipolygon_manager.hpp"
#include "osmium/handler/node_locations_for_ways.hpp"
Expand All @@ -25,7 +27,6 @@

#include "tiles/osm/hybrid_node_idx.h"
#include "tiles/osm/tmp_file.h"
#include "tiles/util_parallel.h"

#include "osr/extract/tags.h"
#include "osr/lookup.h"
Expand Down Expand Up @@ -154,11 +155,11 @@ struct way_handler : public osm::handler::Handler {
struct strings_equals {
using is_transparent = void;

cista::hash_t operator()(string_idx_t const a, string_idx_t const b) const {
bool operator()(string_idx_t const a, string_idx_t const b) const {
return a == b;
}

cista::hash_t operator()(std::string_view a, string_idx_t const b) const {
bool operator()(std::string_view a, string_idx_t const b) const {
return a == (*strings_)[b].view();
}

Expand Down Expand Up @@ -507,60 +508,28 @@ void extract(bool const with_platforms,
{ // Extract streets, places, and areas.
pt->status("Load OSM / Ways").in_high(file_size).out_bounds(20, 50);

auto const thread_count = std::max(2U, std::thread::hardware_concurrency());

// pool must be destructed before handlers!
auto pool =
osmium::thread::Pool{static_cast<int>(thread_count), thread_count * 8U};

auto reader = osm_io::Reader{input_file, pool, osm_eb::way,
osmium::io::read_meta::no};
auto seq_reader = tiles::sequential_until_finish<osm_mem::Buffer>{[&] {
pt->update(reader.offset());
return reader.read();
}};
auto has_exception = std::atomic_bool{false};
auto workers = std::vector<std::future<void>>{};
workers.reserve(thread_count / 2U);
auto h = way_handler{w, pl.get(), rel_ways, elevator_nodes};
auto queue = tiles::in_order_queue<osm_mem::Buffer>{};
for (auto i = 0U; i < thread_count / 2U; ++i) {
workers.emplace_back(pool.submit([&] {
try {
while (true) {
auto opt = seq_reader.process();
if (!opt.has_value()) {
break;
}

auto& [idx, buf] = *opt;
tiles::update_locations(node_idx, buf);

queue.process_in_order(idx, std::move(buf),
[&](auto buf2) { osm::apply(buf2, h); });
}
} catch (std::exception const& e) {
fmt::print(std::clog, "EXCEPTION CAUGHT: {} {}\n",
std::this_thread::get_id(), e.what());
has_exception = true;
} catch (...) {
fmt::print(std::clog, "UNKNOWN EXCEPTION CAUGHT: {} \n",
std::this_thread::get_id());
has_exception = true;
}
}));
}

utl::verify(!workers.empty(), "have no workers");
for (auto& worker : workers) {
worker.wait();
}

utl::verify(!has_exception, "load_osm: exception caught!");
auto reader =
osm_io::Reader{input_file, osm_eb::way, osmium::io::read_meta::no};
oneapi::tbb::parallel_pipeline(
std::thread::hardware_concurrency() * 4U,
oneapi::tbb::make_filter<void, osm_mem::Buffer>(
oneapi::tbb::filter_mode::serial_in_order,
[&](oneapi::tbb::flow_control& fc) {
auto buf = reader.read();
pt->update(reader.offset());
if (!buf) {
fc.stop();
}
return buf;
}) &
oneapi::tbb::make_filter<osm_mem::Buffer, void>(
oneapi::tbb::filter_mode::parallel, [&](osm_mem::Buffer&& buf) {
update_locations(node_idx, buf);
osm::apply(buf, h);
}));

reader.close();
pt->update(pt->in_high_);

reader.close();
}

Expand All @@ -574,58 +543,27 @@ void extract(bool const with_platforms,
pt->status("Load OSM / Node Properties")
.in_high(file_size)
.out_bounds(90, 100);
auto const thread_count = std::max(2U, std::thread::hardware_concurrency());

// pool must be destructed before handlers!
auto pool =
osmium::thread::Pool{static_cast<int>(thread_count), thread_count * 8U};

auto reader =
osm_io::Reader{input_file, pool, osm_eb::node | osm_eb::relation,
osmium::io::read_meta::no};
auto seq_reader = tiles::sequential_until_finish<osm_mem::Buffer>{[&] {
pt->update(reader.offset());
return reader.read();
}};
auto reader = osm_io::Reader{input_file, osm_eb::node | osm_eb::relation,
osmium::io::read_meta::no};
auto h = node_handler{w, pl.get(), r, elevator_nodes};
auto has_exception = std::atomic_bool{false};
auto workers = std::vector<std::future<void>>{};
workers.reserve(thread_count / 2U);
for (auto i = 0U; i < thread_count / 2U; ++i) {
workers.emplace_back(pool.submit([&] {
try {
while (true) {
auto opt = seq_reader.process();
if (!opt.has_value()) {
break;
}

auto& [idx, buf] = *opt;
osm::apply(buf, h);
}
} catch (std::exception const& e) {
fmt::print(std::clog, "EXCEPTION CAUGHT: {} {}\n",
std::this_thread::get_id(), e.what());
has_exception = true;
} catch (...) {
fmt::print(std::clog, "UNKNOWN EXCEPTION CAUGHT: {} \n",
std::this_thread::get_id());
has_exception = true;
}
}));
}

utl::verify(!workers.empty(), "have no workers");
for (auto& worker : workers) {
worker.wait();
}

utl::verify(!has_exception, "load_osm: exception caught!");
oneapi::tbb::parallel_pipeline(
std::thread::hardware_concurrency() * 4U,
oneapi::tbb::make_filter<void, osm_mem::Buffer>(
oneapi::tbb::filter_mode::serial_in_order,
[&](oneapi::tbb::flow_control& fc) {
auto buf = reader.read();
pt->update(reader.offset());
if (!buf) {
fc.stop();
}
return buf;
}) &
oneapi::tbb::make_filter<osm_mem::Buffer, void>(
oneapi::tbb::filter_mode::parallel,
[&](osm_mem::Buffer&& buf) { osm::apply(buf, h); }));

reader.close();
pt->update(pt->in_high_);

reader.close();
}

w.add_restriction(r);
Expand Down
2 changes: 1 addition & 1 deletion src/ways.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ cista::wrapped<ways::routing> ways::routing::read(
return cista::read<ways::routing>(p / "routing.bin");
}

void ways::routing::write(std::filesystem::path const& p) {
void ways::routing::write(std::filesystem::path const& p) const {
return cista::write(p / "routing.bin", *this);
}

Expand Down

0 comments on commit 05ea709

Please sign in to comment.