LCOV - code coverage report
Current view: top level - corosio/native/detail/posix - posix_stream_file_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 84.1 % 145 122 23
Test Date: 2026-03-31 21:45:45 Functions: 100.0 % 17 17

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_POSIX
      16                 : 
      17                 : #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
      18                 : #include <boost/corosio/native/native_scheduler.hpp>
      19                 : #include <boost/corosio/detail/file_service.hpp>
      20                 : #include <boost/corosio/detail/thread_pool.hpp>
      21                 : 
      22                 : #include <mutex>
      23                 : #include <unordered_map>
      24                 : 
      25                 : namespace boost::corosio::detail {
      26                 : 
      27                 : /** Stream file service for POSIX backends.
      28                 : 
      29                 :     Owns all posix_stream_file instances. Thread lifecycle is
      30                 :     managed by the thread_pool service (shared with resolver).
      31                 : */
      32                 : class BOOST_COROSIO_DECL posix_stream_file_service final
      33                 :     : public file_service
      34                 : {
      35                 : public:
      36 HIT         517 :     posix_stream_file_service(
      37                 :         capy::execution_context& ctx, scheduler& sched)
      38            1034 :         : sched_(&sched)
      39             517 :         , pool_(get_or_create_pool(ctx))
      40                 :     {
      41             517 :     }
      42                 : 
      43            1034 :     ~posix_stream_file_service() override = default;
      44                 : 
      45                 :     posix_stream_file_service(posix_stream_file_service const&)            = delete;
      46                 :     posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
      47                 : 
      48              26 :     io_object::implementation* construct() override
      49                 :     {
      50              26 :         auto ptr   = std::make_shared<posix_stream_file>(*this);
      51              26 :         auto* impl = ptr.get();
      52                 : 
      53                 :         {
      54              26 :             std::lock_guard<std::mutex> lock(mutex_);
      55              26 :             file_list_.push_back(impl);
      56              26 :             file_ptrs_[impl] = std::move(ptr);
      57              26 :         }
      58                 : 
      59              26 :         return impl;
      60              26 :     }
      61                 : 
      62              26 :     void destroy(io_object::implementation* p) override
      63                 :     {
      64              26 :         auto& impl = static_cast<posix_stream_file&>(*p);
      65              26 :         impl.cancel();
      66              26 :         impl.close_file();
      67              26 :         destroy_impl(impl);
      68              26 :     }
      69                 : 
      70              43 :     void close(io_object::handle& h) override
      71                 :     {
      72              43 :         if (h.get())
      73                 :         {
      74              43 :             auto& impl = static_cast<posix_stream_file&>(*h.get());
      75              43 :             impl.cancel();
      76              43 :             impl.close_file();
      77                 :         }
      78              43 :     }
      79                 : 
      80              19 :     std::error_code open_file(
      81                 :         stream_file::implementation& impl,
      82                 :         std::filesystem::path const& path,
      83                 :         file_base::flags mode) override
      84                 :     {
      85              19 :         if (static_cast<native_scheduler const*>(sched_)->single_threaded_)
      86 MIS           0 :             return std::make_error_code(std::errc::operation_not_supported);
      87 HIT          19 :         return static_cast<posix_stream_file&>(impl).open_file(path, mode);
      88                 :     }
      89                 : 
      90             517 :     void shutdown() override
      91                 :     {
      92             517 :         std::lock_guard<std::mutex> lock(mutex_);
      93             517 :         for (auto* impl = file_list_.pop_front(); impl != nullptr;
      94 MIS           0 :              impl       = file_list_.pop_front())
      95                 :         {
      96               0 :             impl->cancel();
      97               0 :             impl->close_file();
      98                 :         }
      99 HIT         517 :         file_ptrs_.clear();
     100             517 :     }
     101                 : 
     102              26 :     void destroy_impl(posix_stream_file& impl)
     103                 :     {
     104              26 :         std::lock_guard<std::mutex> lock(mutex_);
     105              26 :         file_list_.remove(&impl);
     106              26 :         file_ptrs_.erase(&impl);
     107              26 :     }
     108                 : 
     109              12 :     void post(scheduler_op* op)
     110                 :     {
     111              12 :         sched_->post(op);
     112              12 :     }
     113                 : 
     114                 :     void work_started() noexcept
     115                 :     {
     116                 :         sched_->work_started();
     117                 :     }
     118                 : 
     119                 :     void work_finished() noexcept
     120                 :     {
     121                 :         sched_->work_finished();
     122                 :     }
     123                 : 
     124              12 :     thread_pool& pool() noexcept
     125                 :     {
     126              12 :         return pool_;
     127                 :     }
     128                 : 
     129                 : private:
     130             517 :     static thread_pool& get_or_create_pool(capy::execution_context& ctx)
     131                 :     {
     132             517 :         auto* p = ctx.find_service<thread_pool>();
     133             517 :         if (p)
     134             517 :             return *p;
     135 MIS           0 :         return ctx.make_service<thread_pool>();
     136                 :     }
     137                 : 
     138                 :     scheduler* sched_;
     139                 :     thread_pool& pool_;
     140                 :     std::mutex mutex_;
     141                 :     intrusive_list<posix_stream_file> file_list_;
     142                 :     std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
     143                 :         file_ptrs_;
     144                 : };
     145                 : 
     146                 : /** Get or create the stream file service for the given context. */
     147                 : inline posix_stream_file_service&
     148 HIT         517 : get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
     149                 : {
     150             517 :     return ctx.make_service<posix_stream_file_service>(sched);
     151                 : }
     152                 : 
     153                 : // ---------------------------------------------------------------------------
     154                 : // posix_stream_file inline implementations (require complete service type)
     155                 : // ---------------------------------------------------------------------------
     156                 : 
     157                 : inline std::coroutine_handle<>
     158               6 : posix_stream_file::read_some(
     159                 :     std::coroutine_handle<> h,
     160                 :     capy::executor_ref ex,
     161                 :     buffer_param param,
     162                 :     std::stop_token token,
     163                 :     std::error_code* ec,
     164                 :     std::size_t* bytes_out)
     165                 : {
     166               6 :     auto& op = read_op_;
     167               6 :     op.reset();
     168               6 :     op.is_read = true;
     169                 : 
     170               6 :     capy::mutable_buffer bufs[max_buffers];
     171               6 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
     172                 : 
     173               6 :     if (op.iovec_count == 0)
     174                 :     {
     175 MIS           0 :         *ec        = {};
     176               0 :         *bytes_out = 0;
     177               0 :         op.cont_op.cont.h = h;
     178               0 :         return dispatch_coro(ex, op.cont_op.cont);
     179                 :     }
     180                 : 
     181 HIT          12 :     for (int i = 0; i < op.iovec_count; ++i)
     182                 :     {
     183               6 :         op.iovecs[i].iov_base = bufs[i].data();
     184               6 :         op.iovecs[i].iov_len  = bufs[i].size();
     185                 :     }
     186                 : 
     187               6 :     op.h         = h;
     188               6 :     op.ex        = ex;
     189               6 :     op.ec_out    = ec;
     190               6 :     op.bytes_out = bytes_out;
     191               6 :     op.start(token);
     192                 : 
     193               6 :     op.ex.on_work_started();
     194                 : 
     195               6 :     read_pool_op_.file_ = this;
     196               6 :     read_pool_op_.ref_  = this->shared_from_this();
     197               6 :     read_pool_op_.func_ = &posix_stream_file::do_read_work;
     198               6 :     if (!svc_.pool().post(&read_pool_op_))
     199                 :     {
     200 MIS           0 :         op.impl_ref = std::move(read_pool_op_.ref_);
     201               0 :         op.cancelled.store(true, std::memory_order_release);
     202               0 :         svc_.post(&read_op_);
     203                 :     }
     204 HIT           6 :     return std::noop_coroutine();
     205                 : }
     206                 : 
     207                 : inline void
     208               6 : posix_stream_file::do_read_work(pool_work_item* w) noexcept
     209                 : {
     210               6 :     auto* pw   = static_cast<pool_op*>(w);
     211               6 :     auto* self = pw->file_;
     212               6 :     auto& op   = self->read_op_;
     213                 : 
     214               6 :     if (!op.cancelled.load(std::memory_order_acquire))
     215                 :     {
     216                 :         ssize_t n;
     217                 :         do
     218                 :         {
     219              10 :             n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
     220               5 :                          static_cast<off_t>(self->offset_));
     221                 :         }
     222               5 :         while (n < 0 && errno == EINTR);
     223                 : 
     224               5 :         if (n >= 0)
     225                 :         {
     226               5 :             op.errn              = 0;
     227               5 :             op.bytes_transferred = static_cast<std::size_t>(n);
     228               5 :             self->offset_ += static_cast<std::uint64_t>(n);
     229                 :         }
     230                 :         else
     231                 :         {
     232 MIS           0 :             op.errn              = errno;
     233               0 :             op.bytes_transferred = 0;
     234                 :         }
     235                 :     }
     236                 : 
     237 HIT           6 :     op.impl_ref = std::move(pw->ref_);
     238               6 :     self->svc_.post(&op);
     239               6 : }
     240                 : 
     241                 : inline std::coroutine_handle<>
     242               6 : posix_stream_file::write_some(
     243                 :     std::coroutine_handle<> h,
     244                 :     capy::executor_ref ex,
     245                 :     buffer_param param,
     246                 :     std::stop_token token,
     247                 :     std::error_code* ec,
     248                 :     std::size_t* bytes_out)
     249                 : {
     250               6 :     auto& op = write_op_;
     251               6 :     op.reset();
     252               6 :     op.is_read = false;
     253                 : 
     254               6 :     capy::mutable_buffer bufs[max_buffers];
     255               6 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
     256                 : 
     257               6 :     if (op.iovec_count == 0)
     258                 :     {
     259 MIS           0 :         *ec        = {};
     260               0 :         *bytes_out = 0;
     261               0 :         op.cont_op.cont.h = h;
     262               0 :         return dispatch_coro(ex, op.cont_op.cont);
     263                 :     }
     264                 : 
     265 HIT          12 :     for (int i = 0; i < op.iovec_count; ++i)
     266                 :     {
     267               6 :         op.iovecs[i].iov_base = bufs[i].data();
     268               6 :         op.iovecs[i].iov_len  = bufs[i].size();
     269                 :     }
     270                 : 
     271               6 :     op.h         = h;
     272               6 :     op.ex        = ex;
     273               6 :     op.ec_out    = ec;
     274               6 :     op.bytes_out = bytes_out;
     275               6 :     op.start(token);
     276                 : 
     277               6 :     op.ex.on_work_started();
     278                 : 
     279               6 :     write_pool_op_.file_ = this;
     280               6 :     write_pool_op_.ref_  = this->shared_from_this();
     281               6 :     write_pool_op_.func_ = &posix_stream_file::do_write_work;
     282               6 :     if (!svc_.pool().post(&write_pool_op_))
     283                 :     {
     284 MIS           0 :         op.impl_ref = std::move(write_pool_op_.ref_);
     285               0 :         op.cancelled.store(true, std::memory_order_release);
     286               0 :         svc_.post(&write_op_);
     287                 :     }
     288 HIT           6 :     return std::noop_coroutine();
     289                 : }
     290                 : 
     291                 : inline void
     292               6 : posix_stream_file::do_write_work(pool_work_item* w) noexcept
     293                 : {
     294               6 :     auto* pw   = static_cast<pool_op*>(w);
     295               6 :     auto* self = pw->file_;
     296               6 :     auto& op   = self->write_op_;
     297                 : 
     298               6 :     if (!op.cancelled.load(std::memory_order_acquire))
     299                 :     {
     300                 :         ssize_t n;
     301                 :         do
     302                 :         {
     303              12 :             n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
     304               6 :                           static_cast<off_t>(self->offset_));
     305                 :         }
     306               6 :         while (n < 0 && errno == EINTR);
     307                 : 
     308               6 :         if (n >= 0)
     309                 :         {
     310               6 :             op.errn              = 0;
     311               6 :             op.bytes_transferred = static_cast<std::size_t>(n);
     312               6 :             self->offset_ += static_cast<std::uint64_t>(n);
     313                 :         }
     314                 :         else
     315                 :         {
     316 MIS           0 :             op.errn              = errno;
     317               0 :             op.bytes_transferred = 0;
     318                 :         }
     319                 :     }
     320                 : 
     321 HIT           6 :     op.impl_ref = std::move(pw->ref_);
     322               6 :     self->svc_.post(&op);
     323               6 : }
     324                 : 
     325                 : } // namespace boost::corosio::detail
     326                 : 
     327                 : #endif // BOOST_COROSIO_POSIX
     328                 : 
     329                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
        

Generated by: LCOV version 2.3