scheduler.hpp
Go to the documentation of this file.
/* Copyright (c) 2005-2009,2011 Taneli Kalvas, Jan Sarén. All rights reserved.
 *
 * You can redistribute this software and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software
 * Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this library (file "COPYING" included in the package);
 * if not, write to the Free Software Foundation, Inc., 51 Franklin
 * Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * If you have questions about your rights to use or distribute this
 * software, please contact Berkeley Lab's Technology Transfer
 * Department at TTD@lbl.gov. Other questions, comments and bug
 * reports should be sent directly to the author via email at
 * taneli.kalvas@jyu.fi.
 *
 * NOTICE. This software was developed under partial funding from the
 * U.S. Department of Energy. As such, the U.S. Government has been
 * granted for itself and others acting on its behalf a paid-up,
 * nonexclusive, irrevocable, worldwide license in the Software to
 * reproduce, prepare derivative works, and perform publicly and
 * display publicly. Beginning five (5) years after the date
 * permission to assert copyright is obtained from the U.S. Department
 * of Energy, and subject to any subsequent five (5) year renewals,
 * the U.S.
Government is granted for itself and others acting on its 00037 * behalf a paid-up, nonexclusive, irrevocable, worldwide license in 00038 * the Software to reproduce, prepare derivative works, distribute 00039 * copies to the public, perform publicly and display publicly, and to 00040 * permit others to do so. 00041 */ 00042 00043 #ifndef SCHEDULER_HPP 00044 #define SCHEDULER_HPP 1 00045 00046 00047 #include <pthread.h> 00048 #include <stdint.h> 00049 #include <iostream> 00050 #include <vector> 00051 #include <deque> 00052 #include <time.h> 00053 #include "comptime.hpp" 00054 00055 00056 //#define SCHEDULER_DEBUG 1 00057 00058 00059 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER; 00060 00061 00086 template <class Solv, class Prob, class Err> 00087 class Scheduler { 00088 00089 class Consumer { 00090 00091 /* 00092 enum consumer_status_e { 00093 CONSUMER_CREATED = 0, 00094 CONSUMER_RUNNING, 00095 CONSUMER_FINISHED 00096 }; 00097 */ 00098 00099 //pthread_mutex_t _mutex; //!< \brief Mutex for active check 00100 pthread_t _thread; 00101 Solv *_solver; 00102 Scheduler *_scheduler; 00103 //struct timeval _t0; 00104 //std::vector<struct timeval> _t; 00105 00106 void *consumer_main( void ) { 00107 //struct timeval t; 00108 00109 #ifdef SCHEDULER_DEBUG 00110 std::cout << "Consumer main entrance\n"; 00111 #endif 00112 //pthread_mutex_lock( &_mutex ); 00113 //_status = CONSUMER_RUNNING; 00114 //pthread_mutex_unlock( &_mutex ); 00115 00116 Prob *p; 00117 uint32_t pi; 00118 while( (p = _scheduler->get_next_problem( pi )) ) { 00119 try { 00120 //gettimeofday( &t, NULL ); 00121 //_t.push_back( t ); 00122 (*_solver)( p, pi ); 00123 //gettimeofday( &t, NULL ); 00124 //_t.push_back( t ); 00125 } catch( Err e ) { 00126 //std::cout << "on_error\n"; 00127 // Handle error and stop solving 00128 _scheduler->on_error( e, pi ); 00129 break; 00130 }; 00131 _scheduler->inc_solved_problem(); 00132 } 00133 00134 #ifdef SCHEDULER_DEBUG 00135 std::cout << "Exiting consumer\n"; 00136 
#endif 00137 //pthread_mutex_lock( &_mutex ); 00138 //_status = CONSUMER_FINISHED; 00139 //pthread_mutex_unlock( &_mutex ); 00140 return( NULL ); 00141 } 00142 00143 public: 00144 00145 static void *consumer_entry( void *data ) { 00146 Consumer *consumer = (Consumer *)data; 00147 return( consumer->consumer_main() ); 00148 } 00149 00150 Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) { 00151 00152 //pthread_mutex_init( &_mutex, NULL ); 00153 #ifdef SCHEDULER_DEBUG 00154 std::cout << "Consumer constructor\n"; 00155 #endif 00156 //gettimeofday( &_t0, NULL ); 00157 } 00158 00159 ~Consumer() { 00160 //pthread_mutex_lock( &cout_mutex ); 00161 #ifdef SCHEDULER_DEBUG 00162 std::cout << "Consumer destructor\n"; 00163 #endif 00164 //for( size_t a = 0; a < _t.size(); a++ ) { 00165 //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 00166 //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n"; 00167 //a++; 00168 //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 00169 //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n\n\n"; 00170 //} 00171 //pthread_mutex_unlock( &cout_mutex ); 00172 } 00173 00174 void run( void ) { 00175 pthread_create( &_thread, NULL, consumer_entry, (void *)this ); 00176 } 00177 00178 void join( void ) { 00179 00180 #ifdef SCHEDULER_DEBUG 00181 std::cout << "Consumer join\n"; 00182 #endif 00183 //pthread_mutex_lock( &_mutex ); 00184 //if( _status == CONSUMER_FINISHED ) { 00185 //pthread_mutex_unlock( &_mutex ); 00186 //return; 00187 //} else if( _status == CONSUMER_CREATED ) { 00188 // 00189 //} 00190 //pthread_mutex_unlock( &_mutex ); 00191 pthread_join( _thread, NULL ); 00192 } 00193 00194 }; 00195 00196 00197 pthread_mutex_t _mutex; 00198 pthread_cond_t _scheduler_cond; 00199 pthread_cond_t _producer_cond; 00200 pthread_cond_t _consumer_cond; 00201 00202 //size_t _problems_in_c; //!< \brief Total problems in count 00203 //size_t _problems_err_c; //!< \brief Total error problems out count 00204 //std::deque<Prob*> _problems_out; //!< \brief Problems 
already solved 00205 00206 uint32_t _read_c; 00207 uint32_t _solved_c; 00208 std::vector<Prob *> &_problems; 00209 00210 pthread_t _scheduler_thread; 00211 std::vector<Consumer *> _consumers; 00212 00213 bool _join; 00214 bool _running; 00215 bool _error; 00216 bool _done; 00217 bool _finish; 00218 std::vector<Err> _err; 00219 std::vector<int32_t> _eprob; 00220 00221 00228 void on_error( Err &e, uint32_t pi ) { 00229 pthread_mutex_lock( &_mutex ); 00230 _err.push_back( e ); 00231 _eprob.push_back( pi ); 00232 _error = true; 00233 pthread_cond_broadcast( &_scheduler_cond ); 00234 pthread_mutex_unlock( &_mutex ); 00235 } 00236 00237 00240 void inc_solved_problem( void ) { 00241 pthread_mutex_lock( &_mutex ); 00242 _solved_c++; 00243 pthread_mutex_unlock( &_mutex ); 00244 } 00245 00252 Prob *get_next_problem( uint32_t &pi ) { 00253 #ifdef SCHEDULER_DEBUG 00254 std::cout << "get_next_problem()\n"; 00255 #endif 00256 pthread_mutex_lock( &_mutex ); 00257 00258 if( _done || _error ) { 00259 pthread_mutex_unlock( &_mutex ); 00260 #ifdef SCHEDULER_DEBUG 00261 std::cout << "get_next_problem(): Returning NULL\n"; 00262 #endif 00263 pi = -1; 00264 return( NULL ); 00265 } 00266 00267 if( _problems.size() == _read_c ) { 00268 #ifdef SCHEDULER_DEBUG 00269 std::cout << "get_next_problem(): No problem to return... 
waiting\n"; 00270 #endif 00271 // Signal producer that problems are spent 00272 pthread_cond_signal( &_scheduler_cond ); 00273 while( _problems.size() == _read_c ) { 00274 // Wait for new problems 00275 pthread_cond_wait( &_consumer_cond, &_mutex ); 00276 if( _done || _error ) { 00277 pthread_mutex_unlock( &_mutex ); 00278 #ifdef SCHEDULER_DEBUG 00279 std::cout << "get_next_problem(): Returning NULL\n"; 00280 #endif 00281 pi = -1; 00282 return( NULL ); 00283 } 00284 } 00285 } 00286 00287 // Return next problem 00288 pi = _read_c++; 00289 Prob *ret = _problems[pi]; 00290 00291 #ifdef SCHEDULER_DEBUG 00292 std::cout << "get_next_problem(): Returning problem " << pi << "\n"; 00293 #endif 00294 00295 pthread_mutex_unlock( &_mutex ); 00296 return( ret ); 00297 } 00298 00299 00302 void *scheduler_main( void ) { 00303 00304 #ifdef SCHEDULER_DEBUG 00305 std::cout << "Running scheduler_main()\n"; 00306 #endif 00307 00308 // Start consumer threads 00309 for( size_t a = 0; a < _consumers.size(); a++ ) 00310 _consumers[a]->run(); 00311 00312 pthread_mutex_lock( &_mutex ); 00313 00314 while( 1 ) { 00315 // Wait until all consumers are done with all problems or error occurs 00316 while( !(_problems.size() == _solved_c || _done || _error) ) { 00317 //std::cout << "scheduler_main(): scheduler_cond wait 1\n"; 00318 pthread_cond_wait( &_scheduler_cond, &_mutex ); 00319 } 00320 00321 if( (_finish && _problems.size() == _solved_c) || _done || _error ) 00322 break; 00323 00324 // Problems temporarily done 00325 pthread_cond_wait( &_scheduler_cond, &_mutex ); 00326 //std::cout << "scheduler_main(): prob_in = " << _problems_in_c 00327 //<< " prob_out = " << _problems_out_c << "\n"; 00328 //std::cout << "scheduler_main(): scheduler_cond wait 2\n"; 00329 00330 // Signal consumers to wake up 00331 pthread_cond_broadcast( &_consumer_cond ); 00332 } 00333 00334 // Broadcast: done 00335 _done = true; 00336 _running = false; 00337 pthread_cond_broadcast( &_consumer_cond ); 00338 
pthread_mutex_unlock( &_mutex ); 00339 00340 // Join all consumers 00341 //std::cout << "scheduler_main(): Scheduler waiting in join\n"; 00342 for( size_t a = 0; a < _consumers.size(); a++ ) 00343 _consumers[a]->join(); 00344 00345 pthread_cond_broadcast( &_producer_cond ); 00346 //std::cout << "scheduler_main(): Exiting scheduler\n"; 00347 return( NULL ); 00348 } 00349 00350 00353 static void *scheduler_entry( void *data ) { 00354 Scheduler *scheduler = (Scheduler *)data; 00355 return( scheduler->scheduler_main() ); 00356 } 00357 00358 00359 public: 00360 00361 00366 Scheduler( std::vector<Prob *> &prob ) 00367 : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false), 00368 _error(false), _done(false), _finish(false) { 00369 00370 // Initialize pthread objects 00371 pthread_mutex_init( &_mutex, NULL ); 00372 pthread_cond_init( &_scheduler_cond, NULL ); 00373 pthread_cond_init( &_consumer_cond, NULL ); 00374 pthread_cond_init( &_producer_cond, NULL ); 00375 } 00376 00377 00380 ~Scheduler() { 00381 00382 // Force exit 00383 _done = true; 00384 finish(); 00385 00386 pthread_mutex_destroy( &_mutex ); 00387 pthread_cond_destroy( &_scheduler_cond ); 00388 pthread_cond_destroy( &_consumer_cond ); 00389 pthread_cond_destroy( &_producer_cond ); 00390 } 00391 00392 00395 bool is_error( void ) { 00396 // No mutex needed for one bit read 00397 return( _error ); 00398 } 00399 00400 00403 bool is_running( void ) { 00404 // No mutex needed for one bit read 00405 return( _running ); 00406 } 00407 00408 00411 uint32_t get_solved_count( void ) { 00412 pthread_mutex_lock( &_mutex ); 00413 uint32_t ret = _solved_c; 00414 pthread_mutex_unlock( &_mutex ); 00415 return( ret ); 00416 } 00417 00418 00421 uint32_t get_problem_count( void ) { 00422 pthread_mutex_lock( &_mutex ); 00423 uint32_t ret = _problems.size(); 00424 pthread_mutex_unlock( &_mutex ); 00425 return( ret ); 00426 } 00427 00428 00437 template <class Cont1, class Cont2> 00438 size_t get_errors( Cont1 &e, 
Cont2 &pi ) { 00439 pthread_mutex_lock( &_mutex ); 00440 size_t r = _err.size(); 00441 for( size_t a = 0; a < _err.size(); a++ ) { 00442 e.push_back( _err[a] ); 00443 pi.push_back( _eprob[a] ); 00444 } 00445 _err.clear(); 00446 _eprob.clear(); 00447 pthread_mutex_unlock( &_mutex ); 00448 return( r ); 00449 } 00450 00451 00458 void run( std::vector<Solv *> solv ) { 00459 00460 // Do nothing if already running 00461 if( _running ) 00462 return; 00463 00464 // Create consumer threads 00465 for( size_t a = 0; a < solv.size(); a++ ) 00466 _consumers.push_back( new Consumer( solv[a], this ) ); 00467 00468 _read_c = 0; 00469 _solved_c = 0; 00470 _join = true; 00471 _running = true; 00472 _error = false; 00473 _done = false; 00474 _finish = false; 00475 _err.clear(); 00476 _eprob.clear(); 00477 00478 // Create scheduler thread 00479 pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this ); 00480 } 00481 00482 00485 void lock_mutex( void ) { 00486 00487 pthread_mutex_lock( &_mutex ); 00488 } 00489 00490 00493 void unlock_mutex( void ) { 00494 00495 pthread_cond_broadcast( &_scheduler_cond ); 00496 pthread_mutex_unlock( &_mutex ); 00497 } 00498 00499 00507 bool force_exit( void ) { 00508 00509 _done = true; 00510 return( finish() ); 00511 } 00512 00519 bool wait_finish( void ) { 00520 00521 pthread_mutex_lock( &_mutex ); 00522 if( _running ) { 00523 _finish = true; 00524 pthread_cond_broadcast( &_scheduler_cond ); 00525 00526 struct timespec ts; 00527 ibs_clock_gettime( CLOCK_REALTIME, &ts ); 00528 ts.tv_sec += 1; 00529 int rc = pthread_cond_timedwait( &_producer_cond, &_mutex, &ts ); 00530 if( rc == ETIMEDOUT ) { 00531 pthread_mutex_unlock( &_mutex ); 00532 return( false ); 00533 } 00534 } 00535 pthread_mutex_unlock( &_mutex ); 00536 return( true ); 00537 } 00538 00547 bool finish( void ) { 00548 00549 pthread_mutex_lock( &_mutex ); 00550 if( _running ) { 00551 _finish = true; 00552 //std::cout << "finish(): scheduler_cond broadcast\n"; 00553 
pthread_cond_broadcast( &_scheduler_cond ); 00554 00555 //std::cout << "finish(): producer_cond wait\n"; 00556 pthread_cond_wait( &_producer_cond, &_mutex ); 00557 } 00558 pthread_mutex_unlock( &_mutex ); 00559 00560 if( _join ) { 00561 // Delete consumer threads 00562 for( size_t a = 0; a < _consumers.size(); a++ ) 00563 delete _consumers[a]; 00564 _consumers.clear(); 00565 00566 pthread_join( _scheduler_thread, NULL ); 00567 _join = false; 00568 } 00569 00570 if( _error ) 00571 return( false ); 00572 00573 return( true ); 00574 } 00575 00576 friend class Consumer; 00577 }; 00578 00579 00580 00581 #endif 00582