44 #define SCHEDULER_HPP 1
53 #include "comptime.hpp"
86 template <
class Solv,
class Prob,
class Err>
106 void *consumer_main(
void ) {
109 #ifdef SCHEDULER_DEBUG
110 std::cout <<
"Consumer main entrance\n";
118 while( (p = _scheduler->get_next_problem( pi )) ) {
128 _scheduler->on_error( e, pi );
131 _scheduler->inc_solved_problem();
134 #ifdef SCHEDULER_DEBUG
135 std::cout <<
"Exiting consumer\n";
145 static void *consumer_entry(
void *data ) {
146 Consumer *consumer = (Consumer *)data;
147 return( consumer->consumer_main() );
150 Consumer( Solv *solver,
Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) {
153 #ifdef SCHEDULER_DEBUG
154 std::cout <<
"Consumer constructor\n";
161 #ifdef SCHEDULER_DEBUG
162 std::cout <<
"Consumer destructor\n";
175 pthread_create( &_thread, NULL, consumer_entry, (
void *)
this );
180 #ifdef SCHEDULER_DEBUG
181 std::cout <<
"Consumer join\n";
191 pthread_join( _thread, NULL );
197 pthread_mutex_t _mutex;
198 pthread_cond_t _scheduler_cond;
199 pthread_cond_t _producer_cond;
200 pthread_cond_t _consumer_cond;
208 std::vector<Prob *> &_problems;
210 pthread_t _scheduler_thread;
211 std::vector<Consumer *> _consumers;
218 std::vector<Err> _err;
219 std::vector<int32_t> _eprob;
228 void on_error( Err &e, uint32_t pi ) {
229 pthread_mutex_lock( &_mutex );
231 _eprob.push_back( pi );
233 pthread_cond_broadcast( &_scheduler_cond );
234 pthread_mutex_unlock( &_mutex );
240 void inc_solved_problem(
void ) {
241 pthread_mutex_lock( &_mutex );
243 pthread_mutex_unlock( &_mutex );
252 Prob *get_next_problem( uint32_t &pi ) {
253 #ifdef SCHEDULER_DEBUG
254 std::cout <<
"get_next_problem()\n";
256 pthread_mutex_lock( &_mutex );
258 if( _done || _error ) {
259 pthread_mutex_unlock( &_mutex );
260 #ifdef SCHEDULER_DEBUG
261 std::cout <<
"get_next_problem(): Returning NULL\n";
267 if( _problems.size() == _read_c ) {
268 #ifdef SCHEDULER_DEBUG
269 std::cout <<
"get_next_problem(): No problem to return... waiting\n";
272 pthread_cond_signal( &_scheduler_cond );
273 while( _problems.size() == _read_c ) {
275 pthread_cond_wait( &_consumer_cond, &_mutex );
276 if( _done || _error ) {
277 pthread_mutex_unlock( &_mutex );
278 #ifdef SCHEDULER_DEBUG
279 std::cout <<
"get_next_problem(): Returning NULL\n";
289 Prob *ret = _problems[pi];
291 #ifdef SCHEDULER_DEBUG
292 std::cout <<
"get_next_problem(): Returning problem " << pi <<
"\n";
295 pthread_mutex_unlock( &_mutex );
302 void *scheduler_main(
void ) {
304 #ifdef SCHEDULER_DEBUG
305 std::cout <<
"Running scheduler_main()\n";
309 for(
size_t a = 0; a < _consumers.size(); a++ )
310 _consumers[a]->
run();
312 pthread_mutex_lock( &_mutex );
316 while( !(_problems.size() == _solved_c || _done || _error) ) {
318 pthread_cond_wait( &_scheduler_cond, &_mutex );
321 if( (_finish && _problems.size() == _solved_c) || _done || _error )
325 pthread_cond_wait( &_scheduler_cond, &_mutex );
331 pthread_cond_broadcast( &_consumer_cond );
337 pthread_cond_broadcast( &_consumer_cond );
338 pthread_mutex_unlock( &_mutex );
342 for(
size_t a = 0; a < _consumers.size(); a++ )
343 _consumers[a]->join();
345 pthread_cond_broadcast( &_producer_cond );
353 static void *scheduler_entry(
void *data ) {
355 return( scheduler->scheduler_main() );
376 : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false),
377 _error(false), _done(false), _finish(false) {
380 pthread_mutex_init( &_mutex, NULL );
381 pthread_cond_init( &_scheduler_cond, NULL );
382 pthread_cond_init( &_consumer_cond, NULL );
383 pthread_cond_init( &_producer_cond, NULL );
395 pthread_mutex_destroy( &_mutex );
396 pthread_cond_destroy( &_scheduler_cond );
397 pthread_cond_destroy( &_consumer_cond );
398 pthread_cond_destroy( &_producer_cond );
421 pthread_mutex_lock( &_mutex );
422 uint32_t ret = _solved_c;
423 pthread_mutex_unlock( &_mutex );
431 pthread_mutex_lock( &_mutex );
432 uint32_t ret = _problems.size();
433 pthread_mutex_unlock( &_mutex );
446 template <
class Cont1,
class Cont2>
448 pthread_mutex_lock( &_mutex );
449 size_t r = _err.size();
450 for(
size_t a = 0; a < _err.size(); a++ ) {
451 e.push_back( _err[a] );
452 pi.push_back( _eprob[a] );
456 pthread_mutex_unlock( &_mutex );
467 void run( std::vector<Solv *> solv ) {
474 for(
size_t a = 0; a < solv.size(); a++ )
475 _consumers.push_back(
new Consumer( solv[a],
this ) );
488 pthread_create( &_scheduler_thread, NULL, scheduler_entry, (
void *)
this );
496 pthread_mutex_lock( &_mutex );
504 pthread_cond_broadcast( &_scheduler_cond );
505 pthread_mutex_unlock( &_mutex );
530 pthread_mutex_lock( &_mutex );
533 pthread_cond_broadcast( &_scheduler_cond );
536 ibs_clock_gettime( CLOCK_REALTIME, &ts );
538 int rc = pthread_cond_timedwait( &_producer_cond, &_mutex, &ts );
539 if( rc == ETIMEDOUT ) {
540 pthread_mutex_unlock( &_mutex );
544 pthread_mutex_unlock( &_mutex );
558 pthread_mutex_lock( &_mutex );
562 pthread_cond_broadcast( &_scheduler_cond );
565 pthread_cond_wait( &_producer_cond, &_mutex );
567 pthread_mutex_unlock( &_mutex );
571 for(
size_t a = 0; a < _consumers.size(); a++ )
572 delete _consumers[a];
575 pthread_join( _scheduler_thread, NULL );
585 friend class Consumer;
Scheduler class for implementing consumer-producer threading.
Definition: scheduler.hpp:87
bool is_running(void)
Return true if scheduler is running.
Definition: scheduler.hpp:412
Scheduler(std::vector< Prob * > &prob)
Constructor.
Definition: scheduler.hpp:375
size_t get_errors(Cont1 &e, Cont2 &pi)
Fetch errors and indices of corresponding problems.
Definition: scheduler.hpp:447
void unlock_mutex(void)
Unlock mutex.
Definition: scheduler.hpp:502
bool force_exit(void)
Force exit from scheduler.
Definition: scheduler.hpp:516
uint32_t get_problem_count(void)
Return number of problems.
Definition: scheduler.hpp:430
uint32_t get_solved_count(void)
Return number of solved problems.
Definition: scheduler.hpp:420
bool is_error(void)
Return true on errors.
Definition: scheduler.hpp:404
bool finish(void)
Wait for all problems to be solved.
Definition: scheduler.hpp:556
void lock_mutex(void)
Lock mutex for adding problems.
Definition: scheduler.hpp:494
~Scheduler()
Destructor.
Definition: scheduler.hpp:389
void run(std::vector< Solv * > solv)
Run threads with N solvers.
Definition: scheduler.hpp:467
bool wait_finish(void)
Call for solvers to finish when problems are solved.
Definition: scheduler.hpp:528