Navigation

Main Page
Download
Support
Installation
Tutorial
Examples
Reference Manual
   Version 1.0.6dev
      Class Index
      File List
   Version 1.0.6
   Version 1.0.5new_solver
   Version 1.0.5dev
   Version 1.0.5b
   Version 1.0.4dev
   Version 1.0.4
Publications


Hosted by Get Ion Beam Simulator at SourceForge.net. Fast, secure and Free Open Source software downloads
scheduler.hpp
Go to the documentation of this file.
1 
5 /* Copyright (c) 2005-2009,2011 Taneli Kalvas, Jan SarĂ©n. All rights reserved.
6  *
7  * You can redistribute this software and/or modify it under the terms
8  * of the GNU General Public License as published by the Free Software
9  * Foundation; either version 2 of the License, or (at your option)
10  * any later version.
11  *
12  * This library is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this library (file "COPYING" included in the package);
19  * if not, write to the Free Software Foundation, Inc., 51 Franklin
20  * Street, Fifth Floor, Boston, MA 02110-1301 USA
21  *
22  * If you have questions about your rights to use or distribute this
23  * software, please contact Berkeley Lab's Technology Transfer
24  * Department at TTD@lbl.gov. Other questions, comments and bug
25  * reports should be sent directly to the author via email at
26  * taneli.kalvas@jyu.fi.
27  *
28  * NOTICE. This software was developed under partial funding from the
29  * U.S. Department of Energy. As such, the U.S. Government has been
30  * granted for itself and others acting on its behalf a paid-up,
31  * nonexclusive, irrevocable, worldwide license in the Software to
32  * reproduce, prepare derivative works, and perform publicly and
33  * display publicly. Beginning five (5) years after the date
34  * permission to assert copyright is obtained from the U.S. Department
35  * of Energy, and subject to any subsequent five (5) year renewals,
36  * the U.S. Government is granted for itself and others acting on its
37  * behalf a paid-up, nonexclusive, irrevocable, worldwide license in
38  * the Software to reproduce, prepare derivative works, distribute
39  * copies to the public, perform publicly and display publicly, and to
40  * permit others to do so.
41  */
42 
43 #ifndef SCHEDULER_HPP
44 #define SCHEDULER_HPP 1
45 
46 
47 #include <pthread.h>
48 #include <stdint.h>
49 #include <iostream>
50 #include <vector>
51 #include <deque>
52 #include <time.h>
53 #include "comptime.hpp"
54 
55 
56 //#define SCHEDULER_DEBUG 1
57 
58 
59 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
60 
61 
86 template <class Solv, class Prob, class Err>
87 class Scheduler {
88 
89  class Consumer {
90 
91  /*
92  enum consumer_status_e {
93  CONSUMER_CREATED = 0,
94  CONSUMER_RUNNING,
95  CONSUMER_FINISHED
96  };
97  */
98 
99  //pthread_mutex_t _mutex; //!< \brief Mutex for active check
100  pthread_t _thread;
101  Solv *_solver;
102  Scheduler *_scheduler;
103  //struct timeval _t0;
104  //std::vector<struct timeval> _t;
105 
106  void *consumer_main( void ) {
107  //struct timeval t;
108 
109 #ifdef SCHEDULER_DEBUG
110  std::cout << "Consumer main entrance\n";
111 #endif
112  //pthread_mutex_lock( &_mutex );
113  //_status = CONSUMER_RUNNING;
114  //pthread_mutex_unlock( &_mutex );
115 
116  Prob *p;
117  uint32_t pi;
118  while( (p = _scheduler->get_next_problem( pi )) ) {
119  try {
120  //gettimeofday( &t, NULL );
121  //_t.push_back( t );
122  (*_solver)( p, pi );
123  //gettimeofday( &t, NULL );
124  //_t.push_back( t );
125  } catch( Err e ) {
126  //std::cout << "on_error\n";
127  // Handle error and stop solving
128  _scheduler->on_error( e, pi );
129  break;
130  };
131  _scheduler->inc_solved_problem();
132  }
133 
134 #ifdef SCHEDULER_DEBUG
135  std::cout << "Exiting consumer\n";
136 #endif
137  //pthread_mutex_lock( &_mutex );
138  //_status = CONSUMER_FINISHED;
139  //pthread_mutex_unlock( &_mutex );
140  return( NULL );
141  }
142 
143  public:
144 
145  static void *consumer_entry( void *data ) {
146  Consumer *consumer = (Consumer *)data;
147  return( consumer->consumer_main() );
148  }
149 
150  Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) {
151 
152  //pthread_mutex_init( &_mutex, NULL );
153 #ifdef SCHEDULER_DEBUG
154  std::cout << "Consumer constructor\n";
155 #endif
156  //gettimeofday( &_t0, NULL );
157  }
158 
159  ~Consumer() {
160  //pthread_mutex_lock( &cout_mutex );
161 #ifdef SCHEDULER_DEBUG
162  std::cout << "Consumer destructor\n";
163 #endif
164  //for( size_t a = 0; a < _t.size(); a++ ) {
165  //std::cout << (_t[a].tv_sec-_t0.tv_sec) +
166  //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n";
167  //a++;
168  //std::cout << (_t[a].tv_sec-_t0.tv_sec) +
169  //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n\n\n";
170  //}
171  //pthread_mutex_unlock( &cout_mutex );
172  }
173 
174  void run( void ) {
175  pthread_create( &_thread, NULL, consumer_entry, (void *)this );
176  }
177 
178  void join( void ) {
179 
180 #ifdef SCHEDULER_DEBUG
181  std::cout << "Consumer join\n";
182 #endif
183  //pthread_mutex_lock( &_mutex );
184  //if( _status == CONSUMER_FINISHED ) {
185  //pthread_mutex_unlock( &_mutex );
186  //return;
187  //} else if( _status == CONSUMER_CREATED ) {
188  //
189  //}
190  //pthread_mutex_unlock( &_mutex );
191  pthread_join( _thread, NULL );
192  }
193 
194  };
195 
196 
197  pthread_mutex_t _mutex;
198  pthread_cond_t _scheduler_cond;
199  pthread_cond_t _producer_cond;
200  pthread_cond_t _consumer_cond;
201 
202  //size_t _problems_in_c; //!< \brief Total problems in count
203  //size_t _problems_err_c; //!< \brief Total error problems out count
204  //std::deque<Prob*> _problems_out; //!< \brief Problems already solved
205 
206  uint32_t _read_c;
207  uint32_t _solved_c;
208  std::vector<Prob *> &_problems;
209 
210  pthread_t _scheduler_thread;
211  std::vector<Consumer *> _consumers;
212 
213  bool _join;
214  bool _running;
215  bool _error;
216  bool _done;
217  bool _finish;
218  std::vector<Err> _err;
219  std::vector<int32_t> _eprob;
220 
221 
228  void on_error( Err &e, uint32_t pi ) {
229  pthread_mutex_lock( &_mutex );
230  _err.push_back( e );
231  _eprob.push_back( pi );
232  _error = true;
233  pthread_cond_broadcast( &_scheduler_cond );
234  pthread_mutex_unlock( &_mutex );
235  }
236 
237 
240  void inc_solved_problem( void ) {
241  pthread_mutex_lock( &_mutex );
242  _solved_c++;
243  pthread_mutex_unlock( &_mutex );
244  }
245 
252  Prob *get_next_problem( uint32_t &pi ) {
253 #ifdef SCHEDULER_DEBUG
254  std::cout << "get_next_problem()\n";
255 #endif
256  pthread_mutex_lock( &_mutex );
257 
258  if( _done || _error ) {
259  pthread_mutex_unlock( &_mutex );
260 #ifdef SCHEDULER_DEBUG
261  std::cout << "get_next_problem(): Returning NULL\n";
262 #endif
263  pi = -1;
264  return( NULL );
265  }
266 
267  if( _problems.size() == _read_c ) {
268 #ifdef SCHEDULER_DEBUG
269  std::cout << "get_next_problem(): No problem to return... waiting\n";
270 #endif
271  // Signal producer that problems are spent
272  pthread_cond_signal( &_scheduler_cond );
273  while( _problems.size() == _read_c ) {
274  // Wait for new problems
275  pthread_cond_wait( &_consumer_cond, &_mutex );
276  if( _done || _error ) {
277  pthread_mutex_unlock( &_mutex );
278 #ifdef SCHEDULER_DEBUG
279  std::cout << "get_next_problem(): Returning NULL\n";
280 #endif
281  pi = -1;
282  return( NULL );
283  }
284  }
285  }
286 
287  // Return next problem
288  pi = _read_c++;
289  Prob *ret = _problems[pi];
290 
291 #ifdef SCHEDULER_DEBUG
292  std::cout << "get_next_problem(): Returning problem " << pi << "\n";
293 #endif
294 
295  pthread_mutex_unlock( &_mutex );
296  return( ret );
297  }
298 
299 
302  void *scheduler_main( void ) {
303 
304 #ifdef SCHEDULER_DEBUG
305  std::cout << "Running scheduler_main()\n";
306 #endif
307 
308  // Start consumer threads
309  for( size_t a = 0; a < _consumers.size(); a++ )
310  _consumers[a]->run();
311 
312  pthread_mutex_lock( &_mutex );
313 
314  while( 1 ) {
315  // Wait until all consumers are done with all problems or error occurs
316  while( !(_problems.size() == _solved_c || _done || _error) ) {
317  //std::cout << "scheduler_main(): scheduler_cond wait 1\n";
318  pthread_cond_wait( &_scheduler_cond, &_mutex );
319  }
320 
321  if( (_finish && _problems.size() == _solved_c) || _done || _error )
322  break;
323 
324  // Problems temporarily done
325  pthread_cond_wait( &_scheduler_cond, &_mutex );
326  //std::cout << "scheduler_main(): prob_in = " << _problems_in_c
327  //<< " prob_out = " << _problems_out_c << "\n";
328  //std::cout << "scheduler_main(): scheduler_cond wait 2\n";
329 
330  // Signal consumers to wake up
331  pthread_cond_broadcast( &_consumer_cond );
332  }
333 
334  // Broadcast: done
335  _done = true;
336  _running = false;
337  pthread_cond_broadcast( &_consumer_cond );
338  pthread_mutex_unlock( &_mutex );
339 
340  // Join all consumers
341  //std::cout << "scheduler_main(): Scheduler waiting in join\n";
342  for( size_t a = 0; a < _consumers.size(); a++ )
343  _consumers[a]->join();
344 
345  pthread_cond_broadcast( &_producer_cond );
346  //std::cout << "scheduler_main(): Exiting scheduler\n";
347  return( NULL );
348  }
349 
350 
353  static void *scheduler_entry( void *data ) {
354  Scheduler *scheduler = (Scheduler *)data;
355  return( scheduler->scheduler_main() );
356  }
357 
360  Scheduler( const Scheduler &s ) {}
361 
364  const Scheduler &operator=( const Scheduler &s ) {
365  return( *this );
366  }
367 
368 public:
369 
370 
375  Scheduler( std::vector<Prob *> &prob )
376  : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false),
377  _error(false), _done(false), _finish(false) {
378 
379  // Initialize pthread objects
380  pthread_mutex_init( &_mutex, NULL );
381  pthread_cond_init( &_scheduler_cond, NULL );
382  pthread_cond_init( &_consumer_cond, NULL );
383  pthread_cond_init( &_producer_cond, NULL );
384  }
385 
386 
390 
391  // Force exit
392  _done = true;
393  finish();
394 
395  pthread_mutex_destroy( &_mutex );
396  pthread_cond_destroy( &_scheduler_cond );
397  pthread_cond_destroy( &_consumer_cond );
398  pthread_cond_destroy( &_producer_cond );
399  }
400 
401 
404  bool is_error( void ) {
405  // No mutex needed for one bit read
406  return( _error );
407  }
408 
409 
412  bool is_running( void ) {
413  // No mutex needed for one bit read
414  return( _running );
415  }
416 
417 
420  uint32_t get_solved_count( void ) {
421  pthread_mutex_lock( &_mutex );
422  uint32_t ret = _solved_c;
423  pthread_mutex_unlock( &_mutex );
424  return( ret );
425  }
426 
427 
430  uint32_t get_problem_count( void ) {
431  pthread_mutex_lock( &_mutex );
432  uint32_t ret = _problems.size();
433  pthread_mutex_unlock( &_mutex );
434  return( ret );
435  }
436 
437 
446  template <class Cont1, class Cont2>
447  size_t get_errors( Cont1 &e, Cont2 &pi ) {
448  pthread_mutex_lock( &_mutex );
449  size_t r = _err.size();
450  for( size_t a = 0; a < _err.size(); a++ ) {
451  e.push_back( _err[a] );
452  pi.push_back( _eprob[a] );
453  }
454  _err.clear();
455  _eprob.clear();
456  pthread_mutex_unlock( &_mutex );
457  return( r );
458  }
459 
460 
467  void run( std::vector<Solv *> solv ) {
468 
469  // Do nothing if already running
470  if( _running )
471  return;
472 
473  // Create consumer threads
474  for( size_t a = 0; a < solv.size(); a++ )
475  _consumers.push_back( new Consumer( solv[a], this ) );
476 
477  _read_c = 0;
478  _solved_c = 0;
479  _join = true;
480  _running = true;
481  _error = false;
482  _done = false;
483  _finish = false;
484  _err.clear();
485  _eprob.clear();
486 
487  // Create scheduler thread
488  pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this );
489  }
490 
491 
494  void lock_mutex( void ) {
495 
496  pthread_mutex_lock( &_mutex );
497  }
498 
499 
502  void unlock_mutex( void ) {
503 
504  pthread_cond_broadcast( &_scheduler_cond );
505  pthread_mutex_unlock( &_mutex );
506  }
507 
508 
516  bool force_exit( void ) {
517 
518  _done = true;
519  return( finish() );
520  }
521 
528  bool wait_finish( void ) {
529 
530  pthread_mutex_lock( &_mutex );
531  if( _running ) {
532  _finish = true;
533  pthread_cond_broadcast( &_scheduler_cond );
534 
535  struct timespec ts;
536  ibs_clock_gettime( CLOCK_REALTIME, &ts );
537  ts.tv_sec += 1;
538  int rc = pthread_cond_timedwait( &_producer_cond, &_mutex, &ts );
539  if( rc == ETIMEDOUT ) {
540  pthread_mutex_unlock( &_mutex );
541  return( false );
542  }
543  }
544  pthread_mutex_unlock( &_mutex );
545  return( true );
546  }
547 
556  bool finish( void ) {
557 
558  pthread_mutex_lock( &_mutex );
559  if( _running ) {
560  _finish = true;
561  //std::cout << "finish(): scheduler_cond broadcast\n";
562  pthread_cond_broadcast( &_scheduler_cond );
563 
564  //std::cout << "finish(): producer_cond wait\n";
565  pthread_cond_wait( &_producer_cond, &_mutex );
566  }
567  pthread_mutex_unlock( &_mutex );
568 
569  if( _join ) {
570  // Delete consumer threads
571  for( size_t a = 0; a < _consumers.size(); a++ )
572  delete _consumers[a];
573  _consumers.clear();
574 
575  pthread_join( _scheduler_thread, NULL );
576  _join = false;
577  }
578 
579  if( _error )
580  return( false );
581 
582  return( true );
583  }
584 
585  friend class Consumer;
586 };
587 
588 
589 
590 #endif
591 
Scheduler class for implementing consumer-producer threading.
Definition: scheduler.hpp:87
bool is_running(void)
Return true if scheduler is running.
Definition: scheduler.hpp:412
Scheduler(std::vector< Prob * > &prob)
Constructor.
Definition: scheduler.hpp:375
size_t get_errors(Cont1 &e, Cont2 &pi)
Fetch errors and indices of corresponding problems.
Definition: scheduler.hpp:447
void unlock_mutex(void)
Unlock mutex.
Definition: scheduler.hpp:502
bool force_exit(void)
Force exit from scheduler.
Definition: scheduler.hpp:516
uint32_t get_problem_count(void)
Return number of problems.
Definition: scheduler.hpp:430
uint32_t get_solved_count(void)
Return number of solved problems.
Definition: scheduler.hpp:420
bool is_error(void)
Return true on errors.
Definition: scheduler.hpp:404
bool finish(void)
Wait for all problems to be solved.
Definition: scheduler.hpp:556
void lock_mutex(void)
Lock mutex for adding problems.
Definition: scheduler.hpp:494
~Scheduler()
Destructor.
Definition: scheduler.hpp:389
void run(std::vector< Solv * > solv)
Run threads with N solvers.
Definition: scheduler.hpp:467
bool wait_finish(void)
Call for solvers to finish when problems are solved.
Definition: scheduler.hpp:528


Reference manual for Ion Beam Simulator 1.0.6dev
Generated by Doxygen 1.9.1 on Thu Sep 11 2025 09:37:24.