HavoqGT
parallel_edge_list_reader.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013, Lawrence Livermore National Security, LLC.
3  * Produced at the Lawrence Livermore National Laboratory.
4  * Written by Roger Pearce <rpearce@llnl.gov>.
5  * LLNL-CODE-644630.
6  * All rights reserved.
7  *
8  * This file is part of HavoqGT, Version 0.1.
9  * For details, see https://computation.llnl.gov/casc/dcca-pub/dcca/Downloads.html
10  *
11  * Please also read this link – Our Notice and GNU Lesser General Public License.
12  * http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html
13  *
14  * This program is free software; you can redistribute it and/or modify it under
15  * the terms of the GNU Lesser General Public License (as published by the Free
16  * Software Foundation) version 2.1 dated February 1999.
17  *
18  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
19  * WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or FITNESS FOR A
20  * PARTICULAR PURPOSE. See the terms and conditions of the GNU General Public
21  * License for more details.
22  *
23  * You should have received a copy of the GNU Lesser General Public License along
24  * with this program; if not, write to the Free Software Foundation, Inc.,
25  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26  *
27  * OUR NOTICE AND TERMS AND CONDITIONS OF THE GNU GENERAL PUBLIC LICENSE
28  *
29  * Our Preamble Notice
30  *
31  * A. This notice is required to be provided under our contract with the
32  * U.S. Department of Energy (DOE). This work was produced at the Lawrence
33  * Livermore National Laboratory under Contract No. DE-AC52-07NA27344 with the DOE.
34  *
35  * B. Neither the United States Government nor Lawrence Livermore National
36  * Security, LLC nor any of their employees, makes any warranty, express or
37  * implied, or assumes any liability or responsibility for the accuracy,
38  * completeness, or usefulness of any information, apparatus, product, or process
39  * disclosed, or represents that its use would not infringe privately-owned rights.
40  *
41  * C. Also, reference herein to any specific commercial products, process, or
42  * services by trade name, trademark, manufacturer or otherwise does not
43  * necessarily constitute or imply its endorsement, recommendation, or favoring by
44  * the United States Government or Lawrence Livermore National Security, LLC. The
45  * views and opinions of authors expressed herein do not necessarily state or
46  * reflect those of the United States Government or Lawrence Livermore National
47  * Security, LLC, and shall not be used for advertising or product endorsement
48  * purposes.
49  *
50  */
51 
52 #ifndef HAVOQGT_PARALLEL_EDGE_LIST_READER_INCLUDED
53 #define HAVOQGT_PARALLEL_EDGE_LIST_READER_INCLUDED
54 
55 #include <vector>
56 #include <fstream>
57 #include <deque>
58 #include <string>
59 #include <sstream>
60 #include <utility>
61 #include <stdint.h>
62 
63 #include <havoqgt/environment.hpp>
64 
65 namespace havoqgt {
66 
70 
71 public:
/// Identifier of a single vertex.
using vertex_descriptor = uint64_t;
/// One edge, stored as a pair of vertex identifiers.
using edge_type = std::pair<uint64_t, uint64_t>;
74 
75 
78 
79  class input_iterator_type : public std::iterator<std::input_iterator_tag, edge_type, ptrdiff_t, const edge_type*, const edge_type&> {
80 
81  public:
82  input_iterator_type(parallel_edge_list_reader* ptr_reader, uint64_t count)
83  : m_ptr_reader(ptr_reader)
84  , m_count(count){
85  if(m_count == 0) {
86  get_next();
87  m_count = 0; //reset to zero
88  }
89  }
90 
91  const edge_type& operator*() const { return m_current; }
92  //const uint64_t* operator->() const { return &(operator*()); }
94  get_next();
95  return *this;
96  }
97 
99  input_iterator_type __tmp = *this;
100  get_next();
101  return __tmp;
102  }
103 
104  edge_type *operator->() {
105  return &m_current;
106  }
107 
108  bool is_equal(const input_iterator_type& _x) const {
109  return m_count == (_x.m_count);
110  }
111 
113  friend bool
115  { return x.is_equal(y); }
116 
118  friend bool
120  { return !x.is_equal(y); }
121 
122  private:
124 
125  void get_next() {
126  bool ret = m_ptr_reader->try_read_edge(m_current);
127  ++m_count;
128  assert(m_current.first <= m_ptr_reader->max_vertex_id());
129  assert(m_current.second <= m_ptr_reader->max_vertex_id());
130  }
131 
133  uint64_t m_count;
134  edge_type m_current;
135  };
136 
137 
139  parallel_edge_list_reader(const std::vector< std::string >& filenames ) {
140  int mpi_rank = havoqgt_env()->world_comm().rank();
141  int mpi_size = havoqgt_env()->world_comm().size();
142  m_local_edge_count = 0;
144 
145  // identify filenames to be read by local rank
146  for(size_t i=0; i<filenames.size(); ++i) {
147  if(i % mpi_size == mpi_rank) {
148  m_local_filenames.push_back(filenames[i]);
149  }
150  }
151 
152  // First pass to calc max vertex and count edges.
153  open_files();
154  std::cout << "files open" << std::endl;
155  edge_type edge;
156  uint64_t local_max_vertex = 0;
157  while(try_read_edge(edge)) {
159  local_max_vertex = std::max(edge.first, local_max_vertex);
160  local_max_vertex = std::max(edge.second, local_max_vertex);
161  }
162  m_global_max_vertex = mpi::mpi_all_reduce(local_max_vertex, std::greater<uint64_t>(), MPI_COMM_WORLD);
163  }
164 
165 
168  // Reopen this rank's files so the iterator returned below starts at the first edge.
169  open_files();
170  return input_iterator_type(this, 0);
171  }
172 
176  }
177 
179  uint64_t max_vertex_id() {
180  return m_global_max_vertex;
181  }
182 
183  size_t size() {
184  return m_local_edge_count;
185  }
186 
187 protected:
188 
189  bool try_read_edge(edge_type& edge) {
190  std::string line;
191  while(!m_ptr_ifstreams.empty()) {
192  if(std::getline(*(m_ptr_ifstreams.front()), line)) {
193  std::stringstream ssline(line);
194  ssline >> edge.first >> edge.second;
195  return true;
196  } else { //No remaining lines, close file.
197  delete m_ptr_ifstreams.front();
198  m_ptr_ifstreams.pop_front();
199  }
200  }
201  return false;
202  }
203 
204  void open_files() {
205  if(!m_ptr_ifstreams.empty()) {
206  HAVOQGT_ERROR_MSG("m_ptr_ifstreams not empty.");
207  }
208  for(auto itr=m_local_filenames.begin(); itr!=m_local_filenames.end(); ++itr) {
209  std::ifstream* ptr = new std::ifstream(*itr);
210  if(ptr->good()) {
211  m_ptr_ifstreams.push_back(ptr);
212  } else {
213  std::cerr << "Error opening filename: " << *itr;
214  }
215  }
216  }
217 
// Paths of the input files kept by this rank; filled in by the constructor.
218  std::vector< std::string > m_local_filenames;
// Streams still to be read, in order. open_files() appends to the back;
// try_read_edge() reads from the front and deletes/pops a stream once it
// has no more lines.
219  std::deque< std::ifstream* > m_ptr_ifstreams;
222 
223 };
224 
225 } //end namespace havoqgt
226 
227 #endif //HAVOQGT_PARALLEL_EDGE_LIST_READER_INCLUDED
std::pair< uint64_t, uint64_t > edge_type
input_iterator_type(parallel_edge_list_reader *ptr_reader, uint64_t count)
parallel_edge_list_reader(const std::vector< std::string > &filenames)
friend bool operator==(const input_iterator_type &x, const input_iterator_type &y)
Return true if x and y are both end or not end, or x and y are the same.
std::deque< std::ifstream * > m_ptr_ifstreams
const communicator & world_comm() const
input_iterator_type begin()
Returns the begin of the input iterator.
int size() const
Definition: mpi.hpp:618
T mpi_all_reduce(T in_d, Op in_op, MPI_Comm mpi_comm)
Definition: mpi.hpp:176
int rank() const
Definition: mpi.hpp:619
environment * havoqgt_env()
input_iterator_type end()
Returns the end of the input iterator.
friend bool operator!=(const input_iterator_type &x, const input_iterator_type &y)
Return false if x and y are both end or not end, or x and y are the same.
#define HAVOQGT_ERROR_MSG(msg)
Definition: error.hpp:62