Navigation

Main Page
Download
Support
Installation
Tutorial
Examples
Reference Manual
   Version 1.0.4
      Class Index
      File List
   Version 1.0.4dev
Publications


Hosted by Get Ion Beam Simulator at SourceForge.net. Fast, secure and Free Open Source software downloads

scheduler.hpp

Go to the documentation of this file.
00001 
00005 /* Copyright (c) 2005-2009 Taneli Kalvas. All rights reserved.
00006  *
00007  * You can redistribute this software and/or modify it under the terms
00008  * of the GNU General Public License as published by the Free Software
00009  * Foundation; either version 2 of the License, or (at your option)
00010  * any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but
00013  * WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00015  * General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with this library (file "COPYING" included in the package);
00019  * if not, write to the Free Software Foundation, Inc., 51 Franklin
00020  * Street, Fifth Floor, Boston, MA 02110-1301 USA
00021  * 
00022  * If you have questions about your rights to use or distribute this
00023  * software, please contact Berkeley Lab's Technology Transfer
00024  * Department at TTD@lbl.gov. Other questions, comments and bug
00025  * reports should be sent directly to the author via email at
00026  * taneli.kalvas@jyu.fi.
00027  * 
00028  * NOTICE. This software was developed under partial funding from the
00029  * U.S.  Department of Energy.  As such, the U.S. Government has been
00030  * granted for itself and others acting on its behalf a paid-up,
00031  * nonexclusive, irrevocable, worldwide license in the Software to
00032  * reproduce, prepare derivative works, and perform publicly and
00033  * display publicly.  Beginning five (5) years after the date
00034  * permission to assert copyright is obtained from the U.S. Department
00035  * of Energy, and subject to any subsequent five (5) year renewals,
00036  * the U.S. Government is granted for itself and others acting on its
00037  * behalf a paid-up, nonexclusive, irrevocable, worldwide license in
00038  * the Software to reproduce, prepare derivative works, distribute
00039  * copies to the public, perform publicly and display publicly, and to
00040  * permit others to do so.
00041  */
00042 
00043 #ifndef SCHEDULER_HPP
00044 #define SCHEDULER_HPP 1
00045 
00046 
00047 #include <pthread.h>
00048 #include <iostream>
00049 #include <vector>
00050 #include <deque>
00051 //#include <sys/time.h>
00052 
00053 
00054 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
00055 
00056 
/*! \brief Producer-consumer scheduler that distributes problems to solver threads.
 *
 *  The scheduler owns one \a Consumer per solver given to the
 *  constructor. After run() has been called a scheduler thread starts
 *  the consumers, each of which repeatedly fetches a problem of type
 *  \a Prob from the input queue and calls
 *  <tt>solver( Prob *, Scheduler & )</tt> on it. Solved problems are
 *  collected into an output queue that can be read with
 *  get_solved_problems(). If a solver throws an exception of type
 *  \a Err, the error and the offending problem are stored (see
 *  get_errors()) and all consumers stop fetching new problems.
 *
 *  The scheduler stores only pointers: neither the solvers nor the
 *  problems are copied or deleted by this class. At least one solver
 *  is needed for queued problems to be processed.
 *
 *  run(), finish() and the destructor are meant to be called from the
 *  single thread that owns the scheduler. add_problem() and
 *  add_problems() only take the internal mutex and may therefore also
 *  be called from within a solver.
 */
template <class Solv, class Prob, class Err>
class Scheduler {

    /*! \brief Worker running one solver in its own thread.
     */
    class Consumer {

        pthread_t            _thread;     //!< \brief Thread handle, valid only while \a _started is true.
        bool                 _started;    //!< \brief True if a thread was created and has not been joined.
        Solv                *_solver;     //!< \brief Solver used by this consumer (not owned).
        Scheduler           *_scheduler;  //!< \brief Scheduler feeding this consumer (not owned).

        /*! \brief Thread body: solve problems until the scheduler returns NULL.
         *
         *  An exception of type \a Err is reported to the scheduler
         *  and ends the loop; the failed problem is not put to the
         *  solved queue.
         */
        void *consumer_main( void ) {
            Prob *problem;

            while( (problem = _scheduler->get_next_problem()) ) {
                try {
                    (*_solver)( problem, *_scheduler );
                } catch( Err &e ) {
                    // Caught by reference to avoid copying/slicing the error.
                    // Handle error and stop solving.
                    _scheduler->on_error( e, problem );
                    break;
                }
                _scheduler->put_solved_problem( problem );
            }

            return( NULL );
        }

    public:

        /*! \brief pthread entry point, \a data is the Consumer to run.
         */
        static void *consumer_entry( void *data ) {
            Consumer *consumer = static_cast<Consumer *>( data );
            return( consumer->consumer_main() );
        }

        /*! \brief Create consumer using \a solver for problems from \a scheduler.
         *
         *  No thread is started before run() is called.
         */
        Consumer( Solv *solver, Scheduler *scheduler )
            : _started(false), _solver(solver), _scheduler(scheduler) {}

        /*! \brief Destructor. Does not join the thread.
         */
        ~Consumer() {}

        /*! \brief Start the consumer thread.
         *
         *  If the thread can not be created the consumer stays
         *  inactive and join() becomes a no-op.
         */
        void run( void ) {
            _started = ( pthread_create( &_thread, NULL, consumer_entry,
                                         static_cast<void *>( this ) ) == 0 );
        }

        /*! \brief Wait for the consumer thread to exit.
         *
         *  Only joins a thread that was successfully started, joining
         *  an invalid handle is undefined behaviour.
         */
        void join( void ) {
            if( !_started )
                return;
            pthread_join( _thread, NULL );
            _started = false;
        }

    };


    pthread_mutex_t         _mutex;            //!< \brief Mutex protecting all shared state below.
    pthread_cond_t          _scheduler_cond;   //!< \brief Wakes up the scheduler thread.
    pthread_cond_t          _producer_cond;    //!< \brief Wakes up finish() when the scheduler thread is done.
    pthread_cond_t          _consumer_cond;    //!< \brief Wakes up idle consumers.

    size_t                  _problems_in_c;    //!< \brief Total count of problems added.
    size_t                  _problems_out_c;   //!< \brief Total count of problems solved.
    size_t                  _problems_err_c;   //!< \brief Total count of problems ended in error.
    std::deque<Prob*>       _problems_in;      //!< \brief Problems waiting for a consumer.
    std::deque<Prob*>       _problems_out;     //!< \brief Solved problems not yet fetched.

    pthread_t               _scheduler_thread; //!< \brief Scheduler thread, valid only while \a _joinable is true.
    std::vector<Consumer *> _consumers;        //!< \brief Consumers, owned by the scheduler.

    bool                    _running;          //!< \brief Scheduler thread is active.
    bool                    _error;            //!< \brief A solver has thrown during this run.
    bool                    _done;             //!< \brief Consumers are told to exit.
    bool                    _finish;           //!< \brief finish() has been requested for this run.
    bool                    _joinable;         //!< \brief Scheduler thread was created and not yet joined.
    std::vector<Err>        _err;              //!< \brief Stored errors.
    std::vector<Prob *>     _prob;             //!< \brief Problems that caused the stored errors.


    // The class owns raw consumer pointers and pthread objects, copying
    // would lead to double deletes. Declared private and not defined.
    Scheduler( const Scheduler & );
    Scheduler &operator=( const Scheduler & );


    /*! \brief Store error \a e thrown while solving \a p and wake up the scheduler.
     *
     *  Called by consumers.
     */
    void on_error( Err &e, Prob *p ) {
        pthread_mutex_lock( &_mutex );
        _err.push_back( e );
        _prob.push_back( p );
        _problems_err_c++;
        _error = true;
        pthread_cond_broadcast( &_scheduler_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /*! \brief Fetch next problem for a consumer.
     *
     *  Blocks while the input queue is empty. Returns NULL when the
     *  consumer should exit (scheduler done or error occurred).
     */
    Prob *get_next_problem( void ) {
        pthread_mutex_lock( &_mutex );

        if( _done || _error ) {
            pthread_mutex_unlock( &_mutex );
            return( NULL );
        }

        if( _problems_in.empty() ) {
            // Signal scheduler that problems are spent
            pthread_cond_signal( &_scheduler_cond );
            while( _problems_in.empty() ) {
                // Wait for new problems
                pthread_cond_wait( &_consumer_cond, &_mutex );
                if( _done || _error ) {
                    pthread_mutex_unlock( &_mutex );
                    return( NULL );
                }
            }
        }

        // Return next problem
        Prob *next = _problems_in.front();
        _problems_in.pop_front();
        pthread_mutex_unlock( &_mutex );
        return( next );
    }


    /*! \brief Put solved problem \a p to the output queue.
     *
     *  Called by consumers.
     */
    void put_solved_problem( Prob *p ) {
        pthread_mutex_lock( &_mutex );
        _problems_out_c++;
        _problems_out.push_back( p );
        pthread_mutex_unlock( &_mutex );
    }


    /*! \brief Body of the scheduler thread.
     *
     *  Starts the consumers, waits until finish() has been requested
     *  and every added problem is accounted for (or an error occurs),
     *  tells the consumers to exit, joins them and finally reports
     *  completion to finish().
     */
    void *scheduler_main( void ) {

        for( size_t a = 0; a < _consumers.size(); a++ )
            _consumers[a]->run();

        pthread_mutex_lock( &_mutex );

        while( 1 ) {
            // Wait until consumers have emptied the input queue or error occurs
            while( !(_problems_in.empty() || _done || _error) )
                pthread_cond_wait( &_scheduler_cond, &_mutex );

            if( (_finish && _problems_in_c == _problems_out_c+_problems_err_c) ||
                _done || _error )
                break;

            // Problems temporarily done, wait for an event
            pthread_cond_wait( &_scheduler_cond, &_mutex );

            // Signal consumers to wake up
            pthread_cond_broadcast( &_consumer_cond );
        }

        // Broadcast done
        _done = true;
        pthread_cond_broadcast( &_consumer_cond );
        pthread_mutex_unlock( &_mutex );

        // Join all consumers
        for( size_t a = 0; a < _consumers.size(); a++ )
            _consumers[a]->join();

        // Clear the running flag under the mutex before broadcasting.
        // finish() waits for this flag, so the wakeup can not be lost
        // even if finish() starts waiting after this point.
        pthread_mutex_lock( &_mutex );
        _running = false;
        pthread_cond_broadcast( &_producer_cond );
        pthread_mutex_unlock( &_mutex );
        return( NULL );
    }


    /*! \brief pthread entry point, \a data is the Scheduler to run.
     */
    static void *scheduler_entry( void *data ) {
        Scheduler *scheduler = static_cast<Scheduler *>( data );
        return( scheduler->scheduler_main() );
    }


public:


    /*! \brief Constructor for scheduler using solvers \a s.
     *
     *  One consumer is created for each solver. The solvers are not
     *  owned by the scheduler. No threads are started before run()
     *  is called.
     */
    Scheduler( std::vector<Solv *> s )
        : _problems_in_c(0), _problems_out_c(0), _problems_err_c(0),
          _running(false), _error(false), _done(false), _finish(false),
          _joinable(false) {

        pthread_mutex_init( &_mutex, NULL );
        pthread_cond_init( &_scheduler_cond, NULL );
        pthread_cond_init( &_consumer_cond, NULL );
        pthread_cond_init( &_producer_cond, NULL );

        // Create consumers
        for( size_t a = 0; a < s.size(); a++ )
            _consumers.push_back( new Consumer( s[a], this ) );
    }


    /*! \brief Destructor.
     *
     *  Waits for a running scheduler to finish before releasing
     *  resources.
     */
    ~Scheduler() {
        finish();

        // Join only a thread that really was created by run()
        if( _joinable ) {
            pthread_join( _scheduler_thread, NULL );
            _joinable = false;
        }

        pthread_mutex_destroy( &_mutex );
        pthread_cond_destroy( &_scheduler_cond );
        pthread_cond_destroy( &_consumer_cond );
        pthread_cond_destroy( &_producer_cond );

        // Delete consumers
        for( size_t a = 0; a < _consumers.size(); a++ )
            delete _consumers[a];
    }


    /*! \brief Fetch solved problems.
     *
     *  Moves all problems from the solved queue to the back of
     *  container \a c (using push_back) and returns the number of
     *  problems moved.
     */
    template <class Cont>
    size_t get_solved_problems( Cont &c ) {
        pthread_mutex_lock( &_mutex );
        size_t r = _problems_out.size();
        while( !_problems_out.empty() ) {
            c.push_back( _problems_out.front() );
            _problems_out.pop_front();
        }
        pthread_mutex_unlock( &_mutex );
        return( r );
    }


    /*! \brief Return true if an error has occurred during the current run.
     */
    bool is_error( void ) {
        // The flag is written by other threads, read it under the mutex
        pthread_mutex_lock( &_mutex );
        bool r = _error;
        pthread_mutex_unlock( &_mutex );
        return( r );
    }


    /*! \brief Return true if the scheduler thread is active.
     */
    bool is_running( void ) {
        // The flag is written by other threads, read it under the mutex
        pthread_mutex_lock( &_mutex );
        bool r = _running;
        pthread_mutex_unlock( &_mutex );
        return( r );
    }


    /*! \brief Fetch stored errors.
     *
     *  Appends the stored errors to container \a e and the
     *  corresponding problems to container \a p (using push_back),
     *  clears the internal storage and returns the number of errors
     *  fetched.
     */
    template <class Cont1, class Cont2>
    size_t get_errors( Cont1 &e, Cont2 &p ) {
        pthread_mutex_lock( &_mutex );
        size_t r = _err.size();
        for( size_t a = 0; a < _err.size(); a++ ) {
            e.push_back( _err[a] );
            p.push_back( _prob[a] );
        }
        _err.clear();
        _prob.clear();
        pthread_mutex_unlock( &_mutex );
        return( r );
    }

    /*! \brief Start processing problems.
     *
     *  Does nothing if the scheduler is already running. Otherwise
     *  clears the error state of the previous run and starts the
     *  scheduler thread. If the thread can not be created the
     *  scheduler is left in the not running state, which can be
     *  checked with is_running().
     */
    void run( void ) {

        pthread_mutex_lock( &_mutex );
        if( _running ) {
            pthread_mutex_unlock( &_mutex );
            return;
        }
        _running = true;
        _error = false;
        _done = false;
        _finish = false;
        _err.clear();
        _prob.clear();
        pthread_mutex_unlock( &_mutex );

        // Reap the thread of the previous run before reusing the handle
        if( _joinable ) {
            pthread_join( _scheduler_thread, NULL );
            _joinable = false;
        }

        if( pthread_create( &_scheduler_thread, NULL, scheduler_entry,
                            static_cast<void *>( this ) ) == 0 ) {
            _joinable = true;
        } else {
            // No thread exists that could ever clear the flag
            pthread_mutex_lock( &_mutex );
            _running = false;
            pthread_cond_broadcast( &_producer_cond );
            pthread_mutex_unlock( &_mutex );
        }
    }


    /*! \brief Add problem \a p to the input queue.
     */
    void add_problem( Prob *p ) {

        pthread_mutex_lock( &_mutex );
        _problems_in_c++;
        _problems_in.push_back( p );
        pthread_cond_broadcast( &_scheduler_cond );
        // Wake idle consumers directly. The scheduler relays wakeups
        // only when it has seen the queue empty, which can leave
        // consumers sleeping while problems are waiting.
        pthread_cond_broadcast( &_consumer_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /*! \brief Add problems \a p to the input queue.
     */
    void add_problems( std::vector<Prob *> p ) {

        pthread_mutex_lock( &_mutex );
        _problems_in_c += p.size();
        _problems_in.insert( _problems_in.end(), p.begin(), p.end() );
        pthread_cond_broadcast( &_scheduler_cond );
        // Wake idle consumers directly, see add_problem()
        pthread_cond_broadcast( &_consumer_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /*! \brief Wait for the scheduler to process all added problems.
     *
     *  Returns true immediately if finish has already been requested
     *  for the current run and false immediately if the scheduler is
     *  not running. Otherwise requests finish, blocks until the
     *  scheduler thread has joined all consumers and returns true if
     *  no error occurred, false otherwise.
     */
    bool finish( void ) {
        pthread_mutex_lock( &_mutex );
        if( _finish ) {
            pthread_mutex_unlock( &_mutex );
            return( true );
        }
        if( !_running ) {
            pthread_mutex_unlock( &_mutex );
            return( false );
        }

        _finish = true;
        pthread_cond_broadcast( &_scheduler_cond );

        // Wait on a predicate: immune to spurious and lost wakeups
        while( _running )
            pthread_cond_wait( &_producer_cond, &_mutex );

        bool ok = !_error;
        pthread_mutex_unlock( &_mutex );
        return( ok );
    }


    friend class Consumer;
};
00476 
00477 
00478 
00479 #endif
00480 
00481 
00482 
00483 
00484 
00485 
00486 
00487 
00488 
00489 
00490 
00491 
00492 
00493 
00494 
00495 
00496 
00497 
00498 


Reference manual for Ion Beam Simulator 1.0.4
Generated by Doxygen 1.7.1 on Wed Apr 13 2011 23:25:33.