TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/capy/ex/execution_context.hpp>
15 :
16 : #include <boost/corosio/native/native_scheduler.hpp>
17 : #include <boost/corosio/detail/scheduler_op.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 :
#include <algorithm>
#include <atomic>
#include <chrono>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <stdexcept>
28 :
29 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 : #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : // Forward declaration
35 : class reactor_scheduler_base;
36 :
/** Per-thread state for a reactor scheduler.

    Each thread running a scheduler's event loop has one of these
    on a thread-local stack. It holds a private work queue and
    inline completion budget for speculative I/O fast paths.

    Frames are stack-allocated by reactor_thread_context_guard and
    are only ever touched by the owning thread, so no member here
    requires synchronization.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to. Non-owning; used only for
    /// identity comparison in reactor_find_context().
    reactor_scheduler_base const* key;

    /// Next context frame on this thread's stack (nested run() calls
    /// or multiple schedulers on one thread).
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention.
    op_queue private_queue;

    /// Unflushed work count for the private queue. May go negative
    /// transiently inside work_cleanup accounting.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive, 2-16).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler_base const* k,
        reactor_scheduler_context* n);
};
71 :
/// Thread-local context stack for reactor schedulers. The head frame
/// belongs to the innermost active run()/poll() on this thread; frames
/// are linked through reactor_scheduler_context::next.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
74 :
75 : /// Find the context frame for a scheduler on this thread.
76 : inline reactor_scheduler_context*
77 HIT 795818 : reactor_find_context(reactor_scheduler_base const* self) noexcept
78 : {
79 795818 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
80 : {
81 792846 : if (c->key == self)
82 792846 : return c;
83 : }
84 2972 : return nullptr;
85 : }
86 :
87 : /// Flush private work count to global counter.
88 : inline void
89 MIS 0 : reactor_flush_private_work(
90 : reactor_scheduler_context* ctx,
91 : std::atomic<std::int64_t>& outstanding_work) noexcept
92 : {
93 0 : if (ctx && ctx->private_outstanding_work > 0)
94 : {
95 0 : outstanding_work.fetch_add(
96 : ctx->private_outstanding_work, std::memory_order_relaxed);
97 0 : ctx->private_outstanding_work = 0;
98 : }
99 0 : }
100 :
101 : /** Drain private queue to global queue, flushing work count first.
102 :
103 : @return True if any ops were drained.
104 : */
105 : inline bool
106 0 : reactor_drain_private_queue(
107 : reactor_scheduler_context* ctx,
108 : std::atomic<std::int64_t>& outstanding_work,
109 : op_queue& completed_ops) noexcept
110 : {
111 0 : if (!ctx || ctx->private_queue.empty())
112 0 : return false;
113 :
114 0 : reactor_flush_private_work(ctx, outstanding_work);
115 0 : completed_ops.splice(ctx->private_queue);
116 0 : return true;
117 : }
118 :
119 : /** Non-template base for reactor-backed scheduler implementations.
120 :
121 : Provides the complete threading model shared by epoll, kqueue,
122 : and select schedulers: signal state machine, inline completion
123 : budget, work counting, run/poll methods, and the do_one event
124 : loop.
125 :
126 : Derived classes provide platform-specific hooks by overriding:
127 : - `run_task(lock, ctx)` to run the reactor poll
128 : - `interrupt_reactor()` to wake a blocked reactor
129 :
130 : De-templated from the original CRTP design to eliminate
131 : duplicate instantiations when multiple backends are compiled
132 : into the same binary. Virtual dispatch for run_task (called
133 : once per reactor cycle, before a blocking syscall) has
134 : negligible overhead.
135 :
136 : @par Thread Safety
137 : All public member functions are thread-safe.
138 : */
139 : class reactor_scheduler_base
140 : : public native_scheduler
141 : , public capy::execution_context::service
142 : {
143 : public:
144 : using key_type = scheduler;
145 : using context_type = reactor_scheduler_context;
146 : using mutex_type = conditionally_enabled_mutex;
147 : using lock_type = mutex_type::scoped_lock;
148 : using event_type = conditionally_enabled_event;
149 :
150 : /// Post a coroutine for deferred execution.
151 : void post(std::coroutine_handle<> h) const override;
152 :
153 : /// Post a scheduler operation for deferred execution.
154 : void post(scheduler_op* h) const override;
155 :
156 : /// Return true if called from a thread running this scheduler.
157 : bool running_in_this_thread() const noexcept override;
158 :
159 : /// Request the scheduler to stop dispatching handlers.
160 : void stop() override;
161 :
162 : /// Return true if the scheduler has been stopped.
163 : bool stopped() const noexcept override;
164 :
165 : /// Reset the stopped state so `run()` can resume.
166 : void restart() override;
167 :
168 : /// Run the event loop until no work remains.
169 : std::size_t run() override;
170 :
171 : /// Run until one handler completes or no work remains.
172 : std::size_t run_one() override;
173 :
174 : /// Run until one handler completes or @a usec elapses.
175 : std::size_t wait_one(long usec) override;
176 :
177 : /// Run ready handlers without blocking.
178 : std::size_t poll() override;
179 :
180 : /// Run at most one ready handler without blocking.
181 : std::size_t poll_one() override;
182 :
183 : /// Increment the outstanding work count.
184 : void work_started() noexcept override;
185 :
186 : /// Decrement the outstanding work count, stopping on zero.
187 : void work_finished() noexcept override;
188 :
189 : /** Reset the thread's inline completion budget.
190 :
191 : Called at the start of each posted completion handler to
192 : grant a fresh budget for speculative inline completions.
193 : */
194 : void reset_inline_budget() const noexcept;
195 :
196 : /** Consume one unit of inline budget if available.
197 :
198 : @return True if budget was available and consumed.
199 : */
200 : bool try_consume_inline_budget() const noexcept;
201 :
202 : /** Offset a forthcoming work_finished from work_cleanup.
203 :
204 : Called by descriptor_state when all I/O returned EAGAIN and
205 : no handler will be executed. Must be called from a scheduler
206 : thread.
207 : */
208 : void compensating_work_started() const noexcept;
209 :
210 : /** Drain work from thread context's private queue to global queue.
211 :
212 : Flushes private work count to the global counter, then
213 : transfers the queue under mutex protection.
214 :
215 : @param queue The private queue to drain.
216 : @param count Private work count to flush before draining.
217 : */
218 : void drain_thread_queue(op_queue& queue, std::int64_t count) const;
219 :
220 : /** Post completed operations for deferred invocation.
221 :
222 : If called from a thread running this scheduler, operations
223 : go to the thread's private queue (fast path). Otherwise,
224 : operations are added to the global queue under mutex and a
225 : waiter is signaled.
226 :
227 : @par Preconditions
228 : work_started() must have been called for each operation.
229 :
230 : @param ops Queue of operations to post.
231 : */
232 : void post_deferred_completions(op_queue& ops) const;
233 :
234 : /** Apply runtime configuration to the scheduler.
235 :
236 : Called by `io_context` after construction. Values that do
237 : not apply to this backend are silently ignored.
238 :
239 : @param max_events Event buffer size for epoll/kqueue.
240 : @param budget_init Starting inline completion budget.
241 : @param budget_max Hard ceiling on adaptive budget ramp-up.
242 : @param unassisted Budget when single-threaded.
243 : */
244 : virtual void configure_reactor(
245 : unsigned max_events,
246 : unsigned budget_init,
247 : unsigned budget_max,
248 : unsigned unassisted);
249 :
250 : /// Return the configured initial inline budget.
251 HIT 461 : unsigned inline_budget_initial() const noexcept
252 : {
253 461 : return inline_budget_initial_;
254 : }
255 :
256 : /** Enable or disable single-threaded (lockless) mode.
257 :
258 : When enabled, all scheduler mutex and condition variable
259 : operations become no-ops. Cross-thread post() is
260 : undefined behavior.
261 : */
262 MIS 0 : void configure_single_threaded(bool v) noexcept
263 : {
264 0 : single_threaded_ = v;
265 0 : mutex_.set_enabled(!v);
266 0 : cond_.set_enabled(!v);
267 0 : }
268 :
269 : protected:
270 HIT 517 : reactor_scheduler_base() = default;
271 :
272 : /** Drain completed_ops during shutdown.
273 :
274 : Pops all operations from the global queue and destroys them,
275 : skipping the task sentinel. Signals all waiting threads.
276 : Derived classes call this from their shutdown() override
277 : before performing platform-specific cleanup.
278 : */
279 : void shutdown_drain();
280 :
281 : /// RAII guard that re-inserts the task sentinel after `run_task`.
282 : struct task_cleanup
283 : {
284 : reactor_scheduler_base const* sched;
285 : lock_type* lock;
286 : context_type* ctx;
287 : ~task_cleanup();
288 : };
289 :
290 : mutable mutex_type mutex_{true};
291 : mutable event_type cond_{true};
292 : mutable op_queue completed_ops_;
293 : mutable std::atomic<std::int64_t> outstanding_work_{0};
294 : std::atomic<bool> stopped_{false};
295 : mutable std::atomic<bool> task_running_{false};
296 : mutable bool task_interrupted_ = false;
297 :
298 : // Runtime-configurable reactor tuning parameters.
299 : // Defaults match the library's built-in values.
300 : unsigned max_events_per_poll_ = 128;
301 : unsigned inline_budget_initial_ = 2;
302 : unsigned inline_budget_max_ = 16;
303 : unsigned unassisted_budget_ = 4;
304 :
305 : /// Bit 0 of `state_`: set when the condvar should be signaled.
306 : static constexpr std::size_t signaled_bit = 1;
307 :
308 : /// Increment per waiting thread in `state_`.
309 : static constexpr std::size_t waiter_increment = 2;
310 : mutable std::size_t state_ = 0;
311 :
312 : /// Sentinel op that triggers a reactor poll when dequeued.
313 : struct task_op final : scheduler_op
314 : {
315 MIS 0 : void operator()() override {}
316 0 : void destroy() override {}
317 : };
318 : task_op task_op_;
319 :
320 : /// Run the platform-specific reactor poll.
321 : virtual void
322 : run_task(lock_type& lock, context_type* ctx,
323 : long timeout_us) = 0;
324 :
325 : /// Wake a blocked reactor (e.g. write to eventfd or pipe).
326 : virtual void interrupt_reactor() const = 0;
327 :
328 : private:
329 : struct work_cleanup
330 : {
331 : reactor_scheduler_base* sched;
332 : lock_type* lock;
333 : context_type* ctx;
334 : ~work_cleanup();
335 : };
336 :
337 : std::size_t do_one(
338 : lock_type& lock, long timeout_us, context_type* ctx);
339 :
340 : void signal_all(lock_type& lock) const;
341 : bool maybe_unlock_and_signal_one(lock_type& lock) const;
342 : bool unlock_and_signal_one(lock_type& lock) const;
343 : void clear_signal() const;
344 : void wait_for_signal(lock_type& lock) const;
345 : void wait_for_signal_for(
346 : lock_type& lock, long timeout_us) const;
347 : void wake_one_thread_and_unlock(lock_type& lock) const;
348 : };
349 :
350 : /** RAII guard that pushes/pops a scheduler context frame.
351 :
352 : On construction, pushes a new context frame onto the
353 : thread-local stack. On destruction, drains any remaining
354 : private queue items to the global queue and pops the frame.
355 : */
356 : struct reactor_thread_context_guard
357 : {
358 : /// The context frame managed by this guard.
359 : reactor_scheduler_context frame_;
360 :
361 : /// Construct the guard, pushing a frame for @a sched.
362 HIT 461 : explicit reactor_thread_context_guard(
363 : reactor_scheduler_base const* sched) noexcept
364 461 : : frame_(sched, reactor_context_stack.get())
365 : {
366 461 : reactor_context_stack.set(&frame_);
367 461 : }
368 :
369 : /// Destroy the guard, draining private work and popping the frame.
370 461 : ~reactor_thread_context_guard() noexcept
371 : {
372 461 : if (!frame_.private_queue.empty())
373 MIS 0 : frame_.key->drain_thread_queue(
374 0 : frame_.private_queue, frame_.private_outstanding_work);
375 HIT 461 : reactor_context_stack.set(frame_.next);
376 461 : }
377 : };
378 :
379 : // ---- Inline implementations ------------------------------------------------
380 :
381 : inline
382 461 : reactor_scheduler_context::reactor_scheduler_context(
383 : reactor_scheduler_base const* k,
384 461 : reactor_scheduler_context* n)
385 461 : : key(k)
386 461 : , next(n)
387 461 : , private_outstanding_work(0)
388 461 : , inline_budget(0)
389 461 : , inline_budget_max(
390 461 : static_cast<int>(k->inline_budget_initial()))
391 461 : , unassisted(false)
392 : {
393 461 : }
394 :
395 : inline void
396 MIS 0 : reactor_scheduler_base::configure_reactor(
397 : unsigned max_events,
398 : unsigned budget_init,
399 : unsigned budget_max,
400 : unsigned unassisted)
401 : {
402 0 : if (max_events < 1 ||
403 0 : max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
404 : throw std::out_of_range(
405 0 : "max_events_per_poll must be in [1, INT_MAX]");
406 0 : if (budget_max < 1 ||
407 0 : budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
408 : throw std::out_of_range(
409 0 : "inline_budget_max must be in [1, INT_MAX]");
410 :
411 : // Clamp initial and unassisted to budget_max.
412 0 : if (budget_init > budget_max)
413 0 : budget_init = budget_max;
414 0 : if (unassisted > budget_max)
415 0 : unassisted = budget_max;
416 :
417 0 : max_events_per_poll_ = max_events;
418 0 : inline_budget_initial_ = budget_init;
419 0 : inline_budget_max_ = budget_max;
420 0 : unassisted_budget_ = unassisted;
421 0 : }
422 :
// Adaptive inline-budget policy, evaluated once per posted handler:
//  - unassisted (single-threaded-looking) cycles get the fixed
//    unassisted budget;
//  - a fully consumed budget last cycle doubles the cap (up to the
//    configured maximum);
//  - a partially consumed budget resets the cap back to the
//    configured initial value.
inline void
reactor_scheduler_base::reset_inline_budget() const noexcept
{
    if (auto* ctx = reactor_find_context(this))
    {
        // Cap when no other thread absorbed queued work
        if (ctx->unassisted)
        {
            ctx->inline_budget_max =
                static_cast<int>(unassisted_budget_);
            ctx->inline_budget =
                static_cast<int>(unassisted_budget_);
            return;
        }
        // Ramp up when previous cycle fully consumed budget
        if (ctx->inline_budget == 0)
            ctx->inline_budget_max = (std::min)(
                ctx->inline_budget_max * 2,
                static_cast<int>(inline_budget_max_));
        else if (ctx->inline_budget < ctx->inline_budget_max)
            ctx->inline_budget_max =
                static_cast<int>(inline_budget_initial_);
        ctx->inline_budget = ctx->inline_budget_max;
    }
}
448 :
449 : inline bool
450 HIT 437844 : reactor_scheduler_base::try_consume_inline_budget() const noexcept
451 : {
452 437844 : if (auto* ctx = reactor_find_context(this))
453 : {
454 437844 : if (ctx->inline_budget > 0)
455 : {
456 350304 : --ctx->inline_budget;
457 350304 : return true;
458 : }
459 : }
460 87540 : return false;
461 : }
462 :
inline void
reactor_scheduler_base::post(std::coroutine_handle<> h) const
{
    // Heap-allocated adapter that owns the handle until invoked or
    // destroyed. Note: operator() and destroy() both `delete this`,
    // so the handle is copied to a local first.
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
        ~post_handler() override = default;

        void operator()() override
        {
            auto saved = h_;
            delete this;
            // Ensure stores from the posting thread are visible
            std::atomic_thread_fence(std::memory_order_acquire);
            saved.resume();
        }

        void destroy() override
        {
            // Dropped without running: destroy the coroutine frame.
            auto saved = h_;
            delete this;
            saved.destroy();
        }
    };

    // unique_ptr guards the allocation until ownership transfers
    // to one of the queues below.
    auto ph = std::make_unique<post_handler>(h);

    // Fast path: on a scheduler thread, use the lock-free private
    // queue; the work count is flushed later in bulk.
    if (auto* ctx = reactor_find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: count the work first, then publish under the mutex
    // and wake a waiter.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    lock_type lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
505 :
506 : inline void
507 105187 : reactor_scheduler_base::post(scheduler_op* h) const
508 : {
509 105187 : if (auto* ctx = reactor_find_context(this))
510 : {
511 105015 : ++ctx->private_outstanding_work;
512 105015 : ctx->private_queue.push(h);
513 105015 : return;
514 : }
515 :
516 172 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
517 :
518 172 : lock_type lock(mutex_);
519 172 : completed_ops_.push(h);
520 172 : wake_one_thread_and_unlock(lock);
521 172 : }
522 :
523 : inline bool
524 1286 : reactor_scheduler_base::running_in_this_thread() const noexcept
525 : {
526 1286 : return reactor_find_context(this) != nullptr;
527 : }
528 :
529 : inline void
530 414 : reactor_scheduler_base::stop()
531 : {
532 414 : lock_type lock(mutex_);
533 414 : if (!stopped_.load(std::memory_order_acquire))
534 : {
535 376 : stopped_.store(true, std::memory_order_release);
536 376 : signal_all(lock);
537 376 : interrupt_reactor();
538 : }
539 414 : }
540 :
// Lock-free observer; pairs with the release store in stop().
inline bool
reactor_scheduler_base::stopped() const noexcept
{
    return stopped_.load(std::memory_order_acquire);
}
546 :
// Clear the stopped flag so a subsequent run()/poll() proceeds.
// NOTE(review): intentionally does not touch the signal state or
// work count — callers are expected to invoke this between runs.
inline void
reactor_scheduler_base::restart()
{
    stopped_.store(false, std::memory_order_release);
}
552 :
553 : inline std::size_t
554 388 : reactor_scheduler_base::run()
555 : {
556 776 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
557 : {
558 28 : stop();
559 28 : return 0;
560 : }
561 :
562 360 : reactor_thread_context_guard ctx(this);
563 360 : lock_type lock(mutex_);
564 :
565 360 : std::size_t n = 0;
566 : for (;;)
567 : {
568 269115 : if (!do_one(lock, -1, &ctx.frame_))
569 360 : break;
570 268755 : if (n != (std::numeric_limits<std::size_t>::max)())
571 268755 : ++n;
572 268755 : if (!lock.owns_lock())
573 172171 : lock.lock();
574 : }
575 360 : return n;
576 360 : }
577 :
578 : inline std::size_t
579 2 : reactor_scheduler_base::run_one()
580 : {
581 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
582 : {
583 MIS 0 : stop();
584 0 : return 0;
585 : }
586 :
587 HIT 2 : reactor_thread_context_guard ctx(this);
588 2 : lock_type lock(mutex_);
589 2 : return do_one(lock, -1, &ctx.frame_);
590 2 : }
591 :
592 : inline std::size_t
593 102 : reactor_scheduler_base::wait_one(long usec)
594 : {
595 204 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
596 : {
597 10 : stop();
598 10 : return 0;
599 : }
600 :
601 92 : reactor_thread_context_guard ctx(this);
602 92 : lock_type lock(mutex_);
603 92 : return do_one(lock, usec, &ctx.frame_);
604 92 : }
605 :
606 : inline std::size_t
607 6 : reactor_scheduler_base::poll()
608 : {
609 12 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
610 : {
611 1 : stop();
612 1 : return 0;
613 : }
614 :
615 5 : reactor_thread_context_guard ctx(this);
616 5 : lock_type lock(mutex_);
617 :
618 5 : std::size_t n = 0;
619 : for (;;)
620 : {
621 11 : if (!do_one(lock, 0, &ctx.frame_))
622 5 : break;
623 6 : if (n != (std::numeric_limits<std::size_t>::max)())
624 6 : ++n;
625 6 : if (!lock.owns_lock())
626 6 : lock.lock();
627 : }
628 5 : return n;
629 5 : }
630 :
631 : inline std::size_t
632 4 : reactor_scheduler_base::poll_one()
633 : {
634 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
635 : {
636 2 : stop();
637 2 : return 0;
638 : }
639 :
640 2 : reactor_thread_context_guard ctx(this);
641 2 : lock_type lock(mutex_);
642 2 : return do_one(lock, 0, &ctx.frame_);
643 2 : }
644 :
// Relaxed is sufficient: the count only gates loop termination,
// and publication of the associated op provides the ordering.
inline void
reactor_scheduler_base::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
650 :
// acq_rel: the thread that drops the count to zero must observe
// all prior work before initiating stop().
inline void
reactor_scheduler_base::work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
657 :
658 : inline void
659 145492 : reactor_scheduler_base::compensating_work_started() const noexcept
660 : {
661 145492 : auto* ctx = reactor_find_context(this);
662 145492 : if (ctx)
663 145492 : ++ctx->private_outstanding_work;
664 145492 : }
665 :
666 : inline void
667 MIS 0 : reactor_scheduler_base::drain_thread_queue(
668 : op_queue& queue, std::int64_t count) const
669 : {
670 0 : if (count > 0)
671 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
672 :
673 0 : lock_type lock(mutex_);
674 0 : completed_ops_.splice(queue);
675 0 : if (count > 0)
676 0 : maybe_unlock_and_signal_one(lock);
677 0 : }
678 :
679 : inline void
680 HIT 16037 : reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
681 : {
682 16037 : if (ops.empty())
683 16037 : return;
684 :
685 MIS 0 : if (auto* ctx = reactor_find_context(this))
686 : {
687 0 : ctx->private_queue.splice(ops);
688 0 : return;
689 : }
690 :
691 0 : lock_type lock(mutex_);
692 0 : completed_ops_.splice(ops);
693 0 : wake_one_thread_and_unlock(lock);
694 0 : }
695 :
inline void
reactor_scheduler_base::shutdown_drain()
{
    lock_type lock(mutex_);

    // Destroy (not run) every queued op. The lock is dropped around
    // destroy() because op destructors may re-enter the scheduler.
    while (auto* h = completed_ops_.pop())
    {
        // The task sentinel is a member, never destroyed.
        if (h == &task_op_)
            continue;
        lock.unlock();
        h->destroy();
        lock.lock();
    }

    // Release any threads still blocked in wait_for_signal.
    signal_all(lock);
}
712 :
// Caller must hold the lock (taken by reference as documentation).
inline void
reactor_scheduler_base::signal_all(lock_type&) const
{
    state_ |= signaled_bit;
    cond_.notify_all();
}
719 :
// Set the signaled bit; if at least one thread is waiting
// (state_ > signaled_bit means waiter_increment was added),
// release the lock and wake exactly one. Returns whether the
// lock was released.
inline bool
reactor_scheduler_base::maybe_unlock_and_signal_one(
    lock_type& lock) const
{
    state_ |= signaled_bit;
    if (state_ > signaled_bit)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}
733 :
// Always releases the lock; notifies one waiter only if any were
// registered. The waiter check happens before unlock because
// state_ is protected by the mutex.
inline bool
reactor_scheduler_base::unlock_and_signal_one(
    lock_type& lock) const
{
    state_ |= signaled_bit;
    bool have_waiters = state_ > signaled_bit;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}
745 :
// Caller must hold mutex_; arms the next wait_for_signal call.
inline void
reactor_scheduler_base::clear_signal() const
{
    state_ &= ~signaled_bit;
}
751 :
// Block until the signaled bit is set. The waiter count embedded in
// state_ lets the signaling side know a notify is required.
inline void
reactor_scheduler_base::wait_for_signal(
    lock_type& lock) const
{
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
763 :
// Timed variant: waits at most once (no loop) so a timeout returns
// to the caller even if the signal never arrives.
inline void
reactor_scheduler_base::wait_for_signal_for(
    lock_type& lock, long timeout_us) const
{
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
775 :
// Wake whichever consumer can pick up new work: prefer a thread
// waiting on the condvar; otherwise interrupt the reactor if it is
// (or may be) blocked in its poll syscall. Always releases the lock.
inline void
reactor_scheduler_base::wake_one_thread_and_unlock(
    lock_type& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    // task_interrupted_ suppresses redundant interrupts; it is
    // written under the lock we still hold here.
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
794 :
// Runs after each handler invocation in do_one (lock released).
// Settles the work ledger: the executed handler consumed one unit,
// so `produced` new private units net to produced-1 on the global
// counter; producing nothing means a net work_finished().
inline reactor_scheduler_base::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        // Publish any ops the handler queued privately; the count
        // above was flushed first.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        sched->work_finished();
    }
}
818 :
// Runs after each run_task() reactor poll. Unlike work_cleanup,
// no handler was consumed, so the full private count is flushed.
// Count before queue, so the global counter never reads short.
inline reactor_scheduler_base::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        // run_task may have returned with the lock in either state.
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
838 :
/* Core event loop step. Called with `lock` held; may return with it
   released (after running a handler). Returns 1 if a handler ran,
   0 on stop, no work, or timeout.

   timeout_us semantics: <0 block indefinitely, 0 never block,
   >0 block for at most that many microseconds. */
inline std::size_t
reactor_scheduler_base::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Nothing queued and either no work left or a non-blocking
            // call: put the sentinel back and report no progress.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // With handlers pending, the reactor must not block
            // (timeout 0); mark it pre-interrupted so posters skip
            // the wakeup syscall.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread run the pending handlers while we poll.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                // Keep task_running_ consistent even if the poll throws.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel for the next cycle.
            completed_ops_.push(&task_op_);
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // `unassisted` records whether a peer thread took over the
            // remaining work; it drives the inline-budget adaptation.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Settles work accounting even if the handler throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Handler runs with the lock released.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Sleep until another thread posts work or stops us.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
923 :
924 : } // namespace boost::corosio::detail
925 :
926 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|