核心操作
线程创建
1 2 3 4 5 6 7 8
| std::thread t1(print_hello, 1);
std::thread t([]() { std::cout << "Hello from lambda thread!" << std::endl; });
std::thread t(calculate_sum, 3, 5, print_result);
|
线程等待
为什么需要线程等待?
目的在于接收信息和同步.
join中并没有实际和接收信息相关的操作, 信息交流是靠其他几种线程间通信的方式来实现的. 关键在于如果不用join, 将无法安全使用线程间通信得来的信息, 因为如果不等待子线程运行结束, 主线程和子线程并发运行, 主线程没有办法确定什么时候子线程会得到结果并发送过来, 简单来说就是无法实现同步.
可以通过joinable()函数来确定线程是否可join而未被分离.
线程分离
线程分离就是为了实现和线程等待相反的目的, 也就是不需要同步的情况, 当代码不需要和主线程互通有无的时候便可以调用detach, 让线程自己干自己的去.
线程间通信
这里线程间通信的定义其实很广泛, 可以细化为线程间权限申请, 数据交互, 函数调用等.
锁
1 2 3 4
| mutex mtx; mtx.lock();
mtx.unlock();
|
1 2 3 4 5
| mutex mtx; { lock_guard<std::mutex> lock(mtx); }
|
条件变量
1 2
| template <typename Predicate> void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
|
条件变量需要和互斥锁绑定, 并且需要条件撑腰, 在等待时是否原先持有的锁, 被唤醒时重新取回拥有的锁并进行之后的代码, 有两种方式判断条件 :
- 直接套在while循环里, 只有符合条件才能离开循环, 这里循环不是意味着要频繁判断, wait本身是阻塞的, 而是为了避免虚假唤醒的情况, 可以理解为系统问题.
1 2 3 4 5 6 7 8 9 10
| std::mutex mtx; std::condition_variable cv;
std::unique_lock<std::mutex> lck(mtx); while (!ready) { cv.wait(lck); }
cv.notify_all(); cv.notify_one();
|
使用wait的第二个参数, 传入一个返回值为bool类型的回调函数, 官方称其为谓词.
谓词只在第一次触发wait和被唤醒之后发生, 如果谓词返回值为true, 则继续后面的代码, 反之继续阻塞挂起.
1 2 3
| condition.wait(lock, [this] { return stop || !taskQueue.empty(); });
|
其实底层就是对C风格互斥锁和条件变量的封装.
共享内存
这是一个非常宽泛的概念, 我们知道一个进程的不同线程间数据块,堆都是共享的, 所以在其中申请的资源都是共享内存, 锁也是为了应对使用共享内存的情况而被设计的.
消息队列
其实就是STL中的queue, 这个容器其实非常强大, 因为其是线程安全的, 例如我们就可以直接在生产者线程中将产物push进queue, 然后直接在消费者线程中从queue中取出, 这其中没有任何线程安全问题!
future和promise
future可以得到未来从其他线程传来的一个结果.
这里只介绍future在线程中的普通用法, 一般和promise共用.
- future : 未来, 用于表示在未来这里会获取一个结果.
- promise : 承诺, 可以和future关联, 承诺会设置future要获取的结果.
1 2 3 4 5 6 7 8 9 10 11
| std::promise<int> prom; std::future<int> fut = prom.get_future();
std::thread t([&prom]() { int result = 42; prom.set_value(result); });
int value = fut.get(); t.join(); std::cout << "Result: " << value << "\n";
|
async : async
是 asynchronous 的缩写,表示“异步”的意思. 不过异步这个意义很宽泛, 很多都可以被视为异步. 有些异步相对复杂, 比如异步I/O, 有些有十分简单, 比如我们现在学的async.
其实它的实际功效就是上面future + promise + thread的统合, 可以理解为和future强绑定的thread.
可以向async中传入一个回调函数, async会直接在一个新线程中运行它, 会返回一个future对象, 新线程运行结束会将结果设置到future对象中.
1 2 3
| template< class F, class... Args > std::future<std::invoke_result_t<F, Args...>> async(F&& f, Args&&... args);
|
- f : 用于确定async的启动策略, 一般写
std::launch::async
, 代表强制任务在新线程中执行.
- args : 可变参数, 传入回调函数需要的参数.
1 2 3 4 5 6 7 8
| std::future<int> fut = std::async(std::launch::async, []() { int result = 42; return result; });
int value = fut.get(); std::cout << "Result: " << value << "\n";
|
共享内存 vs 消息队列 vs future
这三个其实都可以做到线程间传递数据, 但是应用场景不同.
- 共享内存 : 用于多数据, 需求复杂的情况, 需要互斥锁和条件变量维护.
- 消息队列 : 适合生产者消费者模型, 简单易用, 线程安全, 但是需要排队, 可能需要条件变量配合.
- future : 只传递单数据, 简单易用, 并且线程安全不需要考虑额外内容.
atomic
原子操作本身和线程间通信没关系, 只是它本身就是为了服务多线程环境的一种语法, 可以减少线程间通信的发生, 取消部分资源对互斥锁的使用, 毕竟它是原子的, 根本不需要锁.
1 2 3 4 5 6 7 8 9 10 11
| std::atomic<int> x{0}; x.store(10); int value = x.load(); int old1 = x.fetch_add(1); int old2 = x.fetch_sub(1); int old_value = x.exchange(20);
bool success = x.compare_exchange_strong(expected, desired); int old3 = x.fetch_and(0xFF); int old4 = x.fetch_or(0xFF); int old5 = x.fetch_xor(0xFF);
|
需要注意的atomic的模板类必须是普通类型, 还有一种情况是 : 在一个自定义类型中全是普通类型允许填入, 术语叫做POD.
this_thread
这是 C++11 引入的一个命名空间,用来提供一些和当前线程相关的操作.
1 2 3 4 5 6 7 8 9
| void sleep_for(const std::chrono::duration& d); std::this_thread::sleep_for(std::chrono::seconds(2));
void sleep_until(const std::chrono::time_point& t); auto wakeup_time = std::chrono::system_clock::now() + std::chrono::seconds(3); std::this_thread::sleep_until(wakeup_time);
void yield(); std::this_thread::yield();
|
thread的移动语义
线程对象不可复制, 因为线程本身就不好复制, 从底层来讲thread类的拷贝构造函数本身就是删除的.
但是thread类可以被移动, 其实就相当于转移控制权, 可以使用move转换.
假如我们想把多个线程存入一个vector中, 可以使用emplace_back在vector中直接构建, 也可以选择在外部构建然后用move移动进去.
1 2
| std::thread t(task); threads.push_back(std::move(t));
|
简易线程池
AI生成仅供学习 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| #include <iostream> #include <thread> #include <vector> #include <queue> #include <functional> #include <mutex> #include <condition_variable> #include <atomic>
class ThreadPool { public: ThreadPool(size_t numThreads) { for (size_t i = 0; i < numThreads; ++i) { workers.emplace_back([this] { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queueMutex); condition.wait(lock, [this] { return stop || !taskQueue.empty(); }); if (stop && taskQueue.empty()) { return; } task = std::move(taskQueue.front()); taskQueue.pop(); } task(); } }); } }
template <class F> void enqueue(F&& f) { { std::unique_lock<std::mutex> lock(queueMutex); if (stop) { throw std::runtime_error("ThreadPool is stopped"); } taskQueue.push(std::forward<F>(f)); } condition.notify_one(); }
void stopPool() { { std::unique_lock<std::mutex> lock(queueMutex); stop = true; } condition.notify_all();
for (std::thread& worker : workers) { worker.join(); } }
~ThreadPool() { if (!stop) { stopPool(); } }
private: std::vector<std::thread> workers; std::queue<std::function<void()>> taskQueue; std::mutex queueMutex; std::condition_variable condition; std::atomic<bool> stop{false}; };
int main() { ThreadPool pool(4); for (int i = 0; i < 10; ++i) { pool.enqueue([i] { std::cout << "Task " << i << " is being executed by thread " << std::this_thread::get_id() << std::endl; }); } pool.stopPool(); return 0; }
|