#ifndef HAVOQGT_MPI_MAILBOX_ROUTED_HPP_INCLUDED
#define HAVOQGT_MPI_MAILBOX_ROUTED_HPP_INCLUDED

#include <boost/unordered_map.hpp>
#include <boost/tuple/tuple.hpp>

namespace havoqgt {
namespace mpi {
template <typename TMsg>
class mailbox_routed {
  // ...
  // From the twod_router constructor: cap the assumed ranks per node at 24
  // (or the whole job, if smaller) when sizing the 2D routing layout.
  procs_per_node = std::min(size, uint32_t(24));
  uint64_t rank_to_print = 2;
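// The 2D router lays ranks out in a grid so that a message from src to dest
// is relayed by the rank in src's row and dest's column, and each rank only
// talks to O(rows + cols) peers rather than all of them. A minimal sketch of
// that idea, assuming a row-major layout and a simple row/column proxy rule
// (the class's actual layout choice may differ):
#include <cstdint>

struct twod_router_sketch {
  uint32_t rows, cols;
  explicit twod_router_sketch(uint32_t size) : rows(1), cols(size) {
    // Pick a near-square factorization of the communicator size.
    while ((rows + 1) * (rows + 1) <= size && size % (rows + 1) == 0) ++rows;
    cols = size / rows;
  }
  // Next hop for a message from src to dest: src's row, dest's column.
  uint32_t proxy_rank(uint32_t src, uint32_t dest) const {
    return (src / cols) * cols + (dest % cols);
  }
};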
  // msg_buffer: append one message into the typed view of the raw chunk.
  size_t push_back(const routed_msg_type& _msg) {
    static_cast<routed_msg_type*>(m_ptr)[m_size] = _msg;
    // ...
  }
  routed_msg_type& operator[](size_t i) {
    return static_cast<routed_msg_type*>(m_ptr)[i];
  }
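// For context, a hypothetical msg_buffer-like wrapper built around the two
// members above: a typed view over raw, pre-aligned storage. Everything
// except push_back/operator[] is an assumption, and Msg must be trivially
// copyable for the raw assignment to be safe.
#include <cstddef>

template <typename Msg>
class msg_buffer_sketch {
 public:
  explicit msg_buffer_sketch(void* ptr) : m_ptr(ptr), m_size(0) {}
  size_t push_back(const Msg& _msg) {
    static_cast<Msg*>(m_ptr)[m_size] = _msg;  // append into raw storage
    return ++m_size;
  }
  Msg& operator[](size_t i) { return static_cast<Msg*>(m_ptr)[i]; }
  size_t size() const { return m_size; }
 private:
  void*  m_ptr;   // aligned chunk obtained from the mailbox's free list
  size_t m_size;  // number of messages currently stored
};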
  void* irecv_buff = NULL;
  int ret = posix_memalign(&irecv_buff, 32, /* irecv buffer size elided */);
  if (ret != 0) {
    perror("posix_memalign-irecv");
    exit(-1);
  }
std::cout << "****************** Mailbox Statistics ********************" << std::endl;
std::cout << "routed message size = " << sizeof(TMsg) << std::endl;
std::cout << "mpi_send_counter = " << g_mpi_send_counter << std::endl;
std::cout << "tree_send_counter = " << g_tree_send_counter << std::endl;
std::cout << "route_counter = " << g_route_counter << std::endl;
std::cout << "send_counter = " << g_send_counter << std::endl;
std::cout << "recv_counter = " << g_recv_counter << std::endl;
std::cout << "Average send size = "
          << double(g_send_counter + g_route_counter) / double(g_mpi_send_counter)
          << std::endl;
std::cout << "***********************************************************" << std::endl;
routed_msg_type msg(raw_dest, _raw_msg);
template <typename OutputIterator>
void bcast(TMsg _raw_msg, OutputIterator _oitr) {
  assert(_raw_msg.get_bcast());
  // ...
// bcast_to_targets: re-stamp the broadcast message for each local target.
assert(_msg.get_bcast());
_msg.set_dest(target);
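// Broadcast fan-out, schematically: the message is re-stamped with each
// recipient's rank and handed back to the normal send path, so a bcast is
// just a loop of routed sends. A minimal sketch, assuming a Sender callback
// in place of the mailbox's internal send and a Msg type exposing
// get_bcast()/set_dest() as in the fragments above:
#include <vector>
#include <cassert>
#include <cstdint>

template <typename Msg, typename Sender>
void bcast_sketch(Msg _msg, const std::vector<uint32_t>& recipients,
                  Sender send_fn) {
  assert(_msg.get_bcast());
  for (uint32_t target : recipients) {
    _msg.set_dest(target);    // as in bcast_to_targets above
    send_fn(target, _msg);
  }
}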
template <typename OutputIterator>
void send(int raw_dest, const TMsg& _raw_msg, OutputIterator _oitr, bool fast=true) {
  assert(raw_dest >= 0 && raw_dest < m_mpi_size);
  // ...
  routed_msg_type msg = _raw_msg;
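// The send path, condensed: wrap the payload, pick the next hop from the 2D
// router, append to that hop's buffer, and ship the buffer once it is full.
// A sketch of the flow only, with the collaborators passed in explicitly;
// the capacity test and exact call order are assumptions:
#include <vector>
#include <cstddef>
#include <cstdint>

template <typename Msg, typename Router, typename Buffer, typename PostIsend>
void send_sketch(int raw_dest, const Msg& msg, Router& router,
                 std::vector<Buffer>& buffer_per_rank,
                 size_t aggregation, PostIsend post_isend) {
  uint32_t next_hop = router.proxy_rank(raw_dest);      // 2D next hop
  size_t n = buffer_per_rank[next_hop].push_back(msg);  // aggregate locally
  if (n == aggregation)                                 // buffer full:
    post_isend(next_hop);                               // ship it now
}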
template <typename OutputIterator >
void receive(OutputIterator _oitr, bool aggregsive=false) {
  MPI_Request* request_ptr = &(pair_req.first);
  // ...
  CHK_MPI( MPI_Test(request_ptr, &flag, &status) );
  // ...
  routed_msg_type* recv_ptr =
      static_cast<routed_msg_type*>(/* completed irecv buffer elided */);
  // ...
  CHK_MPI( MPI_Get_count(&status, MPI_BYTE, &count) );
  // ...
  if (recv_ptr[i].dest() == uint32_t(m_mpi_rank)) {
    *_oitr = recv_ptr[i];   // addressed to this rank: deliver locally
    // ...
  } else if (recv_ptr[i].get_bcast()) {
    // ...
  } else if (recv_ptr[i].is_intercept()) {
    if (_oitr.intercept(recv_ptr[i])) {
      // ...
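// The receive dispatch, condensed from the fragments above into one loop:
// each message in a completed buffer is either delivered locally,
// re-broadcast, offered to the visitor's intercept hook, or forwarded one
// more 2D hop. The four-way branch follows the excerpts; the parameterized
// glue (callbacks, counts) is an assumption:
#include <cstddef>
#include <cstdint>

template <typename Msg, typename OutputIterator, typename Bcast, typename Route>
void dispatch_sketch(Msg* recv_ptr, size_t num_msgs, uint32_t my_rank,
                     OutputIterator _oitr, Bcast bcast_to_targets,
                     Route route_fast_path) {
  for (size_t i = 0; i < num_msgs; ++i) {
    if (recv_ptr[i].dest() == my_rank) {
      *_oitr = recv_ptr[i]; ++_oitr;             // deliver locally
    } else if (recv_ptr[i].get_bcast()) {
      bcast_to_targets(recv_ptr[i]);             // continue the fan-out
    } else if (recv_ptr[i].is_intercept() && _oitr.intercept(recv_ptr[i])) {
      // the visitor consumed the message mid-route; nothing to forward
    } else {
      route_fast_path(recv_ptr[i].dest(), recv_ptr[i]);  // next 2D hop
    }
  }
}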
  int ret = posix_memalign(&buff, 32, /* buffer size elided */);
  if (ret != 0) {
    perror("posix_memalign");
    exit(-1);
  }
  bool was_first_pending = false;
  // ...
  was_first_pending = true;
  boost::tuple<MPI_Request, void*, std::list<size_t>::iterator> isend_req_tuple;
  MPI_Request* request_ptr = &(isend_req_tuple.get<0>());
  // ...
  CHK_MPI( MPI_Isend(buffer_ptr, size_in_bytes, MPI_BYTE, dest,
                     /* tag, comm, request elided */) );
  if (!was_first_pending) {
    // ...
  }

  bool to_return = false;
  // ...
  CHK_MPI( MPI_Test(request_ptr, &flag, MPI_STATUS_IGNORE) );
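// Reaping completed isends, schematically: each pending send keeps its
// request and buffer together in one tuple, and on MPI_Test success the
// buffer is recycled, matching the fragments above. The free-function shape
// and the FreeBuffer callback (standing in for free_msg_buffer) are
// assumptions:
#include <mpi.h>
#include <list>
#include <boost/tuple/tuple.hpp>

template <typename FreeBuffer>
bool reap_front_isend(
    std::list<boost::tuple<MPI_Request, void*,
                           std::list<size_t>::iterator> >& pending,
    FreeBuffer free_msg_buffer) {
  if (pending.empty()) return false;
  int flag = 0;
  MPI_Request* request_ptr = &(pending.front().get<0>());
  CHK_MPI( MPI_Test(request_ptr, &flag, MPI_STATUS_IGNORE) );
  if (flag) {                                   // send finished:
    free_msg_buffer(pending.front().get<1>());  // recycle its buffer
    pending.pop_front();
  }
  return flag != 0;
}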
  std::pair<MPI_Request, void*> irecv_req;
  irecv_req.second = _buff;
  MPI_Request* request_ptr = &(irecv_req.first);
  // ...
  CHK_MPI( MPI_Irecv(_buff, num_bytes, MPI_BYTE, MPI_ANY_SOURCE,
                     /* tag, comm, request elided */) );
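// Completing the picture: posting one receive buffer. Sketched as a free
// function over the same list type as m_list_irecv_request; the tag/comm
// arguments mirror the constructor's parameters and are assumptions here.
// std::list never invalidates references, so the MPI_Request can safely
// live inside the list entry while MPI fills it in.
#include <mpi.h>
#include <list>
#include <utility>

void post_irecv_sketch(std::list<std::pair<MPI_Request, void*> >& irecvs,
                       void* buff, int num_bytes, int tag, MPI_Comm comm) {
  irecvs.push_back(std::make_pair(MPI_Request(), buff));
  MPI_Request* request_ptr = &(irecvs.back().first);
  CHK_MPI( MPI_Irecv(buff, num_bytes, MPI_BYTE, MPI_ANY_SOURCE,
                     tag, comm, request_ptr) );
}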
} // namespace mpi
} // namespace havoqgt

#endif //HAVOQGT_MPI_MAILBOX_ROUTED_HPP_INCLUDED
Member index (grouped):

  Public interface:
    mailbox_routed(MPI_Comm _mpi_comm, int _mpi_tag)
    void send(int raw_dest, const TMsg &_raw_msg, OutputIterator _oitr, bool fast=true)
    void bcast(TMsg _raw_msg, OutputIterator _oitr)
    void receive(OutputIterator _oitr, bool aggregsive=false)
    void flush_buffers_if_idle()
    uint32_t mailbox_num_irecv() const
    uint32_t mailbox_aggregation() const

  Internal routing and tree broadcast:
    void route_fast_path(uint32_t dest, const routed_msg_type &_msg)
    void bcast_to_targets(TMsg _msg)
    void send_tree_fast(int raw_dest, const TMsg &_raw_msg)
    void send_tree_parent(const TMsg &_raw_msg)
    void send_tree_children(const TMsg &_raw_msg)
    void check_for_starvation()

  Buffer and request management:
    msg_buffer allocate_msg_buffer()
    void free_msg_buffer(void *_ptr)
    size_t push_back(const routed_msg_type &_msg)
    routed_msg_type & operator[](size_t i)
    void post_isend(int index)
    void post_new_irecv(void *_buff)
    bool cleanup_pending_isend_requests_index(size_t index)
    void cleanup_pending_isend_requests(bool force_aggressive=false)

  2D router (twod_router):
    twod_router(uint32_t rank, uint32_t size)
    uint32_t proxy_rank(uint32_t dest)
    const std::vector< uint32_t > & bcast_proxies() const
    const std::vector< uint32_t > & bcast_targets() const
    std::vector< uint32_t > m_bcast_proxies
    std::vector< uint32_t > m_bcast_targets
    uint64_t my_node_base_rank

  Data members:
    MPI_Comm m_mpi_comm (MPI configuration)
    std::vector< msg_buffer > m_buffer_per_rank
    std::vector< void * > m_vec_free_buffers
    std::vector< std::list< size_t >::iterator > m_pending_iterator_per_rank
    std::vector< std::list< boost::tuple< MPI_Request, void *, std::list< size_t >::iterator > > > m_list_isend_request_per_rank
    std::list< std::pair< MPI_Request, void * > > m_list_irecv_request
    std::list< size_t > m_list_isends
    std::list< size_t > m_list_pending
    size_t m_pending_partial_buffers
    size_t m_num_pending_isend
    uint64_t m_mpi_send_counter
    uint64_t m_tree_send_counter

  Related free functions:
    T mpi_all_reduce(T in_d, Op in_op, MPI_Comm mpi_comm)
    old_environment & get_environment()