LCOV - code coverage report
Current view: top level - src - Worker.cpp (source / functions) Hit Total Coverage
Test: lcov.info Lines: 105 113 92.9 %
Date: 2025-04-27 01:14:20 Functions: 19 21 90.5 %
Branches: 0 0 -

           Branch data     Line data    Source code
       1                 :            : /*
       2                 :            : ** Copyright (C) 2020 CERN
       3                 :            : **
       4                 :            : ** This software is provided 'as-is', without any express or implied
       5                 :            : ** warranty.  In no event will the authors be held liable for any damages
       6                 :            : ** arising from the use of this software.
       7                 :            : **
       8                 :            : ** Permission is granted to anyone to use this software for any purpose,
       9                 :            : ** including commercial applications, and to alter it and redistribute it
      10                 :            : ** freely, subject to the following restrictions:
      11                 :            : **
      12                 :            : ** 1. The origin of this software must not be misrepresented; you must not
      13                 :            : **    claim that you wrote the original software. If you use this software
      14                 :            : **    in a product, an acknowledgment in the product documentation would be
      15                 :            : **    appreciated but is not required.
      16                 :            : ** 2. Altered source versions must be plainly marked as such, and must not be
      17                 :            : **    misrepresented as being the original software.
      18                 :            : ** 3. This notice may not be removed or altered from any source distribution.
      19                 :            : **
      20                 :            : ** Created on: 2020-02-26T10:01:02+01:00
      21                 :            : **     Author: Sylvain Fargier <sfargier> <sylvain.fargier@cern.ch>
      22                 :            : **
      23                 :            : */
      24                 :            : 
      25                 :            : #include "Worker.hpp"
      26                 :            : 
      27                 :            : #include <cerrno>
      28                 :            : 
      29                 :            : #include <logger/Logger.hpp>
      30                 :            : 
      31                 :            : #include "local-config.h"
      32                 :            : 
      33                 :            : #ifdef HAVE_PTHREAD_NAME
      34                 :            : #include <pthread.h>
      35                 :            : #endif
      36                 :            : 
      37                 :            : using namespace ccut;
      38                 :            : using namespace logger;
      39                 :            : 
      40                 :          6 : Worker::Worker(const std::string &name,
      41                 :            :                std::size_t numThreads,
      42                 :          6 :                std::size_t maxSize) :
      43                 :          6 :     m_queue(name, maxSize)
      44                 :            : {
      45                 :          6 :     m_threads.reserve(numThreads);
      46                 :          6 :     start();
      47                 :          6 : }
      48                 :            : 
      49                 :          6 : Worker::~Worker()
      50                 :            : {
      51                 :          6 :     stop();
      52                 :          6 : }
      53                 :            : 
      54                 :       1002 : const Worker::SharedTask &Worker::post(const Worker::SharedTask &task)
      55                 :            : {
      56                 :       1002 :     m_queue.post(task);
      57                 :       1002 :     return task;
      58                 :            : }
      59                 :            : 
      60                 :        501 : Worker::SharedTask Worker::post(const std::function<void()> &funTask)
      61                 :            : {
      62                 :        501 :     Worker::SharedTask task(new FunTask(funTask));
      63                 :        501 :     m_queue.post(task);
      64                 :        501 :     return task;
      65                 :          0 : }
      66                 :            : 
      67                 :          6 : void Worker::start()
      68                 :            : {
      69                 :          6 :     if (!m_threads.empty())
      70                 :          0 :         return;
      71                 :            : 
      72                 :         32 :     for (std::size_t i = 0; i < m_threads.capacity(); ++i)
      73                 :            :     {
      74                 :         26 :         m_threads.push_back(std::unique_ptr<std::thread>(
      75                 :         52 :             new std::thread(&Worker::thread_func, this)));
      76                 :            :     }
      77                 :            : }
      78                 :            : 
      79                 :          6 : void Worker::stop()
      80                 :            : {
      81                 :          6 :     m_queue.clear(true);
      82                 :            : 
      83                 :         32 :     for (std::size_t i = 0; i < m_threads.size(); ++i)
      84                 :            :     {
      85                 :         26 :         m_threads[i]->join();
      86                 :            :     }
      87                 :          6 :     m_threads.clear();
      88                 :          6 : }
      89                 :            : 
      90                 :          0 : const std::string &Worker::name() const
      91                 :            : {
      92                 :          0 :     return m_queue.getQueueName();
      93                 :            : }
      94                 :            : 
      95                 :       1349 : void Worker::thread_func()
      96                 :            : {
      97                 :            : #ifdef HAVE_PTHREAD_NAME
      98                 :            :     const std::string threadName = name().substr(0, 16);
      99                 :            :     pthread_setname_np(pthread_self(), threadName.c_str());
     100                 :            : #endif
     101                 :            : 
     102                 :            :     while (true)
     103                 :            :     {
     104                 :            :         try
     105                 :            :         {
     106                 :       1349 :             Worker::SharedTask task = m_queue.pop();
     107                 :            : 
     108                 :       1325 :             if (task->setState(Task::RUNNING))
     109                 :            :             {
     110                 :       1007 :                 task->run();
     111                 :            :             }
     112                 :       1325 :             task->setState(Task::FINISHED);
     113                 :       1324 :         }
     114                 :         26 :         catch (ccut::Exception &ex)
     115                 :            :         {
     116                 :         26 :             if (ex.getErrorCode() == ErrorCode::Interrupted)
     117                 :            :             {
     118                 :            :                 /* time to die */
     119                 :         52 :                 return;
     120                 :            :             }
     121                 :          0 :             error("ccut:worker") << ex.what();
     122                 :         26 :         }
     123                 :       1323 :     }
     124                 :            : }
     125                 :            : 
     126                 :       1504 : Worker::Task::Task() : m_state(Worker::Task::READY), m_error(Worker::Task::OK)
     127                 :       1504 : {}
     128                 :            : 
     129                 :       1504 : Worker::Task::~Task()
     130                 :            : {
     131                 :       1504 :     if (m_state != Task::FINISHED)
     132                 :            :     {
     133                 :          0 :         abort();
     134                 :            :     }
     135                 :       1504 : }
     136                 :            : 
     137                 :          4 : Worker::Task::State Worker::Task::state() const
     138                 :            : {
     139                 :          4 :     const std::lock_guard<std::mutex> lock(m_lock);
     140                 :          4 :     return m_state;
     141                 :          4 : }
     142                 :            : 
     143                 :          2 : Worker::Task::Error Worker::Task::error() const
     144                 :            : {
     145                 :          2 :     const std::lock_guard<std::mutex> lock(m_lock);
     146                 :          2 :     return m_error;
     147                 :          2 : }
     148                 :            : 
     149                 :          1 : const std::string &Worker::Task::errorString() const
     150                 :            : {
     151                 :          1 :     const std::lock_guard<std::mutex> lock(m_lock);
     152                 :          1 :     return m_errorString;
     153                 :          1 : }
     154                 :            : 
     155                 :       1502 : bool Worker::Task::wait(unsigned long msecs) const
     156                 :            : {
     157                 :       1502 :     return wait(std::chrono::milliseconds(msecs));
     158                 :            : }
     159                 :            : 
     160                 :       1502 : bool Worker::Task::wait(const std::chrono::milliseconds &msecs) const
     161                 :            : {
     162                 :       1502 :     std::unique_lock<std::mutex> lock(m_lock);
     163                 :            : 
     164                 :       1502 :     if (m_state == Task::FINISHED)
     165                 :       1496 :         return true;
     166                 :            : 
     167                 :          6 :     if (msecs.count())
     168                 :            :     {
     169                 :          6 :         m_cond.wait_for(lock, msecs);
     170                 :            :     }
     171                 :            :     else
     172                 :            :     {
     173                 :          0 :         m_cond.wait(lock);
     174                 :            :     }
     175                 :          6 :     return (m_state == Task::FINISHED);
     176                 :       1502 : }
     177                 :            : 
     178                 :        502 : bool Worker::Task::abort()
     179                 :            : {
     180                 :        502 :     std::unique_lock<std::mutex> lock(m_lock);
     181                 :        502 :     if (m_state != Task::RUNNING)
     182                 :            :     {
     183                 :        497 :         m_error = Task::ABORTED;
     184                 :        497 :         m_state = Task::FINISHED;
     185                 :        497 :         lock.unlock();
     186                 :        497 :         m_cond.notify_all();
     187                 :        497 :         return true;
     188                 :            :     }
     189                 :          5 :     return false;
     190                 :        502 : }
     191                 :            : 
     192                 :       2645 : bool Worker::Task::setState(State state)
     193                 :            : {
     194                 :       2645 :     std::unique_lock<std::mutex> lock(m_lock);
     195                 :       2646 :     switch (state)
     196                 :            :     {
     197                 :          0 :     case Task::READY: return false;
     198                 :       1323 :     case Task::RUNNING:
     199                 :       1323 :         if (m_state != Task::READY)
     200                 :        317 :             return false;
     201                 :       1006 :         m_state = Task::RUNNING;
     202                 :       1006 :         break;
     203                 :       1325 :     case Task::FINISHED:
     204                 :       1325 :         m_state = Task::FINISHED;
     205                 :       1325 :         lock.unlock();
     206                 :       1325 :         m_cond.notify_all();
     207                 :       1325 :         break;
     208                 :            :     }
     209                 :            : 
     210                 :       2329 :     return true;
     211                 :       2646 : }
     212                 :            : 
     213                 :          1 : void Worker::Task::setError(const std::string &error)
     214                 :            : {
     215                 :          1 :     std::unique_lock<std::mutex> lock(m_lock);
     216                 :          1 :     m_state = Task::FINISHED;
     217                 :          1 :     m_errorString = error;
     218                 :          1 :     m_error = Task::FAILED;
     219                 :          1 :     lock.unlock();
     220                 :          1 :     m_cond.notify_all();
     221                 :          1 : }
     222                 :            : 
     223                 :        501 : Worker::FunTask::FunTask(const std::function<void()> &fun) : m_fun(fun) {}
     224                 :            : 
     225                 :        500 : void Worker::FunTask::run()
     226                 :            : {
     227                 :            :     try
     228                 :            :     {
     229                 :        500 :         m_fun();
     230                 :            :     }
     231                 :          1 :     catch (const std::string &error)
     232                 :            :     {
     233                 :          1 :         setError(error);
     234                 :          1 :     }
     235                 :        501 : }

Generated by: LCOV version 1.14