LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 70.9 % 337 239 98
Test Date: 2026-03-31 21:45:45 Functions: 78.3 % 46 36 10

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/config.hpp>
      14                 : #include <boost/capy/ex/execution_context.hpp>
      15                 : 
      16                 : #include <boost/corosio/native/native_scheduler.hpp>
      17                 : #include <boost/corosio/detail/scheduler_op.hpp>
      18                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      19                 : 
      20                 : #include <atomic>
      21                 : #include <chrono>
      22                 : #include <coroutine>
      23                 : #include <cstddef>
      24                 : #include <cstdint>
      25                 : #include <limits>
      26                 : #include <memory>
      27                 : #include <stdexcept>
      28                 : 
      29                 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
      30                 : #include <boost/corosio/detail/conditionally_enabled_event.hpp>
      31                 : 
      32                 : namespace boost::corosio::detail {
      33                 : 
      34                 : // Forward declaration: reactor_scheduler_context below stores a pointer to this class.
      35                 : class reactor_scheduler_base;
      36                 : 
      37                 : /** Per-thread state for a reactor scheduler.
      38                 : 
      39                 :     Each thread running a scheduler's event loop has one of these
      40                 :     on a thread-local stack. It holds a private work queue and
      41                 :     inline completion budget for speculative I/O fast paths.

                               Frames are pushed and popped by reactor_thread_context_guard,
                               linked through `next`, and looked up per scheduler with
                               reactor_find_context().
      42                 : */
      43                 : struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
      44                 : {
      45                 :     /// Scheduler this context belongs to; reactor_find_context() matches on it.
      46                 :     reactor_scheduler_base const* key;
      47                 : 
      48                 :     /// Next (older) context frame on this thread's stack, or nullptr at the bottom.
      49                 :     reactor_scheduler_context* next;
      50                 : 
      51                 :     /// Private work queue for reduced contention.
      52                 :     op_queue private_queue;
      53                 : 
      54                 :     /// Work counted locally and not yet added to the scheduler's global counter.
      55                 :     std::int64_t private_outstanding_work;
      56                 : 
      57                 :     /// Remaining inline completions allowed this cycle; consumed by try_consume_inline_budget().
      58                 :     int inline_budget;
      59                 : 
      60                 :     /// Current ceiling for `inline_budget`; seeded from the scheduler's inline_budget_initial() and adapted by reset_inline_budget().
      61                 :     int inline_budget_max;
      62                 : 
      63                 :     /// True if no other thread absorbed queued work last cycle.
      64                 :     bool unassisted;
      65                 : 
      66                 :     /// Construct a context frame for scheduler @a k linked to @a n; @a k is dereferenced and must not be null.
      67                 :     reactor_scheduler_context(
      68                 :         reactor_scheduler_base const* k,
      69                 :         reactor_scheduler_context* n);
      70                 : };
      71                 : 
      72                 : /// Thread-local top of the context-frame stack for reactor schedulers (frames chain through `next`).
      73                 : inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
      74                 : 
      75                 : /// Find the context frame for a scheduler on this thread.
                           ///
                           /// Walks the calling thread's context stack from the top, following
                           /// `next`, and returns the first frame whose `key` equals @a self,
                           /// or nullptr when this thread has no frame for that scheduler.
      76                 : inline reactor_scheduler_context*
      77 HIT      795818 : reactor_find_context(reactor_scheduler_base const* self) noexcept
      78                 : {
      79          795818 :     for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
      80                 :     {
      81          792846 :         if (c->key == self)
      82          792846 :             return c;
      83                 :     }
      84            2972 :     return nullptr;
      85                 : }
      86                 : 
      87                 : /// Flush private work count to global counter.
                           ///
                           /// Adds `ctx->private_outstanding_work` to @a outstanding_work using
                           /// relaxed ordering and then zeroes the private count. Does nothing
                           /// when @a ctx is null or the private count is not positive.
      88                 : inline void
      89 MIS           0 : reactor_flush_private_work(
      90                 :     reactor_scheduler_context* ctx,
      91                 :     std::atomic<std::int64_t>& outstanding_work) noexcept
      92                 : {
      93               0 :     if (ctx && ctx->private_outstanding_work > 0)
      94                 :     {
      95               0 :         outstanding_work.fetch_add(
      96                 :             ctx->private_outstanding_work, std::memory_order_relaxed);
      97               0 :         ctx->private_outstanding_work = 0;
      98                 :     }
      99               0 : }
     100                 : 
     101                 : /** Drain private queue to global queue, flushing work count first.
     102                 : 
                               Returns early, without flushing the work count, when @a ctx is
                               null or its private queue is empty. Otherwise the private work
                               count is flushed into @a outstanding_work and the private queue
                               is spliced onto @a completed_ops.

                               NOTE(review): no lock is taken here; presumably the caller holds
                               whatever protects @a completed_ops - confirm at the call sites.

                               @param ctx              Calling thread's context frame (may be null).
                               @param outstanding_work Global work counter to flush into.
                               @param completed_ops    Queue that receives the drained operations.
     103                 :     @return True if any ops were drained.
     104                 : */
     105                 : inline bool
     106               0 : reactor_drain_private_queue(
     107                 :     reactor_scheduler_context* ctx,
     108                 :     std::atomic<std::int64_t>& outstanding_work,
     109                 :     op_queue& completed_ops) noexcept
     110                 : {
     111               0 :     if (!ctx || ctx->private_queue.empty())
     112               0 :         return false;
     113                 : 
     114               0 :     reactor_flush_private_work(ctx, outstanding_work);
     115               0 :     completed_ops.splice(ctx->private_queue);
     116               0 :     return true;
     117                 : }
     118                 : 
     119                 : /** Non-template base for reactor-backed scheduler implementations.
     120                 : 
     121                 :     Provides the complete threading model shared by epoll, kqueue,
     122                 :     and select schedulers: signal state machine, inline completion
     123                 :     budget, work counting, run/poll methods, and the do_one event
     124                 :     loop.
     125                 : 
     126                 :     Derived classes provide platform-specific hooks by overriding:
     127                 :     - `run_task(lock, ctx, timeout_us)` to run the reactor poll
     128                 :     - `interrupt_reactor()` to wake a blocked reactor
     129                 : 
     130                 :     De-templated from the original CRTP design to eliminate
     131                 :     duplicate instantiations when multiple backends are compiled
     132                 :     into the same binary. Virtual dispatch for run_task (called
     133                 :     once per reactor cycle, before a blocking syscall) has
     134                 :     negligible overhead.
     135                 : 
     136                 :     @par Thread Safety
     137                 :     All public member functions are thread-safe.

                               NOTE(review): configure_reactor() and
                               configure_single_threaded() write plain (non-atomic) members
                               without taking the mutex, so they are presumably meant to be
                               called before any thread runs the scheduler - confirm.
     138                 : */
     139                 : class reactor_scheduler_base
     140                 :     : public native_scheduler
     141                 :     , public capy::execution_context::service
     142                 : {
     143                 : public:
     144                 :     using key_type     = scheduler;
     145                 :     using context_type = reactor_scheduler_context;
     146                 :     using mutex_type = conditionally_enabled_mutex;
     147                 :     using lock_type = mutex_type::scoped_lock;
     148                 :     using event_type = conditionally_enabled_event;
     149                 : 
     150                 :     /// Post a coroutine for deferred execution.
     151                 :     void post(std::coroutine_handle<> h) const override;
     152                 : 
     153                 :     /// Post a scheduler operation for deferred execution.
     154                 :     void post(scheduler_op* h) const override;
     155                 : 
     156                 :     /// Return true if called from a thread running this scheduler.
     157                 :     bool running_in_this_thread() const noexcept override;
     158                 : 
     159                 :     /// Request the scheduler to stop dispatching handlers.
     160                 :     void stop() override;
     161                 : 
     162                 :     /// Return true if the scheduler has been stopped.
     163                 :     bool stopped() const noexcept override;
     164                 : 
     165                 :     /// Reset the stopped state so `run()` can resume.
     166                 :     void restart() override;
     167                 : 
     168                 :     /// Run the event loop until no work remains.
     169                 :     std::size_t run() override;
     170                 : 
     171                 :     /// Run until one handler completes or no work remains.
     172                 :     std::size_t run_one() override;
     173                 : 
     174                 :     /// Run until one handler completes or @a usec elapses.
     175                 :     std::size_t wait_one(long usec) override;
     176                 : 
     177                 :     /// Run ready handlers without blocking.
     178                 :     std::size_t poll() override;
     179                 : 
     180                 :     /// Run at most one ready handler without blocking.
     181                 :     std::size_t poll_one() override;
     182                 : 
     183                 :     /// Increment the outstanding work count.
     184                 :     void work_started() noexcept override;
     185                 : 
     186                 :     /// Decrement the outstanding work count, stopping on zero.
     187                 :     void work_finished() noexcept override;
     188                 : 
     189                 :     /** Reset the thread's inline completion budget.
     190                 : 
     191                 :         Called at the start of each posted completion handler to
     192                 :         grant a fresh budget for speculative inline completions.
                                   Has no effect on a thread that has no context frame for
                                   this scheduler.
     193                 :     */
     194                 :     void reset_inline_budget() const noexcept;
     195                 : 
     196                 :     /** Consume one unit of inline budget if available.
     197                 : 
     198                 :         @return True if budget was available and consumed.
     199                 :     */
     200                 :     bool try_consume_inline_budget() const noexcept;
     201                 : 
     202                 :     /** Offset a forthcoming work_finished from work_cleanup.
     203                 : 
     204                 :         Called by descriptor_state when all I/O returned EAGAIN and
     205                 :         no handler will be executed. Must be called from a scheduler
     206                 :         thread.
     207                 :     */
     208                 :     void compensating_work_started() const noexcept;
     209                 : 
     210                 :     /** Drain work from thread context's private queue to global queue.
     211                 : 
     212                 :         Flushes private work count to the global counter, then
     213                 :         transfers the queue under mutex protection.
     214                 : 
     215                 :         @param queue The private queue to drain.
     216                 :         @param count Private work count to flush before draining.
     217                 :     */
     218                 :     void drain_thread_queue(op_queue& queue, std::int64_t count) const;
     219                 : 
     220                 :     /** Post completed operations for deferred invocation.
     221                 : 
     222                 :         If called from a thread running this scheduler, operations
     223                 :         go to the thread's private queue (fast path). Otherwise,
     224                 :         operations are added to the global queue under mutex and a
     225                 :         waiter is signaled.
     226                 : 
     227                 :         @par Preconditions
     228                 :         work_started() must have been called for each operation.
     229                 : 
     230                 :         @param ops Queue of operations to post.
     231                 :     */
     232                 :     void post_deferred_completions(op_queue& ops) const;
     233                 : 
     234                 :     /** Apply runtime configuration to the scheduler.
     235                 : 
     236                 :         Called by `io_context` after construction. Values that do
     237                 :         not apply to this backend are silently ignored.
     238                 : 
                                   In this base implementation @a budget_init and
                                   @a unassisted are clamped down to @a budget_max, and
                                   nothing is stored if validation fails.

     239                 :         @param max_events  Event buffer size for epoll/kqueue.
     240                 :         @param budget_init Starting inline completion budget.
     241                 :         @param budget_max  Hard ceiling on adaptive budget ramp-up.
     242                 :         @param unassisted  Budget used while a thread's context is flagged `unassisted`.

                                   @throws std::out_of_range if @a max_events or
                                   @a budget_max is outside [1, INT_MAX].
     243                 :     */
     244                 :     virtual void configure_reactor(
     245                 :         unsigned max_events,
     246                 :         unsigned budget_init,
     247                 :         unsigned budget_max,
     248                 :         unsigned unassisted);
     249                 : 
     250                 :     /// Return the configured initial inline budget.
     251 HIT         461 :     unsigned inline_budget_initial() const noexcept
     252                 :     {
     253             461 :         return inline_budget_initial_;
     254                 :     }
     255                 : 
     256                 :     /** Enable or disable single-threaded (lockless) mode.
     257                 : 
     258                 :         When enabled, all scheduler mutex and condition variable
     259                 :         operations become no-ops. Cross-thread post() is
     260                 :         undefined behavior.

                                   NOTE(review): `single_threaded_` is not declared in this
                                   class; presumably it is inherited from a base - confirm.
     261                 :     */
     262 MIS           0 :     void configure_single_threaded(bool v) noexcept
     263                 :     {
     264               0 :         single_threaded_ = v;
     265               0 :         mutex_.set_enabled(!v);
     266               0 :         cond_.set_enabled(!v);
     267               0 :     }
     268                 : 
     269                 : protected:
     270 HIT         517 :     reactor_scheduler_base() = default;
     271                 : 
     272                 :     /** Drain completed_ops during shutdown.
     273                 : 
     274                 :         Pops all operations from the global queue and destroys them,
     275                 :         skipping the task sentinel. Signals all waiting threads.
     276                 :         Derived classes call this from their shutdown() override
     277                 :         before performing platform-specific cleanup.
     278                 :     */
     279                 :     void shutdown_drain();
     280                 : 
     281                 :     /// RAII guard that re-inserts the task sentinel after `run_task`.
     282                 :     struct task_cleanup
     283                 :     {
     284                 :         reactor_scheduler_base const* sched;
     285                 :         lock_type* lock;
     286                 :         context_type* ctx;
     287                 :         ~task_cleanup();
     288                 :     };
     289                 : 
                               // Mutex and event start enabled; configure_single_threaded()
                               // can disable both. completed_ops_ is the global queue that
                               // post() pushes to while holding mutex_, and outstanding_work_
                               // is the global work counter that private counts flush into.
     290                 :     mutable mutex_type mutex_{true};
     291                 :     mutable event_type cond_{true};
     292                 :     mutable op_queue completed_ops_;
     293                 :     mutable std::atomic<std::int64_t> outstanding_work_{0};
     294                 :     std::atomic<bool> stopped_{false};
     295                 :     mutable std::atomic<bool> task_running_{false};
     296                 :     mutable bool task_interrupted_ = false;
     297                 : 
     298                 :     // Runtime-configurable reactor tuning parameters.
     299                 :     // Defaults match the library's built-in values.
     300                 :     unsigned max_events_per_poll_   = 128;
     301                 :     unsigned inline_budget_initial_ = 2;
     302                 :     unsigned inline_budget_max_     = 16;
     303                 :     unsigned unassisted_budget_     = 4;
     304                 : 
     305                 :     /// Bit 0 of `state_`: set when the condvar should be signaled.
     306                 :     static constexpr std::size_t signaled_bit = 1;
     307                 : 
     308                 :     /// Increment per waiting thread in `state_`.
     309                 :     static constexpr std::size_t waiter_increment = 2;
                               // Packs the signaled flag (bit 0) together with the waiter
                               // count (in units of waiter_increment).
     310                 :     mutable std::size_t state_                    = 0;
     311                 : 
     312                 :     /// Sentinel op that triggers a reactor poll when dequeued; both overrides are intentionally empty.
     313                 :     struct task_op final : scheduler_op
     314                 :     {
     315 MIS           0 :         void operator()() override {}
     316               0 :         void destroy() override {}
     317                 :     };
     318                 :     task_op task_op_;
     319                 : 
     320                 :     /// Run the platform-specific reactor poll.
                               /// NOTE(review): @a timeout_us is presumably in microseconds, with a
                               /// negative value meaning "no timeout" - confirm against the backends.
     321                 :     virtual void
     322                 :     run_task(lock_type& lock, context_type* ctx,
     323                 :         long timeout_us) = 0;
     324                 : 
     325                 :     /// Wake a blocked reactor (e.g. write to eventfd or pipe).
     326                 :     virtual void interrupt_reactor() const = 0;
     327                 : 
     328                 : private:
                               // Guard used around handler execution; its destructor is
                               // defined outside this view.
     329                 :     struct work_cleanup
     330                 :     {
     331                 :         reactor_scheduler_base* sched;
     332                 :         lock_type* lock;
     333                 :         context_type* ctx;
     334                 :         ~work_cleanup();
     335                 :     };
     336                 : 
                               // Core loop step shared by run()/run_one(); a zero return
                               // ends run()'s loop.
     337                 :     std::size_t do_one(
     338                 :         lock_type& lock, long timeout_us, context_type* ctx);
     339                 : 
                               // Signal/wait helpers operating on state_ and cond_; their
                               // definitions are outside this view.
     340                 :     void signal_all(lock_type& lock) const;
     341                 :     bool maybe_unlock_and_signal_one(lock_type& lock) const;
     342                 :     bool unlock_and_signal_one(lock_type& lock) const;
     343                 :     void clear_signal() const;
     344                 :     void wait_for_signal(lock_type& lock) const;
     345                 :     void wait_for_signal_for(
     346                 :         lock_type& lock, long timeout_us) const;
     347                 :     void wake_one_thread_and_unlock(lock_type& lock) const;
     348                 : };
     349                 : 
     350                 : /** RAII guard that pushes/pops a scheduler context frame.
     351                 : 
     352                 :     On construction, pushes a new context frame onto the
     353                 :     thread-local stack. On destruction, drains any remaining
     354                 :     private queue items to the global queue and pops the frame.

                               The address of `frame_` is stored in the thread-local stack, so
                               a guard must stay where it was constructed.
                               NOTE(review): copy and move are not explicitly deleted here -
                               confirm that op_queue makes this type non-copyable.
     355                 : */
     356                 : struct reactor_thread_context_guard
     357                 : {
     358                 :     /// The context frame managed by this guard.
     359                 :     reactor_scheduler_context frame_;
     360                 : 
     361                 :     /// Construct the guard, pushing a frame for @a sched (which must not be null).
     362 HIT         461 :     explicit reactor_thread_context_guard(
     363                 :         reactor_scheduler_base const* sched) noexcept
     364             461 :         : frame_(sched, reactor_context_stack.get())
     365                 :     {
     366             461 :         reactor_context_stack.set(&frame_);
     367             461 :     }
     368                 : 
     369                 :     /// Destroy the guard, draining private work and popping the frame.
                               /// NOTE(review): the drain only happens when the private queue is
                               /// non-empty, so a positive private_outstanding_work with an empty
                               /// queue is not flushed here - confirm it is flushed elsewhere.
     370             461 :     ~reactor_thread_context_guard() noexcept
     371                 :     {
     372             461 :         if (!frame_.private_queue.empty())
     373 MIS           0 :             frame_.key->drain_thread_queue(
     374               0 :                 frame_.private_queue, frame_.private_outstanding_work);
                                   // Pop: restore the frame that was on top before this guard.
     375 HIT         461 :         reactor_context_stack.set(frame_.next);
     376             461 :     }
     377                 : };
     378                 : 
     379                 : // ---- Inline implementations ------------------------------------------------
     380                 : 
                           // Builds a frame for scheduler `k` linked to the previous frame `n`.
                           // The frame starts with no private work and a zero inline budget; the
                           // budget ceiling is seeded from k->inline_budget_initial(), so `k`
                           // must not be null. `private_queue` is default-constructed.
     381                 : inline
     382             461 : reactor_scheduler_context::reactor_scheduler_context(
     383                 :     reactor_scheduler_base const* k,
     384             461 :     reactor_scheduler_context* n)
     385             461 :     : key(k)
     386             461 :     , next(n)
     387             461 :     , private_outstanding_work(0)
     388             461 :     , inline_budget(0)
     389             461 :     , inline_budget_max(
     390             461 :           static_cast<int>(k->inline_budget_initial()))
     391             461 :     , unassisted(false)
     392                 : {
     393             461 : }
     394                 : 
                           // Validates and stores the reactor tuning parameters.
                           // Throws std::out_of_range when max_events or budget_max is outside
                           // [1, INT_MAX]; both checks run before any member is written, so a
                           // throw leaves the previous configuration intact.
                           // NOTE(review): budget_init and unassisted have no lower bound, so 0
                           // is accepted for either - confirm that is intended.
     395                 : inline void
     396 MIS           0 : reactor_scheduler_base::configure_reactor(
     397                 :     unsigned max_events,
     398                 :     unsigned budget_init,
     399                 :     unsigned budget_max,
     400                 :     unsigned unassisted)
     401                 : {
     402               0 :     if (max_events < 1 ||
     403               0 :         max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
     404                 :         throw std::out_of_range(
     405               0 :             "max_events_per_poll must be in [1, INT_MAX]");
     406               0 :     if (budget_max < 1 ||
     407               0 :         budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
     408                 :         throw std::out_of_range(
     409               0 :             "inline_budget_max must be in [1, INT_MAX]");
     410                 : 
     411                 :     // Clamp initial and unassisted to budget_max (this also keeps them within int range for later casts).
     412               0 :     if (budget_init > budget_max)
     413               0 :         budget_init = budget_max;
     414               0 :     if (unassisted > budget_max)
     415               0 :         unassisted = budget_max;
     416                 : 
     417               0 :     max_events_per_poll_   = max_events;
     418               0 :     inline_budget_initial_ = budget_init;
     419               0 :     inline_budget_max_     = budget_max;
     420               0 :     unassisted_budget_     = unassisted;
     421               0 : }
     422                 : 
                           // Grants the calling thread a fresh inline budget and adapts its
                           // ceiling. Does nothing when the thread has no context frame for this
                           // scheduler.
                           //   - unassisted frame: ceiling and budget both become unassisted_budget_.
                           //   - previous budget fully consumed (== 0): ceiling doubles, capped at
                           //     inline_budget_max_.
                           //   - previous budget partly consumed (< ceiling): ceiling falls back to
                           //     inline_budget_initial_.
                           //   - previous budget untouched (== ceiling): ceiling is unchanged.
                           // NOTE(review): std::min is declared in <algorithm>, which this header
                           // does not include directly.
                           // NOTE(review): the doubling is done in int and could overflow if the
                           // ceiling ever exceeds INT_MAX / 2; a ceiling of 0 doubles to 0 and so
                           // never ramps up.
     423                 : inline void
     424 HIT      103892 : reactor_scheduler_base::reset_inline_budget() const noexcept
     425                 : {
     426          103892 :     if (auto* ctx = reactor_find_context(this))
     427                 :     {
     428                 :         // Cap when no other thread absorbed queued work
     429          103892 :         if (ctx->unassisted)
     430                 :         {
     431          103892 :             ctx->inline_budget_max =
     432          103892 :                 static_cast<int>(unassisted_budget_);
     433          103892 :             ctx->inline_budget =
     434          103892 :                 static_cast<int>(unassisted_budget_);
     435          103892 :             return;
     436                 :         }
     437                 :         // Ramp up when previous cycle fully consumed budget
     438 MIS           0 :         if (ctx->inline_budget == 0)
     439               0 :             ctx->inline_budget_max = (std::min)(
     440               0 :                 ctx->inline_budget_max * 2,
     441               0 :                 static_cast<int>(inline_budget_max_));
     442               0 :         else if (ctx->inline_budget < ctx->inline_budget_max)
     443               0 :             ctx->inline_budget_max =
     444               0 :                 static_cast<int>(inline_budget_initial_);
     445               0 :         ctx->inline_budget = ctx->inline_budget_max;
     446                 :     }
     447                 : }
     448                 : 
                           // Takes one unit from the calling thread's inline budget.
                           // Returns true after decrementing a positive budget; returns false
                           // when the budget is exhausted or the thread has no context frame
                           // for this scheduler.
     449                 : inline bool
     450 HIT      437844 : reactor_scheduler_base::try_consume_inline_budget() const noexcept
     451                 : {
     452          437844 :     if (auto* ctx = reactor_find_context(this))
     453                 :     {
     454          437844 :         if (ctx->inline_budget > 0)
     455                 :         {
     456          350304 :             --ctx->inline_budget;
     457          350304 :             return true;
     458                 :         }
     459                 :     }
     460           87540 :     return false;
     461                 : }
     462                 : 
                           // Posts coroutine `h` for deferred resumption.
                           // The handle is wrapped in a heap-allocated post_handler. On a thread
                           // that has a context frame for this scheduler the op goes to that
                           // frame's private queue and private work count, with no locking.
                           // Otherwise the global work count is incremented, the op is pushed to
                           // completed_ops_ under mutex_, and one thread is woken.
     463                 : inline void
     464            2117 : reactor_scheduler_base::post(std::coroutine_handle<> h) const
     465                 : {
                               // Self-deleting op that owns nothing but the coroutine handle.
     466                 :     struct post_handler final : scheduler_op
     467                 :     {
     468                 :         std::coroutine_handle<> h_;
     469                 : 
     470            2117 :         explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
     471            4234 :         ~post_handler() override = default;
     472                 : 
                                   // Copy the handle out first: the op frees itself before
                                   // the coroutine is resumed.
     473            2108 :         void operator()() override
     474                 :         {
     475            2108 :             auto saved = h_;
     476            2108 :             delete this;
     477                 :             // Ensure stores from the posting thread are visible
     478                 :             std::atomic_thread_fence(std::memory_order_acquire);
     479            2108 :             saved.resume();
     480            2108 :         }
     481                 : 
                                   // Discard path: free the op, then destroy the coroutine
                                   // instead of resuming it.
     482               9 :         void destroy() override
     483                 :         {
     484               9 :             auto saved = h_;
     485               9 :             delete this;
     486               9 :             saved.destroy();
     487               9 :         }
     488                 :     };
     489                 : 
                               // unique_ptr owns the op until release() hands it to a queue.
     490            2117 :     auto ph = std::make_unique<post_handler>(h);
     491                 : 
     492            2117 :     if (auto* ctx = reactor_find_context(this))
     493                 :     {
     494               6 :         ++ctx->private_outstanding_work;
     495               6 :         ctx->private_queue.push(ph.release());
     496               6 :         return;
     497                 :     }
     498                 : 
     499            2111 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     500                 : 
     501            2111 :     lock_type lock(mutex_);
     502            2111 :     completed_ops_.push(ph.release());
     503            2111 :     wake_one_thread_and_unlock(lock);
     504            2117 : }
     505                 : 
                           // Posts an already-constructed scheduler operation.
                           // Same routing as the coroutine overload: private queue and private
                           // work count when the calling thread has a context frame for this
                           // scheduler; otherwise global work count, completed_ops_ under
                           // mutex_, and a wake-up of one thread.
     506                 : inline void
     507          105187 : reactor_scheduler_base::post(scheduler_op* h) const
     508                 : {
     509          105187 :     if (auto* ctx = reactor_find_context(this))
     510                 :     {
     511          105015 :         ++ctx->private_outstanding_work;
     512          105015 :         ctx->private_queue.push(h);
     513          105015 :         return;
     514                 :     }
     515                 : 
     516             172 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     517                 : 
     518             172 :     lock_type lock(mutex_);
     519             172 :     completed_ops_.push(h);
     520             172 :     wake_one_thread_and_unlock(lock);
     521             172 : }
     522                 : 
                           // True when the calling thread's context stack holds a frame keyed to
                           // this scheduler.
     523                 : inline bool
     524            1286 : reactor_scheduler_base::running_in_this_thread() const noexcept
     525                 : {
     526            1286 :     return reactor_find_context(this) != nullptr;
     527                 : }
     528                 : 
                           // Marks the scheduler stopped. Under mutex_, the first call sets
                           // stopped_, signals all waiters and interrupts the reactor; later
                           // calls find the flag already set and do nothing. Note that
                           // interrupt_reactor() runs while `lock` is still in scope.
     529                 : inline void
     530             414 : reactor_scheduler_base::stop()
     531                 : {
     532             414 :     lock_type lock(mutex_);
     533             414 :     if (!stopped_.load(std::memory_order_acquire))
     534                 :     {
     535             376 :         stopped_.store(true, std::memory_order_release);
     536             376 :         signal_all(lock);
     537             376 :         interrupt_reactor();
     538                 :     }
     539             414 : }
     540                 : 
                           // Reads the stopped flag with acquire ordering; takes no lock.
     541                 : inline bool
     542              62 : reactor_scheduler_base::stopped() const noexcept
     543                 : {
     544              62 :     return stopped_.load(std::memory_order_acquire);
     545                 : }
     546                 : 
                           // Clears the stopped flag with release ordering. It does not take
                           // mutex_ and does not wake any thread.
     547                 : inline void
     548              91 : reactor_scheduler_base::restart()
     549                 : {
     550              91 :     stopped_.store(false, std::memory_order_release);
     551              91 : }
     552                 : 
                           // Runs the event loop on the calling thread.
                           // If no work is outstanding on entry, the scheduler is stopped and 0
                           // is returned without pushing a context frame. Otherwise a context
                           // frame is pushed for the duration of the call and do_one() is called
                           // repeatedly until it returns 0.
                           // Returns the number of non-zero do_one() results, saturating at the
                           // maximum std::size_t value.
                           // NOTE(review): the -1 timeout presumably means "block without a
                           // deadline" - confirm in do_one().
     553                 : inline std::size_t
     554             388 : reactor_scheduler_base::run()
     555                 : {
     556             776 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     557                 :     {
     558              28 :         stop();
     559              28 :         return 0;
     560                 :     }
     561                 : 
     562             360 :     reactor_thread_context_guard ctx(this);
     563             360 :     lock_type lock(mutex_);
     564                 : 
     565             360 :     std::size_t n = 0;
     566                 :     for (;;)
     567                 :     {
     568          269115 :         if (!do_one(lock, -1, &ctx.frame_))
     569             360 :             break;
                                   // Saturating increment: never wrap the handler count.
     570          268755 :         if (n != (std::numeric_limits<std::size_t>::max)())
     571          268755 :             ++n;
                                   // do_one() can return with the lock released; hold it
                                   // again before the next iteration.
     572          268755 :         if (!lock.owns_lock())
     573          172171 :             lock.lock();
     574                 :     }
     575             360 :     return n;
     576             360 : }
     577                 : 
     578                 : inline std::size_t
     579               2 : reactor_scheduler_base::run_one()
     580                 : {
                               // Makes a single do_one() call with a negative timeout (wait
                               // without a time limit) and returns its result: 1 if a handler
                               // ran, otherwise 0.
                               //
                               // With no outstanding work the scheduler is stopped and 0 is
                               // returned without calling do_one().
     581               4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     582                 :     {
     583 MIS           0 :         stop();
     584               0 :         return 0;
     585                 :     }
     586                 : 
     587 HIT           2 :     reactor_thread_context_guard ctx(this);
     588               2 :     lock_type lock(mutex_);
     589               2 :     return do_one(lock, -1, &ctx.frame_);
     590               2 : }
     591                 : 
     592                 : inline std::size_t
     593             102 : reactor_scheduler_base::wait_one(long usec)
     594                 : {
                               // Makes a single do_one() call, passing `usec` through as the
                               // timeout in microseconds. As do_one() reads it: 0 never
                               // blocks, a negative value waits without a time limit, and a
                               // positive value bounds the wait.
                               // Returns 1 if a handler ran, otherwise 0.
                               //
                               // With no outstanding work the scheduler is stopped and 0 is
                               // returned without calling do_one().
     595             204 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     596                 :     {
     597              10 :         stop();
     598              10 :         return 0;
     599                 :     }
     600                 : 
     601              92 :     reactor_thread_context_guard ctx(this);
     602              92 :     lock_type lock(mutex_);
     603              92 :     return do_one(lock, usec, &ctx.frame_);
     604              92 : }
     605                 : 
     606                 : inline std::size_t
     607               6 : reactor_scheduler_base::poll()
     608                 : {
                               // Non-blocking counterpart of run(): calls do_one() with a zero
                               // timeout until it returns 0. Returns the number of handlers
                               // executed, saturating at the maximum value of std::size_t.
                               //
                               // With no outstanding work the scheduler is stopped right away
                               // and 0 is returned.
     609              12 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     610                 :     {
     611               1 :         stop();
     612               1 :         return 0;
     613                 :     }
     614                 : 
     615               5 :     reactor_thread_context_guard ctx(this);
     616               5 :     lock_type lock(mutex_);
     617                 : 
     618               5 :     std::size_t n = 0;
     619                 :     for (;;)
     620                 :     {
     621              11 :         if (!do_one(lock, 0, &ctx.frame_))
     622               5 :             break;
                                   // Count the handler without wrapping around.
     623               6 :         if (n != (std::numeric_limits<std::size_t>::max)())
     624               6 :             ++n;
                                   // do_one() may return with the lock released; it must be
                                   // held again before the next iteration.
     625               6 :         if (!lock.owns_lock())
     626               6 :             lock.lock();
     627                 :     }
     628               5 :     return n;
     629               5 : }
     630                 : 
     631                 : inline std::size_t
     632               4 : reactor_scheduler_base::poll_one()
     633                 : {
                               // Makes a single do_one() call with a zero timeout (never
                               // blocks) and returns its result: 1 if a handler ran,
                               // otherwise 0.
                               //
                               // With no outstanding work the scheduler is stopped and 0 is
                               // returned without calling do_one().
     634               8 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     635                 :     {
     636               2 :         stop();
     637               2 :         return 0;
     638                 :     }
     639                 : 
     640               2 :     reactor_thread_context_guard ctx(this);
     641               2 :     lock_type lock(mutex_);
     642               2 :     return do_one(lock, 0, &ctx.frame_);
     643               2 : }
     644                 : 
     645                 : inline void
     646           26267 : reactor_scheduler_base::work_started() noexcept
     647                 : {
                               // Adds one unit to the outstanding-work counter (relaxed
                               // atomic increment; no lock, no wake-up).
     648           26267 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     649                 : }
     650                 : 
     651                 : inline void
     652           36948 : reactor_scheduler_base::work_finished() noexcept
     653                 : {
                               // Removes one unit from the outstanding-work counter.
                               // fetch_sub() returns the value before the decrement, so a
                               // result of 1 means this call brought the counter to zero; the
                               // scheduler is then stopped.
                               // NOTE(review): this function is noexcept while stop() is not;
                               // an exception escaping stop() here would call std::terminate.
     654           73896 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     655             368 :         stop();
     656           36948 : }
     657                 : 
     658                 : inline void
     659          145492 : reactor_scheduler_base::compensating_work_started() const noexcept
     660                 : {
                               // Bumps the private_outstanding_work counter of the context
                               // found for this scheduler. Nothing happens when no context is
                               // found; the shared outstanding_work_ counter is not touched.
                               // NOTE(review): the private counter is later folded into
                               // outstanding_work_ by the work_cleanup / task_cleanup
                               // destructors defined further down in this file.
     661          145492 :     auto* ctx = reactor_find_context(this);
     662          145492 :     if (ctx)
     663          145492 :         ++ctx->private_outstanding_work;
     664          145492 : }
     665                 : 
     666                 : inline void
     667 MIS           0 : reactor_scheduler_base::drain_thread_queue(
     668                 :     op_queue& queue, std::int64_t count) const
     669                 : {
                               // Moves the operations in `queue` onto the shared
                               // completed_ops_ queue. A positive `count` is first added to
                               // the outstanding-work counter.
     670               0 :     if (count > 0)
     671               0 :         outstanding_work_.fetch_add(count, std::memory_order_relaxed);
     672                 : 
     673               0 :     lock_type lock(mutex_);
     674               0 :     completed_ops_.splice(queue);
                               // Only a positive count triggers a wake-up attempt. The helper
                               // unlocks only when it finds a waiter to notify; otherwise the
                               // lock is presumably released when `lock` leaves scope.
     675               0 :     if (count > 0)
     676               0 :         maybe_unlock_and_signal_one(lock);
     677               0 : }
     678                 : 
     679                 : inline void
     680 HIT       16037 : reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
     681                 : {
                               // Hands the operations in `ops` to the scheduler. This function
                               // does not modify the outstanding-work counter.
     682           16037 :     if (ops.empty())
     683           16037 :         return;
     684                 : 
                               // When a context exists for this scheduler, append to its
                               // private queue; no lock is taken and no thread is woken.
     685 MIS           0 :     if (auto* ctx = reactor_find_context(this))
     686                 :     {
     687               0 :         ctx->private_queue.splice(ops);
     688               0 :         return;
     689                 :     }
     690                 : 
                               // Otherwise append to the shared queue under mutex_ and wake
                               // one thread; the helper releases the lock.
     691               0 :     lock_type lock(mutex_);
     692               0 :     completed_ops_.splice(ops);
     693               0 :     wake_one_thread_and_unlock(lock);
     694               0 : }
     695                 : 
     696                 : inline void
     697 HIT         517 : reactor_scheduler_base::shutdown_drain()
     698                 : {
                               // Empties completed_ops_, calling destroy() on every queued
                               // operation instead of running it, then signals all waiters.
     699             517 :     lock_type lock(mutex_);
     700                 : 
     701            1121 :     while (auto* h = completed_ops_.pop())
     702                 :     {
                                   // task_op_ is the scheduler's own reactor sentinel (see
                                   // do_one()); it is dropped from the queue, not destroyed.
     703             604 :         if (h == &task_op_)
     704             517 :             continue;
                                   // destroy() runs with the mutex released.
                                   // NOTE(review): presumably so that destroy() may call
                                   // back into the scheduler without deadlocking -- confirm.
     705              87 :         lock.unlock();
     706              87 :         h->destroy();
     707              87 :         lock.lock();
     708             604 :     }
     709                 : 
     710             517 :     signal_all(lock);
     711             517 : }
     712                 : 
     713                 : inline void
     714             893 : reactor_scheduler_base::signal_all(lock_type&) const
     715                 : {
                               // Sets the signaled bit in state_ and notifies every thread
                               // waiting on cond_. The lock argument is unnamed and unused.
                               // NOTE(review): presumably it is there to make callers show
                               // that they hold the mutex -- confirm.
     716             893 :     state_ |= signaled_bit;
     717             893 :     cond_.notify_all();
     718             893 : }
     719                 : 
     720                 : inline bool
     721            2283 : reactor_scheduler_base::maybe_unlock_and_signal_one(
     722                 :     lock_type& lock) const
     723                 : {
                               // Sets the signaled bit. If state_ then exceeds signaled_bit,
                               // unlocks, notifies one waiter and returns true. Otherwise
                               // returns false and leaves `lock` untouched.
                               // NOTE(review): waiters add waiter_increment to state_ in
                               // wait_for_signal*(); this test assumes waiter_increment is
                               // greater than signaled_bit -- confirm the constants.
     724            2283 :     state_ |= signaled_bit;
     725            2283 :     if (state_ > signaled_bit)
     726                 :     {
     727 MIS           0 :         lock.unlock();
     728               0 :         cond_.notify_one();
     729               0 :         return true;
     730                 :     }
     731 HIT        2283 :     return false;
     732                 : }
     733                 : 
     734                 : inline bool
     735          319136 : reactor_scheduler_base::unlock_and_signal_one(
     736                 :     lock_type& lock) const
     737                 : {
                               // Sets the signaled bit and always unlocks. One waiter is
                               // notified only if state_ showed any; the return value says
                               // whether a notification was sent.
     738          319136 :     state_ |= signaled_bit;
                               // state_ is read before the unlock; the notification itself is
                               // issued after the lock has been released.
     739          319136 :     bool have_waiters = state_ > signaled_bit;
     740          319136 :     lock.unlock();
     741          319136 :     if (have_waiters)
     742 MIS           0 :         cond_.notify_one();
     743 HIT      319136 :     return have_waiters;
     744                 : }
     745                 : 
     746                 : inline void
     747 MIS           0 : reactor_scheduler_base::clear_signal() const
     748                 : {
                               // Clears only the signaled bit; all other bits of state_ keep
                               // their value.
     749               0 :     state_ &= ~signaled_bit;
     750                 : }
     751                 : 
     752                 : inline void
     753               0 : reactor_scheduler_base::wait_for_signal(
     754                 :     lock_type& lock) const
     755                 : {
                               // Blocks on cond_ until the signaled bit is set in state_.
                               // The loop re-checks the bit after every wake-up, so a wake-up
                               // that arrives without the bit being set just waits again.
     756               0 :     while ((state_ & signaled_bit) == 0)
     757                 :     {
                                   // Register as a waiter for the duration of the wait; the
                                   // signalling helpers compare state_ against signaled_bit
                                   // to decide whether to notify.
     758               0 :         state_ += waiter_increment;
     759               0 :         cond_.wait(lock);
     760               0 :         state_ -= waiter_increment;
     761                 :     }
     762               0 : }
     763                 : 
     764                 : inline void
     765               0 : reactor_scheduler_base::wait_for_signal_for(
     766                 :     lock_type& lock, long timeout_us) const
     767                 : {
                               // Waits on cond_ at most once, for up to `timeout_us`
                               // microseconds, and only if the signaled bit is not already
                               // set. There is no loop here, so this can return with the bit
                               // still clear; callers have to re-check their own condition.
     768               0 :     if ((state_ & signaled_bit) == 0)
     769                 :     {
                                   // Register as a waiter for the duration of the wait.
     770               0 :         state_ += waiter_increment;
     771               0 :         cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
     772               0 :         state_ -= waiter_increment;
     773                 :     }
     774               0 : }
     775                 : 
     776                 : inline void
     777 HIT        2283 : reactor_scheduler_base::wake_one_thread_and_unlock(
     778                 :     lock_type& lock) const
     779                 : {
                               // Wakes one thread to pick up new work and releases `lock` on
                               // every path.
                               //
                               // First choice: notify a thread waiting on cond_. The helper
                               // has already unlocked when it returns true.
     780            2283 :     if (maybe_unlock_and_signal_one(lock))
     781 MIS           0 :         return;
     782                 : 
                               // No waiter was notified. If the reactor task is running and
                               // has not been interrupted yet, mark it interrupted (while the
                               // lock is still held) and interrupt it after unlocking.
     783 HIT        2283 :     if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
     784                 :     {
     785              58 :         task_interrupted_ = true;
     786              58 :         lock.unlock();
     787              58 :         interrupt_reactor();
     788                 :     }
     789                 :     else
     790                 :     {
     791            2225 :         lock.unlock();
     792                 :     }
     793                 : }
     794                 : 
     795          268816 : inline reactor_scheduler_base::work_cleanup::~work_cleanup()
     796                 : {
                               // Runs when the work_cleanup object created in do_one() goes
                               // out of scope, i.e. after the handler call. Settles the work
                               // counters and publishes operations queued privately.
     797          268816 :     if (ctx)
     798                 :     {
                                   // NOTE(review): presumably the handler that just ran
                                   // accounts for one finished unit of work, so the net
                                   // change applied to the shared counter is produced - 1.
     799          268816 :         std::int64_t produced = ctx->private_outstanding_work;
     800          268816 :         if (produced > 1)
     801              15 :             sched->outstanding_work_.fetch_add(
     802                 :                 produced - 1, std::memory_order_relaxed);
     803          268801 :         else if (produced < 1)
     804           26718 :             sched->work_finished();
                                   // produced == 1 leaves the shared counter unchanged.
     805          268816 :         ctx->private_outstanding_work = 0;
     806                 : 
     807          268816 :         if (!ctx->private_queue.empty())
     808                 :         {
                                       // The lock is taken unconditionally and is not
                                       // released in this destructor, so it is still held
                                       // once the destructor has finished.
     809           96606 :             lock->lock();
     810           96606 :             sched->completed_ops_.splice(ctx->private_queue);
     811                 :         }
     812                 :     }
     813                 :     else
     814                 :     {
                                   // No context: just count the finished unit of work.
     815 MIS           0 :         sched->work_finished();
     816                 :     }
     817 HIT      268816 : }
     818                 : 
     819          358040 : inline reactor_scheduler_base::task_cleanup::~task_cleanup()
     820                 : {
                               // Folds the context's private work count and private queue
                               // back into the scheduler's shared state.
                               // NOTE(review): no task_cleanup object is created in the part
                               // of the file visible here; presumably run_task()
                               // implementations create one -- confirm.
     821          179020 :     if (!ctx)
     822 MIS           0 :         return;
     823                 : 
                               // Unlike work_cleanup, the private count is added as-is and
                               // work_finished() is never called from here.
     824 HIT      179020 :     if (ctx->private_outstanding_work > 0)
     825                 :     {
     826            8390 :         sched->outstanding_work_.fetch_add(
     827            8390 :             ctx->private_outstanding_work, std::memory_order_relaxed);
     828            8390 :         ctx->private_outstanding_work = 0;
     829                 :     }
     830                 : 
     831          179020 :     if (!ctx->private_queue.empty())
     832                 :     {
                                   // Take the lock only if it is not already owned; it is
                                   // not released in this destructor.
     833            8390 :         if (!lock->owns_lock())
     834 MIS           0 :             lock->lock();
     835 HIT        8390 :         sched->completed_ops_.splice(ctx->private_queue);
     836                 :     }
     837          179020 : }
     838                 : 
     839                 : inline std::size_t
     840          269222 : reactor_scheduler_base::do_one(
     841                 :     lock_type& lock, long timeout_us, context_type* ctx)
     842                 : {
                               // Core scheduling step: runs at most one handler.
                               //
                               // lock       - scheduler lock. NOTE(review): assumed to be held
                               //              on entry; every caller in this file constructs
                               //              it (or re-locks it) right before the call.
                               //              It is released before a handler is invoked.
                               // timeout_us - 0: never block; negative: wait without a time
                               //              limit; positive: upper bound in microseconds.
                               // ctx        - context of the calling thread.
                               //
                               // Returns 1 if a handler was executed, otherwise 0.
     843                 :     for (;;)
     844                 :     {
     845          448201 :         if (stopped_.load(std::memory_order_acquire))
     846             360 :             return 0;
     847                 : 
     848          447841 :         scheduler_op* op = completed_ops_.pop();
     849                 : 
     850                 :         // Handle reactor sentinel — time to poll for I/O
     851          447841 :         if (op == &task_op_)
     852                 :         {
                                       // Are there handlers waiting besides the sentinel,
                                       // either in the shared or in the private queue?
     853                 :             bool more_handlers =
     854          179025 :                 !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
     855                 : 
                                       // Nothing queued and either no outstanding work or a
                                       // non-blocking call: put the sentinel back and give
                                       // up without running the reactor.
     856          307730 :             if (!more_handlers &&
     857          257410 :                 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
     858                 :                  timeout_us == 0))
     859                 :             {
     860               5 :                 completed_ops_.push(&task_op_);
     861               5 :                 return 0;
     862                 :             }
     863                 : 
                                       // With handlers pending the reactor pass gets a zero
                                       // timeout. A zero-timeout pass is flagged as already
                                       // interrupted, so wake_one_thread_and_unlock() will
                                       // not call interrupt_reactor() for it.
     864          179020 :             long task_timeout_us = more_handlers ? 0 : timeout_us;
     865          179020 :             task_interrupted_ = task_timeout_us == 0;
     866          179020 :             task_running_.store(true, std::memory_order_release);
     867                 : 
                                       // Releases the lock and notifies a waiting thread, if
                                       // any, so the pending handlers can be picked up.
     868          179020 :             if (more_handlers)
     869           50320 :                 unlock_and_signal_one(lock);
     870                 : 
     871                 :             try
     872                 :             {
     873          179020 :                 run_task(lock, ctx, task_timeout_us);
     874                 :             }
     875 MIS           0 :             catch (...)
     876                 :             {
                                           // NOTE(review): on this path task_op_ is not
                                           // pushed back onto completed_ops_ -- confirm
                                           // whether that is intended.
     877               0 :                 task_running_.store(false, std::memory_order_relaxed);
     878               0 :                 throw;
     879               0 :             }
     880                 : 
                                       // NOTE(review): the push below assumes run_task()
                                       // returns with the lock held -- confirm.
     881 HIT      179020 :             task_running_.store(false, std::memory_order_relaxed);
     882          179020 :             completed_ops_.push(&task_op_);
                                       // A call with a finite positive timeout returns after
                                       // one reactor pass instead of looping again.
     883          179020 :             if (timeout_us > 0)
     884              41 :                 return 0;
     885          178979 :             continue;
     886          178979 :         }
     887                 : 
     888                 :         // Handle operation
     889          268816 :         if (op != nullptr)
     890                 :         {
     891          268816 :             bool more = !completed_ops_.empty();
     892                 : 
                                       // Both branches release the lock before the handler
                                       // runs. `unassisted` ends up true only when more work
                                       // was queued but no waiting thread was notified.
                                       // NOTE(review): ctx is dereferenced without a null
                                       // check here, unlike the sentinel branch above.
     893          268816 :             if (more)
     894          268816 :                 ctx->unassisted = !unlock_and_signal_one(lock);
     895                 :             else
     896                 :             {
     897 MIS           0 :                 ctx->unassisted = false;
     898               0 :                 lock.unlock();
     899                 :             }
     900                 : 
                                       // The destructor of on_exit settles the work counters
                                       // and the private queue once the handler call is over.
     901 HIT      268816 :             work_cleanup on_exit{this, &lock, ctx};
     902                 :             (void)on_exit;
     903                 : 
     904          268816 :             (*op)();
     905          268816 :             return 1;
     906          268816 :         }
     907                 : 
     908                 :         // Try private queue before blocking
     909 MIS           0 :         if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
     910               0 :             continue;
     911                 : 
                                   // Shared queue is empty: give up if there is no
                                   // outstanding work or the call must not block.
     912               0 :         if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
     913                 :             timeout_us == 0)
     914               0 :             return 0;
     915                 : 
                                   // Reset the signaled bit, then wait for the next signal.
                                   // The timed wait may return without a signal; the loop
                                   // then re-evaluates everything from the top.
     916               0 :         clear_signal();
     917               0 :         if (timeout_us < 0)
     918               0 :             wait_for_signal(lock);
     919                 :         else
     920               0 :             wait_for_signal_for(lock, timeout_us);
     921 HIT      178979 :     }
     922                 : }
     923                 : 
     924                 : } // namespace boost::corosio::detail
     925                 : 
     926                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
        

Generated by: LCOV version 2.3