Navigation

Main Page
Download
Support
Installation
Tutorial
Examples
Reference Manual
   Version 1.0.4
   Version 1.0.4dev
      Class Index
      File List
Publications


Hosted by Get Ion Beam Simulator at SourceForge.net. Fast, secure and Free Open Source software downloads

scheduler.hpp

Go to the documentation of this file.
00001 
00005 /* Copyright (c) 2005-2009,2011 Taneli Kalvas, Jan Sarén. All rights reserved.
00006  *
00007  * You can redistribute this software and/or modify it under the terms
00008  * of the GNU General Public License as published by the Free Software
00009  * Foundation; either version 2 of the License, or (at your option)
00010  * any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but
00013  * WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00015  * General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with this library (file "COPYING" included in the package);
00019  * if not, write to the Free Software Foundation, Inc., 51 Franklin
00020  * Street, Fifth Floor, Boston, MA 02110-1301 USA
00021  * 
00022  * If you have questions about your rights to use or distribute this
00023  * software, please contact Berkeley Lab's Technology Transfer
00024  * Department at TTD@lbl.gov. Other questions, comments and bug
00025  * reports should be sent directly to the author via email at
00026  * taneli.kalvas@jyu.fi.
00027  * 
00028  * NOTICE. This software was developed under partial funding from the
00029  * U.S.  Department of Energy.  As such, the U.S. Government has been
00030  * granted for itself and others acting on its behalf a paid-up,
00031  * nonexclusive, irrevocable, worldwide license in the Software to
00032  * reproduce, prepare derivative works, and perform publicly and
00033  * display publicly.  Beginning five (5) years after the date
00034  * permission to assert copyright is obtained from the U.S. Department
00035  * of Energy, and subject to any subsequent five (5) year renewals,
00036  * the U.S. Government is granted for itself and others acting on its
00037  * behalf a paid-up, nonexclusive, irrevocable, worldwide license in
00038  * the Software to reproduce, prepare derivative works, distribute
00039  * copies to the public, perform publicly and display publicly, and to
00040  * permit others to do so.
00041  */
00042 
00043 #ifndef SCHEDULER_HPP
00044 #define SCHEDULER_HPP 1
00045 
00046 
00047 #include <pthread.h>
00048 #include <stdint.h>
00049 #include <iostream>
00050 #include <vector>
00051 #include <deque>
00052 //#include <sys/time.h>
00053 
00054 
00055 //#define SCHEDULER_DEBUG 1
00056 
00057 
00058 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
00059 
00060 
/*! \brief Thread pool that solves a list of problems with a set of solvers.
 *
 *  The scheduler references (does not own) a vector of problem pointers
 *  supplied by the user. Calling run() starts one scheduler thread and
 *  one worker thread per solver. Each worker repeatedly fetches the next
 *  unread problem \a p with index \a pi and calls <tt>(*solver)( p, pi )</tt>.
 *  If the solver throws an \a Err, the error and the problem index are
 *  stored, all workers stop fetching problems and the run ends.
 *
 *  More problems may be appended to the problem vector while the
 *  scheduler is running, provided the modification is enclosed between
 *  lock_mutex() and unlock_mutex(). The problem pointers stored in the
 *  vector must be non-NULL: a NULL problem ends the worker loop.
 *
 *  \tparam Solv Solver type, callable as <tt>solver( Prob *, uint32_t )</tt>.
 *  \tparam Prob Problem type.
 *  \tparam Err  Exception type thrown by the solver; stored by value.
 */
template <class Solv, class Prob, class Err>
class Scheduler {

    /*! \brief Worker: one thread bound to one solver.
     */
    class Consumer {

        pthread_t            _thread;     //!< \brief Worker thread handle, valid only while _started is true
        bool                 _started;    //!< \brief True between a successful pthread_create() and join()
        Solv                *_solver;     //!< \brief Solver used by this worker (not owned)
        Scheduler           *_scheduler;  //!< \brief Scheduler that feeds this worker

        /*! \brief Worker thread body.
         *
         *  Fetches problems from the scheduler until it returns NULL.
         *  An \a Err thrown by the solver is reported to the scheduler
         *  and ends this worker; the failed problem is not counted as
         *  solved.
         */
        void *consumer_main( void ) {
#ifdef SCHEDULER_DEBUG
            std::cout << "Consumer main entrance\n";
#endif
            Prob *p;
            uint32_t pi;
            while( (p = _scheduler->get_next_problem( pi )) ) {
                try {
                    (*_solver)( p, pi );
                } catch( const Err &e ) {
                    // Handle error and stop solving
                    _scheduler->on_error( e, pi );
                    break;
                }
                _scheduler->inc_solved_problem();
            }

#ifdef SCHEDULER_DEBUG
            std::cout << "Exiting consumer\n";
#endif
            return( NULL );
        }

        // Not copyable: the object is tied to a running thread.
        Consumer( const Consumer & );
        Consumer &operator=( const Consumer & );

    public:

        /*! \brief pthread entry point; \a data is the Consumer to run.
         */
        static void *consumer_entry( void *data ) {
            Consumer *consumer = static_cast<Consumer *>( data );
            return( consumer->consumer_main() );
        }

        /*! \brief Create a worker for \a solver fed by \a scheduler.
         *
         *  No thread is started until run() is called.
         */
        Consumer( Solv *solver, Scheduler *scheduler )
            : _started(false), _solver(solver), _scheduler(scheduler) {
#ifdef SCHEDULER_DEBUG
            std::cout << "Consumer constructor\n";
#endif
        }

        /*! \brief Destructor. Does not join the thread; call join() first.
         */
        ~Consumer() {
#ifdef SCHEDULER_DEBUG
            std::cout << "Consumer destructor\n";
#endif
        }

        /*! \brief Start the worker thread.
         *
         *  If the thread cannot be created the worker stays idle and
         *  join() becomes a no-op.
         */
        void run( void ) {
            _started = ( pthread_create( &_thread, NULL, consumer_entry,
                                         static_cast<void *>( this ) ) == 0 );
        }

        /*! \brief Wait for the worker thread to exit.
         *
         *  Safe to call when the thread was never started or has
         *  already been joined.
         */
        void join( void ) {
#ifdef SCHEDULER_DEBUG
            std::cout << "Consumer join\n";
#endif
            if( _started ) {
                pthread_join( _thread, NULL );
                _started = false;
            }
        }

    };


    pthread_mutex_t         _mutex;            //!< \brief Protects all shared state below
    pthread_cond_t          _scheduler_cond;   //!< \brief Wakes the scheduler thread
    pthread_cond_t          _producer_cond;    //!< \brief Wakes a caller blocked in finish()
    pthread_cond_t          _consumer_cond;    //!< \brief Wakes workers waiting for problems

    uint32_t                _read_c;           //!< \brief Number of problems handed out to workers
    uint32_t                _solved_c;         //!< \brief Number of problems solved without error
    std::vector<Prob *>    &_problems;         //!< \brief User-owned problem list

    pthread_t               _scheduler_thread; //!< \brief Scheduler thread handle, valid while _join is true
    std::vector<Consumer *> _consumers;        //!< \brief Workers of the current run (owned)

    bool                    _join;             //!< \brief Scheduler thread was created and is not yet joined
    bool                    _running;          //!< \brief Scheduler is accepting and solving problems
    bool                    _error;            //!< \brief A solver has thrown during the current run
    bool                    _done;             //!< \brief Workers must stop fetching problems
    bool                    _finish;           //!< \brief finish() requested: exit once everything is solved
    std::vector<Err>        _err;              //!< \brief Errors thrown by solvers
    std::vector<int32_t>    _eprob;            //!< \brief Problem indices matching \a _err


    // Not copyable: owns a mutex, condition variables and threads.
    Scheduler( const Scheduler & );
    Scheduler &operator=( const Scheduler & );


    /*! \brief Record error \a e raised while solving problem \a pi.
     *
     *  Called by workers. Sets the error flag and wakes the scheduler
     *  thread so that the run is terminated.
     */
    void on_error( const Err &e, uint32_t pi ) {
        pthread_mutex_lock( &_mutex );
        _err.push_back( e );
        _eprob.push_back( pi );
        _error = true;
        pthread_cond_broadcast( &_scheduler_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /*! \brief Count one more successfully solved problem. Called by workers.
     */
    void inc_solved_problem( void ) {
        pthread_mutex_lock( &_mutex );
        _solved_c++;
        pthread_mutex_unlock( &_mutex );
    }


    /*! \brief Hand out the next unread problem to a worker.
     *
     *  Blocks while there are no unread problems. When the list is
     *  found exhausted the scheduler thread is signalled once before
     *  waiting.
     *
     *  \param pi Receives the index of the returned problem, or
     *            <tt>uint32_t(-1)</tt> when NULL is returned.
     *  \return Next problem, or NULL if the run is done or has failed.
     */
    Prob *get_next_problem( uint32_t &pi ) {
#ifdef SCHEDULER_DEBUG
        std::cout << "get_next_problem()\n";
#endif
        pthread_mutex_lock( &_mutex );

        bool notified = false;
        while( !(_done || _error) && _problems.size() == _read_c ) {
            if( !notified ) {
#ifdef SCHEDULER_DEBUG
                std::cout << "get_next_problem(): No problem to return... waiting\n";
#endif
                // Signal scheduler that problems are spent
                pthread_cond_signal( &_scheduler_cond );
                notified = true;
            }
            // Wait for new problems
            pthread_cond_wait( &_consumer_cond, &_mutex );
        }

        if( _done || _error ) {
            pthread_mutex_unlock( &_mutex );
#ifdef SCHEDULER_DEBUG
            std::cout << "get_next_problem(): Returning NULL\n";
#endif
            pi = static_cast<uint32_t>( -1 );
            return( NULL );
        }

        // Return next problem
        pi = _read_c++;
        Prob *ret = _problems[pi];

#ifdef SCHEDULER_DEBUG
        std::cout << "get_next_problem(): Returning problem " << pi << "\n";
#endif

        pthread_mutex_unlock( &_mutex );
        return( ret );
    }


    /*! \brief Scheduler thread body.
     *
     *  Starts the workers, then sleeps until every problem is solved,
     *  an error occurs or exit is forced. When all current problems
     *  are solved but finish() has not been requested, it waits for
     *  the next wake-up (new problems or finish()) and wakes the
     *  workers. On exit it stops and joins all workers and finally
     *  wakes a caller blocked in finish().
     */
    void *scheduler_main( void ) {

#ifdef SCHEDULER_DEBUG
        std::cout << "Running scheduler_main()\n";
#endif

        // Start consumer threads
        for( size_t a = 0; a < _consumers.size(); a++ )
            _consumers[a]->run();

        pthread_mutex_lock( &_mutex );

        while( 1 ) {
            // Wait until all consumers are done with all problems or error occurs
            while( !(_problems.size() == _solved_c || _done || _error) )
                pthread_cond_wait( &_scheduler_cond, &_mutex );

            if( (_finish && _problems.size() == _solved_c) || _done || _error )
                break;

            // Problems temporarily done: sleep until something changes
            pthread_cond_wait( &_scheduler_cond, &_mutex );

            // Signal consumers to wake up
            pthread_cond_broadcast( &_consumer_cond );
        }

        // Broadcast: done
        _done = true;
        _running = false;
        pthread_cond_broadcast( &_consumer_cond );
        pthread_mutex_unlock( &_mutex );

        // Join all consumers
        for( size_t a = 0; a < _consumers.size(); a++ )
            _consumers[a]->join();

        pthread_cond_broadcast( &_producer_cond );
        return( NULL );
    }


    /*! \brief pthread entry point; \a data is the Scheduler to run.
     */
    static void *scheduler_entry( void *data ) {
        Scheduler *scheduler = static_cast<Scheduler *>( data );
        return( scheduler->scheduler_main() );
    }


public:


    /*! \brief Create a scheduler for problem list \a prob.
     *
     *  Only a reference to \a prob is stored; the vector must outlive
     *  the scheduler. No threads are started until run() is called.
     */
    Scheduler( std::vector<Prob *> &prob )
        : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false),
          _error(false), _done(false), _finish(false) {

        // Initialize pthread objects
        pthread_mutex_init( &_mutex, NULL );
        pthread_cond_init( &_scheduler_cond, NULL );
        pthread_cond_init( &_consumer_cond, NULL );
        pthread_cond_init( &_producer_cond, NULL );
    }


    /*! \brief Destructor. Forces a running scheduler to exit and joins all threads.
     */
    ~Scheduler() {

        // Force exit
        force_exit();

        pthread_mutex_destroy( &_mutex );
        pthread_cond_destroy( &_scheduler_cond );
        pthread_cond_destroy( &_consumer_cond );
        pthread_cond_destroy( &_producer_cond );
    }


    /*! \brief Return true if a solver has thrown during the current run.
     *
     *  Reads the flag without taking the mutex, so it may be called
     *  between lock_mutex() and unlock_mutex().
     */
    bool is_error( void ) {
        return( _error );
    }


    /*! \brief Return true while the scheduler is running.
     *
     *  Reads the flag without taking the mutex, so it may be called
     *  between lock_mutex() and unlock_mutex().
     */
    bool is_running( void ) {
        return( _running );
    }


    /*! \brief Return the number of problems solved so far in this run.
     *
     *  Takes the mutex: must not be called between lock_mutex() and
     *  unlock_mutex().
     */
    uint32_t get_solved_count( void ) {
        pthread_mutex_lock( &_mutex );
        uint32_t ret = _solved_c;
        pthread_mutex_unlock( &_mutex );
        return( ret );
    }


    /*! \brief Fetch and clear the stored errors.
     *
     *  Appends every stored error to \a e and the index of the
     *  corresponding problem to \a pi using push_back().
     *
     *  \return Number of errors appended.
     */
    template <class Cont1, class Cont2>
    size_t get_errors( Cont1 &e, Cont2 &pi ) {
        pthread_mutex_lock( &_mutex );
        const size_t r = _err.size();
        for( size_t a = 0; a < r; a++ ) {
            e.push_back( _err[a] );
            pi.push_back( _eprob[a] );
        }
        _err.clear();
        _eprob.clear();
        pthread_mutex_unlock( &_mutex );
        return( r );
    }


    /*! \brief Start solving with one worker thread per solver in \a solv.
     *
     *  Does nothing if the scheduler is already running. Counters and
     *  stored errors of a previous run are reset, so solving starts
     *  again from the first problem. The solvers are not owned and
     *  must stay valid until the run has been finished.
     *
     *  If the scheduler thread cannot be created, the scheduler is
     *  left in the not-running state.
     */
    void run( std::vector<Solv *> solv ) {

        // Do nothing if already running
        if( _running )
            return;

        // A previous run that ended by itself (solver error) still
        // holds its workers and an unjoined thread: reap them first.
        if( _join )
            finish();

        // Create consumer threads
        for( size_t a = 0; a < solv.size(); a++ )
            _consumers.push_back( new Consumer( solv[a], this ) );

        _read_c = 0;
        _solved_c = 0;
        _join = true;
        _running = true;
        _error = false;
        _done = false;
        _finish = false;
        _err.clear();
        _eprob.clear();

        // Create scheduler thread
        if( pthread_create( &_scheduler_thread, NULL, scheduler_entry,
                            static_cast<void *>( this ) ) != 0 ) {
            // Nothing was started: roll back so that finish() cannot block
            for( size_t a = 0; a < _consumers.size(); a++ )
                delete _consumers[a];
            _consumers.clear();
            _join = false;
            _running = false;
        }
    }


    /*! \brief Lock the scheduler mutex before modifying the problem list.
     *
     *  Must be paired with unlock_mutex(). Member functions that take
     *  the mutex themselves must not be called while it is held.
     */
    void lock_mutex( void ) {

        pthread_mutex_lock( &_mutex );
    }


    /*! \brief Unlock the scheduler mutex and wake the scheduler thread.
     */
    void unlock_mutex( void ) {

        pthread_cond_broadcast( &_scheduler_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /*! \brief Stop as soon as possible without solving the remaining problems.
     *
     *  Workers finish the problem they are currently solving and exit.
     *
     *  \return False if a solver error occurred, true otherwise.
     */
    bool force_exit( void ) {

        pthread_mutex_lock( &_mutex );
        _done = true;
        pthread_mutex_unlock( &_mutex );
        return( finish() );
    }


    /*! \brief Wait for the run to complete and release its threads.
     *
     *  Blocks until every problem in the list is solved, a solver
     *  error occurs or exit has been forced, then joins the scheduler
     *  thread and deletes the workers. Safe to call when the scheduler
     *  is not running.
     *
     *  \return False if a solver error occurred, true otherwise.
     */
    bool finish( void ) {

        pthread_mutex_lock( &_mutex );
        if( _running ) {
            _finish = true;
            pthread_cond_broadcast( &_scheduler_cond );

            // Predicate loop: condition waits may wake up spuriously
            while( _running )
                pthread_cond_wait( &_producer_cond, &_mutex );
        }
        pthread_mutex_unlock( &_mutex );

        if( _join ) {
            // The scheduler thread joins the workers before it exits,
            // so it must be joined before the workers are deleted.
            pthread_join( _scheduler_thread, NULL );
            _join = false;

            // Delete consumer threads
            for( size_t a = 0; a < _consumers.size(); a++ )
                delete _consumers[a];
            _consumers.clear();
        }

        return( !_error );
    }

    friend class Consumer;
};
00541 
00542 
00543 
00544 #endif


Reference manual for Ion Beam Simulator 1.0.4dev
Generated by Doxygen 1.7.1 on Wed May 18 2011 23:03:48.