Metall  v0.28
A persistent memory allocator for data-centric analytics
metall_mpi_adaptor.hpp
Go to the documentation of this file.
1 // Copyright 2020 Lawrence Livermore National Security, LLC and other Metall
2 // Project Developers. See the top-level COPYRIGHT file for details.
3 //
4 // SPDX-License-Identifier: (Apache-2.0 OR MIT)
5 
6 #ifndef METALL_UTILITY_METALL_MPI_ADAPTOR_HPP
7 #define METALL_UTILITY_METALL_MPI_ADAPTOR_HPP
8 
9 #include <sstream>
10 
11 #include <metall/metall.hpp>
12 #include <metall/detail/file.hpp>
13 #include <metall/utility/mpi.hpp>
15 
16 namespace metall::utility {
17 
18 namespace {
20 }
21 
25  public:
26  // -------------------- //
27  // Public types and static values
28  // -------------------- //
31 
32  // -------------------- //
33  // Constructor & assign operator
34  // -------------------- //
39  const MPI_Comm &comm = MPI_COMM_WORLD)
40  : m_mpi_comm(comm),
41  m_root_dir_prefix(root_dir_prefix),
42  m_local_metall_manager(nullptr) {
43  if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
44  ::MPI_Abort(comm, -1);
45  }
46  m_local_metall_manager = std::make_unique<manager_type>(
47  metall::open_only,
48  ds::make_local_dir_path(m_root_dir_prefix, priv_mpi_comm_rank(comm))
49  .c_str());
50  }
51 
56  const std::string &root_dir_prefix,
57  const MPI_Comm &comm = MPI_COMM_WORLD)
58  : m_mpi_comm(comm),
59  m_root_dir_prefix(root_dir_prefix),
60  m_local_metall_manager(nullptr) {
61  if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
62  ::MPI_Abort(comm, -1);
63  }
64  m_local_metall_manager = std::make_unique<manager_type>(
65  metall::open_read_only,
66  ds::make_local_dir_path(m_root_dir_prefix, priv_mpi_comm_rank(comm))
67  .c_str());
68  }
69 
78  const MPI_Comm &comm = MPI_COMM_WORLD,
79  bool overwrite = false)
80  : m_mpi_comm(comm),
81  m_root_dir_prefix(root_dir_prefix),
82  m_local_metall_manager(nullptr) {
83  priv_setup_root_dir(root_dir_prefix, overwrite, comm);
84  m_local_metall_manager = std::make_unique<manager_type>(
85  metall::create_only,
86  ds::make_local_dir_path(m_root_dir_prefix, priv_mpi_comm_rank(comm))
87  .c_str());
88  }
89 
99  const std::size_t capacity,
100  const MPI_Comm &comm = MPI_COMM_WORLD,
101  bool overwrite = false)
102  : m_mpi_comm(comm),
103  m_root_dir_prefix(root_dir_prefix),
104  m_local_metall_manager(nullptr) {
105  priv_setup_root_dir(root_dir_prefix, overwrite, comm);
106  m_local_metall_manager = std::make_unique<manager_type>(
107  metall::create_only,
108  ds::make_local_dir_path(m_root_dir_prefix, priv_mpi_comm_rank(comm))
109  .c_str(),
110  capacity);
111  }
112 
116  m_local_metall_manager.reset(nullptr);
117  priv_mpi_barrier(m_mpi_comm);
118  }
119 
120  // -------------------- //
121  // Public methods
122  // -------------------- //
123 
126  manager_type &get_local_manager() { return *m_local_metall_manager; }
127 
131  return *m_local_metall_manager;
132  }
133 
137  return ds::make_root_dir_path(m_root_dir_prefix);
138  }
139 
143  return ds::make_local_dir_path(ds::make_root_dir_path(m_root_dir_prefix),
144  priv_mpi_comm_rank(m_mpi_comm));
145  }
146 
151  static std::string local_dir_path(const std::string &root_dir_prefix,
152  const int mpi_rank) {
153  return ds::make_local_dir_path(root_dir_prefix, mpi_rank);
154  }
155 
166  static bool copy(const std::string &source_dir_path,
167  const std::string &destination_dir_path,
168  const MPI_Comm &comm = MPI_COMM_WORLD,
169  bool overwrite = false) {
170  if (!consistent(source_dir_path, comm)) {
171  if (priv_mpi_comm_rank(comm) == 0) {
172  std::stringstream ss;
173  ss << "Source directory is not consistnt (may not have closed properly "
174  "or may still be open): "
175  << source_dir_path;
176  logger::out(logger::level::error, __FILE__, __LINE__, ss.str().c_str());
177  }
178  return false;
179  }
180  priv_setup_root_dir(destination_dir_path, overwrite, comm);
181  const int rank = priv_mpi_comm_rank(comm);
182  return priv_global_and(
184  ds::make_local_dir_path(source_dir_path, rank).c_str(),
185  ds::make_local_dir_path(destination_dir_path, rank).c_str()),
186  comm);
187  }
188 
196  bool snapshot(const std::string &destination_dir_path,
197  bool overwrite = false) {
198  priv_setup_root_dir(destination_dir_path, overwrite, m_mpi_comm);
199  const int rank = priv_mpi_comm_rank(m_mpi_comm);
200  return priv_global_and(
201  m_local_metall_manager->snapshot(
202  ds::make_local_dir_path(destination_dir_path, rank).c_str()),
203  m_mpi_comm);
204  }
205 
212  static bool remove(const std::string &root_dir_prefix,
213  const MPI_Comm &comm = MPI_COMM_WORLD) {
214  const int rank = priv_mpi_comm_rank(comm);
215  const int size = priv_mpi_comm_size(comm);
216 
217  if (!metall::mtlldetail::file_exist(
218  ds::make_root_dir_path(root_dir_prefix))) {
219  // As long as the root directory does not exist, we consider it as a
220  // success.
221  return true;
222  }
223 
224  // ----- Check if this is a Metall datastore ----- //
225  bool metall_dir = true;
226  if (!metall::mtlldetail::file_exist(
227  ds::make_root_dir_path(root_dir_prefix) + "/" +
228  k_datastore_mark_file_name)) {
229  metall_dir = false;
230  }
231  if (!priv_global_and(metall_dir, comm)) {
232  if (rank == 0) {
233  std::string s("This is not a Metall datastore: " +
234  ds::make_root_dir_path(root_dir_prefix));
235  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
236  }
237  return false;
238  }
239  if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
240  return false;
241  }
242 
243  // ----- Remove directories ----- //
244  bool ret = true;
245  for (int i = 0; i < size; ++i) {
246  if (i == rank) {
247  if (metall::mtlldetail::file_exist(
248  ds::make_root_dir_path(root_dir_prefix)) &&
249  !metall::mtlldetail::remove_file(
250  ds::make_root_dir_path(root_dir_prefix))) {
251  std::string s("Failed to remove directory: " +
252  ds::make_root_dir_path(root_dir_prefix));
253  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
254  ret = false;
255  }
256  }
257  priv_mpi_barrier(comm);
258  }
259 
260  return priv_global_and(ret, comm);
261  }
262 
267  static int partitions(const std::string &root_dir_prefix,
268  const MPI_Comm &comm = MPI_COMM_WORLD) {
269  return priv_read_partition_size(root_dir_prefix, comm);
270  }
271 
277  static bool consistent(const std::string &root_dir_prefix,
278  const MPI_Comm &comm = MPI_COMM_WORLD) {
279  const int rank = priv_mpi_comm_rank(comm);
280  const auto local_path = ds::make_local_dir_path(root_dir_prefix, rank);
281  const auto ret = manager_type::consistent(local_path.c_str());
282  return priv_global_and(ret, comm);
283  }
284 
285  private:
286  static constexpr const char *k_datastore_mark_file_name =
287  "metall_mpi_datastore";
288  static constexpr const char *k_partition_size_file_name =
289  "metall_mpi_adaptor_partition_size";
290 
291  // -------------------- //
292  // Private methods
293  // -------------------- //
294  static void priv_remove_for_overwrite(const std::string &root_dir_prefix,
295  const MPI_Comm &comm) {
296  if (!remove(root_dir_prefix, comm)) {
297  if (priv_mpi_comm_rank(comm) == 0) {
298  std::stringstream ss;
299  ss << "Failed to overwrite " << root_dir_prefix;
300  logger::out(logger::level::error, __FILE__, __LINE__, ss.str().c_str());
301  ::MPI_Abort(comm, -1);
302  }
303  }
304  }
305 
306  static void priv_setup_root_dir(const std::string &root_dir_prefix,
307  bool overwrite, const MPI_Comm &comm) {
308  const int rank = priv_mpi_comm_rank(comm);
309  const int size = priv_mpi_comm_size(comm);
310  const std::string root_dir_path = ds::make_root_dir_path(root_dir_prefix);
311 
312  if (overwrite) {
313  priv_remove_for_overwrite(root_dir_prefix, comm);
314  }
315 
316  // Make sure the root directory and a file with the same name do not exist
317  const auto local_ret = metall::mtlldetail::file_exist(root_dir_path);
318  if (priv_global_or(local_ret, comm)) {
319  if (rank == 0) {
320  std::string s(
321  "Root directory (or a file with the same name) already exists: " +
322  root_dir_path);
323  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
324  ::MPI_Abort(comm, -1);
325  }
326  }
327  priv_mpi_barrier(comm);
328 
329  for (int i = 0; i < size; ++i) {
330  if (i == rank && !metall::mtlldetail::directory_exist(root_dir_path)) {
331  if (!metall::mtlldetail::create_directory(root_dir_path)) {
332  std::string s("Failed to create directory: " + root_dir_path);
333  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
334  ::MPI_Abort(comm, -1);
335  }
336 
337  const std::string mark_file =
338  root_dir_path + "/" + k_datastore_mark_file_name;
339  if (!metall::mtlldetail::create_file(mark_file)) {
340  std::string s("Failed to create file: " + mark_file);
341  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
342  ::MPI_Abort(comm, -1);
343  }
344 
345  priv_store_partition_size(root_dir_prefix, comm);
346  }
347  priv_mpi_barrier(comm);
348  }
349  }
350 
351  static void priv_store_partition_size(const std::string &root_dir_prefix,
352  const MPI_Comm &comm) {
353  const int size = priv_mpi_comm_size(comm);
354  const std::string path = ds::make_root_dir_path(root_dir_prefix) + "/" +
355  k_partition_size_file_name;
356 
357  std::ofstream ofs(path);
358  if (!ofs) {
359  std::string s("Failed to create a file: " + path);
360  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
361  ::MPI_Abort(comm, -1);
362  }
363  ofs << size;
364  if (!ofs) {
365  std::string s("Failed to write data to a file: " + path);
366  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
367  ::MPI_Abort(comm, -1);
368  }
369  ofs.close();
370  }
371 
372  static int priv_read_partition_size(const std::string &root_dir_prefix,
373  const MPI_Comm &comm) {
374  const std::string path = ds::make_root_dir_path(root_dir_prefix) + "/" +
375  k_partition_size_file_name;
376  std::ifstream ifs(path);
377  if (!ifs) {
378  std::string s("Failed to open a file: " + path);
379  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
380  ::MPI_Abort(comm, -1);
381  }
382  int read_size = -1;
383  if (!(ifs >> read_size)) {
384  std::string s("Failed to read data from: " + path);
385  logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
386  ::MPI_Abort(comm, -1);
387  }
388 
389  return read_size;
390  }
391 
392  static bool priv_verify_num_partitions(const std::string &root_dir_prefix,
393  const MPI_Comm &comm) {
394  const int rank = priv_mpi_comm_rank(comm);
395  const int size = priv_mpi_comm_size(comm);
396 
397  bool correct_mpi_size = true;
398  if (rank == 0) {
399  const int read_size = priv_read_partition_size(root_dir_prefix, comm);
400  if (read_size != size) {
401  correct_mpi_size = false;
402  std::stringstream ss;
403  ss << "Invalid number of MPI processes (provided " << size << ", "
404  << "expected " << read_size << ")";
405  logger::out(logger::level::error, __FILE__, __LINE__, ss.str().c_str());
406  }
407  }
408  return priv_global_and(correct_mpi_size, comm);
409  }
410 
411  static int priv_mpi_comm_rank(const MPI_Comm &comm) {
412  const int rank = mpi::comm_rank(comm);
413  if (rank == -1) {
414  ::MPI_Abort(comm, -1);
415  }
416  return rank;
417  }
418 
419  static int priv_mpi_comm_size(const MPI_Comm &comm) {
420  const int size = mpi::comm_size(comm);
421  if (size == -1) {
422  ::MPI_Abort(comm, -1);
423  }
424  return size;
425  }
426 
427  static void priv_mpi_barrier(const MPI_Comm &comm) {
428  if (!mpi::barrier(comm)) {
429  ::MPI_Abort(comm, -1);
430  }
431  }
432 
433  static bool priv_global_and(const bool local_result, const MPI_Comm &comm) {
434  const auto ret = mpi::global_logical_and(local_result, comm);
435  if (!ret.first) {
436  ::MPI_Abort(comm, -1);
437  }
438  return ret.second;
439  }
440 
441  static bool priv_global_or(const bool local_result, const MPI_Comm &comm) {
442  const auto ret = mpi::global_logical_or(local_result, comm);
443  if (!ret.first) {
444  ::MPI_Abort(comm, -1);
445  }
446  return ret.second;
447  }
448 
449  static int priv_determine_local_root_rank(const MPI_Comm &comm) {
450  const int rank = mpi::determine_local_root(comm);
451  if (rank == -1) {
452  logger::out(logger::level::error, __FILE__, __LINE__,
453  "Failed at determining a local root rank");
454  ::MPI_Abort(comm, -1);
455  }
456  return rank;
457  }
458 
459  MPI_Comm m_mpi_comm;
460  std::string m_root_dir_prefix;
461  std::unique_ptr<manager_type> m_local_metall_manager;
462 };
463 
464 } // namespace metall::utility
465 
466 #endif // METALL_UTILITY_METALL_MPI_ADAPTOR_HPP
A generalized Metall manager class.
Definition: basic_manager.hpp:40
static bool copy(const path_type &source_path, const path_type &destination_path, const bool clone=true, const int num_max_copy_threads=0) noexcept
Copies data store synchronously. The behavior of copying a data store that is open without the read-o...
Definition: basic_manager.hpp:1020
static bool consistent(const path_type &path) noexcept
Check if a data store exists and is consistent (i.e., it was closed properly in the previous run).
Definition: basic_manager.hpp:1106
static void out(const level lvl, const char *const file_name, const int line_no, const char *const message) noexcept
Log a message.
Definition: logger.hpp:35
@ error
Error logger message.
A utility class for using Metall with MPI. This is an experimental implementation.
Definition: metall_mpi_adaptor.hpp:24
manager_type & get_local_manager()
Returns the Metall manager object of the process.
Definition: metall_mpi_adaptor.hpp:126
static bool copy(const std::string &source_dir_path, const std::string &destination_dir_path, const MPI_Comm &comm=MPI_COMM_WORLD, bool overwrite=false)
Copies a Metall datastore to another location. The behavior of copying a data store that is open with...
Definition: metall_mpi_adaptor.hpp:166
metall_mpi_adaptor(metall::create_only_t, const std::string &root_dir_prefix, const std::size_t capacity, const MPI_Comm &comm=MPI_COMM_WORLD, bool overwrite=false)
Creates a new Metall datastore.
Definition: metall_mpi_adaptor.hpp:98
static std::string local_dir_path(const std::string &root_dir_prefix, const int mpi_rank)
Returns the path of the Metall datastore of an MPI rank.
Definition: metall_mpi_adaptor.hpp:151
metall_mpi_adaptor(metall::create_only_t, const std::string &root_dir_prefix, const MPI_Comm &comm=MPI_COMM_WORLD, bool overwrite=false)
Creates a new Metall datastore.
Definition: metall_mpi_adaptor.hpp:77
metall_mpi_adaptor(metall::open_only_t, const std::string &root_dir_prefix, const MPI_Comm &comm=MPI_COMM_WORLD)
Opens an existing Metall datastore.
Definition: metall_mpi_adaptor.hpp:38
std::string local_dir_path() const
Returns the path of the sub-Metall datastore of the process.
Definition: metall_mpi_adaptor.hpp:142
static bool consistent(const std::string &root_dir_prefix, const MPI_Comm &comm=MPI_COMM_WORLD)
Checks if all local datastores are consistent.
Definition: metall_mpi_adaptor.hpp:277
std::string root_dir_path() const
Returns the root path of a Metall datastore.
Definition: metall_mpi_adaptor.hpp:136
bool snapshot(const std::string &destination_dir_path, bool overwrite=false)
Takes a snapshot of the current Metall datastore to another location.
Definition: metall_mpi_adaptor.hpp:196
static int partitions(const std::string &root_dir_prefix, const MPI_Comm &comm=MPI_COMM_WORLD)
Returns the number of partitions of a Metall datastore.
Definition: metall_mpi_adaptor.hpp:267
metall_mpi_adaptor(metall::open_read_only_t, const std::string &root_dir_prefix, const MPI_Comm &comm=MPI_COMM_WORLD)
Opens an existing Metall datastore with the read-only mode.
Definition: metall_mpi_adaptor.hpp:55
~metall_mpi_adaptor()
Destructor that globally synchronizes the close operations of all sub-Metall datastores.
Definition: metall_mpi_adaptor.hpp:115
static bool remove(const std::string &root_dir_prefix, const MPI_Comm &comm=MPI_COMM_WORLD)
Removes a Metall datastore.
Definition: metall_mpi_adaptor.hpp:212
const manager_type & get_local_manager() const
Returns the Metall manager object of the process.
Definition: metall_mpi_adaptor.hpp:130
basic_string< char > string
A string container that uses char as its character type and Metall as its default allocator.
Definition: string.hpp:23
Namespace for MPI datastore.
std::string make_root_dir_path(const std::string &root_dir_prefix)
Makes a path of a root directory. The MPI ranks that share the same storage space create their data s...
Definition: metall_mpi_datastore.hpp:19
std::string make_local_dir_path(const std::string &root_dir_prefix, const int rank)
Makes the datastore path of an MPI rank.
Definition: metall_mpi_datastore.hpp:29
bool barrier(const MPI_Comm &comm)
Definition: mpi.hpp:43
int comm_rank(const MPI_Comm &comm)
Definition: mpi.hpp:23
int comm_size(const MPI_Comm &comm)
Definition: mpi.hpp:33
int determine_local_root(const MPI_Comm &comm)
Determines a local root rank.
Definition: mpi.hpp:84
std::pair< bool, bool > global_logical_or(const bool local_value, const MPI_Comm &comm)
Performs the logical 'or' operation.
Definition: mpi.hpp:71
std::pair< bool, bool > global_logical_and(const bool local_value, const MPI_Comm &comm)
Performs the logical 'and' operation.
Definition: mpi.hpp:56
Namespace for utility items.
basic_manager<> manager
Default Metall manager class which is an alias of basic_manager with the default template parameters.
Definition: metall.hpp:34
Tag type to always create the segment. An existing segment with the same name is overwritten.
Definition: tags.hpp:15
Tag type to open an already created segment.
Definition: tags.hpp:22
Tag type to open an already created segment as read only.
Definition: tags.hpp:28