LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 82.7 % 150 124 26
Test Date: 2026-03-31 21:45:45 Functions: 90.9 % 11 10 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : 
      20                 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
      21                 : 
      22                 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
      23                 : #include <boost/corosio/detail/timer_service.hpp>
      24                 : #include <boost/corosio/native/detail/make_err.hpp>
      25                 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
      26                 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
      27                 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
      28                 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
      29                 : 
      30                 : #include <boost/corosio/detail/except.hpp>
      31                 : 
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
      37                 : 
      38                 : #include <errno.h>
      39                 : #include <sys/epoll.h>
      40                 : #include <sys/eventfd.h>
      41                 : #include <sys/timerfd.h>
      42                 : #include <unistd.h>
      43                 : 
      44                 : namespace boost::corosio::detail {
      45                 : 
      46                 : struct epoll_op;
      47                 : struct descriptor_state;
      48                 : 
      49                 : /** Linux scheduler using epoll for I/O multiplexing.
      50                 : 
      51                 :     This scheduler implements the scheduler interface using Linux epoll
      52                 :     for efficient I/O event notification. It uses a single reactor model
      53                 :     where one thread runs epoll_wait while other threads
      54                 :     wait on a condition variable for handler work. This design provides:
      55                 : 
      56                 :     - Handler parallelism: N posted handlers can execute on N threads
      57                 :     - No thundering herd: condition_variable wakes exactly one thread
      58                 :     - IOCP parity: Behavior matches Windows I/O completion port semantics
      59                 : 
      60                 :     When threads call run(), they first try to execute queued handlers.
      61                 :     If the queue is empty and no reactor is running, one thread becomes
      62                 :     the reactor and runs epoll_wait. Other threads wait on a condition
      63                 :     variable until handlers are available.
      64                 : 
      65                 :     @par Thread Safety
      66                 :     All public member functions are thread-safe.
      67                 : */
      68                 : class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
      69                 : {
      70                 : public:
      71                 :     /** Construct the scheduler.
      72                 : 
      73                 :         Creates an epoll instance, eventfd for reactor interruption,
      74                 :         and timerfd for kernel-managed timer expiry.
      75                 : 
      76                 :         @param ctx Reference to the owning execution_context.
      77                 :         @param concurrency_hint Hint for expected thread count (unused).
      78                 :     */
      79                 :     epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
      80                 : 
      81                 :     /// Destroy the scheduler.
      82                 :     ~epoll_scheduler() override;
      83                 : 
      84                 :     epoll_scheduler(epoll_scheduler const&)            = delete;
      85                 :     epoll_scheduler& operator=(epoll_scheduler const&) = delete;
      86                 : 
      87                 :     /// Shut down the scheduler, draining pending operations.
      88                 :     void shutdown() override;
      89                 : 
      90                 :     /// Apply runtime configuration, resizing the event buffer.
      91                 :     void configure_reactor(
      92                 :         unsigned max_events,
      93                 :         unsigned budget_init,
      94                 :         unsigned budget_max,
      95                 :         unsigned unassisted) override;
      96                 : 
      97                 :     /** Return the epoll file descriptor.
      98                 : 
      99                 :         Used by socket services to register file descriptors
     100                 :         for I/O event notification.
     101                 : 
     102                 :         @return The epoll file descriptor.
     103                 :     */
     104                 :     int epoll_fd() const noexcept
     105                 :     {
     106                 :         return epoll_fd_;
     107                 :     }
     108                 : 
     109                 :     /** Register a descriptor for persistent monitoring.
     110                 : 
     111                 :         The fd is registered once and stays registered until explicitly
     112                 :         deregistered. Events are dispatched via descriptor_state which
     113                 :         tracks pending read/write/connect operations.
     114                 : 
     115                 :         @param fd The file descriptor to register.
     116                 :         @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
     117                 :     */
     118                 :     void register_descriptor(int fd, descriptor_state* desc) const;
     119                 : 
     120                 :     /** Deregister a persistently registered descriptor.
     121                 : 
     122                 :         @param fd The file descriptor to deregister.
     123                 :     */
     124                 :     void deregister_descriptor(int fd) const;
     125                 : 
     126                 : private:
     127                 :     void
     128                 :     run_task(lock_type& lock, context_type* ctx,
     129                 :         long timeout_us) override;
     130                 :     void interrupt_reactor() const override;
     131                 :     void update_timerfd() const;
     132                 : 
     133                 :     int epoll_fd_;
     134                 :     int event_fd_;
     135                 :     int timer_fd_;
     136                 : 
     137                 :     // Edge-triggered eventfd state
     138                 :     mutable std::atomic<bool> eventfd_armed_{false};
     139                 : 
     140                 :     // Set when the earliest timer changes; flushed before epoll_wait
     141                 :     mutable std::atomic<bool> timerfd_stale_{false};
     142                 : 
     143                 :     // Event buffer sized from max_events_per_poll_ (set at construction,
     144                 :     // resized by configure_reactor via io_context_options).
     145                 :     std::vector<epoll_event> event_buffer_;
     146                 : };
     147                 : 
     148 HIT         322 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
     149             322 :     : epoll_fd_(-1)
     150             322 :     , event_fd_(-1)
     151             322 :     , timer_fd_(-1)
     152             644 :     , event_buffer_(max_events_per_poll_)
     153                 : {
     154             322 :     epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
     155             322 :     if (epoll_fd_ < 0)
     156 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_create1");
     157                 : 
     158 HIT         322 :     event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
     159             322 :     if (event_fd_ < 0)
     160                 :     {
     161 MIS           0 :         int errn = errno;
     162               0 :         ::close(epoll_fd_);
     163               0 :         detail::throw_system_error(make_err(errn), "eventfd");
     164                 :     }
     165                 : 
     166 HIT         322 :     timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
     167             322 :     if (timer_fd_ < 0)
     168                 :     {
     169 MIS           0 :         int errn = errno;
     170               0 :         ::close(event_fd_);
     171               0 :         ::close(epoll_fd_);
     172               0 :         detail::throw_system_error(make_err(errn), "timerfd_create");
     173                 :     }
     174                 : 
     175 HIT         322 :     epoll_event ev{};
     176             322 :     ev.events   = EPOLLIN | EPOLLET;
     177             322 :     ev.data.ptr = nullptr;
     178             322 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
     179                 :     {
     180 MIS           0 :         int errn = errno;
     181               0 :         ::close(timer_fd_);
     182               0 :         ::close(event_fd_);
     183               0 :         ::close(epoll_fd_);
     184               0 :         detail::throw_system_error(make_err(errn), "epoll_ctl");
     185                 :     }
     186                 : 
     187 HIT         322 :     epoll_event timer_ev{};
     188             322 :     timer_ev.events   = EPOLLIN | EPOLLERR;
     189             322 :     timer_ev.data.ptr = &timer_fd_;
     190             322 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
     191                 :     {
     192 MIS           0 :         int errn = errno;
     193               0 :         ::close(timer_fd_);
     194               0 :         ::close(event_fd_);
     195               0 :         ::close(epoll_fd_);
     196               0 :         detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
     197                 :     }
     198                 : 
     199 HIT         322 :     timer_svc_ = &get_timer_service(ctx, *this);
     200             322 :     timer_svc_->set_on_earliest_changed(
     201            4927 :         timer_service::callback(this, [](void* p) {
     202            4605 :             auto* self = static_cast<epoll_scheduler*>(p);
     203            4605 :             self->timerfd_stale_.store(true, std::memory_order_release);
     204            4605 :             self->interrupt_reactor();
     205            4605 :         }));
     206                 : 
     207             322 :     get_resolver_service(ctx, *this);
     208             322 :     get_signal_service(ctx, *this);
     209             322 :     get_stream_file_service(ctx, *this);
     210             322 :     get_random_access_file_service(ctx, *this);
     211                 : 
     212             322 :     completed_ops_.push(&task_op_);
     213             322 : }
     214                 : 
     215             644 : inline epoll_scheduler::~epoll_scheduler()
     216                 : {
     217             322 :     if (timer_fd_ >= 0)
     218             322 :         ::close(timer_fd_);
     219             322 :     if (event_fd_ >= 0)
     220             322 :         ::close(event_fd_);
     221             322 :     if (epoll_fd_ >= 0)
     222             322 :         ::close(epoll_fd_);
     223             644 : }
     224                 : 
     225                 : inline void
     226             322 : epoll_scheduler::shutdown()
     227                 : {
     228             322 :     shutdown_drain();
     229                 : 
     230             322 :     if (event_fd_ >= 0)
     231             322 :         interrupt_reactor();
     232             322 : }
     233                 : 
     234                 : inline void
     235 MIS           0 : epoll_scheduler::configure_reactor(
     236                 :     unsigned max_events,
     237                 :     unsigned budget_init,
     238                 :     unsigned budget_max,
     239                 :     unsigned unassisted)
     240                 : {
     241               0 :     reactor_scheduler_base::configure_reactor(
     242                 :         max_events, budget_init, budget_max, unassisted);
     243               0 :     event_buffer_.resize(max_events_per_poll_);
     244               0 : }
     245                 : 
     246                 : inline void
     247 HIT        8877 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
     248                 : {
     249            8877 :     epoll_event ev{};
     250            8877 :     ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
     251            8877 :     ev.data.ptr = desc;
     252                 : 
     253            8877 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
     254 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
     255                 : 
     256 HIT        8877 :     desc->registered_events = ev.events;
     257            8877 :     desc->fd                = fd;
     258            8877 :     desc->scheduler_        = this;
     259            8877 :     desc->mutex.set_enabled(!single_threaded_);
     260            8877 :     desc->ready_events_.store(0, std::memory_order_relaxed);
     261                 : 
     262            8877 :     conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
     263            8877 :     desc->impl_ref_.reset();
     264            8877 :     desc->read_ready  = false;
     265            8877 :     desc->write_ready = false;
     266            8877 : }
     267                 : 
     268                 : inline void
     269            8877 : epoll_scheduler::deregister_descriptor(int fd) const
     270                 : {
     271            8877 :     ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
     272            8877 : }
     273                 : 
     274                 : inline void
     275            5218 : epoll_scheduler::interrupt_reactor() const
     276                 : {
     277            5218 :     bool expected = false;
     278            5218 :     if (eventfd_armed_.compare_exchange_strong(
     279                 :             expected, true, std::memory_order_release,
     280                 :             std::memory_order_relaxed))
     281                 :     {
     282            5007 :         std::uint64_t val       = 1;
     283            5007 :         [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
     284                 :     }
     285            5218 : }
     286                 : 
     287                 : inline void
     288            9170 : epoll_scheduler::update_timerfd() const
     289                 : {
     290            9170 :     auto nearest = timer_svc_->nearest_expiry();
     291                 : 
     292            9170 :     itimerspec ts{};
     293            9170 :     int flags = 0;
     294                 : 
     295            9170 :     if (nearest == timer_service::time_point::max())
     296                 :     {
     297                 :         // No timers — disarm by setting to 0 (relative)
     298                 :     }
     299                 :     else
     300                 :     {
     301            9117 :         auto now = std::chrono::steady_clock::now();
     302            9117 :         if (nearest <= now)
     303                 :         {
     304                 :             // Use 1ns instead of 0 — zero disarms the timerfd
     305             360 :             ts.it_value.tv_nsec = 1;
     306                 :         }
     307                 :         else
     308                 :         {
     309            8757 :             auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
     310            8757 :                             nearest - now)
     311            8757 :                             .count();
     312            8757 :             ts.it_value.tv_sec  = nsec / 1000000000;
     313            8757 :             ts.it_value.tv_nsec = nsec % 1000000000;
     314            8757 :             if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
     315 MIS           0 :                 ts.it_value.tv_nsec = 1;
     316                 :         }
     317                 :     }
     318                 : 
     319 HIT        9170 :     if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
     320 MIS           0 :         detail::throw_system_error(make_err(errno), "timerfd_settime");
     321 HIT        9170 : }
     322                 : 
     323                 : inline void
     324           39755 : epoll_scheduler::run_task(
     325                 :     lock_type& lock, context_type* ctx, long timeout_us)
     326                 : {
     327                 :     int timeout_ms;
     328           39755 :     if (task_interrupted_)
     329           26815 :         timeout_ms = 0;
     330           12940 :     else if (timeout_us < 0)
     331           12932 :         timeout_ms = -1;
     332                 :     else
     333               8 :         timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
     334                 : 
     335           39755 :     if (lock.owns_lock())
     336           12940 :         lock.unlock();
     337                 : 
     338           39755 :     task_cleanup on_exit{this, &lock, ctx};
     339                 : 
     340                 :     // Flush deferred timerfd programming before blocking
     341           39755 :     if (timerfd_stale_.exchange(false, std::memory_order_acquire))
     342            4583 :         update_timerfd();
     343                 : 
     344           39755 :     int nfds = ::epoll_wait(
     345                 :         epoll_fd_, event_buffer_.data(),
     346           39755 :         static_cast<int>(event_buffer_.size()), timeout_ms);
     347                 : 
     348           39755 :     if (nfds < 0 && errno != EINTR)
     349 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_wait");
     350                 : 
     351 HIT       39755 :     bool check_timers = false;
     352           39755 :     op_queue local_ops;
     353                 : 
     354           90001 :     for (int i = 0; i < nfds; ++i)
     355                 :     {
     356           50246 :         if (event_buffer_[i].data.ptr == nullptr)
     357                 :         {
     358                 :             std::uint64_t val;
     359                 :             // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
     360            4685 :             [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
     361            4685 :             eventfd_armed_.store(false, std::memory_order_relaxed);
     362            4685 :             continue;
     363            4685 :         }
     364                 : 
     365           45561 :         if (event_buffer_[i].data.ptr == &timer_fd_)
     366                 :         {
     367                 :             std::uint64_t expirations;
     368                 :             // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
     369                 :             [[maybe_unused]] auto r =
     370            4587 :                 ::read(timer_fd_, &expirations, sizeof(expirations));
     371            4587 :             check_timers = true;
     372            4587 :             continue;
     373            4587 :         }
     374                 : 
     375                 :         auto* desc =
     376           40974 :             static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
     377           40974 :         desc->add_ready_events(event_buffer_[i].events);
     378                 : 
     379           40974 :         bool expected = false;
     380           40974 :         if (desc->is_enqueued_.compare_exchange_strong(
     381                 :                 expected, true, std::memory_order_release,
     382                 :                 std::memory_order_relaxed))
     383                 :         {
     384           40974 :             local_ops.push(desc);
     385                 :         }
     386                 :     }
     387                 : 
     388           39755 :     if (check_timers)
     389                 :     {
     390            4587 :         timer_svc_->process_expired();
     391            4587 :         update_timerfd();
     392                 :     }
     393                 : 
     394           39755 :     lock.lock();
     395                 : 
     396           39755 :     if (!local_ops.empty())
     397           26284 :         completed_ops_.splice(local_ops);
     398           39755 : }
     399                 : 
     400                 : } // namespace boost::corosio::detail
     401                 : 
     402                 : #endif // BOOST_COROSIO_HAS_EPOLL
     403                 : 
     404                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
        

Generated by: LCOV version 2.3