TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/select/select_op.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29 :
30 : #include <boost/corosio/detail/except.hpp>
31 :
32 : #include <sys/select.h>
33 : #include <unistd.h>
34 : #include <errno.h>
35 : #include <fcntl.h>
36 :
37 : #include <atomic>
38 : #include <chrono>
39 : #include <cstdint>
40 : #include <limits>
41 : #include <mutex>
42 : #include <unordered_map>
43 :
44 : namespace boost::corosio::detail {
45 :
46 : struct select_op;
47 : struct select_descriptor_state;
48 :
/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It inherits the shared reactor threading
    model from reactor_scheduler_base: signal state machine, inline completion
    budget, work counting, and the do_one event loop.

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits maximum concurrent connections
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler, closing the self-pipe.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&) = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is added to the registered_descs_ map and will be
        included in subsequent select() calls. The reactor is
        interrupted so a blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.
    */
    void register_descriptor(int fd, select_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after
        the reactor's snapshot was taken. Without this, select()
        may block not watching for writability on the fd.
    */
    void notify_reactor() const;

private:
    // Reactor loop body: snapshot fds, run select(), dispatch readiness.
    void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) override;
    // Wake a blocked select() by writing to the self-pipe.
    void interrupt_reactor() const override;
    // Combine the requested timeout with the nearest timer expiry.
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building; guarded by mutex_ (from the
    // base class). mutable because register/deregister are const.
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
    // Highest registered fd, cached so select()'s nfds need not be
    // recomputed every call; guarded by mutex_.
    mutable int max_fd_ = -1;
};
142 :
143 HIT 195 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
144 195 : : pipe_fds_{-1, -1}
145 195 : , max_fd_(-1)
146 : {
147 195 : if (::pipe(pipe_fds_) < 0)
148 MIS 0 : detail::throw_system_error(make_err(errno), "pipe");
149 :
150 HIT 585 : for (int i = 0; i < 2; ++i)
151 : {
152 390 : int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
153 390 : if (flags == -1)
154 : {
155 MIS 0 : int errn = errno;
156 0 : ::close(pipe_fds_[0]);
157 0 : ::close(pipe_fds_[1]);
158 0 : detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
159 : }
160 HIT 390 : if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
161 : {
162 MIS 0 : int errn = errno;
163 0 : ::close(pipe_fds_[0]);
164 0 : ::close(pipe_fds_[1]);
165 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
166 : }
167 HIT 390 : if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
168 : {
169 MIS 0 : int errn = errno;
170 0 : ::close(pipe_fds_[0]);
171 0 : ::close(pipe_fds_[1]);
172 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
173 : }
174 : }
175 :
176 HIT 195 : timer_svc_ = &get_timer_service(ctx, *this);
177 195 : timer_svc_->set_on_earliest_changed(
178 4017 : timer_service::callback(this, [](void* p) {
179 3822 : static_cast<select_scheduler*>(p)->interrupt_reactor();
180 3822 : }));
181 :
182 195 : get_resolver_service(ctx, *this);
183 195 : get_signal_service(ctx, *this);
184 195 : get_stream_file_service(ctx, *this);
185 195 : get_random_access_file_service(ctx, *this);
186 :
187 195 : completed_ops_.push(&task_op_);
188 195 : }
189 :
190 390 : inline select_scheduler::~select_scheduler()
191 : {
192 195 : if (pipe_fds_[0] >= 0)
193 195 : ::close(pipe_fds_[0]);
194 195 : if (pipe_fds_[1] >= 0)
195 195 : ::close(pipe_fds_[1]);
196 390 : }
197 :
198 : inline void
199 195 : select_scheduler::shutdown()
200 : {
201 195 : shutdown_drain();
202 :
203 195 : if (pipe_fds_[1] >= 0)
204 195 : interrupt_reactor();
205 195 : }
206 :
207 : inline void
208 7301 : select_scheduler::register_descriptor(
209 : int fd, select_descriptor_state* desc) const
210 : {
211 7301 : if (fd < 0 || fd >= FD_SETSIZE)
212 MIS 0 : detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
213 :
214 HIT 7301 : desc->registered_events = reactor_event_read | reactor_event_write;
215 7301 : desc->fd = fd;
216 7301 : desc->scheduler_ = this;
217 7301 : desc->mutex.set_enabled(!single_threaded_);
218 7301 : desc->ready_events_.store(0, std::memory_order_relaxed);
219 :
220 : {
221 7301 : conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
222 7301 : desc->impl_ref_.reset();
223 7301 : desc->read_ready = false;
224 7301 : desc->write_ready = false;
225 7301 : }
226 :
227 : {
228 7301 : mutex_type::scoped_lock lock(mutex_);
229 7301 : registered_descs_[fd] = desc;
230 7301 : if (fd > max_fd_)
231 7297 : max_fd_ = fd;
232 7301 : }
233 :
234 7301 : interrupt_reactor();
235 7301 : }
236 :
237 : inline void
238 7301 : select_scheduler::deregister_descriptor(int fd) const
239 : {
240 7301 : mutex_type::scoped_lock lock(mutex_);
241 :
242 7301 : auto it = registered_descs_.find(fd);
243 7301 : if (it == registered_descs_.end())
244 MIS 0 : return;
245 :
246 HIT 7301 : registered_descs_.erase(it);
247 :
248 7301 : if (fd == max_fd_)
249 : {
250 7244 : max_fd_ = pipe_fds_[0];
251 14403 : for (auto& [registered_fd, state] : registered_descs_)
252 : {
253 7159 : if (registered_fd > max_fd_)
254 7150 : max_fd_ = registered_fd;
255 : }
256 : }
257 7301 : }
258 :
259 : inline void
260 24529 : select_scheduler::notify_reactor() const
261 : {
262 24529 : interrupt_reactor();
263 24529 : }
264 :
265 : inline void
266 35990 : select_scheduler::interrupt_reactor() const
267 : {
268 35990 : char byte = 1;
269 35990 : [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
270 35990 : }
271 :
272 : inline long
273 115760 : select_scheduler::calculate_timeout(long requested_timeout_us) const
274 : {
275 115760 : if (requested_timeout_us == 0)
276 MIS 0 : return 0;
277 :
278 HIT 115760 : auto nearest = timer_svc_->nearest_expiry();
279 115760 : if (nearest == timer_service::time_point::max())
280 46 : return requested_timeout_us;
281 :
282 115714 : auto now = std::chrono::steady_clock::now();
283 115714 : if (nearest <= now)
284 394 : return 0;
285 :
286 : auto timer_timeout_us =
287 115320 : std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
288 115320 : .count();
289 :
290 115320 : constexpr auto long_max =
291 : static_cast<long long>((std::numeric_limits<long>::max)());
292 : auto capped_timer_us =
293 115320 : (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
294 115320 : static_cast<long long>(0)),
295 115320 : long_max);
296 :
297 115320 : if (requested_timeout_us < 0)
298 115314 : return static_cast<long>(capped_timer_us);
299 :
300 : return static_cast<long>(
301 6 : (std::min)(static_cast<long long>(requested_timeout_us),
302 6 : capped_timer_us));
303 : }
304 :
// Reactor loop body, invoked from the base class's do_one with `lock`
// held. Snapshots the registered fds, blocks in select(), then marks
// ready descriptors and splices them onto the completed-ops queue for
// worker threads to perform the actual I/O (deferred-I/O pattern).
// NOTE(review): the snapshot loop below reads registered_descs_ before
// the owns_lock() check — presumably do_one guarantees the lock is held
// on entry whenever mutation is possible; confirm against the base class.
inline void
select_scheduler::run_task(
    lock_type& lock, context_type* ctx, long timeout_us)
{
    // If an interrupt is already pending, poll instead of blocking;
    // otherwise shrink the timeout to the nearest timer expiry.
    long effective_timeout_us =
        task_interrupted_ ? 0 : calculate_timeout(timeout_us);

    // Snapshot registered descriptors while holding lock.
    // Record which fds need write monitoring to avoid a hot loop:
    // select is level-triggered so writable sockets (nearly always
    // writable) would cause select() to return immediately every
    // iteration if unconditionally added to write_fds.
    struct fd_entry
    {
        int fd;
        select_descriptor_state* desc;
        bool needs_write;
    };
    fd_entry snapshot[FD_SETSIZE];
    int snapshot_count = 0;

    for (auto& [fd, desc] : registered_descs_)
    {
        if (snapshot_count < FD_SETSIZE)
        {
            // Per-descriptor lock: write_op/connect_op are mutated by
            // worker threads under this mutex.
            conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
            snapshot[snapshot_count].fd = fd;
            snapshot[snapshot_count].desc = desc;
            snapshot[snapshot_count].needs_write =
                (desc->write_op || desc->connect_op);
            ++snapshot_count;
        }
    }

    // Release the scheduler lock before blocking in select().
    if (lock.owns_lock())
        lock.unlock();

    // RAII guard: restores scheduler bookkeeping (and re-queues the
    // task) however this function exits, including via exception.
    task_cleanup on_exit{this, &lock, ctx};

    // Rebuild the fd_sets from scratch each iteration (select() mutates
    // them in place).
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // Always watch the self-pipe read end so interrupt_reactor() works.
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    for (int i = 0; i < snapshot_count; ++i)
    {
        int fd = snapshot[i].fd;
        FD_SET(fd, &read_fds);
        if (snapshot[i].needs_write)
            FD_SET(fd, &write_fds);
        FD_SET(fd, &except_fds);
        if (fd > nfds)
            nfds = fd;
    }

    // Negative timeout means "block indefinitely": pass NULL to select().
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr = &tv;
    }

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);

    // EINTR: signal interrupted select(), just retry.
    // EBADF: an fd was closed between snapshot and select(); retry
    //        with a fresh snapshot from registered_descs_.
    if (ready < 0)
    {
        if (errno == EINTR || errno == EBADF)
            return;
        detail::throw_system_error(make_err(errno), "select");
    }

    // Process timers outside the lock
    timer_svc_->process_expired();

    op_queue local_ops;

    if (ready > 0)
    {
        // Drain the self-pipe so coalesced wakeups don't re-trigger;
        // it is non-blocking, so read() returns <= 0 when empty.
        if (FD_ISSET(pipe_fds_[0], &read_fds))
        {
            char buf[256];
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
            {
            }
        }

        for (int i = 0; i < snapshot_count; ++i)
        {
            int fd = snapshot[i].fd;
            select_descriptor_state* desc = snapshot[i].desc;

            // Translate fd_set membership into reactor event flags.
            std::uint32_t flags = 0;
            if (FD_ISSET(fd, &read_fds))
                flags |= reactor_event_read;
            if (FD_ISSET(fd, &write_fds))
                flags |= reactor_event_write;
            if (FD_ISSET(fd, &except_fds))
                flags |= reactor_event_error;

            if (flags == 0)
                continue;

            desc->add_ready_events(flags);

            // Enqueue the descriptor exactly once: the CAS ensures a
            // descriptor already queued (and not yet processed) is not
            // pushed a second time.
            bool expected = false;
            if (desc->is_enqueued_.compare_exchange_strong(
                    expected, true, std::memory_order_release,
                    std::memory_order_relaxed))
            {
                local_ops.push(desc);
            }
        }
    }

    // Reacquire the scheduler lock before touching completed_ops_.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
432 :
433 : } // namespace boost::corosio::detail
434 :
435 : #endif // BOOST_COROSIO_HAS_SELECT
436 :
437 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
|