include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

81.0% Lines (336/415) 93.3% List of functions (28/30)
f(x) Functions (30)
Function Calls Lines Branches Blocks
boost::corosio::detail::epoll_socket_state::epoll_socket_state(boost::corosio::detail::epoll_scheduler&) :97 0 100.0% boost::corosio::detail::epoll_socket_service::scheduler() const :133 0 100.0% boost::corosio::detail::epoll_socket::register_op(boost::corosio::detail::epoll_op&, boost::corosio::detail::epoll_op*&, bool&, bool&) :154 0 100.0% boost::corosio::detail::epoll_op::canceller::operator()() const :191 0 100.0% boost::corosio::detail::epoll_connect_op::cancel() :197 0 0.0% boost::corosio::detail::epoll_read_op::cancel() :206 0 80.0% boost::corosio::detail::epoll_write_op::cancel() :215 0 0.0% boost::corosio::detail::epoll_op::operator()() :224 0 87.5% boost::corosio::detail::epoll_connect_op::operator()() :253 0 95.7% boost::corosio::detail::epoll_socket::epoll_socket(boost::corosio::detail::epoll_socket_service&) :289 0 100.0% boost::corosio::detail::epoll_socket::~epoll_socket() :294 0 100.0% boost::corosio::detail::epoll_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :297 0 47.5% boost::corosio::detail::epoll_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :360 0 98.1% boost::corosio::detail::epoll_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :446 0 78.4% boost::corosio::detail::epoll_socket::shutdown(boost::corosio::tcp_socket::shutdown_type) :530 0 81.2% boost::corosio::detail::epoll_socket::set_option(int, int, void const*, unsigned long) :553 0 75.0% boost::corosio::detail::epoll_socket::get_option(int, int, void*, unsigned long*) const :563 0 83.3% boost::corosio::detail::epoll_socket::cancel() :574 0 73.5% boost::corosio::detail::epoll_socket::cancel_single_op(boost::corosio::detail::epoll_op&) :624 0 65.5% boost::corosio::detail::epoll_socket::close_socket() :664 0 86.0% boost::corosio::detail::epoll_socket_service::epoll_socket_service(boost::capy::execution_context&) :726 0 100.0% boost::corosio::detail::epoll_socket_service::~epoll_socket_service() :733 0 100.0% boost::corosio::detail::epoll_socket_service::shutdown() :736 0 80.0% boost::corosio::detail::epoll_socket_service::construct() :753 0 100.0% boost::corosio::detail::epoll_socket_service::destroy(boost::corosio::io_object::implementation*) :768 0 100.0% boost::corosio::detail::epoll_socket_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :778 0 94.4% boost::corosio::detail::epoll_socket_service::close(boost::corosio::io_object::handle&) :810 0 100.0% boost::corosio::detail::epoll_socket_service::post(boost::corosio::detail::epoll_op*) :816 0 100.0% boost::corosio::detail::epoll_socket_service::work_started() :822 0 100.0% boost::corosio::detail::epoll_socket_service::work_finished() :828 0 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
46 Each I/O operation follows the same pattern:
47 1. Try the syscall immediately (non-blocking socket)
48 2. If it succeeds or fails with a real error, post to completion queue
49 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
55 One-Shot Registration
56 ---------------------
57 We use one-shot epoll registration: each operation registers, waits for
58 one event, then unregisters. This simplifies the state machine since we
59 don't need to track whether an fd is currently registered or handle
60 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 simplicity is worth it.
62
63 Cancellation
64 ------------
65 See op.hpp for the completion/cancellation race handling via the
66 `registered` atomic. cancel() must complete pending operations (post
67 them with cancelled flag) so coroutines waiting on them can resume.
68 close_socket() calls cancel() first to ensure this.
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
72 Socket impls use enable_shared_from_this. The service owns impls via
73 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 removal. When a user calls close(), we call cancel() which posts pending
75 ops to the scheduler.
76
77 CRITICAL: The posted ops must keep the impl alive until they complete.
78 Otherwise the scheduler would process a freed op (use-after-free). The
79 cancel() method captures shared_from_this() into op.impl_ptr before
80 posting. When the op completes, impl_ptr is cleared, allowing the impl
81 to be destroyed if no other references exist.
82
83 Service Ownership
84 -----------------
85 epoll_socket_service owns all socket impls. destroy_impl() removes the
86 shared_ptr from the map, but the impl may survive if ops still hold
87 impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 in-flight ops will complete and release their refs.
89 */
90
91 namespace boost::corosio::detail {
92
93 /** State for epoll socket service. */
94 class epoll_socket_state
95 {
96 public:
97 244x explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98 {
99 244x }
100
101 epoll_scheduler& sched_;
102 std::mutex mutex_;
103 intrusive_list<epoll_socket> socket_list_;
104 std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105 socket_ptrs_;
106 };
107
108 /** epoll socket service implementation.
109
110 Inherits from socket_service to enable runtime polymorphism.
111 Uses key_type = socket_service for service lookup.
112 */
113 class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114 {
115 public:
116 explicit epoll_socket_service(capy::execution_context& ctx);
117 ~epoll_socket_service() override;
118
119 epoll_socket_service(epoll_socket_service const&) = delete;
120 epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121
122 void shutdown() override;
123
124 io_object::implementation* construct() override;
125 void destroy(io_object::implementation*) override;
126 void close(io_object::handle&) override;
127 std::error_code open_socket(
128 tcp_socket::implementation& impl,
129 int family,
130 int type,
131 int protocol) override;
132
133 327104x epoll_scheduler& scheduler() const noexcept
134 {
135 327104x return state_->sched_;
136 }
137 void post(epoll_op* op);
138 void work_started() noexcept;
139 void work_finished() noexcept;
140
141 private:
142 std::unique_ptr<epoll_socket_state> state_;
143 };
144
145 //--------------------------------------------------------------------------
146 //
147 // Implementation
148 //
149 //--------------------------------------------------------------------------
150
151 // Register an op with the reactor, handling cached edge events.
152 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
153 inline void
154 4477x epoll_socket::register_op(
155 epoll_op& op,
156 epoll_op*& desc_slot,
157 bool& ready_flag,
158 bool& cancel_flag) noexcept
159 {
160 4477x svc_.work_started();
161
162 4477x std::lock_guard lock(desc_state_.mutex);
163 4477x bool io_done = false;
164 4477x if (ready_flag)
165 {
166 137x ready_flag = false;
167 137x op.perform_io();
168 137x io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
169 137x if (!io_done)
170 137x op.errn = 0;
171 }
172
173 4477x if (cancel_flag)
174 {
175 92x cancel_flag = false;
176 92x op.cancelled.store(true, std::memory_order_relaxed);
177 }
178
179 4477x if (io_done || op.cancelled.load(std::memory_order_acquire))
180 {
181 92x svc_.post(&op);
182 92x svc_.work_finished();
183 }
184 else
185 {
186 4385x desc_slot = &op;
187 }
188 4477x }
189
190 inline void
191 101x epoll_op::canceller::operator()() const noexcept
192 {
193 101x op->cancel();
194 101x }
195
196 inline void
197 epoll_connect_op::cancel() noexcept
198 {
199 if (socket_impl_)
200 socket_impl_->cancel_single_op(*this);
201 else
202 request_cancel();
203 }
204
205 inline void
206 95x epoll_read_op::cancel() noexcept
207 {
208 95x if (socket_impl_)
209 95x socket_impl_->cancel_single_op(*this);
210 else
211 request_cancel();
212 95x }
213
214 inline void
215 epoll_write_op::cancel() noexcept
216 {
217 if (socket_impl_)
218 socket_impl_->cancel_single_op(*this);
219 else
220 request_cancel();
221 }
222
223 inline void
224 51050x epoll_op::operator()()
225 {
226 51050x stop_cb.reset();
227
228 51050x socket_impl_->svc_.scheduler().reset_inline_budget();
229
230 51050x if (cancelled.load(std::memory_order_acquire))
231 199x *ec_out = capy::error::canceled;
232 50851x else if (errn != 0)
233 *ec_out = make_err(errn);
234 50851x else if (is_read_operation() && bytes_transferred == 0)
235 *ec_out = capy::error::eof;
236 else
237 50851x *ec_out = {};
238
239 51050x *bytes_out = bytes_transferred;
240
241 // Move to stack before resuming coroutine. The coroutine might close
242 // the socket, releasing the last wrapper ref. If impl_ptr were the
243 // last ref and we destroyed it while still in operator(), we'd have
244 // use-after-free. Moving to local ensures destruction happens at
245 // function exit, after all member accesses are complete.
246 51050x capy::executor_ref saved_ex(ex);
247 51050x std::coroutine_handle<> saved_h(h);
248 51050x auto prevent_premature_destruction = std::move(impl_ptr);
249 51050x dispatch_coro(saved_ex, saved_h).resume();
250 51050x }
251
252 inline void
253 4282x epoll_connect_op::operator()()
254 {
255 4282x stop_cb.reset();
256
257 4282x socket_impl_->svc_.scheduler().reset_inline_budget();
258
259 4282x bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
260
261 // Cache endpoints on successful connect
262 4282x if (success && socket_impl_)
263 {
264 4279x endpoint local_ep;
265 4279x sockaddr_storage local_storage{};
266 4279x socklen_t local_len = sizeof(local_storage);
267 4279x if (::getsockname(
268 4279x fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
269 0)
270 4279x local_ep = from_sockaddr(local_storage);
271 4279x static_cast<epoll_socket*>(socket_impl_)
272 4279x ->set_endpoints(local_ep, target_endpoint);
273 }
274
275 4282x if (cancelled.load(std::memory_order_acquire))
276 *ec_out = capy::error::canceled;
277 4282x else if (errn != 0)
278 3x *ec_out = make_err(errn);
279 else
280 4279x *ec_out = {};
281
282 // Move to stack before resuming. See epoll_op::operator()() for rationale.
283 4282x capy::executor_ref saved_ex(ex);
284 4282x std::coroutine_handle<> saved_h(h);
285 4282x auto prevent_premature_destruction = std::move(impl_ptr);
286 4282x dispatch_coro(saved_ex, saved_h).resume();
287 4282x }
288
289 12901x inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
290 12901x : svc_(svc)
291 {
292 12901x }
293
294 12901x inline epoll_socket::~epoll_socket() = default;
295
296 inline std::coroutine_handle<>
297 4282x epoll_socket::connect(
298 std::coroutine_handle<> h,
299 capy::executor_ref ex,
300 endpoint ep,
301 std::stop_token token,
302 std::error_code* ec)
303 {
304 4282x auto& op = conn_;
305
306 4282x sockaddr_storage storage{};
307 socklen_t addrlen =
308 4282x detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
309 4282x int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
310
311 4282x if (result == 0)
312 {
313 sockaddr_storage local_storage{};
314 socklen_t local_len = sizeof(local_storage);
315 if (::getsockname(
316 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
317 0)
318 local_endpoint_ = detail::from_sockaddr(local_storage);
319 remote_endpoint_ = ep;
320 }
321
322 4282x if (result == 0 || errno != EINPROGRESS)
323 {
324 int err = (result < 0) ? errno : 0;
325 if (svc_.scheduler().try_consume_inline_budget())
326 {
327 *ec = err ? make_err(err) : std::error_code{};
328 return dispatch_coro(ex, h);
329 }
330 op.reset();
331 op.h = h;
332 op.ex = ex;
333 op.ec_out = ec;
334 op.fd = fd_;
335 op.target_endpoint = ep;
336 op.start(token, this);
337 op.impl_ptr = shared_from_this();
338 op.complete(err, 0);
339 svc_.post(&op);
340 return std::noop_coroutine();
341 }
342
343 // EINPROGRESS — register with reactor
344 4282x op.reset();
345 4282x op.h = h;
346 4282x op.ex = ex;
347 4282x op.ec_out = ec;
348 4282x op.fd = fd_;
349 4282x op.target_endpoint = ep;
350 4282x op.start(token, this);
351 4282x op.impl_ptr = shared_from_this();
352
353 4282x register_op(
354 4282x op, desc_state_.connect_op, desc_state_.write_ready,
355 4282x desc_state_.connect_cancel_pending);
356 4282x return std::noop_coroutine();
357 }
358
359 inline std::coroutine_handle<>
360 127505x epoll_socket::read_some(
361 std::coroutine_handle<> h,
362 capy::executor_ref ex,
363 buffer_param param,
364 std::stop_token token,
365 std::error_code* ec,
366 std::size_t* bytes_out)
367 {
368 127505x auto& op = rd_;
369 127505x op.reset();
370
371 127505x capy::mutable_buffer bufs[epoll_read_op::max_buffers];
372 127505x op.iovec_count =
373 127505x static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
374
375 127505x if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
376 {
377 1x op.empty_buffer_read = true;
378 1x op.h = h;
379 1x op.ex = ex;
380 1x op.ec_out = ec;
381 1x op.bytes_out = bytes_out;
382 1x op.start(token, this);
383 1x op.impl_ptr = shared_from_this();
384 1x op.complete(0, 0);
385 1x svc_.post(&op);
386 1x return std::noop_coroutine();
387 }
388
389 255008x for (int i = 0; i < op.iovec_count; ++i)
390 {
391 127504x op.iovecs[i].iov_base = bufs[i].data();
392 127504x op.iovecs[i].iov_len = bufs[i].size();
393 }
394
395 // Speculative read
396 ssize_t n;
397 do
398 {
399 127504x n = ::readv(fd_, op.iovecs, op.iovec_count);
400 }
401 127504x while (n < 0 && errno == EINTR);
402
403 127504x if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
404 {
405 127309x int err = (n < 0) ? errno : 0;
406 127309x auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
407
408 127309x if (svc_.scheduler().try_consume_inline_budget())
409 {
410 101892x if (err)
411 *ec = make_err(err);
412 101892x else if (n == 0)
413 5x *ec = capy::error::eof;
414 else
415 101887x *ec = {};
416 101892x *bytes_out = bytes;
417 101892x return dispatch_coro(ex, h);
418 }
419 25417x op.h = h;
420 25417x op.ex = ex;
421 25417x op.ec_out = ec;
422 25417x op.bytes_out = bytes_out;
423 25417x op.start(token, this);
424 25417x op.impl_ptr = shared_from_this();
425 25417x op.complete(err, bytes);
426 25417x svc_.post(&op);
427 25417x return std::noop_coroutine();
428 }
429
430 // EAGAIN — register with reactor
431 195x op.h = h;
432 195x op.ex = ex;
433 195x op.ec_out = ec;
434 195x op.bytes_out = bytes_out;
435 195x op.fd = fd_;
436 195x op.start(token, this);
437 195x op.impl_ptr = shared_from_this();
438
439 195x register_op(
440 195x op, desc_state_.read_op, desc_state_.read_ready,
441 195x desc_state_.read_cancel_pending);
442 195x return std::noop_coroutine();
443 }
444
445 inline std::coroutine_handle<>
446 127312x epoll_socket::write_some(
447 std::coroutine_handle<> h,
448 capy::executor_ref ex,
449 buffer_param param,
450 std::stop_token token,
451 std::error_code* ec,
452 std::size_t* bytes_out)
453 {
454 127312x auto& op = wr_;
455 127312x op.reset();
456
457 127312x capy::mutable_buffer bufs[epoll_write_op::max_buffers];
458 127312x op.iovec_count =
459 127312x static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
460
461 127312x if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
462 {
463 1x op.h = h;
464 1x op.ex = ex;
465 1x op.ec_out = ec;
466 1x op.bytes_out = bytes_out;
467 1x op.start(token, this);
468 1x op.impl_ptr = shared_from_this();
469 1x op.complete(0, 0);
470 1x svc_.post(&op);
471 1x return std::noop_coroutine();
472 }
473
474 254622x for (int i = 0; i < op.iovec_count; ++i)
475 {
476 127311x op.iovecs[i].iov_base = bufs[i].data();
477 127311x op.iovecs[i].iov_len = bufs[i].size();
478 }
479
480 // Speculative write
481 127311x msghdr msg{};
482 127311x msg.msg_iov = op.iovecs;
483 127311x msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
484
485 ssize_t n;
486 do
487 {
488 127311x n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
489 }
490 127311x while (n < 0 && errno == EINTR);
491
492 127311x if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
493 {
494 127311x int err = (n < 0) ? errno : 0;
495 127311x auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
496
497 127311x if (svc_.scheduler().try_consume_inline_budget())
498 {
499 101875x *ec = err ? make_err(err) : std::error_code{};
500 101875x *bytes_out = bytes;
501 101875x return dispatch_coro(ex, h);
502 }
503 25436x op.h = h;
504 25436x op.ex = ex;
505 25436x op.ec_out = ec;
506 25436x op.bytes_out = bytes_out;
507 25436x op.start(token, this);
508 25436x op.impl_ptr = shared_from_this();
509 25436x op.complete(err, bytes);
510 25436x svc_.post(&op);
511 25436x return std::noop_coroutine();
512 }
513
514 // EAGAIN — register with reactor
515 op.h = h;
516 op.ex = ex;
517 op.ec_out = ec;
518 op.bytes_out = bytes_out;
519 op.fd = fd_;
520 op.start(token, this);
521 op.impl_ptr = shared_from_this();
522
523 register_op(
524 op, desc_state_.write_op, desc_state_.write_ready,
525 desc_state_.write_cancel_pending);
526 return std::noop_coroutine();
527 }
528
529 inline std::error_code
530 3x epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
531 {
532 int how;
533 3x switch (what)
534 {
535 1x case tcp_socket::shutdown_receive:
536 1x how = SHUT_RD;
537 1x break;
538 1x case tcp_socket::shutdown_send:
539 1x how = SHUT_WR;
540 1x break;
541 1x case tcp_socket::shutdown_both:
542 1x how = SHUT_RDWR;
543 1x break;
544 default:
545 return make_err(EINVAL);
546 }
547 3x if (::shutdown(fd_, how) != 0)
548 return make_err(errno);
549 3x return {};
550 }
551
552 inline std::error_code
553 32x epoll_socket::set_option(
554 int level, int optname, void const* data, std::size_t size) noexcept
555 {
556 32x if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
557 0)
558 return make_err(errno);
559 32x return {};
560 }
561
562 inline std::error_code
563 31x epoll_socket::get_option(
564 int level, int optname, void* data, std::size_t* size) const noexcept
565 {
566 31x socklen_t len = static_cast<socklen_t>(*size);
567 31x if (::getsockopt(fd_, level, optname, data, &len) != 0)
568 return make_err(errno);
569 31x *size = static_cast<std::size_t>(len);
570 31x return {};
571 }
572
573 inline void
574 181x epoll_socket::cancel() noexcept
575 {
576 181x auto self = weak_from_this().lock();
577 181x if (!self)
578 return;
579
580 181x conn_.request_cancel();
581 181x rd_.request_cancel();
582 181x wr_.request_cancel();
583
584 181x epoll_op* conn_claimed = nullptr;
585 181x epoll_op* rd_claimed = nullptr;
586 181x epoll_op* wr_claimed = nullptr;
587 {
588 181x std::lock_guard lock(desc_state_.mutex);
589 181x if (desc_state_.connect_op == &conn_)
590 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
591 else
592 181x desc_state_.connect_cancel_pending = true;
593 181x if (desc_state_.read_op == &rd_)
594 3x rd_claimed = std::exchange(desc_state_.read_op, nullptr);
595 else
596 178x desc_state_.read_cancel_pending = true;
597 181x if (desc_state_.write_op == &wr_)
598 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
599 else
600 181x desc_state_.write_cancel_pending = true;
601 181x }
602
603 181x if (conn_claimed)
604 {
605 conn_.impl_ptr = self;
606 svc_.post(&conn_);
607 svc_.work_finished();
608 }
609 181x if (rd_claimed)
610 {
611 3x rd_.impl_ptr = self;
612 3x svc_.post(&rd_);
613 3x svc_.work_finished();
614 }
615 181x if (wr_claimed)
616 {
617 wr_.impl_ptr = self;
618 svc_.post(&wr_);
619 svc_.work_finished();
620 }
621 181x }
622
623 inline void
624 95x epoll_socket::cancel_single_op(epoll_op& op) noexcept
625 {
626 95x auto self = weak_from_this().lock();
627 95x if (!self)
628 return;
629
630 95x op.request_cancel();
631
632 95x epoll_op** desc_op_ptr = nullptr;
633 95x if (&op == &conn_)
634 desc_op_ptr = &desc_state_.connect_op;
635 95x else if (&op == &rd_)
636 95x desc_op_ptr = &desc_state_.read_op;
637 else if (&op == &wr_)
638 desc_op_ptr = &desc_state_.write_op;
639
640 95x if (desc_op_ptr)
641 {
642 95x epoll_op* claimed = nullptr;
643 {
644 95x std::lock_guard lock(desc_state_.mutex);
645 95x if (*desc_op_ptr == &op)
646 95x claimed = std::exchange(*desc_op_ptr, nullptr);
647 else if (&op == &conn_)
648 desc_state_.connect_cancel_pending = true;
649 else if (&op == &rd_)
650 desc_state_.read_cancel_pending = true;
651 else if (&op == &wr_)
652 desc_state_.write_cancel_pending = true;
653 95x }
654 95x if (claimed)
655 {
656 95x op.impl_ptr = self;
657 95x svc_.post(&op);
658 95x svc_.work_finished();
659 }
660 }
661 95x }
662
663 inline void
664 38675x epoll_socket::close_socket() noexcept
665 {
666 38675x auto self = weak_from_this().lock();
667 38675x if (self)
668 {
669 38675x conn_.request_cancel();
670 38675x rd_.request_cancel();
671 38675x wr_.request_cancel();
672
673 38675x epoll_op* conn_claimed = nullptr;
674 38675x epoll_op* rd_claimed = nullptr;
675 38675x epoll_op* wr_claimed = nullptr;
676 {
677 38675x std::lock_guard lock(desc_state_.mutex);
678 38675x conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
679 38675x rd_claimed = std::exchange(desc_state_.read_op, nullptr);
680 38675x wr_claimed = std::exchange(desc_state_.write_op, nullptr);
681 38675x desc_state_.read_ready = false;
682 38675x desc_state_.write_ready = false;
683 38675x desc_state_.read_cancel_pending = false;
684 38675x desc_state_.write_cancel_pending = false;
685 38675x desc_state_.connect_cancel_pending = false;
686 38675x }
687
688 38675x if (conn_claimed)
689 {
690 conn_.impl_ptr = self;
691 svc_.post(&conn_);
692 svc_.work_finished();
693 }
694 38675x if (rd_claimed)
695 {
696 1x rd_.impl_ptr = self;
697 1x svc_.post(&rd_);
698 1x svc_.work_finished();
699 }
700 38675x if (wr_claimed)
701 {
702 wr_.impl_ptr = self;
703 svc_.post(&wr_);
704 svc_.work_finished();
705 }
706
707 38675x if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
708 97x desc_state_.impl_ref_ = self;
709 }
710
711 38675x if (fd_ >= 0)
712 {
713 8576x if (desc_state_.registered_events != 0)
714 8576x svc_.scheduler().deregister_descriptor(fd_);
715 8576x ::close(fd_);
716 8576x fd_ = -1;
717 }
718
719 38675x desc_state_.fd = -1;
720 38675x desc_state_.registered_events = 0;
721
722 38675x local_endpoint_ = endpoint{};
723 38675x remote_endpoint_ = endpoint{};
724 38675x }
725
726 244x inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
727 244x : state_(
728 std::make_unique<epoll_socket_state>(
729 244x ctx.use_service<epoll_scheduler>()))
730 {
731 244x }
732
733 488x inline epoll_socket_service::~epoll_socket_service() {}
734
735 inline void
736 244x epoll_socket_service::shutdown()
737 {
738 244x std::lock_guard lock(state_->mutex_);
739
740 244x while (auto* impl = state_->socket_list_.pop_front())
741 impl->close_socket();
742
743 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
744 // drains completed_ops_, calling destroy() on each queued op. If we
745 // released our shared_ptrs now, an epoll_op::destroy() could free the
746 // last ref to an impl whose embedded descriptor_state is still linked
747 // in the queue — use-after-free on the next pop(). Letting ~state_
748 // release the ptrs (during service destruction, after scheduler
749 // shutdown) keeps every impl alive until all ops have been drained.
750 244x }
751
752 inline io_object::implementation*
753 12901x epoll_socket_service::construct()
754 {
755 12901x auto impl = std::make_shared<epoll_socket>(*this);
756 12901x auto* raw = impl.get();
757
758 {
759 12901x std::lock_guard lock(state_->mutex_);
760 12901x state_->socket_list_.push_back(raw);
761 12901x state_->socket_ptrs_.emplace(raw, std::move(impl));
762 12901x }
763
764 12901x return raw;
765 12901x }
766
767 inline void
768 12901x epoll_socket_service::destroy(io_object::implementation* impl)
769 {
770 12901x auto* epoll_impl = static_cast<epoll_socket*>(impl);
771 12901x epoll_impl->close_socket();
772 12901x std::lock_guard lock(state_->mutex_);
773 12901x state_->socket_list_.remove(epoll_impl);
774 12901x state_->socket_ptrs_.erase(epoll_impl);
775 12901x }
776
777 inline std::error_code
778 4297x epoll_socket_service::open_socket(
779 tcp_socket::implementation& impl, int family, int type, int protocol)
780 {
781 4297x auto* epoll_impl = static_cast<epoll_socket*>(&impl);
782 4297x epoll_impl->close_socket();
783
784 4297x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
785 4297x if (fd < 0)
786 return make_err(errno);
787
788 4297x if (family == AF_INET6)
789 {
790 5x int one = 1;
791 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
792 }
793
794 4297x epoll_impl->fd_ = fd;
795
796 // Register fd with epoll (edge-triggered mode)
797 4297x epoll_impl->desc_state_.fd = fd;
798 {
799 4297x std::lock_guard lock(epoll_impl->desc_state_.mutex);
800 4297x epoll_impl->desc_state_.read_op = nullptr;
801 4297x epoll_impl->desc_state_.write_op = nullptr;
802 4297x epoll_impl->desc_state_.connect_op = nullptr;
803 4297x }
804 4297x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
805
806 4297x return {};
807 }
808
809 inline void
810 21477x epoll_socket_service::close(io_object::handle& h)
811 {
812 21477x static_cast<epoll_socket*>(h.get())->close_socket();
813 21477x }
814
815 inline void
816 51046x epoll_socket_service::post(epoll_op* op)
817 {
818 51046x state_->sched_.post(op);
819 51046x }
820
821 inline void
822 4477x epoll_socket_service::work_started() noexcept
823 {
824 4477x state_->sched_.work_started();
825 4477x }
826
827 inline void
828 191x epoll_socket_service::work_finished() noexcept
829 {
830 191x state_->sched_.work_finished();
831 191x }
832
833 } // namespace boost::corosio::detail
834
835 #endif // BOOST_COROSIO_HAS_EPOLL
836
837 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
838