TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29 :
30 : #include <boost/corosio/detail/except.hpp>
31 :
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
37 :
38 : #include <errno.h>
39 : #include <sys/epoll.h>
40 : #include <sys/eventfd.h>
41 : #include <sys/timerfd.h>
42 : #include <unistd.h>
43 :
44 : namespace boost::corosio::detail {
45 :
46 : struct epoll_op;
47 : struct descriptor_state;
48 :
49 : /** Linux scheduler using epoll for I/O multiplexing.
50 :
51 : This scheduler implements the scheduler interface using Linux epoll
52 : for efficient I/O event notification. It uses a single reactor model
53 : where one thread runs epoll_wait while other threads
54 : wait on a condition variable for handler work. This design provides:
55 :
56 : - Handler parallelism: N posted handlers can execute on N threads
57 : - No thundering herd: condition_variable wakes exactly one thread
58 : - IOCP parity: Behavior matches Windows I/O completion port semantics
59 :
60 : When threads call run(), they first try to execute queued handlers.
61 : If the queue is empty and no reactor is running, one thread becomes
62 : the reactor and runs epoll_wait. Other threads wait on a condition
63 : variable until handlers are available.
64 :
65 : @par Thread Safety
66 : All public member functions are thread-safe.
67 : */
68 : class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
69 : {
70 : public:
71 : /** Construct the scheduler.
72 :
73 : Creates an epoll instance, eventfd for reactor interruption,
74 : and timerfd for kernel-managed timer expiry.
75 :
76 : @param ctx Reference to the owning execution_context.
77 : @param concurrency_hint Hint for expected thread count (unused).
78 : */
79 : epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
80 :
81 : /// Destroy the scheduler.
82 : ~epoll_scheduler() override;
83 :
84 : epoll_scheduler(epoll_scheduler const&) = delete;
85 : epoll_scheduler& operator=(epoll_scheduler const&) = delete;
86 :
87 : /// Shut down the scheduler, draining pending operations.
88 : void shutdown() override;
89 :
90 : /// Apply runtime configuration, resizing the event buffer.
91 : void configure_reactor(
92 : unsigned max_events,
93 : unsigned budget_init,
94 : unsigned budget_max,
95 : unsigned unassisted) override;
96 :
97 : /** Return the epoll file descriptor.
98 :
99 : Used by socket services to register file descriptors
100 : for I/O event notification.
101 :
102 : @return The epoll file descriptor.
103 : */
104 : int epoll_fd() const noexcept
105 : {
106 : return epoll_fd_;
107 : }
108 :
109 : /** Register a descriptor for persistent monitoring.
110 :
111 : The fd is registered once and stays registered until explicitly
112 : deregistered. Events are dispatched via descriptor_state which
113 : tracks pending read/write/connect operations.
114 :
115 : @param fd The file descriptor to register.
116 : @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
117 : */
118 : void register_descriptor(int fd, descriptor_state* desc) const;
119 :
120 : /** Deregister a persistently registered descriptor.
121 :
122 : @param fd The file descriptor to deregister.
123 : */
124 : void deregister_descriptor(int fd) const;
125 :
126 : private:
127 : void
128 : run_task(lock_type& lock, context_type* ctx,
129 : long timeout_us) override;
130 : void interrupt_reactor() const override;
131 : void update_timerfd() const;
132 :
133 : int epoll_fd_;
134 : int event_fd_;
135 : int timer_fd_;
136 :
137 : // Edge-triggered eventfd state
138 : mutable std::atomic<bool> eventfd_armed_{false};
139 :
140 : // Set when the earliest timer changes; flushed before epoll_wait
141 : mutable std::atomic<bool> timerfd_stale_{false};
142 :
143 : // Event buffer sized from max_events_per_poll_ (set at construction,
144 : // resized by configure_reactor via io_context_options).
145 : std::vector<epoll_event> event_buffer_;
146 : };
147 :
148 HIT 322 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
149 322 : : epoll_fd_(-1)
150 322 : , event_fd_(-1)
151 322 : , timer_fd_(-1)
152 644 : , event_buffer_(max_events_per_poll_)
153 : {
154 322 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
155 322 : if (epoll_fd_ < 0)
156 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
157 :
158 HIT 322 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
159 322 : if (event_fd_ < 0)
160 : {
161 MIS 0 : int errn = errno;
162 0 : ::close(epoll_fd_);
163 0 : detail::throw_system_error(make_err(errn), "eventfd");
164 : }
165 :
166 HIT 322 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
167 322 : if (timer_fd_ < 0)
168 : {
169 MIS 0 : int errn = errno;
170 0 : ::close(event_fd_);
171 0 : ::close(epoll_fd_);
172 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
173 : }
174 :
175 HIT 322 : epoll_event ev{};
176 322 : ev.events = EPOLLIN | EPOLLET;
177 322 : ev.data.ptr = nullptr;
178 322 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
179 : {
180 MIS 0 : int errn = errno;
181 0 : ::close(timer_fd_);
182 0 : ::close(event_fd_);
183 0 : ::close(epoll_fd_);
184 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
185 : }
186 :
187 HIT 322 : epoll_event timer_ev{};
188 322 : timer_ev.events = EPOLLIN | EPOLLERR;
189 322 : timer_ev.data.ptr = &timer_fd_;
190 322 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
191 : {
192 MIS 0 : int errn = errno;
193 0 : ::close(timer_fd_);
194 0 : ::close(event_fd_);
195 0 : ::close(epoll_fd_);
196 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
197 : }
198 :
199 HIT 322 : timer_svc_ = &get_timer_service(ctx, *this);
200 322 : timer_svc_->set_on_earliest_changed(
201 4927 : timer_service::callback(this, [](void* p) {
202 4605 : auto* self = static_cast<epoll_scheduler*>(p);
203 4605 : self->timerfd_stale_.store(true, std::memory_order_release);
204 4605 : self->interrupt_reactor();
205 4605 : }));
206 :
207 322 : get_resolver_service(ctx, *this);
208 322 : get_signal_service(ctx, *this);
209 322 : get_stream_file_service(ctx, *this);
210 322 : get_random_access_file_service(ctx, *this);
211 :
212 322 : completed_ops_.push(&task_op_);
213 322 : }
214 :
215 644 : inline epoll_scheduler::~epoll_scheduler()
216 : {
217 322 : if (timer_fd_ >= 0)
218 322 : ::close(timer_fd_);
219 322 : if (event_fd_ >= 0)
220 322 : ::close(event_fd_);
221 322 : if (epoll_fd_ >= 0)
222 322 : ::close(epoll_fd_);
223 644 : }
224 :
225 : inline void
226 322 : epoll_scheduler::shutdown()
227 : {
228 322 : shutdown_drain();
229 :
230 322 : if (event_fd_ >= 0)
231 322 : interrupt_reactor();
232 322 : }
233 :
234 : inline void
235 MIS 0 : epoll_scheduler::configure_reactor(
236 : unsigned max_events,
237 : unsigned budget_init,
238 : unsigned budget_max,
239 : unsigned unassisted)
240 : {
241 0 : reactor_scheduler_base::configure_reactor(
242 : max_events, budget_init, budget_max, unassisted);
243 0 : event_buffer_.resize(max_events_per_poll_);
244 0 : }
245 :
246 : inline void
247 HIT 8877 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
248 : {
249 8877 : epoll_event ev{};
250 8877 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
251 8877 : ev.data.ptr = desc;
252 :
253 8877 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
254 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
255 :
256 HIT 8877 : desc->registered_events = ev.events;
257 8877 : desc->fd = fd;
258 8877 : desc->scheduler_ = this;
259 8877 : desc->mutex.set_enabled(!single_threaded_);
260 8877 : desc->ready_events_.store(0, std::memory_order_relaxed);
261 :
262 8877 : conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
263 8877 : desc->impl_ref_.reset();
264 8877 : desc->read_ready = false;
265 8877 : desc->write_ready = false;
266 8877 : }
267 :
268 : inline void
269 8877 : epoll_scheduler::deregister_descriptor(int fd) const
270 : {
271 8877 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
272 8877 : }
273 :
274 : inline void
275 5218 : epoll_scheduler::interrupt_reactor() const
276 : {
277 5218 : bool expected = false;
278 5218 : if (eventfd_armed_.compare_exchange_strong(
279 : expected, true, std::memory_order_release,
280 : std::memory_order_relaxed))
281 : {
282 5007 : std::uint64_t val = 1;
283 5007 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
284 : }
285 5218 : }
286 :
287 : inline void
288 9170 : epoll_scheduler::update_timerfd() const
289 : {
290 9170 : auto nearest = timer_svc_->nearest_expiry();
291 :
292 9170 : itimerspec ts{};
293 9170 : int flags = 0;
294 :
295 9170 : if (nearest == timer_service::time_point::max())
296 : {
297 : // No timers — disarm by setting to 0 (relative)
298 : }
299 : else
300 : {
301 9117 : auto now = std::chrono::steady_clock::now();
302 9117 : if (nearest <= now)
303 : {
304 : // Use 1ns instead of 0 — zero disarms the timerfd
305 360 : ts.it_value.tv_nsec = 1;
306 : }
307 : else
308 : {
309 8757 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
310 8757 : nearest - now)
311 8757 : .count();
312 8757 : ts.it_value.tv_sec = nsec / 1000000000;
313 8757 : ts.it_value.tv_nsec = nsec % 1000000000;
314 8757 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
315 MIS 0 : ts.it_value.tv_nsec = 1;
316 : }
317 : }
318 :
319 HIT 9170 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
320 MIS 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
321 HIT 9170 : }
322 :
323 : inline void
324 39755 : epoll_scheduler::run_task(
325 : lock_type& lock, context_type* ctx, long timeout_us)
326 : {
327 : int timeout_ms;
328 39755 : if (task_interrupted_)
329 26815 : timeout_ms = 0;
330 12940 : else if (timeout_us < 0)
331 12932 : timeout_ms = -1;
332 : else
333 8 : timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
334 :
335 39755 : if (lock.owns_lock())
336 12940 : lock.unlock();
337 :
338 39755 : task_cleanup on_exit{this, &lock, ctx};
339 :
340 : // Flush deferred timerfd programming before blocking
341 39755 : if (timerfd_stale_.exchange(false, std::memory_order_acquire))
342 4583 : update_timerfd();
343 :
344 39755 : int nfds = ::epoll_wait(
345 : epoll_fd_, event_buffer_.data(),
346 39755 : static_cast<int>(event_buffer_.size()), timeout_ms);
347 :
348 39755 : if (nfds < 0 && errno != EINTR)
349 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_wait");
350 :
351 HIT 39755 : bool check_timers = false;
352 39755 : op_queue local_ops;
353 :
354 90001 : for (int i = 0; i < nfds; ++i)
355 : {
356 50246 : if (event_buffer_[i].data.ptr == nullptr)
357 : {
358 : std::uint64_t val;
359 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
360 4685 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
361 4685 : eventfd_armed_.store(false, std::memory_order_relaxed);
362 4685 : continue;
363 4685 : }
364 :
365 45561 : if (event_buffer_[i].data.ptr == &timer_fd_)
366 : {
367 : std::uint64_t expirations;
368 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
369 : [[maybe_unused]] auto r =
370 4587 : ::read(timer_fd_, &expirations, sizeof(expirations));
371 4587 : check_timers = true;
372 4587 : continue;
373 4587 : }
374 :
375 : auto* desc =
376 40974 : static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
377 40974 : desc->add_ready_events(event_buffer_[i].events);
378 :
379 40974 : bool expected = false;
380 40974 : if (desc->is_enqueued_.compare_exchange_strong(
381 : expected, true, std::memory_order_release,
382 : std::memory_order_relaxed))
383 : {
384 40974 : local_ops.push(desc);
385 : }
386 : }
387 :
388 39755 : if (check_timers)
389 : {
390 4587 : timer_svc_->process_expired();
391 4587 : update_timerfd();
392 : }
393 :
394 39755 : lock.lock();
395 :
396 39755 : if (!local_ops.empty())
397 26284 : completed_ops_.splice(local_ops);
398 39755 : }
399 :
400 : } // namespace boost::corosio::detail
401 :
402 : #endif // BOOST_COROSIO_HAS_EPOLL
403 :
404 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|