TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_POSIX
16 :
17 : #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
18 : #include <boost/corosio/native/native_scheduler.hpp>
19 : #include <boost/corosio/detail/file_service.hpp>
20 : #include <boost/corosio/detail/thread_pool.hpp>
21 :
22 : #include <mutex>
23 : #include <unordered_map>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : /** Stream file service for POSIX backends.
28 :
29 : Owns all posix_stream_file instances. Thread lifecycle is
30 : managed by the thread_pool service (shared with resolver).
31 : */
32 : class BOOST_COROSIO_DECL posix_stream_file_service final
33 : : public file_service
34 : {
35 : public:
36 HIT 517 : posix_stream_file_service(
37 : capy::execution_context& ctx, scheduler& sched)
38 1034 : : sched_(&sched)
39 517 : , pool_(get_or_create_pool(ctx))
40 : {
41 517 : }
42 :
43 1034 : ~posix_stream_file_service() override = default;
44 :
45 : posix_stream_file_service(posix_stream_file_service const&) = delete;
46 : posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
47 :
48 26 : io_object::implementation* construct() override
49 : {
50 26 : auto ptr = std::make_shared<posix_stream_file>(*this);
51 26 : auto* impl = ptr.get();
52 :
53 : {
54 26 : std::lock_guard<std::mutex> lock(mutex_);
55 26 : file_list_.push_back(impl);
56 26 : file_ptrs_[impl] = std::move(ptr);
57 26 : }
58 :
59 26 : return impl;
60 26 : }
61 :
62 26 : void destroy(io_object::implementation* p) override
63 : {
64 26 : auto& impl = static_cast<posix_stream_file&>(*p);
65 26 : impl.cancel();
66 26 : impl.close_file();
67 26 : destroy_impl(impl);
68 26 : }
69 :
70 43 : void close(io_object::handle& h) override
71 : {
72 43 : if (h.get())
73 : {
74 43 : auto& impl = static_cast<posix_stream_file&>(*h.get());
75 43 : impl.cancel();
76 43 : impl.close_file();
77 : }
78 43 : }
79 :
80 19 : std::error_code open_file(
81 : stream_file::implementation& impl,
82 : std::filesystem::path const& path,
83 : file_base::flags mode) override
84 : {
85 19 : if (static_cast<native_scheduler const*>(sched_)->single_threaded_)
86 MIS 0 : return std::make_error_code(std::errc::operation_not_supported);
87 HIT 19 : return static_cast<posix_stream_file&>(impl).open_file(path, mode);
88 : }
89 :
90 517 : void shutdown() override
91 : {
92 517 : std::lock_guard<std::mutex> lock(mutex_);
93 517 : for (auto* impl = file_list_.pop_front(); impl != nullptr;
94 MIS 0 : impl = file_list_.pop_front())
95 : {
96 0 : impl->cancel();
97 0 : impl->close_file();
98 : }
99 HIT 517 : file_ptrs_.clear();
100 517 : }
101 :
102 26 : void destroy_impl(posix_stream_file& impl)
103 : {
104 26 : std::lock_guard<std::mutex> lock(mutex_);
105 26 : file_list_.remove(&impl);
106 26 : file_ptrs_.erase(&impl);
107 26 : }
108 :
109 12 : void post(scheduler_op* op)
110 : {
111 12 : sched_->post(op);
112 12 : }
113 :
114 : void work_started() noexcept
115 : {
116 : sched_->work_started();
117 : }
118 :
119 : void work_finished() noexcept
120 : {
121 : sched_->work_finished();
122 : }
123 :
124 12 : thread_pool& pool() noexcept
125 : {
126 12 : return pool_;
127 : }
128 :
129 : private:
130 517 : static thread_pool& get_or_create_pool(capy::execution_context& ctx)
131 : {
132 517 : auto* p = ctx.find_service<thread_pool>();
133 517 : if (p)
134 517 : return *p;
135 MIS 0 : return ctx.make_service<thread_pool>();
136 : }
137 :
138 : scheduler* sched_;
139 : thread_pool& pool_;
140 : std::mutex mutex_;
141 : intrusive_list<posix_stream_file> file_list_;
142 : std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
143 : file_ptrs_;
144 : };
145 :
146 : /** Get or create the stream file service for the given context. */
147 : inline posix_stream_file_service&
148 HIT 517 : get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
149 : {
150 517 : return ctx.make_service<posix_stream_file_service>(sched);
151 : }
152 :
153 : // ---------------------------------------------------------------------------
154 : // posix_stream_file inline implementations (require complete service type)
155 : // ---------------------------------------------------------------------------
156 :
157 : inline std::coroutine_handle<>
158 6 : posix_stream_file::read_some(
159 : std::coroutine_handle<> h,
160 : capy::executor_ref ex,
161 : buffer_param param,
162 : std::stop_token token,
163 : std::error_code* ec,
164 : std::size_t* bytes_out)
165 : {
166 6 : auto& op = read_op_;
167 6 : op.reset();
168 6 : op.is_read = true;
169 :
170 6 : capy::mutable_buffer bufs[max_buffers];
171 6 : op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
172 :
173 6 : if (op.iovec_count == 0)
174 : {
175 MIS 0 : *ec = {};
176 0 : *bytes_out = 0;
177 0 : op.cont_op.cont.h = h;
178 0 : return dispatch_coro(ex, op.cont_op.cont);
179 : }
180 :
181 HIT 12 : for (int i = 0; i < op.iovec_count; ++i)
182 : {
183 6 : op.iovecs[i].iov_base = bufs[i].data();
184 6 : op.iovecs[i].iov_len = bufs[i].size();
185 : }
186 :
187 6 : op.h = h;
188 6 : op.ex = ex;
189 6 : op.ec_out = ec;
190 6 : op.bytes_out = bytes_out;
191 6 : op.start(token);
192 :
193 6 : op.ex.on_work_started();
194 :
195 6 : read_pool_op_.file_ = this;
196 6 : read_pool_op_.ref_ = this->shared_from_this();
197 6 : read_pool_op_.func_ = &posix_stream_file::do_read_work;
198 6 : if (!svc_.pool().post(&read_pool_op_))
199 : {
200 MIS 0 : op.impl_ref = std::move(read_pool_op_.ref_);
201 0 : op.cancelled.store(true, std::memory_order_release);
202 0 : svc_.post(&read_op_);
203 : }
204 HIT 6 : return std::noop_coroutine();
205 : }
206 :
207 : inline void
208 6 : posix_stream_file::do_read_work(pool_work_item* w) noexcept
209 : {
210 6 : auto* pw = static_cast<pool_op*>(w);
211 6 : auto* self = pw->file_;
212 6 : auto& op = self->read_op_;
213 :
214 6 : if (!op.cancelled.load(std::memory_order_acquire))
215 : {
216 : ssize_t n;
217 : do
218 : {
219 10 : n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
220 5 : static_cast<off_t>(self->offset_));
221 : }
222 5 : while (n < 0 && errno == EINTR);
223 :
224 5 : if (n >= 0)
225 : {
226 5 : op.errn = 0;
227 5 : op.bytes_transferred = static_cast<std::size_t>(n);
228 5 : self->offset_ += static_cast<std::uint64_t>(n);
229 : }
230 : else
231 : {
232 MIS 0 : op.errn = errno;
233 0 : op.bytes_transferred = 0;
234 : }
235 : }
236 :
237 HIT 6 : op.impl_ref = std::move(pw->ref_);
238 6 : self->svc_.post(&op);
239 6 : }
240 :
241 : inline std::coroutine_handle<>
242 6 : posix_stream_file::write_some(
243 : std::coroutine_handle<> h,
244 : capy::executor_ref ex,
245 : buffer_param param,
246 : std::stop_token token,
247 : std::error_code* ec,
248 : std::size_t* bytes_out)
249 : {
250 6 : auto& op = write_op_;
251 6 : op.reset();
252 6 : op.is_read = false;
253 :
254 6 : capy::mutable_buffer bufs[max_buffers];
255 6 : op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
256 :
257 6 : if (op.iovec_count == 0)
258 : {
259 MIS 0 : *ec = {};
260 0 : *bytes_out = 0;
261 0 : op.cont_op.cont.h = h;
262 0 : return dispatch_coro(ex, op.cont_op.cont);
263 : }
264 :
265 HIT 12 : for (int i = 0; i < op.iovec_count; ++i)
266 : {
267 6 : op.iovecs[i].iov_base = bufs[i].data();
268 6 : op.iovecs[i].iov_len = bufs[i].size();
269 : }
270 :
271 6 : op.h = h;
272 6 : op.ex = ex;
273 6 : op.ec_out = ec;
274 6 : op.bytes_out = bytes_out;
275 6 : op.start(token);
276 :
277 6 : op.ex.on_work_started();
278 :
279 6 : write_pool_op_.file_ = this;
280 6 : write_pool_op_.ref_ = this->shared_from_this();
281 6 : write_pool_op_.func_ = &posix_stream_file::do_write_work;
282 6 : if (!svc_.pool().post(&write_pool_op_))
283 : {
284 MIS 0 : op.impl_ref = std::move(write_pool_op_.ref_);
285 0 : op.cancelled.store(true, std::memory_order_release);
286 0 : svc_.post(&write_op_);
287 : }
288 HIT 6 : return std::noop_coroutine();
289 : }
290 :
291 : inline void
292 6 : posix_stream_file::do_write_work(pool_work_item* w) noexcept
293 : {
294 6 : auto* pw = static_cast<pool_op*>(w);
295 6 : auto* self = pw->file_;
296 6 : auto& op = self->write_op_;
297 :
298 6 : if (!op.cancelled.load(std::memory_order_acquire))
299 : {
300 : ssize_t n;
301 : do
302 : {
303 12 : n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
304 6 : static_cast<off_t>(self->offset_));
305 : }
306 6 : while (n < 0 && errno == EINTR);
307 :
308 6 : if (n >= 0)
309 : {
310 6 : op.errn = 0;
311 6 : op.bytes_transferred = static_cast<std::size_t>(n);
312 6 : self->offset_ += static_cast<std::uint64_t>(n);
313 : }
314 : else
315 : {
316 MIS 0 : op.errn = errno;
317 0 : op.bytes_transferred = 0;
318 : }
319 : }
320 :
321 HIT 6 : op.impl_ref = std::move(pw->ref_);
322 6 : self->svc_.post(&op);
323 6 : }
324 :
325 : } // namespace boost::corosio::detail
326 :
327 : #endif // BOOST_COROSIO_POSIX
328 :
329 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
|