6 #ifndef METALL_UTILITY_METALL_MPI_ADAPTOR_HPP
7 #define METALL_UTILITY_METALL_MPI_ADAPTOR_HPP
12 #include <metall/detail/file.hpp>
39 const MPI_Comm &comm = MPI_COMM_WORLD)
41 m_root_dir_prefix(root_dir_prefix),
42 m_local_metall_manager(nullptr) {
43 if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
44 ::MPI_Abort(comm, -1);
46 m_local_metall_manager = std::make_unique<manager_type>(
57 const MPI_Comm &comm = MPI_COMM_WORLD)
59 m_root_dir_prefix(root_dir_prefix),
60 m_local_metall_manager(nullptr) {
61 if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
62 ::MPI_Abort(comm, -1);
64 m_local_metall_manager = std::make_unique<manager_type>(
65 metall::open_read_only,
78 const MPI_Comm &comm = MPI_COMM_WORLD,
79 bool overwrite =
false)
81 m_root_dir_prefix(root_dir_prefix),
82 m_local_metall_manager(nullptr) {
83 priv_setup_root_dir(root_dir_prefix, overwrite, comm);
84 m_local_metall_manager = std::make_unique<manager_type>(
99 const std::size_t capacity,
100 const MPI_Comm &comm = MPI_COMM_WORLD,
101 bool overwrite =
false)
103 m_root_dir_prefix(root_dir_prefix),
104 m_local_metall_manager(nullptr) {
105 priv_setup_root_dir(root_dir_prefix, overwrite, comm);
106 m_local_metall_manager = std::make_unique<manager_type>(
116 m_local_metall_manager.reset(
nullptr);
117 priv_mpi_barrier(m_mpi_comm);
131 return *m_local_metall_manager;
144 priv_mpi_comm_rank(m_mpi_comm));
152 const int mpi_rank) {
168 const MPI_Comm &comm = MPI_COMM_WORLD,
169 bool overwrite =
false) {
171 if (priv_mpi_comm_rank(comm) == 0) {
172 std::stringstream ss;
173 ss <<
"Source directory is not consistnt (may not have closed properly "
174 "or may still be open): "
180 priv_setup_root_dir(destination_dir_path, overwrite, comm);
181 const int rank = priv_mpi_comm_rank(comm);
182 return priv_global_and(
197 bool overwrite =
false) {
198 priv_setup_root_dir(destination_dir_path, overwrite, m_mpi_comm);
199 const int rank = priv_mpi_comm_rank(m_mpi_comm);
200 return priv_global_and(
201 m_local_metall_manager->snapshot(
213 const MPI_Comm &comm = MPI_COMM_WORLD) {
214 const int rank = priv_mpi_comm_rank(comm);
215 const int size = priv_mpi_comm_size(comm);
217 if (!metall::mtlldetail::file_exist(
225 bool metall_dir =
true;
226 if (!metall::mtlldetail::file_exist(
228 k_datastore_mark_file_name)) {
231 if (!priv_global_and(metall_dir, comm)) {
239 if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
245 for (
int i = 0; i < size; ++i) {
247 if (metall::mtlldetail::file_exist(
249 !metall::mtlldetail::remove_file(
257 priv_mpi_barrier(comm);
260 return priv_global_and(ret, comm);
268 const MPI_Comm &comm = MPI_COMM_WORLD) {
269 return priv_read_partition_size(root_dir_prefix, comm);
278 const MPI_Comm &comm = MPI_COMM_WORLD) {
279 const int rank = priv_mpi_comm_rank(comm);
282 return priv_global_and(ret, comm);
// File-name constants shared by the static helpers of this class.
//
// k_datastore_mark_file_name: name passed to a file_exist() check whose
// outcome is combined across ranks via priv_global_and(metall_dir, comm).
// NOTE(review): presumably this is the marker that identifies a directory as
// a Metall MPI datastore -- confirm against the full file.
286 static constexpr
const char *k_datastore_mark_file_name =
287 "metall_mpi_datastore";
// k_partition_size_file_name: name used by priv_store_partition_size() and
// priv_read_partition_size() when building the path of the file that is
// opened with std::ofstream / std::ifstream; the value read back
// (`ifs >> read_size`) is compared with the communicator size in
// priv_verify_num_partitions().
288 static constexpr
const char *k_partition_size_file_name =
289 "metall_mpi_adaptor_partition_size";
294 static void priv_remove_for_overwrite(
const std::string &root_dir_prefix,
295 const MPI_Comm &comm) {
296 if (!
remove(root_dir_prefix, comm)) {
297 if (priv_mpi_comm_rank(comm) == 0) {
298 std::stringstream ss;
299 ss <<
"Failed to overwrite " << root_dir_prefix;
301 ::MPI_Abort(comm, -1);
306 static void priv_setup_root_dir(
const std::string &root_dir_prefix,
307 bool overwrite,
const MPI_Comm &comm) {
308 const int rank = priv_mpi_comm_rank(comm);
309 const int size = priv_mpi_comm_size(comm);
313 priv_remove_for_overwrite(root_dir_prefix, comm);
317 const auto local_ret = metall::mtlldetail::file_exist(
root_dir_path);
318 if (priv_global_or(local_ret, comm)) {
321 "Root directory (or a file with the same name) already exists: " +
324 ::MPI_Abort(comm, -1);
327 priv_mpi_barrier(comm);
329 for (
int i = 0; i < size; ++i) {
330 if (i == rank && !metall::mtlldetail::directory_exist(
root_dir_path)) {
334 ::MPI_Abort(comm, -1);
339 if (!metall::mtlldetail::create_file(mark_file)) {
340 std::string s(
"Failed to create file: " + mark_file);
342 ::MPI_Abort(comm, -1);
345 priv_store_partition_size(root_dir_prefix, comm);
347 priv_mpi_barrier(comm);
351 static void priv_store_partition_size(
const std::string &root_dir_prefix,
352 const MPI_Comm &comm) {
353 const int size = priv_mpi_comm_size(comm);
355 k_partition_size_file_name;
357 std::ofstream ofs(path);
361 ::MPI_Abort(comm, -1);
365 std::string s(
"Failed to write data to a file: " + path);
367 ::MPI_Abort(comm, -1);
372 static int priv_read_partition_size(
const std::string &root_dir_prefix,
373 const MPI_Comm &comm) {
375 k_partition_size_file_name;
376 std::ifstream ifs(path);
380 ::MPI_Abort(comm, -1);
383 if (!(ifs >> read_size)) {
384 std::string s(
"Failed to read data from: " + path);
386 ::MPI_Abort(comm, -1);
392 static bool priv_verify_num_partitions(
const std::string &root_dir_prefix,
393 const MPI_Comm &comm) {
394 const int rank = priv_mpi_comm_rank(comm);
395 const int size = priv_mpi_comm_size(comm);
397 bool correct_mpi_size =
true;
399 const int read_size = priv_read_partition_size(root_dir_prefix, comm);
400 if (read_size != size) {
401 correct_mpi_size =
false;
402 std::stringstream ss;
403 ss <<
"Invalid number of MPI processes (provided " << size <<
", "
404 <<
"expected " << read_size <<
")";
408 return priv_global_and(correct_mpi_size, comm);
411 static int priv_mpi_comm_rank(
const MPI_Comm &comm) {
414 ::MPI_Abort(comm, -1);
419 static int priv_mpi_comm_size(
const MPI_Comm &comm) {
422 ::MPI_Abort(comm, -1);
427 static void priv_mpi_barrier(
const MPI_Comm &comm) {
429 ::MPI_Abort(comm, -1);
433 static bool priv_global_and(
const bool local_result,
const MPI_Comm &comm) {
436 ::MPI_Abort(comm, -1);
441 static bool priv_global_or(
const bool local_result,
const MPI_Comm &comm) {
444 ::MPI_Abort(comm, -1);
449 static int priv_determine_local_root_rank(
const MPI_Comm &comm) {
453 "Failed at determining a local root rank");
454 ::MPI_Abort(comm, -1);
// Manager instance held by this adaptor object. In the visible code it starts
// as nullptr in the constructor initializer lists, is assigned from
// std::make_unique<manager_type>(...), is dereferenced for access and for
// snapshot(), and is released with reset(nullptr) just before an MPI barrier.
461 std::unique_ptr<manager_type> m_local_metall_manager;