91.76% Lines (345/376) 97.78% Functions (44/45)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Steve Gerbino 3   // Copyright (c) 2026 Steve Gerbino
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/cppalliance/corosio 8   // Official repository: https://github.com/cppalliance/corosio
9   // 9   //
10   10  
11   #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP 11   #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12   #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP 12   #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13   13  
14   #include <boost/corosio/timer.hpp> 14   #include <boost/corosio/timer.hpp>
15   #include <boost/corosio/io_context.hpp> 15   #include <boost/corosio/io_context.hpp>
16   #include <boost/corosio/detail/scheduler_op.hpp> 16   #include <boost/corosio/detail/scheduler_op.hpp>
17   #include <boost/corosio/detail/intrusive.hpp> 17   #include <boost/corosio/detail/intrusive.hpp>
18   #include <boost/corosio/detail/thread_local_ptr.hpp> 18   #include <boost/corosio/detail/thread_local_ptr.hpp>
19   #include <boost/capy/error.hpp> 19   #include <boost/capy/error.hpp>
20   #include <boost/capy/ex/execution_context.hpp> 20   #include <boost/capy/ex/execution_context.hpp>
21   #include <boost/capy/ex/executor_ref.hpp> 21   #include <boost/capy/ex/executor_ref.hpp>
22   #include <system_error> 22   #include <system_error>
23   23  
24   #include <atomic> 24   #include <atomic>
25   #include <chrono> 25   #include <chrono>
26   #include <coroutine> 26   #include <coroutine>
27   #include <cstddef> 27   #include <cstddef>
28   #include <limits> 28   #include <limits>
29   #include <mutex> 29   #include <mutex>
30   #include <optional> 30   #include <optional>
31   #include <stop_token> 31   #include <stop_token>
32   #include <utility> 32   #include <utility>
33   #include <vector> 33   #include <vector>
34   34  
35   namespace boost::corosio::detail { 35   namespace boost::corosio::detail {
36   36  
37   struct scheduler; 37   struct scheduler;
38   38  
39   /* 39   /*
40   Timer Service 40   Timer Service
41   ============= 41   =============
42   42  
43   Data Structures 43   Data Structures
44   --------------- 44   ---------------
45   waiter_node holds per-waiter state: coroutine handle, executor, 45   waiter_node holds per-waiter state: coroutine handle, executor,
46   error output, stop_token, embedded completion_op. Each concurrent 46   error output, stop_token, embedded completion_op. Each concurrent
47   co_await t.wait() allocates one waiter_node. 47   co_await t.wait() allocates one waiter_node.
48   48  
49   timer_service::implementation holds per-timer state: expiry, 49   timer_service::implementation holds per-timer state: expiry,
50   heap index, and an intrusive_list of waiter_nodes. Multiple 50   heap index, and an intrusive_list of waiter_nodes. Multiple
51   coroutines can wait on the same timer simultaneously. 51   coroutines can wait on the same timer simultaneously.
52   52  
53   timer_service owns a min-heap of active timers, a free list 53   timer_service owns a min-heap of active timers, a free list
54   of recycled impls, and a free list of recycled waiter_nodes. The 54   of recycled impls, and a free list of recycled waiter_nodes. The
55   heap is ordered by expiry time; the scheduler queries 55   heap is ordered by expiry time; the scheduler queries
56   nearest_expiry() to set the epoll/timerfd timeout. 56   nearest_expiry() to set the epoll/timerfd timeout.
57   57  
58   Optimization Strategy 58   Optimization Strategy
59   --------------------- 59   ---------------------
60   1. Deferred heap insertion — expires_after() stores the expiry 60   1. Deferred heap insertion — expires_after() stores the expiry
61   but does not insert into the heap. Insertion happens in wait(). 61   but does not insert into the heap. Insertion happens in wait().
62   2. Thread-local impl cache — single-slot per-thread cache. 62   2. Thread-local impl cache — single-slot per-thread cache.
63   3. Embedded completion_op — eliminates heap allocation per fire/cancel. 63   3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64   4. Cached nearest expiry — atomic avoids mutex in nearest_expiry(). 64   4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65   5. might_have_pending_waits_ flag — skips lock when no wait issued. 65   5. might_have_pending_waits_ flag — skips lock when no wait issued.
66   6. Thread-local waiter cache — single-slot per-thread cache. 66   6. Thread-local waiter cache — single-slot per-thread cache.
67   67  
68   Concurrency 68   Concurrency
69   ----------- 69   -----------
70   stop_token callbacks can fire from any thread. The impl_ 70   stop_token callbacks can fire from any thread. The impl_
71   pointer on waiter_node is used as a "still in list" marker. 71   pointer on waiter_node is used as a "still in list" marker.
72   */ 72   */
73   73  
74   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node; 74   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75   75  
76   inline void timer_service_invalidate_cache() noexcept; 76   inline void timer_service_invalidate_cache() noexcept;
77   77  
78   // timer_service class body — member function definitions are 78   // timer_service class body — member function definitions are
79   // out-of-class (after implementation and waiter_node are complete) 79   // out-of-class (after implementation and waiter_node are complete)
80   class BOOST_COROSIO_DECL timer_service final 80   class BOOST_COROSIO_DECL timer_service final
81   : public capy::execution_context::service 81   : public capy::execution_context::service
82   , public io_object::io_service 82   , public io_object::io_service
83   { 83   {
84   public: 84   public:
85   using clock_type = std::chrono::steady_clock; 85   using clock_type = std::chrono::steady_clock;
86   using time_point = clock_type::time_point; 86   using time_point = clock_type::time_point;
87   87  
88   /// Type-erased callback for earliest-expiry-changed notifications. 88   /// Type-erased callback for earliest-expiry-changed notifications.
89   class callback 89   class callback
90   { 90   {
91   void* ctx_ = nullptr; 91   void* ctx_ = nullptr;
92   void (*fn_)(void*) = nullptr; 92   void (*fn_)(void*) = nullptr;
93   93  
94   public: 94   public:
95   /// Construct an empty callback. 95   /// Construct an empty callback.
HITCBC 96   605 callback() = default; 96   605 callback() = default;
97   97  
98   /// Construct a callback with the given context and function. 98   /// Construct a callback with the given context and function.
HITCBC 99   605 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {} 99   605 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100   100  
101   /// Return true if the callback is non-empty. 101   /// Return true if the callback is non-empty.
102   explicit operator bool() const noexcept 102   explicit operator bool() const noexcept
103   { 103   {
104   return fn_ != nullptr; 104   return fn_ != nullptr;
105   } 105   }
106   106  
107   /// Invoke the callback. 107   /// Invoke the callback.
HITCBC 108   7663 void operator()() const 108   6430 void operator()() const
109   { 109   {
HITCBC 110   7663 if (fn_) 110   6430 if (fn_)
HITCBC 111   7663 fn_(ctx_); 111   6430 fn_(ctx_);
HITCBC 112   7663 } 112   6430 }
113   }; 113   };
114   114  
115   struct implementation; 115   struct implementation;
116   116  
117   private: 117   private:
118   struct heap_entry 118   struct heap_entry
119   { 119   {
120   time_point time_; 120   time_point time_;
121   implementation* timer_; 121   implementation* timer_;
122   }; 122   };
123   123  
124   scheduler* sched_ = nullptr; 124   scheduler* sched_ = nullptr;
125   mutable std::mutex mutex_; 125   mutable std::mutex mutex_;
126   std::vector<heap_entry> heap_; 126   std::vector<heap_entry> heap_;
127   implementation* free_list_ = nullptr; 127   implementation* free_list_ = nullptr;
128   waiter_node* waiter_free_list_ = nullptr; 128   waiter_node* waiter_free_list_ = nullptr;
129   callback on_earliest_changed_; 129   callback on_earliest_changed_;
130   bool shutting_down_ = false; 130   bool shutting_down_ = false;
131   // Avoids mutex in nearest_expiry() and empty() 131   // Avoids mutex in nearest_expiry() and empty()
132   mutable std::atomic<std::int64_t> cached_nearest_ns_{ 132   mutable std::atomic<std::int64_t> cached_nearest_ns_{
133   (std::numeric_limits<std::int64_t>::max)()}; 133   (std::numeric_limits<std::int64_t>::max)()};
134   134  
135   public: 135   public:
136   /// Construct the timer service bound to a scheduler. 136   /// Construct the timer service bound to a scheduler.
HITCBC 137   605 inline timer_service(capy::execution_context&, scheduler& sched) 137   605 inline timer_service(capy::execution_context&, scheduler& sched)
HITCBC 138   605 : sched_(&sched) 138   605 : sched_(&sched)
139   { 139   {
HITCBC 140   605 } 140   605 }
141   141  
142   /// Return the associated scheduler. 142   /// Return the associated scheduler.
HITCBC 143   15410 inline scheduler& get_scheduler() noexcept 143   12942 inline scheduler& get_scheduler() noexcept
144   { 144   {
HITCBC 145   15410 return *sched_; 145   12942 return *sched_;
146   } 146   }
147   147  
148   /// Destroy the timer service. 148   /// Destroy the timer service.
HITCBC 149   1210 ~timer_service() override = default; 149   1210 ~timer_service() override = default;
150   150  
151   timer_service(timer_service const&) = delete; 151   timer_service(timer_service const&) = delete;
152   timer_service& operator=(timer_service const&) = delete; 152   timer_service& operator=(timer_service const&) = delete;
153   153  
154   /// Register a callback invoked when the earliest expiry changes. 154   /// Register a callback invoked when the earliest expiry changes.
HITCBC 155   605 inline void set_on_earliest_changed(callback cb) 155   605 inline void set_on_earliest_changed(callback cb)
156   { 156   {
HITCBC 157   605 on_earliest_changed_ = cb; 157   605 on_earliest_changed_ = cb;
HITCBC 158   605 } 158   605 }
159   159  
160   /// Return true if no timers are in the heap. 160   /// Return true if no timers are in the heap.
161   inline bool empty() const noexcept 161   inline bool empty() const noexcept
162   { 162   {
163   return cached_nearest_ns_.load(std::memory_order_acquire) == 163   return cached_nearest_ns_.load(std::memory_order_acquire) ==
164   (std::numeric_limits<std::int64_t>::max)(); 164   (std::numeric_limits<std::int64_t>::max)();
165   } 165   }
166   166  
167   /// Return the nearest timer expiry without acquiring the mutex. 167   /// Return the nearest timer expiry without acquiring the mutex.
HITCBC 168   209022 inline time_point nearest_expiry() const noexcept 168   107262 inline time_point nearest_expiry() const noexcept
169   { 169   {
HITCBC 170   209022 auto ns = cached_nearest_ns_.load(std::memory_order_acquire); 170   107262 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
HITCBC 171   209022 return time_point(time_point::duration(ns)); 171   107262 return time_point(time_point::duration(ns));
172   } 172   }
173   173  
174   /// Cancel all pending timers and free cached resources. 174   /// Cancel all pending timers and free cached resources.
175   inline void shutdown() override; 175   inline void shutdown() override;
176   176  
177   /// Construct a new timer implementation. 177   /// Construct a new timer implementation.
178   inline io_object::implementation* construct() override; 178   inline io_object::implementation* construct() override;
179   179  
180   /// Destroy a timer implementation, cancelling pending waiters. 180   /// Destroy a timer implementation, cancelling pending waiters.
181   inline void destroy(io_object::implementation* p) override; 181   inline void destroy(io_object::implementation* p) override;
182   182  
183   /// Cancel and recycle a timer implementation. 183   /// Cancel and recycle a timer implementation.
184   inline void destroy_impl(implementation& impl); 184   inline void destroy_impl(implementation& impl);
185   185  
186   /// Create or recycle a waiter node. 186   /// Create or recycle a waiter node.
187   inline waiter_node* create_waiter(); 187   inline waiter_node* create_waiter();
188   188  
189   /// Return a waiter node to the cache or free list. 189   /// Return a waiter node to the cache or free list.
190   inline void destroy_waiter(waiter_node* w); 190   inline void destroy_waiter(waiter_node* w);
191   191  
192   /// Update the timer expiry, cancelling existing waiters. 192   /// Update the timer expiry, cancelling existing waiters.
193   inline std::size_t update_timer(implementation& impl, time_point new_time); 193   inline std::size_t update_timer(implementation& impl, time_point new_time);
194   194  
195   /// Insert a waiter into the timer's waiter list and the heap. 195   /// Insert a waiter into the timer's waiter list and the heap.
196   inline void insert_waiter(implementation& impl, waiter_node* w); 196   inline void insert_waiter(implementation& impl, waiter_node* w);
197   197  
198   /// Cancel all waiters on a timer. 198   /// Cancel all waiters on a timer.
199   inline std::size_t cancel_timer(implementation& impl); 199   inline std::size_t cancel_timer(implementation& impl);
200   200  
201   /// Cancel a single waiter ( stop_token callback path ). 201   /// Cancel a single waiter ( stop_token callback path ).
202   inline void cancel_waiter(waiter_node* w); 202   inline void cancel_waiter(waiter_node* w);
203   203  
204   /// Cancel one waiter on a timer. 204   /// Cancel one waiter on a timer.
205   inline std::size_t cancel_one_waiter(implementation& impl); 205   inline std::size_t cancel_one_waiter(implementation& impl);
206   206  
207   /// Complete all waiters whose timers have expired. 207   /// Complete all waiters whose timers have expired.
208   inline std::size_t process_expired(); 208   inline std::size_t process_expired();
209   209  
210   private: 210   private:
HITCBC 211   248939 inline void refresh_cached_nearest() noexcept 211   134726 inline void refresh_cached_nearest() noexcept
212   { 212   {
HITCBC 213   248939 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)() 213   134726 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
HITCBC 214   248399 : heap_[0].time_.time_since_epoch().count(); 214   134185 : heap_[0].time_.time_since_epoch().count();
HITCBC 215   248939 cached_nearest_ns_.store(ns, std::memory_order_release); 215   134726 cached_nearest_ns_.store(ns, std::memory_order_release);
HITCBC 216   248939 } 216   134726 }
217   217  
218   inline void remove_timer_impl(implementation& impl); 218   inline void remove_timer_impl(implementation& impl);
219   inline void up_heap(std::size_t index); 219   inline void up_heap(std::size_t index);
220   inline void down_heap(std::size_t index); 220   inline void down_heap(std::size_t index);
221   inline void swap_heap(std::size_t i1, std::size_t i2); 221   inline void swap_heap(std::size_t i1, std::size_t i2);
222   }; 222   };
223   223  
224   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node 224   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225   : intrusive_list<waiter_node>::node 225   : intrusive_list<waiter_node>::node
226   { 226   {
227   // Embedded completion op — avoids heap allocation per fire/cancel 227   // Embedded completion op — avoids heap allocation per fire/cancel
228   struct completion_op final : scheduler_op 228   struct completion_op final : scheduler_op
229   { 229   {
230   waiter_node* waiter_ = nullptr; 230   waiter_node* waiter_ = nullptr;
231   231  
232   static void do_complete( 232   static void do_complete(
233   void* owner, scheduler_op* base, std::uint32_t, std::uint32_t); 233   void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234   234  
HITCBC 235   214 completion_op() noexcept : scheduler_op(&do_complete) {} 235   214 completion_op() noexcept : scheduler_op(&do_complete) {}
236   236  
237   void operator()() override; 237   void operator()() override;
238   void destroy() override; 238   void destroy() override;
239   }; 239   };
240   240  
241   // Per-waiter stop_token cancellation 241   // Per-waiter stop_token cancellation
242   struct canceller 242   struct canceller
243   { 243   {
244   waiter_node* waiter_; 244   waiter_node* waiter_;
245   void operator()() const; 245   void operator()() const;
246   }; 246   };
247   247  
248   // nullptr once removed from timer's waiter list (concurrency marker) 248   // nullptr once removed from timer's waiter list (concurrency marker)
249   timer_service::implementation* impl_ = nullptr; 249   timer_service::implementation* impl_ = nullptr;
250   timer_service* svc_ = nullptr; 250   timer_service* svc_ = nullptr;
251   std::coroutine_handle<> h_; 251   std::coroutine_handle<> h_;
252   capy::continuation* cont_ = nullptr; 252   capy::continuation* cont_ = nullptr;
253   capy::executor_ref d_; 253   capy::executor_ref d_;
254   std::error_code* ec_out_ = nullptr; 254   std::error_code* ec_out_ = nullptr;
255   std::stop_token token_; 255   std::stop_token token_;
256   std::optional<std::stop_callback<canceller>> stop_cb_; 256   std::optional<std::stop_callback<canceller>> stop_cb_;
257   completion_op op_; 257   completion_op op_;
258   std::error_code ec_value_; 258   std::error_code ec_value_;
259   waiter_node* next_free_ = nullptr; 259   waiter_node* next_free_ = nullptr;
260   260  
HITCBC 261   214 waiter_node() noexcept 261   214 waiter_node() noexcept
HITCBC 262   214 { 262   214 {
HITCBC 263   214 op_.waiter_ = this; 263   214 op_.waiter_ = this;
HITCBC 264   214 } 264   214 }
265   }; 265   };
266   266  
267   struct timer_service::implementation final : timer::implementation 267   struct timer_service::implementation final : timer::implementation
268   { 268   {
269   using clock_type = std::chrono::steady_clock; 269   using clock_type = std::chrono::steady_clock;
270   using time_point = clock_type::time_point; 270   using time_point = clock_type::time_point;
271   using duration = clock_type::duration; 271   using duration = clock_type::duration;
272   272  
273   timer_service* svc_ = nullptr; 273   timer_service* svc_ = nullptr;
274   intrusive_list<waiter_node> waiters_; 274   intrusive_list<waiter_node> waiters_;
275   275  
276   // Free list linkage (reused when impl is on free_list) 276   // Free list linkage (reused when impl is on free_list)
277   implementation* next_free_ = nullptr; 277   implementation* next_free_ = nullptr;
278   278  
279   inline explicit implementation(timer_service& svc) noexcept; 279   inline explicit implementation(timer_service& svc) noexcept;
280   280  
281   inline std::coroutine_handle<> wait( 281   inline std::coroutine_handle<> wait(
282   std::coroutine_handle<>, 282   std::coroutine_handle<>,
283   capy::executor_ref, 283   capy::executor_ref,
284   std::stop_token, 284   std::stop_token,
285   std::error_code*, 285   std::error_code*,
286   capy::continuation*) override; 286   capy::continuation*) override;
287   }; 287   };
288   288  
289   // Thread-local caches avoid hot-path mutex acquisitions: 289   // Thread-local caches avoid hot-path mutex acquisitions:
290   // 1. Impl cache — single-slot, validated by comparing svc_ 290   // 1. Impl cache — single-slot, validated by comparing svc_
291   // 2. Waiter cache — single-slot, no service affinity 291   // 2. Waiter cache — single-slot, no service affinity
292   // All caches are cleared by timer_service_invalidate_cache() during shutdown. 292   // All caches are cleared by timer_service_invalidate_cache() during shutdown.
293   293  
294   inline thread_local_ptr<timer_service::implementation> tl_cached_impl; 294   inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
295   inline thread_local_ptr<waiter_node> tl_cached_waiter; 295   inline thread_local_ptr<waiter_node> tl_cached_waiter;
296   296  
297   inline timer_service::implementation* 297   inline timer_service::implementation*
HITCBC 298   7921 try_pop_tl_cache(timer_service* svc) noexcept 298   6680 try_pop_tl_cache(timer_service* svc) noexcept
299   { 299   {
HITCBC 300   7921 auto* impl = tl_cached_impl.get(); 300   6680 auto* impl = tl_cached_impl.get();
HITCBC 301   7921 if (impl) 301   6680 if (impl)
302   { 302   {
HITCBC 303   7668 tl_cached_impl.set(nullptr); 303   6427 tl_cached_impl.set(nullptr);
HITCBC 304   7668 if (impl->svc_ == svc) 304   6427 if (impl->svc_ == svc)
HITCBC 305   7668 return impl; 305   6427 return impl;
306   // Stale impl from a destroyed service 306   // Stale impl from a destroyed service
MISUBC 307   delete impl; 307   delete impl;
308   } 308   }
HITCBC 309   253 return nullptr; 309   253 return nullptr;
310   } 310   }
311   311  
312   inline bool 312   inline bool
HITCBC 313   7913 try_push_tl_cache(timer_service::implementation* impl) noexcept 313   6672 try_push_tl_cache(timer_service::implementation* impl) noexcept
314   { 314   {
HITCBC 315   7913 if (!tl_cached_impl.get()) 315   6672 if (!tl_cached_impl.get())
316   { 316   {
HITCBC 317   7833 tl_cached_impl.set(impl); 317   6592 tl_cached_impl.set(impl);
HITCBC 318   7833 return true; 318   6592 return true;
319   } 319   }
HITCBC 320   80 return false; 320   80 return false;
321   } 321   }
322   322  
323   inline waiter_node* 323   inline waiter_node*
HITCBC 324   7709 try_pop_waiter_tl_cache() noexcept 324   6475 try_pop_waiter_tl_cache() noexcept
325   { 325   {
HITCBC 326   7709 auto* w = tl_cached_waiter.get(); 326   6475 auto* w = tl_cached_waiter.get();
HITCBC 327   7709 if (w) 327   6475 if (w)
328   { 328   {
HITCBC 329   7493 tl_cached_waiter.set(nullptr); 329   6259 tl_cached_waiter.set(nullptr);
HITCBC 330   7493 return w; 330   6259 return w;
331   } 331   }
HITCBC 332   216 return nullptr; 332   216 return nullptr;
333   } 333   }
334   334  
335   inline bool 335   inline bool
HITCBC 336   7693 try_push_waiter_tl_cache(waiter_node* w) noexcept 336   6459 try_push_waiter_tl_cache(waiter_node* w) noexcept
337   { 337   {
HITCBC 338   7693 if (!tl_cached_waiter.get()) 338   6459 if (!tl_cached_waiter.get())
339   { 339   {
HITCBC 340   7613 tl_cached_waiter.set(w); 340   6379 tl_cached_waiter.set(w);
HITCBC 341   7613 return true; 341   6379 return true;
342   } 342   }
HITCBC 343   80 return false; 343   80 return false;
344   } 344   }
345   345  
346   inline void 346   inline void
HITCBC 347   605 timer_service_invalidate_cache() noexcept 347   605 timer_service_invalidate_cache() noexcept
348   { 348   {
HITCBC 349   605 delete tl_cached_impl.get(); 349   605 delete tl_cached_impl.get();
HITCBC 350   605 tl_cached_impl.set(nullptr); 350   605 tl_cached_impl.set(nullptr);
351   351  
HITCBC 352   605 delete tl_cached_waiter.get(); 352   605 delete tl_cached_waiter.get();
HITCBC 353   605 tl_cached_waiter.set(nullptr); 353   605 tl_cached_waiter.set(nullptr);
HITCBC 354   605 } 354   605 }
355   355  
356   // timer_service out-of-class member function definitions 356   // timer_service out-of-class member function definitions
357   357  
HITCBC 358   253 inline timer_service::implementation::implementation( 358   253 inline timer_service::implementation::implementation(
HITCBC 359   253 timer_service& svc) noexcept 359   253 timer_service& svc) noexcept
HITCBC 360   253 : svc_(&svc) 360   253 : svc_(&svc)
361   { 361   {
HITCBC 362   253 } 362   253 }
363   363  
364   inline void 364   inline void
HITCBC 365   605 timer_service::shutdown() 365   605 timer_service::shutdown()
366   { 366   {
HITCBC 367   605 timer_service_invalidate_cache(); 367   605 timer_service_invalidate_cache();
HITCBC 368   605 shutting_down_ = true; 368   605 shutting_down_ = true;
369   369  
370   // Snapshot impls and detach them from the heap so that 370   // Snapshot impls and detach them from the heap so that
371   // coroutine-owned timer destructors (triggered by h.destroy() 371   // coroutine-owned timer destructors (triggered by h.destroy()
372   // below) cannot re-enter remove_timer_impl() and mutate the 372   // below) cannot re-enter remove_timer_impl() and mutate the
373   // vector during iteration. 373   // vector during iteration.
HITCBC 374   605 std::vector<implementation*> impls; 374   605 std::vector<implementation*> impls;
HITCBC 375   605 impls.reserve(heap_.size()); 375   605 impls.reserve(heap_.size());
HITCBC 376   613 for (auto& entry : heap_) 376   613 for (auto& entry : heap_)
377   { 377   {
HITCBC 378   8 entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 378   8 entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 379   8 impls.push_back(entry.timer_); 379   8 impls.push_back(entry.timer_);
380   } 380   }
HITCBC 381   605 heap_.clear(); 381   605 heap_.clear();
HITCBC 382   605 cached_nearest_ns_.store( 382   605 cached_nearest_ns_.store(
383   (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release); 383   (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
384   384  
385   // Cancel waiting timers. Each waiter called work_started() 385   // Cancel waiting timers. Each waiter called work_started()
386   // in implementation::wait(). On IOCP the scheduler shutdown 386   // in implementation::wait(). On IOCP the scheduler shutdown
387   // loop exits when outstanding_work_ reaches zero, so we must 387   // loop exits when outstanding_work_ reaches zero, so we must
388   // call work_finished() here to balance it. On other backends 388   // call work_finished() here to balance it. On other backends
389   // this is harmless. 389   // this is harmless.
HITCBC 390   613 for (auto* impl : impls) 390   613 for (auto* impl : impls)
391   { 391   {
HITCBC 392   16 while (auto* w = impl->waiters_.pop_front()) 392   16 while (auto* w = impl->waiters_.pop_front())
393   { 393   {
HITCBC 394   8 w->stop_cb_.reset(); 394   8 w->stop_cb_.reset();
HITCBC 395   8 auto h = std::exchange(w->h_, {}); 395   8 auto h = std::exchange(w->h_, {});
HITCBC 396   8 sched_->work_finished(); 396   8 sched_->work_finished();
HITCBC 397   8 if (h) 397   8 if (h)
HITCBC 398   8 h.destroy(); 398   8 h.destroy();
HITCBC 399   8 delete w; 399   8 delete w;
HITCBC 400   8 } 400   8 }
HITCBC 401   8 delete impl; 401   8 delete impl;
402   } 402   }
403   403  
404   // Delete free-listed impls 404   // Delete free-listed impls
HITCBC 405   685 while (free_list_) 405   685 while (free_list_)
406   { 406   {
HITCBC 407   80 auto* next = free_list_->next_free_; 407   80 auto* next = free_list_->next_free_;
HITCBC 408   80 delete free_list_; 408   80 delete free_list_;
HITCBC 409   80 free_list_ = next; 409   80 free_list_ = next;
410   } 410   }
411   411  
412   // Delete free-listed waiters 412   // Delete free-listed waiters
HITCBC 413   683 while (waiter_free_list_) 413   683 while (waiter_free_list_)
414   { 414   {
HITCBC 415   78 auto* next = waiter_free_list_->next_free_; 415   78 auto* next = waiter_free_list_->next_free_;
HITCBC 416   78 delete waiter_free_list_; 416   78 delete waiter_free_list_;
HITCBC 417   78 waiter_free_list_ = next; 417   78 waiter_free_list_ = next;
418   } 418   }
HITCBC 419   605 } 419   605 }
420   420  
421   inline io_object::implementation* 421   inline io_object::implementation*
HITCBC 422   7921 timer_service::construct() 422   6680 timer_service::construct()
423   { 423   {
HITCBC 424   7921 implementation* impl = try_pop_tl_cache(this); 424   6680 implementation* impl = try_pop_tl_cache(this);
HITCBC 425   7921 if (impl) 425   6680 if (impl)
426   { 426   {
HITCBC 427   7668 impl->svc_ = this; 427   6427 impl->svc_ = this;
HITCBC 428   7668 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 428   6427 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 429   7668 impl->might_have_pending_waits_ = false; 429   6427 impl->might_have_pending_waits_ = false;
HITCBC 430   7668 return impl; 430   6427 return impl;
431   } 431   }
432   432  
HITCBC 433   253 std::lock_guard lock(mutex_); 433   253 std::lock_guard lock(mutex_);
HITCBC 434   253 if (free_list_) 434   253 if (free_list_)
435   { 435   {
MISUBC 436   impl = free_list_; 436   impl = free_list_;
MISUBC 437   free_list_ = impl->next_free_; 437   free_list_ = impl->next_free_;
MISUBC 438   impl->next_free_ = nullptr; 438   impl->next_free_ = nullptr;
MISUBC 439   impl->svc_ = this; 439   impl->svc_ = this;
MISUBC 440   impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 440   impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
MISUBC 441   impl->might_have_pending_waits_ = false; 441   impl->might_have_pending_waits_ = false;
442   } 442   }
443   else 443   else
444   { 444   {
HITCBC 445   253 impl = new implementation(*this); 445   253 impl = new implementation(*this);
446   } 446   }
HITCBC 447   253 return impl; 447   253 return impl;
HITCBC 448   253 } 448   253 }
449   449  
450   inline void 450   inline void
HITCBC 451   7919 timer_service::destroy(io_object::implementation* p) 451   6678 timer_service::destroy(io_object::implementation* p)
452   { 452   {
HITCBC 453   7919 destroy_impl(static_cast<implementation&>(*p)); 453   6678 destroy_impl(static_cast<implementation&>(*p));
HITCBC 454   7919 } 454   6678 }
455   455  
456   inline void 456   inline void
HITCBC 457   7919 timer_service::destroy_impl(implementation& impl) 457   6678 timer_service::destroy_impl(implementation& impl)
458   { 458   {
459   // During shutdown the impl is owned by the shutdown loop. 459   // During shutdown the impl is owned by the shutdown loop.
460   // Re-entering here (from a coroutine-owned timer destructor 460   // Re-entering here (from a coroutine-owned timer destructor
461   // triggered by h.destroy()) must not modify the heap or 461   // triggered by h.destroy()) must not modify the heap or
462   // recycle the impl — shutdown deletes it directly. 462   // recycle the impl — shutdown deletes it directly.
HITCBC 463   7919 if (shutting_down_) 463   6678 if (shutting_down_)
HITCBC 464   7839 return; 464   6598 return;
465   465  
HITCBC 466   7913 cancel_timer(impl); 466   6672 cancel_timer(impl);
467   467  
HITCBC 468   7913 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)()) 468   6672 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
469   { 469   {
MISUBC 470   std::lock_guard lock(mutex_); 470   std::lock_guard lock(mutex_);
MISUBC 471   remove_timer_impl(impl); 471   remove_timer_impl(impl);
MISUBC 472   refresh_cached_nearest(); 472   refresh_cached_nearest();
MISUBC 473   } 473   }
474   474  
HITCBC 475   7913 if (try_push_tl_cache(&impl)) 475   6672 if (try_push_tl_cache(&impl))
HITCBC 476   7833 return; 476   6592 return;
477   477  
HITCBC 478   80 std::lock_guard lock(mutex_); 478   80 std::lock_guard lock(mutex_);
HITCBC 479   80 impl.next_free_ = free_list_; 479   80 impl.next_free_ = free_list_;
HITCBC 480   80 free_list_ = &impl; 480   80 free_list_ = &impl;
HITCBC 481   80 } 481   80 }
482   482  
483   inline waiter_node* 483   inline waiter_node*
HITCBC 484   7709 timer_service::create_waiter() 484   6475 timer_service::create_waiter()
485   { 485   {
HITCBC 486   7709 if (auto* w = try_pop_waiter_tl_cache()) 486   6475 if (auto* w = try_pop_waiter_tl_cache())
HITCBC 487   7493 return w; 487   6259 return w;
488   488  
HITCBC 489   216 std::lock_guard lock(mutex_); 489   216 std::lock_guard lock(mutex_);
HITCBC 490   216 if (waiter_free_list_) 490   216 if (waiter_free_list_)
491   { 491   {
HITCBC 492   2 auto* w = waiter_free_list_; 492   2 auto* w = waiter_free_list_;
HITCBC 493   2 waiter_free_list_ = w->next_free_; 493   2 waiter_free_list_ = w->next_free_;
HITCBC 494   2 w->next_free_ = nullptr; 494   2 w->next_free_ = nullptr;
HITCBC 495   2 return w; 495   2 return w;
496   } 496   }
497   497  
HITCBC 498   214 return new waiter_node(); 498   214 return new waiter_node();
HITCBC 499   216 } 499   216 }
500   500  
501   inline void 501   inline void
HITCBC 502   7693 timer_service::destroy_waiter(waiter_node* w) 502   6459 timer_service::destroy_waiter(waiter_node* w)
503   { 503   {
HITCBC 504   7693 if (try_push_waiter_tl_cache(w)) 504   6459 if (try_push_waiter_tl_cache(w))
HITCBC 505   7613 return; 505   6379 return;
506   506  
HITCBC 507   80 std::lock_guard lock(mutex_); 507   80 std::lock_guard lock(mutex_);
HITCBC 508   80 w->next_free_ = waiter_free_list_; 508   80 w->next_free_ = waiter_free_list_;
HITCBC 509   80 waiter_free_list_ = w; 509   80 waiter_free_list_ = w;
HITCBC 510   80 } 510   80 }
511   511  
512   inline std::size_t 512   inline std::size_t
HITCBC 513   6 timer_service::update_timer(implementation& impl, time_point new_time) 513   6 timer_service::update_timer(implementation& impl, time_point new_time)
514   { 514   {
515   bool in_heap = 515   bool in_heap =
HITCBC 516   6 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)()); 516   6 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
HITCBC 517   6 if (!in_heap && impl.waiters_.empty()) 517   6 if (!in_heap && impl.waiters_.empty())
MISUBC 518   return 0; 518   return 0;
519   519  
HITCBC 520   6 bool notify = false; 520   6 bool notify = false;
HITCBC 521   6 intrusive_list<waiter_node> canceled; 521   6 intrusive_list<waiter_node> canceled;
522   522  
523   { 523   {
HITCBC 524   6 std::lock_guard lock(mutex_); 524   6 std::lock_guard lock(mutex_);
525   525  
HITCBC 526   16 while (auto* w = impl.waiters_.pop_front()) 526   16 while (auto* w = impl.waiters_.pop_front())
527   { 527   {
HITCBC 528   10 w->impl_ = nullptr; 528   10 w->impl_ = nullptr;
HITCBC 529   10 canceled.push_back(w); 529   10 canceled.push_back(w);
HITCBC 530   10 } 530   10 }
531   531  
HITCBC 532   6 if (impl.heap_index_ < heap_.size()) 532   6 if (impl.heap_index_ < heap_.size())
533   { 533   {
HITCBC 534   6 time_point old_time = heap_[impl.heap_index_].time_; 534   6 time_point old_time = heap_[impl.heap_index_].time_;
HITCBC 535   6 heap_[impl.heap_index_].time_ = new_time; 535   6 heap_[impl.heap_index_].time_ = new_time;
536   536  
HITCBC 537   6 if (new_time < old_time) 537   6 if (new_time < old_time)
HITCBC 538   6 up_heap(impl.heap_index_); 538   6 up_heap(impl.heap_index_);
539   else 539   else
MISUBC 540   down_heap(impl.heap_index_); 540   down_heap(impl.heap_index_);
541   541  
HITCBC 542   6 notify = (impl.heap_index_ == 0); 542   6 notify = (impl.heap_index_ == 0);
543   } 543   }
544   544  
HITCBC 545   6 refresh_cached_nearest(); 545   6 refresh_cached_nearest();
HITCBC 546   6 } 546   6 }
547   547  
HITCBC 548   6 std::size_t count = 0; 548   6 std::size_t count = 0;
HITCBC 549   16 while (auto* w = canceled.pop_front()) 549   16 while (auto* w = canceled.pop_front())
550   { 550   {
HITCBC 551   10 w->ec_value_ = make_error_code(capy::error::canceled); 551   10 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 552   10 sched_->post(&w->op_); 552   10 sched_->post(&w->op_);
HITCBC 553   10 ++count; 553   10 ++count;
HITCBC 554   10 } 554   10 }
555   555  
HITCBC 556   6 if (notify) 556   6 if (notify)
HITCBC 557   6 on_earliest_changed_(); 557   6 on_earliest_changed_();
558   558  
HITCBC 559   6 return count; 559   6 return count;
560   } 560   }
561   561  
562   inline void 562   inline void
HITCBC 563   7709 timer_service::insert_waiter(implementation& impl, waiter_node* w) 563   6475 timer_service::insert_waiter(implementation& impl, waiter_node* w)
564   { 564   {
HITCBC 565   7709 bool notify = false; 565   6475 bool notify = false;
566   { 566   {
HITCBC 567   7709 std::lock_guard lock(mutex_); 567   6475 std::lock_guard lock(mutex_);
HITCBC 568   7709 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()) 568   6475 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
569   { 569   {
HITCBC 570   7687 impl.heap_index_ = heap_.size(); 570   6453 impl.heap_index_ = heap_.size();
HITCBC 571   7687 heap_.push_back({impl.expiry_, &impl}); 571   6453 heap_.push_back({impl.expiry_, &impl});
HITCBC 572   7687 up_heap(heap_.size() - 1); 572   6453 up_heap(heap_.size() - 1);
HITCBC 573   7687 notify = (impl.heap_index_ == 0); 573   6453 notify = (impl.heap_index_ == 0);
HITCBC 574   7687 refresh_cached_nearest(); 574   6453 refresh_cached_nearest();
575   } 575   }
HITCBC 576   7709 impl.waiters_.push_back(w); 576   6475 impl.waiters_.push_back(w);
HITCBC 577   7709 } 577   6475 }
HITCBC 578   7709 if (notify) 578   6475 if (notify)
HITCBC 579   7657 on_earliest_changed_(); 579   6424 on_earliest_changed_();
HITCBC 580   7709 } 580   6475 }
581   581  
582   inline std::size_t 582   inline std::size_t
HITCBC 583   7921 timer_service::cancel_timer(implementation& impl) 583   6680 timer_service::cancel_timer(implementation& impl)
584   { 584   {
HITCBC 585   7921 if (!impl.might_have_pending_waits_) 585   6680 if (!impl.might_have_pending_waits_)
HITCBC 586   7897 return 0; 586   6656 return 0;
587   587  
588   // Not in heap and no waiters — just clear the flag 588   // Not in heap and no waiters — just clear the flag
HITCBC 589   24 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() && 589   24 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
MISUBC 590   impl.waiters_.empty()) 590   impl.waiters_.empty())
591   { 591   {
MISUBC 592   impl.might_have_pending_waits_ = false; 592   impl.might_have_pending_waits_ = false;
MISUBC 593   return 0; 593   return 0;
594   } 594   }
595   595  
HITCBC 596   24 intrusive_list<waiter_node> canceled; 596   24 intrusive_list<waiter_node> canceled;
597   597  
598   { 598   {
HITCBC 599   24 std::lock_guard lock(mutex_); 599   24 std::lock_guard lock(mutex_);
HITCBC 600   24 remove_timer_impl(impl); 600   24 remove_timer_impl(impl);
HITCBC 601   52 while (auto* w = impl.waiters_.pop_front()) 601   52 while (auto* w = impl.waiters_.pop_front())
602   { 602   {
HITCBC 603   28 w->impl_ = nullptr; 603   28 w->impl_ = nullptr;
HITCBC 604   28 canceled.push_back(w); 604   28 canceled.push_back(w);
HITCBC 605   28 } 605   28 }
HITCBC 606   24 refresh_cached_nearest(); 606   24 refresh_cached_nearest();
HITCBC 607   24 } 607   24 }
608   608  
HITCBC 609   24 impl.might_have_pending_waits_ = false; 609   24 impl.might_have_pending_waits_ = false;
610   610  
HITCBC 611   24 std::size_t count = 0; 611   24 std::size_t count = 0;
HITCBC 612   52 while (auto* w = canceled.pop_front()) 612   52 while (auto* w = canceled.pop_front())
613   { 613   {
HITCBC 614   28 w->ec_value_ = make_error_code(capy::error::canceled); 614   28 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 615   28 sched_->post(&w->op_); 615   28 sched_->post(&w->op_);
HITCBC 616   28 ++count; 616   28 ++count;
HITCBC 617   28 } 617   28 }
618   618  
HITCBC 619   24 return count; 619   24 return count;
620   } 620   }
621   621  
622   inline void 622   inline void
HITCBC 623   30 timer_service::cancel_waiter(waiter_node* w) 623   30 timer_service::cancel_waiter(waiter_node* w)
624   { 624   {
625   { 625   {
HITCBC 626   30 std::lock_guard lock(mutex_); 626   30 std::lock_guard lock(mutex_);
627   // Already removed by cancel_timer or process_expired 627   // Already removed by cancel_timer or process_expired
HITCBC 628   30 if (!w->impl_) 628   30 if (!w->impl_)
MISUBC 629   return; 629   return;
HITCBC 630   30 auto* impl = w->impl_; 630   30 auto* impl = w->impl_;
HITCBC 631   30 w->impl_ = nullptr; 631   30 w->impl_ = nullptr;
HITCBC 632   30 impl->waiters_.remove(w); 632   30 impl->waiters_.remove(w);
HITCBC 633   30 if (impl->waiters_.empty()) 633   30 if (impl->waiters_.empty())
634   { 634   {
HITCBC 635   28 remove_timer_impl(*impl); 635   28 remove_timer_impl(*impl);
HITCBC 636   28 impl->might_have_pending_waits_ = false; 636   28 impl->might_have_pending_waits_ = false;
637   } 637   }
HITCBC 638   30 refresh_cached_nearest(); 638   30 refresh_cached_nearest();
HITCBC 639   30 } 639   30 }
640   640  
HITCBC 641   30 w->ec_value_ = make_error_code(capy::error::canceled); 641   30 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 642   30 sched_->post(&w->op_); 642   30 sched_->post(&w->op_);
643   } 643   }
644   644  
645   inline std::size_t 645   inline std::size_t
HITCBC 646   2 timer_service::cancel_one_waiter(implementation& impl) 646   2 timer_service::cancel_one_waiter(implementation& impl)
647   { 647   {
HITCBC 648   2 if (!impl.might_have_pending_waits_) 648   2 if (!impl.might_have_pending_waits_)
MISUBC 649   return 0; 649   return 0;
650   650  
HITCBC 651   2 waiter_node* w = nullptr; 651   2 waiter_node* w = nullptr;
652   652  
653   { 653   {
HITCBC 654   2 std::lock_guard lock(mutex_); 654   2 std::lock_guard lock(mutex_);
HITCBC 655   2 w = impl.waiters_.pop_front(); 655   2 w = impl.waiters_.pop_front();
HITCBC 656   2 if (!w) 656   2 if (!w)
MISUBC 657   return 0; 657   return 0;
HITCBC 658   2 w->impl_ = nullptr; 658   2 w->impl_ = nullptr;
HITCBC 659   2 if (impl.waiters_.empty()) 659   2 if (impl.waiters_.empty())
660   { 660   {
MISUBC 661   remove_timer_impl(impl); 661   remove_timer_impl(impl);
MISUBC 662   impl.might_have_pending_waits_ = false; 662   impl.might_have_pending_waits_ = false;
663   } 663   }
HITCBC 664   2 refresh_cached_nearest(); 664   2 refresh_cached_nearest();
HITCBC 665   2 } 665   2 }
666   666  
HITCBC 667   2 w->ec_value_ = make_error_code(capy::error::canceled); 667   2 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 668   2 sched_->post(&w->op_); 668   2 sched_->post(&w->op_);
HITCBC 669   2 return 1; 669   2 return 1;
670   } 670   }
671   671  
672   inline std::size_t 672   inline std::size_t
HITCBC 673   241190 timer_service::process_expired() 673   128211 timer_service::process_expired()
674   { 674   {
HITCBC 675   241190 intrusive_list<waiter_node> expired; 675   128211 intrusive_list<waiter_node> expired;
676   676  
677   { 677   {
HITCBC 678   241190 std::lock_guard lock(mutex_); 678   128211 std::lock_guard lock(mutex_);
HITCBC 679   241190 auto now = clock_type::now(); 679   128211 auto now = clock_type::now();
680   680  
HITCBC 681   248817 while (!heap_.empty() && heap_[0].time_ <= now) 681   134604 while (!heap_.empty() && heap_[0].time_ <= now)
682   { 682   {
HITCBC 683   7627 implementation* t = heap_[0].timer_; 683   6393 implementation* t = heap_[0].timer_;
HITCBC 684   7627 remove_timer_impl(*t); 684   6393 remove_timer_impl(*t);
HITCBC 685   15258 while (auto* w = t->waiters_.pop_front()) 685   12790 while (auto* w = t->waiters_.pop_front())
686   { 686   {
HITCBC 687   7631 w->impl_ = nullptr; 687   6397 w->impl_ = nullptr;
HITCBC 688   7631 w->ec_value_ = {}; 688   6397 w->ec_value_ = {};
HITCBC 689   7631 expired.push_back(w); 689   6397 expired.push_back(w);
HITCBC 690   7631 } 690   6397 }
HITCBC 691   7627 t->might_have_pending_waits_ = false; 691   6393 t->might_have_pending_waits_ = false;
692   } 692   }
693   693  
HITCBC 694   241190 refresh_cached_nearest(); 694   128211 refresh_cached_nearest();
HITCBC 695   241190 } 695   128211 }
696   696  
HITCBC 697   241190 std::size_t count = 0; 697   128211 std::size_t count = 0;
HITCBC 698   248821 while (auto* w = expired.pop_front()) 698   134608 while (auto* w = expired.pop_front())
699   { 699   {
HITCBC 700   7631 sched_->post(&w->op_); 700   6397 sched_->post(&w->op_);
HITCBC 701   7631 ++count; 701   6397 ++count;
HITCBC 702   7631 } 702   6397 }
703   703  
HITCBC 704   241190 return count; 704   128211 return count;
705   } 705   }
706   706  
707   inline void 707   inline void
HITCBC 708   7679 timer_service::remove_timer_impl(implementation& impl) 708   6445 timer_service::remove_timer_impl(implementation& impl)
709   { 709   {
HITCBC 710   7679 std::size_t index = impl.heap_index_; 710   6445 std::size_t index = impl.heap_index_;
HITCBC 711   7679 if (index >= heap_.size()) 711   6445 if (index >= heap_.size())
MISUBC 712   return; // Not in heap 712   return; // Not in heap
713   713  
HITCBC 714   7679 if (index == heap_.size() - 1) 714   6445 if (index == heap_.size() - 1)
715   { 715   {
716   // Last element, just pop 716   // Last element, just pop
HITCBC 717   157 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)(); 717   158 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 718   157 heap_.pop_back(); 718   158 heap_.pop_back();
719   } 719   }
720   else 720   else
721   { 721   {
722   // Swap with last and reheapify 722   // Swap with last and reheapify
HITCBC 723   7522 swap_heap(index, heap_.size() - 1); 723   6287 swap_heap(index, heap_.size() - 1);
HITCBC 724   7522 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)(); 724   6287 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 725   7522 heap_.pop_back(); 725   6287 heap_.pop_back();
726   726  
HITCBC 727   7522 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_) 727   6287 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
MISUBC 728   up_heap(index); 728   up_heap(index);
729   else 729   else
HITCBC 730   7522 down_heap(index); 730   6287 down_heap(index);
731   } 731   }
732   } 732   }
733   733  
734   inline void 734   inline void
HITCBC 735   7693 timer_service::up_heap(std::size_t index) 735   6459 timer_service::up_heap(std::size_t index)
736   { 736   {
HITCBC 737   15195 while (index > 0) 737   12727 while (index > 0)
738   { 738   {
HITCBC 739   7532 std::size_t parent = (index - 1) / 2; 739   6297 std::size_t parent = (index - 1) / 2;
HITCBC 740   7532 if (!(heap_[index].time_ < heap_[parent].time_)) 740   6297 if (!(heap_[index].time_ < heap_[parent].time_))
HITCBC 741   30 break; 741   29 break;
HITCBC 742   7502 swap_heap(index, parent); 742   6268 swap_heap(index, parent);
HITCBC 743   7502 index = parent; 743   6268 index = parent;
744   } 744   }
HITCBC 745   7693 } 745   6459 }
746   746  
747   inline void 747   inline void
HITCBC 748   7522 timer_service::down_heap(std::size_t index) 748   6287 timer_service::down_heap(std::size_t index)
749   { 749   {
HITCBC 750   7522 std::size_t child = index * 2 + 1; 750   6287 std::size_t child = index * 2 + 1;
HITCBC 751   7522 while (child < heap_.size()) 751   6287 while (child < heap_.size())
752   { 752   {
HITCBC 753   6 std::size_t min_child = (child + 1 == heap_.size() || 753   6 std::size_t min_child = (child + 1 == heap_.size() ||
MISUBC 754   heap_[child].time_ < heap_[child + 1].time_) 754   heap_[child].time_ < heap_[child + 1].time_)
HITCBC 755   6 ? child 755   6 ? child
HITCBC 756   6 : child + 1; 756   6 : child + 1;
757   757  
HITCBC 758   6 if (heap_[index].time_ < heap_[min_child].time_) 758   6 if (heap_[index].time_ < heap_[min_child].time_)
HITCBC 759   6 break; 759   6 break;
760   760  
MISUBC 761   swap_heap(index, min_child); 761   swap_heap(index, min_child);
MISUBC 762   index = min_child; 762   index = min_child;
MISUBC 763   child = index * 2 + 1; 763   child = index * 2 + 1;
764   } 764   }
HITCBC 765   7522 } 765   6287 }
766   766  
767   inline void 767   inline void
HITCBC 768   15024 timer_service::swap_heap(std::size_t i1, std::size_t i2) 768   12555 timer_service::swap_heap(std::size_t i1, std::size_t i2)
769   { 769   {
HITCBC 770   15024 heap_entry tmp = heap_[i1]; 770   12555 heap_entry tmp = heap_[i1];
HITCBC 771   15024 heap_[i1] = heap_[i2]; 771   12555 heap_[i1] = heap_[i2];
HITCBC 772   15024 heap_[i2] = tmp; 772   12555 heap_[i2] = tmp;
HITCBC 773   15024 heap_[i1].timer_->heap_index_ = i1; 773   12555 heap_[i1].timer_->heap_index_ = i1;
HITCBC 774   15024 heap_[i2].timer_->heap_index_ = i2; 774   12555 heap_[i2].timer_->heap_index_ = i2;
HITCBC 775   15024 } 775   12555 }
776   776  
777   // waiter_node out-of-class member function definitions 777   // waiter_node out-of-class member function definitions
778   778  
779   inline void 779   inline void
HITCBC 780   30 waiter_node::canceller::operator()() const 780   30 waiter_node::canceller::operator()() const
781   { 781   {
HITCBC 782   30 waiter_->svc_->cancel_waiter(waiter_); 782   30 waiter_->svc_->cancel_waiter(waiter_);
HITCBC 783   30 } 783   30 }
784   784  
785   inline void 785   inline void
MISUBC 786   waiter_node::completion_op::do_complete( 786   waiter_node::completion_op::do_complete(
787   [[maybe_unused]] void* owner, 787   [[maybe_unused]] void* owner,
788   scheduler_op* base, 788   scheduler_op* base,
789   std::uint32_t, 789   std::uint32_t,
790   std::uint32_t) 790   std::uint32_t)
791   { 791   {
792   // owner is always non-null here. The destroy path (owner == nullptr) 792   // owner is always non-null here. The destroy path (owner == nullptr)
793   // is unreachable because completion_op overrides destroy() directly, 793   // is unreachable because completion_op overrides destroy() directly,
794   // bypassing scheduler_op::destroy() which would call func_(nullptr, ...). 794   // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
MISUBC 795   BOOST_COROSIO_ASSERT(owner); 795   BOOST_COROSIO_ASSERT(owner);
MISUBC 796   static_cast<completion_op*>(base)->operator()(); 796   static_cast<completion_op*>(base)->operator()();
MISUBC 797   } 797   }
798   798  
799   inline void 799   inline void
HITCBC 800   7693 waiter_node::completion_op::operator()() 800   6459 waiter_node::completion_op::operator()()
801   { 801   {
HITCBC 802   7693 auto* w = waiter_; 802   6459 auto* w = waiter_;
HITCBC 803   7693 w->stop_cb_.reset(); 803   6459 w->stop_cb_.reset();
HITCBC 804   7693 if (w->ec_out_) 804   6459 if (w->ec_out_)
HITCBC 805   7693 *w->ec_out_ = w->ec_value_; 805   6459 *w->ec_out_ = w->ec_value_;
806   806  
HITCBC 807   7693 auto* cont = w->cont_; 807   6459 auto* cont = w->cont_;
HITCBC 808   7693 auto d = w->d_; 808   6459 auto d = w->d_;
HITCBC 809   7693 auto* svc = w->svc_; 809   6459 auto* svc = w->svc_;
HITCBC 810   7693 auto& sched = svc->get_scheduler(); 810   6459 auto& sched = svc->get_scheduler();
811   811  
HITCBC 812   7693 svc->destroy_waiter(w); 812   6459 svc->destroy_waiter(w);
813   813  
HITCBC 814   7693 d.post(*cont); 814   6459 d.post(*cont);
HITCBC 815   7693 sched.work_finished(); 815   6459 sched.work_finished();
HITCBC 816   7693 } 816   6459 }
817   817  
818   // GCC 14 false-positive: inlining ~optional<stop_callback> through 818   // GCC 14 false-positive: inlining ~optional<stop_callback> through
819   // delete loses track that stop_cb_ was already .reset() above. 819   // delete loses track that stop_cb_ was already .reset() above.
820   #if defined(__GNUC__) && !defined(__clang__) 820   #if defined(__GNUC__) && !defined(__clang__)
821   #pragma GCC diagnostic push 821   #pragma GCC diagnostic push
822   #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" 822   #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
823   #endif 823   #endif
824   inline void 824   inline void
HITCBC 825   8 waiter_node::completion_op::destroy() 825   8 waiter_node::completion_op::destroy()
826   { 826   {
827   // Called during scheduler shutdown drain when this completion_op is 827   // Called during scheduler shutdown drain when this completion_op is
828   // in the scheduler's ready queue (posted by cancel_timer() or 828   // in the scheduler's ready queue (posted by cancel_timer() or
829   // process_expired()). Balances the work_started() from 829   // process_expired()). Balances the work_started() from
830   // implementation::wait(). The scheduler drain loop separately 830   // implementation::wait(). The scheduler drain loop separately
831   // balances the work_started() from post(). On IOCP both decrements 831   // balances the work_started() from post(). On IOCP both decrements
832   // are required for outstanding_work_ to reach zero; on other 832   // are required for outstanding_work_ to reach zero; on other
833   // backends this is harmless. 833   // backends this is harmless.
834   // 834   //
835   // This override also prevents scheduler_op::destroy() from calling 835   // This override also prevents scheduler_op::destroy() from calling
836   // do_complete(nullptr, ...). See also: timer_service::shutdown() 836   // do_complete(nullptr, ...). See also: timer_service::shutdown()
837   // which drains waiters still in the timer heap (the other path). 837   // which drains waiters still in the timer heap (the other path).
HITCBC 838   8 auto* w = waiter_; 838   8 auto* w = waiter_;
HITCBC 839   8 w->stop_cb_.reset(); 839   8 w->stop_cb_.reset();
HITCBC 840   8 auto h = std::exchange(w->h_, {}); 840   8 auto h = std::exchange(w->h_, {});
HITCBC 841   8 auto& sched = w->svc_->get_scheduler(); 841   8 auto& sched = w->svc_->get_scheduler();
HITCBC 842   8 delete w; 842   8 delete w;
HITCBC 843   8 sched.work_finished(); 843   8 sched.work_finished();
HITCBC 844   8 if (h) 844   8 if (h)
HITCBC 845   8 h.destroy(); 845   8 h.destroy();
HITCBC 846   8 } 846   8 }
847   #if defined(__GNUC__) && !defined(__clang__) 847   #if defined(__GNUC__) && !defined(__clang__)
848   #pragma GCC diagnostic pop 848   #pragma GCC diagnostic pop
849   #endif 849   #endif
850   850  
851   inline std::coroutine_handle<> 851   inline std::coroutine_handle<>
HITCBC 852   7713 timer_service::implementation::wait( 852   6478 timer_service::implementation::wait(
853   std::coroutine_handle<> h, 853   std::coroutine_handle<> h,
854   capy::executor_ref d, 854   capy::executor_ref d,
855   std::stop_token token, 855   std::stop_token token,
856   std::error_code* ec, 856   std::error_code* ec,
857   capy::continuation* cont) 857   capy::continuation* cont)
858   { 858   {
859   // Already-expired fast path — no waiter_node, no mutex. 859   // Already-expired fast path — no waiter_node, no mutex.
860   // Post instead of dispatch so the coroutine yields to the 860   // Post instead of dispatch so the coroutine yields to the
861   // scheduler, allowing other queued work to run. 861   // scheduler, allowing other queued work to run.
HITCBC 862   7713 if (heap_index_ == (std::numeric_limits<std::size_t>::max)()) 862   6478 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
863   { 863   {
HITCBC 864   7691 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now()) 864   6456 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
865   { 865   {
HITCBC 866   4 if (ec) 866   3 if (ec)
HITCBC 867   4 *ec = {}; 867   3 *ec = {};
HITCBC 868   4 d.post(*cont); 868   3 d.post(*cont);
HITCBC 869   4 return std::noop_coroutine(); 869   3 return std::noop_coroutine();
870   } 870   }
871   } 871   }
872   872  
HITCBC 873   7709 auto* w = svc_->create_waiter(); 873   6475 auto* w = svc_->create_waiter();
HITCBC 874   7709 w->impl_ = this; 874   6475 w->impl_ = this;
HITCBC 875   7709 w->svc_ = svc_; 875   6475 w->svc_ = svc_;
HITCBC 876   7709 w->h_ = h; 876   6475 w->h_ = h;
HITCBC 877   7709 w->cont_ = cont; 877   6475 w->cont_ = cont;
HITCBC 878   7709 w->d_ = d; 878   6475 w->d_ = d;
HITCBC 879   7709 w->token_ = std::move(token); 879   6475 w->token_ = std::move(token);
HITCBC 880   7709 w->ec_out_ = ec; 880   6475 w->ec_out_ = ec;
881   881  
HITCBC 882   7709 svc_->insert_waiter(*this, w); 882   6475 svc_->insert_waiter(*this, w);
HITCBC 883   7709 might_have_pending_waits_ = true; 883   6475 might_have_pending_waits_ = true;
HITCBC 884   7709 svc_->get_scheduler().work_started(); 884   6475 svc_->get_scheduler().work_started();
885   885  
HITCBC 886   7709 if (w->token_.stop_possible()) 886   6475 if (w->token_.stop_possible())
HITCBC 887   48 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w}); 887   48 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
888   888  
HITCBC 889   7709 return std::noop_coroutine(); 889   6475 return std::noop_coroutine();
890   } 890   }
891   891  
892   // Free functions 892   // Free functions
893   893  
894   struct timer_service_access 894   struct timer_service_access
895   { 895   {
HITCBC 896   7921 static timer_service& get_timer(io_context& ctx) noexcept 896   6680 static timer_service& get_timer(io_context& ctx) noexcept
897   { 897   {
HITCBC 898   7921 return *ctx.timer_svc_; 898   6680 return *ctx.timer_svc_;
899   } 899   }
900   900  
HITCBC 901   605 static void set_timer(io_context& ctx, timer_service& svc) noexcept 901   605 static void set_timer(io_context& ctx, timer_service& svc) noexcept
902   { 902   {
HITCBC 903   605 ctx.timer_svc_ = &svc; 903   605 ctx.timer_svc_ = &svc;
HITCBC 904   605 } 904   605 }
905   }; 905   };
906   906  
907   // Bypass find_service() mutex by reading io_context's cached pointer 907   // Bypass find_service() mutex by reading io_context's cached pointer
908   inline io_object::io_service& 908   inline io_object::io_service&
HITCBC 909   7921 timer_service_direct(capy::execution_context& ctx) noexcept 909   6680 timer_service_direct(capy::execution_context& ctx) noexcept
910   { 910   {
HITCBC 911   7921 return timer_service_access::get_timer(static_cast<io_context&>(ctx)); 911   6680 return timer_service_access::get_timer(static_cast<io_context&>(ctx));
912   } 912   }
913   913  
914   inline std::size_t 914   inline std::size_t
HITCBC 915   6 timer_service_update_expiry(timer::implementation& base) 915   6 timer_service_update_expiry(timer::implementation& base)
916   { 916   {
HITCBC 917   6 auto& impl = static_cast<timer_service::implementation&>(base); 917   6 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 918   6 return impl.svc_->update_timer(impl, impl.expiry_); 918   6 return impl.svc_->update_timer(impl, impl.expiry_);
919   } 919   }
920   920  
921   inline std::size_t 921   inline std::size_t
HITCBC 922   8 timer_service_cancel(timer::implementation& base) noexcept 922   8 timer_service_cancel(timer::implementation& base) noexcept
923   { 923   {
HITCBC 924   8 auto& impl = static_cast<timer_service::implementation&>(base); 924   8 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 925   8 return impl.svc_->cancel_timer(impl); 925   8 return impl.svc_->cancel_timer(impl);
926   } 926   }
927   927  
928   inline std::size_t 928   inline std::size_t
HITCBC 929   2 timer_service_cancel_one(timer::implementation& base) noexcept 929   2 timer_service_cancel_one(timer::implementation& base) noexcept
930   { 930   {
HITCBC 931   2 auto& impl = static_cast<timer_service::implementation&>(base); 931   2 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 932   2 return impl.svc_->cancel_one_waiter(impl); 932   2 return impl.svc_->cancel_one_waiter(impl);
933   } 933   }
934   934  
935   inline timer_service& 935   inline timer_service&
HITCBC 936   605 get_timer_service(capy::execution_context& ctx, scheduler& sched) 936   605 get_timer_service(capy::execution_context& ctx, scheduler& sched)
937   { 937   {
HITCBC 938   605 auto& svc = ctx.make_service<timer_service>(sched); 938   605 auto& svc = ctx.make_service<timer_service>(sched);
HITCBC 939   605 timer_service_access::set_timer(static_cast<io_context&>(ctx), svc); 939   605 timer_service_access::set_timer(static_cast<io_context&>(ctx), svc);
HITCBC 940   605 return svc; 940   605 return svc;
941   } 941   }
942   942  
943   } // namespace boost::corosio::detail 943   } // namespace boost::corosio::detail
944   944  
945   #endif 945   #endif