include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

Coverage summary: lines 70.9% (239/337), functions 77.8% (35/45)

Functions (45), all in namespace boost::corosio::detail (the namespace prefix and libstdc++'s __n4861 inline namespace are omitted below for readability):

 Line    Calls     Lines   Blocks   Function
  :77   795818x  100.0%    86.0%   reactor_find_context(reactor_scheduler_base const*)
  :89        0     0.0%     0.0%   reactor_flush_private_work(reactor_scheduler_context*, std::atomic<long>&)
 :106        0     0.0%     0.0%   reactor_drain_private_queue(reactor_scheduler_context*, std::atomic<long>&, intrusive_queue<scheduler_op>&)
 :251      461x  100.0%   100.0%   reactor_scheduler_base::inline_budget_initial() const
 :262        0     0.0%     0.0%   reactor_scheduler_base::configure_single_threaded(bool)
 :270      517x  100.0%   100.0%   reactor_scheduler_base::reactor_scheduler_base()
 :315        0     0.0%     0.0%   reactor_scheduler_base::task_op::operator()()
 :316        0     0.0%     0.0%   reactor_scheduler_base::task_op::destroy()
 :362      461x  100.0%   100.0%   reactor_thread_context_guard::reactor_thread_context_guard(reactor_scheduler_base const*)
 :370      461x   66.7%    80.0%   reactor_thread_context_guard::~reactor_thread_context_guard()
 :382      461x  100.0%   100.0%   reactor_scheduler_context::reactor_scheduler_context(reactor_scheduler_base const*, reactor_scheduler_context*)
 :396        0     0.0%     0.0%   reactor_scheduler_base::configure_reactor(unsigned, unsigned, unsigned, unsigned)
 :424   103892x   50.0%    45.0%   reactor_scheduler_base::reset_inline_budget() const
 :450   437844x  100.0%   100.0%   reactor_scheduler_base::try_consume_inline_budget() const
 :464     2117x  100.0%    84.0%   reactor_scheduler_base::post(std::coroutine_handle<void>) const
 :470     2117x  100.0%   100.0%   post(...)::post_handler::post_handler(std::coroutine_handle<void>)
 :471     4234x  100.0%   100.0%   post(...)::post_handler::~post_handler()
 :473     2108x  100.0%   100.0%   post(...)::post_handler::operator()()
 :482        9x  100.0%   100.0%   post(...)::post_handler::destroy()
 :507   105187x  100.0%    87.0%   reactor_scheduler_base::post(scheduler_op*) const
 :524     1286x  100.0%   100.0%   reactor_scheduler_base::running_in_this_thread() const
 :530      414x  100.0%    82.0%   reactor_scheduler_base::stop()
 :542       62x  100.0%   100.0%   reactor_scheduler_base::stopped() const
 :548       91x  100.0%   100.0%   reactor_scheduler_base::restart()
 :554      388x  100.0%    76.0%   reactor_scheduler_base::run()
 :579        2x   75.0%    64.0%   reactor_scheduler_base::run_one()
 :593      102x  100.0%    70.0%   reactor_scheduler_base::wait_one(long)
 :607        6x  100.0%    76.0%   reactor_scheduler_base::poll()
 :632        4x  100.0%    70.0%   reactor_scheduler_base::poll_one()
 :646    26267x  100.0%   100.0%   reactor_scheduler_base::work_started()
 :652    36948x  100.0%   100.0%   reactor_scheduler_base::work_finished()
 :659   145492x  100.0%   100.0%   reactor_scheduler_base::compensating_work_started() const
 :667        0     0.0%     0.0%   reactor_scheduler_base::drain_thread_queue(intrusive_queue<scheduler_op>&, long) const
 :680    16037x   30.0%    35.0%   reactor_scheduler_base::post_deferred_completions(intrusive_queue<scheduler_op>&) const
 :697      517x  100.0%    88.0%   reactor_scheduler_base::shutdown_drain()
 :714      893x  100.0%   100.0%   reactor_scheduler_base::signal_all(conditionally_enabled_mutex::scoped_lock&) const
 :721     2283x   57.1%    50.0%   reactor_scheduler_base::maybe_unlock_and_signal_one(conditionally_enabled_mutex::scoped_lock&) const
 :735   319136x   85.7%    80.0%   reactor_scheduler_base::unlock_and_signal_one(conditionally_enabled_mutex::scoped_lock&) const
 :747        0     0.0%     0.0%   reactor_scheduler_base::clear_signal() const
 :753        0     0.0%     0.0%   reactor_scheduler_base::wait_for_signal(conditionally_enabled_mutex::scoped_lock&) const
 :765        0     0.0%     0.0%   reactor_scheduler_base::wait_for_signal_for(conditionally_enabled_mutex::scoped_lock&, long) const
 :777     2283x   87.5%    92.0%   reactor_scheduler_base::wake_one_thread_and_unlock(conditionally_enabled_mutex::scoped_lock&) const
 :795   268816x   92.3%    92.0%   reactor_scheduler_base::work_cleanup::~work_cleanup()
 :819   179020x   83.3%    86.0%   reactor_scheduler_base::task_cleanup::~task_cleanup()
 :840   269222x   68.9%    55.0%   reactor_scheduler_base::do_one(conditionally_enabled_mutex::scoped_lock&, long, reactor_scheduler_context*)
Line  Hits  Source code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/native/native_scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <coroutine>
23 #include <cstddef>
24 #include <cstdint>
25 #include <limits>
26 #include <memory>
27 #include <stdexcept>
28
29 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31
32 namespace boost::corosio::detail {
33
34 // Forward declaration
35 class reactor_scheduler_base;
36
37 /** Per-thread state for a reactor scheduler.
38
39 Each thread running a scheduler's event loop has one of these
40 on a thread-local stack. It holds a private work queue and
41 inline completion budget for speculative I/O fast paths.
42 */
43 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
44 {
45 /// Scheduler this context belongs to.
46 reactor_scheduler_base const* key;
47
48 /// Next context frame on this thread's stack.
49 reactor_scheduler_context* next;
50
51 /// Private work queue for reduced contention.
52 op_queue private_queue;
53
54 /// Unflushed work count for the private queue.
55 std::int64_t private_outstanding_work;
56
57 /// Remaining inline completions allowed this cycle.
58 int inline_budget;
59
60 /// Maximum inline budget (adaptive; defaults range 2-16, runtime-configurable).
61 int inline_budget_max;
62
63 /// True if no other thread absorbed queued work last cycle.
64 bool unassisted;
65
66 /// Construct a context frame linked to @a n.
67 reactor_scheduler_context(
68 reactor_scheduler_base const* k,
69 reactor_scheduler_context* n);
70 };
71
72 /// Thread-local context stack for reactor schedulers.
73 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
74
75 /// Find the context frame for a scheduler on this thread.
76 inline reactor_scheduler_context*
77 795818x reactor_find_context(reactor_scheduler_base const* self) noexcept
78 {
79 795818x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
80 {
81 792846x if (c->key == self)
82 792846x return c;
83 }
84 2972x return nullptr;
85 }
86
87 /// Flush private work count to global counter.
88 inline void
89 reactor_flush_private_work(
90 reactor_scheduler_context* ctx,
91 std::atomic<std::int64_t>& outstanding_work) noexcept
92 {
93 if (ctx && ctx->private_outstanding_work > 0)
94 {
95 outstanding_work.fetch_add(
96 ctx->private_outstanding_work, std::memory_order_relaxed);
97 ctx->private_outstanding_work = 0;
98 }
99 }
100
101 /** Drain private queue to global queue, flushing work count first.
102
103 @return True if any ops were drained.
104 */
105 inline bool
106 reactor_drain_private_queue(
107 reactor_scheduler_context* ctx,
108 std::atomic<std::int64_t>& outstanding_work,
109 op_queue& completed_ops) noexcept
110 {
111 if (!ctx || ctx->private_queue.empty())
112 return false;
113
114 reactor_flush_private_work(ctx, outstanding_work);
115 completed_ops.splice(ctx->private_queue);
116 return true;
117 }
118
119 /** Non-template base for reactor-backed scheduler implementations.
120
121 Provides the complete threading model shared by epoll, kqueue,
122 and select schedulers: signal state machine, inline completion
123 budget, work counting, run/poll methods, and the do_one event
124 loop.
125
126 Derived classes provide platform-specific hooks by overriding:
127 - `run_task(lock, ctx, timeout_us)` to run the reactor poll
128 - `interrupt_reactor()` to wake a blocked reactor
129
130 De-templated from the original CRTP design to eliminate
131 duplicate instantiations when multiple backends are compiled
132 into the same binary. Virtual dispatch for run_task (called
133 once per reactor cycle, before a blocking syscall) has
134 negligible overhead.
135
136 @par Thread Safety
137 All public member functions are thread-safe.
138 */
139 class reactor_scheduler_base
140 : public native_scheduler
141 , public capy::execution_context::service
142 {
143 public:
144 using key_type = scheduler;
145 using context_type = reactor_scheduler_context;
146 using mutex_type = conditionally_enabled_mutex;
147 using lock_type = mutex_type::scoped_lock;
148 using event_type = conditionally_enabled_event;
149
150 /// Post a coroutine for deferred execution.
151 void post(std::coroutine_handle<> h) const override;
152
153 /// Post a scheduler operation for deferred execution.
154 void post(scheduler_op* h) const override;
155
156 /// Return true if called from a thread running this scheduler.
157 bool running_in_this_thread() const noexcept override;
158
159 /// Request the scheduler to stop dispatching handlers.
160 void stop() override;
161
162 /// Return true if the scheduler has been stopped.
163 bool stopped() const noexcept override;
164
165 /// Reset the stopped state so `run()` can resume.
166 void restart() override;
167
168 /// Run the event loop until no work remains.
169 std::size_t run() override;
170
171 /// Run until one handler completes or no work remains.
172 std::size_t run_one() override;
173
174 /// Run until one handler completes or @a usec elapses.
175 std::size_t wait_one(long usec) override;
176
177 /// Run ready handlers without blocking.
178 std::size_t poll() override;
179
180 /// Run at most one ready handler without blocking.
181 std::size_t poll_one() override;
182
183 /// Increment the outstanding work count.
184 void work_started() noexcept override;
185
186 /// Decrement the outstanding work count, stopping on zero.
187 void work_finished() noexcept override;
188
189 /** Reset the thread's inline completion budget.
190
191 Called at the start of each posted completion handler to
192 grant a fresh budget for speculative inline completions.
193 */
194 void reset_inline_budget() const noexcept;
195
196 /** Consume one unit of inline budget if available.
197
198 @return True if budget was available and consumed.
199 */
200 bool try_consume_inline_budget() const noexcept;
201
202 /** Offset a forthcoming work_finished from work_cleanup.
203
204 Called by descriptor_state when all I/O returned EAGAIN and
205 no handler will be executed. Must be called from a scheduler
206 thread.
207 */
208 void compensating_work_started() const noexcept;
209
210 /** Drain work from thread context's private queue to global queue.
211
212 Flushes private work count to the global counter, then
213 transfers the queue under mutex protection.
214
215 @param queue The private queue to drain.
216 @param count Private work count to flush before draining.
217 */
218 void drain_thread_queue(op_queue& queue, std::int64_t count) const;
219
220 /** Post completed operations for deferred invocation.
221
222 If called from a thread running this scheduler, operations
223 go to the thread's private queue (fast path). Otherwise,
224 operations are added to the global queue under mutex and a
225 waiter is signaled.
226
227 @par Preconditions
228 work_started() must have been called for each operation.
229
230 @param ops Queue of operations to post.
231 */
232 void post_deferred_completions(op_queue& ops) const;
233
234 /** Apply runtime configuration to the scheduler.
235
236 Called by `io_context` after construction. Values that do
237 not apply to this backend are silently ignored.
238
239 @param max_events Event buffer size for epoll/kqueue.
240 @param budget_init Starting inline completion budget.
241 @param budget_max Hard ceiling on adaptive budget ramp-up.
242 @param unassisted Budget cap when no other thread absorbs queued work.
243 */
244 virtual void configure_reactor(
245 unsigned max_events,
246 unsigned budget_init,
247 unsigned budget_max,
248 unsigned unassisted);
249
250 /// Return the configured initial inline budget.
251 461x unsigned inline_budget_initial() const noexcept
252 {
253 461x return inline_budget_initial_;
254 }
255
256 /** Enable or disable single-threaded (lockless) mode.
257
258 When enabled, all scheduler mutex and condition variable
259 operations become no-ops. Cross-thread post() is
260 undefined behavior.
261 */
262 void configure_single_threaded(bool v) noexcept
263 {
264 single_threaded_ = v;
265 mutex_.set_enabled(!v);
266 cond_.set_enabled(!v);
267 }
268
269 protected:
270 517x reactor_scheduler_base() = default;
271
272 /** Drain completed_ops during shutdown.
273
274 Pops all operations from the global queue and destroys them,
275 skipping the task sentinel. Signals all waiting threads.
276 Derived classes call this from their shutdown() override
277 before performing platform-specific cleanup.
278 */
279 void shutdown_drain();
280
281 /// RAII guard that flushes a thread's private work after `run_task`.
282 struct task_cleanup
283 {
284 reactor_scheduler_base const* sched;
285 lock_type* lock;
286 context_type* ctx;
287 ~task_cleanup();
288 };
289
290 mutable mutex_type mutex_{true};
291 mutable event_type cond_{true};
292 mutable op_queue completed_ops_;
293 mutable std::atomic<std::int64_t> outstanding_work_{0};
294 std::atomic<bool> stopped_{false};
295 mutable std::atomic<bool> task_running_{false};
296 mutable bool task_interrupted_ = false;
297
298 // Runtime-configurable reactor tuning parameters.
299 // Defaults match the library's built-in values.
300 unsigned max_events_per_poll_ = 128;
301 unsigned inline_budget_initial_ = 2;
302 unsigned inline_budget_max_ = 16;
303 unsigned unassisted_budget_ = 4;
304
305 /// Bit 0 of `state_`: set when the condvar should be signaled.
306 static constexpr std::size_t signaled_bit = 1;
307
308 /// Increment per waiting thread in `state_`.
309 static constexpr std::size_t waiter_increment = 2;
310 mutable std::size_t state_ = 0;
311
312 /// Sentinel op that triggers a reactor poll when dequeued.
313 struct task_op final : scheduler_op
314 {
315 void operator()() override {}
316 void destroy() override {}
317 };
318 task_op task_op_;
319
320 /// Run the platform-specific reactor poll.
321 virtual void
322 run_task(lock_type& lock, context_type* ctx,
323 long timeout_us) = 0;
324
325 /// Wake a blocked reactor (e.g. write to eventfd or pipe).
326 virtual void interrupt_reactor() const = 0;
327
328 private:
329 struct work_cleanup
330 {
331 reactor_scheduler_base* sched;
332 lock_type* lock;
333 context_type* ctx;
334 ~work_cleanup();
335 };
336
337 std::size_t do_one(
338 lock_type& lock, long timeout_us, context_type* ctx);
339
340 void signal_all(lock_type& lock) const;
341 bool maybe_unlock_and_signal_one(lock_type& lock) const;
342 bool unlock_and_signal_one(lock_type& lock) const;
343 void clear_signal() const;
344 void wait_for_signal(lock_type& lock) const;
345 void wait_for_signal_for(
346 lock_type& lock, long timeout_us) const;
347 void wake_one_thread_and_unlock(lock_type& lock) const;
348 };
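
A usage sketch for the single-threaded mode above (:262); the helper name is hypothetical, and the caller's guarantee that no other thread ever calls post() is assumed.

    // Sketch only: once lockless mode is on, every post() must come from
    // the thread running the scheduler; cross-thread post() is UB.
    inline std::size_t run_lockless(reactor_scheduler_base& sched)
    {
        sched.configure_single_threaded(true); // mutex/condvar become no-ops
        return sched.run();
    }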
349
350 /** RAII guard that pushes/pops a scheduler context frame.
351
352 On construction, pushes a new context frame onto the
353 thread-local stack. On destruction, drains any remaining
354 private queue items to the global queue and pops the frame.
355 */
356 struct reactor_thread_context_guard
357 {
358 /// The context frame managed by this guard.
359 reactor_scheduler_context frame_;
360
361 /// Construct the guard, pushing a frame for @a sched.
362 461x explicit reactor_thread_context_guard(
363 reactor_scheduler_base const* sched) noexcept
364 461x : frame_(sched, reactor_context_stack.get())
365 {
366 461x reactor_context_stack.set(&frame_);
367 461x }
368
369 /// Destroy the guard, draining private work and popping the frame.
370 461x ~reactor_thread_context_guard() noexcept
371 {
372 461x if (!frame_.private_queue.empty())
373 frame_.key->drain_thread_queue(
374 frame_.private_queue, frame_.private_outstanding_work);
375 461x reactor_context_stack.set(frame_.next);
376 461x }
377 };
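
The frames form a per-thread singly linked stack, so one thread can drive several schedulers or nest run() calls; a sketch of that invariant, assuming two live schedulers a and b and an extra <cassert> include:

    // Illustration of the stack discipline, not library API.
    inline void context_stack_example(
        reactor_scheduler_base const& a,
        reactor_scheduler_base const& b)
    {
        reactor_thread_context_guard ga(&a);     // push a frame for a
        {
            reactor_thread_context_guard gb(&b); // push a frame for b above it
            assert(reactor_find_context(&a) == &ga.frame_);
            assert(reactor_find_context(&b) == &gb.frame_);
        }                                        // gb pops, draining its queue
        assert(reactor_find_context(&b) == nullptr);
    }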
378
379 // ---- Inline implementations ------------------------------------------------
380
381 inline
382 461x reactor_scheduler_context::reactor_scheduler_context(
383 reactor_scheduler_base const* k,
384 461x reactor_scheduler_context* n)
385 461x : key(k)
386 461x , next(n)
387 461x , private_outstanding_work(0)
388 461x , inline_budget(0)
389 461x , inline_budget_max(
390 461x static_cast<int>(k->inline_budget_initial()))
391 461x , unassisted(false)
392 {
393 461x }
394
395 inline void
396 reactor_scheduler_base::configure_reactor(
397 unsigned max_events,
398 unsigned budget_init,
399 unsigned budget_max,
400 unsigned unassisted)
401 {
402 if (max_events < 1 ||
403 max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
404 throw std::out_of_range(
405 "max_events_per_poll must be in [1, INT_MAX]");
406 if (budget_max < 1 ||
407 budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
408 throw std::out_of_range(
409 "inline_budget_max must be in [1, INT_MAX]");
410
411 // Clamp initial and unassisted to budget_max.
412 if (budget_init > budget_max)
413 budget_init = budget_max;
414 if (unassisted > budget_max)
415 unassisted = budget_max;
416
417 max_events_per_poll_ = max_events;
418 inline_budget_initial_ = budget_init;
419 inline_budget_max_ = budget_max;
420 unassisted_budget_ = unassisted;
421 }
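
A hypothetical configuration call; per the doc comment at :236, io_context normally performs this after construction, and the values here are arbitrary.

    inline void tune_reactor(reactor_scheduler_base& sched)
    {
        // budget_init or unassisted above budget_max would be clamped down.
        sched.configure_reactor(/*max_events*/ 256, /*budget_init*/ 4,
                                /*budget_max*/ 32, /*unassisted*/ 8);
    }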
422
423 inline void
424 103892x reactor_scheduler_base::reset_inline_budget() const noexcept
425 {
426 103892x if (auto* ctx = reactor_find_context(this))
427 {
428 // Cap when no other thread absorbed queued work
429 103892x if (ctx->unassisted)
430 {
431 103892x ctx->inline_budget_max =
432 103892x static_cast<int>(unassisted_budget_);
433 103892x ctx->inline_budget =
434 103892x static_cast<int>(unassisted_budget_);
435 103892x return;
436 }
437 // Ramp up when previous cycle fully consumed budget
438 if (ctx->inline_budget == 0)
439 ctx->inline_budget_max = (std::min)(
440 ctx->inline_budget_max * 2,
441 static_cast<int>(inline_budget_max_));
442 else if (ctx->inline_budget < ctx->inline_budget_max)
443 ctx->inline_budget_max =
444 static_cast<int>(inline_budget_initial_);
445 ctx->inline_budget = ctx->inline_budget_max;
446 }
447 }
448
449 inline bool
450 437844x reactor_scheduler_base::try_consume_inline_budget() const noexcept
451 {
452 437844x if (auto* ctx = reactor_find_context(this))
453 {
454 437844x if (ctx->inline_budget > 0)
455 {
456 350304x --ctx->inline_budget;
457 350304x return true;
458 }
459 }
460 87540x return false;
461 }
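
Together these two functions drive the speculative fast path: a completion that is already ready can run inline while budget remains, instead of taking a queue round-trip. A sketch of the intended caller shape (the helper and its arguments are assumptions; the real call sites live in the descriptor code):

    // Run a ready op inline while budget lasts; otherwise queue it so a
    // busy connection cannot starve the rest of the scheduler.
    // (Work accounting around an inline op is assumed handled by the caller.)
    inline void complete_speculatively(
        reactor_scheduler_base const& sched, scheduler_op* op)
    {
        if (sched.try_consume_inline_budget())
            (*op)();        // inline: no wakeup, no mutex
        else
            sched.post(op); // budget spent: queue and signal a thread
    }

reset_inline_budget() then adapts the cap each cycle: a fully consumed budget doubles it (up to the configured maximum), while leftover budget decays it back to the configured initial value.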
462
463 inline void
464 2117x reactor_scheduler_base::post(std::coroutine_handle<> h) const
465 {
466 struct post_handler final : scheduler_op
467 {
468 std::coroutine_handle<> h_;
469
470 2117x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
471 4234x ~post_handler() override = default;
472
473 2108x void operator()() override
474 {
475 2108x auto saved = h_;
476 2108x delete this;
477 // Ensure stores from the posting thread are visible
478 std::atomic_thread_fence(std::memory_order_acquire);
479 2108x saved.resume();
480 2108x }
481
482 9x void destroy() override
483 {
484 9x auto saved = h_;
485 9x delete this;
486 9x saved.destroy();
487 9x }
488 };
489
490 2117x auto ph = std::make_unique<post_handler>(h);
491
492 2117x if (auto* ctx = reactor_find_context(this))
493 {
494 6x ++ctx->private_outstanding_work;
495 6x ctx->private_queue.push(ph.release());
496 6x return;
497 }
498
499 2111x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
500
501 2111x lock_type lock(mutex_);
502 2111x completed_ops_.push(ph.release());
503 2111x wake_one_thread_and_unlock(lock);
504 2117x }
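
post() is the primitive for "resume this coroutine on the scheduler"; a minimal awaitable sketch built on it (illustrative only, not a corosio type):

    // Suspends the awaiting coroutine and queues its resumption through
    // post(); it resumes on a thread running this scheduler's loop.
    // Usage: co_await post_awaitable{sched};
    struct post_awaitable
    {
        reactor_scheduler_base const& sched;
        bool await_ready() const noexcept { return false; }
        void await_suspend(std::coroutine_handle<> h) const { sched.post(h); }
        void await_resume() const noexcept {}
    };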
505
506 inline void
507 105187x reactor_scheduler_base::post(scheduler_op* h) const
508 {
509 105187x if (auto* ctx = reactor_find_context(this))
510 {
511 105015x ++ctx->private_outstanding_work;
512 105015x ctx->private_queue.push(h);
513 105015x return;
514 }
515
516 172x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
517
518 172x lock_type lock(mutex_);
519 172x completed_ops_.push(h);
520 172x wake_one_thread_and_unlock(lock);
521 172x }
522
523 inline bool
524 1286x reactor_scheduler_base::running_in_this_thread() const noexcept
525 {
526 1286x return reactor_find_context(this) != nullptr;
527 }
528
529 inline void
530 414x reactor_scheduler_base::stop()
531 {
532 414x lock_type lock(mutex_);
533 414x if (!stopped_.load(std::memory_order_acquire))
534 {
535 376x stopped_.store(true, std::memory_order_release);
536 376x signal_all(lock);
537 376x interrupt_reactor();
538 }
539 414x }
540
541 inline bool
542 62x reactor_scheduler_base::stopped() const noexcept
543 {
544 62x return stopped_.load(std::memory_order_acquire);
545 }
546
547 inline void
548 91x reactor_scheduler_base::restart()
549 {
550 91x stopped_.store(false, std::memory_order_release);
551 91x }
552
553 inline std::size_t
554 388x reactor_scheduler_base::run()
555 {
556 776x if (outstanding_work_.load(std::memory_order_acquire) == 0)
557 {
558 28x stop();
559 28x return 0;
560 }
561
562 360x reactor_thread_context_guard ctx(this);
563 360x lock_type lock(mutex_);
564
565 360x std::size_t n = 0;
566 for (;;)
567 {
568 269115x if (!do_one(lock, -1, &ctx.frame_))
569 360x break;
570 268755x if (n != (std::numeric_limits<std::size_t>::max)())
571 268755x ++n;
572 268755x if (!lock.owns_lock())
573 172171x lock.lock();
574 }
575 360x return n;
576 360x }
577
578 inline std::size_t
579 2x reactor_scheduler_base::run_one()
580 {
581 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
582 {
583 stop();
584 return 0;
585 }
586
587 2x reactor_thread_context_guard ctx(this);
588 2x lock_type lock(mutex_);
589 2x return do_one(lock, -1, &ctx.frame_);
590 2x }
591
592 inline std::size_t
593 102x reactor_scheduler_base::wait_one(long usec)
594 {
595 204x if (outstanding_work_.load(std::memory_order_acquire) == 0)
596 {
597 10x stop();
598 10x return 0;
599 }
600
601 92x reactor_thread_context_guard ctx(this);
602 92x lock_type lock(mutex_);
603 92x return do_one(lock, usec, &ctx.frame_);
604 92x }
605
606 inline std::size_t
607 6x reactor_scheduler_base::poll()
608 {
609 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
610 {
611 1x stop();
612 1x return 0;
613 }
614
615 5x reactor_thread_context_guard ctx(this);
616 5x lock_type lock(mutex_);
617
618 5x std::size_t n = 0;
619 for (;;)
620 {
621 11x if (!do_one(lock, 0, &ctx.frame_))
622 5x break;
623 6x if (n != (std::numeric_limits<std::size_t>::max)())
624 6x ++n;
625 6x if (!lock.owns_lock())
626 6x lock.lock();
627 }
628 5x return n;
629 5x }
630
631 inline std::size_t
632 4x reactor_scheduler_base::poll_one()
633 {
634 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
635 {
636 2x stop();
637 2x return 0;
638 }
639
640 2x reactor_thread_context_guard ctx(this);
641 2x lock_type lock(mutex_);
642 2x return do_one(lock, 0, &ctx.frame_);
643 2x }
644
645 inline void
646 26267x reactor_scheduler_base::work_started() noexcept
647 {
648 26267x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
649 26267x }
650
651 inline void
652 36948x reactor_scheduler_base::work_finished() noexcept
653 {
654 73896x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
655 368x stop();
656 36948x }
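
The counter gates the event loop: run() returns immediately when it is zero, and the 1 -> 0 transition above stops the scheduler. A hedged RAII sketch of the pairing contract (work_guard is a hypothetical helper, analogous to executor work guards, not corosio API):

    // Hold one unit of outstanding work across an external event; the
    // destructor's decrement may be the one that stops the scheduler.
    struct work_guard
    {
        reactor_scheduler_base& sched;
        explicit work_guard(reactor_scheduler_base& s) noexcept
            : sched(s) { sched.work_started(); }
        ~work_guard() { sched.work_finished(); }
    };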
657
658 inline void
659 145492x reactor_scheduler_base::compensating_work_started() const noexcept
660 {
661 145492x auto* ctx = reactor_find_context(this);
662 145492x if (ctx)
663 145492x ++ctx->private_outstanding_work;
664 145492x }
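
A sketch of the EAGAIN case this exists for, per the doc comment at :204 (the caller and its flag are hypothetical; the real logic lives in descriptor_state):

    inline void on_descriptor_event(
        reactor_scheduler_base const& sched, bool any_handler_ran)
    {
        // work_cleanup will call work_finished() once when this cycle
        // ends; if every speculative read hit EAGAIN and no handler ran,
        // pre-increment so the net work-count change is zero.
        if (!any_handler_ran)
            sched.compensating_work_started();
    }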
665
666 inline void
667 reactor_scheduler_base::drain_thread_queue(
668 op_queue& queue, std::int64_t count) const
669 {
670 if (count > 0)
671 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
672
673 lock_type lock(mutex_);
674 completed_ops_.splice(queue);
675 if (count > 0)
676 maybe_unlock_and_signal_one(lock);
677 }
678
679 inline void
680 16037x reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
681 {
682 16037x if (ops.empty())
683 16037x return;
684
685 if (auto* ctx = reactor_find_context(this))
686 {
687 ctx->private_queue.splice(ops);
688 return;
689 }
690
691 lock_type lock(mutex_);
692 completed_ops_.splice(ops);
693 wake_one_thread_and_unlock(lock);
694 }
695
696 inline void
697 517x reactor_scheduler_base::shutdown_drain()
698 {
699 517x lock_type lock(mutex_);
700
701 1121x while (auto* h = completed_ops_.pop())
702 {
703 604x if (h == &task_op_)
704 517x continue;
705 87x lock.unlock();
706 87x h->destroy();
707 87x lock.lock();
708 604x }
709
710 517x signal_all(lock);
711 517x }
712
713 inline void
714 893x reactor_scheduler_base::signal_all(lock_type&) const
715 {
716 893x state_ |= signaled_bit;
717 893x cond_.notify_all();
718 893x }
719
720 inline bool
721 2283x reactor_scheduler_base::maybe_unlock_and_signal_one(
722 lock_type& lock) const
723 {
724 2283x state_ |= signaled_bit;
725 2283x if (state_ > signaled_bit)
726 {
727 lock.unlock();
728 cond_.notify_one();
729 return true;
730 }
731 2283x return false;
732 }
733
734 inline bool
735 319136x reactor_scheduler_base::unlock_and_signal_one(
736 lock_type& lock) const
737 {
738 319136x state_ |= signaled_bit;
739 319136x bool have_waiters = state_ > signaled_bit;
740 319136x lock.unlock();
741 319136x if (have_waiters)
742 cond_.notify_one();
743 319136x return have_waiters;
744 }
745
746 inline void
747 reactor_scheduler_base::clear_signal() const
748 {
749 state_ &= ~signaled_bit;
750 }
751
752 inline void
753 reactor_scheduler_base::wait_for_signal(
754 lock_type& lock) const
755 {
756 while ((state_ & signaled_bit) == 0)
757 {
758 state_ += waiter_increment;
759 cond_.wait(lock);
760 state_ -= waiter_increment;
761 }
762 }
763
764 inline void
765 reactor_scheduler_base::wait_for_signal_for(
766 lock_type& lock, long timeout_us) const
767 {
768 if ((state_ & signaled_bit) == 0)
769 {
770 state_ += waiter_increment;
771 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
772 state_ -= waiter_increment;
773 }
774 }
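
These waits implement the state_ word declared at :310: the low bit is the pending-signal flag, and the remaining bits count blocked threads in units of waiter_increment. A worked decoding, with the literals 1 and 2 restating signaled_bit (:306) and waiter_increment (:309); these helpers are illustrative, not class members:

    constexpr std::size_t waiters_of(std::size_t s) { return s / 2; }
    constexpr bool        signaled(std::size_t s)   { return (s & 1) != 0; }
    // Two threads blocked in wait_for_signal -> state_ == 4; signal_all()
    // sets the bit -> state_ == 5; each waiter wakes, subtracts 2 on the
    // way out, and the flag persists until clear_signal().
    static_assert(waiters_of(5) == 2 && signaled(5));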
775
776 inline void
777 2283x reactor_scheduler_base::wake_one_thread_and_unlock(
778 lock_type& lock) const
779 {
780 2283x if (maybe_unlock_and_signal_one(lock))
781 return;
782
783 2283x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
784 {
785 58x task_interrupted_ = true;
786 58x lock.unlock();
787 58x interrupt_reactor();
788 }
789 else
790 {
791 2225x lock.unlock();
792 }
793 }
794
795 268816x inline reactor_scheduler_base::work_cleanup::~work_cleanup()
796 {
797 268816x if (ctx)
798 {
799 268816x std::int64_t produced = ctx->private_outstanding_work;
800 268816x if (produced > 1)
801 15x sched->outstanding_work_.fetch_add(
802 produced - 1, std::memory_order_relaxed);
803 268801x else if (produced < 1)
804 26718x sched->work_finished();
805 268816x ctx->private_outstanding_work = 0;
806
807 268816x if (!ctx->private_queue.empty())
808 {
809 96606x lock->lock();
810 96606x sched->completed_ops_.splice(ctx->private_queue);
811 }
812 }
813 else
814 {
815 sched->work_finished();
816 }
817 268816x }
818
819 358040x inline reactor_scheduler_base::task_cleanup::~task_cleanup()
820 {
821 179020x if (!ctx)
822 return;
823
824 179020x if (ctx->private_outstanding_work > 0)
825 {
826 8390x sched->outstanding_work_.fetch_add(
827 8390x ctx->private_outstanding_work, std::memory_order_relaxed);
828 8390x ctx->private_outstanding_work = 0;
829 }
830
831 179020x if (!ctx->private_queue.empty())
832 {
833 8390x if (!lock->owns_lock())
834 lock->lock();
835 8390x sched->completed_ops_.splice(ctx->private_queue);
836 }
837 179020x }
838
839 inline std::size_t
840 269222x reactor_scheduler_base::do_one(
841 lock_type& lock, long timeout_us, context_type* ctx)
842 {
843 for (;;)
844 {
845 448201x if (stopped_.load(std::memory_order_acquire))
846 360x return 0;
847
848 447841x scheduler_op* op = completed_ops_.pop();
849
850 // Handle reactor sentinel — time to poll for I/O
851 447841x if (op == &task_op_)
852 {
853 bool more_handlers =
854 179025x !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
855
856 307730x if (!more_handlers &&
857 257410x (outstanding_work_.load(std::memory_order_acquire) == 0 ||
858 timeout_us == 0))
859 {
860 5x completed_ops_.push(&task_op_);
861 5x return 0;
862 }
863
864 179020x long task_timeout_us = more_handlers ? 0 : timeout_us;
865 179020x task_interrupted_ = task_timeout_us == 0;
866 179020x task_running_.store(true, std::memory_order_release);
867
868 179020x if (more_handlers)
869 50320x unlock_and_signal_one(lock);
870
871 try
872 {
873 179020x run_task(lock, ctx, task_timeout_us);
874 }
875 catch (...)
876 {
877 task_running_.store(false, std::memory_order_relaxed);
878 throw;
879 }
880
881 179020x task_running_.store(false, std::memory_order_relaxed);
882 179020x completed_ops_.push(&task_op_);
883 179020x if (timeout_us > 0)
884 41x return 0;
885 178979x continue;
886 178979x }
887
888 // Handle operation
889 268816x if (op != nullptr)
890 {
891 268816x bool more = !completed_ops_.empty();
892
893 268816x if (more)
894 268816x ctx->unassisted = !unlock_and_signal_one(lock);
895 else
896 {
897 ctx->unassisted = false;
898 lock.unlock();
899 }
900
901 268816x work_cleanup on_exit{this, &lock, ctx};
902 (void)on_exit;
903
904 268816x (*op)();
905 268816x return 1;
906 268816x }
907
908 // Try private queue before blocking
909 if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
910 continue;
911
912 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
913 timeout_us == 0)
914 return 0;
915
916 clear_signal();
917 if (timeout_us < 0)
918 wait_for_signal(lock);
919 else
920 wait_for_signal_for(lock, timeout_us);
921 178979x }
922 }
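
For the two pure virtuals this loop drives, a minimal backend skeleton; everything here is an assumption modeled on a typical Linux reactor (epoll plus an eventfd wakeup), not corosio's actual epoll scheduler, and the service base's shutdown hook is elided.

    // Hypothetical skeleton. run_task must release the mutex around the
    // blocking poll and reacquire it before returning, because do_one
    // pushes the task sentinel (:882) right after run_task returns.
    class sketch_epoll_scheduler final : public reactor_scheduler_base
    {
        int epfd_;   // epoll instance (assumed member)
        int wakefd_; // eventfd registered with epfd_ (assumed member)

        void run_task(lock_type& lock, context_type* ctx,
                      long timeout_us) override
        {
            task_cleanup on_exit{this, &lock, ctx}; // flush private work
            lock.unlock();
            // ... epoll_wait(epfd_, events, max_events_per_poll_, ms) ...
            // ... push ready completions onto ctx->private_queue ...
            lock.lock();
            (void)timeout_us;
        }

        void interrupt_reactor() const override
        {
            std::uint64_t one = 1;
            // write(wakefd_, &one, sizeof(one)); // wake a blocked epoll_wait
            (void)one;
        }
    };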
923
924 } // namespace boost::corosio::detail
925
926 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
927