Cpp_Muti_Threaded

🔱 C++ 多线程编程

[TOC]

🧶 1. 进程和线程的基本概念

1.1.1 进程和线程的概念

进程是系统中正在运行的一个程序,程序一旦运行就是进程。进程可以看成程序执行的一个实例。进程是系统资源分配的独立实体,每个进程都拥有独立的地址空间。一个进程无法访问另一个进程的变量和数据结构,如果想让一个进程访问另一个进程的资源,需要使用进程间通信,比如管道、文件、套接字等。

线程是进程的进程。一个进程可以拥有多个线程,每个线程使用其所属进程的栈空间。线程的最大数量取决于CPU的核心数。

线程与进程的一个主要区别是,同一进程内的多个线程会共享部分状态,多个线程可以读写同一块内存(一个进程无法直接访问另一进程的内存)。同时,每个线程还拥有自己的寄存器和栈,其他线程可以读写这些栈内存。

线程是进程的一个实体,是进程的一条执行路径。

线程是进程的一个特定执行路径。当一个线程修改了进程的资源,它的兄弟线程可以立即看到这种变化。

1.1.2 并发和并行的概念

并发(Concurrent):并发指在同一时间段内,多个任务都在进行中,但并不一定同时执行。在单核 CPU 上,多个任务占用不同的 CPU 时间片,物理上还是串行执行的。但是由于每个任务所占用的 CPU 时间片非常短(如:10ms),看起来就像是多个任务在共同执行一样,这样的场景就叫做并发

并发
并发概念图

并行(Parallel):并行是指在同一时刻,多个任务同时执行。并行需要多核 CPU 或者多处理器系统,让不同的任务在不同的核心上同时运行。但是这并不意味着每个核心上只运行一个任务,每个核心上的运行策略其实和单核 CPU 没有太大区别(即也是并发的)。

并行概念图
并行概念图

🧶 2. 多线程的设计

开发一个任务时,要不要实现成并发程序呢?多线程一定更好吗?

在某些情况下多线程具有一定的优势,但是在某些情况下多线程并不具有优势,这需要根据当前程序的类型进行分析和判断。

程序的类型有以下两种:一种是 I/O密集型;一种是 CPU 密集型。

2.1 I/O 密集型程序

I/O密集型,即程序里面指令的执行,涉及一些 I/O 操作,比如设备、文件、网络操作(如等待客户端的连接)等,I/O操作是可以将进程阻塞住的,如果我们再给这样的程序分配时间片,其实就相当于 CPU 空闲下来了。

在 I/O 密集型的程序在执行的时候,在 I/O 操作没有准备好时,程序是会被放在阻塞队列中的,在阻塞队列中是不受操作系统调度的。

正因如此,I/O 密集型程序更适合设计成多线程程序。

I/O 密集型程序不论是在单核还是多核的情况下,都是适合设计成多线程程序的,因为他不会造成 CPU 资源的浪费。

2.2 CPU 密集型程序

CPU 密集型程序,即程序里面的指令都是用来做计算用的,例如大量的加减乘除、深度学习都是在进行大量的计算。

CPU密集型程序,在单核情况下是不适合设计成多线程程序的,因为线程的调度有额外的花费:线程的上下文切换(当前线程调度完了,该调度下一个线程)。在这种情况下,相当于只有一个计算器,单线程是一个人一直计算,而多线程是多个人一人算一段。但是传递计算器的过程会产生一定的花销。而单线程进行上下文切换时,要获取之前计算到的信息,这也是一笔开销。

但是其在多核情况下是比较适合设计成多线程程序的。

🧶 3. 线程同步

线程同步有两种场景:① 线程互斥;② 线程通信

线程互斥:

  • 互斥锁 mutex
  • 原子类型 atomic

线程通信:

  • 条件变量 condition_variable
  • 信号量 semaphore

3.1 数据竞态(竞态条件)

一个进程中的所有线程共享整个进程的堆内存,每个线程私有自己的栈内存。

如下图所示,当我们有多个线程想要执行 Code 时,那我们就要考虑这段代码能否在多线程环境下执行。是否能在多线程环境下执行主要要看这段代码是否存在数据竞态(或称竞态条件)。

竞态条件:代码片段 Code 在多线程环境下执行,随着线程的调度顺序不同,而得到不同的运行结果,这就说明这段代码存在竞态条件。简单来说就是,$Thread1→Thread2→Thread3$ 会得到一个结果,而 $Thread2→Thread1→Thread3$ 可能是另一个结果。这是我们不期待的。

存在竞态条件的代码片段称为临界区代码段

为了保证不出现临界区代码段,我们应该要保证代码的原子操作

如果在多线程环境下不存在竞态条件,那么我们称这段代码片段是可重入的(就是一段代码在没执行完的情况下又被运行了),否则是不可重入的。

3.2 线程互斥

线程互斥包括:① 互斥锁 mutex;② 原子操作atomic

mutexlock 操作(悲观锁)、unlock 操作或者 try_lock 操作(活锁、乐观锁)。mutex 是重量级的锁。

但是我们有时候不需要很重量级的锁,比如我们有时候可能只是执行一个自增操作 x++,或者是较为简单的一些操作,这时候我们就不太需要一个重量级的锁,C++ 11 已经提供了 $CAS$ (无锁机制)操作,即 Compare & Set/Compare & Swap。无锁机制并不是说没有锁,而是说这个锁是轻量级的,我们可以用 $CAS$ 实现无锁队列、无锁链表、无锁数组等。

C++ 11count++ 为例,count++ 在操作系统中其实进行了三步指令操作,如下图所示:

  • 假设首先执行 $Thread \ 1$,此时执行 mov eax, count 指令,将 count 的值写到 eax 此时 eax 的值由未知数 x 变为 0
  • 然后 $Thread \ 1$ 执行 add eax, 1 指令,将 eax 寄存器中的值加 1
  • 此时时间片完,进行线程切换操作,切换到 $Thread \ 2$;
  • 此时执行 $Thread \ 2$,执行 mov eax, count 指令,将 count 的值写到 eax,此时 count 为从全局获取到的 0eax 为 未知数x,此时 eax 寄存器由 x 被写为 0
  • 然后 $Thread \ 2$​ 执行 add eax, 1 指令,将 eax 寄存器中的值加 1
  • 此时时间片完,进行线程切换操作,切换到 $Thread 1$;
  • 此时执行 $Thread \ 1$,执行 mov count, eax 指令,将 $Thread \ 1$ 上次切换之前的 eax 的值(为 $1$)写到 count 中,count0 变成 1
  • 此时 $Thread \ 1$ 执行完毕,切换到 $Thread \ 2$;
  • 此时执行 $Thread \ 2$,执行 mov count, eax 指令,将 $Thread \ 2$ 上次切换之前的 eax 的值(为 $1$)写到 count 中,count1 变成 1

我们可以发现,即使是简单的 count++ 也不是一个原子操作。

对于这种操作,我们当然可以用重量级的锁(lock)来进行锁定,但是我们更倾向于使用 $CAS$ 来实现这个问题。

3.2 线程通信

线程通信包括:① 条件变量 condition_variable;② 信号量 semaphore

在线程中,并不往往都是几个线程互不相干,有时候会存在一定的依赖关系,如 $Thread \ 1$ 的某段代码需要依赖于 $Thread \ 2$ 的某段代码,因为我们并不能保证线程的调度顺序(即有可能被依赖代码可能需要好久才能执行完毕,而已经有线程需要这个代码的结果了),所以我们这个时候就需要进行线程间的通信。

条件变量

条件变量需要配合互斥锁(互斥量)一起使用,其构造函数就需要传入一个互斥锁(互斥量),即 mutex + condition_variable。一个常见的用法就是生产者——消费者模型。*线程池其实就是使用了生产者——消费者模型*。

mutex 互斥锁就是资源计数只能是 $0$ 或 $1$ 的互斥锁,即执行 mutex.lock() 后锁的资源计数 $1→0$,执行 mutex.unlock() 后锁的资源计数 $0→1$。而信号量可以看作 资源计数没有限制mutex 互斥锁。

信号量

信号量都是单独使用的,不需要配合其他条件一同使用。C++ 11 并没有提供信号量的操作,直到 C++ 20 才从语言层面支持了信号量,但是我们完全可以使用 C++ 11 来自己实现一个信号量。信号量也可以用于实现生产者消费者模型,但是无法做到精细的控制。初始信号量为$0$,生产者生产后信号量增加,消费者消费后信号量减少。通过判断信号量来进行简单的线程间通信。

此外还有二元信号量的概念,semaphore sem(1) 资源计数 $0 / 1$,可以完成和 mutex 互斥锁同样的线程互斥操作,但是其和 mutex 是存在一定区别的。mutex 只能是哪个线程获取锁,哪个线程释放锁;semaphore 则不同,sem.wait()sem.post() 可以处在不同的线程中调用。例如:有三个线程调用了 sem.wait() 等待执行一块代码 Code,其中一个线程 $Thread \ A$ 开始执行代码,其他两个线程 $Thread \ B$ 和 $Thread \ C$ 依旧在等待。如果此时有其他代码(存在误操作或者其他原因)调用了 sem.post(),此时会使信号量增加,$Thread \ B$ 或 $Thread \ C$ 其中一个会执行代码,若此时 $Thread \ A$ 还没有执行完代码,这就有可能导致数据竞态问题。

🧶 4. C++ 11 Windows 线程库的基本使用

C++ 11 线程库头文件 #include <thread>

  • 创建线程

    可以使用 std::thread 来创建线程,这其实是使用线程类 std::thread 声明一个线程实例 printThread 的过程。这里我们直接这样创建线程,生成应用程序之后运行输出结果:Hello World - I'm NilEra @-@,但是运行之后会出现错误。

    这里出现错误的原因是:当我们启动了 thread_print 线程,此时程序不会在这里等待线程执行完成,而是会继续向下执行,导致出现程序已经执行到 return 0 时,线程还没有执行完成,因此这里会产生报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <thread>

// 定义线程需要执行的函数
void printThread(void) {
std::cout << "Hello World - I'm NilEra @-@" << std::endl;
}

int main(void) {
// 创建线程
std::thread thread_print(printThread);
return 0;
}
  • 主程序等待线程执行完毕

    为了解决上述出现的问题,我们需要让主程序等待的线程执行完毕再进行退出,这时我们就需要用到 join() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <thread>
#include <string>

// 定义线程需要执行的函数
void printHelloWorld(std::string msg) {
std::cout << msg << std::endl;
}

int main() {
// 创建线程
std::thread thread_print(printHelloWorld, "Hello Thread @-@ I'm NilEra...");

// 等待线程执行完成
thread_print.join();
return 0;
}
  • 分离线程

    上述的问题还可以使用 detach() 函数来进行解决。当执行下面的程序时,控制台不会又任何输出,直接退出程序。

    这是因为线程执行完线程的创建之后,紧接着执行了 detach(),此时线程的具体操作还未来得及执行就进行了线程的分离。进程结束之后,thread_print 线程还在后台运行。但是因为此时进程已经结束,线程执行的过程中不会有输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <thread>
#include <string>

// 定义线程需要执行的函数
void printHelloWorld(std::string msg) {
std::cout << msg << std::endl;
}

int main() {
// 创建线程
std::thread thread_print(printHelloWorld, "Hello Thread @-@ I'm NilEra...");

// 线程分离
thread_print.detach();
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 使用如下代码可以更清晰的看到执行 detach() 的效果, 以及 join() 和 detach() 的区别
// join() 是阻塞的
#include <iostream>
#include <thread>
#include <string>
#include <Windows.h>

// 定义线程需要执行的函数
void printHelloWorld(std::string msg) {
std::cout << msg << std::endl;
for (int i = 0; i < 10000; i++) {
std::cout << i << std::endl;
}
}

int main() {
// 创建线程
std::thread thread_print(printHelloWorld, "Hello Thread @-@ I'm NilEra...");
Sleep(100);
thread_print.detach();

return 0;
}
  • 判断线程是否可以合并

    有时我们需要对线程判断其是否可以进行 join() 操作,此时我们可以调用 joinable() 函数,joinable() 会返回一个 bool 值,用于判断线程是否可以进行 join() 操作。

    如果我们对一个不可使用 join() 或者 detach() 的线程进行了 join()detach() 操作,会出现一个 SystemError,在一些比较严谨的项目中,会先使用 joinable() 进行判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <thread>
#include <string>

// 定义线程需要执行的函数
void printHelloWorld(std::string msg) {
std::cout << msg << std::endl;
}

int main() {
// 创建线程
std::thread thread_print(printHelloWorld, "Hello Thread @-@ I'm NilEra...");
bool isJoin = thread_print.joinable();
if (isJoin) {
thread_print.join();
}
return 0;
}

🧶 5. 线程函数中的数据未定义错误

5.1 传递临时变量问题

  • 错误示例:如下使用 std::thread 类时,传入线程函数 foo 和 参数 a,这里的参数 a 会被作为值传递,即传递的不是 a 的引用,而是 a 所存储的值 1。但是如果是值传递的话,这里调用的实际上是 foo((int&) 1) 会产生编译错误,因为 1 实际上是一个右值,而非常量引用的初始值必须为左值,所以这里会产生编译错误。

    我们平时调用 foo 时,可以使用 foo(a),实际上我们进行了隐式转换,执行的实际上是 foo((int&) a)

1
2
3
4
5
6
7
8
9
10
11
12
13
/* 错误示例 */
#include <iostream>
#include <thread>

void foo(int& x) {
x += 1;
}

int main(void) {
int a = 1;
std::thread thread_plus(foo, a);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/* 修改示例 */
#include <iostream>
#include <thread>

void foo(int& x) {
x += 1;
}

int main(void) {
int a = 1;
std::thread thread_plus(foo, std::ref(a));
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* 拓展 */
#include <iostream>
#include <thread>
#include <string>
#include <Windows.h>

void foo(int& x) {
x += 1;
}

void const_foo(const int& x) {
std::cout << "const_foo param x is : " << x << std::endl;
}

int main(void) {
int a = 1;
std::thread thread_plus(foo, std::ref(a));
Sleep(10);
std::thread thread_plus_const(const_foo, a);
thread_plus.join();
thread_plus_const.join();
return 0;
}

5.2 传递指针或引用指向局部变量的问题

  • 错误示例:在下面的程序中,我们定义了一个全局线程变量,并且在 test 函数被调用时开启了这个线程 t,在 main 函数中,执行了 test 函数,在执行到 t = std::thread(foo, std::ref(a)); 时,线程启动。这时会出现两种情况:

    ① (大概率出现)在线程启动的时候,test 函数已经结束了运行,而局部变量 a 的内存被释放,此时出现空指针错误;

    ② (小概率出现)程序正常执行,这是因为线程执行的比 test 函数更快,当线程执行结束时,a 还没有被释放。我们可以在 t = std::thread(foo, std::ref(a)); 下添加 Sleep(10),使程序暂停 10 ms,保证线程结束的时候 test 还未执行完成来观察这一现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* 错误示例 */
#include <iostream>
#include <thread>
#include <string>

std::thread t;

void foo(int& x) {
x += 1;
}

void test(void) {
int a = 1;
t = std::thread(foo, std::ref(a));
}

int main(void) {
test();
t.join();
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* 修改示例 */
#include <iostream>
#include <thread>
#include <string>

std::thread t;
int a = 1; // 延长 a 的声明周期

void foo(int& x) {
x += 1;
}

void test(void) {
// int a = 1;
t = std::thread(foo, std::ref(a));
}

int main(void) {
test();
t.join();
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* 拓展 */
#include <iostream>
#include <thread>
#include <string>

std::thread t;

void foo(int& x) {
x += 1;
}

void test(void) {
int a = 1;
t = std::thread(foo, std::ref(a));
Sleep(10);
}

int main(void) {
test();
t.join();
return 0;
}

5.3 传递指针或引用指向已经释放的内存问题

  • 错误示例:这个问题和上面的问题是差不多的问题,这里有可能会① 直接报编译错误;② 通过编译,但是给出不期待的结果;③ 极小概率出现正常执行的情况。

    当我们启动线程后,手动释放 ptr_a,此时若线程 t 的执行在释放内存之前(小概率),不会出现不期待的访问结果;但是如果线程 t 在释放内存之后执行(大概率),则会出现不期待的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* 错误示例 */
#include <iostream>
#include <thread>
#include <string>
#include <Windows.h>

std::thread t;

void foo(int* x) {
*x += 1;
std::cout << *x << std::endl;
}

int main(void) {
int* ptr_a = new int(1); // 定义一个指针类型的变量 ptr_a
// 初始化为其内存中存储的值为 1
std::thread t(foo, ptr_a);
delete ptr_a; // 这里手动释放 ptr_a

t.join();
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* 修改示例 */
#include <iostream>
#include <thread>
#include <string>
#include <Windows.h>

std::thread t;

void foo(int* x) {
*x += 1;
std::cout << *x << std::endl;
}

int main(void) {
int* ptr_a = new int(1); // 定义一个指针类型的变量 ptr_a
// 初始化为其内存中存储的值为 1
std::thread t(foo, ptr_a);
Sleep(10); // 启动线程后等待 10 ms, 此时线程大概率已经执行完毕
delete ptr_a; // 这里手动释放 ptr_a

t.join();
return 0;
}

5.4 类成员作为入口函数,类对象被提前释放

  • 错误示例:这个错误和上面的错误基本上没有区别,只不过是将要 int 型换成了 类型。

    main 函数中,创建了一个 MyClass 类型的对象 obj。接着,启动了一个线程 t,这个线程执行 MyClass::func,并传递 obj 的地址 &obj 给它,这里的 &objthis 指针。当 t 线程启动时,它会在后台执行 MyClass::func,但是,main 函数在启动线程后立即返回,而没有等待线程完成。此时,局部变量 obj 会被销毁。如果 t 线程还没有执行完 MyClass::func,则它将尝试访问一个已经销毁的对象,导致未定义行为和运行时错误。

    我们可以使用智能指针的方式来防止出现指针提前释放的情况,使用 std::shared_ptr<MyClass> 创建并管理 MyClass 对象的生命周期。std::make_shared<MyClass>() 创建一个 shared_ptr,并返回一个指向堆上分配的 MyClass 对象的共享指针。传递 objshared_ptr)给 std::thread 的构造函数时,会增加引用计数,确保 MyClass 对象在 obj 和线程中都有效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* 错误示例 */
#include <iostream>
#include <thread>
#include <string>
#include <memory>

std::thread t;

class MyClass {
public:
void func(void) {
std::cout << "[Thread " << std::this_thread::get_id() << "] Started..." << std::endl;
std::cout << "[Thread " << std::this_thread::get_id() << "] Finished..." << std::endl;
}
};

int main(void) {
MyClass obj;
std::thread t(&MyClass::func, &obj);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* 正确修改 */
#include <iostream>
#include <thread>
#include <string>
#include <memory>

std::thread t;

class MyClass {
public:
void func(void) {
std::cout << "[Thread " << std::this_thread::get_id() << "] Started..." << std::endl;
std::cout << "[Thread " << std::this_thread::get_id() << "] Finished..." << std::endl;
}
};

int main(void) {
// 使用智能指针
std::shared_ptr<MyClass> obj = std::make_shared<MyClass>();
std::thread t(&MyClass::func, obj);

// 保证线程运行结束再退出主程序
t.join();
return 0;
}

🧶 6. 互斥量解决多线程数据共享问题

数据共享问题:在多个线程中共享数据时,需要注意 线程安全 问题。如果多个线程同时访问同一个变量,并且其中至少有一个线程对该变量进行了写操作,那么就会出现数据竞争问题。数据竞争可能导致程序崩溃、产生未定义错误或者得到错误的结果。

为了避免数据竞争问题,需要使用同步机制来确保多个线程之间对共享数据的访问量是安全的。常见的同步机制包括互斥量条件操作原子操作等。

可以看到下面这张图,来体会一下没有锁的情况下导致的多线程数据共享的问题。我们有两个线程 Thread 1Thread 2,这两个线程都在执行的过程中,其操作都是 a += 1。具体过程如下:
① 此时我们有一个变量 a = 1,首先 Thread 1 获取到了 a,执行操作后 a=2

Thread 2 获取到了 a,执行操作后 a=3

Thread 1 获取到了 a,执行操作后 a=4

Thread 2 获取到了 a,执行操作后 a=5

Thread 1Thread 2 同时获取到了 a,同时执行操作后 a=6

一共执行了 6 次操作,a 应该由 1 变成 7。但是实际上他最终的结果是 6

下面演示这种错误:

在这个案例中,我们运行了两个线程 t1t2,每个线程都让 a50,000,因此我们期待的结果是 100,000。但是实际上运行的结果一般是小于 100,000 的。比如我运行了几次,分别是:78,3016160354843。说明两个线程多个瞬间同时获取到了变量 a

当然,循环次数较小的时候也许会出现结果正确的问题,这是因为编译器帮我们汇编成了原子操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* 错误示例 */
#include <iostream>
#include <thread>
#include <string>
#include <memory>

int a = 0;

void func(void) {
for (int i = 0; i < 50000; i++) {
a += 1;
}
}

int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}

为了解决这个问题,我们可以使用 互斥量mutex 对变量进行上锁操作。**互斥量的头文件是:#include <mutex>**。

我们修改程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>

int a = 0;
std::mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 50000; i++) {
mtx.lock(); // 加锁操作
a += 1;
mtx.unlock(); // 解锁操作
}
}

int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}

如何判断线程是否是安全的:当多线程程序每一次运行的结果,和单线程程序运行的结果始终都一样,则认为你的线程是安全的。

🧶 7. 互斥量死锁

假设有两个线程 Thread 1Thread 2,他们需要对互斥量 mtx1 和互斥量mtx2 进行访问,而且需要按照以下顺序获取互斥量的所有权(获取所有权可以理解为加锁操作

  • Thread 1 先获取 mtx1 的所有权,再获取 mtx2 的所有权。
  • Thread 2 先获取 mtx2 的所有权,再获取 mtx1 的所有权。

此时如果两个线程同时运行,就会产生死锁:

Thread 1 拿到了 mtx1,同时 Thread 2 拿到了 mtx2

但是此时因为 Thread 2 占有 mtx2 所以 Thread 1 等待 mtx2 空闲;而 Thread 1 占有 mtx1,所以 Thread 2 等待 mtx1 空闲。

所以此时 Thread 1Thread 2 都无法进一步操作,所以造成了死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* 死锁模拟 */
#include <iostream>
#include <thread>
#include <string>
#include <memory>
#include <mutex>
#include <Windows.h>

std::mutex mtx1, mtx2;
void func_1(void) {
mtx1.lock(); // 先获取 mtx1
Sleep(1000); // 这里停等 1s, 确保 thread_2 抢占到 mtx2
mtx2.lock(); // 再获取 mtx2
mtx2.unlock(); // 释放 mtx2
mtx1.unlock(); // 释放 mtx1
}

void func_2(void) {
mtx2.lock(); // 先获取 mtx2
mtx1.lock(); // 再获取 mtx1
mtx1.unlock(); // 释放 mtx1
mtx2.unlock(); // 释放 mtx2
}

int main(void) {
std::thread thread_1(func_1);
std::thread thread_2(func_2);
thread_1.join();
thread_2.join();
std::cout << "THREAD OVER..." << std::endl;
return 0;
}

可以用顺序锁解决上述问题,即两个函数都先获取 mtx1 的所有权。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
#include <string>
#include <memory>
#include <mutex>
#include <Windows.h>

std::mutex mtx1, mtx2;
void func_1(void) {
mtx1.lock(); // 先获取 mtx1
mtx2.lock(); // 再获取 mtx2
mtx2.unlock(); // 释放 mtx2
mtx1.unlock(); // 释放 mtx1
}

void func_2(void) {
mtx1.lock(); // 先获取 mtx1
mtx2.lock(); // 再获取 mtx2
mtx2.unlock(); // 释放 mtx2
mtx1.unlock(); // 释放 mtx1
}

int main(void) {
std::thread thread_1(func_1);
std::thread thread_2(func_2);
thread_1.join();
thread_2.join();
std::cout << "THREAD OVER..." << std::endl;
return 0;
}

🧶 8. lock_guardunique_lock()

8.1 lock_guard

lock_guardC++ 标准库中一种互斥量的封装类,用于保护共享数据,防止多个线程同时访问统一资源而导致的数据竞争问题。lock_guard 具有以下特点。

  • 当构造函数被调用时,该互斥量会被自动锁定
  • 当析构函数被调用时,该互斥量会被自动解锁
  • std::lock_guard 对象不能复制或移动,因此他只能在局部作用域中使用

下面简单看几段代码,来体会一下不使用 lock_guard 和使用 lock_guard 的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* 不使用 lock_guard 和 unique_lock() */
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>

int a = 0;
std::mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 50000; i++) {
mtx.lock(); // 加锁操作
a += 1;
mtx.unlock(); // 解锁操作
}
}

int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}

当我们添加 lock_guard 后,代码会变成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>

int a = 0;
std::mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 50000; i++) {
std::lock_guard<std::mutex> lg(mtx); // 这里调用了构造函数, mtx 自动加锁
a += 1;
} // 这里调用了析构函数, mtx 自动解锁
}

int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}

我们可以详细剖析 lock_guard 的源码,来深刻理解该类。

补充阅读:C++ using 用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
template <class _Mutex>
class _NODISCARD lock_guard { // class with destructor that unlocks a mutex
// 这里定义了一个模板类 lock_guard, 其中 _Mutex 是模板参数, 表示互斥锁的类型
// _NODISCARD 是一个属性, 表示这个类不应该被忽略(通常是为了警告开发者不要忽略这个类的对象创建)
public:
using mutex_type = _Mutex; // 定义一个别名 mutex_type, 指向模板参数 _Mutex
// 这样可以方便地在类的其他地方使用 mutex_type 来表示互斥锁的类型

// 这是一个构造函数, 接收一个互斥锁的引用 _Mtx
// explicit 关键字防止隐式转换
// 在构造函数体内, 调用 _MyMutex.lock() 方法锁定互斥锁
// 这样, 当 lock_guard 对象被创建时, 互斥锁会自动被锁定
explicit lock_guard(_Mutex& _Mtx) : _MyMutex(_Mtx) { // construct and lock
_MyMutex.lock();
}

// 这是另一个构造函数, 接收一个互斥锁的引用 _Mtx 和一个标记类型 adopt_lock_t
// 这个构造函数不会锁定互斥锁, 假设互斥锁已经被锁定
// adopt_lock_t 通常是一个空的结构体类型, 用于区分不同的构造函数
lock_guard(_Mutex& _Mtx, adopt_lock_t) : _MyMutex(_Mtx) {} // construct but don't lock

// 析构函数标记为 noexcept, 表示不会抛出异常
// 当 lock_guard 对象被销毁时, 调用 _MyMutex.unlock() 方法解锁互斥锁
// 这确保了即使在异常情况下,互斥锁也会被解锁。
~lock_guard() noexcept {
_MyMutex.unlock();
}

// 显式删除拷贝构造函数和赋值操作符
// 防止 lock_guard 对象被复制或赋值
// 这是因为互斥锁的所有权不应该在多个对象之间共享, 以避免潜在的并发问题。
lock_guard(const lock_guard&) = delete;
lock_guard& operator=(const lock_guard&) = delete;

private:
_Mutex& _MyMutex;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>

int a = 0;
std::mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 50000; i++) {
std::lock_guard<std::mutex> lg(mtx); // 1. 创建 lock_guard 对象 lg
// 2. 传递 mtx 给构造函数的参数 _Mtx
// 3. lock_guard 构造函数被调用
// 4. _MyMutex(_Mtx) 初始化私有成员 _MyMutex 为 mtx 的引用
// 5. _MyMutex.lock() 锁定 mtx
a += 1;
} // 6. 循环结束时, lg 对象销毁, 调用析构函数 _MyMutex.unlock() 解锁 mtx
}
int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}

8.2 unique_lock

std::unique_lockC++ 标准库中提供的一个互斥量封装类,用于在多线程程序中对互斥量进行加锁和解锁操作。与 lock_guard 仅提供自动的加锁、解锁操作不同,unique_lock 还提供了对互斥量进行更加灵活的管理,包括:延迟加锁、条件变量、超时等。

std::unique_lock 提供了以下几个成员函数:

  • lock:尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁。
  • try_lock:尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则立刻返回 false,否则返回 true
  • try_lock_for(const std::chrono::duration<Rep, Period>& rel_time):尝试对互斥量进行加锁操作,,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁或超过了指定时间。
  • try_lock_until(const std::chrono::time_point<Clock, Duration>& abs_time):尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁或超过了指定时间点。
  • unlock():对互斥量进行解锁操作

下面简单看几段代码,来体会一下不使用 unique_lock 和使用 unique_lock 的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* 不使用 unique_lock */
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>

int a = 0;
std::mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 50000; i++) {
mtx.lock(); // 加锁操作
a += 1;
mtx.unlock(); // 解锁操作
}
}

int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* 使用 unique_lock */
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>

int a = 0;
std::mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 50000; i++) {
std::unique_lock<std::mutex> uniqueLock(mtx);
a += 1;
}
}

int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}

下面我们演示一下 unique_lock 的更多操作:

8.2.1 lock()/unlock() 手动加锁/解锁

既然 unique_lock 支持自动加锁和自动解锁,那么我们为什么不让它自动的加锁和解锁呢?这是因为 unique_lock 提供了更多的加锁方式,在使用其他加锁方式之前,我们需要保证 unique_lock 不能自动加锁。

1
2
std::unique_lock<std::mutex> uniqueLock(mtx, std::defer_lock); 	// 传入 defer_lock 表示构造函数什么都不做
// 加锁/解锁操作需要由程序员自己完成

那么下面将代码修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* 不使用 unique_lock */
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>

int a = 0;
std::mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 50000; i++) {
std::unique_lock<std::mutex> uniqueLock(mtx, std::defer_lock); // 使用 unique_lock 进行自动加锁
uniqueLock.lock();
a += 1;
uniqueLock.unlock();
}
}

int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}
8.2.2 try_lock 尝试加锁

尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁。

8.2.3 try_lock_for(const std::chrono::duration<Rep, Period>& rel_time)

尝试对互斥量进行加锁操作,,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁或超过了指定时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>

int a = 0;
std::timed_mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 50; i++) {
std::unique_lock<std::timed_mutex> uniqueLock(mtx, std::defer_lock);
// 这里进行判断, 通过检查 try_lock_for 的返回值, 只有在成功锁定时才修改 a, 保证了互斥锁的有效性。
if (uniqueLock.try_lock_for(std::chrono::microseconds(5))) {
std::this_thread::sleep_for(std::chrono::microseconds(10));
a += 1;
}
}
}

int main(void) {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}

常见错误:"try_lock_for": 不是 "std::mutex" 的成员std::mutex 不支持延迟加锁,当我们想要进行延迟加锁操作时,需要保证互斥量是时间锁,即 timed_mutex

此外,不要想当然,以下是一种常见的错误写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* 不使用 unique_lock */
#include <iostream>
#include <thread>
#include <memory>
#include <mutex>
#include <Windows.h>

int a = 0;
std::timed_mutex mtx; // 定义互斥量

void func(void) {
for (int i = 0; i < 200; i++) {
std::unique_lock<std::timed_mutex> uniqueLock(mtx, std::defer_lock);
// try_lock_for 尝试锁定互斥锁, 但返回值没有被检查
// 如果锁定失败, 代码仍然会执行 a += 1, 这会导致数据竞争
uniqueLock.try_lock_for(std::chrono::microseconds(5));

// 即使 try_lock_for 失败, 代码仍然会增加 a, 这使得互斥锁形同虚设
std::this_thread::sleep_for(std::chrono::microseconds(10));
a += 1;
}
}
8.2.4 try_lock_until(const std::chrono::time_point<Clock, Duration>& abs_time)

尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁或超过了指定时间点。

🧶 9. call_once 实现单例模式

9.1 单例模式

单例模式:单例模式是一种常见的设计模式,用于确保在系统的整个声明周期内,某个类只能创建一个实例,确保该类的唯一性。由于单例模式是全局唯一的,因此在多线程环境中使用单例模式时,需要考虑线程安全问题。

为什么要使用单例模式:① 节约资源,一个类只有一个实例,不存在多份实例,节省资源;② 方便控制,在一些操作公共资源的场景时,避免了多个对象引起的复杂操作。

单例模式分类:单例模式可以分为 懒汉式饿汉式 ,两者之间的区别在于创建实例的时间不同。

  • 懒汉式:系统运行中,实例并不存在,只有当需要使用该实例时,才会去创建并使用实例,延迟实例化。这种方式要考虑线程安全。
  • 饿汉式:系统一运行,就初始化创建实例,当需要时,直接调用即可,提前实例化。这种方式本身就线程安全,没有多线程的线程安全问题。

单例类的特点:

  • 构造函数和析构函数为私有类型,目的是禁止外部构造和析构。
  • 拷贝构造函数和赋值构造函数是私有类型,目的是禁止外部拷贝和赋值,确保实例的唯一性。
  • 类中有一个获取实例的静态方法,可以全局访问。

9.2 emplace_back 函数快速了解

这里使用到了 emplace_backemplace_backC++ 11 引入的一个 STL 容器方法,用于在容器的末尾直接构造元素。为了更好的理解代码,这里补充一下 emplace_back 的基本用法:

emplace_back 方法提供了一种高效、简便的方式在容器末尾添加新元素。与 push_back 方法不同,emplace_back 直接在容器内部构造元素,而不是先构造临时对象然后再移动或复制到容器中。

9.2.1 具体示例

假设我们有一个包含复杂对象的 std::vector,我们可以使用 emplace_back 来避免不必要的临时对象创建和销毁。

  • 使用 push_back
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <vector>
#include <string>

struct MyStruct {
int x;
std::string y;

MyStruct(int a, const std::string& b) : x(a), y(b) {}
};

int main() {
std::vector<MyStruct> vec;
MyStruct obj(1, "example"); // 先从外部构造 obj
vec.push_back(obj); // 复制构造
vec.push_back(MyStruct(2, "example2")); // 临时对象构造然后移动构造
}
  • 使用 emplace_back
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <vector>
#include <string>

struct MyStruct {
int x;
std::string y;

MyStruct(int a, const std::string& b) : x(a), y(b) {}
};

int main() {
std::vector<MyStruct> vec;
vec.emplace_back(1, "example"); // 直接在容器内构造
vec.emplace_back(2, "example2"); // 直接在容器内构造
}

在使用 emplace_back 时,构造函数的参数直接传递给容器内的新对象构造函数,避免了临时对象的创建。

9.2.2 在多线程中使用 emplace_back
1
2
3
for (int i = 0; i < 10; ++i) {
threads.emplace_back(threadFunction);
}

这里的 emplace_back 用于将新的 std::thread 对象添加到 std::vector<std::thread> 容器中。

等效的 push_back 用法如下:

1
2
3
for (int i = 0; i < 10; ++i) {
threads.push_back(std::thread(threadFunction));
}

在这种情况下,push_backemplace_back 都可以使用,但 emplace_back 更加高效,因为它避免了临时对象的创建和销毁。使用 emplace_back 时,std::thread 对象直接在 threads 容器中构造:

  1. 构造新对象emplace_back 方法直接在容器的内存空间中构造新对象,而不是先在别处构造然后移动到容器中。
  2. 传递参数emplace_back 将传递的参数直接用于新对象的构造函数,此处 threadFunction 被作为构造函数参数传递给 std::thread
9.2.3 总结

**push_back**:需要一个已经构造好的对象(可能会导致额外的复制或移动)。

**emplace_back**:直接在容器的内存空间内构造对象,避免了额外的临时对象创建和移动操作。

在多线程代码中,使用 emplace_back 可以使代码更加高效和简洁,尤其是在添加新对象到容器时,可以避免不必要的对象拷贝和临时对象创建。

9.3 线程不安全的懒汉模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <iostream>
#include <thread>
#include <mutex>
#include <fstream>
#include <vector>
#include <stdexcept>

// 日志类
class Logger {
public:
static Logger& getInstance() {
if (instance == nullptr) {
instance = new Logger();
}
return *instance;
}

Logger(const Logger&) = delete;
Logger& operator=(const Logger&) = delete;

void log(const std::string& message) {
std::lock_guard<std::mutex> lock(mtx_);
logFile_ << message << std::endl;
}

void printLog(const std::string& message) {
std::cout << "Instance Address IS: " << this << std::endl;
std::cout << message << std::endl;
}

private:
Logger() {
logFile_.open("log.txt", std::ios::out | std::ios::app);
if (!logFile_.is_open()) {
throw std::runtime_error("Unable to open log file");
}
}

~Logger() {
if (logFile_.is_open()) {
logFile_.close();
}
}

static Logger* instance;
std::ofstream logFile_;
std::mutex mtx_;
};

Logger* Logger::instance = nullptr;

void threadFunction() {
Logger& logger = Logger::getInstance();
logger.printLog("Logging from thread");
}

int main() {
std::vector<std::thread> threads;

for (int i = 0; i < 10; ++i) {
threads.emplace_back(threadFunction);
}

for (auto& thread : threads) {
thread.join();
}

return 0;
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Instance Address IS: Instance Address IS: 000001FE29979930Instance Address IS: 000001FE2997A2A0
Logging from thread
000001FE29979930
Logging from thread
Instance Address IS: 000001FE2997C330
Logging from thread
Instance Address IS: 000001FE2997C330
Logging from thread
Instance Address IS: 000001FE2997B840
Logging from thread
Instance Address IS: 000001FE2997ACD0
Logging from thread
Instance Address IS: 000001FE2997C330
Logging from thread
Logging from thread

Instance Address IS: 000001FE2997C330
Logging from thread
Instance Address IS: 000001FE2997C330
Logging from thread

我们可以看到多个不同的实例地址,这是因为:在多线程环境中,当多个线程同时调用 getInstance 方法时,有可能多个线程同时通过 if (instance == nullptr) 检查,并进入实例化代码块。这会导致多个线程同时创建多个实例,违背了单例模式的初衷。

9.4 线程安全的懒汉模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include <iostream>
#include <thread>
#include <mutex>
#include <fstream>
#include <vector>
#include <stdexcept>

class Logger {
public:
static Logger& getInstance() {
if (instance == nullptr) {
std::lock_guard<std::mutex> lock(mtx_);
if (instance == nullptr) {
instance = new Logger();
}
}
return *instance;
}

Logger(const Logger&) = delete;
Logger& operator=(const Logger&) = delete;

void log(const std::string& message) {
std::lock_guard<std::mutex> lock(log_mtx_);
logFile_ << message << std::endl;
}

void printLog(const std::string& message) {
std::cout << "Instance Address IS: " << this << std::endl;
std::cout << message << std::endl;
}

private:
Logger() {
logFile_.open("log.txt", std::ios::out | std::ios::app);
if (!logFile_.is_open()) {
throw std::runtime_error("Unable to open log file");
}
}

~Logger() {
if (logFile_.is_open()) {
logFile_.close();
}
}

static Logger* instance;
static std::mutex mtx_;
std::mutex log_mtx_;
std::ofstream logFile_;
};

Logger* Logger::instance = nullptr;
std::mutex Logger::mtx_;

void threadFunction() {
Logger& logger = Logger::getInstance();
logger.printLog("Logging from thread");
}

int main() {
std::vector<std::thread> threads;

for (int i = 0; i < 10; ++i) {
threads.emplace_back(threadFunction);
}

for (auto& thread : threads) {
thread.join();
}

return 0;
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread
Instance Address IS: 000001E13A41AFE0
Logging from thread

9.5 饿汉模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <thread>
#include <mutex>
#include <fstream>
#include <vector>
#include <stdexcept>

class Logger {
public:
static Logger& getInstance() {
return instance;
}

Logger(const Logger&) = delete;
Logger& operator=(const Logger&) = delete;

void log(const std::string& message) {
std::lock_guard<std::mutex> lock(mtx_);
logFile_ << message << std::endl;
}

void printLog(const std::string& message) {
std::cout << "Instance Address IS: " << this << std::endl;
std::cout << message << std::endl;
}

private:
Logger() {
logFile_.open("log.txt", std::ios::out | std::ios::app);
if (!logFile_.is_open()) {
throw std::runtime_error("Unable to open log file");
}
}

~Logger() {
if (logFile_.is_open()) {
logFile_.close();
}
}

static Logger instance;
std::ofstream logFile_;
std::mutex mtx_;
};

Logger Logger::instance;

void threadFunction() {
Logger& logger = Logger::getInstance();
logger.printLog("Logging from thread");
}

int main() {
std::vector<std::thread> threads;

for (int i = 0; i < 10; ++i) {
threads.emplace_back(threadFunction);
}

for (auto& thread : threads) {
thread.join();
}

return 0;
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread
Instance Address IS: 00007FF652E1F6B0
Logging from thread

9.6 使用 call_once 实现单例模式

为了使用 std::call_once 保证 Logger 类的线程安全,我们可以利用 std::call_oncestd::once_flag 来确保单例实例只被创建一次。std::call_once 是一个 C++ 11 引入的机制,用于确保给定的函数只被调用一次,即使在多线程环境下。

以下是一个使用 std::call_once 实现线程安全单例模式的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include <iostream>
#include <thread>
#include <mutex>
#include <fstream>
#include <vector>
#include <stdexcept>

// Logger 类
class Logger {
public:
static Logger& getInstance() {
std::call_once(initInstanceFlag, &Logger::initSingleton); // 传入 once_flag 和 initSingleton 函数
return *instance;
}

Logger(const Logger&) = delete;
Logger& operator=(const Logger&) = delete;

void log(const std::string& message) {
std::lock_guard<std::mutex> lock(mtx_);
logFile_ << message << std::endl;
}

void printLog(const std::string& message) {
std::cout << "Instance Address IS: " << this << std::endl;
std::cout << message << std::endl;
}

private:
Logger() {
logFile_.open("log.txt", std::ios::out | std::ios::app);
if (!logFile_.is_open()) {
throw std::runtime_error("Unable to open log file");
}
}

~Logger() {
if (logFile_.is_open()) {
logFile_.close();
}
}

static void initSingleton() {
instance = new Logger();
}

static Logger* instance;
static std::once_flag initInstanceFlag;
std::ofstream logFile_;
std::mutex mtx_;
};

// 初始化静态成员
Logger* Logger::instance = nullptr;
std::once_flag Logger::initInstanceFlag;

void threadFunction() {
Logger& logger = Logger::getInstance();
logger.printLog("Logging from thread");
}

int main() {
std::vector<std::thread> threads;

for (int i = 0; i < 10; ++i) {
threads.emplace_back(threadFunction);
}

for (auto& thread : threads) {
thread.join();
}

return 0;
}

详细解释:

  1. 静态成员变量

    • static Logger* instance:指向单例实例的指针。
    • static std::once_flag initInstanceFlag:用于保证 initSingleton 只被调用一次的标志。
  2. getInstance 方法

    • std::call_once(initInstanceFlag, &Logger::initSingleton)std::call_once 保证 initSingleton 在多线程环境下只被调用一次。initInstanceFlag 确保 initSingleton 只会被执行一次,即使多个线程同时调用 getInstance
  3. initSingleton 方法

    • initSingleton 是一个静态方法,用于初始化单例实例。std::call_once 会调用此方法来创建单例实例。
  4. 构造函数和析构函数

    • Logger 的构造函数和析构函数负责打开和关闭日志文件。
  5. logprintLog 方法

    • log 方法使用互斥锁 mtx_ 保护对日志文件的写操作,以确保线程安全。
    • printLog 方法输出实例的地址和消息。

运行上述代码时,所有线程都会调用 Logger::getInstance() 获取单例实例。由于使用了 std::call_onceinitSingleton 方法只会被执行一次,从而确保整个程序中只有一个 Logger 实例。通过输出的实例地址,可以验证所有线程获取的都是相同的实例。

使用 std::call_oncestd::once_flag 可以确保单例实例在多线程环境下只被创建一次,从而实现线程安全的懒汉单例模式。这样不仅保证了线程安全性,还避免了不必要的锁开销。

🧶 10. condition_variable 条件变量

conditon_variable 可以用来实现一个生产者消费者程序。

例如对于一个队列:

  • 只要队列不满,生产者就可以进行生产
  • 只要队列满了,生产者就停止生产
  • 只要队列不空,消费者就可以进行消费
  • 只要队列空了,消费者就停止消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include <atomic>

std::queue<int> buffer; // 共享缓冲区
const unsigned int maxBufferSize = 10; // 缓冲区的最大大小
std::mutex mtx; // 互斥锁, 用于保护共享缓冲区
std::condition_variable cv; // 条件变量, 用于通知生产者和消费者
std::atomic<bool> done(false); // 原子变量, 用于通知消费者生产者已经完成生产

// 生产者函数
void producer(int id, int numItems) {
for (int i = 0; i < numItems; ++i) {
std::unique_lock<std::mutex> lock(mtx); // 锁定互斥锁

// 等待直到缓冲区有空闲位置
cv.wait(lock, [] { return buffer.size() < maxBufferSize; });

// 将生成的数据放入缓冲区
buffer.push(i);
std::cout << "Producer " << id << " produced: " << i << std::endl;

lock.unlock(); // 解锁互斥锁
cv.notify_all(); // 通知消费者缓冲区中有新数据

// 模拟生产延迟
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

// 设置 done 标志,通知消费者生产已经完成
done = true;
cv.notify_all(); // 通知消费者停止等待
}

// 消费者函数
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx); // 锁定互斥锁

// 等待直到缓冲区有数据或生产已经完成
cv.wait(lock, [] { return !buffer.empty() || done; });

// 如果生产已经完成且缓冲区为空, 退出循环
if (done && buffer.empty()) {
break;
}

// 从缓冲区取出数据
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed: " << item << std::endl;

lock.unlock(); // 解锁互斥锁
cv.notify_all(); // 通知生产者有空闲位置

// 模拟消费延迟
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
}

int main() {
const int numProducers = 2; // 生产者线程数量
const int numConsumers = 2; // 消费者线程数量
const int numItems = 40; // 每个生产者生成的项目数量

std::vector<std::thread> producers; // 生产者线程列表
std::vector<std::thread> consumers; // 消费者线程列表

// 创建生产者线程
for (int i = 0; i < numProducers; ++i) {
producers.emplace_back(producer, i, numItems);
}

// 创建消费者线程
for (int i = 0; i < numConsumers; ++i) {
consumers.emplace_back(consumer, i);
}

// 等待所有生产者线程完成
for (auto& producer : producers) {
producer.join();
}

// 等待所有消费者线程完成
for (auto& consumer : consumers) {
consumer.join();
}

return 0;
}

🧶 11. 线程池 Thread Pool

11.1 线程的消耗

为了完成任务,创建很多线程可以吗?线程真的是越多越好吗?

  • 线程的创建和销毁都是非常”重”的操作

    线程的创建和销毁都需要执行不少操作,下面的操作只是一个简化的操作,具体的操作可以深入了解操作系统的原理。

    那么如果要在业务执行的过程中去实时的创建和销毁线程,那么是一种很消耗系统资源和性能的操作。

线程创建的操作(简化版)
  • 线程栈本身占用大量内存

    32 位操作系统的地址空间大小为 $2^{32}$ 个地址,即 4 GB。其中一部分地址空间用于操作系统的内核空间,而另一部分则用于用户空间。通常情况下,用户空间可以是 2 GB 或者 3 GB,剩下的全部是内核空间。具体取决于操作系统的设置。由当前进程创建的所有线程,共享进程的地址空间。

    那么一个进程最多可以开多少线程呢?

    假设用户空间为 3 GB,即 $3 × 1024 = 3072$,在 Linux 环境下使用 ulimit -a 可以查询到系统的一些信息,如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    core file size          (blocks, -c) 0
    data seg size (kbytes, -d) unlimited
    scheduling priority (-e) 0
    file size (blocks, -f) unlimited
    pending signals (-i) 3795
    max locked memory (kbytes, -l) 64
    max memory size (kbytes, -m) unlimited
    open files (-n) 1024
    pipe size (512 bytes, -p) 8
    POSIX message queues (bytes, -q) 819200
    real-time priority (-r) 0
    stack size (kbytes, -s) 8192
    cpu time (seconds, -t) unlimited
    max user processes (-u) 3795
    virtual memory (kbytes, -v) unlimited
    file locks (-x) unlimited

    我们可以看到 Stack Size8192 kbytes,即 8 MB,则可计算 $3072 ÷ 8 = 384$,所以在 Linux 环境下,一个进程最多创建 $384$ 个线程。

    我们将线程函数所用的栈空间就是线程栈。

    通过上面的描述,我们可以发现,如果线程数量太多,创建了一大批线程,还没有具体做事情,每一个线程都需要线程栈,栈几乎都被占用完了,就没内存运行其他程序了。

  • 线程的上下文切换要占用大量时间

    线程过多,线程的调度是需要上下文切换的,也需要花费大量的 CPU 时间,如果更多的时间花费到上下文切换中,那么实际业务中利用 CPU 的时间就降低了,CPU 的利用率就降低了。

  • 大量线程同时唤醒会使系统经常出现锯齿状负载或者瞬间负载量很大导致宕机

    这种情况下,如果同一时间,很多 I/O 操作都准备好了,或者说很多线程都再等待一个 I/O 操作,有可能导致大量线程同时被唤醒,导致系统经常出现锯齿状负载或者瞬间负载量很大导致宕机。

所以我们可以看出,线程不是越多越好。那么创建多少线程才是合适的呢?

一般来说,创建线程的数量一般由 CPU 的核心数来确定的,即:有几个核创建几个线程。C++ 的很多开源库如 moduolibeventJavaNettymina 等,都采取了这一策略。

当然如果一个功能是重 I/O 的,可以做出适当的调整,增加线程数量。

11.2 什么是线程池?为什么使使用线程池

线程过多会带来调度开销,进而影响缓存局部性和整体性能。操作系统上创建线程和销毁线程都是很“重”的操作,耗时耗性能都比较多,那么在服务执行的过程中,如果业务量比较大,实时的去创建线程、执行业务、业务完成后销毁线程,那么会导致系统的实时性能降低,业务的处理能力也会降低。

线程池是一种预先创建一定数量线程的机制,或者说其是一种线程使用模式。

线程池维护着多个线程,这些线程可以在需要时被重复使用,而无需每次都重新创建和销毁线程。

线程池的主要目的是提高性能和资源利用效率,特别是在需要频繁创建和销毁大量线程的场景下,线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数据取决于可用的并发处理器、处理器内核、内存、网络 sockets 等数量。

线程池的优势:线程池的优势就是(每个池都有自己的优势),在服务进程启动之初,就事先创建好线程池里面的线程,当业务流量到来时需要分配线程,直接从线程池中获取一个空闲线程执行 Task 任务即可,Task 执行完成后,也不用释放线程,而是把线程归还到线程池中继续给后续的 Task 提供服务。

线程池的优点:

  • 线程和任务分离,提升线程重用性;
  • 控制线程并发数量,降低服务器压力,统一管理所有线程;
  • 提高性能:提升系统响应速度,假如创建线程用的时间为 T1,执行任务用的时间为T2,销毁线程用的时间为 T3,那么使用线程池就免去了 T1T3 的时间。创建和销毁线程都是相对昂贵的操作,特别是在高并发场景下,频繁地创建和销毁线程会极大地降低程序的性能。通过线程池预先创建一定数量的线程并保存在内存中,可以避免频繁地创建和销毁线程,从而提高程序的性能。
  • 资源管理:线程是操作系统级别的资源,如果线程数量过多,可能会导致系统资源的过度消耗,甚至可能导致系统崩溃。通过线程池,可以控制同时运行的线程数量,避免资源过度消耗。
  • 任务调度:线程池可以更方便地进行任务的调度。通过线程池,可以将任务分配给不同的线程执行,实现并行处理,提高程序的执行效率。
  • 简化编程:使用线程池可以简化多线程编程的复杂性。程序员只需要将任务提交给线程池,而不需要关心线程的创建、管理和销毁等细节,降低了多线程编程的难度。

因此,C++线程池的出现是为了解决在高并发场景下创建和销毁线程的开销问题,提高程序的性能和并发处理能力,简化多线程编程的复杂性。

在项目中如何使用线程池?

以一个添加订单功能为例,我们需要查询用户的收获地址和商品信息。在单线程的代码中,我们需要按照顺序进行查询;而在多线程的代码中,我们在保证两个功能没有依赖关系的情况下可以同时查询(即不需要先查询用户的收获地址,再根据收获地址查询商品信息)。

顺序执行的速度是 503 ms,而多线程执行的速度是 329 ms,能明显提高运行速度(效率提升幅度在 50%~60%)。

11.3 线程池的两种模式

11.3.1 fixed 模式线程池

fixed 模式线程池里面的线程个数是固定不变的,一般是 ThreadPool 创建时,根据当前机器的 CPU 核心数量进行指定。

11.3.2 cache 模式线程池

假设一个线程池内有四个线程,然后此时出现了四个非常耗时的 I/O 操作任务,此时每个线程分配了一个任务,导致四个线程都被占用阻塞在这里了。此时又出现了一些任务,但是由于线程池中的线程长时间阻塞,所以新的任务可能迟迟得不到处理,此时新的任务堵在任务队列中,相当于整个程序卡死在了这里。在这种情况下,我们希望我们线程池的大小是可以动态改变的,这就是 cache 模式的线程池。

cache模式的线程池里面的线程个数是可动态增长的,根据任务的数量动态的增加线程的数量,但是会设置一个线程数量的阈值(线程过多的坏处上面已经讲过了),任务处理完成,如果动态增长的线程空闲了60s还没有处理其它任务,那么关闭线程,保持池中最初数量的线程即可。

11.4 线程池的架构、流程和使用方式

由图所示,我们的线程池主要需要完成以下几个操作:

  • 创建线程池、设置线程池模式、启动线程池
  • 提交异步任务:Result result = pool.submitTask(concreteTask);
  • 保证可以接受各种各样的任务
  • 获取异步任务的处理结果(这里需要用到 Any 上帝类)
  • 实现任务队列,这里任务队列要保证线程安全,且任务队列的任务数不宜过多(过多会导致大量的内存占用)

参考文献

基于C++11实现线程池的工作原理 - 靑い空゛ - 博客园 (cnblogs.com)

C++线程池的原理(画图)及简单实现+例子(加深理解)_C++ 线程池原理 - CSDN博客

C++: 每一个C++程序员都应该知道的RAII - 个人文章 - SegmentFault 思否

C++线程池 - BrianX - 博客园 (cnblogs.com)

C++笔记-Atomic原子操作/CAS(Compare and Swap) - 流了个火 - 博客园 (cnblogs.com)

作者

NilEra

发布于

2024-07-29

更新于

2024-08-06

许可协议

评论