#ifndef HAVOQGT_MPI_IMPL_DELEGATE_PARTITIONED_GRAPH_IPP_INCLUDED
#define HAVOQGT_MPI_IMPL_DELEGATE_PARTITIONED_GRAPH_IPP_INCLUDED
template <typename SegmentManager>
template <typename Container>
delegate_partitioned_graph<SegmentManager>::
delegate_partitioned_graph(const SegmentAllocator<void>& seg_allocator,
                           MPI_Comm mpi_comm,
                           Container& edges, uint64_t max_vertex,
                           uint64_t delegate_degree_threshold,
                           ConstructionState stop_after)
    : m_mpi_comm(mpi_comm),
      m_global_edge_count(edges.size()),
      m_local_outgoing_count(seg_allocator),
      m_local_incoming_count(seg_allocator),
      m_owned_info(seg_allocator),
      m_owned_info_tracker(seg_allocator),
      m_delegate_degree_threshold(delegate_degree_threshold),
      m_delegate_info(seg_allocator),
      m_delegate_degree(seg_allocator),
      m_delegate_label(seg_allocator),
      m_map_delegate_locator(seg_allocator),
      m_controller_locators(seg_allocator) {

  {
    LogStep logstep("Allocating 4 Arrays of length max local vertex.",
                    m_mpi_comm, m_mpi_rank);
    // ... (size m_local_outgoing_count, m_local_incoming_count, m_owned_info
    //      and m_owned_info_tracker to the maximum local vertex count) ...
  }

  boost::unordered_set<uint64_t> global_hubs;
  count_edge_degrees(edges.begin(), edges.end(), global_hubs,
                     delegate_degree_threshold);
  std::cout << "\tNumber of Delegates: " << global_hubs.size() << std::endl;

  // ... (meta data initialization and, depending on stop_after, the remaining
  //      construction phases) ...
}
template <typename SegmentManager>
template <typename Container>
void
delegate_partitioned_graph<SegmentManager>::
complete_construction(const SegmentAllocator<void>& seg_allocator,
                      MPI_Comm mpi_comm, Container& edges) {
  m_mpi_comm = mpi_comm;
  int temp_mpi_rank, temp_mpi_size;
  CHK_MPI( MPI_Comm_size(m_mpi_comm, &temp_mpi_size) );
  CHK_MPI( MPI_Comm_rank(m_mpi_comm, &temp_mpi_rank) );
  assert(temp_mpi_rank == m_mpi_rank);
  assert(temp_mpi_size == m_mpi_size);

  std::map< uint64_t, std::deque<OverflowSendInfo> > transfer_info;
  switch (m_graph_state) {
    case MetaDataGenerated:
      {
        LogStep logstep("calculate_overflow", m_mpi_comm, m_mpi_rank);
        calculate_overflow(transfer_info);
        MPI_Barrier(m_mpi_comm);
      }
      {
        LogStep logstep("initialize_edge_storage", m_mpi_comm, m_mpi_rank);
        initialize_edge_storage(seg_allocator);
        MPI_Barrier(m_mpi_comm);
      }
      m_graph_state = EdgeStorageAllocated;
      // fall through

    case EdgeStorageAllocated:
      {
        LogStep logstep("partition_low_degree", m_mpi_comm, m_mpi_rank);
        partition_low_degree(edges);
        MPI_Barrier(m_mpi_comm);
      }
      m_graph_state = LowEdgesPartitioned;

      {
        LogStep logstep("partition_high_degree", m_mpi_comm, m_mpi_rank);
        partition_high_degree(edges, transfer_info);
        MPI_Barrier(m_mpi_comm);
      }
      m_graph_state = HighEdgesPartitioned;
      // fall through

    case HighEdgesPartitioned:
      {
        LogStep logstep("all-reduce hub degree", m_mpi_comm, m_mpi_rank);
        if (m_delegate_degree.size() > 0) {
          mpi_all_reduce_inplace(m_delegate_degree, std::plus<uint64_t>(),
                                 m_mpi_comm);
        }
        MPI_Barrier(m_mpi_comm);
      }
      assert(m_delegate_degree.size() == m_delegate_label.size());

      {
        LogStep logstep("Build controller lists", m_mpi_comm, m_mpi_rank);
        const int controllers = (m_delegate_degree.size() / m_mpi_size) + 1;
        m_controller_locators.reserve(controllers);
        for (size_t i = 0; i < m_delegate_degree.size(); ++i) {
          if (int(i % m_mpi_size) == m_mpi_rank) {
            m_controller_locators.push_back(vertex_locator(true, i, m_mpi_rank));
          }
        }
        MPI_Barrier(m_mpi_comm);
      }

      {
        LogStep logstep("Verify CSR integration", m_mpi_comm, m_mpi_rank);
        for (auto itr = m_map_delegate_locator.begin();
             itr != m_map_delegate_locator.end(); ++itr) {
          uint64_t label = itr->first;
          vertex_locator locator = itr->second;

          uint64_t local_id = label / uint64_t(m_mpi_size);
          if (label % uint64_t(m_mpi_size) == uint64_t(m_mpi_rank)) {
            assert(m_owned_info[local_id].is_delegate == 1);
            assert(m_owned_info[local_id].delegate_id == locator.local_id());
          }
        }
        MPI_Barrier(m_mpi_comm);
      }
      m_graph_state = GraphReady;
  }
}
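// Construction advances m_graph_state in order:
//   MetaDataGenerated -> EdgeStorageAllocated -> LowEdgesPartitioned ->
//   HighEdgesPartitioned -> GraphReady.
// The switch above intentionally falls through, so construction that stopped
// early (see the stop_after constructor argument) can resume from the
// recorded state when complete_construction() is called again.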
template <typename SegmentManager>
template <typename InputIterator>
void
delegate_partitioned_graph<SegmentManager>::
count_edge_degrees(InputIterator unsorted_itr, InputIterator unsorted_itr_end,
                   boost::unordered_set<uint64_t>& global_hubs,
                   uint64_t delegate_degree_threshold) {
  using boost::container::map;

  uint64_t high_vertex_count(0);

  uint64_t loop_counter = 0;
  uint64_t edge_counter = 0;

  double start_time, last_loop_time;
  start_time = last_loop_time = MPI_Wtime();

  // Loop until every rank has exhausted its local edge range.
  while (!global_iterator_range_empty(unsorted_itr, unsorted_itr_end,
                                      m_mpi_comm)) {
    if (m_mpi_rank == 0 && (loop_counter % 1000) == 0) {
      double curr_time = MPI_Wtime();
      std::cout
        << "Total Loops: " << loop_counter << ", "
        << "Total Edges: " << edge_counter
        << "] Time " << (curr_time - last_loop_time) << " second, "
        << " Total Time " << (curr_time - start_time) << " second, "
        << std::endl << std::flush;
      last_loop_time = curr_time;
    }
    loop_counter++;

    // Per-destination-rank maps of (local vertex id) ->
    // (outgoing count, incoming count) contributions.
    std::vector<
        boost::container::map< int, std::pair<uint64_t, uint64_t> >
      > maps_to_send(m_mpi_size);
    int maps_to_send_element_count = 0;

    std::vector<std::pair<uint64_t, uint64_t> > to_send_edges_low;  // unused here
    for (size_t i = 0;
         i < edge_chunk_size && unsorted_itr != unsorted_itr_end; i++) {
      edge_counter++;

      // Tally the source vertex (outgoing degree).
      {
        const int owner = unsorted_itr->first % m_mpi_size;
        const uint64_t local_id = unsorted_itr->first / uint64_t(m_mpi_size);
        if (owner == m_mpi_rank) {
          m_local_outgoing_count[local_id]++;
          if (m_local_outgoing_count[local_id] == delegate_degree_threshold) {
            high_vertex_count++;
          }
        } else {
          maps_to_send.at(owner)[local_id].first++;
        }
      }

      // Tally the destination vertex (incoming degree).
      {
        const int owner = unsorted_itr->second % m_mpi_size;
        const uint64_t local_id = unsorted_itr->second / uint64_t(m_mpi_size);
        if (owner == m_mpi_rank) {
          m_local_incoming_count[local_id]++;
        } else {
          int c = maps_to_send.at(owner)[local_id].second++;
          if (c == 0) {  // first time this (owner, local_id) entry is touched
            maps_to_send_element_count++;
          }
        }
      }
      ++unsorted_itr;
    }

    // Exchange the remote contributions.
    send_vertex_info(high_vertex_count, delegate_degree_threshold,
                     maps_to_send, maps_to_send_element_count);
  }

  if (m_mpi_rank == 0) {
    double curr_time = MPI_Wtime();
    std::cout
      << "Total Loops: " << loop_counter << ", "
      << "Total Edges: " << edge_counter
      << "] Time " << (curr_time - last_loop_time) << " second, "
      << " Total Time " << (curr_time - start_time) << " second, "
      << std::endl << std::flush;
  }

  // Collect the labels of local high-degree (hub) vertices and count the
  // edges that will stay in the low-degree CSR.
  std::vector<uint64_t> temp_hubs;
  temp_hubs.reserve(high_vertex_count);
  for (size_t i = 0; i < m_local_incoming_count.size(); i++) {
    const uint64_t outgoing = m_local_outgoing_count[i];

    if (outgoing >= delegate_degree_threshold) {
      const uint64_t global_id = (i * m_mpi_size) + m_mpi_rank;
      assert(global_id != 0);
      temp_hubs.push_back(global_id);
    } else {
      m_edges_low_count += m_local_outgoing_count[i];
    }
  }
  assert(temp_hubs.size() == high_vertex_count);

  // Gather every rank's hub labels so all ranks share the same hub set.
  std::vector<uint64_t> vec_global_hubs;
  // ... (all-gather temp_hubs from every rank into vec_global_hubs) ...
  global_hubs.insert(vec_global_hubs.begin(), vec_global_hubs.end());
}
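// Vertex ownership and local ids follow a round-robin rule used throughout
// this file: for a global label v on P ranks, owner = v % P and
// local_id = v / P. For example, with P = 4, label 10 lives on rank 2 as
// local id 2, and the reverse mapping is 2 * 4 + 2 = 10 -- the same formula
// the hub loop above uses to rebuild global_id from (i, m_mpi_rank).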
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
send_vertex_info(uint64_t& high_vertex_count,
                 uint64_t delegate_degree_threshold,
                 std::vector< boost::container::map<
                     int, std::pair<uint64_t, uint64_t> > >& maps_to_send,
                 int maps_to_send_element_count) {

  // Flatten the per-rank maps into (local_id, source_count, dest_count)
  // triples for the all-to-all exchange.
  uint64_t to_send_pos = 0;
  std::vector<uint64_t> to_send(maps_to_send_element_count*3, 0);
  std::vector<int> to_send_count(m_mpi_size, 0);

  assert(maps_to_send.size() == size_t(m_mpi_size));
  for (size_t i = 0; i < maps_to_send.size(); i++) {
    for (auto itr = maps_to_send[i].begin(); itr != maps_to_send[i].end();
         itr++) {
      assert(to_send_pos < to_send.size());
      std::pair<int, std::pair<uint64_t, uint64_t>> triple = (*itr);
      to_send[to_send_pos++] = uint64_t(triple.first);
      to_send[to_send_pos++] = triple.second.first;
      to_send[to_send_pos++] = triple.second.second;
    }
    to_send_count[i] = maps_to_send[i].size()*3;
  }

  std::vector<uint64_t> to_recv;
  std::vector<int> out_recvcnts;

  mpi_all_to_all(to_send, to_send_count, to_recv, out_recvcnts, m_mpi_comm);

  for (size_t k = 0; k < to_recv.size(); ) {
    const uint64_t local_id = to_recv[k++];
    const uint64_t source_count = to_recv[k++];
    const uint64_t dest_count = to_recv[k++];
    assert(local_id < m_local_incoming_count.size());

    // If the remote outgoing contributions push this vertex over the
    // threshold, it becomes a new hub.
    if (m_local_outgoing_count[local_id] < delegate_degree_threshold
        && m_local_outgoing_count[local_id] + source_count >=
           delegate_degree_threshold) {
      high_vertex_count++;
    }
    m_local_outgoing_count[local_id] += source_count;
    m_local_incoming_count[local_id] += dest_count;
  }
}
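// The exchange above packs each remote contribution as three consecutive
// uint64_t values per (owner, local_id) entry:
//
//   [ local_id, source_count, dest_count, local_id, source_count, ... ]
//
// with to_send_count[r] = 3 * (number of entries destined for rank r), so
// mpi_all_to_all can scatter the flat buffer by rank. The receiver walks the
// buffer three values at a time, as in the loop above.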
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
initialize_low_meta_data(boost::unordered_set<uint64_t>& global_hubs) {

  // Build the low-degree CSR index: each owned vertex records the offset of
  // its first edge in m_owned_targets.
  uint64_t edge_count = 0;
  for (uint64_t vert_id = 0; vert_id < m_owned_info.size(); vert_id++) {
    const uint64_t outgoing = m_local_outgoing_count[vert_id];
    const uint64_t incoming = m_local_incoming_count[vert_id];

    m_owned_info[vert_id] = vert_info(false, 0, edge_count);

    if (outgoing < m_delegate_degree_threshold) {
      edge_count += outgoing;
    } else {
      const uint64_t global_id = (vert_id * m_mpi_size) + m_mpi_rank;
      assert(global_id != 0);
      if (global_id < m_max_vertex) {
        // This vertex crossed the degree threshold, so it must already be in
        // the global hub set.
        assert(global_hubs.count(global_id) != 0);
      }
    }
  }
}
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
initialize_high_meta_data(boost::unordered_set<uint64_t>& global_hubs) {

  // Stagger the allocation work across the processes on a node, so ranks on
  // the same node do not all touch memory at once.
  for (int i = 0; i < processes_per_node; ++i) {
    if (m_mpi_rank == 0) {
      std::cout << "\t Setup and Compute Hub information: " << i << "/"
                << processes_per_node << "." << std::endl << std::flush;
    }

    if (i == m_mpi_rank % processes_per_node) {
      std::vector<uint64_t> vec_sorted_hubs(global_hubs.begin(),
                                            global_hubs.end());
      std::sort(vec_sorted_hubs.begin(), vec_sorted_hubs.end());

      // Grow m_delegate_info (one entry per hub plus a sentinel) and
      // m_delegate_degree in bounded chunks rather than a single resize:
      //   m_delegate_info.resize(global_hubs.size()+1);
      //   m_delegate_degree.resize(vec_sorted_hubs.size());
      {
        size_t loop_limit = global_hubs.size()+1;
        m_delegate_info.reserve(loop_limit);
        for (size_t j = 0; j < loop_limit; ) {
          for (size_t k = 0; k < 10000000; k++) {
            m_delegate_info.emplace_back();
            ++j;
            if (j >= loop_limit) {
              break;
            }
          }
        }
      }
      {
        size_t loop_limit = vec_sorted_hubs.size();
        m_delegate_degree.reserve(loop_limit);
        for (size_t j = 0; j < loop_limit; ) {
          for (size_t k = 0; k < 10000000; k++) {
            m_delegate_degree.emplace_back();
            ++j;
            if (j >= loop_limit) {
              break;
            }
          }
        }
      }

      // Assign each hub a dense delegate id (its index in the sorted label
      // vector) and record the mapping from label to delegate locator.
      for (size_t i = 0; i < vec_sorted_hubs.size(); ++i) {
        uint64_t t_local_id = vec_sorted_hubs[i] / uint64_t(m_mpi_size);
        int t_owner = uint32_t(vec_sorted_hubs[i] % uint32_t(m_mpi_size));
        vertex_locator new_ver_loc(true, i, t_owner);

        m_map_delegate_locator[vec_sorted_hubs[i]] = new_ver_loc;
        m_delegate_label.push_back(vec_sorted_hubs[i]);

        // If this rank owns the hub, mark the owned vertex as a delegate.
        if (t_owner == m_mpi_rank) {
          m_owned_info[t_local_id].is_delegate = 1;
          m_owned_info[t_local_id].delegate_id = i;
        }
      }
    }
    // ... (synchronize before the next group of ranks proceeds) ...
  }
}
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
initialize_edge_storage(const SegmentAllocator<void>& seg_allocator) {

  // Allocate the target arrays one node-local rank group at a time.
  for (int i = 0; i < processes_per_node; ++i) {
    if (m_mpi_rank == 0) {
      std::cout << "\tResizing m_owned_targets and m_delegate_targets: "
                << i << "/" << processes_per_node << "." << std::endl
                << std::flush;
    }

    if (i == m_mpi_rank % processes_per_node) {
      m_delegate_targets_size = m_edges_high_count;
      m_owned_targets_size = m_edges_low_count;

      SegmentManager *segment_manager = seg_allocator.get_segment_manager();
      // ... (allocate m_owned_targets and m_delegate_targets, sized
      //      m_owned_targets_size and m_delegate_targets_size, from the
      //      segment manager) ...
    }
    // ... (synchronize before the next group of ranks proceeds) ...
  }

  // Convert m_delegate_info from per-delegate edge counts into CSR offsets
  // (an in-place exclusive prefix sum). The trailing sentinel must still be 0.
  assert(m_delegate_info[m_delegate_info.size()-1] == 0);
  uint64_t edge_count = 0;
  for (size_t i = 0; i < m_delegate_info.size(); i++) {
    uint64_t num_edges = m_delegate_info[i];
    m_delegate_info[i] = edge_count;
    edge_count += num_edges;
    assert(edge_count <= m_edges_high_count);
  }
}
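// A small worked example of the prefix-sum conversion above: if four
// delegates were counted with 3, 1, 2, and 0 high-degree edges, plus the
// trailing sentinel, m_delegate_info goes from per-delegate counts to
// starting offsets in m_delegate_targets:
//
//   counts:  {3, 1, 2, 0, 0}   (last entry is the sentinel)
//   offsets: {0, 3, 4, 6, 6}
//
// Delegate d's edges then occupy [m_delegate_info[d], m_delegate_info[d+1]).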
template <typename SegmentManager>
template <typename Container>
void
delegate_partitioned_graph<SegmentManager>::
partition_low_degree(Container& unsorted_edges) {

  uint64_t loop_counter = 0;
  uint64_t edge_counter = 0;

  double start_time, last_loop_time, last_part_time;
  start_time = last_loop_time = last_part_time = MPI_Wtime();

  // The edge container is scanned node_partitions times; each pass handles a
  // subset of the destination ranks to bound buffer sizes.
  for (size_t node_turn = 0; node_turn < node_partitions; node_turn++) {

    auto unsorted_itr = unsorted_edges.begin();
    auto unsorted_itr_end = unsorted_edges.end();
    if (m_mpi_rank == 0) {
      double curr_time = MPI_Wtime();
      std::cout << "\t***[LP]["
        << "Partition Number: " << node_turn << "/" << node_partitions << ", "
        << "Total Loops: " << loop_counter << ", "
        << "Total Edges: " << edge_counter
        << "] Time " << (curr_time - last_part_time) << " second, "
        << " Total Time " << (curr_time - start_time) << " second, "
        << std::endl << std::flush;
      last_part_time = curr_time;
    }
    MPI_Barrier(m_mpi_comm);

    while (!global_iterator_range_empty(unsorted_itr, unsorted_itr_end,
                                        m_mpi_comm)) {
      if (m_mpi_rank == 0 && (loop_counter % 500) == 0) {
        double curr_time = MPI_Wtime();
        std::cout << "\t[LP]["
          << "Partition Number: " << node_turn << "/" << node_partitions << ", "
          << "Total Loops: " << loop_counter << ", "
          << "Total Edges: " << edge_counter
          << "] Time " << (curr_time - last_loop_time) << " second, "
          << " Total Time " << (curr_time - start_time) << " second, "
          << std::endl << std::flush;
        last_loop_time = curr_time;
      }
      loop_counter++;

      std::vector<std::pair<uint64_t, uint64_t> > to_recv_edges_low;

      std::vector<std::pair<uint64_t, uint64_t> > to_send_edges_low;
      to_send_edges_low.reserve(edge_chunk_size);

      for (size_t i = 0;
           unsorted_itr != unsorted_itr_end && i < edge_chunk_size;
           ++unsorted_itr, ++i) {
        const auto edge = *unsorted_itr;
        edge_counter++;

        const int owner = unsorted_itr->first % m_mpi_size;
        // ... (only edges whose owner falls in this node_turn pass are
        //      handled here) ...

        // Only low-degree (non-delegate) sources belong in this phase.
        if (m_map_delegate_locator.count(unsorted_itr->first) == 0) {
          to_send_edges_low.push_back(*unsorted_itr);
        }
      }

      // ... (all-to-all exchange: each rank receives the low-degree edges it
      //      owns into to_recv_edges_low) ...

      std::sort(to_recv_edges_low.begin(), to_recv_edges_low.end());

      // Sanity check: every received edge is owned here and is low degree.
      for (size_t i = 0; i < to_recv_edges_low.size(); ++i) {
        auto edge = to_recv_edges_low[i];
        assert(int(edge.first) % m_mpi_size == m_mpi_rank);
        assert(m_map_delegate_locator.count(edge.first) == 0);
      }

      // Place each received edge in the owned CSR, using m_owned_info_tracker
      // as a per-vertex fill cursor.
      auto itr_end = to_recv_edges_low.end();
      for (auto itr = to_recv_edges_low.begin(); itr != itr_end; itr++) {
        const auto edge = *itr;
        assert(m_mpi_rank == int(edge.first % m_mpi_size));

        uint64_t new_vertex_id = edge.first / uint64_t(m_mpi_size);
        uint64_t temp_offset = (m_owned_info_tracker[new_vertex_id])++;
        uint64_t loc = temp_offset + m_owned_info[new_vertex_id].low_csr_idx;

        if (!(loc < m_owned_info[new_vertex_id+1].low_csr_idx)) {
          std::cout << "Error rank: " << m_mpi_rank << " -- "
            << loc << " < " << m_owned_info[new_vertex_id+1].low_csr_idx
            << ", new_vertex_id = " << new_vertex_id
            << ", temp_offset = " << temp_offset
            << ", edge = (" << edge.first << "," << edge.second << ")"
            << std::endl << std::flush;
          // ... (abort) ...
        }

        m_owned_targets[loc] = label_to_locator(edge.second);
      }
    }  // while global iterator range not empty
  }  // for node_turn

  if (m_mpi_rank == 0) {
    double curr_time = MPI_Wtime();
    std::cout << "\t[LP]["
      << "Total Loops: " << loop_counter << ", "
      << "Total Edges: " << edge_counter
      << "] Time " << (curr_time - last_loop_time) << " second, "
      << " Total Time " << (curr_time - start_time) << " second, "
      << std::endl << std::flush;
  }
}
template <typename SegmentManager>
template <typename InputIterator>
void
delegate_partitioned_graph<SegmentManager>::
count_high_degree_edges(InputIterator unsorted_itr,
                        InputIterator unsorted_itr_end,
                        boost::unordered_set<uint64_t>& global_hub_set) {

  // Per-rank tally of high-degree edges, used only for a sanity check below.
  std::vector<uint64_t> tmp_high_count_per_rank(m_mpi_size, 0);

  m_edges_high_count = 0;

  uint64_t loop_counter = 0;
  uint64_t edge_counter = 0;

  double start_time, last_loop_time;
  start_time = last_loop_time = MPI_Wtime();

  while (!global_iterator_range_empty(unsorted_itr, unsorted_itr_end,
                                      m_mpi_comm)) {
    if (m_mpi_rank == 0 && (loop_counter % 1000) == 0) {
      double curr_time = MPI_Wtime();
      std::cout << "\t[CH]["
        << "Total Loops: " << loop_counter << ", "
        << "Total Edges: " << edge_counter
        << "] Time " << (curr_time - last_loop_time) << " second, "
        << " Total Time " << (curr_time - start_time) << " second, "
        << std::endl << std::flush;
      last_loop_time = curr_time;
    }
    loop_counter++;

    // Per-destination-rank maps of (hub label) -> (edge count).
    std::vector<
        boost::container::map<uint64_t, uint64_t> > maps_to_send(m_mpi_size);
    int maps_to_send_element_count = 0;

    for (size_t i = 0;
         unsorted_itr != unsorted_itr_end && i < edge_chunk_size;
         ++unsorted_itr, ++i) {
      const auto edge = *unsorted_itr;
      edge_counter++;

      if (global_hub_set.count(unsorted_itr->first) == 0) {
        // Low-degree source: not handled in this phase.
      } else if (global_hub_set.count(unsorted_itr->first)) {
        // The delegate's edge will be stored on the destination's owner.
        tmp_high_count_per_rank[unsorted_itr->second % m_mpi_size]++;

        const int owner = unsorted_itr->second % m_mpi_size;
        if (owner == m_mpi_rank) {
          const uint64_t ver_id = unsorted_itr->first;

          const uint64_t new_source_id =
              m_map_delegate_locator[ver_id].local_id();
          assert(new_source_id < m_delegate_info.size()-1);
          m_delegate_info[new_source_id]++;
          m_edges_high_count++;
        } else {
          int c = maps_to_send.at(owner)[unsorted_itr->first]++;
          if (c == 0) {  // first edge counted for this (owner, hub) pair
            maps_to_send_element_count++;
          }
        }
      }
    }

    // Exchange the remote counts so every owner learns how many edges each of
    // its delegates will receive.
    send_high_info(maps_to_send, maps_to_send_element_count);
  }

  if (m_mpi_rank == 0) {
    double curr_time = MPI_Wtime();
    std::cout << "\t[CH]["
      << "Total Loops: " << loop_counter << ", "
      << "Total Edges: " << edge_counter
      << "] Time " << (curr_time - last_loop_time) << " second, "
      << " Total Time " << (curr_time - start_time) << " second, "
      << std::endl << std::flush;
  }

  // Sanity checks: the per-delegate counts must add up to the local total,
  // and the all-reduced per-rank tallies must agree with it.
  std::cout << "Checking count_high_degree_edges:" << std::endl << std::flush;
  size_t l_edges_high_count = 0;
  for (size_t i = 0; i < m_delegate_info.size(); i++) {
    l_edges_high_count += m_delegate_info[i];
  }
  assert(l_edges_high_count == m_edges_high_count);

  std::vector<uint64_t> high_count_per_rank;
  // ... (element-wise all-reduce of tmp_high_count_per_rank into
  //      high_count_per_rank using std::plus<uint64_t>() over m_mpi_comm) ...

  uint64_t sanity_check_high_edge_count = high_count_per_rank[m_mpi_rank];
  assert(m_edges_high_count == sanity_check_high_edge_count);
}
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
send_high_info(std::vector< boost::container::map<uint64_t, uint64_t> >&
                   maps_to_send,
               int maps_to_send_element_count) {

  // Flatten the per-rank maps into (hub label, edge count) pairs.
  uint64_t to_send_pos = 0;
  std::vector<uint64_t> to_send(maps_to_send_element_count*2, 0);
  std::vector<int> to_send_count(m_mpi_size, 0);

  assert(maps_to_send.size() == size_t(m_mpi_size));
  for (size_t i = 0; i < maps_to_send.size(); i++) {
    for (auto itr = maps_to_send[i].begin(); itr != maps_to_send[i].end();
         itr++) {
      assert(to_send_pos < to_send.size());
      to_send[to_send_pos++] = itr->first;
      to_send[to_send_pos++] = itr->second;
    }
    to_send_count[i] = maps_to_send[i].size()*2;
  }

  std::vector<uint64_t> to_recv;
  std::vector<int> out_recvcnts;

  mpi_all_to_all(to_send, to_send_count, to_recv, out_recvcnts, m_mpi_comm);

  // Each received pair adds to the expected edge count of the named delegate.
  for (size_t i = 0; i < to_recv.size(); i++) {
    const uint64_t ver_id = to_recv[i++];
    const uint64_t delegate_dest_count = to_recv[i];

    const uint64_t new_source_id = m_map_delegate_locator[ver_id].local_id();
    assert(new_source_id < m_delegate_info.size()-1);
    m_delegate_info[new_source_id] += delegate_dest_count;
    m_edges_high_count += delegate_dest_count;
  }
}
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
calculate_overflow(std::map< uint64_t, std::deque<OverflowSendInfo> >&
                       transfer_info) {

  std::vector<uint64_t> low_count_per_rank, high_count_per_rank;
  mpi_all_gather(m_edges_low_count, low_count_per_rank, m_mpi_comm);
  mpi_all_gather(m_edges_high_count, high_count_per_rank, m_mpi_comm);

  const uint64_t owned_total_edges = m_edges_high_count + m_edges_low_count;
  uint64_t global_edge_count = mpi_all_reduce(owned_total_edges,
      std::plus<uint64_t>(), m_mpi_comm);

  const uint64_t target_edges_per_rank = global_edge_count / m_mpi_size;

  uint64_t gave_edge_counter = 0;
  uint64_t receive_edge_counter = 0;

  // Two cursors walk the rank list: heavy_idx over ranks above the target,
  // light_idx over ranks below it; delegate-edge slots move heavy -> light.
  int heavy_idx(0), light_idx(0);
  for (; heavy_idx < m_mpi_size && light_idx < m_mpi_size; ++heavy_idx) {

    while (low_count_per_rank[heavy_idx] + high_count_per_rank[heavy_idx]
             > target_edges_per_rank) {

      const int64_t total_edges_low_idx = low_count_per_rank[light_idx]
          + high_count_per_rank[light_idx];
      if (total_edges_low_idx < target_edges_per_rank) {

        if (high_count_per_rank[heavy_idx] == 0) {
          break;  // nothing high-degree left to offload from this rank
        }

        uint64_t max_to_offload = std::min(high_count_per_rank[heavy_idx],
            high_count_per_rank[heavy_idx] + low_count_per_rank[heavy_idx] -
            target_edges_per_rank);

        uint64_t max_to_receive = target_edges_per_rank -
            high_count_per_rank[light_idx] - low_count_per_rank[light_idx];

        uint64_t to_move = std::min(max_to_offload, max_to_receive);

        assert(to_move != 0);

        high_count_per_rank[heavy_idx] -= to_move;
        high_count_per_rank[light_idx] += to_move;

        assert(heavy_idx != light_idx);
        if (heavy_idx == m_mpi_rank) {
          std::vector<uint64_t> send_list;

          generate_send_list(send_list, to_move, light_idx, transfer_info);

          for (size_t i = 0; i < send_list.size();) {
            const uint64_t vert_id = send_list[i++];
            const uint64_t count = send_list[i++];

            std::cout << "[" << m_mpi_rank << "] giving " << vert_id
              << " +" << count
              << " to " << light_idx << ". " << std::endl << std::flush;
          }

          int64_t send_len = send_list.size();
          CHK_MPI(MPI_Send(&send_len, 1, mpi_typeof(send_len), light_idx, 0,
                           m_mpi_comm));
          CHK_MPI(MPI_Send(send_list.data(), send_len,
                           mpi_typeof(send_list[0]),
                           light_idx, 0, m_mpi_comm));

          m_edges_high_count -= to_move;
          gave_edge_counter += to_move;
        } else if (light_idx == m_mpi_rank) {
          MPI_Status status;
          int64_t recv_length;
          CHK_MPI(MPI_Recv(&recv_length, 1, mpi_typeof(recv_length), heavy_idx,
                           0, m_mpi_comm, &status));
          std::vector<uint64_t> recv_list(recv_length);
          CHK_MPI(MPI_Recv(recv_list.data(), recv_length,
                           mpi_typeof(recv_list[0]),
                           heavy_idx, 0, m_mpi_comm, &status));

          uint64_t sanity_count = 0;
          for (int i = 0; i < recv_length;) {
            const uint64_t vert_id = recv_list[i++];
            const uint64_t count = recv_list[i++];
            m_delegate_info[vert_id] += count;
            sanity_count += count;

            std::cout << "[" << m_mpi_rank << "] received " << vert_id
              << ", now has " << count << " edges for it." << std::endl
              << std::flush;
          }

          m_edges_high_count += to_move;
          receive_edge_counter += to_move;
          assert(sanity_count == to_move);
        }
      } else {
        // This light rank is already at the target; move to the next one.
        ++light_idx;
      }

      if (light_idx == m_mpi_size) {
        break;
      }
    }  // while heavy rank is over target
  }  // for heavy_idx

  // Sanity checks: totals must be unchanged and every rank's bookkeeping must
  // agree with the negotiated per-rank counts.
  const uint64_t owned_total_edges2 = m_edges_high_count + m_edges_low_count;
  uint64_t sanity_global_edge_count = mpi_all_reduce(owned_total_edges2,
      std::plus<uint64_t>(), m_mpi_comm);
  assert(sanity_global_edge_count == global_edge_count);
  assert(m_edges_high_count == high_count_per_rank[m_mpi_rank]);

  uint64_t high_count_2 = 0;
  for (size_t i = 0; i < m_delegate_info.size()-1; i++) {
    high_count_2 += m_delegate_info[i];
  }
  assert(m_edges_high_count == high_count_2);

  std::vector<uint64_t> low_count_per_rank2, high_count_per_rank2;
  mpi_all_gather(m_edges_low_count, low_count_per_rank2, m_mpi_comm);
  mpi_all_gather(m_edges_high_count, high_count_per_rank2, m_mpi_comm);

  for (int i = 0; i < m_mpi_size; i++) {
    assert(low_count_per_rank2[i] == low_count_per_rank[i]);
    assert(high_count_per_rank2[i] == high_count_per_rank[i]);
  }

  for (int i = 0; i < m_mpi_size; i++) {
    if (i == m_mpi_rank) {
      std::cout << i << " has "
        << ((int64_t)owned_total_edges - (int64_t)target_edges_per_rank)
        << " extra edges. Giving: " << gave_edge_counter << " Receiving: "
        << receive_edge_counter << "." << std::endl;
    }
    MPI_Barrier(m_mpi_comm);
  }
}
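// How the rebalancing above plays out on a small example with
// target_edges_per_rank = 10 (assuming rank 0's excess is all in high-degree
// edges):
//
//   rank:        0    1    2
//   low+high:   14    9    7
//
// heavy_idx stops at rank 0 (4 over target). light_idx first settles on
// rank 1, which can absorb only 1, so to_move = 1 and rank 1 reaches the
// target; light_idx then advances to rank 2, which absorbs the remaining 3.
// The per-delegate breakdown of what the heavy rank gave away is recorded in
// transfer_info (via generate_send_list) so partition_high_degree() can
// forward the actual edges later.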
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
generate_send_list(std::vector<uint64_t>& send_list, uint64_t num_send,
                   int send_id,
                   std::map< uint64_t, std::deque<OverflowSendInfo> >&
                       transfer_info) {

  // First pass: count how many delegates are needed to cover num_send edges,
  // so send_list can be reserved exactly (two entries per delegate).
  uint64_t send_count = 0;
  uint64_t ver_count = 0;
  for (uint64_t i = 0;
       i < m_delegate_info.size()-1 && send_count < num_send; i++) {
    if (m_delegate_info[i] != 0) {
      send_count += m_delegate_info[i];
      ver_count++;
    }
  }

  send_list.reserve(ver_count * 2);

  // Second pass: give away edges (possibly only part of the last delegate's
  // count) until exactly num_send have been taken.
  send_count = 0;
  for (uint64_t i = 0;
       i < m_delegate_info.size()-1 && send_count < num_send; i++) {
    if (m_delegate_info[i] != 0) {
      uint64_t edges_to_give = m_delegate_info[i];

      if (send_count + edges_to_give > num_send) {
        edges_to_give = num_send - send_count;
      }

      m_delegate_info[i] -= edges_to_give;
      send_count += edges_to_give;

      send_list.push_back(i);
      send_list.push_back(edges_to_give);

      // Record where delegate i's overflow edges should be forwarded.
      if (transfer_info.count(i) == 0) {
        transfer_info[i] = std::deque<OverflowSendInfo>();
      }
      // ... (append an OverflowSendInfo entry for (send_id, edges_to_give)
      //      to transfer_info[i]) ...
    }
  }
  assert(num_send == send_count);
}
template <typename SegmentManager>
template <typename Container>
void
delegate_partitioned_graph<SegmentManager>::
partition_high_degree(Container& unsorted_edges,
    std::map< uint64_t, std::deque<OverflowSendInfo> >& transfer_info) {

  uint64_t loop_counter = 0;
  uint64_t edge_counter = 0;
  double start_time, last_loop_time, last_part_time;
  start_time = last_loop_time = last_part_time = MPI_Wtime();
  uint64_t gave_edge_counter = 0;

  // Edges queued for sending; also reused to forward overflow edges.
  std::vector<std::pair<uint64_t, uint64_t> > to_send_edges_high;
  to_send_edges_high.reserve(edge_chunk_size);

  for (size_t node_turn = 0; node_turn < node_partitions; node_turn++) {

    if (m_mpi_rank == 0) {
      double curr_time = MPI_Wtime();
      std::cout << "\t***[HP]["
        << "Partition Number: " << node_turn << "/" << node_partitions << ", "
        << "Total Loops: " << loop_counter << ", "
        << "Total Edges: " << edge_counter
        << "] Time " << (curr_time - last_part_time) << " second, "
        << " Total Time " << (curr_time - start_time) << " second, "
        << std::endl << std::flush;
      last_part_time = curr_time;
    }
    MPI_Barrier(m_mpi_comm);

    auto unsorted_itr = unsorted_edges.begin();
    auto unsorted_itr_end = unsorted_edges.end();

    while (!global_iterator_range_empty(unsorted_itr, unsorted_itr_end,
                                        m_mpi_comm)) {
      if (m_mpi_rank == 0 && (loop_counter % 500) == 0) {
        double curr_time = MPI_Wtime();
        std::cout << "\t[HP]["
          << "Partition Number: " << node_turn << "/" << node_partitions << ", "
          << "Total Loops: " << loop_counter << ", "
          << "Total Edges: " << edge_counter
          << "] Time " << (curr_time - last_loop_time) << " second, "
          << " Total Time " << (curr_time - start_time) << " second, "
          << std::endl << std::flush;
        last_loop_time = curr_time;
      }
      loop_counter++;

      while (unsorted_itr != unsorted_itr_end &&
             to_send_edges_high.size() < edge_chunk_size) {
        const auto edge = *unsorted_itr;
        edge_counter++;

        const int owner = unsorted_itr->second % m_mpi_size;
        // ... (only edges whose owner falls in this node_turn pass are
        //      handled here) ...

        // Only delegate (high-degree) sources belong in this phase; the edge
        // is relabeled with the delegate's dense id before sending.
        if (m_map_delegate_locator.count(edge.first) == 1) {
          const uint64_t source_id = edge.first;
          uint64_t new_source_id =
              m_map_delegate_locator[source_id].local_id();
          assert(new_source_id < m_delegate_info.size()-1);

          to_send_edges_high.push_back(std::make_pair(new_source_id,
                                                      edge.second));
        }
        ++unsorted_itr;
      }

      std::vector< std::pair<uint64_t, uint64_t> > to_recv_edges_high;
      // ... (all-to-all exchange: each rank receives the high-degree edges
      //      destined for it into to_recv_edges_high) ...

      {
        // Release to_send_edges_high's memory before refilling it.
        std::vector<std::pair<uint64_t, uint64_t> > temp;
        to_send_edges_high.swap(temp);
      }
      to_send_edges_high.reserve(edge_chunk_size);

      assert(to_send_edges_high.size() == 0);
      std::sort(to_recv_edges_high.begin(), to_recv_edges_high.end());

      for (size_t i = 0; i < to_recv_edges_high.size(); ++i) {
        const auto edge = to_recv_edges_high[i];
        const uint64_t new_source_id = edge.first;
        assert(new_source_id < m_delegate_info.size()-1);

        // Next free slot for this delegate in m_delegate_targets.
        uint64_t place_pos = m_delegate_degree[new_source_id];
        place_pos += m_delegate_info[new_source_id];

        if (place_pos == m_delegate_info[new_source_id+1]) {
          // This delegate's slots on this rank are full; forward the edge to
          // the rank recorded by calculate_overflow().
          assert(transfer_info.size() > 0);
          assert(transfer_info.count(new_source_id) != 0);
          to_send_edges_high.push_back(edge);
          gave_edge_counter++;
        } else {
          assert(place_pos < m_delegate_info[new_source_id+1]);
          assert(place_pos < m_delegate_targets_size);

          uint64_t new_target_label = edge.second;
          m_delegate_targets[place_pos] = label_to_locator(new_target_label);
          assert(m_delegate_targets[place_pos].m_owner_dest < m_mpi_size);
          m_delegate_degree[new_source_id]++;
        }
      }
      // ... (when this rank handed out no overflow slots:
      //      assert(transfer_info.size() == 0); ) ...
    }  // while global iterator range not empty
  }  // for node_turn

  if (m_mpi_rank == 0) {
    double curr_time = MPI_Wtime();
    std::cout << "\t[HP]["
      << "Total Loops: " << loop_counter << ", "
      << "Total Edges: " << edge_counter
      << "] Time " << (curr_time - last_loop_time) << " second, "
      << " Total Time " << (curr_time - start_time) << " second, "
      << std::endl << std::flush;
  }

  // Flush any edges still queued for forwarding after the main passes.
  {
    std::vector< std::pair<uint64_t, uint64_t> > to_recv_edges_high;
    // ... (final exchange of the remaining to_send_edges_high into
    //      to_recv_edges_high) ...

    {
      std::vector<std::pair<uint64_t, uint64_t> > temp;
      to_send_edges_high.swap(temp);
    }

    std::sort(to_recv_edges_high.begin(), to_recv_edges_high.end());
    for (size_t i = 0; i < to_recv_edges_high.size(); ++i) {
      const auto edge = to_recv_edges_high[i];
      const uint64_t new_source_id = edge.first;
      assert(new_source_id < m_delegate_info.size()-1);

      uint64_t place_pos = m_delegate_degree[new_source_id];
      place_pos += m_delegate_info[new_source_id];

      if (place_pos == m_delegate_info[new_source_id+1]) {
        // ... (no room left: in this final pass there is nowhere else to
        //      forward the edge, so this is an error) ...
      }
      assert(place_pos < m_delegate_info[new_source_id+1]);
      assert(place_pos < m_delegate_targets_size);

      uint64_t new_target_label = edge.second;
      m_delegate_targets[place_pos] = label_to_locator(new_target_label);
      assert(m_delegate_targets[place_pos].m_owner_dest < m_mpi_size);
      m_delegate_degree[new_source_id]++;
    }
    // ... (ranks that received no overflow have an empty transfer_info here:
    //      assert(transfer_info.size() == 0); ) ...
  }

  // Final sanity check: every delegate slot was filled exactly once.
  {
    int64_t recv_count2 = 0;
    for (size_t i = 0; i < m_delegate_degree.size(); i++) {
      recv_count2 += m_delegate_degree[i];
    }
    int64_t difference = recv_count2 - m_delegate_targets_size;
    // ... (difference is reported and expected to be zero) ...

    for (size_t i = 0; i < m_delegate_info.size()-1; i++) {
      size_t pos = m_delegate_info[i];
      for (size_t j = pos; j < m_delegate_info[i+1]; j++) {
        assert(m_delegate_targets[j].m_owner_dest != 0xFFFFF);
        assert(m_delegate_targets[j].m_local_id != 0x7FFFFFFFFF);
      }
    }
  }
}
template <typename SegmentManager>
inline uint64_t
delegate_partitioned_graph<SegmentManager>::
locator_to_label(vertex_locator locator) const {
  uint64_t res;
  if (locator.is_delegate()) {
    res = m_delegate_label[locator.local_id()];
  } else {
    res = uint64_t(locator.local_id()) * uint64_t(m_mpi_size) +
          uint64_t(locator.owner());
  }
  return res;
}
template <typename SegmentManager>
inline typename delegate_partitioned_graph<SegmentManager>::vertex_locator
delegate_partitioned_graph<SegmentManager>::
label_to_locator(uint64_t label) const {
  auto itr = m_map_delegate_locator.find(label);

  if (itr == m_map_delegate_locator.end()) {
    uint32_t owner    = label % uint64_t(m_mpi_size);
    uint64_t local_id = label / uint64_t(m_mpi_size);
    return vertex_locator(false, local_id, owner);
  }
  return itr->second;
}
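// Round-trip example for the two conversions above, with m_mpi_size = 4:
// label 10 is not a delegate, so label_to_locator(10) yields an owned locator
// with owner = 10 % 4 = 2 and local_id = 10 / 4 = 2, and locator_to_label()
// recovers 2 * 4 + 2 = 10. Delegate labels instead map through
// m_map_delegate_locator, and their locators store the dense delegate id, so
// the reverse lookup goes through m_delegate_label.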
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
sync_global_hub_set(const boost::unordered_set<uint64_t>& local_hubs,
                    boost::unordered_set<uint64_t>& global_hubs,
                    bool local_change) {
  // Count how many ranks saw their local hub set change.
  uint32_t new_hubs = mpi_all_reduce(uint32_t(local_change),
                                     std::plus<uint32_t>(), m_mpi_comm);
  // ... (skip the exchange when new_hubs == 0) ...

  std::vector<uint64_t> vec_local_hubs(local_hubs.begin(), local_hubs.end());
  std::vector<uint64_t> vec_global_hubs;
  // ... (all-gather vec_local_hubs from every rank into vec_global_hubs) ...
  global_hubs.insert(vec_global_hubs.begin(), vec_global_hubs.end());
}
template <typename SegmentManager>
inline typename delegate_partitioned_graph<SegmentManager>::edge_iterator
delegate_partitioned_graph<SegmentManager>::
edges_begin(vertex_locator locator) const {
  if (locator.is_delegate()) {
    assert(locator.local_id() < m_delegate_info.size()-1);
    // ... (return an edge_iterator positioned at
    //      m_delegate_info[locator.local_id()]) ...
  }
  assert(locator.owner() == m_mpi_rank);
  assert(locator.local_id() < m_owned_info.size());
  // ... (return an edge_iterator positioned at
  //      m_owned_info[locator.local_id()].low_csr_idx) ...
}
template <typename SegmentManager>
inline typename delegate_partitioned_graph<SegmentManager>::edge_iterator
delegate_partitioned_graph<SegmentManager>::
edges_end(vertex_locator locator) const {
  if (locator.is_delegate()) {
    assert(locator.local_id()+1 < m_delegate_info.size());
    // ... (return an edge_iterator positioned at
    //      m_delegate_info[locator.local_id()+1]) ...
  }
  assert(locator.owner() == m_mpi_rank);
  assert(locator.local_id()+1 < m_owned_info.size());
  // ... (return an edge_iterator positioned at
  //      m_owned_info[locator.local_id()+1].low_csr_idx) ...
}
template <typename SegmentManager>
inline uint64_t
delegate_partitioned_graph<SegmentManager>::
degree(vertex_locator locator) const {
  uint64_t local_id = locator.local_id();
  if (locator.is_delegate()) {
    assert(local_id < m_delegate_degree.size());
    return m_delegate_degree[local_id];
  }
  assert(local_id + 1 < m_owned_info.size());
  return m_owned_info[local_id+1].low_csr_idx -
         m_owned_info[local_id].low_csr_idx;
}
template <typename SegmentManager>
inline uint64_t
delegate_partitioned_graph<SegmentManager>::
local_degree(vertex_locator locator) const {
  uint64_t local_id = locator.local_id();
  if (locator.is_delegate()) {
    assert(local_id + 1 < m_delegate_info.size());
    return m_delegate_info[local_id + 1] - m_delegate_info[local_id];
  }
  assert(local_id + 1 < m_owned_info.size());
  return m_owned_info[local_id+1].low_csr_idx -
         m_owned_info[local_id].low_csr_idx;
}
template <typename SegmentManager>
inline typename delegate_partitioned_graph<SegmentManager>::vertex_iterator
delegate_partitioned_graph<SegmentManager>::
vertices_begin() const {
  // ... (returns a begin iterator over the locally owned vertices) ...
}

template <typename SegmentManager>
inline typename delegate_partitioned_graph<SegmentManager>::vertex_iterator
delegate_partitioned_graph<SegmentManager>::
vertices_end() const {
  // ... (returns an end iterator over the locally owned vertices) ...
}

template <typename SegmentManager>
inline bool
delegate_partitioned_graph<SegmentManager>::
is_label_delegate(uint64_t label) const {
  return m_map_delegate_locator.count(label) > 0;
}
template <typename SegmentManager>
template <typename T, typename SegManagerOther>
edge_data<T, SegManagerOther>*
delegate_partitioned_graph<SegmentManager>::
create_edge_data(SegManagerOther* segment_manager_o,
                 const char *obj_name) const {
  typedef edge_data<T, SegManagerOther> mytype;

  if (obj_name == nullptr) {
    return segment_manager_o->template construct<mytype>(bip::anonymous_instance)
            (m_owned_targets_size, m_delegate_targets_size,
             /* ... remaining constructor arguments ... */);
  } else {
    return segment_manager_o->template construct<mytype>(obj_name)
            (m_owned_targets_size, m_delegate_targets_size,
             /* ... remaining constructor arguments ... */);
  }
}

template <typename SegmentManager>
template <typename T, typename SegManagerOther>
edge_data<T, SegManagerOther>*
delegate_partitioned_graph<SegmentManager>::
create_edge_data(SegManagerOther* segment_manager_o, const T& init,
                 const char *obj_name) const {
  typedef edge_data<T, SegManagerOther> mytype;

  if (obj_name == nullptr) {
    return segment_manager_o->template construct<mytype>(bip::anonymous_instance)
            (m_owned_targets_size, m_delegate_targets_size, init,
             /* ... remaining constructor arguments ... */);
  } else {
    return segment_manager_o->template construct<mytype>(obj_name)
            (m_owned_targets_size, m_delegate_targets_size, init,
             /* ... remaining constructor arguments ... */);
  }
}
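// A usage sketch for create_edge_data() (the variable names and the
// segment-manager type below are illustrative assumptions, not taken from
// this file): because edge_data is sized from m_owned_targets_size and
// m_delegate_targets_size, it must be created after the graph is fully
// constructed.
//
//   using segment_manager_t = bip::managed_mapped_file::segment_manager;
//   segment_manager_t* sm = mfile.get_segment_manager();
//   auto* weights =
//       graph->create_edge_data<double, segment_manager_t>(sm, "edge_weights");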
template <typename SegmentManager>
void
delegate_partitioned_graph<SegmentManager>::
print_graph_statistics() {

  uint64_t low_local_size   = m_owned_targets_size;
  uint64_t high_local_size  = m_delegate_targets_size;
  uint64_t total_local_size = low_local_size + high_local_size;

  if (m_mpi_rank == 0) {
    std::cout << "Reducing.." << std::endl;
  }
  uint64_t low_max_size   = mpi_all_reduce(low_local_size,
      std::greater<uint64_t>(), m_mpi_comm);
  uint64_t high_max_size  = mpi_all_reduce(high_local_size,
      std::greater<uint64_t>(), m_mpi_comm);
  uint64_t total_max_size = mpi_all_reduce(total_local_size,
      std::greater<uint64_t>(), m_mpi_comm);
  uint64_t low_sum_size   = mpi_all_reduce(low_local_size,
      std::plus<uint64_t>(), m_mpi_comm);
  uint64_t high_sum_size  = mpi_all_reduce(high_local_size,
      std::plus<uint64_t>(), m_mpi_comm);
  uint64_t total_sum_size = mpi_all_reduce(total_local_size,
      std::plus<uint64_t>(), m_mpi_comm);

  if (m_mpi_rank == 0) {
    std::cout << "counting..(" << m_owned_targets_size << ")" << std::endl;
  }
  uint64_t local_count_del_target = 0;
  for (uint64_t i = 0; i < m_owned_targets_size; ++i) {
    if (m_owned_targets[i].is_delegate())
      ++local_count_del_target;
    if (m_mpi_rank == 0 && i % 10000000 == 0) {
      std::cout << i << "..." << std::flush;
    }
  }

  if (m_mpi_rank == 0) {
    std::cout << "reducing(2).." << std::endl;
  }
  uint64_t total_count_del_target = mpi_all_reduce(local_count_del_target,
      std::plus<uint64_t>(), m_mpi_comm);

  if (m_mpi_rank == 0) {
    std::cout
      << "========================================================" << std::endl
      << "Graph Statistics" << std::endl
      << "========================================================" << std::endl
      << "\tMax Local Vertex Id = " << m_max_vertex << std::endl
      << "\tHub vertices = " << m_map_delegate_locator.size() << std::endl
      << "\tTotal percentage good hub edges = "
      << double(high_sum_size) / double(total_sum_size) * 100.0 << std::endl
      << "\tTotal count del target = " << total_count_del_target << std::endl
      << "\tTotal percentage of localized edges = "
      << double(high_sum_size + total_count_del_target) /
         double(total_sum_size) * 100.0 << std::endl
      << "\tGlobal number of edges = " << total_sum_size << std::endl
      << "\tNumber of small degree = " << low_sum_size << std::endl
      << "\tNumber of hubs = " << high_sum_size << std::endl
      << "\towned imbalance = "
      << double(low_max_size) / double(low_sum_size/m_mpi_size) << std::endl
      << "\thubs imbalance = "
      << double(high_max_size) / double(high_sum_size/m_mpi_size) << std::endl
      << "\tTOTAL imbalance = "
      << double(total_max_size) / double(total_sum_size/m_mpi_size) << std::endl
      << "\tNode Partition = " << node_partitions << std::endl
      << "\tEdge Chunk Size = " << edge_chunk_size << std::endl
      << "========================================================" << std::endl
      << "End Graph Statistics" << std::endl
      << "========================================================" << std::endl
      << std::flush;
  }
}
#endif  // HAVOQGT_MPI_IMPL_DELEGATE_PARTITIONED_GRAPH_IPP_INCLUDED