主要学习asio网络库有关异步方面的使用
这里先给出一个最简便的TCP异步echo服务器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 #include <boost/asio.hpp> #include <iostream> #include <memory> using boost::asio::ip::tcp;class Session : public std::enable_shared_from_this<Session> {public : Session (tcp::socket socket) : socket_ (std::move (socket)) {} void start () { do_read (); }private : void do_read () { auto self (shared_from_this()) ; socket_.async_read_some (boost::asio::buffer (data_, max_length), [this , self](boost::system::error_code ec, std::size_t length) { if (!ec) do_write (length); }); } void do_write (std::size_t length) { auto self (shared_from_this()) ; boost::asio::async_write (socket_, boost::asio::buffer (data_, length), [this , self](boost::system::error_code ec, std::size_t ) { if (!ec) do_read (); }); } tcp::socket socket_; enum { max_length = 1024 }; char data_[max_length]; };class Server {public : Server (boost::asio::io_context &io_context, short port) : acceptor_ (io_context, tcp::endpoint (tcp::v4 (), port)) { do_accept (); }private : void do_accept () { acceptor_.async_accept ( [this ](boost::system::error_code ec, tcp::socket socket) { if (!ec) std::make_shared <Session>(std::move (socket))->start (); do_accept (); }); } tcp::acceptor acceptor_; };int main () { try { boost::asio::io_context io_context; Server server (io_context, 12345 ) ; io_context.run (); } catch (std::exception &e) { std::cerr << "Exception: " << e.what () << "\n" ; } return 0 ; }
main部分 :
io_context : 事件循环核心句柄, 通过调用run来开始运作.
Server : 服务器类, 使用acceptor接受并处理连接.
acceptor : 这里网络库自带的acceptor类, 只要我们把句柄和绑定需要的数据传入, 在其构造函数中就会自动调用socket + bind + listen.
endpoint : 这就是一个存放用于绑定数据的类.
async_accept : 在新连接建立时, 会调用传入的回调函数, 参数要求为(boost::system::error_code ec, tcp::socket socket), 这样就可以取出socket进行操作, 这里调用了Session类的start, 会针对连接进行处理.
Session : 会话类, 处理每一条的连接的读写.
async_read_some : 在读事件触发时向准备好的缓冲区存入读到的数据, 然后触发其中给出的回调函数, 可以在回调中进行数据处理, echo服务器所以直接调用do_write准备发回.
async_write : 传入目标socket, 数据缓冲区, 回调函数, 就可以在发送完数据后调用回调再度调用do_read进入读事件等待阶段.
有关异步使用递归回调的设计哲学 在同步Reactor模型中, 处理事件循环我们往往会使用while直接处理, 就像下面这样 :
1 2 3 4 5 6 while (true ) { read (); process (); write (); }
但是while在异步中是使用不了的, 异步强调的是控制流的保持, 其没有任何地方是会阻塞的, 核心全在于回调函数如何处理, 假如我们要让逻辑实现一个闭环, 那就要使用递归回调实现类似于循环的效果.
1 2 3 4 5 6 async_read (..., [](...) { process (); async_write (..., [](...) { again_read (); }); });
上面的echo就是这样的逻辑, 读完进行处理, 处理完发送, 发送完再注册读事件, 这样就类似于实现了一个天然的状态机, 并且可以使线程资源最大化.
enable_shared_from_this 我们可以看到Session类继承了该类, 其作用在于可以使成员函数内部调用shared_from_this()
, 该函数会返回当前类对象的智能指针, 其实就是对this进行了封装.
其作用的核心在于延长对象的生命周期 . 这里应用到异步中, 就是可以使构建出来的Session对象始终不被析构, 就算没有实际的对象去管理它们. 因为我们的self对象会一直被lambda捕获, 其内部Session对象就不会被析构. 这里要明白一个知识 : 只要 lambda 回调函数中捕获了 shared_ptr
(哪怕它还没执行),对象的引用计数就会保持 +1 .
多线程版本 上面只是单线程版本的, 但是并发量由于异步存在已经足够高, 几万并发没什么问题, 当然我们也可以写多线程版本的, 其实只要再main函数中做修改就可以了 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int main () { try { boost::asio::io_context io_context; Server server (io_context, 12345 ) ; int thread_count = std::thread::hardware_concurrency (); std::cout << "Start with " << thread_count << " threads\n" ; std::vector<std::thread> threads; for (int i = 0 ; i < thread_count; ++i) { threads.emplace_back ([&io_context]() { io_context.run (); }); } for (auto &t : threads) t.join (); } catch (std::exception &e) { std::cerr << "Exception: " << e.what () << "\n" ; } return 0 ; }
io_context作为核心句柄, 你可以理解其内部存在一个线程安全的消息队列, 用其在不同线程调用run(), 其实是让每个线程都可以从自己的消息队列中利用互斥锁线程安全地获取任务.
当然如果学过muduo, 我们也可以采用one loop per thread
的思想, 一个事件循环对应一个线程, 这样就可以避免消息队列中的锁竞争了, 下面是多io_context多线程版本 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 #include <boost/asio.hpp> #include <iostream> #include <memory> #include <thread> #include <vector> using boost::asio::ip::tcp;class Session : public std::enable_shared_from_this<Session> {public : Session (tcp::socket socket) : socket_ (std::move (socket)) {} void start () { do_read (); }private : void do_read () { auto self = shared_from_this (); socket_.async_read_some (boost::asio::buffer (data_), [this , self](boost::system::error_code ec, std::size_t length) { if (!ec) do_write (length); }); } void do_write (std::size_t length) { auto self = shared_from_this (); boost::asio::async_write (socket_, boost::asio::buffer (data_, length), [this , self](boost::system::error_code ec, std::size_t ) { if (!ec) do_read (); }); } tcp::socket socket_; enum { max_length = 1024 }; char data_[max_length]; };struct Worker { boost::asio::io_context io_context; boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard; std::thread thread; Worker () : work_guard (boost::asio::make_work_guard (io_context)) {} void run () { thread = std::thread ([this ]() { io_context.run (); }); } void stop () { io_context.stop (); if (thread.joinable ()) thread.join (); } };class Server {public : Server (boost::asio::io_context &main_io_context, short port, std::vector<std::shared_ptr<Worker>> &workers) : acceptor_ (main_io_context, tcp::endpoint (tcp::v4 (), port)), workers_ (workers) { do_accept (); }private : void do_accept () { acceptor_.async_accept ( [this ](boost::system::error_code ec, tcp::socket socket) { if (!ec) { auto &worker = workers_[next_worker_]; boost::asio::post (worker->io_context, [sock = std::move (socket)]() mutable { std::make_shared <Session>(std::move (sock))->start (); }); next_worker_ = (next_worker_ + 1 ) % workers_.size (); } do_accept (); }); } tcp::acceptor acceptor_; std::vector<std::shared_ptr<Worker>> &workers_; std::size_t next_worker_ = 0 ; };int main () { try { const int thread_count = std::thread::hardware_concurrency (); std::cout << "Starting server with " << thread_count << " threads\n" ; boost::asio::io_context main_io_context; std::vector<std::shared_ptr<Worker>> workers; for (int i = 0 ; i < thread_count; ++i) { auto w = std::make_shared <Worker>(); w->run (); workers.push_back (w); } Server server (main_io_context, 12345 , workers) ; main_io_context.run (); for (auto &w : workers) w->stop (); } catch (std::exception &e) { std::cerr << "Exception: " << e.what () << "\n" ; } return 0 ; }
这里的核心修改在Server和Worker中, Session中没有变动.
协程版本 asio网络库也支持协程, 其配合C++20的协程可以写出更加简短的同步风格的异步代码 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 #include <boost/asio.hpp> #include <boost/asio/awaitable.hpp> #include <boost/asio/co_spawn.hpp> #include <boost/asio/use_awaitable.hpp> #include <boost/asio/detached.hpp> #include <iostream> using boost::asio::ip::tcp;using namespace boost::asio;using namespace std::literals;awaitable<void > echo_session (tcp::socket socket) { try { char data[1024 ]; for (;;) { std::size_t n = co_await socket.async_read_some (buffer (data), use_awaitable); co_await async_write (socket, buffer(data, n), use_awaitable) ; } } catch (std::exception &e) { std::cerr << "Session ended: " << e.what () << "\n" ; } }awaitable<void > listener (uint16_t port) { auto executor = co_await this_coro::executor; tcp::acceptor acceptor (executor, tcp::endpoint(tcp::v4(), port)) ; for (;;) { tcp::socket socket = co_await acceptor.async_accept (use_awaitable); co_spawn (executor, echo_session (std::move (socket)), detached); } }int main () { try { io_context io; co_spawn (io, listener (12345 ), detached); io.run (); } catch (const std::exception &e) { std::cerr << "Server error: " << e.what () << "\n" ; } return 0 ; }
可以看到代码非常短, 并且和同步代码风格非常相似, 但也有很多要点需要解读 :
如果没有学过C++20协程编程可以看我往期的博客, 不然无法理解里面co_await的用法.
co_spawn(事件循环句柄, 协程函数, 结束方式) :
本函数用来将一个协程注册进入事件循环的executor(调度器)中, 其会先执行协程函数, 然后其中promise_type中起始回调设置的是suspend_always, 在开始就会暂停, 然后调用器会发布任务到事件循环中resume这个协程函数.
auto executor = co_await this_coro::executor;
获取绑定当前协程的executor, 可以当作固定不变的一步, 因为我们需要executor来构建acceptor.
acceptor :
这里构建acceptor的构造可以理解为调用了其针对协程的特殊版本, 用executor来告诉其新连接建立时应把任务提交给对应的io_context.
async_accept(use_awaitable) :
我们可以看到这里async_accept
有对应的协程版本, 传入的use_awaitable
代表将协程挂起, 其内部其实也会调用普通版本的async_accept
, 只不过是在挂起前调用的回调函数中调用并进行异步处理, 在回调触发后再调用resume唤醒协程, 这些asio库都会在内部帮我们实现. 后面的async_read_some
和async_write
同理.
strand(串行化) 它就是对executor的进一步封装 , 也就是特殊的调度器. 用这个调度器发布的任务会串行执行 , 也就是说其真正作用是帮助协程间共享内存 , 是一种另类高效的协程间通信 , 甚至你可以理解为一种另类的锁 .
下面是一个使用共享map实现name记录的echo服务器, 我们只需要关系里面有关strand和map的内容即可 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 #include <boost/asio.hpp> #include <boost/asio/awaitable.hpp> #include <boost/asio/co_spawn.hpp> #include <boost/asio/detached.hpp> #include <boost/asio/use_awaitable.hpp> #include <boost/asio/strand.hpp> #include <iostream> #include <unordered_map> #include <memory> #include <atomic> #include <string> #include <vector> #include <thread> using namespace boost::asio;using namespace boost::asio::ip;using namespace std::literals;using awaitable_void = awaitable<void >;using executor_t = any_io_executor; std::unordered_map<int , std::string> nicknames; std::shared_ptr<strand<executor_t >> strand_ptr; std::atomic<int > connection_id{0 };awaitable_void cleanup_nickname (int id) { co_await boost::asio::post (*strand_ptr, use_awaitable); nicknames.erase (id); co_return ; }awaitable_void handle_client (tcp::socket socket) { auto executor = co_await this_coro::executor; int id = connection_id.fetch_add (1 ); char data[1024 ]; std::string nickname; try { std::string prompt = "请输入昵称: " ; co_await async_write (socket, buffer(prompt), use_awaitable) ; std::size_t n = co_await socket.async_read_some (buffer (data), use_awaitable); nickname = std::string (data, data + n - 1 ); co_await boost::asio::post (*strand_ptr, use_awaitable); nicknames[id] = nickname; std::string hello = "你好," + nickname + "!你现在可以发消息,我会帮你回显。\n" ; co_await async_write (socket, buffer(hello), use_awaitable) ; for (;;) { std::size_t len = co_await socket.async_read_some (buffer (data), use_awaitable); std::string input (data, data + len) ; std::string name; co_await boost::asio::post (*strand_ptr, use_awaitable); name = nicknames[id]; std::string msg = "[" + name + "] 你说的是:" + input; co_await async_write (socket, buffer(msg), use_awaitable) ; } } catch (...) { co_spawn (executor, cleanup_nickname (id), detached); } }awaitable_void listener (uint16_t port) { auto executor = co_await this_coro::executor; tcp::acceptor acceptor (executor, {tcp::v4(), port}) ; for (;;) { tcp::socket socket = co_await acceptor.async_accept (use_awaitable); co_spawn (executor, handle_client (std::move (socket)), detached); } }int main () { try { io_context io; strand_ptr = std::make_shared<strand<executor_t >>(io.get_executor ()); co_spawn (io, listener (12345 ), detached); std::vector<std::thread> threads; int n = std::thread::hardware_concurrency (); for (int i = 0 ; i < n; ++i) { threads.emplace_back ([&io]() { io.run (); }); } for (auto &t : threads) t.join (); } catch (const std::exception &e) { std::cerr << "Server error: " << e.what () << "\n" ; } return 0 ; }
std::shared_ptr<strand<executor_t>> strand_ptr;
在全局设置一个串行化调度器的智能指针.
strand_ptr = std::make_shared<strand<executor_t>>(io.get_executor());
取出io_context对应的executor.
co_await boost::asio::post(*strand_ptr, use_awaitable);
在协程函数中, 所有有关全局变量map的操作前都会加这么一段代码, 其post的任务并没有什么实际作用, 其目的只在于让后面的代码串行化执行 , 以此防止共享资源的竞争. 当我们把这段代码当成一个锁来看, 就会豁然开朗许多, 那么这个锁的范围是什么? 答案是下一次挂起(co_await)或最后返回(co_return)时 , 在这之后才会串行化执行后面的任务.
当然这里只是利用strand实现协程间资源共享, 普通的线程间资源共享也是可以实现的 :
1 2 3 boost::asio::post (strand, [key, value]() { shared_map[key] = value; });
boost::asio的优势
支持回调, 协程等多种异步模型.
跨平台
对于C++20协程原生支持
有strand实现多线程多协程下的资源共享