//////////////////////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2010, Lawrence Livermore National Security, LLC.
// Produced at the Lawrence Livermore National Laboratory
// LLNL-CODE-433662
// All rights reserved.
//
// This file is part of Muster. For details, see http://github.com/tgamblin/muster.
// Please also read the LICENSE file for further information.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
//  * Redistributions of source code must retain the above copyright notice, this list of
//    conditions and the disclaimer below.
//  * Redistributions in binary form must reproduce the above copyright notice, this list of
//    conditions and the disclaimer (as noted below) in the documentation and/or other materials
//    provided with the distribution.
//  * Neither the name of the LLNS/LLNL nor the names of its contributors may be used to endorse
//    or promote products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// LAWRENCE LIVERMORE NATIONAL SECURITY, LLC, THE U.S. DEPARTMENT OF ENERGY OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//////////////////////////////////////////////////////////////////////////////////////////////////

///
/// @file multi_gather.h
/// @author Todd Gamblin tgamblin@llnl.gov
/// @brief Asynchronous, some-to-some gather operation used by parallel clustering algorithms
///        to simultaneously send members of sample sets to a set of distributed worker processes.
///
#ifndef MULTI_GATHER_H
#define MULTI_GATHER_H

#include <mpi.h>

#include <algorithm>
#include <iostream>
#include <vector>

#include "mpi_bindings.h"

namespace cluster {

  ///
  /// Asynchronous, some-to-some gather operation used by parallel clustering algorithms
  /// to simultaneously send members of sample sets to a set of distributed worker processes.
  ///
  /// @tparam T Type of objects to be transferred by this multi_gather.
  ///           Must support the following operations:
  ///           - <code>int packed_size(MPI_Comm comm) const</code>
  ///           - <code>void pack(void *buf, int bufsize, int *position, MPI_Comm comm) const</code>
  ///           - <code>static T unpack(void *buf, int bufsize, int *position, MPI_Comm comm)</code>
  ///
  /// @see par_kmedoids::run_pam_trials(), which uses this class.
  ///
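  /// A usage sketch (hypothetical; <code>Point</code> is assumed to provide the
  /// packed_size()/pack()/unpack() interface above and is not an actual Muster type).
  /// The root and every rank listed in <code>sources</code> make the same pair of calls:
  /// @code
  ///   std::vector<Point> samples;    // local objects this rank contributes
  ///   std::vector<Point> gathered;   // filled on the root once finish() returns
  ///   std::vector<int>   sources;    // ranks contributing to this gather
  ///   int root = 0;                  // rank that receives the gathered objects
  ///   // ... fill samples and sources ...
  ///
  ///   multi_gather<Point> gather(MPI_COMM_WORLD);
  ///   gather.start(samples.begin(), samples.end(),
  ///                sources.begin(), sources.end(), gathered, root);
  ///   // other work (e.g. starting more gathers) can overlap with communication here
  ///   gather.finish();               // waits on sends/recvs and unpacks into gathered
  /// @endcode
  ///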
  template <class T>
  class multi_gather {
    /// internal struct for buffering sends and recvs.
    struct buffer {
      int size;               ///< buffer for size of Isend or Irecv
      char *buf;              ///< buffer for data to be sent/recv'd
      std::vector<T> *dest;   ///< vector to push unpacked data onto

      /// constructor for receive buffers
      buffer(std::vector<T>& _dest)
        : size(0), buf(NULL), dest(&_dest) { }

      /// constructor for send buffers
      buffer(int _size)
        : size(_size), buf(new char[_size]), dest(NULL) { }

      /// Destructor
      ~buffer() {
        if (buf) delete [] buf;
      }

      /// Turn a send buffer into a receive buffer (for local "sends")
      void set_destination(std::vector<T>& _dest) { dest = &_dest; }

      /// Whether buffer has been allocated.
      bool is_allocated() { return buf; }

      /// Allocates a buffer of <size> chars, to be called after size is received.
      void allocate() { buf = new char[size]; }

      /// This is a buffer for a send if true.  It's a buffer for a recv if false.
      bool is_send() { return !dest; }
    };

    MPI_Comm comm;                    ///< Communicator on which gather takes place
    int tag;                          ///< tag for communication in multi_gathers.

    std::vector<MPI_Request> reqs;    ///< Outstanding requests to be completed.
    std::vector<buffer*> buffers;     ///< Send and receive buffers for packed data in gathers.
    size_t unfinished_reqs;           ///< Number of still-outstanding requests.

  public:
    ///
    /// Construct a multi_gather on a communicator.  MPI communication will use
    /// the specified tag.
    ///
    multi_gather(MPI_Comm _comm, int _tag=0)
      : comm(_comm), tag(_tag), unfinished_reqs(0) { }

    ///
    /// Starts initial send and receive requests for this gather.  Must be followed up
    /// with a call to finish().
    ///
    template <class ObjIterator, class RankIterator>
    void start(ObjIterator begin_obj, ObjIterator end_obj,
               RankIterator begin_src, RankIterator end_src,
               std::vector<T>& dest, int root)
    {
      int size, rank;
      CMPI_Comm_size(comm, &size);
      CMPI_Comm_rank(comm, &rank);

      // stop if this rank isn't a member of the gather.
      if (rank != root && std::find(begin_src, end_src, rank) == end_src) return;

      // determine size of local data.
      int packed_size = cmpi_packed_size(1, MPI_SIZE_T, comm);    // num objects
      for (ObjIterator o=begin_obj; o != end_obj; o++) {          // size of each object
        packed_size += o->packed_size(comm);
      }

      buffer *send_buffer = new buffer(packed_size);
      if (rank != root) {
        buffers.push_back(NULL);            // no separate buffer for the size.
        reqs.push_back(MPI_REQUEST_NULL);
        CMPI_Isend(&send_buffer->size, 1, MPI_INT, root, tag, comm, &reqs.back());
        unfinished_reqs++;
      }

      // pack up local data into the buffer
      int pos = 0;
      size_t num_objects = std::distance(begin_obj, end_obj);
      CMPI_Pack(&num_objects, 1, MPI_SIZE_T, send_buffer->buf, send_buffer->size, &pos, comm);
      for (ObjIterator o=begin_obj; o != end_obj; o++) {
        o->pack(send_buffer->buf, packed_size, &pos, comm);
      }

      if (rank != root) {
        // send packed data along to destination.
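        // This is the second of two sends from this rank to the root: the packed size
        // was sent above, and the packed payload follows here with the same tag.  The
        // root allocates a matching buffer and posts the payload receive only after the
        // size message arrives (see finish()).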
        buffers.push_back(send_buffer);     // buffer data during send
        reqs.push_back(MPI_REQUEST_NULL);
        CMPI_Isend(send_buffer->buf, packed_size, MPI_PACKED, root, tag, comm, &reqs.back());
        unfinished_reqs++;

      } else {  // rank is root; do receives instead
        // initiate all the receives for sizes
        for (RankIterator src=begin_src; src != end_src; src++) {
          if (*src == root) {
            // for the root, just insert the local packed buffer onto the array of buffers.
            // don't increment unfinished reqs here b/c local buffer is already "done."
            send_buffer->set_destination(dest);
            buffers.push_back(send_buffer);
            reqs.push_back(MPI_REQUEST_NULL);

          } else {
            // make some buffer space for the receive, record its eventual destination
            buffers.push_back(new buffer(dest));
            reqs.push_back(MPI_REQUEST_NULL);
            CMPI_Irecv(&buffers.back()->size, 1, MPI_INT, *src, tag, comm, &reqs.back());
            unfinished_reqs++;
          }
        }
      }
    }

    ///
    /// Starts a gather with one object instead of a range of objects.
    ///
    template <class RankIterator>
    void start(const T& obj, RankIterator begin_src, RankIterator end_src, std::vector<T>& dest, int root) {
      start(&obj, (&obj) + 1, begin_src, end_src, dest, root);
    }

    ///
    /// Completes all outstanding sends and receives begun by start(), then unpacks
    /// the received data onto the destination vectors.
    ///
    void finish() {
      int rank;
      CMPI_Comm_rank(MPI_COMM_WORLD, &rank);

      while (unfinished_reqs) {
        int outcount;
        std::vector<int> indices(reqs.size());
        std::vector<MPI_Status> status(reqs.size());

        CMPI_Waitsome(reqs.size(), &reqs[0], &outcount, &indices[0], &status[0]);
        for (int o=0; o < outcount; o++) {
          const int r = indices[o];  // index of received object.

          if (buffers[r] && !buffers[r]->is_send() && !buffers[r]->is_allocated()) {
            // buffers[r] is a recv and we just received the packed size.  Allocate space and recv the data.
            int src = status[o].MPI_SOURCE;
            buffers[r]->allocate();
            CMPI_Irecv(buffers[r]->buf, buffers[r]->size, MPI_PACKED, src, tag, comm, &reqs[r]);

          } else {
            // buffers[r] is a send, or it's a receive and we just received the full packed data.
            // in either case, the buffer is done, so decrement the number of unfinished reqs.
            unfinished_reqs--;
          }
        }
      }

      // Unpack all the received buffers into their destination vectors.  This preserves order,
      // as unpacked data are only pushed onto the backs of destination vectors *after* everything
      // is received.  Buffers may still complete in any order above, though.
      for (size_t i=0; i < buffers.size(); i++) {
        if (!buffers[i]) continue;

        if (!buffers[i]->is_send()) {
          int pos = 0;
          size_t num_objects;

          CMPI_Unpack(buffers[i]->buf, buffers[i]->size, &pos, &num_objects, 1, MPI_SIZE_T, comm);
          for (size_t o=0; o < num_objects; o++) {
            buffers[i]->dest->push_back(T::unpack(buffers[i]->buf, buffers[i]->size, &pos, comm));
          }
        }
        delete buffers[i];
      }

      // clear these out before the next call to start()
      buffers.clear();
      reqs.clear();
    }

  }; // class multi_gather

} // namespace cluster

#endif // MULTI_GATHER_H