HavoqGT
delegate_partitioned_graph.ipp

/*
 * Copyright (c) 2013, Lawrence Livermore National Security, LLC.
 * Produced at the Lawrence Livermore National Laboratory.
 * Re-written by Steven Feldman <feldman12@llnl.gov>.
 * LLNL-CODE-644630.
 * All rights reserved.
 *
 * This file is part of HavoqGT, Version 0.1.
 * For details, see https://computation.llnl.gov/casc/dcca-pub/dcca/Downloads.html
 *
 * Please also read this link – Our Notice and GNU Lesser General Public License.
 * http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html
 *
 * This program is free software; you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License (as published by the Free
 * Software Foundation) version 2.1 dated February 1999.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the terms and conditions of the GNU General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * OUR NOTICE AND TERMS AND CONDITIONS OF THE GNU GENERAL PUBLIC LICENSE
 *
 * Our Preamble Notice
 *
 * A. This notice is required to be provided under our contract with the
 * U.S. Department of Energy (DOE). This work was produced at the Lawrence
 * Livermore National Laboratory under Contract No. DE-AC52-07NA27344 with the DOE.
 *
 * B. Neither the United States Government nor Lawrence Livermore National
 * Security, LLC nor any of their employees, makes any warranty, express or
 * implied, or assumes any liability or responsibility for the accuracy,
 * completeness, or usefulness of any information, apparatus, product, or process
 * disclosed, or represents that its use would not infringe privately-owned rights.
 *
 * C. Also, reference herein to any specific commercial products, process, or
 * services by trade name, trademark, manufacturer or otherwise does not
 * necessarily constitute or imply its endorsement, recommendation, or favoring by
 * the United States Government or Lawrence Livermore National Security, LLC. The
 * views and opinions of authors expressed herein do not necessarily state or
 * reflect those of the United States Government or Lawrence Livermore National
 * Security, LLC, and shall not be used for advertising or product endorsement
 * purposes.
 *
 * \file
 * Implementation of delegate_partitioned_graph and internal classes.
 */

#ifndef HAVOQGT_MPI_IMPL_DELEGATE_PARTITIONED_GRAPH_IPP_INCLUDED
#define HAVOQGT_MPI_IMPL_DELEGATE_PARTITIONED_GRAPH_IPP_INCLUDED

namespace havoqgt {
namespace mpi {

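/**
 * Constructor that initializes the graph from a given, unsorted sequence of
 * edges. Construction runs through the phases below (degree counting, low/high
 * metadata initialization, high-degree edge counting) and then, unless
 * stop_after says otherwise, calls complete_construction().
 *
 * A minimal usage sketch (illustrative only; the file name, segment size, and
 * threshold below are assumptions, not values from this file):
 *
 *   namespace bip = boost::interprocess;
 *   typedef bip::managed_mapped_file::segment_manager segment_manager_t;
 *
 *   bip::managed_mapped_file mfile(bip::create_only, "graph.dat", 1ULL << 30);
 *   SegmentAllocator<void> alloc(mfile.get_segment_manager());
 *   std::vector<std::pair<uint64_t, uint64_t>> edges;  // (source, target)
 *   uint64_t max_vertex = ...;                         // largest vertex label
 *   delegate_partitioned_graph<segment_manager_t>
 *       graph(alloc, MPI_COMM_WORLD, edges, max_vertex, 1024);
 */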
template <typename SegmentManager>
template <typename Container>
delegate_partitioned_graph<SegmentManager>::
delegate_partitioned_graph(const SegmentAllocator<void>& seg_allocator,
                           MPI_Comm mpi_comm,
                           Container& edges, uint64_t max_vertex,
                           uint64_t delegate_degree_threshold,
                           ConstructionState stop_after)
  : m_mpi_comm(mpi_comm),
    m_global_edge_count(edges.size()),
    m_local_outgoing_count(seg_allocator),
    m_local_incoming_count(seg_allocator),
    m_owned_info(seg_allocator),
    m_owned_info_tracker(seg_allocator),
    // m_owned_targets(seg_allocator),
    m_delegate_degree_threshold(delegate_degree_threshold),
    m_delegate_info(seg_allocator),
    m_delegate_degree(seg_allocator),
    m_delegate_label(seg_allocator),
    // m_delegate_targets(seg_allocator),
    // m_map_delegate_locator(100, boost::hash<uint64_t>(),
    //                        std::equal_to<uint64_t>(), seg_allocator),
    m_map_delegate_locator(seg_allocator),
    m_controller_locators(seg_allocator) {

  CHK_MPI( MPI_Comm_size(m_mpi_comm, &m_mpi_size) );
  CHK_MPI( MPI_Comm_rank(m_mpi_comm, &m_mpi_rank) );

  processes_per_node = havoqgt_env()->node_local_comm().size();
  node_partitions = std::min(4, processes_per_node);
  edge_chunk_size = 1024 * 8;

  m_global_max_vertex = max_vertex;
  m_max_vertex = (std::ceil(double(max_vertex) / double(m_mpi_size)));

  LogStep logstep_main("Delegate Partitioning", m_mpi_comm, m_mpi_rank);

  {
    LogStep logstep("Allocating 4 Arrays of length max local vertex.",
                    m_mpi_comm, m_mpi_rank);
    m_owned_info.resize(m_max_vertex + 2, vert_info(false, 0, 0));
    m_owned_info_tracker.resize(m_max_vertex + 2, 0);
    m_local_outgoing_count.resize(m_max_vertex + 2, 0);
    m_local_incoming_count.resize(m_max_vertex + 2, 0);
    MPI_Barrier(m_mpi_comm);
  }

  boost::unordered_set<uint64_t> global_hubs;

  {
    LogStep logstep("count_edge_degree", m_mpi_comm, m_mpi_rank);
    count_edge_degrees(edges.begin(), edges.end(), global_hubs,
                       delegate_degree_threshold);
    if (m_mpi_rank == 0)
      std::cout << "\tNumber of Delegates: " << global_hubs.size() << std::endl;
    MPI_Barrier(m_mpi_comm);
  }

  {
    LogStep logstep("initialize_low_meta_data", m_mpi_comm, m_mpi_rank);
    initialize_low_meta_data(global_hubs);
    MPI_Barrier(m_mpi_comm);
  }

  {
    LogStep logstep("initialize_high_meta_data", m_mpi_comm, m_mpi_rank);
    initialize_high_meta_data(global_hubs);
    MPI_Barrier(m_mpi_comm);
  }

  {
    LogStep logstep("count_high_degree_edges", m_mpi_comm, m_mpi_rank);
    count_high_degree_edges(edges.begin(), edges.end(), global_hubs);
    MPI_Barrier(m_mpi_comm);
  }

  m_graph_state = MetaDataGenerated;
  if (m_graph_state != stop_after) {
    complete_construction(seg_allocator, mpi_comm, edges);
  }
}

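/**
 * Completes graph construction from the MetaDataGenerated state onward:
 * computes overflow schedules, allocates edge storage, partitions low- and
 * high-degree edges, and builds the controller lists. The switch below falls
 * through intentionally so construction can resume from any saved state.
 */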
template <typename SegmentManager>
template <typename Container>
void
delegate_partitioned_graph<SegmentManager>::
complete_construction(const SegmentAllocator<void>& seg_allocator,
                      MPI_Comm mpi_comm, Container& edges) {
  m_mpi_comm = mpi_comm;
  int temp_mpi_rank, temp_mpi_size;
  CHK_MPI( MPI_Comm_size(m_mpi_comm, &temp_mpi_size) );
  CHK_MPI( MPI_Comm_rank(m_mpi_comm, &temp_mpi_rank) );
  assert(temp_mpi_rank == m_mpi_rank);
  assert(temp_mpi_size == m_mpi_size);

  std::map< uint64_t, std::deque<OverflowSendInfo> > transfer_info;
  switch (m_graph_state) {
    case MetaDataGenerated:
      {
        LogStep logstep("calculate_overflow", m_mpi_comm, m_mpi_rank);
        calculate_overflow(transfer_info);
        MPI_Barrier(m_mpi_comm);
      }

      {
        LogStep logstep("initialize_edge_storage", m_mpi_comm, m_mpi_rank);
        initialize_edge_storage(seg_allocator);
        MPI_Barrier(m_mpi_comm);
      }

      m_graph_state = EdgeStorageAllocated;
      // fall through

    case EdgeStorageAllocated:
      {
        LogStep logstep("partition_low_degree", m_mpi_comm, m_mpi_rank);
        partition_low_degree(edges);
        MPI_Barrier(m_mpi_comm);
      }
      m_graph_state = LowEdgesPartitioned;

      {
        LogStep logstep("partition_high_degree", m_mpi_comm, m_mpi_rank);
        partition_high_degree(edges, transfer_info);
        MPI_Barrier(m_mpi_comm);
      }
      m_graph_state = HighEdgesPartitioned;
      // fall through

    // all-reduce hub degree
    case HighEdgesPartitioned:
      {
        LogStep logstep("all-reduce hub degree", m_mpi_comm, m_mpi_rank);
        if (m_delegate_degree.size() > 0) {
          mpi_all_reduce_inplace(m_delegate_degree, std::plus<uint64_t>(),
                                 m_mpi_comm);
        }
        MPI_Barrier(m_mpi_comm);
      }
      assert(m_delegate_degree.size() == m_delegate_label.size());

      //
      // Build controller lists
      {
        LogStep logstep("Build controller lists", m_mpi_comm, m_mpi_rank);
        const int controllers = (m_delegate_degree.size() / m_mpi_size) + 1;
        m_controller_locators.reserve(controllers);
        for (size_t i = 0; i < m_delegate_degree.size(); ++i) {
          if (int(i % m_mpi_size) == m_mpi_rank) {
            m_controller_locators.push_back(vertex_locator(true, i, m_mpi_rank));
          }
        }
        MPI_Barrier(m_mpi_comm);
      }

      //
      // Verify that CSR integration properly tagged owned delegates
      {
        LogStep logstep("Verify CSR integration", m_mpi_comm, m_mpi_rank);
        for (auto itr = m_map_delegate_locator.begin();
             itr != m_map_delegate_locator.end(); ++itr) {
          uint64_t label = itr->first;
          vertex_locator locator = itr->second;

          uint64_t local_id = label / uint64_t(m_mpi_size);
          if (label % uint64_t(m_mpi_size) == uint64_t(m_mpi_rank)) {
            assert(m_owned_info[local_id].is_delegate == 1);
            assert(m_owned_info[local_id].delegate_id == locator.local_id());
          }
        }
        MPI_Barrier(m_mpi_comm);
      }
      m_graph_state = GraphReady;

  } // switch
}

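/**
 * Counts the in/out degree of every vertex by streaming the unsorted edge
 * list in chunks, exchanging per-vertex counts with the owning ranks, and
 * recording vertices whose outgoing degree reaches delegate_degree_threshold
 * as hubs in global_hubs.
 */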
template <typename SegmentManager>
template <typename InputIterator>
void
delegate_partitioned_graph<SegmentManager>::
count_edge_degrees(InputIterator unsorted_itr, InputIterator unsorted_itr_end,
                   boost::unordered_set<uint64_t>& global_hubs,
                   uint64_t delegate_degree_threshold) {
  using boost::container::map;

  uint64_t high_vertex_count(0);

  uint64_t loop_counter = 0;
  uint64_t edge_counter = 0;

  double start_time, last_loop_time;
  start_time = last_loop_time = MPI_Wtime();

  // Loop until no processor is producing edges
  while (!detail::global_iterator_range_empty(unsorted_itr,
                                              unsorted_itr_end, m_mpi_comm)) {
    if (m_mpi_rank == 0 && (loop_counter % 1000) == 0) {
      double curr_time = MPI_Wtime();

      std::cout << "\t["
        << "Total Loops: " << loop_counter << ", "
        << "Total Edges: " << edge_counter
        << "] Time " << (curr_time - last_loop_time) << " seconds, "
        << " Total Time " << (curr_time - start_time) << " seconds, "
        << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
        << std::flush;

      last_loop_time = curr_time;
    }
    loop_counter++;

    std::vector<
      boost::container::map< int, std::pair<uint64_t, uint64_t> >
    > maps_to_send(m_mpi_size);
    int maps_to_send_element_count = 0;

    // Generate enough information to send
    for (size_t i = 0; i < edge_chunk_size && unsorted_itr != unsorted_itr_end; i++) {
      edge_counter++;

      // Update this vertex's outgoing edge count (first member of the pair)
      uint64_t local_id = local_source_id(m_mpi_size)(*unsorted_itr);
      int owner = owner_source_id(m_mpi_size)(*unsorted_itr);
      if (owner == m_mpi_rank) {
        m_local_outgoing_count[local_id]++;
        if (m_local_outgoing_count[local_id] == delegate_degree_threshold) {
          high_vertex_count++;
        }
      } else {
        maps_to_send.at(owner)[local_id].first++;
      }

      // Update the vertex's incoming edge count (second member of the pair)
      local_id = local_dest_id(m_mpi_size)(*unsorted_itr);
      owner = owner_dest_id(m_mpi_size)(*unsorted_itr);
      if (owner == m_mpi_rank) {
        m_local_incoming_count[local_id]++;
        // if (m_local_incoming_count[local_id] == delegate_degree_threshold) {
        //   high_vertex_count++;
        // }
      } else {
        int c = maps_to_send.at(owner)[local_id].second++;
        if (c == 0) {
          maps_to_send_element_count++;
        }
      }

      unsorted_itr++;
    } // for until threshold is reached

    // Send vertex degree count information to other nodes.
    send_vertex_info(high_vertex_count, delegate_degree_threshold,
                     maps_to_send, maps_to_send_element_count);

  } // while more edges
  mpi_yield_barrier(m_mpi_comm);
  if (m_mpi_rank == 0) {
    double curr_time = MPI_Wtime();
    loop_counter++;
    std::cout << "\t["
      << "Total Loops: " << loop_counter << ", "
      << "Total Edges: " << edge_counter
      << "] Time " << (curr_time - last_loop_time) << " seconds, "
      << " Total Time " << (curr_time - start_time) << " seconds, "
      << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
      << std::flush;
  }
  mpi_yield_barrier(m_mpi_comm);

  // Now m_local_outgoing_count and m_local_incoming_count hold the total
  // outgoing and incoming edges for each vertex owned by this node.
  // Using this information we identify the hubs.
  std::vector<uint64_t> temp_hubs;
  temp_hubs.reserve(high_vertex_count);
  for (size_t i = 0; i < m_local_incoming_count.size(); i++) {
    const uint64_t outgoing = m_local_outgoing_count[i];
    // const uint64_t incoming = m_local_incoming_count[i];

    if (outgoing >= delegate_degree_threshold) {
      const uint64_t global_id = (i * m_mpi_size) + m_mpi_rank;
      assert(global_id != 0);
      temp_hubs.push_back(global_id);
    } else {
      m_edges_low_count += m_local_outgoing_count[i];
    }
  }

  assert(temp_hubs.size() == high_vertex_count);

  // Gather the hub lists and add them to the map.
  std::vector<uint64_t> vec_global_hubs;
  mpi_yield_barrier(m_mpi_comm);
  mpi_all_gather(temp_hubs, vec_global_hubs, m_mpi_comm);
  // if (m_mpi_rank == 0) {
  //   std::cout << "[";
  //   for (size_t i = 0; i < vec_global_hubs.size(); i++) {
  //     std::cout << vec_global_hubs[i] << ",";
  //   }
  //   std::cout << "]" << std::endl;
  // }
  // Insert gathered global hubs to set
  global_hubs.insert(vec_global_hubs.begin(), vec_global_hubs.end());

} // count_edge_degrees

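/**
 * Sends the per-vertex incoming/outgoing edge counts accumulated in
 * maps_to_send to the owning ranks via all-to-all, then applies the received
 * counts locally, incrementing high_vertex_count whenever a vertex newly
 * crosses delegate_degree_threshold.
 */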
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
send_vertex_info(uint64_t& high_vertex_count, uint64_t delegate_degree_threshold,
                 std::vector< boost::container::map< int, std::pair<uint64_t, uint64_t> > >&
                 maps_to_send, int maps_to_send_element_count) {

  int to_send_pos = 0;
  std::vector<uint64_t> to_send(maps_to_send_element_count * 3, 0);
  std::vector<int> to_send_count(m_mpi_size, 0);

  assert(maps_to_send.size() == m_mpi_size);
  for (size_t i = 0; i < maps_to_send.size(); i++) {
    for (auto itr = maps_to_send[i].begin(); itr != maps_to_send[i].end(); itr++) {
      assert(to_send_pos < to_send.size());
      std::pair<int, std::pair<uint64_t, uint64_t>> triple = (*itr);
      to_send[to_send_pos++] = uint64_t(triple.first);
      to_send[to_send_pos++] = triple.second.first;
      to_send[to_send_pos++] = triple.second.second;
    }
    to_send_count[i] = maps_to_send[i].size() * 3;
  }

  std::vector<uint64_t> to_recv;
  std::vector<int> out_recvcnts;

  mpi_yield_barrier(m_mpi_comm);
  mpi_all_to_all(to_send, to_send_count, to_recv, out_recvcnts, m_mpi_comm);

  for (size_t k = 0; k < to_recv.size(); ) {
    const uint64_t local_id = to_recv[k++];
    const uint64_t source_count = to_recv[k++];
    const uint64_t dest_count = to_recv[k++];
    assert(local_id < m_local_incoming_count.size());

    // If it is not currently a high vertex but adding these counts makes it
    // one, then increment high_vertex_count.
    // if (m_local_incoming_count[local_id] < delegate_degree_threshold
    //     && m_local_incoming_count[local_id] + dest_count >=
    //        delegate_degree_threshold) {
    //   high_vertex_count++;
    // }
    if (m_local_outgoing_count[local_id] < delegate_degree_threshold
        && m_local_outgoing_count[local_id] + source_count >=
           delegate_degree_threshold) {
      high_vertex_count++;
    }

    m_local_outgoing_count[local_id] += source_count;
    m_local_incoming_count[local_id] += dest_count;
  } // for each received element

} // send_vertex_info

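/**
 * Initializes the per-vertex metadata (m_owned_info) for low-degree vertices,
 * building the prefix-sum index into the low-edge CSR from the outgoing
 * counts gathered by count_edge_degrees.
 */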
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
initialize_low_meta_data(boost::unordered_set<uint64_t>& global_hubs) {

  // Allocate the index into the low edge CSR.
  // +2: because m_max_vertex is indexable and the last position must hold
  // the number of low edges.
  // m_owned_info.resize(m_max_vertex+2, vert_info(false, 0, 0)); moved to
  // constructor

  // Initialize m_owned_info by iterating through owned vertices; if a vertex
  // is not a hub, increment the edge count by its number of outgoing edges.
  uint64_t edge_count = 0;
  for (uint64_t vert_id = 0; vert_id < m_owned_info.size(); vert_id++) {
    const uint64_t outgoing = m_local_outgoing_count[vert_id];
    const uint64_t incoming = m_local_incoming_count[vert_id];

    m_owned_info[vert_id] = vert_info(false, 0, edge_count);

    if (outgoing < m_delegate_degree_threshold) {
      edge_count += outgoing;
    } else {
      #ifdef DEBUG_DPG
      const uint64_t global_id = (vert_id * m_mpi_size) + m_mpi_rank;
      assert(global_id != 0);
      if (global_id < m_max_vertex) {
        // If vert_id == size-1 then the above will be true
        // and this assert would fire incorrectly.
        assert(global_hubs.count(global_id) != 0);
      }
      #endif
    }
  } // for over m_owned_info
}

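/**
 * Initializes the delegate (hub) metadata: sorts the global hub list, sizes
 * m_delegate_info and m_delegate_degree, fills m_map_delegate_locator and
 * m_delegate_label, and tags hubs owned by this rank in m_owned_info. Work is
 * staggered across the ranks on a node to limit peak memory pressure.
 */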
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
initialize_high_meta_data(boost::unordered_set<uint64_t>& global_hubs) {
  //
  // Setup and compute hub information
  //
  for (int i = 0; i < processes_per_node; i++) {
    if (m_mpi_rank == 0) {
      std::cout << "\t Setup and Compute Hub information: " << i << "/"
                << processes_per_node << "." << std::endl << std::flush;
    }
    if (i == m_mpi_rank % processes_per_node) {
      std::vector<uint64_t> vec_sorted_hubs(global_hubs.begin(), global_hubs.end());
      std::sort(vec_sorted_hubs.begin(), vec_sorted_hubs.end());

      // Allocate space for the delegate CSR index.
      // This is initialized during the partitioning of the low edges and then
      // adjusted in initialize_delegate_target.

      #if 1
      m_delegate_info.resize(global_hubs.size() + 1);
      m_delegate_degree.resize(vec_sorted_hubs.size());

      #else

      {
        size_t loop_limit = global_hubs.size() + 1;
        m_delegate_info.reserve(loop_limit);
        for (size_t j = 0; j < loop_limit; ) {
          for (size_t k = 0; k < 10000000; k++) {
            m_delegate_info.emplace_back();
            j++;
            if (j >= loop_limit) {
              break;
            }
          }
        }
      }

      // Allocate and initialize the delegate (AKA hub) vertex information
      {
        size_t loop_limit = vec_sorted_hubs.size();
        m_delegate_degree.reserve(loop_limit);
        for (size_t j = 0; j < loop_limit; ) {
          for (size_t k = 0; k < 10000000; k++) {
            m_delegate_degree.emplace_back();
            j++;
            if (j >= loop_limit) {
              break;
            }
          }
        }
      }
      #endif

      // Loop over the hub vertices, initializing the delegate_degree
      // tracking structures.
      for (size_t hub_idx = 0; hub_idx < vec_sorted_hubs.size(); ++hub_idx) {
        uint64_t t_local_id = vec_sorted_hubs[hub_idx] / uint64_t(m_mpi_size);
        int t_owner = uint32_t(vec_sorted_hubs[hub_idx] % uint32_t(m_mpi_size));
        vertex_locator new_ver_loc(true, hub_idx, t_owner);

        m_map_delegate_locator[vec_sorted_hubs[hub_idx]] = new_ver_loc;
        m_delegate_label.push_back(vec_sorted_hubs[hub_idx]);

        //
        // Tag owned delegates
        //
        if (t_owner == m_mpi_rank) {
          m_owned_info[t_local_id].is_delegate = 1;
          m_owned_info[t_local_id].delegate_id = hub_idx;
        }
      } // for over vec_sorted_hubs

    } // if i == mpi_rank % processes_per_node
    mpi_yield_barrier(m_mpi_comm);
  }
}

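/**
 * Allocates the low-degree (m_owned_targets) and high-degree
 * (m_delegate_targets) edge storage from the segment manager, staggered
 * across the ranks on a node, and converts m_delegate_info from per-delegate
 * edge counts into a prefix-sum index into m_delegate_targets.
 */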
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
initialize_edge_storage(const SegmentAllocator<void>& seg_allocator) {
  // Allocate the low edge CSR to accommodate the number of edges.
  // This will be filled by the partition_low_degree function.
  for (int i = 0; i < processes_per_node; i++) {
    if (m_mpi_rank == 0) {
      std::cout << "\tResizing m_owned_targets and m_delegate_targets: "
                << i << "/" << processes_per_node << "." << std::endl << std::flush;
    }

    if (i == m_mpi_rank % processes_per_node) {
      // m_owned_targets.resize(m_edges_low_count);
      // m_delegate_targets.resize(m_edges_high_count);
      m_delegate_targets_size = m_edges_high_count;
      m_owned_targets_size = m_edges_low_count;

      SegmentManager* segment_manager = seg_allocator.get_segment_manager();

      m_delegate_targets = reinterpret_cast<vertex_locator*>(
          segment_manager->allocate(
              m_delegate_targets_size * sizeof(vertex_locator)));

      m_owned_targets = reinterpret_cast<vertex_locator*>(
          segment_manager->allocate(
              m_owned_targets_size * sizeof(vertex_locator)));

      // Currently, m_delegate_info holds the count of high-degree edges
      // assigned to this node for each vertex.
      // Below converts it into an index into the m_delegate_targets array.
      assert(m_delegate_info[m_delegate_info.size() - 1] == 0);
      uint64_t edge_count = 0;
      for (size_t j = 0; j < m_delegate_info.size(); j++) {
        uint64_t num_edges = m_delegate_info[j];
        m_delegate_info[j] = edge_count;
        edge_count += num_edges;
        assert(edge_count <= m_edges_high_count);
      }

    } // if this process's turn
    mpi_yield_barrier(m_mpi_comm);
  } // for over processes
}

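/**
 * Partitions the low-degree edges: streams the unsorted edge list in chunks,
 * routes each non-delegate edge to the rank that owns its source vertex, and
 * appends received edges into the low-degree CSR (m_owned_targets). The work
 * is split into node_partitions rounds to bound memory usage.
 */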
template <typename SegmentManager>
template <typename Container>
void
delegate_partitioned_graph<SegmentManager>::
partition_low_degree(Container& unsorted_edges) {

  uint64_t loop_counter = 0;
  uint64_t edge_counter = 0;

  double start_time, last_loop_time, last_part_time;
  start_time = last_loop_time = last_part_time = MPI_Wtime();

  for (size_t node_turn = 0; node_turn < node_partitions; node_turn++) {

    auto unsorted_itr = unsorted_edges.begin();
    auto unsorted_itr_end = unsorted_edges.end();
    if (m_mpi_rank == 0) {
      double curr_time = MPI_Wtime();

      std::cout << "\t***[LP]["
        << "Partition Number: " << node_turn << "/" << node_partitions << ", "
        << "Total Loops: " << loop_counter << ", "
        << "Total Edges: " << edge_counter
        << "] Time " << (curr_time - last_part_time) << " seconds, "
        << " Total Time " << (curr_time - start_time) << " seconds, "
        << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
        << std::flush;
      last_part_time = curr_time;
    }
    MPI_Barrier(m_mpi_comm);

    while (!detail::global_iterator_range_empty(unsorted_itr, unsorted_itr_end,
                                                m_mpi_comm)) {
      if (m_mpi_rank == 0 && (loop_counter % 500) == 0) {
        double curr_time = MPI_Wtime();

        std::cout << "\t[LP]["
          << "Partition Number: " << node_turn << "/" << node_partitions << ", "
          << "Total Loops: " << loop_counter << ", "
          << "Total Edges: " << edge_counter
          << "] Time " << (curr_time - last_loop_time) << " seconds, "
          << " Total Time " << (curr_time - start_time) << " seconds, "
          << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
          << std::flush;

        last_loop_time = curr_time;
      }
      loop_counter++;

      // Generate edges to send
      std::vector<std::pair<uint64_t, uint64_t> > to_recv_edges_low;

      {
        std::vector<std::pair<uint64_t, uint64_t> > to_send_edges_low;
        to_send_edges_low.reserve(edge_chunk_size);

        for (size_t i = 0;
             unsorted_itr != unsorted_itr_end && i < edge_chunk_size;
             ++unsorted_itr) {
          // Get next edge
          const auto edge = *unsorted_itr;

          {
            const int owner = unsorted_itr->first % m_mpi_size;
            if ((owner % processes_per_node) % node_partitions != node_turn) {
              continue;
            }
          }
          edge_counter++;

          if (m_map_delegate_locator.count(unsorted_itr->first) == 0) {
            to_send_edges_low.push_back(*unsorted_itr);
            ++i;
          } else {
            continue;
          }
        } // for

        // Exchange edges / receive edges
        edge_source_partitioner partitioner(m_mpi_size);
        mpi_yield_barrier(m_mpi_comm);
        mpi_all_to_all_better(to_send_edges_low, to_recv_edges_low, partitioner,
                              m_mpi_comm);
      }

      std::sort(to_recv_edges_low.begin(), to_recv_edges_low.end());

      #ifdef DEBUG_DPG
      // Sanity check to make sure we received the correct edges
      for (size_t i = 0; i < to_recv_edges_low.size(); ++i) {
        auto edge = to_recv_edges_low[i];
        assert(int(edge.first) % m_mpi_size == m_mpi_rank);
        assert(m_map_delegate_locator.count(edge.first) == 0);
      }
      #endif

      // Loop over received edges, appending them to the low CSR
      auto itr_end = to_recv_edges_low.end();
      for (auto itr = to_recv_edges_low.begin(); itr != itr_end; itr++) {

        auto edge = *itr;
        uint64_t new_vertex_id = local_source_id(m_mpi_size)(edge);
        assert(m_mpi_rank == int(edge.first % m_mpi_size));

        uint64_t temp_offset = (m_owned_info_tracker[new_vertex_id])++;
        uint64_t loc = temp_offset + m_owned_info[new_vertex_id].low_csr_idx;

        if (!(loc < m_owned_info[new_vertex_id + 1].low_csr_idx)) {
          std::cout << "Error rank: " << m_mpi_rank << " -- "
            << loc << " < " << m_owned_info[new_vertex_id + 1].low_csr_idx
            << ", new_vertex_id = " << new_vertex_id
            << ", temp_offset = " << temp_offset
            << ", edge = (" << edge.first << "," << edge.second << ")"
            << std::endl << std::flush;
          assert(false);
          exit(-1);
        }
        // assert(!m_owned_targets[loc].is_valid());

        m_owned_targets[loc] = label_to_locator(edge.second);
      } // for over received edges
    } // while global iterator range not empty
  } // for node partition

  mpi_yield_barrier(m_mpi_comm);
  if (m_mpi_rank == 0) {
    double curr_time = MPI_Wtime();
    std::cout << "\t[LP]["
      << "Total Loops: " << loop_counter << ", "
      << "Total Edges: " << edge_counter
      << "] Time " << (curr_time - last_loop_time) << " seconds, "
      << " Total Time " << (curr_time - start_time) << " seconds, "
      << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
      << std::flush;
  }
  mpi_yield_barrier(m_mpi_comm);

} // partition_low_degree

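/**
 * Counts the edges whose source is a delegate (hub) vertex. For each hub
 * edge, the count for the rank owning the destination is incremented, either
 * locally or via send_high_info, producing per-delegate counts in
 * m_delegate_info and the total in m_edges_high_count.
 */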
template <typename SegmentManager>
template <typename InputIterator>
void
delegate_partitioned_graph<SegmentManager>::
count_high_degree_edges(InputIterator unsorted_itr,
                        InputIterator unsorted_itr_end,
                        boost::unordered_set<uint64_t>& global_hub_set) {
  // Temp vector for storing offsets
  // Used to store high_edge count
  #if DEBUG_DPG
  std::vector<uint64_t> tmp_high_count_per_rank(m_mpi_size, 0);
  #endif
  m_edges_high_count = 0;

  uint64_t loop_counter = 0;
  uint64_t edge_counter = 0;

  double start_time, last_loop_time;
  start_time = last_loop_time = MPI_Wtime();

  while (!detail::global_iterator_range_empty(unsorted_itr, unsorted_itr_end,
                                              m_mpi_comm)) {
    if (m_mpi_rank == 0 && (loop_counter % 1000) == 0) {
      double curr_time = MPI_Wtime();

      std::cout << "\t[CH]["
        << "Total Loops: " << loop_counter << ", "
        << "Total Edges: " << edge_counter
        << "] Time " << (curr_time - last_loop_time) << " seconds, "
        << " Total Time " << (curr_time - start_time) << " seconds, "
        << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
        << std::flush;

      last_loop_time = curr_time;
    }
    loop_counter++;

    // Vector used to pass number of high edges
    std::vector<
      boost::container::map<uint64_t, uint64_t> > maps_to_send(m_mpi_size);
    int maps_to_send_element_count = 0;
    {
      for (size_t i = 0; unsorted_itr != unsorted_itr_end && i < edge_chunk_size;
           ++unsorted_itr) {

        // Get next edge
        const auto edge = *unsorted_itr;
        edge_counter++;

        if (global_hub_set.count(unsorted_itr->first) == 0) {
          continue;
        } else if (global_hub_set.count(unsorted_itr->first)) {
          #if DEBUG_DPG
          // This edge's source is a hub:
          // 1) Increment the high edge count for the owner of the edge's dest
          tmp_high_count_per_rank[unsorted_itr->second % m_mpi_size]++;
          #endif

          // 2) Increment the owner's count of edges for this hub.
          const int owner = unsorted_itr->second % m_mpi_size;
          if (owner == m_mpi_rank) {
            const uint64_t ver_id = unsorted_itr->first;

            const uint64_t new_source_id = m_map_delegate_locator[ver_id].local_id();
            assert(new_source_id < m_delegate_info.size() - 1);
            m_delegate_info[new_source_id]++;
            m_edges_high_count++;
          } else {
            int c = maps_to_send.at(owner)[unsorted_itr->first]++;
            if (c == 0) {
              maps_to_send_element_count++;
              i++;
            }
          }
        } else {
          assert(false);
        }
      } // for
      // Send the hub edge counts to the relevant nodes.
      send_high_info(maps_to_send, maps_to_send_element_count);
    }

  } // while global iterator range not empty

  mpi_yield_barrier(m_mpi_comm);
  if (m_mpi_rank == 0) {
    double curr_time = MPI_Wtime();
    std::cout << "\t[CH]["
      << "Total Loops: " << loop_counter << ", "
      << "Total Edges: " << edge_counter
      << "] Time " << (curr_time - last_loop_time) << " seconds, "
      << " Total Time " << (curr_time - start_time) << " seconds, "
      << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
      << std::flush;
  }

  #if DEBUG_DPG
  std::cout << "Checking count_high_degree_edges:" << std::endl << std::flush;
  size_t l_edges_high_count = 0;
  for (size_t i = 0; i < m_delegate_info.size(); i++) {
    l_edges_high_count += m_delegate_info[i];
  }
  assert(l_edges_high_count == m_edges_high_count);

  // Sync the high counts.
  std::vector<uint64_t> high_count_per_rank;
  mpi_all_reduce(tmp_high_count_per_rank, high_count_per_rank,
                 std::plus<uint64_t>(), m_mpi_comm);

  uint64_t sanity_check_high_edge_count = high_count_per_rank[m_mpi_rank];
  assert(m_edges_high_count == sanity_check_high_edge_count);
  #endif

} // count_high_degree_edges

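/**
 * Sends the per-delegate high-degree edge counts accumulated in maps_to_send
 * to the destination owners via all-to-all, then applies the received counts
 * to m_delegate_info and m_edges_high_count.
 */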
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
send_high_info(std::vector< boost::container::map< uint64_t, uint64_t> >&
               maps_to_send, int maps_to_send_element_count) {

  int to_send_pos = 0;
  std::vector<uint64_t> to_send(maps_to_send_element_count * 2, 0);
  std::vector<int> to_send_count(m_mpi_size, 0);

  assert(maps_to_send.size() == m_mpi_size);
  for (size_t i = 0; i < maps_to_send.size(); i++) {
    for (auto itr = maps_to_send[i].begin(); itr != maps_to_send[i].end(); itr++) {
      assert(to_send_pos < to_send.size());
      to_send[to_send_pos++] = itr->first;
      to_send[to_send_pos++] = itr->second;
    }
    to_send_count[i] = maps_to_send[i].size() * 2;
  }

  std::vector<uint64_t> to_recv;
  std::vector<int> out_recvcnts;

  mpi_yield_barrier(m_mpi_comm);
  mpi_all_to_all(to_send, to_send_count, to_recv, out_recvcnts, m_mpi_comm);

  // Received data is a flat sequence of (vertex id, edge count) pairs.
  for (size_t i = 0; i < to_recv.size(); i += 2) {
    const uint64_t ver_id = to_recv[i];
    const uint64_t delegate_dest_count = to_recv[i + 1];

    const uint64_t new_source_id = m_map_delegate_locator[ver_id].local_id();
    assert(new_source_id < m_delegate_info.size() - 1);
    m_delegate_info[new_source_id] += delegate_dest_count;
    m_edges_high_count += delegate_dest_count;
  }
} // send_high_info

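/**
 * Balances the edge count across ranks. Ranks whose total (low + high) edge
 * count exceeds the global average hand off high-degree (delegate) edge
 * slots to underloaded ranks via point-to-point sends; transfer_info records
 * which delegates were offloaded to whom, so partition_high_degree can route
 * overflow edges accordingly.
 */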
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
calculate_overflow(std::map< uint64_t, std::deque<OverflowSendInfo> >
                   &transfer_info) {

  //
  // Get the number of high and low edges for each node.
  //
  mpi_yield_barrier(m_mpi_comm);
  std::vector<uint64_t> low_count_per_rank, high_count_per_rank;
  mpi_all_gather(m_edges_low_count, low_count_per_rank, m_mpi_comm);
  mpi_all_gather(m_edges_high_count, high_count_per_rank, m_mpi_comm);

  // Determine the total number of edges across all nodes.
  const uint64_t owned_total_edges = m_edges_high_count + m_edges_low_count;
  uint64_t global_edge_count = mpi_all_reduce(owned_total_edges,
                                              std::plus<uint64_t>(), m_mpi_comm);

  // Determine the desired number of edges at each node.
  const uint64_t target_edges_per_rank = global_edge_count / m_mpi_size;

  uint64_t gave_edge_counter = 0;
  uint64_t receive_edge_counter = 0;

  //
  // Compute the edge count exchange
  //
  int heavy_idx(0), light_idx(0);
  for (; heavy_idx < m_mpi_size && light_idx < m_mpi_size; ++heavy_idx) {

    while (low_count_per_rank[heavy_idx] + high_count_per_rank[heavy_idx]
           > target_edges_per_rank) {
      // while heavy_idx has edges to give
      const int64_t total_edges_low_idx = low_count_per_rank[light_idx]
          + high_count_per_rank[light_idx];
      if (total_edges_low_idx < target_edges_per_rank) {
        // if the light_idx needs edges

        if (high_count_per_rank[heavy_idx] == 0) {
          // If the heavy_idx has no more edges to give, break the while loop,
          // causing the heavy_idx to be incremented.
          break;
        }

        // Determine the most that can be given.
        uint64_t max_to_offload = std::min(high_count_per_rank[heavy_idx],
            high_count_per_rank[heavy_idx] + low_count_per_rank[heavy_idx] -
            target_edges_per_rank);
        // Determine the most that can be received.
        uint64_t max_to_receive = target_edges_per_rank -
            high_count_per_rank[light_idx] - low_count_per_rank[light_idx];
        // Determine the most that can be moved.
        uint64_t to_move = std::min(max_to_offload, max_to_receive);

        assert(to_move != 0);
        // Update the local count variables
        high_count_per_rank[heavy_idx] -= to_move;
        high_count_per_rank[light_idx] += to_move;

        assert(heavy_idx != light_idx);
        if (heavy_idx == m_mpi_rank) { // This node is sending
          std::vector<uint64_t> send_list;
          // Generate a list of [delegate_ids, edge_counts] to send
          generate_send_list(send_list, to_move, light_idx, transfer_info);

          #if 0
          for (size_t i = 0; i < send_list.size();) {
            const uint64_t vert_id = send_list[i++];
            const uint64_t count = send_list[i++];
            std::cout << "[" << m_mpi_rank << "] giving " << vert_id << " +" << count
              << " to " << light_idx << ". " << std::endl << std::flush;
          }
          #endif

          // Send the information
          int64_t send_len = send_list.size();
          CHK_MPI(MPI_Send(&send_len, 1, mpi_typeof(send_len), light_idx, 0, m_mpi_comm));
          CHK_MPI(MPI_Send(send_list.data(), send_len, mpi_typeof(to_move),
                           light_idx, 0, m_mpi_comm));

          // Adjust the m_edges_high_count info.
          m_edges_high_count -= to_move;
          gave_edge_counter += to_move;
        } else if (light_idx == m_mpi_rank) { // This node is receiving
          MPI_Status status;
          int64_t recv_length;

          // Receive the information
          CHK_MPI(MPI_Recv(&recv_length, 1, mpi_typeof(recv_length), heavy_idx,
                           0, m_mpi_comm, &status));
          std::vector<uint64_t> recv_list(recv_length);
          CHK_MPI(MPI_Recv(recv_list.data(), recv_length, mpi_typeof(to_move),
                           heavy_idx, 0, m_mpi_comm, &status));

          // Update my delegate edge counts.
          uint64_t sanity_count = 0;
          for (int i = 0; i < recv_length;) {
            const uint64_t vert_id = recv_list[i++];
            const uint64_t count = recv_list[i++];
            m_delegate_info[vert_id] += count;
            sanity_count += count;

            #if 0
            std::cout << "[" << m_mpi_rank << "] received " << vert_id <<
              ", now has " << count << " edges for it." << std::endl << std::flush;
            #endif
          }

          // Adjust the m_edges_high_count info.
          m_edges_high_count += to_move;
          receive_edge_counter += to_move;
          assert(sanity_count == to_move);
        } // else this node is not involved.
        mpi_yield_barrier(m_mpi_comm);
      } else {
        ++light_idx;
        if (light_idx == m_mpi_size) {
          break;
        }
      } // else
    } // while
  } // for

#ifdef DEBUG_DPG
  const uint64_t owned_total_edges2 = m_edges_high_count + m_edges_low_count;
  uint64_t sanity_global_edge_count = mpi_all_reduce(owned_total_edges2,
      std::plus<uint64_t>(), m_mpi_comm);
  assert(sanity_global_edge_count == global_edge_count);
  assert(m_edges_high_count == high_count_per_rank[m_mpi_rank]);
  uint64_t high_count_2 = 0;
  for (size_t i = 0; i < m_delegate_info.size() - 1; i++) {
    high_count_2 += m_delegate_info[i];
  }

  assert(m_edges_high_count == high_count_2);

  std::vector<uint64_t> low_count_per_rank2, high_count_per_rank2;
  mpi_all_gather(m_edges_low_count, low_count_per_rank2, m_mpi_comm);
  mpi_all_gather(m_edges_high_count, high_count_per_rank2, m_mpi_comm);

  for (size_t i = 0; i < m_mpi_size; i++) {
    assert(low_count_per_rank2[i] == low_count_per_rank[i]);
    assert(high_count_per_rank2[i] == high_count_per_rank[i]);
  }

  #if 0
  for (int i = 0; i < m_mpi_size; i++) {
    if (i == m_mpi_rank) {
      std::cout << i << " has " << ((int64_t)owned_total_edges - (int64_t)target_edges_per_rank)
        << " extra edges. Giving: " << gave_edge_counter << " Receiving: "
        << receive_edge_counter << "." << std::endl;
    }
    MPI_Barrier(m_mpi_comm);
  }
  #endif

#endif

} // calculate_overflow

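/**
 * Builds send_list as flat (delegate id, edge count) pairs totaling num_send
 * edges taken from this rank's per-delegate counts, decrementing
 * m_delegate_info accordingly and recording each transfer in transfer_info
 * so overflow edges can later be forwarded to rank send_id.
 */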
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
generate_send_list(std::vector<uint64_t>& send_list, uint64_t num_send,
                   int send_id,
                   std::map< uint64_t, std::deque<OverflowSendInfo> >& transfer_info) {
  // Tracking variables used to determine how much space to allocate.
  uint64_t send_count = 0;
  uint64_t ver_count = 0;
  for (uint64_t i = 0; i < m_delegate_info.size() - 1 && send_count < num_send; i++) {
    if (m_delegate_info[i] != 0) {
      send_count += m_delegate_info[i];
      ver_count++;
    }
  }

  // Initialize the send_list
  send_list.reserve(ver_count * 2);
  send_count = 0;
  for (uint64_t i = 0; i < m_delegate_info.size() - 1 && send_count < num_send; i++) {
    if (m_delegate_info[i] != 0) { // if there are edges to give
      uint64_t edges_to_give = m_delegate_info[i];

      if (send_count + edges_to_give > num_send) { // reduce edges if it will
        edges_to_give = num_send - send_count;     // go over the num to send.
      }

      m_delegate_info[i] -= edges_to_give; // update this node's edge count
      send_count += edges_to_give;         // update the send_count

      send_list.push_back(i);              // add the information to the send_list
      send_list.push_back(edges_to_give);

      // Add this information to the transfer_info map
      if (transfer_info.count(i) == 0) {
        transfer_info[i] = std::deque<OverflowSendInfo>();
      }
      transfer_info[i].push_back(OverflowSendInfo(send_id, edges_to_give));
    } // if there are edges to give for this delegate
  }
  assert(num_send == send_count);
} // generate_send_list

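/**
 * Partitions the high-degree (delegate) edges: hub edges are routed to the
 * rank owning the destination, and edges that arrive at a rank whose slots
 * for that delegate are full are forwarded to a rank with room using
 * transfer_info (computed by calculate_overflow). A final exchange drains
 * any remaining overflow edges.
 */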
template <typename SegmentManager>
template <typename Container>
void
delegate_partitioned_graph<SegmentManager>::
partition_high_degree(Container& unsorted_edges,
                      std::map< uint64_t, std::deque<OverflowSendInfo> >& transfer_info) {

  // Initiates the partitioner, which determines where overflowed edges go
  high_edge_partitioner partitioner(m_mpi_size, m_mpi_rank, &transfer_info);

  uint64_t loop_counter = 0;
  uint64_t edge_counter = 0;
  double start_time, last_loop_time, last_part_time;
  start_time = last_loop_time = last_part_time = MPI_Wtime();
  uint64_t gave_edge_counter = 0;

  // Scratch vector used for storing edges to send
  std::vector<std::pair<uint64_t, uint64_t> > to_send_edges_high;
  to_send_edges_high.reserve(edge_chunk_size);

  for (size_t node_turn = 0; node_turn < node_partitions; node_turn++) {

    if (m_mpi_rank == 0) {
      double curr_time = MPI_Wtime();

      std::cout << "\t***[HP]["
        << "Partition Number: " << node_turn << "/" << node_partitions << ", "
        << "Total Loops: " << loop_counter << ", "
        << "Total Edges: " << edge_counter
        << "] Time " << (curr_time - last_part_time) << " seconds, "
        << " Total Time " << (curr_time - start_time) << " seconds, "
        << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
        << std::flush;

      last_part_time = curr_time;
    }
    MPI_Barrier(m_mpi_comm);

    auto unsorted_itr = unsorted_edges.begin();
    auto unsorted_itr_end = unsorted_edges.end();

    while (!detail::global_iterator_range_empty(unsorted_itr, unsorted_itr_end,
                                                m_mpi_comm)) {
      if (m_mpi_rank == 0 && (loop_counter % 500) == 0) {
        double curr_time = MPI_Wtime();

        std::cout << "\t[HP]["
          << "Partition Number: " << node_turn << "/" << node_partitions << ", "
          << "Total Loops: " << loop_counter << ", "
          << "Total Edges: " << edge_counter
          << "] Time " << (curr_time - last_loop_time) << " seconds, "
          << " Total Time " << (curr_time - start_time) << " seconds, "
          << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
          << std::flush;

        last_loop_time = curr_time;
      }
      loop_counter++;

      while (unsorted_itr != unsorted_itr_end &&
             to_send_edges_high.size() < edge_chunk_size) {
        // Get next edge
        const auto edge = *unsorted_itr;
        ++unsorted_itr;

        {
          // The destination's owner determines which round handles this edge.
          const int owner = edge.second % m_mpi_size;
          if (owner % processes_per_node % node_partitions != node_turn) {
            continue;
          }
        }

        edge_counter++;

        if (m_map_delegate_locator.count(edge.first) == 1) {
          // assert(global_hub_set.count(edge.first) > 0);
          // If the edge's source is a hub node
          const uint64_t source_id = edge.first;
          uint64_t new_source_id = m_map_delegate_locator[source_id].local_id();
          assert(new_source_id >= 0 && new_source_id < m_delegate_info.size() - 1);

          // Send the edge if we don't own it or if we own it but have no room.
          to_send_edges_high.push_back(std::make_pair(new_source_id, edge.second));
        } // end if is a hub
        else {
          // assert(global_hub_set.count(edge.first) == 0);
        }

      } // end while

      // Exchange edges we generated that we don't need with the other nodes
      // and receive edges we may need.
      // Scratch vector used for storing received edges.
      std::vector< std::pair<uint64_t, uint64_t> > to_recv_edges_high;
      mpi_yield_barrier(m_mpi_comm);
      mpi_all_to_all_better(to_send_edges_high, to_recv_edges_high, partitioner,
                            m_mpi_comm);

      // Empty the vector
      {
        std::vector<std::pair<uint64_t, uint64_t> > temp;
        to_send_edges_high.swap(temp);
      }
      to_send_edges_high.reserve(edge_chunk_size);

      assert(to_send_edges_high.size() == 0);
      std::sort(to_recv_edges_high.begin(), to_recv_edges_high.end());

      for (size_t i = 0; i < to_recv_edges_high.size(); ++i) {
        // Iterate over received edges, adding them using similar logic from
        // above
        const auto edge = to_recv_edges_high[i];
        const uint64_t new_source_id = edge.first;
        assert(new_source_id >= 0 && new_source_id < m_delegate_info.size() - 1);

        uint64_t place_pos = m_delegate_degree[new_source_id];
        place_pos += m_delegate_info[new_source_id];

        if (place_pos == m_delegate_info[new_source_id + 1]) {
          // We have no room for this edge, so send it to a node that has
          // room.
          assert(transfer_info.size() > 0);
          assert(transfer_info.count(new_source_id) != 0);
          to_send_edges_high.push_back(edge);
          gave_edge_counter++;
        }
        else {
          assert(place_pos < m_delegate_info[new_source_id + 1]);
          assert(place_pos < m_delegate_targets_size);

          uint64_t new_target_label = edge.second;
          m_delegate_targets[place_pos] = label_to_locator(new_target_label);
          assert(m_delegate_targets[place_pos].m_owner_dest < m_mpi_size);
          m_delegate_degree[new_source_id]++;

          if (owner_dest_id(m_mpi_size)(edge) != m_mpi_rank) {
            assert(transfer_info.size() == 0);
          }

        } // else we have room
      } // for edges received

    } // end while get next edge
  } // for over node partitions
  mpi_yield_barrier(m_mpi_comm);
  if (m_mpi_rank == 0) {
    double curr_time = MPI_Wtime();
    std::cout << "\t[HP]["
      << "Total Loops: " << loop_counter << ", "
      << "Total Edges: " << edge_counter
      << "] Time " << (curr_time - last_loop_time) << " seconds, "
      << " Total Time " << (curr_time - start_time) << " seconds, "
      << "Dirty Pages: " << get_dirty_pages() << "kb." << std::endl
      << std::flush;
  }
  mpi_yield_barrier(m_mpi_comm);

  {
    // Exchange edges we generated with the other nodes and receive edges we
    // may need. Scratch vector used for storing received edges.
    std::vector< std::pair<uint64_t, uint64_t> > to_recv_edges_high;
    mpi_yield_barrier(m_mpi_comm);
    mpi_all_to_all_better(to_send_edges_high, to_recv_edges_high, partitioner,
                          m_mpi_comm);

    // Empty the vector
    {
      std::vector<std::pair<uint64_t, uint64_t> > temp;
      to_send_edges_high.swap(temp);
    }

    std::sort(to_recv_edges_high.begin(), to_recv_edges_high.end());
    for (size_t i = 0; i < to_recv_edges_high.size(); ++i) {
      // Iterate over received edges, adding them using similar logic from
      // above
      const auto edge = to_recv_edges_high[i];
      const uint64_t new_source_id = edge.first;
      assert(new_source_id >= 0 && new_source_id < m_delegate_info.size() - 1);

      uint64_t place_pos = m_delegate_degree[new_source_id];
      place_pos += m_delegate_info[new_source_id];

      if (place_pos == m_delegate_info[new_source_id + 1]) {
        // We have no room for this edge, so we would have to send it to a
        // node that has room. But this is the last round, so an error must
        // have occurred.
        assert(false);
      }
      else {
        assert(place_pos < m_delegate_info[new_source_id + 1]);
        assert(place_pos < m_delegate_targets_size);

        uint64_t new_target_label = edge.second;
        m_delegate_targets[place_pos] = label_to_locator(new_target_label);
        assert(m_delegate_targets[place_pos].m_owner_dest < m_mpi_size);
        m_delegate_degree[new_source_id]++;

        if (owner_dest_id(m_mpi_size)(edge) != m_mpi_rank) {
          assert(transfer_info.size() == 0);
        }

      } // else we have room
    } // for edges received
  }

#ifdef DEBUG_DPG
  int64_t recv_count2 = 0;
  for (size_t i = 0; i < m_delegate_degree.size(); i++) {
    recv_count2 += m_delegate_degree[i];
  }

  int64_t difference = recv_count2 - m_delegate_targets_size;

  for (size_t i = 0; i < m_delegate_info.size() - 1; i++) {
    size_t pos = m_delegate_info[i];
    for (size_t j = pos; j < m_delegate_info[i + 1]; j++) {
      assert(m_delegate_targets[j].m_owner_dest != 0xFFFFF);
      assert(m_delegate_targets[j].m_local_id != 0x7FFFFFFFFF);
    }
  }
#endif

} // partition_high_degree

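/**
 * Converts a vertex_locator to the vertex label.
 */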
template <typename SegmentManager>
inline
uint64_t
delegate_partitioned_graph<SegmentManager>::
locator_to_label(delegate_partitioned_graph<SegmentManager>::vertex_locator
                 locator) const {
  uint64_t res;
  if (locator.is_delegate()) {
    res = m_delegate_label[locator.local_id()];
  } else {
    res = uint64_t(locator.local_id()) * uint64_t(m_mpi_size) +
          uint64_t(locator.owner());
  }
  return res;
} // locator_to_label

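/**
 * Converts a vertex label to a vertex_locator. Delegates resolve through
 * m_map_delegate_locator; all other labels map directly to (owner, local id).
 */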
template <typename SegmentManager>
inline
typename delegate_partitioned_graph<SegmentManager>::vertex_locator
delegate_partitioned_graph<SegmentManager>::
label_to_locator(uint64_t label) const {
  /* typename boost::unordered_map< uint64_t, vertex_locator,
       boost::hash<uint64_t>, std::equal_to<uint64_t>,
       SegmentAllocator< std::pair<uint64_t,vertex_locator> >
     >::const_iterator */
  auto itr = m_map_delegate_locator.find(label);

  if (itr == m_map_delegate_locator.end()) {
    uint32_t owner = label % uint64_t(m_mpi_size);
    uint64_t local_id = label / uint64_t(m_mpi_size);
    return vertex_locator(false, local_id, owner);
  }
  return itr->second;
}

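/**
 * Synchronizes the hub set amongst all processes. If any rank reports a
 * local change, every rank's local hubs are all-gathered into global_hubs.
 */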
template <typename SegmentManager>
inline void
delegate_partitioned_graph<SegmentManager>::
sync_global_hub_set(const boost::unordered_set<uint64_t>& local_hubs,
                    boost::unordered_set<uint64_t>& global_hubs,
                    bool local_change) {
  uint32_t new_hubs = mpi_all_reduce(uint32_t(local_change),
                                     std::plus<uint32_t>(), m_mpi_comm);

  if (new_hubs > 0) {
    std::vector<uint64_t> vec_local_hubs(local_hubs.begin(), local_hubs.end());
    std::vector<uint64_t> vec_global_hubs;
    // Globally gather local hubs
    mpi_all_gather(vec_local_hubs, vec_global_hubs, m_mpi_comm);
    // Insert gathered global hubs to set
    global_hubs.insert(vec_global_hubs.begin(), vec_global_hubs.end());
  }
}

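/**
 * Returns a begin iterator for the edges of a vertex.
 */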
template <typename SegmentManager>
inline
typename delegate_partitioned_graph<SegmentManager>::edge_iterator
delegate_partitioned_graph<SegmentManager>::
edges_begin(delegate_partitioned_graph<SegmentManager>::vertex_locator
            locator) const {
  if (locator.is_delegate()) {
    assert(locator.local_id() < m_delegate_info.size() - 1);
    return edge_iterator(locator, m_delegate_info[locator.local_id()], this);
  }
  assert(locator.owner() == m_mpi_rank);
  assert(locator.local_id() < m_owned_info.size());
  return edge_iterator(locator, m_owned_info[locator.local_id()].low_csr_idx,
                       this);
}

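/**
 * Returns an end iterator for the edges of a vertex.
 */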
template <typename SegmentManager>
inline
typename delegate_partitioned_graph<SegmentManager>::edge_iterator
delegate_partitioned_graph<SegmentManager>::
edges_end(delegate_partitioned_graph<SegmentManager>::vertex_locator
          locator) const {
  if (locator.is_delegate()) {
    assert(locator.local_id() + 1 < m_delegate_info.size());
    return edge_iterator(locator, m_delegate_info[locator.local_id() + 1], this);
  }
  assert(locator.owner() == m_mpi_rank);
  assert(locator.local_id() + 1 < m_owned_info.size());
  return edge_iterator(locator, m_owned_info[locator.local_id() + 1].low_csr_idx,
                       this);
}

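/**
 * Returns the degree of a vertex; for delegates this is the all-reduced
 * global hub degree.
 */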
template <typename SegmentManager>
inline
uint64_t
delegate_partitioned_graph<SegmentManager>::
degree(delegate_partitioned_graph<SegmentManager>::vertex_locator
       locator) const {
  uint64_t local_id = locator.local_id();
  if (locator.is_delegate()) {
    assert(local_id < m_delegate_degree.size());
    return m_delegate_degree[local_id];
  }
  assert(local_id + 1 < m_owned_info.size());
  return m_owned_info[local_id + 1].low_csr_idx -
         m_owned_info[local_id].low_csr_idx;
}

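/**
 * Returns the local degree of a vertex, i.e. the number of its edges stored
 * on this rank.
 */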
template <typename SegmentManager>
inline
uint64_t
delegate_partitioned_graph<SegmentManager>::
local_degree(delegate_partitioned_graph<SegmentManager>::vertex_locator
             locator) const {
  uint64_t local_id = locator.local_id();
  if (locator.is_delegate()) {
    assert(local_id + 1 < m_delegate_info.size());
    return m_delegate_info[local_id + 1] - m_delegate_info[local_id];
  }
  assert(local_id + 1 < m_owned_info.size());
  return m_owned_info[local_id + 1].low_csr_idx -
         m_owned_info[local_id].low_csr_idx;
}

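/**
 * Returns a begin iterator for all local vertices.
 */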
template <typename SegmentManager>
inline
typename delegate_partitioned_graph<SegmentManager>::vertex_iterator
delegate_partitioned_graph<SegmentManager>::
vertices_begin() const {
  return vertex_iterator(0, this);
}

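/**
 * Returns an end iterator for all local vertices.
 */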
template <typename SegmentManager>
inline
typename delegate_partitioned_graph<SegmentManager>::vertex_iterator
delegate_partitioned_graph<SegmentManager>::
vertices_end() const {
  return vertex_iterator(m_owned_info.size() - 1, this);
}

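/**
 * Tests whether a vertex label is a delegate.
 */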
template <typename SegmentManager>
inline
bool
delegate_partitioned_graph<SegmentManager>::
is_label_delegate(uint64_t label) const {
  return m_map_delegate_locator.count(label) > 0;
}

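/**
 * Creates edge_data of type T in the given segment manager; the second
 * overload additionally value-initializes every element with init.
 */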
template <typename SegmentManager>
template <typename T, typename SegManagerOther>
typename delegate_partitioned_graph<SegmentManager>::template edge_data<T, SegManagerOther>*
delegate_partitioned_graph<SegmentManager>::
create_edge_data(SegManagerOther* segment_manager_o,
                 const char* obj_name) const {
  typedef edge_data<T, SegManagerOther> mytype;

  if (obj_name == nullptr) {
    return segment_manager_o->template construct<mytype>(bip::anonymous_instance)
             (m_owned_targets_size, m_delegate_targets_size,
              segment_manager_o);
  } else {
    return segment_manager_o->template construct<mytype>(obj_name)
             (m_owned_targets_size, m_delegate_targets_size,
              segment_manager_o);
  }
}

template <typename SegmentManager>
template <typename T, typename SegManagerOther>
typename delegate_partitioned_graph<SegmentManager>::template edge_data<T, SegManagerOther>*
delegate_partitioned_graph<SegmentManager>::
create_edge_data(const T& init, SegManagerOther* segment_manager_o,
                 const char* obj_name) const {

  typedef edge_data<T, SegManagerOther> mytype;

  if (obj_name == nullptr) {
    return segment_manager_o->template construct<mytype>(bip::anonymous_instance)
             (m_owned_targets_size, m_delegate_targets_size, init,
              segment_manager_o);
  } else {
    return segment_manager_o->template construct<mytype>(obj_name)
             (m_owned_targets_size, m_delegate_targets_size, init,
              segment_manager_o);
  }
}

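/**
 * Prints global graph statistics (edge counts, hub counts, and load
 * imbalance metrics) to stdout on rank 0.
 */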
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
print_graph_statistics() {
  /* if (m_mpi_rank == 0) {
       for (size_t i = 0; i < m_delegate_degree.size(); ++i) {
         std::cout << "Hub label = " << m_delegate_label[i] << ", degree = " <<
           m_delegate_degree[i] << std::endl << std::flush;
       }
     } */

  uint64_t low_local_size = m_owned_targets_size;
  uint64_t high_local_size = m_delegate_targets_size;
  uint64_t total_local_size = low_local_size + high_local_size;

  //
  // Print out debugging info
  if (m_mpi_rank == 0) {
    std::cout << "Reducing.." << std::endl;
  }
  uint64_t low_max_size = mpi_all_reduce(low_local_size,
                                         std::greater<uint64_t>(), m_mpi_comm);
  uint64_t high_max_size = mpi_all_reduce(high_local_size,
                                          std::greater<uint64_t>(), m_mpi_comm);
  uint64_t total_max_size = mpi_all_reduce(total_local_size,
                                           std::greater<uint64_t>(), m_mpi_comm);
  uint64_t low_sum_size = mpi_all_reduce(low_local_size,
                                         std::plus<uint64_t>(), m_mpi_comm);
  uint64_t high_sum_size = mpi_all_reduce(high_local_size,
                                          std::plus<uint64_t>(), m_mpi_comm);
  uint64_t total_sum_size = mpi_all_reduce(total_local_size,
                                           std::plus<uint64_t>(), m_mpi_comm);

  if (m_mpi_rank == 0) {
    std::cout << "counting..(" << m_owned_targets_size << ")" << std::endl;
  }
  uint64_t local_count_del_target = 0;
  for (uint64_t i = 0; i < m_owned_targets_size; ++i) {
    if (m_owned_targets[i].is_delegate())
      ++local_count_del_target;
    if (m_mpi_rank == 0 && i % 10000000 == 0) {
      std::cout << i << "..." << std::flush;
    }
  }

  if (m_mpi_rank == 0) {
    std::cout << "reducing(2).." << std::endl;
  }

  uint64_t total_count_del_target = mpi_all_reduce(local_count_del_target,
                                                   std::plus<uint64_t>(), m_mpi_comm);

  if (m_mpi_rank == 0) {
    std::cout
      << "========================================================" << std::endl
      << "Graph Statistics" << std::endl
      << "========================================================" << std::endl
      << "\tMax Local Vertex Id = " << m_max_vertex << std::endl
      << "\tHub vertices = " << m_map_delegate_locator.size() << std::endl
      << "\tTotal percentage good hub edges = " <<
           double(high_sum_size) / double(total_sum_size) * 100.0 << std::endl
      << "\tTotal count del target = " << total_count_del_target << std::endl
      << "\tTotal percentage of localized edges = " <<
           double(high_sum_size + total_count_del_target) /
           double(total_sum_size) * 100.0 << std::endl
      << "\tGlobal number of edges = " << total_sum_size << std::endl
      << "\tNumber of small degree = " << low_sum_size << std::endl
      << "\tNumber of hubs = " << high_sum_size << std::endl
      << "\towned imbalance = "
      << double(low_max_size) / double(low_sum_size / m_mpi_size) << std::endl
      << "\thubs imbalance = "
      << double(high_max_size) / double(high_sum_size / m_mpi_size) << std::endl
      << "\tTOTAL imbalance = "
      << double(total_max_size) / double(total_sum_size / m_mpi_size) << std::endl
      << "\tNode Partitions = " << node_partitions << std::endl
      << "\tEdge Chunk Size = " << edge_chunk_size << std::endl
      << "\tProcesses per node = " << processes_per_node << std::endl
      << "\tDebugging = " << IS_DEBUGING << std::endl
      << "========================================================" << std::endl
      << "End Graph Statistics" << std::endl
      << "========================================================" << std::endl
      << std::flush;
  }
}
} // namespace mpi
} // namespace havoqgt

#endif // HAVOQGT_MPI_IMPL_DELEGATE_PARTITIONED_GRAPH_IPP_INCLUDED