Cpp Concurrency

cpp concurrency

并发:

  • 多进程并发: 进程之间独立, OS为每个进程提供保护, 意味着在进程层面上并发更安全也更容易, 但进程的启动和维护会降低系统效率.
  • 多线程并发: 线程之间共享内存, 不需要系统来管理访问, 使得多线程并发效率高于多进程并发, 但需要在线程之间提供额外的保护机制.

使用并发的原因: 关注点分离(SOC)和性能
不使用并发的原因: 除非潜在的性能增益足够大或关注分离足够清晰, 能抵消所需的额外的开发时间以及与维护多线程代码相关的额外成本, 否则, 别用并发.

标准线程库的效率: 绝大多数情况下, 额外增加的复杂性和出错几率都远大于性能的小幅提升带来的收益. 即使有证据确实表明瓶颈处在了cpp标准库的工具中, 其原因也很有可能处于其低劣的应用设计, 而非低劣的库实现.

启动线程: 构造std::thread对象
启动线程后需要明确是等待线程结束(join)还是让其自主运行(detach)
如果thread对象销毁之前还没有做出决定, 程序就会终止, thread的析构函数会调用std::terminate()

如果不等待线程, 就需要保证线程结束之前, 其可访问数据的有效性.

等待线程完成: join(), 调用join()同时也就意味着清理线程相关的存储, thread对象将不再与已经完成的线程有任何关联.
如果想要分离线程, 可以在启动线程后直接使用derach()进行分离, 如果需要等待相应线程, 则需要细心的选择join()的位置, 如果线程运行后发生异常, 则join()的调用会被忽略.
避免被异常终止, 需要在异常处理过程中调用join(), 从而避免生命周期问题. 但这并非万能之法, 一个更加简洁的方案是RAII, 提供一个类, 并在析构析构函数中调用join()函数[条款8, 别让异常逃离析构函数].

后台运行线程(守护线程): detach(), 调用derach()意味着主线程不能与之产生直接交互, 也就是说不会等待该线程结束, 如果线程分离, 那么就不可能有thread对象能引用它, 分离线程的确在后台运行, 所以分离线程不能被加入, 当分离线程完成任务时, c++运行库会保证相关资源被正确回收.

向线程函数传递参数: 默认参数拷贝到线程独立内存中

  • 当指向动态变量的指针作为参数传递给线程的情况… 需要注意发生隐式转换时造成的崩溃, 在传参数进行显示转换.
  • 期望向线程传递一个引用, 但因为整个参数被复制, 向线程传递了一个副本的引用… 解决办法std::ref.
  • 线程运行的函数是成员函数时: std::thread(&C::member_func, &c), 线程将运行c.member_func(), 若带有参数则依次向后添加
  • 有些时候提供的参数只能移动, 但不能拷贝, 如需要一个std::unique_ptr作为参数, 就需要用到std::move()

每个thread实例都负责管理一个执行线程, 执行线程的所有权可以在多个thread实例中相互转移, 这依赖于[thread实例的可移动且不可复制性], 不可复制性保证了同一时间点, 一个thread实例只能关联一个执行线程, 可移动性使得程序员可以自己决定, 那个实例拥有实际执行线程的所有权.

转移线程所有权: 将线程作为返回值和参数???
量产线程, 线程自动化管理???

运行时决定线程数量: std::thread::hardware_concurrency(), 返回能同时并发在一个程序中的线程数量,

识别线程: std::thread::id, std::thread::get_id(), 在当前线程中通过std::this_thread::get_id()也可以获得线程标识

线程之间共享数据: 条件竞争
避免恶性条件竞争: 互斥量std::mutex, std::lock() std::unloak(), 不推荐直接调用成员函数, 使用RAII std::lock_guard, 在构造时提供已锁的互斥量, 在析构的时候解锁, 从而保证一个已锁的互斥量总是被正确的解锁.
互斥量保护的数据需要对接口的设计相当谨慎, 要确保互斥量能锁住任何对保护数据的访问, 且不留后门: 一个迷失的指针或引用会让lock_guard这种保护形同虚设.
切勿将受保护数据的指针会引用传递到互斥锁作用域之外, 无论是函数返回值, 还是存储在外部可见内存, 亦或是以参数的形式传递到用户提供的函数中去.

stack固有接口导致的条件竞争, 为了获得线程安全的stack需要改变固有接口
stack之所以分top和pop两个步骤, 是为了防止pop时, 若pop失败, 数据将被破坏, 所以通过top取数据, 即使没有取得数据, 数据仍是完好无损的, 但这样的分割却导致了本想避免和消除的条件竞争, 解决方案:

  1. 传入一个引用参数来获取想要的pop值: 缺点是需要构造栈存储值的类型, 用于接受目标, 有些时候这样做得不偿失, 甚至这种类型根本没有复制构造操作
  2. 无异常抛出的拷贝构造函数或移动构造函数???
  3. 返回指向pop值的指针: 弹出指针的优势在于自由拷贝, 并且不会产生异常, 缺点是返回一个指针需要对对象的内存分配进行管理, 对于简单类型如int, 内存的管理开销要远大于直接返回值. 使用shared_prt进行管理[需要重新看一下英文]

一个给定的操作需要两个或两个以上的互斥量时, 另一个潜在的问题将会出现–死锁
死锁的的解决方案:

  • 避免嵌套锁: 当一个线程已经获得一个锁时, 别再去获取第二个. 当需要获取多个锁时, 使用std::lock来做这件事–对获取锁这个操作上锁, 来避免死锁.
  • 避免在持有锁时调用用户提供的代码
  • 使用固定顺序获取锁
  • 使用锁的层次结构

std::unique_lock–灵活的锁
std::unique_lock是使用更为自由的不变量, std::unique_lock实例不会总与互斥量的数据类型相关, 使用起来比lock_guard更为灵活, 它提供了lock(), try_lock()和unlock()成员函数, 使得能够更加灵活的处理. 也可以将unique_lock对象传入std::lock()中
使用adopt_lock作为第二个参数传入构造函数, 对互斥量进行管理
使用defer_lock作为第二个参数传递进去, 表示互斥量保持解锁状态
如果unique_lock在析构时仍持有锁, 那么将调用unlock(), 可以通过owns_lock()查询是否拥有锁
如果lock_guard能够满足需求, 就没有必要使用unique_lock, unuque_lock的灵活需要些许性能作为代价

不同域互斥量所有权的传递, 一种可能的情况是允许一个函数锁住一个互斥量, 并且将所有权转移到调用者上, 所以调用者可以在这个锁的保护范围内执行额外的操作

1
2
3
4
5
6
7
8
9
10
std::unique_lock<std::mutex> get_lock(){
extern std::mutex some_mutes;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}
void prepare_data(){
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}

锁的粒度: 锁保护着的数据量的大小, 在必要的时候手动解锁和加锁

保护共享数据的替代措施:

  • 保护共享数据初始化过程: std::call_once(std::once_flag, init_resource)
    初始化以定义完全在一个线程中发生, 并且没有其他线程可在初始化完成前对其进行处理, 条件竞争终止于初始化阶段.

读者-写者”锁”: 一个”作者”线程独占访问和共享访问, 让多个”读者”线程并发访问. boost::shared_lock boost::shared_mutex

嵌套锁: std::recursice_mutex

同步并发操作
等待一个事件或其他条件:

  • std::this_thread::sleep_for(std::chrono::milliseconds(100))线程短暂休眠, 对互斥量进行解锁, 在休眠结束后再对互斥量进行上锁, 期间另外的线程就有机会获取锁, 该方法很难确定正确的休眠事件
  • 条件变量: 当某些线程被终止时, 为了唤醒等待线程, 终止的线程会想等待线程广播”条件达成”的信息 std::condition_variable std::condition_variable_any #include <condition_cariable> 两者都需要一个互斥量一起才能工作, 前者仅限于与mutex一起工作, 后者可以和任意满足最低标准的互斥量一起工作.

condition_variable::wait()如果满足条件则返回继续, 不满足条件将解锁互斥量, 将该线程置于阻塞等待状态, 直到condition_variable::notiry_one()通知等待线程, 等待线程将继续获取互斥量, 重新检查状态是否满足, 满足则继续, 不满足解锁挂起. 这个过程需要使用unique_lock, 因为guard_lock没有这么灵活…

使用条件变量构建线程安全队列
notify_one将会触发一个正在执行的wait()的线程, 但这里不保证线程一定会被通知到, 即使只有一个等待线程被通知时, 所有处理线程也有可能都在处理数据
所以当很多处理线程在等待同一事件, 对于通知他们需要作出回应(如初始化后, 等待共享数据更新等情况下) -> notyfy_all()

使用期望future等待一次性事件: 当等待线程只等待一次, 当条件为真时, 它就不会再等待条件变量了, 所以一个条件变量可能并非同步机制最好选择, 尤其是, 条件在等待一组可用的数据块时. 在这样的情况下, future是一个合适的选择.
std::future<>(唯一期望)实例只能与一个事件相关联 std::shared_future<>(共享期望)实例能够关联多个事件
当任务被触发时, 等待期望的状态就会变成”就绪”

带返回值的后台任务
道格拉斯 亚当斯
std::async 启动一个异步任务, 与thread的等待方式不同, async会返回一个future对象, 当需要这个值时, 调用这个对象的get()成员函数; async会阻塞线程直到”期望”状态为就绪为止, 之后返回结果.

1
2
3
4
5
6
7
8
9
#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main(){
std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"the answer is " << the_answer.get()<<std::endl;
}

向async提供额外参数, std::launch std::launch::defered 用来表明函数调用被延迟到wait()或get()函数调用时才执行, std::launch::async表明函数必须在其所在的独立线程上执行, std::launch::deferred|std::launch::async表明实现可以选择这两种方式的一种

async不是唯一的让一个future与一个任务相关联的唯一方式, 将任务包装至std::packaged_task<>实例中, 或通过编写代码的方式使用std::promise<>类型模板显示设置值, 通过get_future()来获取期望

std::packaged_task<>对一个函数或可调用对象, 绑定一个期望. 当std::packaged_task<>对象被调用, 它就会调用相关

单线程多连接问题: 一对std::promise/std::feture: 在期望上可以阻塞等待线程, 同时, 提供数据的线程可以使用组合中的”承诺”来对相应值进行设置, 以及将”期望”的状态设置为”就绪” , 通过get_future()成员函数来获取以一个给定的std::promise相关的std::future对象, 当”承诺”已经设置完毕(通过set_value())对于的期望状态就变为”就绪”, 并且可以用于检索已存储的值

为期望存储异常:
等待多个线程:std::future是只移动的, std::shared_future是可拷贝的,

限定等待时间:

  1. 时延超时 wait_for(…duration) sleep_for(…duration) …
  2. 绝对超时 wait_until(…time_point) sleeo_until(…time_point) …

原子操作同步机制: 当多余两个线程访问同一个内存地址时, 对每个访问都需要定义一个顺序. 1. 使用互斥量来确定访问顺序 2. 使用原子操作的同步机制来确定两个线程的访问顺序
原子操作是不可分割的. 在Cpp中, 大多数情况下你需要一个原子类型去执行一个原子操作.

标准原子类型, 所有这种类型上的操作都是原子的
标准原子类型不能拷贝和赋值, 但可以将其隐式转换为内置类型, 在进行赋值
一个原子类型的所有操作都是应该是原子的
std::atomic<> load() store() exchange() compare_exchange_weak() compare_exchange_strong()
原子操作的内存序列: 排序一致序列 sequentially consistent, 获取-释放序列(memory_order_consume, memory_order_acquire, memory_order_release memory_order_acq_rel) 自由序列 memory_order_relaxed
Store: memory_order_relaxed memory_order_release memory_order_seq_cst
Load: memory_order_relaxed memory_order_consume memory_order_acquire memory_order_seq_cst
Read-modify-write: memory_order_relaxed memory_order_consume memory_order_acquire memory_order_release memory_order_acq_rel memory_order_seq_cst

用户自定义类型创建原子变量:

  1. 这个类型必须有拷贝赋值运算符, 意味着这个类型不能有任何虚函数或虚基类, 以及必须使用编译器创建的拷贝赋值操作
  2. 自定义类型中所有的基类和非静态数据成员也都需要支持拷贝赋值操作
  3. 这个类型必须是”位可比的”
    编译器将会对std::atomic类型的所有操作生成一个内部锁
Operation atomic_flag atomic atomic<T*> atomic<integral_type> atomic<other_type>
test_and_set 1
clear 1
is_lock_for 1 1 1 1
load 1 1 1 1
store 1 1 1 1
exchange 1 1 1 1
compare_exchange_weak, compare_exchange_strong 1 1 1 1
fetch_add, += 1 1
fetch_sub, -= 1 1
fetch_or, \ = 1
fetch_and, &= 1
fetch_xor, ^= 1
++, – 1 1

使用原子操作同步数据和强制排序

设计并发数据结构

  1. 确保访问时安全的
    • 确保无线程能够看到, 数据结构的”不变量”破坏时的状态.
    • 小心那些会引起条件竞争的接口, 提供完成操作的函数, 而非操作步骤
    • 注意数据结构的行为是否会产生异常, 从而确保”不变量”的状态稳定
    • 将死锁的概率降到最低. 使用数据结构时需要限制锁的范围, 且避免嵌套锁的存在
  2. 确保真正的并发访问
    • 锁的范围中的操作, 是否允许在外执行?
    • 数据结构中不同的却与是否能被不同的互斥量所保护?
    • 所有操作都需要同级互斥量保护吗?
    • 能否对数据结构进行简单的修改, 以增加并发访问的概率, 且不影响操作语义?
0%