线程、一致性和原子性
这是向零售人工智能x Advent Calendar 2022的第23天贡献。
前:第22天:关于使用@takurUN的API网关Kong进行开发的话题。
次:第24天:尝试@k-yoshigai的Go Workspace。
写什么
由于不习惯写这样的文章,而且如果要写出合适的内容可能需要一些时间,我一度很犹豫。
最后,我决定写一些关于Atomic变量的事情,模糊地记得,也许将来会有使用的场合。我正在写关于rust中的线程和内存模型,但它也适用于像C++这样的系统编程语言(实际上,rust的内存模型是基于C++的)。
在thread上引发竞争
首先,我们准备了以下类似的程序。该程序用Rust编写,但即使是不了解Rust的人,也应该能够根据外观大致理解。
fn main() {
// 引数1: スレッド数
let thread_count: u8 = std::env::args().nth(1).unwrap().parse().unwrap();
// 引数2: それぞれのスレッドでのカウント数
let loop_count: u64 = std::env::args().nth(2).unwrap().parse().unwrap();
println!("thread count = {thread_count}");
println!("loop count = {loop_count}");
println!(
"result = {}",
count_up_relaxed::count_up(thread_count, loop_count)
);
}
pub fn count_up(thread_count: u8, loop_count: u64) -> u64 {
let mut value: u64 = 0;
let ptr = &mut value as *mut u64 as usize;
// 指定された数のthreadを作って、それぞれが指定された回数分1を加算する。
(0..thread_count)
.map(|_| {
std::thread::spawn(move || {
(0..loop_count).for_each(|_| unsafe {
let value = ptr as *mut u64;
if cfg!(debug_assertion) {
// debug buildの場合
*value += 1;
} else {
// release buildの場合
*value = std::ptr::read_volatile(ptr as *const u64) + 1;
}
})
})
})
.collect::<Vec<_>>()
.into_iter()
.map(std::thread::JoinHandle::join)
.for_each(Result::unwrap);
value
}
做的事情 (zuò de
-
- 准备一个数字变量,
-
- 创建由参数指定数量的线程,
- 在每个线程内部根据参数指定的次数进行递增操作。
在普通的Rust中,由于无法在线程之间共享原始指针,因此我使用unsafe块来进行指针解引用操作。请注意,在Rust中频繁使用这样的原始指针并不是很常见,除非是在使用库或其他情况下。
结果
线程数乘以计数数等于预期值。
如果是使用线程的情况下
-
- thread数 == 1
-
- count数 == 1000000
-
- 期待する結果 = 1000000
- 実際の結果 = 1000000
如果有8个线程的情况
-
- thread数 == 8
-
- count数 == 1000000
-
- 期待する結果 = 8000000
- 実際の結果 = 2266221(とか。実行するたびに変わる)
我认为只需要一个选项就可以用中文进行释义:
只要看程序,你就可以猜到结果,但是在使用两个以上的线程运行时,结果会比预期的数字少。这是由于在读取值并将其加1后写入时,其他线程可能会干扰,这就是竞争条件。
稍微解释一下,只有在发布版本(release build)时才会使用read_volatile。这是因为当我们只是将一个简单的u64指针写入并进行发布版本编译时,优化会立即生效并返回正确的值。虽然我想要在汇编代码中进行确认,但结果却非常多而且我无法完全理解,所以我不太确定。
而在调试版本(debug build)中,优化不会生效,因此会输出一些奇怪的数字。
竞态条件处理(互斥锁)
虽然走弯路了,但最容易理解的解决方法是,每个人都会首先考虑使用互斥锁。
在加法部分加上锁,这样该部分只能同时运行一个线程,从而获得预期的数字。很方便呢。
pub fn count_up(thread_count: u8, loop_count: u64) -> u64 {
use std::sync::{Arc, Mutex};
let value: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
// 指定された数のthreadを作って、それぞれが指定された回数分1を加算する。
(0..thread_count)
.map(|_| {
std::thread::spawn({
let value = Arc::clone(&value);
move || {
(0..loop_count).for_each(|_| {
let mut value = value.lock().unwrap();
*value += 1;
})
}
})
})
.collect::<Vec<_>>()
.into_iter()
.map(std::thread::JoinHandle::join)
.for_each(Result::unwrap);
Arc::try_unwrap(value).unwrap().into_inner().unwrap()
}
竞态条件防护措施(原子变量)
Mutex用来加锁,但如果使用Atomic变量,可以在内存访问的级别上得到很好的效果。我认为这是很多人根据需要做的事情。
pub fn count_up(thread_count: u8, loop_count: u64) -> u64 {
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
let value: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
// 指定された数のthreadを作って、それぞれが指定された回数分1を加算する。
(0..thread_count)
.map(|_| {
std::thread::spawn({
let value = Arc::clone(&value);
move || {
(0..loop_count).for_each(|_| {
value.fetch_add(1, Ordering::Relaxed);
})
}
})
})
.collect::<Vec<_>>()
.into_iter()
.map(std::thread::JoinHandle::join)
.for_each(Result::unwrap);
value.load(Ordering::Relaxed)
}
每个人轻松发布构建速度进行了基准测试,结果如下。
- 8 thread/1_000_000カウントしたときの結果(criterionで計測)
由于本次处理中的线程几乎完全成为了临界区,所以将其线程化实际上毫无意义,不用线程比使用线程要快得多(虽然没有进行基准测试)。
一般情况下,利用线程时会在线程内进行各种并行处理,其中可能存在临界区,但本次处理只是极端情况中的临界区。
从这里得出的观点可能是
pointerに直接加算する方法
正確な数字が必要な場合は使えない。存在の確認などを高速に行うだけなら十分使えるような気もします。
Mutexを使う方法
正確だけど遅い。
Atomic変数を使う方法
正確だし速い。Atomic変数を使えるならば使ったほうがいい。
所以,前面的铺垫很长,但是从这里开始。
在多个线程中观察内存值的方式
我会运行这样的程序。
fn main() {
loop {
let mut x = 0;
let mut y = 0;
let ptr_x = &mut x as *mut i32 as usize;
let ptr_y = &mut y as *mut i32 as usize;
let thread_0 = std::thread::spawn(move || unsafe {
// ここでなにかの処理
// store x
*(ptr_x as *mut i32) = 1;
// store y
*(ptr_y as *mut i32) = 1;
});
let thread_1 = std::thread::spawn(move || unsafe {
// ここでなにかの処理
// load y
let y = *(ptr_y as *const i32);
// load x
let x = *(ptr_x as *const i32);
(x, y)
});
thread_0.join().unwrap();
let (x, y) = thread_1.join().unwrap();
if x == 0 && y == 1 {
panic!("一貫性が無い");
}
}
}
正在做的事情
-
- 准备两个变量x和y。
-
- 线程0将存储1到x和y。
- 几乎同时启动的线程1将按照y和x的顺序加载。
期望的结果
当你长时间从事翻译工作时,不知不觉会形成一种逐句翻译的感觉,并期望得到以下类似的结果。
y
をload(y == 0)thread_1:
x
をload(x == 0)thread_0:
x
にstorethread_0:
y
にstore11thread_0: x
にstorethread_0:
y
にstorethread_1:
y
をload(y == 1)thread_1:
x
をload(x == 1)10thread_0: x
にstorethread_1:
y
をload(y == 0)thread_0:
y
にstorethread_1:
x
をload(x == 1)另一个实际发生的结果
尽管上述的三种情况是必然发生的,但除此之外,还有可能出现 x == 0,y == 1 的结果。这是导致上述程序出现 panic! 的地方。
虽然按照上述说法这种情况不会经常发生,但在每个线程中加入其他代码时,偶尔会发生。
thread_1中y看起来是1,这意味着在thread_0中x应该已经存储了1,但变成了0。很奇怪。此时发生的情况可能是以下两种之一。
-
- thread_1から観て、thread_0が、プログラマーが書いた順とは逆に、store y, store xの順で実行された(ように見えるように動いた)
- thread_0から観て、thread_1が、プログラマーが書いた順とは逆に、load x, load yの順で実行された(ように見えるように動いた)
在程序执行过程中,程序并不一定会按照程序员编写的顺序逐步执行。这是因为在执行时会对顺序进行优化。除非存在某种约束性的指令,否则CPU和编译器可以自由地对指令进行重新排序,而不会影响结果。
如果在下一个指令中需要使用的值未加载到CPU缓存中,而下一个下一个指令中需要使用的值已经加载到CPU缓存中,则据说在加载下一个指令中需要使用的值的同时,可能会优先执行下一个下一个指令。
在普通的程序中,这个功能可以很好地运行,特别是在单线程中,不仅不需要担心,反而能加快速度,它是一个值得感激的功能。但是,在多线程中使用这个功能可能会引发问题。
在上例中,使用了两个线程,但当准备好另一个线程时,可能会发生与线程1观察到的顺序相反的情况,也就是每个人都可能看到不同的结果,这是一种完全不一致的状态。
一贯性的分类
我們將探討一貫性的話題,並回收標題中第二個用語。
您可能聽過「最終一致性」這個詞。它是指在使用時能夠迅速讓值變得可見,但在使用時需要花費相當多的智力。
过去,当Cassandra刚刚推出时,如果进行GET操作(而且没有进行Read Repair),有时会返回几代以前的值,这让我感到非常困惑(虽然这种情况现在已经很少见了)。然而,正因为有这种行为,我们可以说Cassandra符合Eventual Consistency的原则。严格来说,它几乎没有任何保证。
就目前来说,除了Eventual Consistency之外,似乎还有很多其他保证一致性的级别。现在我从线程和内存的角度列举一些较为主流的选项。
1. 具有并行随机访问存储一致性(先进先出存储一致性)
-
- あるthreadがある場所にstore操作した結果は、他のthreadからも同じ順序で(一貫して)観測できる。
- 別のthreadから同じ場所に書き込まれた値は、他のthreadから異なる順序で観測されてもよい。
2. 因果一致性 (Causal Consistency)
-
- PRAM Consistencyのレベルと、
-
- 複数thread間での因果関係があるstoreの順序は、他のthreadからも同じ順で(一貫して)観測される。
- 複数thread間での因果関係が無いstoreの順序は、他のthreadから異なる順序で観測されてもよい。
如果你每天努力学习,那么你的成绩就会提高。
-
- thread_1が、変数aをstore
- thread_2が、変数aをloadした結果から計算して変数bにstoreした
当有将值存储到变量a和将值存储到变量b之间存在因果关系时,我们称之为存储到变量a和存储到变量b之间存在因果关系。
3. 连续一致性
-
- Causal Consistencyのレベルと、
-
- メモリ操作の順序は、どのthreadからも同じ順序で(一貫して)観測される。
-
- storeした値がloadで観測できるようになる時間は、thread間で異なってよい。
- 観測される値の変化の順序は、実時間におけるstoreの順序と一致しなくてもよい。
有点难理解,但这里所说的顺序并非指store指令的顺序,而是指实际写入并能被读取的顺序,意味着无论是哪个线程,都是相同的顺序。
因为保证了任何写入都能在所有线程中按照相同顺序显示,所以避免了线程间的矛盾,这是关键。
线性化
-
- Sequential Consistencyのレベルと、
- 各read/writeがグローバル時刻で順序つけられている。
5. 严格一致
-
- linearizationのレベルと、
- あるメモリ場所への任意のloadは、そこへの最も新しいstore結果になる。
据说除了这些之外,还有许多不同的一致性水平。这让人很困惑,如果没有从事这类工作的话会很快忘记,而且写出具体例子也很耗时间,所以在这里省略掉了。请自行搜索。
原子型
这是标题中第三个术语的回收。差不多要结束了。
在保持一致性的前提下,我们将整理上述措施中使用的原子类型。原子类型(变量)是指…
-
- 最新のstoreは、他のスレッドからそのうちloadできる。(可視性)
-
- 変数のstore中に、他のスレッドから中途半端な値がloadされることはない。(原子性)
- memory orderingにより一貫性が変わる。これは対象のAtomic変数だけでなく、その変数が関係する他の変数の可視性やAtomic性にも関わる。
在Rust中,std::sync::atomic为我们提供了如下的Atomic类型。每个类型都有与之相对应的原始类型,它们在内存中具有相同的二进制表示。
AtomicBool == bool
AtomicI8 == i8
AtomicI16 == i16
AtomicI32 == i32
AtomicI64 == i64
AtomicIsize == isize
AtomicPtr == *mut T
AtomicU8 == u8
AtomicU16 == u16
AtomicU32 == u32
AtomicU64 == u64
AtomicUsize == usize
内存排序(屏障)
Atomic型在C++中有很多方法,并且每个方法都接受以下类别的Ordering枚举作为参数,根据Ordering的不同,会影响内存的读写一致性顺序。
内存顺序仅仅提供了关于顺序的保证,并不能保证简单的临界区。请注意这一点。
#[non_exhaustive]
pub enum Ordering {
Relaxed,
Release,
Acquire,
AcqRel,
SeqCst,
}
我们将逐一查看不同的订单类型。
轻松自在
-
- そのAtomic変数自体の原子性と可視性のみ保証。
- その他のメモリアクセスの順序に関しては何も保証しない。
在很多情况下,只需放松不必太过担心,但需要注意的是仅有的保证是该Atomic变量是原子的,如果与其他变量在逻辑上相互作用,结果可能会出乎意料。然而,如果顺序稍有前后并无大碍的情况下,我认为可以故意使用。
发布 / 获得
在进行store操作时,请使用Release;在进行load操作时,请使用Acquire。如果在load操作中使用Release,或者在store操作中使用Acquire,将会出现panic。
作用方面来看,
thread Aが、あるAtomic変数に対して値aをRelease-storeし、
thread Bが、同じAtomic変数に対してAcquire-loadして値aを観察できた場合
thread Bは、Acquire-load後、thread Aにおける(少なくとも)Release-store以前のwriteを観察できる。Release-store以降のwriteも観察されるかもしれない。
thread Aは、Release-store前、thread Bにおける(少なくとも)Acquire-load以降のwriteを観察できない。Acquire-load以前のwriteは観察されるかもしれない。
从感觉上来说,Atomic变量就像一个能够明确前后关系的邮件队列。
-
- 当线程A向队列(原子变量)中发送信件(值)时(释放),
-
- 其他线程从队列(原子变量)中取走信件(值)后(获取),
- 在释放信件之前和获取之后的前后关系是被保证的。
以下是一个例子的样子。
use std::sync::{
atomic::{AtomicI32, Ordering},
Arc,
};
fn main() {
let ptr_x = &mut 0 as *mut i32 as usize;
let atomic_val = Arc::new(AtomicI32::new(0));
let thread_0 = std::thread::spawn({
let atomic_x = Arc::clone(&atomic_val);
move || {
unsafe {
// data = 1
*(ptr_x as *mut i32) = 1;
}
// Release-store
atomic_x.store(1, Ordering::Release);
}
});
let thread_1 = std::thread::spawn({
let atomic_x = Arc::clone(&atomic_val);
move || loop {
// busy loop開始
if atomic_x.load(Ordering::Acquire) == 1 {
// load-Acquire
if unsafe { *(ptr_x as *const i32) != 1 } {
unreachable!("ここには到達しない。");
};
break;
}
}
});
thread_0.join().unwrap();
thread_1.join().unwrap();
}
-
- thread_0でRelease-storeしたAtomic変数値を、thread_1が読み取れたということは、
-
- thread_0でのRelease-store以前の書き込みが全て読み取れることが保証される。
- つまりthread_1はptr_xへの書き込みを読めることが保証されるので、ptr_xは必ず1。
消费
只有在计算指针时,才会发生Acquire的动作。在C++中似乎比Acquire更快,但在rust的std库中并不存在,而且如果在rust中过多地使用原生指针,人们会担心是否安全,并且即使使用库来实现,也担心会不小心使用错误。省略。
收购与合并
在加载时,可以实现获取(Acquire)的效果,在存储时,可以实现释放(Release)的效果。它用于执行读取修改写入操作(如compare_and_exchange或fetch_add)时的加载和存储。
例如,对于可能失败(根据写入条件)的读-修改-写操作,如compare_and_exchange,有Ordering参数来表示成功情况下的Ordering和失败情况下的Ordering。成功情况下的Ordering与当前值的比较成功,而失败情况下的Ordering表示操作未成功。
这是一个例子的写法。
use std::sync::atomic::{AtomicU8, Ordering};
fn main() {
static COUNT: AtomicU8 = AtomicU8::new(0);
let ptr_0 = &mut 0 as *mut i32 as usize;
let ptr_1 = &mut 0 as *mut i32 as usize;
let thread_0 = std::thread::spawn(move || {
unsafe { *(ptr_0 as *mut i32) = 1 };
if COUNT.fetch_add(1, Ordering::AcqRel) == 1 {
println!("thread_0 wins.");
assert_eq!(unsafe { *(ptr_1 as *const i32) }, 1);
}
});
let thread_1 = std::thread::spawn(move || {
unsafe { *(ptr_1 as *mut i32) = 1 };
if COUNT.fetch_add(1, Ordering::AcqRel) == 1 {
println!("thread_1 wins.");
assert_eq!(unsafe { *(ptr_0 as *const i32) }, 1);
}
});
thread_0.join().unwrap();
thread_1.join().unwrap();
}
通过AcqRel,当一方的线程对Atomic变量进行的更改可以被另一方的线程读取时,该进行更改的线程之前的所有写入都可以被读取。
也就是说,无论哪个线程更晚,都保证对方可以读取Atomic变量的修改前的值。
此外,必定只有其中一方会执行if语句内的处理。不会发生两个都不处理或两个都处理的情况。
顺序一致
在AcqRel的基础上,保证了多线程对多个原子变量的修改能按照一致的顺序观察到。
换句话说,在Release/Acquire(AcqRel)中,当多个线程对多个原子变量进行修改时,有可能在线程之间呈现不同的顺序。也就是说,如果只使用一个原子变量,或者使用多个原子变量但彼此无关,则这是不必要的。
如果你感到困惑,有时会写上使用SeqCst,但这意味着很难理解,并且正在进行相当复杂的操作。要小心。
栅栏 (zhà
在与原子变量组合使用的情况下,有一个名为”memory fence”的东西。
在指定了Release、Acquire、AcqRel和SeqCst的Atomic变量命令中,
可以用指定了relaxed的Atomic变量命令和使用Release、Acquire、AcqRel、SeqCst fence来替代。
以前我一直在思考,即使只用Atomic变量也能实现,那么为什么还需要Atomic变量之外的fence有存在的意义呢?但是通过再次观察代码例子,发现在relaxed循环后面插入了fence,我稍微有点明白了。
看起来,保证了所有前后的原子操作的顺序。
额外内容:线程的join()
以原生的中文方式改述如下:可以查看Rust的线程join方法的说明,如下所述,保证了在调用thread的join()方法之前和之后的顺序。
就原子内存顺序而言,相关线程的完成与此函数的返回同步。换句话说,该线程执行的所有操作都发生在join返回之后的所有操作之前。
最后(的事情/结果)
在正常情况下,普通的网络工程师不会强行使用Ordering和fence,而是经常以原子操作的形式使用Atomic。实际上,Mutex可以实现相同的功能,所以即使通过使用Mutex对代码进行微秒级别的优化,用户甚至是同一家公司的人员也不会注意到这个变化。
此外,如果是原作者自己进行维护的话,就没有问题。但是,当其他人需要维护需要使用SeqCst级别的代码时,在某些情况下反而可能会引入错误。当你关注一些重要的高速库的git时,你会经常看到有关Ordering的讨论。
使用Atomic/CAS的原因与之前提到的计数器示例相同,因为它的速度非常快。虽然有点过时,但它的速度在这个图表上是以Atomic/CAS的速度来衡量的,而Mutex和消息队列/传递的速度则是慢100倍水平,因此没有在此图表中显示。
那些生活在微秒和纳秒世界中的人们通常会使用库、游戏和数据处理等。如果用普通的Mutex来编写追求速度的库,可能会收到其他人提出用Atomic/fence替代的pull request。
我对Go并不十分熟悉,但从稍微搜索的结果来看
不要通过共享内存进行沟通,而是通过沟通来共享内存。
因为这是一个常见的、倾向于有主见的政策,所以似乎不会进行直接操作。抱歉。
在之前的解释中我提到了不经常使用的事情,但人生实际上是相当漫长的。即使是普通的网络工程师,迟早都会面临这样的抉择:要么以高速且不花费大量金钱的方式处理海量的任务,否则公司将会倒闭,或者步入缓慢的死亡之路。
在面对这样困难的情况时,深呼吸后,如果没自信的话,就用Mutex或ReaderWriterLock来实现长时间的锁定,以编写安全的代码。与偶尔出现奇怪的错误相比,这样做更好。