Skip to content

Commit

Permalink
Add better support for IME (#10)
Browse files Browse the repository at this point in the history
* Introduces a class to determine the best path for IME and MPI-IO settings.
* Unifies the HAVE_MPI flag definition.
  • Loading branch information
sergiorg-hpc authored Apr 19, 2021
1 parent 9148bd6 commit 91a2b51
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ build
libsonata.cpython-*.so
python/__pycache__/
*.pyc
spack-*
.vscode/
venv-clang-format/
9 changes: 6 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ else()
set(SONATA_REPORT_COMPILE_OPTIONS -Wall -Wextra -pedantic)
endif()

if(CMAKE_SYSTEM_NAME MATCHES "Linux")
add_definitions(-DSONATA_REPORT_CHECK_IME)
endif()

# =============================================================================
# Dependencies
# =============================================================================
Expand All @@ -81,9 +85,8 @@ if(SONATA_REPORT_ENABLE_MPI)
if (MPI_FOUND)
if (HDF5_FOUND)
if (HDF5_IS_PARALLEL)
set(HAVE_MPI TRUE)
# When MPI and HDF5 parallel are found, enable mpi in the report library
add_definitions(-DHAVE_MPI)
set(SONATA_REPORT_HAVE_MPI TRUE) # For integration tests
add_definitions(-DSONATA_REPORT_HAVE_MPI)
message(STATUS "Both MPI and HDF5 parallel found, using reporting parallel implementation")
else()
message(STATUS "MPI and HDF5 found, but no parallel IO support for HDF5, using reporting serial implementation")
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ set(sonatareport_SOURCES
"data/sonata_data.cpp"
"io/hdf5_writer.cpp"
"utils/logger.cpp"
"utils/imeutil.cpp"
)

# Shared library
Expand Down
10 changes: 5 additions & 5 deletions src/io/hdf5_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ template void HDF5Writer::write<double>(const std::string& dataset_name,

HDF5Writer::HDF5Writer(const std::string& report_name)
: report_name_(report_name) {
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
std::tie(collective_list_, independent_list_) = Implementation::prepare_write(report_name,
plist_id);
// Create hdf5 file named after the report_name
const std::string file_name = report_name + ".h5";
hid_t plist_id;
std::string file_name;
std::tie(plist_id, collective_list_, independent_list_, file_name) =
Implementation::prepare_write(report_name);

file_ = H5Fcreate(file_name.data(), H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);

// Create enum type for the ordering of the spikes
Expand Down
38 changes: 27 additions & 11 deletions src/library/implementation_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
#include <tuple>
#include <vector>

#include "../utils/imeutil.h"
#include "../utils/logger.h"
#include "sonatareport.h"

#if defined(HAVE_MPI)
#ifdef SONATA_REPORT_HAVE_MPI
#include <mpi.h>
#endif

Expand All @@ -19,6 +20,8 @@ namespace bbp {
namespace sonata {
namespace detail {

#define FILE_EXTENSION ".h5"

template <class TImpl>
struct Implementation {
static int init(const std::vector<std::string>& report_names) {
Expand All @@ -27,8 +30,9 @@ struct Implementation {
static void close() {
TImpl::close();
}
static std::tuple<hid_t, hid_t> prepare_write(const std::string& report_name, hid_t plist_id) {
return TImpl::prepare_write(report_name, plist_id);
// Forwards the request to the selected backend (TImpl::prepare_write) and
// returns its 4-element tuple unchanged.
// NOTE(review): HDF5Writer unpacks the tuple as (file-access property list,
// collective list, independent list, output file name) — confirm every TImpl
// keeps that order.
static std::tuple<hid_t, hid_t, hid_t, std::string> prepare_write(
const std::string& report_name) {
return TImpl::prepare_write(report_name);
}
static hsize_t get_offset(const std::string& report_name, hsize_t value) {
return TImpl::get_offset(report_name, value);
Expand Down Expand Up @@ -85,7 +89,7 @@ static void local_spikevec_sort(std::vector<double>& isvect,
std::transform(perm.begin(), perm.end(), osvecg.begin(), [&](uint64_t i) { return isvecg[i]; });
}

#if defined(HAVE_MPI)
#ifdef SONATA_REPORT_HAVE_MPI

static MPI_Comm get_Comm(const std::string& report_name) {
if (SonataReport::communicators_.find(report_name) != SonataReport::communicators_.end()) {
Expand Down Expand Up @@ -162,9 +166,18 @@ struct ParallelImplementation {
};

static void close(){};
static std::tuple<hid_t, hid_t> prepare_write(const std::string& report_name, hid_t plist_id) {
// Enable MPI access
static std::tuple<hid_t, hid_t, hid_t, std::string> prepare_write(
const std::string& report_name) {
const auto& path_info = IMEUtil::getPathInfo(report_name + FILE_EXTENSION);
MPI_Info info = MPI_INFO_NULL;

// Set proper MPI-IO hints for better IME support
if (path_info.first == FSTYPE_IME) {
IMEUtil::setMPIHints(info);
}

// Set the MPI Info object with the hints
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, get_Comm(report_name), info);

// Initialize independent/collective lists
Expand All @@ -173,7 +186,7 @@ struct ParallelImplementation {
H5Pset_dxpl_mpio(collective_list, H5FD_MPIO_COLLECTIVE);
H5Pset_dxpl_mpio(independent_list, H5FD_MPIO_INDEPENDENT);

return std::make_tuple(collective_list, independent_list);
return std::make_tuple(plist_id, collective_list, independent_list, path_info.second);
};

static hsize_t get_offset(const std::string& report_name, hsize_t value) {
Expand Down Expand Up @@ -286,9 +299,12 @@ struct SerialImplementation {
return 0;
};
static void close(){};
static std::tuple<hid_t, hid_t> prepare_write(const std::string& /*report_name*/,
hid_t /*plist_id*/) {
return std::make_tuple(H5Pcreate(H5P_DATASET_XFER), H5Pcreate(H5P_DATASET_XFER));
static std::tuple<hid_t, hid_t, hid_t, std::string> prepare_write(
const std::string& report_name) {
return std::make_tuple(H5Pcreate(H5P_FILE_ACCESS),
H5Pcreate(H5P_DATASET_XFER),
H5Pcreate(H5P_DATASET_XFER),
report_name + FILE_EXTENSION);
}
static hsize_t get_offset(const std::string& /*report_name*/, hsize_t /*value*/) {
return 0;
Expand Down Expand Up @@ -318,7 +334,7 @@ struct SerialImplementation {
} // namespace bbp

using Implementation = bbp::sonata::detail::Implementation<
#if defined(HAVE_MPI)
#ifdef SONATA_REPORT_HAVE_MPI
bbp::sonata::detail::ParallelImplementation
#else
bbp::sonata::detail::SerialImplementation
Expand Down
2 changes: 1 addition & 1 deletion src/library/sonatareport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ double SonataReport::atomic_step_ = 1e-8;
double SonataReport::min_steps_to_record_ = 0.0;
bool SonataReport::first_report = true;
int SonataReport::rank_ = 0;
#ifdef HAVE_MPI
#ifdef SONATA_REPORT_HAVE_MPI
MPI_Comm SonataReport::has_nodes_ = MPI_COMM_WORLD;
SonataReport::communicators_t SonataReport::communicators_;
#endif
Expand Down
6 changes: 3 additions & 3 deletions src/library/sonatareport.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <unordered_map>
#include <vector>

#ifdef HAVE_MPI
#ifdef SONATA_REPORT_HAVE_MPI
#include <mpi.h>
#endif

Expand All @@ -19,13 +19,13 @@ namespace sonata {
class SonataReport
{
using reports_t = std::unordered_map<std::string, std::shared_ptr<Report>>;
#ifdef HAVE_MPI
#ifdef SONATA_REPORT_HAVE_MPI
using communicators_t = std::unordered_map<std::string, MPI_Comm>;
#endif
public:
static double atomic_step_;
static double min_steps_to_record_;
#ifdef HAVE_MPI
#ifdef SONATA_REPORT_HAVE_MPI
static MPI_Comm has_nodes_;
static communicators_t communicators_;
#endif
Expand Down
112 changes: 112 additions & 0 deletions src/utils/imeutil.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#include "imeutil.h"
#include <limits.h>
#ifdef SONATA_REPORT_CHECK_IME
#include <sys/vfs.h>
#endif

#define IME_PREFIX "ime://"
#define IME_CONF_ENV "IM_CLIENT_CFG_FILE"
#define IME_CONF_PATH "/etc/ddn/ime/ime.conf"
#define IME_CONF_FUSE_PATH "/etc/ddn/ime/ime-fuse.conf"
#define FUSE_SUPER_MAGIC 0x65735546 // https://man7.org/linux/man-pages/man2/fstatfs.2.html

using namespace bbp::sonata;

#ifdef SONATA_REPORT_CHECK_IME
/**
 * Parses the IME config. files to determine the BFS and FUSE mount points.
 *
 * The client configuration is read from the file named by the environment
 * variable IME_CONF_ENV when it is set, and from IME_CONF_PATH otherwise; the
 * FUSE configuration is always read from IME_CONF_FUSE_PATH.
 *
 * \return Pair of (BFS mount point, FUSE mount point). An entry is the
 *         sentinel ":Error:-1:" when its file cannot be opened or contains no
 *         usable setting.
 */
std::pair<std::string, std::string> getIMEMountPoints() {
    const char* env = getenv(IME_CONF_ENV);
    const char* conf_path = (env != nullptr) ? env : IME_CONF_PATH;

    // Returns the path that follows `keyword` on the first matching line of
    // `ifs`, cut at `sep` (or at the end of the line if `sep` is absent).
    const auto parseFile =
        [](std::ifstream ifs, const std::string& keyword, const char sep) -> std::string {
        if (ifs.is_open()) {
            std::string line;
            while (std::getline(ifs, line)) {
                // Look for mount point setting, and ensure it is uncommented
                const size_t keyword_pos = line.find(keyword);
                if (keyword_pos == std::string::npos || line.find('#') <= keyword_pos) {
                    continue;
                }

                // A setting without any path would make substr() throw
                // std::out_of_range; skip it and keep scanning instead.
                const size_t value_pos = line.find('/', keyword_pos);
                if (value_pos == std::string::npos) {
                    continue;
                }

                return line.substr(value_pos, line.find(sep, value_pos) - value_pos);
            }
        }
        return ":Error:-1:";
    };

    return std::pair<std::string, std::string>(
        parseFile(std::ifstream(conf_path), "mount_point", ';'),
        parseFile(std::ifstream(IME_CONF_FUSE_PATH), "MNTDIR", '\''));
}

/**
 * Tells whether the filesystem holding `path` reports the FUSE magic number.
 * \param path Path to query with statfs().
 * \return true only if statfs() succeeds and the filesystem type is
 *         FUSE_SUPER_MAGIC; false otherwise (including when statfs() fails).
 */
bool isFUSEMountPoint(const std::string& path) {
    struct statfs fs_info;
    if (statfs(path.c_str(), &fs_info) != 0) {
        return false;
    }
    return fs_info.f_type == FUSE_SUPER_MAGIC;
}
#endif

std::pair<fstype_t, std::string> IMEUtil::getPathInfo(std::string path) {
#ifdef SONATA_REPORT_CHECK_IME
// If the path begins with "ime:", assume native access
if (path.find("ime:") == 0) {
return std::pair<fstype_t, std::string>(FSTYPE_IME, path);
}

// Resolve the full path and return an error if not possible (e.g., file,
// original parent folder, or both do not exist yet)
if (path[0] != '/') {
const auto limit = path.find_last_of('/');
auto path_orig = (limit != std::string::npos) ? path.substr(0, limit) : ".";
char full_path[PATH_MAX];
if (realpath(path_orig.c_str(), full_path) == NULL) {
return std::pair<fstype_t, std::string>(FSTYPE_UNKNOWN, path);
}
path = std::string(full_path) + "/" + path.substr((limit != std::string::npos) ? limit : 0);
}

// Check if the path contains the IME keyword
if (path.find("/ime/") != std::string::npos) {
// Parse config. files and verify FUSE mount point only once for performance
static auto mnt_paths = getIMEMountPoints();
static bool ime_fuse_active = isFUSEMountPoint(mnt_paths.second);

// Check if the path contains the BFS mount point
if (path.find(mnt_paths.first) == 0) {
return std::pair<fstype_t, std::string>(FSTYPE_IME, IME_PREFIX + path);
}

// Lastly, evaluate if the path is under a FUSE mount point
if (ime_fuse_active && path.find(mnt_paths.second) == 0) {
const off_t offset = mnt_paths.second.size();
path = IME_PREFIX + mnt_paths.first + path.substr(offset);
return std::pair<fstype_t, std::string>(FSTYPE_IME, path);
}
}
#endif
// At this point, assume a traditional file system
return std::pair<fstype_t, std::string>(FSTYPE_DEFAULT, path);
}

#ifdef SONATA_REPORT_HAVE_MPI
/**
 * Sets the MPI-IO hints that disable collective buffering ("romio_cb_write")
 * and data sieving ("romio_ds_write") for writes.
 *
 * \param info MPI Info object to update; it is created first when it is
 *             MPI_INFO_NULL.
 * \return MPI_SUCCESS if every call succeeded, otherwise the error code of the
 *         first call that failed.
 */
int IMEUtil::setMPIHints(MPI_Info& info) {
    // Create the MPI Info object, if needed
    if (info == MPI_INFO_NULL) {
        const int create_status = MPI_Info_create(&info);
        if (create_status != MPI_SUCCESS) {
            return create_status;
        }
    }

    // Set the hints to disable two-phase I/O and data sieving. Both hints are
    // always attempted; the codes are kept separate because OR-ing two MPI
    // error codes together does not yield a meaningful error code.
    const int cb_status = MPI_Info_set(info, "romio_cb_write", "disable");
    const int ds_status = MPI_Info_set(info, "romio_ds_write", "disable");

    return (cb_status != MPI_SUCCESS) ? cb_status : ds_status;
}
#endif
48 changes: 48 additions & 0 deletions src/utils/imeutil.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <fstream>
#ifdef SONATA_REPORT_HAVE_MPI
#include <mpi.h>
#endif

namespace bbp {
namespace sonata {

/**
 * Kind of backend filesystem that a path was classified as.
 */
enum fstype_t {
    // Regular path (BFS / local storage)
    FSTYPE_DEFAULT = 0x6f510ca1,
    // Path to be accessed through IME ("ime:" prefix)
    FSTYPE_IME = 0x13e00000,
    // Classification failed (e.g., file not found)
    FSTYPE_UNKNOWN = 0xffffffff
};

/**
 * Collection of static helpers for working with IME-backed storage.
 * The class only groups functions and cannot be instantiated.
 */
class IMEUtil
{
    // Construction and destruction are private: static use only.
    IMEUtil() {}
    ~IMEUtil() {}

  public:
    /**
     * Classifies the filesystem behind a path and yields the path that
     * should be passed on to I/O libraries (e.g., MPI-IO).
     * \param path Path to the file or folder.
     * \return Pair of (filesystem type, path to use).
     */
    static std::pair<fstype_t, std::string> getPathInfo(std::string path);

#ifdef SONATA_REPORT_HAVE_MPI
    /**
     * Fills an MPI Info object with the MPI-IO hints used for IME.
     * \param info MPI Info object to be created / updated.
     * \return MPI_SUCCESS if successful, or an MPI-based error.
     */
    static int setMPIHints(MPI_Info& info);
#endif
};

} // namespace sonata
} // namespace bbp
6 changes: 3 additions & 3 deletions tests/integration/integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <thread>
#include <vector>

#ifdef HAVE_MPI
#ifdef SONATA_REPORT_HAVE_MPI
#include <mpi.h>
#endif

Expand Down Expand Up @@ -119,7 +119,7 @@ int main() {
logger->set_level(spdlog::level::trace);
int global_rank = 0;
int global_size = 1;
#ifdef HAVE_MPI
#ifdef SONATA_REPORT_HAVE_MPI
MPI_Init(nullptr, nullptr);
MPI_Comm_rank(MPI_COMM_WORLD, &global_rank);
MPI_Comm_size(MPI_COMM_WORLD, &global_size);
Expand Down Expand Up @@ -203,7 +203,7 @@ int main() {
}


#ifdef HAVE_MPI
#ifdef SONATA_REPORT_HAVE_MPI
MPI_Finalize();
#endif
return 0;
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/integration_test.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

export OMP_NUM_THREADS=1

echo @HAVE_MPI@
if [ -z "@HAVE_MPI@" ]; then
echo @SONATA_REPORT_HAVE_MPI@
if [ -z "@SONATA_REPORT_HAVE_MPI@" ]; then
@CMAKE_CURRENT_BINARY_DIR@/reports_integration_test
ref_soma=@CMAKE_CURRENT_SOURCE_DIR@/soma_report_serial.ref
ref_compartment=@CMAKE_CURRENT_SOURCE_DIR@/compartment_report_serial.ref
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_sonatadata.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <bbp/sonata/reports.h>
#include <catch2/catch.hpp>
#include <data/sonata_data.h>
#include <iostream>
#include <memory>
#include <data/sonata_data.h>
#ifdef HAVE_MPI
#ifdef SONATA_REPORT_HAVE_MPI
#include <mpi.h>
#endif

Expand Down
Loading

0 comments on commit 91a2b51

Please sign in to comment.