#ifndef UTIL__THREAD_POOL__HPP #define UTIL__THREAD_POOL__HPP /* $Id: thread_pool.hpp 99785 2023-05-10 17:50:50Z vasilche $ * =========================================================================== * * PUBLIC DOMAIN NOTICE * National Center for Biotechnology Information * * This software/database is a "United States Government Work" under the * terms of the United States Copyright Act. It was written as part of * the author's official duties as a United States Government employee and * thus cannot be copyrighted. This software/database is freely available * to the public for use. The National Library of Medicine and the U.S. * Government have not placed any restriction on its use or reproduction. * * Although all reasonable efforts have been taken to ensure the accuracy * and reliability of the software and data, the NLM and the U.S. * Government do not and cannot warrant the performance or results that * may be obtained by using this software or data. The NLM and the U.S. * Government disclaim all warranties, express or implied, including * warranties of performance, merchantability or fitness for any particular * purpose. * * Please cite the author in any work or product based on this material. * * =========================================================================== * * Author: Denis Vakatov, Pavel Ivanov * */ /// @file thread_pool.hpp /// Pool of generic task-executing threads. /// /// CThreadPool -- base implementation of pool of threads /// CThreadPool_Thread -- base implementation of thread in pool of threads /// CThreadPool_Task -- abstract task for executing in thread pool /// CThreadPool_Controller /// -- abstract class to control the number of threads /// in pool of threads // Old interfaces and classes #include <util/thread_pool_old.hpp> /** @addtogroup ThreadedPools * * @{ */ BEGIN_NCBI_SCOPE class CThreadPool; class CThreadPool_Impl; class CThreadPool_Task; class CThreadPool_Controller; class CThreadPool_Thread; class CThreadPool_ThreadImpl; class CThreadPoolException; /// Abstract class for representing single task executing in pool of threads /// To use this class in application you should inherit your own class from it /// and define method Execute() - the main method where all task logic /// executes. /// Every single task can be executed (or canceled before execution) /// only once and only in one pool. class NCBI_XUTIL_EXPORT CThreadPool_Task : public CObject { public: /// Status of the task enum EStatus { eIdle, ///< has not been placed in queue yet eQueued, ///< in the queue, awaiting execution eExecuting, ///< being executed eCompleted, ///< executed successfully eFailed, ///< failure during execution eCanceled ///< canceled - possible only if canceled before ///< processing was started or if method Execute() ///< returns result eCanceled }; /// Constructor /// @param priority /// Priority of the task - the smaller the priority, /// the sooner the task will execute CThreadPool_Task(unsigned int priority = 0); /// Do the actual job. Called by whichever thread handles this task. /// @return /// Result of task execution (the status will be set accordingly) /// @note /// Only 3 values are allowed: eCompleted, eFailed, eCanceled. virtual EStatus Execute(void) = 0; /// Cancel the task. /// Equivalent to calling CThreadPool::CancelTask(task). /// @note /// If the task is executing it may not be canceled right away. It is /// responsibility of method Execute() implementation to check /// value of IsCancelRequested() periodically and finish its execution /// when this value is TRUE. /// @note /// If the task has already finished its execution then do nothing. void RequestToCancel(void); /// Check if cancellation of the task was requested bool IsCancelRequested(void) const; /// Get status of the task EStatus GetStatus(void) const; /// Check if task execution has been already finished /// (successfully or not) bool IsFinished(void) const; /// Get priority of the task unsigned int GetPriority(void) const; protected: /// Callback to notify on changes in the task status /// @param old /// Task status before the change. Current value can be obtained from /// GetStatus(). /// /// @note /// Status eQueued is set before task is actually pushed to the queue. /// After eQueued status eIdle can appear if /// insertion into the queue failed because of timeout. /// Status eCanceled will be set only in 2 cases: /// - if task is not executing yet and RequestToCancel() called, or /// - if method Execute() returned eCanceled. /// To check if task cancellation is requested during its execution /// use methods OnCancelRequested() or IsCancelRequested(). /// @sa OnCancelRequested(), IsCancelRequested(), GetStatus() virtual void OnStatusChange(EStatus old); /// Callback to notify when cancellation of the task is requested /// @sa OnStatusChange() virtual void OnCancelRequested(void); /// Copy ctor CThreadPool_Task(const CThreadPool_Task& other); /// Assignment /// @note /// There is a possible race condition if request is assigned /// and added to the pool at the same time by different threads. /// It is a responsibility of the derived class to avoid this race. CThreadPool_Task& operator= (const CThreadPool_Task& other); /// The thread pool which accepted this task for execution /// @sa CThreadPool::AddTask() CThreadPool* GetPool(void) const; /// Destructor. Will be called from CRef. virtual ~CThreadPool_Task(void); private: friend class CThreadPool_Impl; /// Init all members in constructor /// @param priority /// Priority of the task void x_Init(unsigned int priority); /// Set pool as owner of this task. void x_SetOwner(CThreadPool_Impl* pool); /// Detach task from the pool (if insertion into the pool has failed). void x_ResetOwner(void); /// Set task status void x_SetStatus(EStatus new_status); /// Internal canceling of the task void x_RequestToCancel(void); /// Flag indicating that the task is already added to some pool CAtomicCounter_WithAutoInit m_IsBusy; /// Pool owning this task atomic<CThreadPool_Impl*> m_Pool; /// Priority of the task unsigned int m_Priority; /// Status of the task atomic<EStatus> m_Status; /// Flag indicating if cancellation of the task was already requested atomic<bool> m_CancelRequested; }; /// Main class implementing functionality of pool of threads. /// /// @note This class can be safely used as a member of some other class or as /// a scoped variable; for more details see the destructor comments. /// @sa ~CThreadPool() class NCBI_XUTIL_EXPORT CThreadPool { public: /// Constructor /// @param queue_size /// Maximum number of tasks waiting in the queue. If 0 then tasks /// cannot be queued and are added only when there are threads /// to process them. If greater than 0 and there will be attempt to add /// new task over this maximum then method AddTask() will wait for the /// given timeout for some empty space in the queue. /// @param max_threads /// Maximum number of threads allowed to be launched in the pool. /// Value cannot be less than min_threads or equal to 0. /// @param min_threads /// Minimum number of threads that have to be launched even /// if there are no tasks added. Value cannot be greater /// than max_threads. /// @param threads_mode /// Running mode of all threads in thread pool. Values fRunDetached and /// fRunAllowST are ignored. /// /// @sa AddTask() CThreadPool(unsigned int queue_size, unsigned int max_threads, unsigned int min_threads = 2, CThread::TRunMode threads_mode = CThread::fRunDefault); /// Add task to the pool for execution. /// @note /// The pool will acquire a CRef ownership to the task which it will /// hold until the task goes out of the pool (when finished) /// @param task /// Task to add /// @param timeout /// Time to wait if the tasks queue has reached its maximum length. /// If NULL, then wait indefinitely. void AddTask(CThreadPool_Task* task, const CTimeSpan* timeout = NULL); /// Request to cancel the task and remove it from queue if it is there. /// /// @sa CThreadPool_Task::RequestToCancel() void CancelTask(CThreadPool_Task* task); /// Destructor -- will wait for all its threads to finish with the timeout /// set by CThreadPool::SetDestroyTimeout(); current default is 10 seconds. /// If this timeout is not enough for threads to terminate CThreadPool /// will be destroyed but all threads will finish later without any /// "segmentation fault" errors because CThreadPool_Impl object will remain /// in memory until all its threads finish. /// @note /// If this CThreadPool object is destroyed at the end of the /// application and it will fail to finish all threads in destructor, /// then all memory allocated by CThreadPool_Impl can be shown as leakage /// in the memory checking tools (like Valgrind). To avoid these /// leakages or for some other reasons to make sure that ThreadPool /// finishes all its operations before the destructor you can call /// method Abort() at any place in your application. /// @sa Abort(), SetDestroyTimeout() virtual ~CThreadPool(void); // // Extended functionality: // /// Abort all functions of the pool -- cancel all queued tasks, send /// cancellation notifications to all currently executing tasks. /// @note /// The already executing tasks may take some time to actually finish /// execution -- see CancelTasks() and /// CThreadPool_Task::RequestToCancel()for details /// @attention /// This call renders the pool unusable in the sense that you must not /// call any of its methods after that! /// @param timeout /// Maximum time to wait for the termination of the pooled threads. /// If this time is not enough for all threads to terminate, the Abort() /// method returns, and all threads are terminated in the background. /// If NULL, then wait for as long as it takes for all threads to finish. void Abort(const CTimeSpan* timeout = NULL); /// Constructor with custom controller /// @param queue_size /// Maximum number of tasks waiting in the queue. If 0 then tasks /// cannot be queued and are added only when there are threads /// to process them. If greater than 0 and there will be attempt to add /// new task over this maximum then method AddTask() will wait for the /// given timeout for some empty space in the queue. /// @param controller /// Custom controller object that will be responsible for number /// of threads in the pool, when new threads have to be launched and /// old and unused threads have to be finished. Default controller /// implementation (set for the pool in case of using other /// constructor) is CThreadPool_Controller_PID class. /// @param threads_mode /// Running mode of all threads in thread pool. Values fRunDetached and /// fRunAllowST are ignored. CThreadPool(unsigned int queue_size, CThreadPool_Controller* controller, CThread::TRunMode threads_mode = CThread::fRunDefault); /// Set timeout to wait for all threads to finish before the pool /// should be able to destroy. /// Default value is 10 seconds /// @note /// This method is meant to be called very rarely. Because of that it is /// implemented in non-threadsafe manner. While this method is working /// it is not allowed to call itself or GetDestroyTimeout() in other /// threads. void SetDestroyTimeout(const CTimeSpan& timeout); /// Get timeout to wait for all threads to finish before the pool /// will be able to destroy. /// @note /// This method is meant to be called very rarely. Because of that it is /// implemented in non-threadsafe manner. While this method is working /// (and after that if timeout is stored in some variable as reference) /// it is not allowed to call SetDestroyTimeout() in other threads. const CTimeSpan& GetDestroyTimeout(void) const; /// Binary flags indicating different possible options in what environment /// the pool will execute exclusive task /// /// @sa TExclusiveFlags, RequestExclusiveExecution() enum EExclusiveFlags { /// Do not allow to add new tasks to the pool during /// exclusive task execution fDoNotAllowNewTasks = (1 << 0), /// Finish all threads currently running in the pool fFlushThreads = (1 << 1), /// Cancel all currently executing tasks fCancelExecutingTasks = (1 << 2), /// Cancel all tasks waiting in the queue and not yet executing fCancelQueuedTasks = (1 << 3), /// Execute all tasks waiting in the queue before execution /// of exclusive task fExecuteQueuedTasks = (1 << 4) }; /// Type of bit-masked combination of several values from EExclusiveFlags /// /// @sa EExclusiveFlags, RequestExclusiveExecution() typedef unsigned int TExclusiveFlags; /// Add the task for exclusive execution in the pool /// By default the pool suspends all new and queued tasks processing, /// finishes execution of all currently executing tasks and then executes /// exclusive task in special thread devoted to this work. The environment /// in which exclusive task executes can be modified by flags parameter. /// This method does not wait for exclusive execution, it is just adds /// the task to exclusive queue and starts the process of exclusive /// environment preparation. If next exclusive task will be added before /// preveous finishes (or even starts) its execution then they will be /// executed consequently each in its own exclusive environment (if flags /// parameter for them is different). /// /// @param task /// Task to execute exclusively /// @param flags /// Parameters of the exclusive environment void RequestExclusiveExecution(CThreadPool_Task* task, TExclusiveFlags flags = 0); /// Cancel the selected groups of tasks in the pool. /// /// @param tasks_group /// Must be a combination of fCancelQueuedTasks and/or /// fCancelExecutingTasks. Cannot be zero. void CancelTasks(TExclusiveFlags tasks_group); /// When to start new threads after flushing old ones /// /// @sa FlushThreads() enum EFlushType { eStartImmediately, ///< New threads can be started immediately eWaitToFinish ///< New threads can be started only when all old ///< threads finished their execution }; /// Finish all current threads and replace them with new ones /// @param flush_type /// If new threads can be launched immediately after call to this /// method or only after all "old" threads have been finished. void FlushThreads(EFlushType flush_type); /// Get total number of threads currently running in pool unsigned int GetThreadsCount(void) const; /// Get the number of tasks currently waiting in queue unsigned int GetQueuedTasksCount(void) const; /// Get the number of currently executing tasks unsigned int GetExecutingTasksCount(void) const; /// Does method Abort() was already called for this ThreadPool bool IsAborted(void) const; protected: /// Create new thread for the pool virtual CThreadPool_Thread* CreateThread(void); /// Get the mutex that protects all changes in the pool CMutex& GetMainPoolMutex(void); private: friend class CThreadPool_Impl; /// Prohibit copying and assigning CThreadPool(const CThreadPool&); CThreadPool& operator= (const CThreadPool&); /// Actual implementation of the pool CThreadPool_Impl* m_Impl; }; /// Base class for a thread running inside CThreadPool and executing tasks. /// /// Class can be inherited if it doesn't fit your specific needs. But to use /// inherited class you also will need to inherit CThreadPool and override /// its method CreateThread(). class NCBI_XUTIL_EXPORT CThreadPool_Thread : public CThread { public: /// Get the task currently executing in the thread CRef<CThreadPool_Task> GetCurrentTask(void) const; protected: /// Construct and attach to the pool CThreadPool_Thread(CThreadPool* pool); /// Destructor virtual ~CThreadPool_Thread(void); /// Init this thread. It is called at beginning of Main() virtual void Initialize(void); /// Clean up. It is called by OnExit() virtual void Finalize(void); /// Get the thread pool in which this thread is running CThreadPool* GetPool(void) const; private: friend class CThreadPool_ThreadImpl; /// Prohibit copying and assigning CThreadPool_Thread(const CThreadPool_Thread&); CThreadPool_Thread& operator= (const CThreadPool_Thread&); /// To prevent overriding - main thread function virtual void* Main(void); /// To prevent overriding - do cleanup after exiting from thread virtual void OnExit(void); /// Actual implementation of the thread CThreadPool_ThreadImpl* m_Impl; }; /// Abstract class for controlling the number of threads in pool. /// Every time when something happens in the pool (new task accepted, task has /// started processing or has been processed, new threads started or some /// threads killed) method HandleEvent() of this class is called. It makes /// some common stuff and then calls OnEvent(). The algorithm in OnEvent() /// has to decide how many threads should be in the pool and call method /// SetThreadsCount() accordingly. For making your own algorithm /// you should inherit this class. You then can pass an instance of the class /// to the ThreadPool's constructor. /// Controller is strictly attached to the pool in the pool's constructor. /// One controller cannot track several ThreadPools and one ThreadPool cannot /// be tracked by several controllers. /// Implementation of this class is threadsafe, so all its parameters can be /// changed during ThreadPool operation. class NCBI_XUTIL_EXPORT CThreadPool_Controller : public CObject { public: /// Constructor /// @param max_threads /// Maximum number of threads in pool /// @param min_threads /// Minimum number of threads in pool CThreadPool_Controller(unsigned int max_threads, unsigned int min_threads); /// Set the minimum number of threads in pool void SetMinThreads(unsigned int min_threads); /// Get the minimum number of threads in pool unsigned int GetMinThreads(void) const; /// Set the maximum number of threads in pool void SetMaxThreads(unsigned int max_threads); /// Get the maximum number of threads in pool unsigned int GetMaxThreads(void) const; /// Events that can happen with ThreadPool enum EEvent { eSuspend, ///< ThreadPool is suspended for exclusive task execution eResume, ///< ThreadPool is resumed after exclusive task execution eOther ///< All other events (happen asynchronously, so cannot be ///< further distinguished) }; /// This method is called every time something happens in a pool, /// such as: new task added, task is started or finished execution, /// new threads started or some threads finished. /// It does the hardcoded must-do processing of the event, and also /// calls OnEvent() callback to run the controlling algorithm. /// Method ensures that OnEvent() always called protected with ThreadPool /// main mutex and that ThreadPool itself is not aborted or in suspended /// for exclusive execution state (except the eSuspend event). /// /// @sa OnEvent() void HandleEvent(EEvent event); /// Get maximum timeout for which calls to method HandleEvent() can be /// missing. Method HandleEvent() will be called after this timeout /// for sure if ThreadPool will not be aborted or in suspended state /// at this moment. virtual CTimeSpan GetSafeSleepTime(void) const; protected: /// Destructor. Have to be called only from CRef virtual ~CThreadPool_Controller(void); /// Main method for the implementation of controlling algorithm. /// Method should not implement any excessive calculations because it /// will be called guarded with main pool mutex and because of that /// it will block several important pool operations. /// @note /// Method will never be called recursively or concurrently in different /// threads (HandleEvent() will take care of this). /// /// @sa HandleEvent() virtual void OnEvent(EEvent event) = 0; /// Get pool to which this class is attached CThreadPool* GetPool(void) const; /// Get mutex which guards access to pool /// All work in controller should be based on the same mutex as in pool. /// So every time when you need to guard access to some members of derived /// class it is recommended to use this very mutex. But NB: it's assumed /// everywhere that this mutex is locked on the small periods of time. So /// be careful and implement the same pattern. CMutex& GetMainPoolMutex(CThreadPool* pool) const; /// Ensure that constraints of minimum and maximum count of threads in pool /// are met. Start new threads or finish overflow threads if needed. void EnsureLimits(void); /// Set number of threads in pool /// Adjust given number to conform to minimum and maximum threads count /// constraints if needed. void SetThreadsCount(unsigned int count); private: friend class CThreadPool_Impl; /// Prohibit copying and assigning CThreadPool_Controller(const CThreadPool_Controller&); CThreadPool_Controller& operator= (const CThreadPool_Controller&); /// Attach the controller to ThreadPool void x_AttachToPool(CThreadPool_Impl* pool); /// Detach the controller from pool when pool is aborted void x_DetachFromPool(void); /// ThreadPool to which this controller is attached atomic<CThreadPool_Impl*> m_Pool; /// Minimum number of threads in pool unsigned int m_MinThreads; /// Maximum number of threads in pool unsigned int m_MaxThreads; /// If controller is already inside HandleEvent() processing bool m_InHandleEvent; }; /// Exception class for all ThreadPool-related classes class NCBI_XUTIL_EXPORT CThreadPoolException : public CException { public: enum EErrCode { eControllerBusy, ///< attempt to create several ThreadPools with ///< the same controller eTaskBusy, ///< attempt to change task when it's already placed ///< into ThreadPool or to put task in ThreadPool ///< several times eProhibited, ///< attempt to do something when ThreadPool was ///< already aborted or to add task when it is ///< prohibited by flags of exclusive execution eInactive, ///< attempt to call active methods in ///< ThreadPool_Controller when it is not attached ///< to any ThreadPool eInvalid ///< attempt to operate task added in one ThreadPool ///< by means of methods of another ThreadPool or ///< invalid parameters in the constructor }; virtual const char* GetErrCodeString(void) const override; NCBI_EXCEPTION_DEFAULT(CThreadPoolException, CException); }; ////////////////////////////////////////////////////////////////////////// // All inline methods ////////////////////////////////////////////////////////////////////////// inline bool CThreadPool_Task::IsCancelRequested(void) const { return m_CancelRequested; } inline CThreadPool_Task::EStatus CThreadPool_Task::GetStatus(void) const { return m_Status; } inline bool CThreadPool_Task::IsFinished(void) const { return m_Status >= eCompleted; } inline unsigned int CThreadPool_Task::GetPriority(void) const { return m_Priority; } inline unsigned int CThreadPool_Controller::GetMinThreads(void) const { return m_MinThreads; } inline unsigned int CThreadPool_Controller::GetMaxThreads(void) const { return m_MaxThreads; } END_NCBI_SCOPE /* @} */ #endif /* UTIL__THREAD_POOL__HPP */
0001 0002 0003 0004 0005 0006 0007 0008 0009 0010 0011 0012 0013 0014 0015 0016 0017 0018 0019 0020 0021 0022 0023 0024 0025 0026 0027 0028 0029 0030 0031 0032 0033 0034 0035 0036 0037 0038 0039 0040 0041 0042 0043 0044 0045 0046 0047 0048 0049 0050 0051 0052 0053 0054 0055 0056 0057 0058 0059 0060 0061 0062 0063 0064 0065 0066 0067 0068 0069 0070 0071 0072 0073 0074 0075 0076 0077 0078 0079 0080 0081 0082 0083 0084 0085 0086 0087 0088 0089 0090 0091 0092 0093 0094 0095 0096 0097 0098 0099 0100 0101 0102 0103 0104 0105 0106 0107 0108 0109 0110 0111 0112 0113 0114 0115 0116 0117 0118 0119 0120 0121 0122 0123 0124 0125 0126 0127 0128 0129 0130 0131 0132 0133 0134 0135 0136 0137 0138 0139 0140 0141 0142 0143 0144 0145 0146 0147 0148 0149 0150 0151 0152 0153 0154 0155 0156 0157 0158 0159 0160 0161 0162 0163 0164 0165 0166 0167 0168 0169 0170 0171 0172 0173 0174 0175 0176 0177 0178 0179 0180 0181 0182 0183 0184 0185 0186 0187 0188 0189 0190 0191 0192 0193 0194 0195 0196 0197 0198 0199 0200 0201 0202 0203 0204 0205 0206 0207 0208 0209 0210 0211 0212 0213 0214 0215 0216 0217 0218 0219 0220 0221 0222 0223 0224 0225 0226 0227 0228 0229 0230 0231 0232 0233 0234 0235 0236 0237 0238 0239 0240 0241 0242 0243 0244 0245 0246 0247 0248 0249 0250 0251 0252 0253 0254 0255 0256 0257 0258 0259 0260 0261 0262 0263 0264 0265 0266 0267 0268 0269 0270 0271 0272 0273 0274 0275 0276 0277 0278 0279 0280 0281 0282 0283 0284 0285 0286 0287 0288 0289 0290 0291 0292 0293 0294 0295 0296 0297 0298 0299 0300 0301 0302 0303 0304 0305 0306 0307 0308 0309 0310 0311 0312 0313 0314 0315 0316 0317 0318 0319 0320 0321 0322 0323 0324 0325 0326 0327 0328 0329 0330 0331 0332 0333 0334 0335 0336 0337 0338 0339 0340 0341 0342 0343 0344 0345 0346 0347 0348 0349 0350 0351 0352 0353 0354 0355 0356 0357 0358 0359 0360 0361 0362 0363 0364 0365 0366 0367 0368 0369 0370 0371 0372 0373 0374 0375 0376 0377 0378 0379 0380 0381 0382 0383 0384 0385 0386 0387 0388 0389 0390 0391 0392 0393 0394 0395 0396 0397 0398 0399 0400 0401 0402 0403 0404 0405 0406 0407 0408 0409 0410 0411 0412 0413 0414 0415 0416 0417 0418 0419 0420 0421 0422 0423 0424 0425 0426 0427 0428 0429 0430 0431 0432 0433 0434 0435 0436 0437 0438 0439 0440 0441 0442 0443 0444 0445 0446 0447 0448 0449 0450 0451 0452 0453 0454 0455 0456 0457 0458 0459 0460 0461 0462 0463 0464 0465 0466 0467 0468 0469 0470 0471 0472 0473 0474 0475 0476 0477 0478 0479 0480 0481 0482 0483 0484 0485 0486 0487 0488 0489 0490 0491 0492 0493 0494 0495 0496 0497 0498 0499 0500 0501 0502 0503 0504 0505 0506 0507 0508 0509 0510 0511 0512 0513 0514 0515 0516 0517 0518 0519 0520 0521 0522 0523 0524 0525 0526 0527 0528 0529 0530 0531 0532 0533 0534 0535 0536 0537 0538 0539 0540 0541 0542 0543 0544 0545 0546 0547 0548 0549 0550 0551 0552 0553 0554 0555 0556 0557 0558 0559 0560 0561 0562 0563 0564 0565 0566 0567 0568 0569 0570 0571 0572 0573 0574 0575 0576 0577 0578 0579 0580 0581 0582 0583 0584 0585 0586 0587 0588 0589 0590 0591 0592 0593 0594 0595 0596 0597 0598 0599 0600 0601 0602 0603 0604 0605 0606 0607 0608 0609 0610 0611 0612 0613 0614 0615 0616 0617 0618 0619 0620 0621 0622 0623 0624 0625 0626 0627 0628 0629 0630 0631 0632 0633 0634 0635 0636 0637 0638 0639 0640 0641 0642 0643 0644 0645 0646 0647 0648 0649 0650 0651 0652 0653 0654 0655 0656 0657 0658 0659 0660 0661 0662 0663 0664 0665 0666 0667 0668 0669 0670 0671 0672 0673 0674 0675