Java的同步器使用方法备忘录
同步器是什么。
使用自身的状态来控制线程进程的对象被称为同步器。
假设存在以下实现方法。
package synchronizer;
import java.util.concurrent.TimeUnit;
public class SynchronizerTest {
public static void main(String[] args) {
runThread(() -> {
System.out.println("Thread 1 start");
System.out.println("Thread 1 end");
});
runThread(() -> {
System.out.println("Thread 2 start");
System.out.println("Thread 2 sleeping...");
TimeUnit.SECONDS.sleep(3);
System.out.println("Thread 2 awake");
System.out.println("Thread 2 end");
});
}
public static Thread runThread(ThrowingRunnable process) {
final Thread thread = new Thread(() -> {
try {
process.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
thread.start();
return thread;
}
interface ThrowingRunnable {
void run() throws Exception;
}
}
以后,我们将省略对 runThread 和 ThrowingRunnable 的描述。
-
- 2つのスレッドを起動している
-
- スレッド1は標準出力に開始と終了のメッセージを書き出しているだけ
- スレッド2は途中で3秒間のスリープを入れている
Thread 1 start
Thread 1 end
Thread 2 start
Thread 2 sleeping...
Thread 2 awake
Thread 2 end
当线程2处于睡眠状态时,线程1会正常执行,因此消息会先以线程1结束的形式被输出。在这里,试着引入一个称为CyclicBarrier的同步器,如下所示。
package synchronizer;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class SynchronizerTest {
public static void main(String[] args) {
final CyclicBarrier barrier = new CyclicBarrier(2);
runThread(() -> {
System.out.println("Thread 1 start");
barrier.await(); // ここで待機
System.out.println("Thread 1 end");
});
runThread(() -> {
System.out.println("Thread 2 start");
System.out.println("Thread 2 sleeping...");
TimeUnit.SECONDS.sleep(3);
System.out.println("Thread 2 awake");
barrier.await(); // ここで待機
System.out.println("Thread 2 end");
});
}
...
}
CyclicBarrier を2で初期化してインスタンスを生成し、各スレッドの途中で await メソッドを呼び出すようにしている
Thread 1 start
Thread 2 start
Thread 2 sleeping...
Thread 2 awake
Thread 2 end
Thread 1 end
-
- 今度はスレッド1がスレッド2より後に終了している
CyclicBarrier の await メソッドは、それを呼び出したスレッドをブロックする(処理が停止する)
そして、コンストラクタで指定された数と同じ回数 await メソッドが呼ばれると、ブロックは解放されて処理が再開される
つまり、上記実装の場合は await が2回呼ばれるとブロックが解放される
したがって、スレッド1はスレッド2のスリープが終わって2回目の await が呼ばれるまで処理がブロックされていたことになる
この結果、スレッド1の終了メッセージはスレッド2のスリープ終了より後に書き出されることになった
スレッド2の終了メッセージより後になるか前になるかはタイミング次第
被称为同步器的对象,像CyclicBarrier一样,提供了控制多个线程执行的功能,利用自身的状态(在CyclicBarrier的情况下为await被调用的次数)。
使用同步器可以细致地控制多个线程的执行时机。
Java的标准API提供了以下五种同步器。
-
- CyclicBarrier
-
- CountDownLatch
-
- Phaser
-
- Semaphore
- Exchanger
在这里,我会记录这些同步器的使用方法。
信号旗
信号量(Semaphore)可用于控制分配给具有资源限制的线程的情况。
package synchronizer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i=0; i<10; i++) {
String name = "Thread[" + i + "]";
runThread(() -> {
System.out.println(name + " start");
semaphore.acquire();
System.out.println(name + " acquire");
TimeUnit.SECONDS.sleep(1);
System.out.println(name + " release");
semaphore.release();
});
}
}
}
Semaphore を2で初期化している
acquire メソッドを呼び出した後に1秒スリープしてから release メソッドを呼ぶスレッドを同時に10個起動している
この結果は以下のようになる
Thread[4] start
Thread[0] start
Thread[0] acquire
Thread[2] start
Thread[4] acquire
Thread[7] start
Thread[3] start
Thread[9] start
Thread[6] start
Thread[8] start
Thread[1] start
Thread[5] start
Thread[4] release
Thread[0] release
Thread[1] acquire
Thread[6] acquire
Thread[6] release
Thread[1] release
Thread[2] acquire
Thread[8] acquire
Thread[2] release
Thread[8] release
Thread[3] acquire
Thread[5] acquire
Thread[3] release
Thread[5] release
Thread[7] acquire
Thread[9] acquire
Thread[7] release
Thread[9] release
如果以图表形式进行整理,可以很清晰地表达如下:
仔细观察可以发现,黄色部分(从调用Semaphore的acquire到调用release的区间)中始终最多只有两个线程。
Semaphore は、同時に利用可能なリソースの数を制御するときに利用できる
コンストラクタの引数で同時利用可能な許可の数(パーミット)を指定する
上記例では、パーミットには2を指定している
パーミットの数が余っている状態で acquire メソッドを呼ぶと、パーミットが払い出される
パーミットの数に空きがない状態で acquire メソッドを呼ぶと、パーミットが解放されるまで呼び出し元のスレッドはブロックされる
release メソッドを呼ぶと、パーミットが解放される
つまり、データベースのコネクションプールのようなことを再現できる
不引发 InterruptedException 的 acquire
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
final Thread thread = runThread(() -> {
Thread th = Thread.currentThread();
System.out.println("[Thread] before acquireUninterruptibly (interrupted=" + th.isInterrupted() + ")");
semaphore.acquireUninterruptibly();
System.out.println("[Thread] after acquireUninterruptibly (interrupted=" + th.isInterrupted() + ")");
});
Thread.sleep(1000);
System.out.println("[main] interrupt");
thread.interrupt();
Thread.sleep(1000);
System.out.println("[main] release");
semaphore.release();
}
}
-
- パーミット数1で Semaphore を生成している
メインスレッドの方ですぐに acquire を呼んでパーミットの空きをなくしている
別スレッドを起動して、 acquireUninterruptibly でパーミットが空くのを待機する
メインスレッドの方で少し待ってから別スレッドに割り込みを行い、 Semaphore を解放する
[Thread] before acquireUninterruptibly (interrupted=false)
[main] interrupt
[main] release
[Thread] after acquireUninterruptibly (interrupted=true)
acquire メソッドは throws InterruptedException で宣言されているため、呼び出し元で InterruptedException をハンドリングしなければならない
実装例では throws Exception を宣言した ThrowingThrowable を使っているので省略できているが、実際は InterruptedException の catch が必要になる
一方で、 acquireUninterruptibly には throws 句が宣言されておらず、呼び出し元で InterruptedException のハンドリングを書かなくてもいい
パーミットに空きがない場合の挙動は acquire と同じで、呼び出し元のスレッドはブロックされる
しかし、スレッドが割り込まれてもブロックは継続される
パーミットに空きができるとブロックは解放され処理は再開される
このとき、 Thread には割り込みされたことが記録されるので、必要であれば isInterrupted などで状態を参照することでハンドリングができる
如果许可证已满,也不需要等待。
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
runThread(() -> {
System.out.println("[thread] tryAcquire");
System.out.println("[thread] value = " + semaphore.tryAcquire());
});
System.out.println("[main] sleep");
Thread.sleep(1000);
System.out.println("[main] release");
semaphore.release();
}
}
-
- パーミット数1で Semaphore を生成している
-
- メインスレッドで即座に acquire を呼び、パーミットの空きをなくしている
-
- 別スレッドを起動して、 tryAcquire でパーミットの取得を試みている
- メインスレッドでは1秒スリープしてから、最初に取得したパーミットを解放している
[main] sleep
[thread] tryAcquire
value = false
[main] release
-
- パーミットに空きがない状態で acquire を呼んだ場合、呼び出し元のスレッドはブロックされる
-
- しかし tryAcquire の場合、パーミットに空きがない場合はすぐにメソッドが戻される
このとき、戻り値として false が返される
パーミットに空きがあり取得できた場合は true が返される
tryAcquire(long, TimeUnit) を使えば、待機時間に上限を設けることもできる
同时获取多个许可证
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(4);
for (int i=0; i<4; i++) {
int n = i+1;
runThread(() -> {
System.out.println("[" + n + "] start");
semaphore.acquire(n);
System.out.println("[" + n + "] acquire(" + n + ")");
System.out.println("[" + n + "] sleep");
Thread.sleep(100);
System.out.println("[" + n + "] awake");
System.out.println("[" + n + "] release(" + n + ")");
semaphore.release(n);
});
}
}
}
-
- パーミット数4で Semaphore を生成している
- 4つのスレッドを起動し、それぞれパーミット数1,2,3,4で acquire と release を行うようにしている
[3] start
[4] start
[1] start
[2] start
[3] acquire(3)
[1] acquire(1)
[3] sleep
[1] sleep
[1] awake
[3] awake
[1] release(1)
[3] release(3)
[4] acquire(4)
[4] sleep
[4] awake
[4] release(4)
[2] acquire(2)
[2] sleep
[2] awake
[2] release(2)
以图表的形式表示如下。
acquire(int) および release(int) を使用すると、一度に複数のパーミットの取得や解放ができる
实际测试结果如下。当以超过许可数量的数进行acquire时
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(2);
System.out.println(“acquire(3)之前”);
semaphore.acquire(3);
System.out.println(“acquire(3)之后”);
}
}
执行结果
acquire(3)之前
没有出现错误,acquire将一直等待直到许可可用为3。
那么,会变成永久阻塞的状态吗?不是的,只需要执行3次release就能继续进行。
释放超过许可数量的许可
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(2);
runThread(() -> {
semaphore.release(3);
});
System.out.println(“acquire(3)之前”);
semaphore.acquire(3);
System.out.println(“acquire(3)之后”);
}
}
执行结果
acquire(3)之前
acquire(3)之后
虽然在构造函数中指定了许可数量,但并不严格检查acquire和release的次数是否超过了该值。
无论许可数量如何,都可以自由地进行任意数量的acquire和release操作。
Semaphore的Javadoc中有以下说明。
~使用Semaphore的适当方式是由应用程序的编码约定确立的。
release(int) | Semaphore (Javadoc)
这意味着,控制acquire和release调用次数以符合许可数量的责任在于应用程序的一侧。
这是我个人的解释。
获取和释放可以在不同的线程中进行。
我会被认为在已获取的线程上调用 release,但事实并非如此。
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
runThread(() -> {
semaphore.release();
semaphore.acquire();
System.out.println("acquire!");
});
}
}
-
- パーミット数1で Semaphore を生成している
-
- メインスレッドで acquire を呼んでいる
- 別スレッドを起動し、最初に release を呼んでから acquire をしている
acquire!
release メソッドはメインスレッドとは別のスレッドで呼ばれているが、問題なくパーミットが解放されて続く acquire でパーミットを獲得できていることが分かる
Semaphore はパーミットとスレッドとの紐づきは見ておらず、単にパーミットの数だけで制御をしている
倒数计数器
CountDownLatch 是一种适用于希望在特定条件满足之前阻止线程进行的情况下使用的工具。
Latch(ラッチ)是一个日语词,意为“外撤止”或“门闩”。
package synchronizer;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
public static void main(String[] args) throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
for (int i=0; i<3; i++) {
String name = "Thread[" + i + "]";
runThread(() -> {
System.out.println(name + " start");
Thread.sleep(500);
System.out.println(name + " end");
latch.countDown();
});
}
char[] labels = {'a', 'b', 'c'};
for (int i=0; i<3; i++) {
String name = "Thread[" + labels[i] + "]";
runThread(() -> {
System.out.println(name + " await...");
latch.await();
System.out.println(name + " end");
});
}
}
}
-
- コンストラクタで3を指定して CountDownLatch を生成している
-
- スレッド3つ(1, 2, 3)を起動し、500ミリ秒スリープしてから CountDownLatch の countDown メソッドを呼ぶ
- 別のスレッド3つ(a, b, c)を起動し、こちらは await で処理を待機させている
Thread[2] start
Thread[1] start
Thread[0] start
Thread[a] await...
Thread[b] await...
Thread[c] await...
Thread[2] end
Thread[1] end
Thread[0] end
Thread[a] end
Thread[b] end
Thread[c] end
-
- 後に起動した3つのスレッド(a, b, c)は、先に起動した3つのスレッド(1, 2, 3)の処理が終了するまで待機していることが分かる
CountDownLatch は、まずコンストラクタでカウントの数を指定する
内部のカウンターがこの値で初期化される
countDown メソッドを呼ぶとカウントが1つ減る
countDown は待機させられることはなく、すぐ呼び出し元に処理が戻される
カウントの初期値以上の回数呼び出しても、例外になることはない
await メソッドを呼ぶと、カウントが0になるまでスレッドがブロックされる
これにより、いくつかのスレッドの処理が全て終わるまで処理を待機させる、みたいな制御が実現できる
CountDownLatch は一度カウントが減ると元に戻すことはできないので、再利用ができない
インスタンスを作り直す必要がある
循环屏障
CyclicBarrier 可用于需要同步多个线程的场合。
package synchronizer;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) {
final CyclicBarrier barrier = new CyclicBarrier(3);
for (int i=0; i<3; i++) {
String name = "Thread[" + i + "]";
int time = 100 * (i+1);
runThread(() -> {
System.out.println(name + " start");
System.out.println(name + " sleep " + time + "ms");
Thread.sleep(time);
System.out.println(name + " awake");
barrier.await();
System.out.println(name + " end");
});
}
}
}
-
- コンストラクタで3を指定して CyclicBarrier を生成している
- 3つのスレッドを起動し、それぞれ少しスリープしてから CyclicBarrier の await を呼んで待機している
Thread[2] start
Thread[0] start
Thread[1] start
Thread[2] sleep 300ms
Thread[1] sleep 200ms
Thread[0] sleep 100ms
Thread[0] awake
Thread[1] awake
Thread[2] awake
Thread[0] end
Thread[1] end
Thread[2] end
Thread[0] や Thread[1] は Thread[2] より短い時間だけスリープしているので、普通なら先にスレッドが終了するはずが、実際には Thread[2] の終了を待機しているのが分かる
CyclicBarrier を使うと、複数のスレッドの処理があるところまで揃うのを待機させるような制御ができるようになる
コンストラクタで足並みを揃えたいスレッド(パーティ)の数を指定する
await メソッドを呼ぶと、 await を呼んだスレッドの数がパーティ数に達するまで呼び出し元のスレッドがブロックされる
上記例ではパーティ数3で初期化しているので、3つのスレッドから await が呼ばれるまで処理がブロックされる
await の呼び出し回数がパーティ数に達すると、ブロックが解放されてそれまで待機していたスレッドの処理が一斉に再開される
进行再利用
package synchronizer;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
program(barrier);
System.out.println("=====");
program(barrier);
}
private static void program(CyclicBarrier barrier) throws Exception {
runThread(() -> {
System.out.println("Thread sleep");
Thread.sleep(500);
System.out.println("Thread await");
barrier.await();
});
System.out.println("Main await");
barrier.await();
System.out.println("Main restart");
}
}
-
- パーティ数2で CyclicBarrier を生成している
-
- 別スレッドを起動して少しスリープしてから await を呼び出している
-
- メインスレッドではすぐに await を呼んで、別スレッドの終了を待機している
-
- この処理を2回繰り返している
CyclicBarrier は、2回とも同じインスタンスを利用している
Main await
Thread sleep
Thread await
Main restart
=====
Main await
Thread sleep
Thread await
Main restart
await の呼び出し回数がパーティ数に達すると、 CyclicBarrier の状態は初期状態に戻る
このため、 CyclicBarrier はインスタンスの再利用ができる
名前に cyclic (循環式) がついている理由
当等待值达到指定的数量时,插入处理程序。
package synchronizer;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(4, () -> {
String name = Thread.currentThread().getName();
System.out.println("[" + name + "] Tripped!");
});
for (int i=0; i<4; i++) {
int time = 100 * (i+1);
runThread(() -> {
String name = Thread.currentThread().getName();
System.out.println("[" + name + "] sleep " + time + "ms");
Thread.sleep(time);
System.out.println("[" + name + "] await");
barrier.await();
});
}
}
}
-
- パーティ数4で CyclicBarrier を生成している
-
- コンストラクタの第二引数に Runnable を渡している
- スレッドを4つ生成し、それぞれ少しスリープしてから await で待機している
[Thread-3] sleep 400ms
[Thread-1] sleep 200ms
[Thread-0] sleep 100ms
[Thread-2] sleep 300ms
[Thread-0] await
[Thread-1] await
[Thread-2] await
[Thread-3] await
[Thread-3] Tripped!
-
- コンストラクタの第二引数には Runnable を渡すことができる
-
- この Runnable は、 await の呼び出し回数がパーティ数に達して処理が再開される前にコールバックされる
処理が再開されることをトリップ(trip) と呼ぶ
また、このトリップ時に呼ばれる処理のことを バリアー・アクション と呼ぶ
トリップの処理は、トリップのトリガーとなった await の呼び出しを行ったスレッドで行われる
故障状态
package synchronizer;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
Thread th = runThread(() -> {
try {
System.out.println("Thread await");
barrier.await();
} catch (InterruptedException e) {
System.out.println("interrupted");
System.out.println("broken = " + barrier.isBroken());
}
});
Thread.sleep(500);
System.out.println("interrupt thread");
th.interrupt();
Thread.sleep(500);
try {
barrier.await();
} catch (BrokenBarrierException e) {
System.out.println("BrokenBarrierException: " + e.getMessage());
}
}
}
-
- パーティ数2で CyclicBarrier を生成している
-
- 別スレッドを起動して await で待機する
InterruptedException をキャッチして、 isBroken で CyclicBarrier の状態を確認している
メインスレッドで少し待ってから、別起動したスレッドに割り込みを行う
Thread await
interrupt thread
interrupted
broken = true
BrokenBarrierException: null
-
- 特定の条件が満たされると、 CyclicBarrier は故障状態になる
-
- 故障状態になる条件としては、以下がある
await 中にスレッドが割り込まれた場合(上記実装例のケース)
await(long, TimeUnit) での待機でタイムアウトになった場合
バリアー・アクションで例外がスローされた場合
CyclicBarrier が故障状態かどうかは、 isBroken メソッドで確認できる
故障状態の CyclicBarrier に対して await を呼ぶと、 BrokenBarrierException がスローされる
重置故障状态
package synchronizer;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
try {
barrier.await(1, TimeUnit.MICROSECONDS);
} catch (TimeoutException e) {
System.out.println("timeout");
}
System.out.println("broken = " + barrier.isBroken());
barrier.reset();
System.out.println("broken = " + barrier.isBroken());
}
}
-
- タイムアウトありで await を呼び出す
タイムアウト時間を1ミリ秒にして即座にタイムアウトさせることでバリアを故障状態にする
reset メソッドを呼び出して前後で故障状態を確認している
timeout
broken = true
broken = false
reset メソッドを呼ぶと、故障状態がクリアされて CyclicBarrier が初期状態に戻る
初期状態に戻った CyclicBarrier は再利用できる
相位调制器
使用 Phaser (フェーザー) 能够更灵活地进行与 CountDownLatch 或 CyclicBarrier 相似的控制。
特别是在线程数量需要动态变化的情况下,它特别有用。
package synchronizer;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) throws Exception {
final Phaser phaser = new Phaser();
phaser.register(); // main スレッド分
runThreads(phaser, 3);
System.out.println("main await");
phaser.arriveAndAwaitAdvance();
System.out.println("main restart");
Thread.sleep(1000);
System.out.println("=====");
runThreads(phaser, 2);
System.out.println("main await");
phaser.arriveAndAwaitAdvance();
System.out.println("main restart");
}
private static void runThreads(Phaser phaser, int numberOfThreads) {
for (int i=0; i<numberOfThreads; i++) {
int n = i+1;
String name = "Thread[" + n + "]";
int time = 100 * n;
phaser.register();
runThread(() -> {
System.out.println(name + " sleep " + time + "ms");
Thread.sleep(time);
System.out.println(name + " awake");
phaser.arriveAndDeregister();
System.out.println(name + " end");
});
}
}
}
runThreads メソッドでは、引数で受け取った数だけスレッドを生成して起動している
スレッド生成前に register メソッドを呼んでいる
スレッドの中では少しスリープしてから arriveAndDeregister メソッドを呼んでいる
メインスレッドでは最初に register メソッドを1回だけ呼んでいる
その後、 runThreads メソッドを呼び出してから arriveAndAwaitAdvance メソッドを呼び出して待機している
このセットを、スレッド数を 3 -> 2 と変更して実行している
main await
Thread[2] sleep 200ms
Thread[3] sleep 300ms
Thread[1] sleep 100ms
Thread[1] awake
Thread[1] end
Thread[2] awake
Thread[2] end
Thread[3] awake
Thread[3] end
main restart
=====
main await
Thread[1] sleep 100ms
Thread[2] sleep 200ms
Thread[1] awake
Thread[1] end
Thread[2] awake
Thread[2] end
main restart
-
- メインスレッドは複数起動した別スレッドの処理が全て終了するまで待機していることがわかる
別スレッドの方は待機せずに終了している
1回目は3つのスレッドの終了を待っていたのに対して、2回目は2つのスレッドの終了を待っており、待機するスレッドの数が動的に変わっていることが分かる
Phaser的基础知识
派对的注册
Phaser は、 CyclicBarrier と同じで、まずは足並みを揃えるスレッドの数(パーティ数)を設定する必要がある
CyclicBarrier の場合はコンストラクタ引数で設定していたが、 Phaser の場合は register メソッドを使うことで動的にパーティ数を指定できる
初期のパーティ数はコンストラクタでも指定できる
final Phaser phaser = new Phaser();
phaser.register(); // main スレッド分
...
for (int i=0; i<numberOfThreads; i++) {
...
phaser.register();
register メソッドを呼ぶと、パーティ数が1つ加算される
実装例の冒頭で register を呼んでいるのは、メインスレッド分を登録している
そして、スレッドを起動するループの中では、ループの都度 register を呼ぶことで生成されるスレッド数と同じ分だけパーティを登録している
これにより、「メインスレッド+起動するスレッド数」だけのパーティが登録されたことになる
bulkRegister メソッドを使えば、一度に複数のパーティを登録することもできる
派对的到达
-
- 登録したパーティの処理が期待した場所まで到達した場合、それを Phaser に伝える必要がある
CountDownLatch の countDown, await、 CyclicBarrier の await に該当する
これを 到着(arrive) と呼ぶ
到着を伝えるメソッドはいくつかあるが、実装例では以下2つのメソッドを利用している
arriveAndDeregister
到着を Phaser に伝え、登録しているパーティ数を1つ減らす
呼び出し元のスレッドはブロックされない
arriveAndAwaitAdvance
到着を Phaser に伝え、他のパーティが全て到着するまで処理をブロックする
...
phaser.arriveAndAwaitAdvance();
System.out.println("main restart");
...
runThread(() -> {
...
phaser.arriveAndDeregister();
System.out.println(name + " end");
});
-
- 実装例では、動的に起動したスレッドは arriveAndDeregister を使うようにしていたので、スレッドの終了とともにパーティの数が1つ減るようになっている
これにより、1回目に3つのスレッドの終了を待機し終えた段階でパーティ数は12に戻るようになっている
このおかげで、2回目は改めて register を使って新しいパーティ数を設定できるようになっている
阶段
package synchronizer;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) throws Exception {
final Phaser phaser = new Phaser();
phaser.register(); // main スレッド分
System.out.println("phase = " + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println("phase = " + phaser.getPhase());
runThreads(phaser, 1);
phaser.arriveAndAwaitAdvance();
System.out.println("phase = " + phaser.getPhase());
}
private static void runThreads(Phaser phaser, int numberOfThreads) {
...
}
}
phase = 0
phase = 1
Thread[1] sleep 100ms
Thread[1] awake
Thread[1] end
phase = 2
Phaser にはフェーズ(phase)と呼ばれる状態が存在する
フェーズは0から始まり、現在のフェーズで登録されているパーティが全て到着すると1つ進む(advance)
フェーズが進むと到着しているパーティ数は0にリセットされるので、 CyclicBarrier と同じように再利用ができる
フェーズ番号は Integer.MAX_VALUE に達すると0に戻る
控制到达的方法
Phaser提供了多个控制到达的方法。
整理以下是粗略区分功能差异。
arrive
YesNoNoNoNoarriveAndAwaitAdvance
YesYesNoNoNoarriveAndDeregister
YesNoYesNoNoawaitAdvance
NoYesNoNoYesawaitAdvanceInterruptibly
NoYesNoYesYes关于每种用法的详细信息,请见以下说明。
到达
arrive メソッドは、到着を通知してすぐに呼び出し元に処理を戻す
パーティの解除も行わないので、到着を通知したいだけの場合に利用する
CountDownLatch の countDown メソッドみたいな役割になる
戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される
到达并等待前进
arriveAndAwaitAdvance メソッドは、到着を通知して他のパーティが全て到着するまで処理をブロックする
ブロック中にスレッドが割り込まれても、ブロックは継続される
パーティの解除は行わない
戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される
抵达并注销
arriveAndDeregister メソッドは、到着を通知してパーティを解除したのち、すぐに呼び出し元に処理を戻す
戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される
等待推进
awaitAdvance メソッドは、到着は通知せず他のパーティが全て到着するまで処理をブロックする
ブロック中にスレッドが割り込まれても、ブロックは継続される
引数に現在のフェーズ番号を渡す必要がある
フェーズ番号が現在のフェーズ番号と一致しない場合は、すぐに処理を戻す
phaser.awaitAdvance(phaser.arriveAndDeregister()) のように、到着を通知するメソッドと組み合わせて利用することが想定されている
この場合、パーティを解除したうえでフェーズの終了を待機できるようになる
phaser.awaitAdvance(phaser.arrive()) は arriveAndAwaitAdvance() と等価
戻り値として、次のフェーズ番号が返される
可以阻塞并等待可中断的任务到达。
awaitAdvanceInterruptibly メソッドは、到着は通知せず他のパーティが到着するまで処理をブロックする
ブロック中にスレッドが割り込まれると、 InterruptedException がスローされる
引数や戻り値は awaitAdvance と同じ
タイムアウト時間を指定できるメソッドも存在する
终止状态
package synchronizer;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) throws Exception {
final Phaser phaser = new Phaser();
System.out.println("terminated = " + phaser.isTerminated());
phaser.register(); // main スレッド分
System.out.println("terminated = " + phaser.isTerminated());
phaser.arriveAndAwaitAdvance();
System.out.println("terminated = " + phaser.isTerminated());
phaser.arriveAndDeregister();
System.out.println("terminated = " + phaser.isTerminated());
phaser.register();
System.out.println("terminated = " + phaser.isTerminated());
}
}
terminated = false
terminated = false
terminated = false
terminated = true
terminated = true
-
- フェーズが次に進んだときにパーティが0になっている場合、 Phaser は終了状態(termination state) になる
-
- 終了状態になると Phaser は使えなくなる
すべての同期メソッドは、待機せずにすぐに戻るようになる
register メソッドを使ってもパーティは登録できなくなる
修改达到终止状态的条件
package synchronizer;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) throws Exception {
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("onAdvance phase=" + phase + ", registeredParties=" + registeredParties);
return false;
}
};
System.out.println("terminated = " + phaser.isTerminated());
phaser.register(); // main スレッド分
System.out.println("terminated = " + phaser.isTerminated());
phaser.arriveAndAwaitAdvance();
System.out.println("terminated = " + phaser.isTerminated());
phaser.arriveAndDeregister();
System.out.println("terminated = " + phaser.isTerminated());
phaser.register();
System.out.println("terminated = " + phaser.isTerminated());
}
}
Phaser のサブクラスで onAdvance をオーバーライドし、常に false を返すように実装している
terminated = false
terminated = false
onAdvance phase=0, registeredParties=1
terminated = false
onAdvance phase=1, registeredParties=0
terminated = false
terminated = false
onAdvance メソッドをオーバーライドして戻り値を調整することで、終了状態になる条件を任意のものに変更できる
onAdvance メソッドは、フェーズが次に進むときにコールバックされる
フェーズが変わるときに任意の処理を実行したい場合も利用できる
このメソッドが true を返すと、 Phaser は終了状態になる
デフォルトの実装では、引数で受け取る registeredParties (登録されたパーティの数)が0であれば true を返すようになっている
false を返すように実装すれば終了状態にはならない
交换者
当需要在两个线程之间同步处理并进行值的交换时,可以使用“Exchanger”。
package synchronizer;
import java.util.concurrent.Exchanger;
public class ExchangerTest {
public static void main(String[] args) throws Exception {
final Exchanger<String> exchanger = new Exchanger<>();
runThread(() -> {
System.out.println("Thread sleep");
Thread.sleep(500);
String receive = exchanger.exchange("from Thread");
System.out.println("Thread receive=" + receive);
});
System.out.println("Main exchange");
final String receive = exchanger.exchange("from Main");
System.out.println("Main receive=" + receive);
}
}
-
- メインスレッドと別スレッドの双方で exchange メソッドを呼び、戻り値を出力している
- 別スレッドの方は、冒頭で少しだけスリープしている
Main exchange
Thread sleep
Thread receive=from Main
Main receive=from Thread
-
- 別スレッドが exchange メソッドを呼ぶまでメインスレッドが待機しているのが分かる
-
- 片方が exchange の引数に渡した値が、他方の exchange の戻り値として返されていることが分かる
Exchanger を使うと、 exchange で処理を同期しつつ、任意の値を双方で渡し合うような制御ができる
各种同步器的使用区别
【スレッド数が決まっている?】---{No}--->【スレッド数が決まるまで待てる?】
| |
{Yes} |
| |
+ <-------------------{Yes}-------------------+---{No}---> [Phaser]
|
V
【スレッド数が2つで、値のやり取りが必要?】---{Yes}---> [Exchanger]
|
{No}
|
V
【限られたリソースの取り合いを制御したい?】---{Yes}---> [Semaphore]
|
{No}
|
V
【関連する全てのスレッドが所定の位置に到達するまで全てのスレッドを待機させたい?】
| |
{No} +---{Yes}---> [CyclicBarrier]
|
V
[CountDownLatch]
以下是个人观点所依据的:
-
- スレッド数が決まっていれば Phaser でできることは CyclicBarrier や CountDownLatch でもできると思うので、 Phaser を使うかどうかはスレッド数が決まるまで処理の開始を待てないような場面なのかなと思った
Exchanger と Semaphore は制御が特徴的なので、用途は限定的になるのかなと思った
CyclicBarrier と CountDownLatch の大きな違いは以下かなと思う
CyclicBarrier は、パーティに属する全てのスレッドが await を呼ぶまで、パーティに属する全てのスレッドが待機する
CountDownLatch は、 countDown が所定の回数呼ばれるまで await しているスレッドだけが待機する (countDown を呼んだスレッドは待たなくていい)
CyclicBarrier は、パーティに属する全てのスレッドが所定の場所に揃うのを待つことが目的
CountDownLatch は、待機終了のイベントが発火されるまで3スレッドを待たせることが目的
请参考
-
- Java並行処理プログラミング ―その「基盤」と「最新API」を究める― | Brian Goetz, Joshua Bloch, Doug Lea |本 | 通販 | Amazon
-
- CountDownLatch (Java SE 17 & JDK 17)
-
- CyclicBarrier (Java SE 17 & JDK 17)
-
- Semaphore (Java SE 17 & JDK 17)
-
- Phaser (Java SE 17 & JDK 17)
- Exchanger (Java SE 17 & JDK 17)
直到调用 countDown 指定的次数为止。↩