ThreadPool相关
本章我们将把EventLoop和C++11的thread结合, 实现Thread / EventLoopThread / EventLoopThreadPool.
首先我们应当明晰将创建的三个类的意义何在 :
Thread : 这是对thread库的封装, 会增加对各种线程运行状态与数据的维护.
EventLoopThread :
将Thread和EventLoop组合, 真正实现 one loop per thread 思想的地方, 最终达成的效果就是创建一个线程并在线程中创建一个loop并运行该loop.
EventLoopThreadPool :
学到这里线程池的作用应该都知道, 就是提前创建线程以减少运行开销, 这里就是开一个池填充EventLoopThread, 我们最后的TcpServer中就会维护这样一个线程池存放subLoop.
Thread
对C++11的thread库封装 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #pragma once
#include "UnCopyable.h" #include <functional> #include <thread> #include <memory> #include <unistd.h> #include <atomic> #include <string>
class Thread : UnCopyable { public: using ThreadFunc = std::function<void()>;
explicit Thread(ThreadFunc, const std::string &name = std::string()); ~Thread();
void start(); void join();
bool started() const { return _started; } pid_t tid() const { return _tid; } const std::string &name() { return _name; }
private: void setDefaultName();
bool _started; bool _joined; std::shared_ptr<std::thread> _thread; pid_t _tid; ThreadFunc _func; std::string _name; static std::atomic_int32_t _numCreated; };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| #include "Thread.h" #include "CurrentThread.h"
#include <semaphore.h>
std::atomic_int Thread::_numCreated(0);
Thread::Thread(ThreadFunc func, const std::string &name) : _started(false), _joined(false), _tid(0), _func(std::move(func)), _name(name) { setDefaultName(); }
Thread::~Thread() { if (_started && !_joined) _thread->detach(); }
void Thread::start() { _started = true; sem_t sem; sem_init(&sem, false, 0); _thread = std::make_shared<std::thread>([this, &sem](){ _tid = CurrentThread::tid(); sem_post(&sem); _func(); }); sem_wait(&sem); }
void Thread::join() { _joined = true; _thread->join(); }
void Thread::setDefaultName() { int num = ++_numCreated; if (_name.empty()) { char buf[32] = {0}; snprintf(buf, sizeof buf, "Thread%d", num); _name = buf; } }
|
进行封装的主要目的除了进行信息记录之外, 就是可以控制创建线程的时机. 对于C++11thread库中的thread类来说, 只要创建出对象就代表了线程开启, 这样就没有进行初始化准备的时间了, 我们在封装中用智能指针维护thread类, 在start中才真正传入回调函数并创建thread对象开启线程.
这里还需要注意的一点就是为了在Thread对象中维护线程的tid, 需要使用条件变量进行线程间通信, 以获取到创建线程的tid并存储在Thread对象中.
这里补充一下CurrentThread类, 其可以获取当前线程的tid, 在上一章的EventLoop中也有使用过, 用来判断是不是当前进程 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| #pragma once namespace CurrentThread { extern __thread int t_cachedTid;
void cacheTid();
inline int tid() { if (__builtin_expect(t_cachedTid == 0, 0)) cacheTid(); return t_cachedTid; } }
#include "CurrentThread.h" #include <sys/syscall.h> #include <unistd.h>
namespace CurrentThread { __thread int t_cachedTid = 0; void cacheTid() { if (t_cachedTid == 0) { t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid)); } } }
|
EventLoopThread
EventLoopThread类内部提供了线程函数来实现构建一个EventLoop并运行的效果.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #pragma once
#include "UnCopyable.h" #include "EventLoop.h" #include "Thread.h"
#include <functional> #include <mutex> #include <condition_variable>
class EventLoopThread : UnCopyable { public: using ThreadInitCallback = std::function<void(EventLoop *)>; EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback(), const std::string &name = std::string()); ~EventLoopThread();
EventLoop *startLoop();
private: void threadFunc();
EventLoop *_loop; bool _exiting; Thread _thread; std::mutex _mutex; std::condition_variable _cond; ThreadInitCallback _cb; };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| #include "EventLoopThread.h"
EventLoopThread::EventLoopThread(const ThreadInitCallback &cb, const std::string &name) : _loop(nullptr) , _exiting(false) , _thread(std::bind(&EventLoopThread::threadFunc, this), name) , _mutex() , _cond() , _cb(cb) {}
EventLoopThread::~EventLoopThread() { _exiting = true; if (_loop != nullptr) { _loop->quit(); _thread.join(); } }
EventLoop *EventLoopThread::startLoop() { _thread.start();
EventLoop *loop = nullptr; { std::unique_lock<std::mutex> lock(_mutex); while (_loop == nullptr) { _cond.wait(lock); } loop = _loop; } return loop; }
void EventLoopThread::threadFunc() { EventLoop loop; if (_cb) _cb(&loop);
{ std::unique_lock<std::mutex> lock(_mutex); _loop = &loop; _cond.notify_one(); }
loop.loop(); std::unique_lock<std::mutex> lock(_mutex); _loop = nullptr; }
|
我们来梳理一遍EventLoopThread的使用流程 :
已知其上层还有EventLoopThreadPool会存入很多EventLoopThread, 并且其内部也会记录每个EventLoopThread中loop的指针, 那么实际调用流程就是 :
EventLoopThreadPool创建EventLoopThread对象.
调用startLoop函数, 创建线程并执行threadFunc.
threadFunc中, 在栈上构造loop并将_loop修改, 用条件变量唤醒startLoop.
startLoop将获取到的_loop作为返回值传出.
而这个返回值将被存入EventLoopThreadPool的_loops中.
EventLoopThreadPool
EventLoopThreadPool会构建多个EventLoopThread并储存, 而在Muduo库中是通过轮询的方式实现subLoopd的选择的, 也就是说这个线程池内部提供了轮询方法将loop指针提供出去.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| #pragma once #include "UnCopyable.h" #include "EventLoopThread.h"
#include <functional> #include <string> #include <vector> #include <memory>
class EventLoop; class EventLoopThreadPool : UnCopyable { public: using ThreadInitCallback = std::function<void(EventLoop *)>;
EventLoopThreadPool(EventLoop *mainloop, const std::string &name); ~EventLoopThreadPool();
void setThreadNum(int numThreads) { _numThreads = numThreads; }
void start(const ThreadInitCallback &cb = ThreadInitCallback());
EventLoop *getNextLoop(); std::vector<EventLoop *> getAllLoops(); bool started() const { return _started; } const std::string name() const { return _name; }
private: EventLoop *_mainLoop; std::string _name; bool _started; int _numThreads; int _next; std::vector<std::unique_ptr<EventLoopThread>> _threads; std::vector<EventLoop *> _loops; };
|
先来介绍一下成员变量 :
- _mainLoop : 这就是主线程构建的loop, 也就是我们作为用户一开始向TcpServer手动传入的loop的指针, 在EventLoopThreadPool中其存在价值在于当线程池中创建线程数为0时, 作为替补方案用来执行subLoop的工作.
- _numThreads : 我们可以通过setThreadNum来设置线程池中需要有多少个线程, 也就是有多少个subLoop.
- _threads : 存储线程对象的vector.
- _loops : 存储每个线程中构建的loop指针.
- _next : 记录当前轮询到 _loop中的哪一个.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| #include "EventLoopThreadPool.h" #include "EventLoopThread.h"
#include <memory>
EventLoopThreadPool::EventLoopThreadPool(EventLoop *mainloop, const std::string &name) : _mainLoop(mainloop), _name(name), _started(false), _numThreads(0), _next(0) { }
EventLoopThreadPool::~EventLoopThreadPool() { }
void EventLoopThreadPool::start(const ThreadInitCallback &cb) { _started = true;
for (int i = 0; i < _numThreads; i++) { char buf[_name.size() + 32]; snprintf(buf, sizeof buf, "%s%d", _name.c_str(), i); EventLoopThread *t = new EventLoopThread(cb, buf); _threads.push_back(std::unique_ptr<EventLoopThread>(t)); _loops.push_back(t->startLoop()); }
if (_numThreads == 0 && cb) { cb(_mainLoop); } }
EventLoop *EventLoopThreadPool::EventLoopThreadPool::getNextLoop() { EventLoop *loop = _mainLoop;
if (!_loops.empty()) { loop = _loops[_next]; ++_next; if (_next >= _loops.size()) _next = 0; }
return loop; }
std::vector<EventLoop *> EventLoopThreadPool::EventLoopThreadPool::getAllLoops() { if (_loops.empty()) return std::vector<EventLoop *>(1, _mainLoop); else return _loops; }
|
总共有两个重要的函数要解释 :
start :
可以见得其依据_numThreads的数目构建EventLoopThread, 将创建出的对象存入 _threads, 然后再调用该对象的startLoop方法, 开启线程中的事件循环, 将其返回的loop指针存入 _loops.
getNextLoop :
这个函数将供给上层TcpServer调用, 以轮询方式提供出一个subLoop的指针, TcpServer则会将利用runInLoop将任务发配到该subLoop上, 当然如果一个subLoop都没有, 就还由mainLoop执行这些任务.
by 天目中云