AmpTools
DataReaderMPI.h
#ifndef DATAREADERMPI
#define DATAREADERMPI

#include <vector>
#include <string>
#include <iostream>
#include <cassert>

#include "mpi.h"

#include "IUAmpTools/Kinematics.h"
#include "IUAmpToolsMPI/MPITag.h"

using namespace std;

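// KinStruct flattens one event into a plain, fixed-size struct so that
// blocks of events can be moved with a single derived MPI datatype (built
// in defineMPIType below); a Kinematics object itself, which holds a
// vector<TLorentzVector>, cannot be sent directly through MPI.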
struct KinStruct {

  int nPart;
  float weight;
  float e[Kinematics::kMaxParticles];
  float px[Kinematics::kMaxParticles];
  float py[Kinematics::kMaxParticles];
  float pz[Kinematics::kMaxParticles];
};

class DataReader;

// DataReaderMPI wraps a user-defined DataReader (the template parameter T)
// and parallelizes it: the master (rank 0) reads from the true source and
// pushes events to the workers, which serve them from a local cache.
template < class T >
class DataReaderMPI : public T
{

public:

  // default constructor -- produces a "default" placeholder instance
  // with no data source attached (used for registration and cloning)
  DataReaderMPI() : T() { m_isDefault = true; m_isMaster = true; }

  // constructor from user arguments -- triggers the distribution of
  // data from the master to the workers
  DataReaderMPI( const vector<string>& args );

  virtual ~DataReaderMPI();

  // override these functions with parallelized versions -- it is very
  // important that the methods in the base class be declared virtual!

  Kinematics* getEvent();

  void resetSource();

  unsigned int numEvents() const;

  virtual DataReader* newDataReader( const vector< string >& args ) const {
    return new DataReaderMPI<T>( args );
  }

  virtual DataReader* clone() const {
    return ( isDefault() ? new DataReaderMPI<T>() :
                           new DataReaderMPI<T>( arguments() ) );
  }

  virtual vector<string> arguments() const { return m_args; }

  virtual bool isDefault() const { return m_isDefault; }

private:

  bool m_isDefault;
  vector<string> m_args;

  // some helper functions:

  void defineMPIType();
  void distributeData();
  void receiveData();

  Kinematics* createKin( KinStruct* kinStruct );
  void fillStruct( KinStruct* kinStruct, Kinematics* kin );

  MPI_Datatype MPI_KinStruct;

  int m_rank;
  int m_numProc;
  bool m_isMaster;

  vector<Kinematics*> m_ptrCache;
  vector<Kinematics*>::iterator m_ptrItr;

  unsigned int m_numEvents;
};
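
// Usage sketch: given a user-defined reader that inherits from DataReader
// (the class name MyReader below is hypothetical, for illustration only),
// the MPI-aware version is obtained simply by instantiating the template:
//
//   DataReaderMPI<MyReader> reader( args );
//   Kinematics* event = reader.getEvent();  // parallelized version
//
// On rank 0 the reader behaves like a plain MyReader; on all other ranks
// it serves events from a local cache filled at construction time.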

template< class T >
DataReaderMPI<T>::DataReaderMPI( const vector< string >& args ) :
T( args ),
m_isDefault( false ),
m_args( args ),
m_ptrCache( 0 ),
m_ptrItr( m_ptrCache.begin() )
{
  MPI_Comm_rank( MPI_COMM_WORLD, &m_rank );
  MPI_Comm_size( MPI_COMM_WORLD, &m_numProc );

  m_isMaster = ( m_rank == 0 );

  defineMPIType();

  if( m_isMaster ) distributeData();
  else receiveData();
}
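
// Note: construction is effectively a collective operation -- every rank
// must construct the reader with the same arguments at the same point in
// the program, because distributeData() on the master and receiveData()
// on the workers are matched blocking MPI calls.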

template< class T >
DataReaderMPI<T>::~DataReaderMPI()
{
  // clean up the data cache on the worker nodes

  if( !m_isMaster && !m_isDefault ){

    for( vector<Kinematics*>::iterator ptrItr = m_ptrCache.begin();
         ptrItr != m_ptrCache.end();
         ++ptrItr ){

      delete *ptrItr;
    }

    m_ptrCache.clear();
  }
}

template< class T >
Kinematics* DataReaderMPI<T>::getEvent()
{
  if( m_isMaster ) return T::getEvent();

  if( m_ptrItr != m_ptrCache.end() ){

    // the standard behavior for DataReaders is that the calling class
    // takes ownership of the memory returned by getEvent -- this means
    // that this memory will be deleted after the call.  We don't want
    // our cache of data on the workers to be deleted, since that would
    // prevent a reset and re-read of the data, so return a copy in a
    // new Kinematics object instead.

    return new Kinematics( **m_ptrItr++ );
  }
  else{

    return NULL;
  }
}

template< class T >
void DataReaderMPI<T>::resetSource()
{
  if( m_isMaster ){

    // cout << "Resetting master data source " << m_rank << endl;

    // on the master, the data reader should behave like a normal
    // instance of the non-MPI version, so use the user-defined
    // source reset method

    T::resetSource();
  }
  else{

    // cout << "Resetting data source on process with rank " << m_rank << endl;

    // on the workers the "source" is the local cache of Kinematics
    // objects, so this is what gets reset

    m_ptrItr = m_ptrCache.begin();
  }
}

template< class T >
void DataReaderMPI<T>::distributeData()
{
  assert( m_numProc > 1 );

  MPI_Status status;

  int totalEvents = T::numEvents();
  int stepSize = totalEvents / ( m_numProc - 1 );
  int remainder = totalEvents % ( m_numProc - 1 );

  // stepSize + 1 is the largest block any single worker receives
  KinStruct* kinArray = new KinStruct[stepSize+1];

  for( int i = 1; i < m_numProc; ++i ){

    // the first `remainder` workers each take one extra event
    int nEvents = ( i > remainder ? stepSize : stepSize + 1 );

    cout << "Sending process " << i << " " << nEvents << " events" << endl;
    flush( cout );

    MPI_Send( &nEvents, 1, MPI_INT, i, MPITag::kIntSend, MPI_COMM_WORLD );

    for( int j = 0; j < nEvents; ++j ){

      Kinematics* event = T::getEvent();

      // the pointer should never be null as long as the event
      // counting above is correct
      assert( event != NULL );
      fillStruct( &(kinArray[j]), event );
      delete event;
    }

    MPI_Send( kinArray, nEvents, MPI_KinStruct, i, MPITag::kDataSend,
              MPI_COMM_WORLD );

    // cout << "Waiting for acknowledge from " << i << endl;
    // flush( cout );

    // wait for acknowledgment before continuing
    MPI_Recv( &nEvents, 1, MPI_INT, i, MPITag::kAcknowledge,
              MPI_COMM_WORLD, &status );

    // cout << "Send to process " << i << " finished." << endl;
    // flush( cout );
  }

  delete[] kinArray;
}
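
// Worked example of the event split above: with totalEvents = 10 and
// m_numProc = 4 (one master, three workers), stepSize = 10 / 3 = 3 and
// remainder = 10 % 3 = 1, so worker 1 receives stepSize + 1 = 4 events
// while workers 2 and 3 receive 3 each -- 10 events in total.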

template< class T >
void DataReaderMPI<T>::receiveData()
{
  int nEvents;

  MPI_Status status;

  MPI_Recv( &nEvents, 1, MPI_INT, 0, MPITag::kIntSend,
            MPI_COMM_WORLD, &status );

  // cout << "Process " << m_rank << " waiting for "
  //      << nEvents << " events." << endl;

  KinStruct* kinArray = new KinStruct[nEvents];

  MPI_Recv( kinArray, nEvents, MPI_KinStruct, 0, MPITag::kDataSend,
            MPI_COMM_WORLD, &status );

  for( int i = 0; i < nEvents; ++i ){

    m_ptrCache.push_back( createKin( &(kinArray[i]) ) );
  }

  cout << "Process " << m_rank << " received "
       << m_ptrCache.size() << " events." << endl;
  flush( cout );

  // adjust the pointer iterator to point to the beginning of the cache
  resetSource();
  m_numEvents = static_cast< unsigned int >( m_ptrCache.size() );

  // send acknowledgment
  MPI_Send( &nEvents, 1, MPI_INT, 0, MPITag::kAcknowledge, MPI_COMM_WORLD );

  delete[] kinArray;
}
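
// The acknowledgment above serializes the distribution: the master blocks
// in distributeData() until this worker has unpacked its block, so data
// are staged for one worker at a time rather than queued for all at once.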

template< class T >
unsigned int DataReaderMPI<T>::numEvents() const
{
  if( m_isMaster ){

    return T::numEvents();
  }
  else{

    return m_numEvents;
  }
}

template< class T >
void DataReaderMPI<T>::fillStruct( KinStruct* kinStruct, Kinematics* kin )
{
  const vector<TLorentzVector>& partList = kin->particleList();
  kinStruct->nPart = partList.size();
  kinStruct->weight = kin->weight();

  assert( partList.size() <= Kinematics::kMaxParticles );

  for( vector<TLorentzVector>::const_iterator vec = partList.begin();
       vec != partList.end(); ++vec ){

    int i = ( vec - partList.begin() );

    kinStruct->e[i] = vec->E();
    kinStruct->px[i] = vec->Px();
    kinStruct->py[i] = vec->Py();
    kinStruct->pz[i] = vec->Pz();
  }
}
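
// Note that TLorentzVector stores its components as doubles while
// KinStruct carries floats, so fillStruct narrows the event data to
// single precision -- a trade of precision for message size.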

template< class T >
Kinematics* DataReaderMPI<T>::createKin( KinStruct* kinStruct )
{
  vector<TLorentzVector> partList;

  for( int i = 0; i < kinStruct->nPart; ++i ){

    partList.push_back( TLorentzVector( kinStruct->px[i],
                                        kinStruct->py[i],
                                        kinStruct->pz[i],
                                        kinStruct->e[i] ) );
  }

  return new Kinematics( partList, kinStruct->weight );
}

template< class T >
void DataReaderMPI<T>::defineMPIType()
{
  KinStruct kinStruct;

  // arrays used to define info about the six elements in the struct
  int length[6];
  MPI_Aint loc[6];
  MPI_Datatype type[6];

  MPI_Aint baseAddress;
  MPI_Address( &kinStruct, &baseAddress );

  length[0] = 1;
  MPI_Address( &kinStruct.nPart, &loc[0] );
  loc[0] -= baseAddress;
  type[0] = MPI_INT;

  length[1] = 1;
  MPI_Address( &kinStruct.weight, &loc[1] );
  loc[1] -= baseAddress;
  type[1] = MPI_FLOAT;

  length[2] = Kinematics::kMaxParticles;
  MPI_Address( &kinStruct.e, &loc[2] );
  loc[2] -= baseAddress;
  type[2] = MPI_FLOAT;

  length[3] = Kinematics::kMaxParticles;
  MPI_Address( &kinStruct.px, &loc[3] );
  loc[3] -= baseAddress;
  type[3] = MPI_FLOAT;

  length[4] = Kinematics::kMaxParticles;
  MPI_Address( &kinStruct.py, &loc[4] );
  loc[4] -= baseAddress;
  type[4] = MPI_FLOAT;

  length[5] = Kinematics::kMaxParticles;
  MPI_Address( &kinStruct.pz, &loc[5] );
  loc[5] -= baseAddress;
  type[5] = MPI_FLOAT;

  MPI_Type_struct( 6, length, loc, type, &MPI_KinStruct );
  MPI_Type_commit( &MPI_KinStruct );
}
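
// Note: MPI_Address and MPI_Type_struct were deprecated in MPI-2 and
// removed in MPI-3.0.  A minimal sketch of the same construction with the
// current API (not in the original source) would be:
//
//   MPI_Get_address( &kinStruct.nPart, &loc[0] );
//   // ... likewise for the other five members, then:
//   MPI_Type_create_struct( 6, length, loc, type, &MPI_KinStruct );
//   MPI_Type_commit( &MPI_KinStruct );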

#endif