LCOV - code coverage report
Current view: top level - corosio/native/detail/posix - posix_random_access_file_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 86.4 % 140 121 19
Test Date: 2026-03-31 21:45:45 Functions: 100.0 % 16 16

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_POSIX
      16                 : 
      17                 : #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
      18                 : #include <boost/corosio/native/native_scheduler.hpp>
      19                 : #include <boost/corosio/detail/random_access_file_service.hpp>
      20                 : #include <boost/corosio/detail/thread_pool.hpp>
      21                 : 
      22                 : #include <limits>
      23                 : #include <mutex>
      24                 : #include <unordered_map>
      25                 : 
      26                 : namespace boost::corosio::detail {
      27                 : 
      28                 : /** Random-access file service for POSIX backends. */
      29                 : class BOOST_COROSIO_DECL posix_random_access_file_service final
      30                 :     : public random_access_file_service
      31                 : {
      32                 : public:
      33 HIT         517 :     posix_random_access_file_service(
      34                 :         capy::execution_context& ctx, scheduler& sched)
      35            1034 :         : sched_(&sched)
      36             517 :         , pool_(get_or_create_pool(ctx))
      37                 :     {
      38             517 :     }
      39                 : 
      40            1034 :     ~posix_random_access_file_service() override = default;
      41                 : 
      42                 :     posix_random_access_file_service(
      43                 :         posix_random_access_file_service const&)            = delete;
      44                 :     posix_random_access_file_service& operator=(
      45                 :         posix_random_access_file_service const&) = delete;
      46                 : 
      47              24 :     io_object::implementation* construct() override
      48                 :     {
      49              24 :         auto ptr   = std::make_shared<posix_random_access_file>(*this);
      50              24 :         auto* impl = ptr.get();
      51                 : 
      52                 :         {
      53              24 :             std::lock_guard<std::mutex> lock(mutex_);
      54              24 :             file_list_.push_back(impl);
      55              24 :             file_ptrs_[impl] = std::move(ptr);
      56              24 :         }
      57                 : 
      58              24 :         return impl;
      59              24 :     }
      60                 : 
      61              24 :     void destroy(io_object::implementation* p) override
      62                 :     {
      63              24 :         auto& impl = static_cast<posix_random_access_file&>(*p);
      64              24 :         impl.cancel();
      65              24 :         impl.close_file();
      66              24 :         destroy_impl(impl);
      67              24 :     }
      68                 : 
      69              42 :     void close(io_object::handle& h) override
      70                 :     {
      71              42 :         if (h.get())
      72                 :         {
      73              42 :             auto& impl = static_cast<posix_random_access_file&>(*h.get());
      74              42 :             impl.cancel();
      75              42 :             impl.close_file();
      76                 :         }
      77              42 :     }
      78                 : 
      79              19 :     std::error_code open_file(
      80                 :         random_access_file::implementation& impl,
      81                 :         std::filesystem::path const& path,
      82                 :         file_base::flags mode) override
      83                 :     {
      84              19 :         if (static_cast<native_scheduler const*>(sched_)->single_threaded_)
      85 MIS           0 :             return std::make_error_code(std::errc::operation_not_supported);
      86 HIT          19 :         return static_cast<posix_random_access_file&>(impl).open_file(
      87              19 :             path, mode);
      88                 :     }
      89                 : 
      90             517 :     void shutdown() override
      91                 :     {
      92             517 :         std::lock_guard<std::mutex> lock(mutex_);
      93             517 :         for (auto* impl = file_list_.pop_front(); impl != nullptr;
      94 MIS           0 :              impl       = file_list_.pop_front())
      95                 :         {
      96               0 :             impl->cancel();
      97               0 :             impl->close_file();
      98                 :         }
      99 HIT         517 :         file_ptrs_.clear();
     100             517 :     }
     101                 : 
     102              24 :     void destroy_impl(posix_random_access_file& impl)
     103                 :     {
     104              24 :         std::lock_guard<std::mutex> lock(mutex_);
     105              24 :         file_list_.remove(&impl);
     106              24 :         file_ptrs_.erase(&impl);
     107              24 :     }
     108                 : 
     109             126 :     void post(scheduler_op* op)
     110                 :     {
     111             126 :         sched_->post(op);
     112             126 :     }
     113                 : 
     114                 :     void work_started() noexcept
     115                 :     {
     116                 :         sched_->work_started();
     117                 :     }
     118                 : 
     119                 :     void work_finished() noexcept
     120                 :     {
     121                 :         sched_->work_finished();
     122                 :     }
     123                 : 
     124             126 :     thread_pool& pool() noexcept
     125                 :     {
     126             126 :         return pool_;
     127                 :     }
     128                 : 
     129                 : private:
     130             517 :     static thread_pool& get_or_create_pool(capy::execution_context& ctx)
     131                 :     {
     132             517 :         auto* p = ctx.find_service<thread_pool>();
     133             517 :         if (p)
     134             517 :             return *p;
     135 MIS           0 :         return ctx.make_service<thread_pool>();
     136                 :     }
     137                 : 
     138                 :     scheduler* sched_;
     139                 :     thread_pool& pool_;
     140                 :     std::mutex mutex_;
     141                 :     intrusive_list<posix_random_access_file> file_list_;
     142                 :     std::unordered_map<
     143                 :         posix_random_access_file*,
     144                 :         std::shared_ptr<posix_random_access_file>>
     145                 :         file_ptrs_;
     146                 : };
     147                 : 
     148                 : /** Get or create the random-access file service for the given context. */
     149                 : inline posix_random_access_file_service&
     150 HIT         517 : get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
     151                 : {
     152             517 :     return ctx.make_service<posix_random_access_file_service>(sched);
     153                 : }
     154                 : 
     155                 : // ---------------------------------------------------------------------------
     156                 : // posix_random_access_file inline implementations (require complete service)
     157                 : // ---------------------------------------------------------------------------
     158                 : 
     159                 : inline std::coroutine_handle<>
     160             116 : posix_random_access_file::read_some_at(
     161                 :     std::uint64_t offset,
     162                 :     std::coroutine_handle<> h,
     163                 :     capy::executor_ref ex,
     164                 :     buffer_param param,
     165                 :     std::stop_token token,
     166                 :     std::error_code* ec,
     167                 :     std::size_t* bytes_out)
     168                 : {
     169             116 :     capy::mutable_buffer bufs[max_buffers];
     170             116 :     auto count = param.copy_to(bufs, max_buffers);
     171                 : 
     172             116 :     if (count == 0)
     173                 :     {
     174 MIS           0 :         *ec        = {};
     175               0 :         *bytes_out = 0;
     176               0 :         return h;
     177                 :     }
     178                 : 
     179 HIT         116 :     auto* op = new raf_op();
     180             116 :     op->is_read = true;
     181             116 :     op->offset  = offset;
     182                 : 
     183             116 :     op->iovec_count = static_cast<int>(count);
     184             232 :     for (int i = 0; i < op->iovec_count; ++i)
     185                 :     {
     186             116 :         op->iovecs[i].iov_base = bufs[i].data();
     187             116 :         op->iovecs[i].iov_len  = bufs[i].size();
     188                 :     }
     189                 : 
     190             116 :     op->h         = h;
     191             116 :     op->ex        = ex;
     192             116 :     op->ec_out    = ec;
     193             116 :     op->bytes_out = bytes_out;
     194             116 :     op->file_     = this;
     195             116 :     op->file_ref  = this->shared_from_this();
     196             116 :     op->start(token);
     197                 : 
     198             116 :     op->ex.on_work_started();
     199                 : 
     200                 :     {
     201             116 :         std::lock_guard<std::mutex> lock(ops_mutex_);
     202             116 :         outstanding_ops_.push_back(op);
     203             116 :     }
     204                 : 
     205             116 :     static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
     206             116 :     if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
     207                 :     {
     208 MIS           0 :         op->cancelled.store(true, std::memory_order_release);
     209               0 :         svc_.post(static_cast<scheduler_op*>(op));
     210                 :     }
     211 HIT         116 :     return std::noop_coroutine();
     212                 : }
     213                 : 
     214                 : inline std::coroutine_handle<>
     215              10 : posix_random_access_file::write_some_at(
     216                 :     std::uint64_t offset,
     217                 :     std::coroutine_handle<> h,
     218                 :     capy::executor_ref ex,
     219                 :     buffer_param param,
     220                 :     std::stop_token token,
     221                 :     std::error_code* ec,
     222                 :     std::size_t* bytes_out)
     223                 : {
     224              10 :     capy::mutable_buffer bufs[max_buffers];
     225              10 :     auto count = param.copy_to(bufs, max_buffers);
     226                 : 
     227              10 :     if (count == 0)
     228                 :     {
     229 MIS           0 :         *ec        = {};
     230               0 :         *bytes_out = 0;
     231               0 :         return h;
     232                 :     }
     233                 : 
     234 HIT          10 :     auto* op = new raf_op();
     235              10 :     op->is_read = false;
     236              10 :     op->offset  = offset;
     237                 : 
     238              10 :     op->iovec_count = static_cast<int>(count);
     239              20 :     for (int i = 0; i < op->iovec_count; ++i)
     240                 :     {
     241              10 :         op->iovecs[i].iov_base = bufs[i].data();
     242              10 :         op->iovecs[i].iov_len  = bufs[i].size();
     243                 :     }
     244                 : 
     245              10 :     op->h         = h;
     246              10 :     op->ex        = ex;
     247              10 :     op->ec_out    = ec;
     248              10 :     op->bytes_out = bytes_out;
     249              10 :     op->file_     = this;
     250              10 :     op->file_ref  = this->shared_from_this();
     251              10 :     op->start(token);
     252                 : 
     253              10 :     op->ex.on_work_started();
     254                 : 
     255                 :     {
     256              10 :         std::lock_guard<std::mutex> lock(ops_mutex_);
     257              10 :         outstanding_ops_.push_back(op);
     258              10 :     }
     259                 : 
     260              10 :     static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
     261              10 :     if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
     262                 :     {
     263 MIS           0 :         op->cancelled.store(true, std::memory_order_release);
     264               0 :         svc_.post(static_cast<scheduler_op*>(op));
     265                 :     }
     266 HIT          10 :     return std::noop_coroutine();
     267                 : }
     268                 : 
     269                 : // -- raf_op thread-pool work function --
     270                 : 
     271                 : inline void
     272             126 : posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
     273                 : {
     274             126 :     auto* op   = static_cast<raf_op*>(w);
     275             126 :     auto* self = op->file_;
     276                 : 
     277             126 :     if (op->cancelled.load(std::memory_order_acquire))
     278                 :     {
     279               1 :         op->errn              = ECANCELED;
     280               1 :         op->bytes_transferred = 0;
     281                 :     }
     282             250 :     else if (op->offset >
     283             125 :              static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
     284                 :     {
     285 MIS           0 :         op->errn              = EOVERFLOW;
     286               0 :         op->bytes_transferred = 0;
     287                 :     }
     288                 :     else
     289                 :     {
     290                 :         ssize_t n;
     291 HIT         125 :         if (op->is_read)
     292                 :         {
     293                 :             do
     294                 :             {
     295             230 :                 n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
     296             115 :                              static_cast<off_t>(op->offset));
     297                 :             }
     298             115 :             while (n < 0 && errno == EINTR);
     299                 :         }
     300                 :         else
     301                 :         {
     302                 :             do
     303                 :             {
     304              20 :                 n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
     305              10 :                               static_cast<off_t>(op->offset));
     306                 :             }
     307              10 :             while (n < 0 && errno == EINTR);
     308                 :         }
     309                 : 
     310             125 :         if (n >= 0)
     311                 :         {
     312             125 :             op->errn              = 0;
     313             125 :             op->bytes_transferred = static_cast<std::size_t>(n);
     314                 :         }
     315                 :         else
     316                 :         {
     317 MIS           0 :             op->errn              = errno;
     318               0 :             op->bytes_transferred = 0;
     319                 :         }
     320                 :     }
     321                 : 
     322 HIT         126 :     self->svc_.post(static_cast<scheduler_op*>(op));
     323             126 : }
     324                 : 
     325                 : } // namespace boost::corosio::detail
     326                 : 
     327                 : #endif // BOOST_COROSIO_POSIX
     328                 : 
     329                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
        

Generated by: LCOV version 2.3