Navigation

Main Page
Download
Support
Installation
Tutorial
Examples
Reference Manual
   Version 1.0.5new_solver
   Version 1.0.5dev
      Class Index
      File List
   Version 1.0.5b
   Version 1.0.4dev
   Version 1.0.4
Publications


Hosted by Get Ion Beam Simulator at SourceForge.net. Fast, secure and Free Open Source software downloads

scheduler.hpp

Go to the documentation of this file.
00001 
00005 /* Copyright (c) 2005-2009,2011 Taneli Kalvas, Jan Sarén. All rights reserved.
00006  *
00007  * You can redistribute this software and/or modify it under the terms
00008  * of the GNU General Public License as published by the Free Software
00009  * Foundation; either version 2 of the License, or (at your option)
00010  * any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but
00013  * WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00015  * General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with this library (file "COPYING" included in the package);
00019  * if not, write to the Free Software Foundation, Inc., 51 Franklin
00020  * Street, Fifth Floor, Boston, MA 02110-1301 USA
00021  * 
00022  * If you have questions about your rights to use or distribute this
00023  * software, please contact Berkeley Lab's Technology Transfer
00024  * Department at TTD@lbl.gov. Other questions, comments and bug
00025  * reports should be sent directly to the author via email at
00026  * taneli.kalvas@jyu.fi.
00027  * 
00028  * NOTICE. This software was developed under partial funding from the
00029  * U.S.  Department of Energy.  As such, the U.S. Government has been
00030  * granted for itself and others acting on its behalf a paid-up,
00031  * nonexclusive, irrevocable, worldwide license in the Software to
00032  * reproduce, prepare derivative works, and perform publicly and
00033  * display publicly.  Beginning five (5) years after the date
00034  * permission to assert copyright is obtained from the U.S. Department
00035  * of Energy, and subject to any subsequent five (5) year renewals,
00036  * the U.S. Government is granted for itself and others acting on its
00037  * behalf a paid-up, nonexclusive, irrevocable, worldwide license in
00038  * the Software to reproduce, prepare derivative works, distribute
00039  * copies to the public, perform publicly and display publicly, and to
00040  * permit others to do so.
00041  */
00042 
00043 #ifndef SCHEDULER_HPP
00044 #define SCHEDULER_HPP 1
00045 
00046 
00047 #include <pthread.h>
00048 #include <stdint.h>
00049 #include <iostream>
00050 #include <vector>
00051 #include <deque>
00052 #include <time.h>
00053 #include "comptime.hpp"
00054 
00055 
00056 //#define SCHEDULER_DEBUG 1
00057 
00058 
00059 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
00060 
00061 
00086 template <class Solv, class Prob, class Err>
00087 class Scheduler {
00088 
00089     class Consumer {
00090 
00091         /*
00092         enum consumer_status_e {
00093             CONSUMER_CREATED = 0,
00094             CONSUMER_RUNNING,
00095             CONSUMER_FINISHED
00096         };
00097         */
00098 
00099         //pthread_mutex_t      _mutex;             //!< \brief Mutex for active check
00100         pthread_t            _thread;            
00101         Solv                *_solver;            
00102         Scheduler           *_scheduler;         
00103         //struct timeval       _t0;
00104         //std::vector<struct timeval> _t;
00105     
00106         void *consumer_main( void ) {
00107             //struct timeval t;
00108             
00109 #ifdef SCHEDULER_DEBUG
00110             std::cout << "Consumer main entrance\n";
00111 #endif
00112             //pthread_mutex_lock( &_mutex );
00113             //_status = CONSUMER_RUNNING;
00114             //pthread_mutex_unlock( &_mutex );
00115 
00116             Prob *p;
00117             uint32_t pi;
00118             while( (p = _scheduler->get_next_problem( pi )) ) {
00119                 try {
00120                     //gettimeofday( &t, NULL );
00121                     //_t.push_back( t );
00122                     (*_solver)( p, pi );
00123                     //gettimeofday( &t, NULL );
00124                     //_t.push_back( t );
00125                 } catch( Err e ) {
00126                     //std::cout << "on_error\n";
00127                     // Handle error and stop solving
00128                     _scheduler->on_error( e, pi );
00129                     break;
00130                 };
00131                 _scheduler->inc_solved_problem();
00132             }
00133 
00134 #ifdef SCHEDULER_DEBUG
00135             std::cout << "Exiting consumer\n";
00136 #endif
00137             //pthread_mutex_lock( &_mutex );
00138             //_status = CONSUMER_FINISHED;
00139             //pthread_mutex_unlock( &_mutex );
00140             return( NULL );
00141         }
00142     
00143     public:
00144 
00145         static void *consumer_entry( void *data ) {
00146             Consumer *consumer = (Consumer *)data;
00147             return( consumer->consumer_main() );
00148         }
00149 
00150         Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) { 
00151 
00152             //pthread_mutex_init( &_mutex, NULL );
00153 #ifdef SCHEDULER_DEBUG
00154             std::cout << "Consumer constructor\n";
00155 #endif
00156             //gettimeofday( &_t0, NULL );
00157         }
00158 
00159         ~Consumer() {
00160             //pthread_mutex_lock( &cout_mutex );
00161 #ifdef SCHEDULER_DEBUG
00162             std::cout << "Consumer destructor\n";
00163 #endif
00164             //for( size_t a = 0; a < _t.size(); a++ ) {
00165             //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 
00166             //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n";
00167             //a++;
00168             //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 
00169             //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n\n\n";
00170             //}
00171             //pthread_mutex_unlock( &cout_mutex );
00172         }
00173 
00174         void run( void ) {
00175             pthread_create( &_thread, NULL, consumer_entry, (void *)this );
00176         }
00177 
00178         void join( void ) {
00179 
00180 #ifdef SCHEDULER_DEBUG
00181             std::cout << "Consumer join\n";
00182 #endif
00183             //pthread_mutex_lock( &_mutex );
00184             //if( _status == CONSUMER_FINISHED ) {
00185             //pthread_mutex_unlock( &_mutex );
00186             //return;
00187             //} else if( _status == CONSUMER_CREATED ) {
00188             //
00189             //}
00190             //pthread_mutex_unlock( &_mutex );
00191             pthread_join( _thread, NULL );
00192         }
00193 
00194     };
00195 
00196 
00197     pthread_mutex_t         _mutex;            
00198     pthread_cond_t          _scheduler_cond;   
00199     pthread_cond_t          _producer_cond;    
00200     pthread_cond_t          _consumer_cond;    
00201 
00202     //size_t                  _problems_in_c;    //!< \brief Total problems in count
00203     //size_t                  _problems_err_c;   //!< \brief Total error problems out count
00204     //std::deque<Prob*>       _problems_out;     //!< \brief Problems already solved
00205 
00206     uint32_t                _read_c;           
00207     uint32_t                _solved_c;         
00208     std::vector<Prob *>    &_problems;         
00209 
00210     pthread_t               _scheduler_thread; 
00211     std::vector<Consumer *> _consumers;        
00212 
00213     bool                    _join;             
00214     bool                    _running;          
00215     bool                    _error;            
00216     bool                    _done;             
00217     bool                    _finish;           
00218     std::vector<Err>        _err;              
00219     std::vector<int32_t>    _eprob;            
00220 
00221 
00228     void on_error( Err &e, uint32_t pi ) {
00229         pthread_mutex_lock( &_mutex );
00230         _err.push_back( e );
00231         _eprob.push_back( pi );
00232         _error = true;
00233         pthread_cond_broadcast( &_scheduler_cond );
00234         pthread_mutex_unlock( &_mutex );
00235     }
00236 
00237 
00240     void inc_solved_problem( void ) {
00241         pthread_mutex_lock( &_mutex );
00242         _solved_c++;
00243         pthread_mutex_unlock( &_mutex );
00244     }
00245 
00252     Prob *get_next_problem( uint32_t &pi ) {
00253 #ifdef SCHEDULER_DEBUG
00254             std::cout << "get_next_problem()\n";
00255 #endif
00256         pthread_mutex_lock( &_mutex );
00257     
00258         if( _done || _error ) {
00259             pthread_mutex_unlock( &_mutex );
00260 #ifdef SCHEDULER_DEBUG
00261             std::cout << "get_next_problem(): Returning NULL\n";
00262 #endif
00263             pi = -1;
00264             return( NULL );
00265         }
00266 
00267         if( _problems.size() == _read_c ) {
00268 #ifdef SCHEDULER_DEBUG
00269             std::cout << "get_next_problem(): No problem to return... waiting\n";
00270 #endif
00271             // Signal producer that problems are spent
00272             pthread_cond_signal( &_scheduler_cond );
00273             while( _problems.size() == _read_c ) {
00274                 // Wait for new problems
00275                 pthread_cond_wait( &_consumer_cond, &_mutex );
00276                 if( _done || _error ) {
00277                     pthread_mutex_unlock( &_mutex );
00278 #ifdef SCHEDULER_DEBUG
00279                     std::cout << "get_next_problem(): Returning NULL\n";
00280 #endif
00281                     pi = -1;
00282                     return( NULL );
00283                 }
00284             }
00285         }
00286 
00287         // Return next problem
00288         pi = _read_c++;
00289         Prob *ret = _problems[pi];
00290 
00291 #ifdef SCHEDULER_DEBUG
00292         std::cout << "get_next_problem(): Returning problem " << pi << "\n";
00293 #endif
00294 
00295         pthread_mutex_unlock( &_mutex );
00296         return( ret );
00297     }
00298 
00299     
00302     void *scheduler_main( void ) {
00303 
00304 #ifdef SCHEDULER_DEBUG
00305         std::cout << "Running scheduler_main()\n";
00306 #endif
00307 
00308         // Start consumer threads
00309         for( size_t a = 0; a < _consumers.size(); a++ )
00310             _consumers[a]->run();
00311 
00312         pthread_mutex_lock( &_mutex );
00313 
00314         while( 1 ) {
00315             // Wait until all consumers are done with all problems or error occurs
00316             while( !(_problems.size() == _solved_c || _done || _error) ) {
00317                 //std::cout << "scheduler_main(): scheduler_cond wait 1\n";
00318                 pthread_cond_wait( &_scheduler_cond, &_mutex );
00319             }
00320 
00321             if( (_finish && _problems.size() == _solved_c) || _done || _error )
00322                 break;
00323 
00324             // Problems temporarily done
00325             pthread_cond_wait( &_scheduler_cond, &_mutex );
00326             //std::cout << "scheduler_main(): prob_in = " << _problems_in_c
00327             //<< " prob_out = " << _problems_out_c << "\n";
00328             //std::cout << "scheduler_main(): scheduler_cond wait 2\n";
00329 
00330             // Signal consumers to wake up
00331             pthread_cond_broadcast( &_consumer_cond );
00332         }
00333 
00334         // Broadcast: done
00335         _done = true;
00336         _running = false;
00337         pthread_cond_broadcast( &_consumer_cond );
00338         pthread_mutex_unlock( &_mutex );
00339 
00340         // Join all consumers
00341         //std::cout << "scheduler_main(): Scheduler waiting in join\n";
00342         for( size_t a = 0; a < _consumers.size(); a++ )
00343             _consumers[a]->join();
00344 
00345         pthread_cond_broadcast( &_producer_cond );
00346         //std::cout << "scheduler_main(): Exiting scheduler\n";
00347         return( NULL );
00348     }
00349 
00350 
00353     static void *scheduler_entry( void *data ) {
00354         Scheduler *scheduler = (Scheduler *)data;
00355         return( scheduler->scheduler_main() );
00356     }
00357 
00358 
00359 public:
00360 
00361 
00366     Scheduler( std::vector<Prob *> &prob ) 
00367         : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false), 
00368           _error(false), _done(false), _finish(false) {
00369 
00370         // Initialize pthread objects
00371         pthread_mutex_init( &_mutex, NULL );
00372         pthread_cond_init( &_scheduler_cond, NULL );
00373         pthread_cond_init( &_consumer_cond, NULL );
00374         pthread_cond_init( &_producer_cond, NULL );
00375     }
00376 
00377 
00380     ~Scheduler() {
00381 
00382         // Force exit
00383         _done = true;
00384         finish();
00385 
00386         pthread_mutex_destroy( &_mutex );
00387         pthread_cond_destroy( &_scheduler_cond );
00388         pthread_cond_destroy( &_consumer_cond );
00389         pthread_cond_destroy( &_producer_cond );
00390     }
00391 
00392 
00395     bool is_error( void ) {
00396         // No mutex needed for one bit read
00397         return( _error );
00398     }
00399 
00400 
00403     bool is_running( void ) {
00404         // No mutex needed for one bit read
00405         return( _running );
00406     }
00407 
00408 
00411     uint32_t get_solved_count( void ) {
00412         pthread_mutex_lock( &_mutex );
00413         uint32_t ret = _solved_c;
00414         pthread_mutex_unlock( &_mutex );
00415         return( ret );
00416     }
00417 
00418 
00421     uint32_t get_problem_count( void ) {
00422         pthread_mutex_lock( &_mutex );
00423         uint32_t ret = _problems.size();
00424         pthread_mutex_unlock( &_mutex );
00425         return( ret );
00426     }
00427 
00428 
00437     template <class Cont1, class Cont2>
00438     size_t get_errors( Cont1 &e, Cont2 &pi ) {
00439         pthread_mutex_lock( &_mutex );
00440         size_t r = _err.size();
00441         for( size_t a = 0; a < _err.size(); a++ ) {
00442             e.push_back( _err[a] );
00443             pi.push_back( _eprob[a] );
00444         }
00445         _err.clear();
00446         _eprob.clear();
00447         pthread_mutex_unlock( &_mutex );
00448         return( r );
00449     }    
00450 
00451 
00458     void run( std::vector<Solv *> solv ) {
00459 
00460         // Do nothing if already running
00461         if( _running )
00462             return;
00463 
00464         // Create consumer threads
00465         for( size_t a = 0; a < solv.size(); a++ )
00466             _consumers.push_back( new Consumer( solv[a], this ) );
00467 
00468         _read_c = 0;
00469         _solved_c = 0;
00470         _join = true;
00471         _running = true;
00472         _error = false;
00473         _done = false;
00474         _finish = false;
00475         _err.clear();
00476         _eprob.clear();
00477 
00478         // Create scheduler thread
00479         pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this );
00480     }
00481 
00482 
00485     void lock_mutex( void ) {
00486 
00487         pthread_mutex_lock( &_mutex );
00488     }
00489 
00490     
00493     void unlock_mutex( void ) {
00494 
00495         pthread_cond_broadcast( &_scheduler_cond );     
00496         pthread_mutex_unlock( &_mutex );
00497     }
00498     
00499 
00507     bool force_exit( void ) {
00508 
00509         _done = true;
00510         return( finish() );
00511     }
00512 
00519     bool wait_finish( void ) {
00520 
00521         pthread_mutex_lock( &_mutex );
00522         if( _running ) {
00523             _finish = true;
00524             pthread_cond_broadcast( &_scheduler_cond );
00525 
00526             struct timespec ts;
00527             ibs_clock_gettime( CLOCK_REALTIME, &ts );
00528             ts.tv_sec += 1;
00529             int rc = pthread_cond_timedwait( &_producer_cond, &_mutex, &ts );
00530             if( rc == ETIMEDOUT ) {
00531                 pthread_mutex_unlock( &_mutex );
00532                 return( false );
00533             }
00534         }
00535         pthread_mutex_unlock( &_mutex );
00536         return( true );
00537     }
00538 
00547     bool finish( void ) {
00548 
00549         pthread_mutex_lock( &_mutex );
00550         if( _running ) {
00551             _finish = true;
00552             //std::cout << "finish(): scheduler_cond broadcast\n";
00553             pthread_cond_broadcast( &_scheduler_cond );
00554         
00555             //std::cout << "finish(): producer_cond wait\n";
00556             pthread_cond_wait( &_producer_cond, &_mutex );
00557         }
00558         pthread_mutex_unlock( &_mutex );
00559 
00560         if( _join ) {
00561             // Delete consumer threads
00562             for( size_t a = 0; a < _consumers.size(); a++ )
00563                 delete _consumers[a];
00564             _consumers.clear();
00565 
00566             pthread_join( _scheduler_thread, NULL );
00567             _join = false;
00568         }
00569         
00570         if( _error )
00571             return( false );
00572 
00573         return( true );
00574     }
00575 
00576     friend class Consumer;
00577 };
00578 
00579 
00580 
00581 #endif
00582 


Reference manual for Ion Beam Simulator 1.0.5dev
Generated by Doxygen 1.7.1 on Mon Feb 6 2012 15:07:16.