scheduler.hpp
Go to the documentation of this file.
00001 00005 /* Copyright (c) 2005-2009,2011 Taneli Kalvas, Jan Sarén. All rights reserved. 00006 * 00007 * You can redistribute this software and/or modify it under the terms 00008 * of the GNU General Public License as published by the Free Software 00009 * Foundation; either version 2 of the License, or (at your option) 00010 * any later version. 00011 * 00012 * This library is distributed in the hope that it will be useful, but 00013 * WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00015 * General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with this library (file "COPYING" included in the package); 00019 * if not, write to the Free Software Foundation, Inc., 51 Franklin 00020 * Street, Fifth Floor, Boston, MA 02110-1301 USA 00021 * 00022 * If you have questions about your rights to use or distribute this 00023 * software, please contact Berkeley Lab's Technology Transfer 00024 * Department at TTD@lbl.gov. Other questions, comments and bug 00025 * reports should be sent directly to the author via email at 00026 * taneli.kalvas@jyu.fi. 00027 * 00028 * NOTICE. This software was developed under partial funding from the 00029 * U.S. Department of Energy. As such, the U.S. Government has been 00030 * granted for itself and others acting on its behalf a paid-up, 00031 * nonexclusive, irrevocable, worldwide license in the Software to 00032 * reproduce, prepare derivative works, and perform publicly and 00033 * display publicly. Beginning five (5) years after the date 00034 * permission to assert copyright is obtained from the U.S. Department 00035 * of Energy, and subject to any subsequent five (5) year renewals, 00036 * the U.S. Government is granted for itself and others acting on its 00037 * behalf a paid-up, nonexclusive, irrevocable, worldwide license in 00038 * the Software to reproduce, prepare derivative works, distribute 00039 * copies to the public, perform publicly and display publicly, and to 00040 * permit others to do so. 00041 */ 00042 00043 #ifndef SCHEDULER_HPP 00044 #define SCHEDULER_HPP 1 00045 00046 00047 #include <pthread.h> 00048 #include <stdint.h> 00049 #include <iostream> 00050 #include <vector> 00051 #include <deque> 00052 //#include <sys/time.h> 00053 00054 00055 //#define SCHEDULER_DEBUG 1 00056 00057 00058 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER; 00059 00060 00085 template <class Solv, class Prob, class Err> 00086 class Scheduler { 00087 00088 class Consumer { 00089 00090 /* 00091 enum consumer_status_e { 00092 CONSUMER_CREATED = 0, 00093 CONSUMER_RUNNING, 00094 CONSUMER_FINISHED 00095 }; 00096 */ 00097 00098 //pthread_mutex_t _mutex; //!< \brief Mutex for active check 00099 pthread_t _thread; 00100 Solv *_solver; 00101 Scheduler *_scheduler; 00102 //struct timeval _t0; 00103 //std::vector<struct timeval> _t; 00104 00105 void *consumer_main( void ) { 00106 //struct timeval t; 00107 00108 #ifdef SCHEDULER_DEBUG 00109 std::cout << "Consumer main entrance\n"; 00110 #endif 00111 //pthread_mutex_lock( &_mutex ); 00112 //_status = CONSUMER_RUNNING; 00113 //pthread_mutex_unlock( &_mutex ); 00114 00115 Prob *p; 00116 uint32_t pi; 00117 while( (p = _scheduler->get_next_problem( pi )) ) { 00118 try { 00119 //gettimeofday( &t, NULL ); 00120 //_t.push_back( t ); 00121 (*_solver)( p, pi ); 00122 //gettimeofday( &t, NULL ); 00123 //_t.push_back( t ); 00124 } catch( Err e ) { 00125 //std::cout << "on_error\n"; 00126 // Handle error and stop solving 00127 _scheduler->on_error( e, pi ); 00128 break; 00129 }; 00130 _scheduler->inc_solved_problem(); 00131 } 00132 00133 #ifdef SCHEDULER_DEBUG 00134 std::cout << "Exiting consumer\n"; 00135 #endif 00136 //pthread_mutex_lock( &_mutex ); 00137 //_status = CONSUMER_FINISHED; 00138 //pthread_mutex_unlock( &_mutex ); 00139 return( NULL ); 00140 } 00141 00142 public: 00143 00144 static void *consumer_entry( void *data ) { 00145 Consumer *consumer = (Consumer *)data; 00146 return( consumer->consumer_main() ); 00147 } 00148 00149 Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) { 00150 00151 //pthread_mutex_init( &_mutex, NULL ); 00152 #ifdef SCHEDULER_DEBUG 00153 std::cout << "Consumer constructor\n"; 00154 #endif 00155 //gettimeofday( &_t0, NULL ); 00156 } 00157 00158 ~Consumer() { 00159 //pthread_mutex_lock( &cout_mutex ); 00160 #ifdef SCHEDULER_DEBUG 00161 std::cout << "Consumer destructor\n"; 00162 #endif 00163 //for( size_t a = 0; a < _t.size(); a++ ) { 00164 //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 00165 //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n"; 00166 //a++; 00167 //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 00168 //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n\n\n"; 00169 //} 00170 //pthread_mutex_unlock( &cout_mutex ); 00171 } 00172 00173 void run( void ) { 00174 pthread_create( &_thread, NULL, consumer_entry, (void *)this ); 00175 } 00176 00177 void join( void ) { 00178 00179 #ifdef SCHEDULER_DEBUG 00180 std::cout << "Consumer join\n"; 00181 #endif 00182 //pthread_mutex_lock( &_mutex ); 00183 //if( _status == CONSUMER_FINISHED ) { 00184 //pthread_mutex_unlock( &_mutex ); 00185 //return; 00186 //} else if( _status == CONSUMER_CREATED ) { 00187 // 00188 //} 00189 //pthread_mutex_unlock( &_mutex ); 00190 pthread_join( _thread, NULL ); 00191 } 00192 00193 }; 00194 00195 00196 pthread_mutex_t _mutex; 00197 pthread_cond_t _scheduler_cond; 00198 pthread_cond_t _producer_cond; 00199 pthread_cond_t _consumer_cond; 00200 00201 //size_t _problems_in_c; //!< \brief Total problems in count 00202 //size_t _problems_err_c; //!< \brief Total error problems out count 00203 //std::deque<Prob*> _problems_out; //!< \brief Problems already solved 00204 00205 uint32_t _read_c; 00206 uint32_t _solved_c; 00207 std::vector<Prob *> &_problems; 00208 00209 pthread_t _scheduler_thread; 00210 std::vector<Consumer *> _consumers; 00211 00212 bool _join; 00213 bool _running; 00214 bool _error; 00215 bool _done; 00216 bool _finish; 00217 std::vector<Err> _err; 00218 std::vector<int32_t> _eprob; 00219 00220 00227 void on_error( Err &e, uint32_t pi ) { 00228 pthread_mutex_lock( &_mutex ); 00229 _err.push_back( e ); 00230 _eprob.push_back( pi ); 00231 _error = true; 00232 pthread_cond_broadcast( &_scheduler_cond ); 00233 pthread_mutex_unlock( &_mutex ); 00234 } 00235 00236 00239 void inc_solved_problem( void ) { 00240 pthread_mutex_lock( &_mutex ); 00241 _solved_c++; 00242 pthread_mutex_unlock( &_mutex ); 00243 } 00244 00251 Prob *get_next_problem( uint32_t &pi ) { 00252 #ifdef SCHEDULER_DEBUG 00253 std::cout << "get_next_problem()\n"; 00254 #endif 00255 pthread_mutex_lock( &_mutex ); 00256 00257 if( _done || _error ) { 00258 pthread_mutex_unlock( &_mutex ); 00259 #ifdef SCHEDULER_DEBUG 00260 std::cout << "get_next_problem(): Returning NULL\n"; 00261 #endif 00262 pi = -1; 00263 return( NULL ); 00264 } 00265 00266 if( _problems.size() == _read_c ) { 00267 #ifdef SCHEDULER_DEBUG 00268 std::cout << "get_next_problem(): No problem to return... waiting\n"; 00269 #endif 00270 // Signal producer that problems are spent 00271 pthread_cond_signal( &_scheduler_cond ); 00272 while( _problems.size() == _read_c ) { 00273 // Wait for new problems 00274 pthread_cond_wait( &_consumer_cond, &_mutex ); 00275 if( _done || _error ) { 00276 pthread_mutex_unlock( &_mutex ); 00277 #ifdef SCHEDULER_DEBUG 00278 std::cout << "get_next_problem(): Returning NULL\n"; 00279 #endif 00280 pi = -1; 00281 return( NULL ); 00282 } 00283 } 00284 } 00285 00286 // Return next problem 00287 pi = _read_c++; 00288 Prob *ret = _problems[pi]; 00289 00290 #ifdef SCHEDULER_DEBUG 00291 std::cout << "get_next_problem(): Returning problem " << pi << "\n"; 00292 #endif 00293 00294 pthread_mutex_unlock( &_mutex ); 00295 return( ret ); 00296 } 00297 00298 00301 void *scheduler_main( void ) { 00302 00303 #ifdef SCHEDULER_DEBUG 00304 std::cout << "Running scheduler_main()\n"; 00305 #endif 00306 00307 // Start consumer threads 00308 for( size_t a = 0; a < _consumers.size(); a++ ) 00309 _consumers[a]->run(); 00310 00311 pthread_mutex_lock( &_mutex ); 00312 00313 while( 1 ) { 00314 // Wait until all consumers are done with all problems or error occurs 00315 while( !(_problems.size() == _solved_c || _done || _error) ) { 00316 //std::cout << "scheduler_main(): scheduler_cond wait 1\n"; 00317 pthread_cond_wait( &_scheduler_cond, &_mutex ); 00318 } 00319 00320 if( (_finish && _problems.size() == _solved_c) || _done || _error ) 00321 break; 00322 00323 // Problems temporarily done 00324 pthread_cond_wait( &_scheduler_cond, &_mutex ); 00325 //std::cout << "scheduler_main(): prob_in = " << _problems_in_c 00326 //<< " prob_out = " << _problems_out_c << "\n"; 00327 //std::cout << "scheduler_main(): scheduler_cond wait 2\n"; 00328 00329 // Signal consumers to wake up 00330 pthread_cond_broadcast( &_consumer_cond ); 00331 } 00332 00333 // Broadcast: done 00334 _done = true; 00335 _running = false; 00336 pthread_cond_broadcast( &_consumer_cond ); 00337 pthread_mutex_unlock( &_mutex ); 00338 00339 // Join all consumers 00340 //std::cout << "scheduler_main(): Scheduler waiting in join\n"; 00341 for( size_t a = 0; a < _consumers.size(); a++ ) 00342 _consumers[a]->join(); 00343 00344 pthread_cond_broadcast( &_producer_cond ); 00345 //std::cout << "scheduler_main(): Exiting scheduler\n"; 00346 return( NULL ); 00347 } 00348 00349 00352 static void *scheduler_entry( void *data ) { 00353 Scheduler *scheduler = (Scheduler *)data; 00354 return( scheduler->scheduler_main() ); 00355 } 00356 00357 00358 public: 00359 00360 00365 Scheduler( std::vector<Prob *> &prob ) 00366 : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false), 00367 _error(false), _done(false), _finish(false) { 00368 00369 // Initialize pthread objects 00370 pthread_mutex_init( &_mutex, NULL ); 00371 pthread_cond_init( &_scheduler_cond, NULL ); 00372 pthread_cond_init( &_consumer_cond, NULL ); 00373 pthread_cond_init( &_producer_cond, NULL ); 00374 } 00375 00376 00379 ~Scheduler() { 00380 00381 // Force exit 00382 _done = true; 00383 finish(); 00384 00385 pthread_mutex_destroy( &_mutex ); 00386 pthread_cond_destroy( &_scheduler_cond ); 00387 pthread_cond_destroy( &_consumer_cond ); 00388 pthread_cond_destroy( &_producer_cond ); 00389 } 00390 00391 00394 bool is_error( void ) { 00395 // No mutex needed for one bit read 00396 return( _error ); 00397 } 00398 00399 00402 bool is_running( void ) { 00403 // No mutex needed for one bit read 00404 return( _running ); 00405 } 00406 00407 00410 uint32_t get_solved_count( void ) { 00411 pthread_mutex_lock( &_mutex ); 00412 uint32_t ret = _solved_c; 00413 pthread_mutex_unlock( &_mutex ); 00414 return( ret ); 00415 } 00416 00417 00426 template <class Cont1, class Cont2> 00427 size_t get_errors( Cont1 &e, Cont2 &pi ) { 00428 pthread_mutex_lock( &_mutex ); 00429 size_t r = _err.size(); 00430 for( size_t a = 0; a < _err.size(); a++ ) { 00431 e.push_back( _err[a] ); 00432 pi.push_back( _eprob[a] ); 00433 } 00434 _err.clear(); 00435 _eprob.clear(); 00436 pthread_mutex_unlock( &_mutex ); 00437 return( r ); 00438 } 00439 00440 00447 void run( std::vector<Solv *> solv ) { 00448 00449 // Do nothing if already running 00450 if( _running ) 00451 return; 00452 00453 // Create consumer threads 00454 for( size_t a = 0; a < solv.size(); a++ ) 00455 _consumers.push_back( new Consumer( solv[a], this ) ); 00456 00457 _read_c = 0; 00458 _solved_c = 0; 00459 _join = true; 00460 _running = true; 00461 _error = false; 00462 _done = false; 00463 _finish = false; 00464 _err.clear(); 00465 _eprob.clear(); 00466 00467 // Create scheduler thread 00468 pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this ); 00469 } 00470 00471 00474 void lock_mutex( void ) { 00475 00476 pthread_mutex_lock( &_mutex ); 00477 } 00478 00479 00482 void unlock_mutex( void ) { 00483 00484 pthread_cond_broadcast( &_scheduler_cond ); 00485 pthread_mutex_unlock( &_mutex ); 00486 } 00487 00488 00496 bool force_exit( void ) { 00497 00498 _done = true; 00499 return( finish() ); 00500 } 00501 00510 bool finish( void ) { 00511 00512 pthread_mutex_lock( &_mutex ); 00513 if( _running ) { 00514 _finish = true; 00515 //std::cout << "finish(): scheduler_cond broadcast\n"; 00516 pthread_cond_broadcast( &_scheduler_cond ); 00517 00518 //std::cout << "finish(): producer_cond wait\n"; 00519 pthread_cond_wait( &_producer_cond, &_mutex ); 00520 } 00521 pthread_mutex_unlock( &_mutex ); 00522 00523 if( _join ) { 00524 // Delete consumer threads 00525 for( size_t a = 0; a < _consumers.size(); a++ ) 00526 delete _consumers[a]; 00527 _consumers.clear(); 00528 00529 pthread_join( _scheduler_thread, NULL ); 00530 _join = false; 00531 } 00532 00533 if( _error ) 00534 return( false ); 00535 00536 return( true ); 00537 } 00538 00539 friend class Consumer; 00540 }; 00541 00542 00543 00544 #endif