Java的同步器使用方法备忘录

同步器是什么。

使用自身的状态来控制线程进程的对象被称为同步器。

假设存在以下实现方法。

package synchronizer;

import java.util.concurrent.TimeUnit;

public class SynchronizerTest {

    public static void main(String[] args) {
        runThread(() -> {
            System.out.println("Thread 1 start");
            System.out.println("Thread 1 end");
        });

        runThread(() -> {
            System.out.println("Thread 2 start");

            System.out.println("Thread 2 sleeping...");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("Thread 2 awake");

            System.out.println("Thread 2 end");
        });
    }

    public static Thread runThread(ThrowingRunnable process) {
        final Thread thread = new Thread(() -> {
            try {
                process.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        thread.start();
        return thread;
    }

    interface ThrowingRunnable {
        void run() throws Exception;
    }
}

以后,我们将省略对 runThread 和 ThrowingRunnable 的描述。

    • 2つのスレッドを起動している

 

    • スレッド1は標準出力に開始と終了のメッセージを書き出しているだけ

 

    スレッド2は途中で3秒間のスリープを入れている
Thread 1 start
Thread 1 end
Thread 2 start
Thread 2 sleeping...
Thread 2 awake
Thread 2 end

当线程2处于睡眠状态时,线程1会正常执行,因此消息会先以线程1结束的形式被输出。在这里,试着引入一个称为CyclicBarrier的同步器,如下所示。

package synchronizer;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class SynchronizerTest {

    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        runThread(() -> {
            System.out.println("Thread 1 start");
            barrier.await(); // ここで待機
            System.out.println("Thread 1 end");
        });

        runThread(() -> {
            System.out.println("Thread 2 start");

            System.out.println("Thread 2 sleeping...");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("Thread 2 awake");

            barrier.await(); // ここで待機
            System.out.println("Thread 2 end");
        });
    }

    ...
}

CyclicBarrier を2で初期化してインスタンスを生成し、各スレッドの途中で await メソッドを呼び出すようにしている

Thread 1 start
Thread 2 start
Thread 2 sleeping...
Thread 2 awake
Thread 2 end
Thread 1 end
    • 今度はスレッド1がスレッド2より後に終了している

CyclicBarrier の await メソッドは、それを呼び出したスレッドをブロックする(処理が停止する)
そして、コンストラクタで指定された数と同じ回数 await メソッドが呼ばれると、ブロックは解放されて処理が再開される

つまり、上記実装の場合は await が2回呼ばれるとブロックが解放される

したがって、スレッド1はスレッド2のスリープが終わって2回目の await が呼ばれるまで処理がブロックされていたことになる
この結果、スレッド1の終了メッセージはスレッド2のスリープ終了より後に書き出されることになった

スレッド2の終了メッセージより後になるか前になるかはタイミング次第

被称为同步器的对象,像CyclicBarrier一样,提供了控制多个线程执行的功能,利用自身的状态(在CyclicBarrier的情况下为await被调用的次数)。

使用同步器可以细致地控制多个线程的执行时机。

Java的标准API提供了以下五种同步器。

    • CyclicBarrier

 

    • CountDownLatch

 

    • Phaser

 

    • Semaphore

 

    Exchanger

在这里,我会记录这些同步器的使用方法。

信号旗

信号量(Semaphore)可用于控制分配给具有资源限制的线程的情况。

package synchronizer;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);

        for (int i=0; i<10; i++) {
            String name = "Thread[" + i + "]";
            runThread(() -> {
                System.out.println(name + " start");

                semaphore.acquire();
                System.out.println(name + " acquire");

                TimeUnit.SECONDS.sleep(1);

                System.out.println(name + " release");
                semaphore.release();
            });
        }
    }
}

Semaphore を2で初期化している

acquire メソッドを呼び出した後に1秒スリープしてから release メソッドを呼ぶスレッドを同時に10個起動している
この結果は以下のようになる

Thread[4] start
Thread[0] start
Thread[0] acquire
Thread[2] start
Thread[4] acquire
Thread[7] start
Thread[3] start
Thread[9] start
Thread[6] start
Thread[8] start
Thread[1] start
Thread[5] start
Thread[4] release
Thread[0] release
Thread[1] acquire
Thread[6] acquire
Thread[6] release
Thread[1] release
Thread[2] acquire
Thread[8] acquire
Thread[2] release
Thread[8] release
Thread[3] acquire
Thread[5] acquire
Thread[3] release
Thread[5] release
Thread[7] acquire
Thread[9] acquire
Thread[7] release
Thread[9] release

如果以图表形式进行整理,可以很清晰地表达如下:

image.png

仔细观察可以发现,黄色部分(从调用Semaphore的acquire到调用release的区间)中始终最多只有两个线程。

Semaphore は、同時に利用可能なリソースの数を制御するときに利用できる
コンストラクタの引数で同時利用可能な許可の数(パーミット)を指定する

上記例では、パーミットには2を指定している

パーミットの数が余っている状態で acquire メソッドを呼ぶと、パーミットが払い出される
パーミットの数に空きがない状態で acquire メソッドを呼ぶと、パーミットが解放されるまで呼び出し元のスレッドはブロックされる

release メソッドを呼ぶと、パーミットが解放される
つまり、データベースのコネクションプールのようなことを再現できる

不引发 InterruptedException 的 acquire

package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();

        final Thread thread = runThread(() -> {
            Thread th = Thread.currentThread();
            System.out.println("[Thread] before acquireUninterruptibly (interrupted=" + th.isInterrupted() + ")");
            semaphore.acquireUninterruptibly();
            System.out.println("[Thread] after acquireUninterruptibly (interrupted=" + th.isInterrupted() + ")");
        });

        Thread.sleep(1000);
        System.out.println("[main] interrupt");
        thread.interrupt();
        Thread.sleep(1000);

        System.out.println("[main] release");
        semaphore.release();
    }
}
    • パーミット数1で Semaphore を生成している

メインスレッドの方ですぐに acquire を呼んでパーミットの空きをなくしている

別スレッドを起動して、 acquireUninterruptibly でパーミットが空くのを待機する
メインスレッドの方で少し待ってから別スレッドに割り込みを行い、 Semaphore を解放する

[Thread] before acquireUninterruptibly (interrupted=false)
[main] interrupt
[main] release
[Thread] after acquireUninterruptibly (interrupted=true)

acquire メソッドは throws InterruptedException で宣言されているため、呼び出し元で InterruptedException をハンドリングしなければならない

実装例では throws Exception を宣言した ThrowingThrowable を使っているので省略できているが、実際は InterruptedException の catch が必要になる

一方で、 acquireUninterruptibly には throws 句が宣言されておらず、呼び出し元で InterruptedException のハンドリングを書かなくてもいい
パーミットに空きがない場合の挙動は acquire と同じで、呼び出し元のスレッドはブロックされる
しかし、スレッドが割り込まれてもブロックは継続される
パーミットに空きができるとブロックは解放され処理は再開される
このとき、 Thread には割り込みされたことが記録されるので、必要であれば isInterrupted などで状態を参照することでハンドリングができる

如果许可证已满,也不需要等待。

package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();

        runThread(() -> {
            System.out.println("[thread] tryAcquire");
            System.out.println("[thread] value = " + semaphore.tryAcquire());
        });

        System.out.println("[main] sleep");
        Thread.sleep(1000);
        System.out.println("[main] release");
        semaphore.release();
    }
}
    • パーミット数1で Semaphore を生成している

 

    • メインスレッドで即座に acquire を呼び、パーミットの空きをなくしている

 

    • 別スレッドを起動して、 tryAcquire でパーミットの取得を試みている

 

    メインスレッドでは1秒スリープしてから、最初に取得したパーミットを解放している
[main] sleep
[thread] tryAcquire
value = false
[main] release
    • パーミットに空きがない状態で acquire を呼んだ場合、呼び出し元のスレッドはブロックされる

 

    • しかし tryAcquire の場合、パーミットに空きがない場合はすぐにメソッドが戻される

このとき、戻り値として false が返される
パーミットに空きがあり取得できた場合は true が返される

tryAcquire(long, TimeUnit) を使えば、待機時間に上限を設けることもできる

同时获取多个许可证

package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(4);

        for (int i=0; i<4; i++) {
            int n = i+1;
            runThread(() -> {
                System.out.println("[" + n + "] start");
                semaphore.acquire(n);
                System.out.println("[" + n + "] acquire(" + n + ")");
                System.out.println("[" + n + "] sleep");
                Thread.sleep(100);
                System.out.println("[" + n + "] awake");
                System.out.println("[" + n + "] release(" + n + ")");
                semaphore.release(n);
            });
        }
    }
}
    • パーミット数4で Semaphore を生成している

 

    4つのスレッドを起動し、それぞれパーミット数1,2,3,4で acquire と release を行うようにしている
[3] start
[4] start
[1] start
[2] start
[3] acquire(3)
[1] acquire(1)
[3] sleep
[1] sleep
[1] awake
[3] awake
[1] release(1)
[3] release(3)
[4] acquire(4)
[4] sleep
[4] awake
[4] release(4)
[2] acquire(2)
[2] sleep
[2] awake
[2] release(2)

以图表的形式表示如下。

image.png

acquire(int) および release(int) を使用すると、一度に複数のパーミットの取得や解放ができる

如果使用超过构造函数指定的许可数量进行acquire,会发生什么?
实际测试结果如下。当以超过许可数量的数进行acquire时
package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(2);

System.out.println(“acquire(3)之前”);
semaphore.acquire(3);
System.out.println(“acquire(3)之后”);
}
}

执行结果
acquire(3)之前

没有出现错误,acquire将一直等待直到许可可用为3。
那么,会变成永久阻塞的状态吗?不是的,只需要执行3次release就能继续进行。

释放超过许可数量的许可
package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(2);

runThread(() -> {
semaphore.release(3);
});

System.out.println(“acquire(3)之前”);
semaphore.acquire(3);
System.out.println(“acquire(3)之后”);
}
}

执行结果
acquire(3)之前
acquire(3)之后

虽然在构造函数中指定了许可数量,但并不严格检查acquire和release的次数是否超过了该值。
无论许可数量如何,都可以自由地进行任意数量的acquire和release操作。
Semaphore的Javadoc中有以下说明。

~使用Semaphore的适当方式是由应用程序的编码约定确立的。
release(int) | Semaphore (Javadoc)

这意味着,控制acquire和release调用次数以符合许可数量的责任在于应用程序的一侧。
这是我个人的解释。

获取和释放可以在不同的线程中进行。

我会被认为在已获取的线程上调用 release,但事实并非如此。

package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();

        runThread(() -> {
            semaphore.release();
            semaphore.acquire();
            System.out.println("acquire!");
        });
    }
}
    • パーミット数1で Semaphore を生成している

 

    • メインスレッドで acquire を呼んでいる

 

    別スレッドを起動し、最初に release を呼んでから acquire をしている
acquire!

release メソッドはメインスレッドとは別のスレッドで呼ばれているが、問題なくパーミットが解放されて続く acquire でパーミットを獲得できていることが分かる

Semaphore はパーミットとスレッドとの紐づきは見ておらず、単にパーミットの数だけで制御をしている

倒数计数器

CountDownLatch 是一种适用于希望在特定条件满足之前阻止线程进行的情况下使用的工具。

Latch(ラッチ)是一个日语词,意为“外撤止”或“门闩”。

package synchronizer;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
    public static void main(String[] args) throws Exception {
        final CountDownLatch latch = new CountDownLatch(3);

        for (int i=0; i<3; i++) {
            String name = "Thread[" + i + "]";
            runThread(() -> {
                System.out.println(name + " start");
                Thread.sleep(500);
                System.out.println(name + " end");
                latch.countDown();
            });
        }

        char[] labels = {'a', 'b', 'c'};
        for (int i=0; i<3; i++) {
            String name = "Thread[" + labels[i] + "]";
            runThread(() -> {
                System.out.println(name + " await...");
                latch.await();
                System.out.println(name + " end");
            });
        }
    }
}
    • コンストラクタで3を指定して CountDownLatch を生成している

 

    • スレッド3つ(1, 2, 3)を起動し、500ミリ秒スリープしてから CountDownLatch の countDown メソッドを呼ぶ

 

    別のスレッド3つ(a, b, c)を起動し、こちらは await で処理を待機させている
Thread[2] start
Thread[1] start
Thread[0] start
Thread[a] await...
Thread[b] await...
Thread[c] await...
Thread[2] end
Thread[1] end
Thread[0] end
Thread[a] end
Thread[b] end
Thread[c] end
    • 後に起動した3つのスレッド(a, b, c)は、先に起動した3つのスレッド(1, 2, 3)の処理が終了するまで待機していることが分かる

CountDownLatch は、まずコンストラクタでカウントの数を指定する

内部のカウンターがこの値で初期化される

countDown メソッドを呼ぶとカウントが1つ減る

countDown は待機させられることはなく、すぐ呼び出し元に処理が戻される
カウントの初期値以上の回数呼び出しても、例外になることはない

await メソッドを呼ぶと、カウントが0になるまでスレッドがブロックされる
これにより、いくつかのスレッドの処理が全て終わるまで処理を待機させる、みたいな制御が実現できる

CountDownLatch は一度カウントが減ると元に戻すことはできないので、再利用ができない

インスタンスを作り直す必要がある

循环屏障

CyclicBarrier 可用于需要同步多个线程的场合。

package synchronizer;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(3);

        for (int i=0; i<3; i++) {
            String name = "Thread[" + i + "]";
            int time = 100 * (i+1);

            runThread(() -> {
                System.out.println(name + " start");

                System.out.println(name + " sleep " + time + "ms");
                Thread.sleep(time);
                System.out.println(name + " awake");
                barrier.await();

                System.out.println(name + " end");
            });
        }
    }
}
    • コンストラクタで3を指定して CyclicBarrier を生成している

 

    3つのスレッドを起動し、それぞれ少しスリープしてから CyclicBarrier の await を呼んで待機している
Thread[2] start
Thread[0] start
Thread[1] start
Thread[2] sleep 300ms
Thread[1] sleep 200ms
Thread[0] sleep 100ms
Thread[0] awake
Thread[1] awake
Thread[2] awake
Thread[0] end
Thread[1] end
Thread[2] end

Thread[0] や Thread[1] は Thread[2] より短い時間だけスリープしているので、普通なら先にスレッドが終了するはずが、実際には Thread[2] の終了を待機しているのが分かる

CyclicBarrier を使うと、複数のスレッドの処理があるところまで揃うのを待機させるような制御ができるようになる
コンストラクタで足並みを揃えたいスレッド(パーティ)の数を指定する

await メソッドを呼ぶと、 await を呼んだスレッドの数がパーティ数に達するまで呼び出し元のスレッドがブロックされる

上記例ではパーティ数3で初期化しているので、3つのスレッドから await が呼ばれるまで処理がブロックされる

await の呼び出し回数がパーティ数に達すると、ブロックが解放されてそれまで待機していたスレッドの処理が一斉に再開される

进行再利用

package synchronizer;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        program(barrier);
        System.out.println("=====");
        program(barrier);
    }

    private static void program(CyclicBarrier barrier) throws Exception {
        runThread(() -> {
            System.out.println("Thread sleep");
            Thread.sleep(500);
            System.out.println("Thread await");
            barrier.await();
        });

        System.out.println("Main await");
        barrier.await();

        System.out.println("Main restart");
    }
}
    • パーティ数2で CyclicBarrier を生成している

 

    • 別スレッドを起動して少しスリープしてから await を呼び出している

 

    • メインスレッドではすぐに await を呼んで、別スレッドの終了を待機している

 

    • この処理を2回繰り返している

CyclicBarrier は、2回とも同じインスタンスを利用している

Main await
Thread sleep
Thread await
Main restart
=====
Main await
Thread sleep
Thread await
Main restart

await の呼び出し回数がパーティ数に達すると、 CyclicBarrier の状態は初期状態に戻る

このため、 CyclicBarrier はインスタンスの再利用ができる
名前に cyclic (循環式) がついている理由

当等待值达到指定的数量时,插入处理程序。

package synchronizer;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(4, () -> {
            String name = Thread.currentThread().getName();
            System.out.println("[" + name + "] Tripped!");
        });

        for (int i=0; i<4; i++) {
            int time = 100 * (i+1);
            runThread(() -> {
                String name = Thread.currentThread().getName();
                System.out.println("[" + name + "] sleep " + time + "ms");
                Thread.sleep(time);
                System.out.println("[" + name + "] await");
                barrier.await();
            });
        }
    }
}
    • パーティ数4で CyclicBarrier を生成している

 

    • コンストラクタの第二引数に Runnable を渡している

 

    スレッドを4つ生成し、それぞれ少しスリープしてから await で待機している
[Thread-3] sleep 400ms
[Thread-1] sleep 200ms
[Thread-0] sleep 100ms
[Thread-2] sleep 300ms
[Thread-0] await
[Thread-1] await
[Thread-2] await
[Thread-3] await
[Thread-3] Tripped!
    • コンストラクタの第二引数には Runnable を渡すことができる

 

    • この Runnable は、 await の呼び出し回数がパーティ数に達して処理が再開される前にコールバックされる

処理が再開されることをトリップ(trip) と呼ぶ
また、このトリップ時に呼ばれる処理のことを バリアー・アクション と呼ぶ

トリップの処理は、トリップのトリガーとなった await の呼び出しを行ったスレッドで行われる

故障状态

package synchronizer;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        Thread th = runThread(() -> {
            try {
                System.out.println("Thread await");
                barrier.await();
            } catch (InterruptedException e) {
                System.out.println("interrupted");
                System.out.println("broken = " + barrier.isBroken());
            }
        });

        Thread.sleep(500);
        System.out.println("interrupt thread");
        th.interrupt();

        Thread.sleep(500);
        try {
            barrier.await();
        } catch (BrokenBarrierException e) {
            System.out.println("BrokenBarrierException: " + e.getMessage());
        }
    }
}
    • パーティ数2で CyclicBarrier を生成している

 

    • 別スレッドを起動して await で待機する

InterruptedException をキャッチして、 isBroken で CyclicBarrier の状態を確認している

メインスレッドで少し待ってから、別起動したスレッドに割り込みを行う

Thread await
interrupt thread
interrupted
broken = true
BrokenBarrierException: null
    • 特定の条件が満たされると、 CyclicBarrier は故障状態になる

 

    • 故障状態になる条件としては、以下がある

await 中にスレッドが割り込まれた場合(上記実装例のケース)

await(long, TimeUnit) での待機でタイムアウトになった場合
バリアー・アクションで例外がスローされた場合

CyclicBarrier が故障状態かどうかは、 isBroken メソッドで確認できる
故障状態の CyclicBarrier に対して await を呼ぶと、 BrokenBarrierException がスローされる

重置故障状态

package synchronizer;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        try {
            barrier.await(1, TimeUnit.MICROSECONDS);
        } catch (TimeoutException e) {
            System.out.println("timeout");
        }

        System.out.println("broken = " + barrier.isBroken());
        barrier.reset();
        System.out.println("broken = " + barrier.isBroken());
    }
}
    • タイムアウトありで await を呼び出す

タイムアウト時間を1ミリ秒にして即座にタイムアウトさせることでバリアを故障状態にする

reset メソッドを呼び出して前後で故障状態を確認している

timeout
broken = true
broken = false

reset メソッドを呼ぶと、故障状態がクリアされて CyclicBarrier が初期状態に戻る
初期状態に戻った CyclicBarrier は再利用できる

相位调制器

使用 Phaser (フェーザー) 能够更灵活地进行与 CountDownLatch 或 CyclicBarrier 相似的控制。
特别是在线程数量需要动态变化的情况下,它特别有用。

package synchronizer;

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) throws Exception {
        final Phaser phaser = new Phaser();

        phaser.register(); // main スレッド分

        runThreads(phaser, 3);
        System.out.println("main await");
        phaser.arriveAndAwaitAdvance();
        System.out.println("main restart");

        Thread.sleep(1000);
        System.out.println("=====");

        runThreads(phaser, 2);
        System.out.println("main await");
        phaser.arriveAndAwaitAdvance();
        System.out.println("main restart");
    }

    private static void runThreads(Phaser phaser, int numberOfThreads) {
        for (int i=0; i<numberOfThreads; i++) {
            int n = i+1;
            String name = "Thread[" + n + "]";
            int time = 100 * n;

            phaser.register();

            runThread(() -> {
                System.out.println(name + " sleep " + time + "ms");
                Thread.sleep(time);
                System.out.println(name + " awake");
                phaser.arriveAndDeregister();
                System.out.println(name + " end");
            });
        }
    }
}

runThreads メソッドでは、引数で受け取った数だけスレッドを生成して起動している

スレッド生成前に register メソッドを呼んでいる
スレッドの中では少しスリープしてから arriveAndDeregister メソッドを呼んでいる

メインスレッドでは最初に register メソッドを1回だけ呼んでいる
その後、 runThreads メソッドを呼び出してから arriveAndAwaitAdvance メソッドを呼び出して待機している

このセットを、スレッド数を 3 -> 2 と変更して実行している

main await
Thread[2] sleep 200ms
Thread[3] sleep 300ms
Thread[1] sleep 100ms
Thread[1] awake
Thread[1] end
Thread[2] awake
Thread[2] end
Thread[3] awake
Thread[3] end
main restart
=====
main await
Thread[1] sleep 100ms
Thread[2] sleep 200ms
Thread[1] awake
Thread[1] end
Thread[2] awake
Thread[2] end
main restart
    • メインスレッドは複数起動した別スレッドの処理が全て終了するまで待機していることがわかる

別スレッドの方は待機せずに終了している

1回目は3つのスレッドの終了を待っていたのに対して、2回目は2つのスレッドの終了を待っており、待機するスレッドの数が動的に変わっていることが分かる

Phaser的基础知识

派对的注册

Phaser は、 CyclicBarrier と同じで、まずは足並みを揃えるスレッドの数(パーティ数)を設定する必要がある

CyclicBarrier の場合はコンストラクタ引数で設定していたが、 Phaser の場合は register メソッドを使うことで動的にパーティ数を指定できる

初期のパーティ数はコンストラクタでも指定できる

        final Phaser phaser = new Phaser();

        phaser.register(); // main スレッド分

        ...

        for (int i=0; i<numberOfThreads; i++) {
            ...
            phaser.register();

register メソッドを呼ぶと、パーティ数が1つ加算される

実装例の冒頭で register を呼んでいるのは、メインスレッド分を登録している
そして、スレッドを起動するループの中では、ループの都度 register を呼ぶことで生成されるスレッド数と同じ分だけパーティを登録している
これにより、「メインスレッド+起動するスレッド数」だけのパーティが登録されたことになる

bulkRegister メソッドを使えば、一度に複数のパーティを登録することもできる

派对的到达

    • 登録したパーティの処理が期待した場所まで到達した場合、それを Phaser に伝える必要がある

CountDownLatch の countDown, await、 CyclicBarrier の await に該当する
これを 到着(arrive) と呼ぶ

到着を伝えるメソッドはいくつかあるが、実装例では以下2つのメソッドを利用している

arriveAndDeregister

到着を Phaser に伝え、登録しているパーティ数を1つ減らす
呼び出し元のスレッドはブロックされない

arriveAndAwaitAdvance

到着を Phaser に伝え、他のパーティが全て到着するまで処理をブロックする

        ...
        phaser.arriveAndAwaitAdvance();
        System.out.println("main restart");

        ...

            runThread(() -> {
                ...
                phaser.arriveAndDeregister();
                System.out.println(name + " end");
            });
    • 実装例では、動的に起動したスレッドは arriveAndDeregister を使うようにしていたので、スレッドの終了とともにパーティの数が1つ減るようになっている

これにより、1回目に3つのスレッドの終了を待機し終えた段階でパーティ数は12に戻るようになっている
このおかげで、2回目は改めて register を使って新しいパーティ数を設定できるようになっている

阶段

package synchronizer;

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) throws Exception {
        final Phaser phaser = new Phaser();
        phaser.register(); // main スレッド分

        System.out.println("phase = " + phaser.getPhase());

        phaser.arriveAndAwaitAdvance();
        System.out.println("phase = " + phaser.getPhase());

        runThreads(phaser, 1);
        phaser.arriveAndAwaitAdvance();
        System.out.println("phase = " + phaser.getPhase());
    }

    private static void runThreads(Phaser phaser, int numberOfThreads) {
        ...
    }
}
phase = 0
phase = 1
Thread[1] sleep 100ms
Thread[1] awake
Thread[1] end
phase = 2

Phaser にはフェーズ(phase)と呼ばれる状態が存在する
フェーズは0から始まり、現在のフェーズで登録されているパーティが全て到着すると1つ進む(advance)
フェーズが進むと到着しているパーティ数は0にリセットされるので、 CyclicBarrier と同じように再利用ができる
フェーズ番号は Integer.MAX_VALUE に達すると0に戻る

控制到达的方法

Phaser提供了多个控制到达的方法。

整理以下是粗略区分功能差异。

メソッド到着通知ブロックパーティ解除IntrruptedExceptionフェーズの指定arriveYesNoNoNoNoarriveAndAwaitAdvanceYesYesNoNoNoarriveAndDeregisterYesNoYesNoNoawaitAdvanceNoYesNoNoYesawaitAdvanceInterruptiblyNoYesNoYesYes

关于每种用法的详细信息,请见以下说明。

到达

arrive メソッドは、到着を通知してすぐに呼び出し元に処理を戻す
パーティの解除も行わないので、到着を通知したいだけの場合に利用する

CountDownLatch の countDown メソッドみたいな役割になる
戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される

到达并等待前进

arriveAndAwaitAdvance メソッドは、到着を通知して他のパーティが全て到着するまで処理をブロックする
ブロック中にスレッドが割り込まれても、ブロックは継続される
パーティの解除は行わない
戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される

抵达并注销

arriveAndDeregister メソッドは、到着を通知してパーティを解除したのち、すぐに呼び出し元に処理を戻す
戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される

等待推进

awaitAdvance メソッドは、到着は通知せず他のパーティが全て到着するまで処理をブロックする
ブロック中にスレッドが割り込まれても、ブロックは継続される
引数に現在のフェーズ番号を渡す必要がある

フェーズ番号が現在のフェーズ番号と一致しない場合は、すぐに処理を戻す

phaser.awaitAdvance(phaser.arriveAndDeregister()) のように、到着を通知するメソッドと組み合わせて利用することが想定されている

この場合、パーティを解除したうえでフェーズの終了を待機できるようになる

phaser.awaitAdvance(phaser.arrive()) は arriveAndAwaitAdvance() と等価

戻り値として、次のフェーズ番号が返される

可以阻塞并等待可中断的任务到达。

awaitAdvanceInterruptibly メソッドは、到着は通知せず他のパーティが到着するまで処理をブロックする
ブロック中にスレッドが割り込まれると、 InterruptedException がスローされる
引数や戻り値は awaitAdvance と同じ

タイムアウト時間を指定できるメソッドも存在する

终止状态

package synchronizer;

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) throws Exception {
        final Phaser phaser = new Phaser();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.register(); // main スレッド分

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.arriveAndAwaitAdvance();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.arriveAndDeregister();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.register();

        System.out.println("terminated = " + phaser.isTerminated());
    }
}
terminated = false
terminated = false
terminated = false
terminated = true
terminated = true
    • フェーズが次に進んだときにパーティが0になっている場合、 Phaser は終了状態(termination state) になる

 

    • 終了状態になると Phaser は使えなくなる

すべての同期メソッドは、待機せずにすぐに戻るようになる

register メソッドを使ってもパーティは登録できなくなる

修改达到终止状态的条件

package synchronizer;

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) throws Exception {
        final Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("onAdvance phase=" + phase + ", registeredParties=" + registeredParties);
                return false;
            }
        };

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.register(); // main スレッド分

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.arriveAndAwaitAdvance();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.arriveAndDeregister();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.register();

        System.out.println("terminated = " + phaser.isTerminated());
    }
}

Phaser のサブクラスで onAdvance をオーバーライドし、常に false を返すように実装している

terminated = false
terminated = false
onAdvance phase=0, registeredParties=1
terminated = false
onAdvance phase=1, registeredParties=0
terminated = false
terminated = false

onAdvance メソッドをオーバーライドして戻り値を調整することで、終了状態になる条件を任意のものに変更できる

onAdvance メソッドは、フェーズが次に進むときにコールバックされる

フェーズが変わるときに任意の処理を実行したい場合も利用できる

このメソッドが true を返すと、 Phaser は終了状態になる

デフォルトの実装では、引数で受け取る registeredParties (登録されたパーティの数)が0であれば true を返すようになっている

false を返すように実装すれば終了状態にはならない

交换者

当需要在两个线程之间同步处理并进行值的交换时,可以使用“Exchanger”。

package synchronizer;

import java.util.concurrent.Exchanger;

public class ExchangerTest {

    public static void main(String[] args) throws Exception {
        final Exchanger<String> exchanger = new Exchanger<>();

        runThread(() -> {
            System.out.println("Thread sleep");
            Thread.sleep(500);
            String receive = exchanger.exchange("from Thread");
            System.out.println("Thread receive=" + receive);
        });

        System.out.println("Main exchange");
        final String receive = exchanger.exchange("from Main");
        System.out.println("Main receive=" + receive);
    }
}
    • メインスレッドと別スレッドの双方で exchange メソッドを呼び、戻り値を出力している

 

    別スレッドの方は、冒頭で少しだけスリープしている
Main exchange
Thread sleep
Thread receive=from Main
Main receive=from Thread
    • 別スレッドが exchange メソッドを呼ぶまでメインスレッドが待機しているのが分かる

 

    • 片方が exchange の引数に渡した値が、他方の exchange の戻り値として返されていることが分かる

Exchanger を使うと、 exchange で処理を同期しつつ、任意の値を双方で渡し合うような制御ができる

各种同步器的使用区别

因为这仅仅是根据个人的想法描绘出来的,所以存在可能出错的情况。
【スレッド数が決まっている?】---{No}--->【スレッド数が決まるまで待てる?】
   |                                             |
 {Yes}                                           |
   |                                             |
   + <-------------------{Yes}-------------------+---{No}---> [Phaser]
   |
   V
【スレッド数が2つで、値のやり取りが必要?】---{Yes}---> [Exchanger]
   |
  {No}
   |
   V
【限られたリソースの取り合いを制御したい?】---{Yes}---> [Semaphore]
   |
  {No}
   |
   V
【関連する全てのスレッドが所定の位置に到達するまで全てのスレッドを待機させたい?】
   |                            |
  {No}                          +---{Yes}---> [CyclicBarrier]
   |
   V
 [CountDownLatch]

以下是个人观点所依据的:

    • スレッド数が決まっていれば Phaser でできることは CyclicBarrier や CountDownLatch でもできると思うので、 Phaser を使うかどうかはスレッド数が決まるまで処理の開始を待てないような場面なのかなと思った

Exchanger と Semaphore は制御が特徴的なので、用途は限定的になるのかなと思った

CyclicBarrier と CountDownLatch の大きな違いは以下かなと思う

CyclicBarrier は、パーティに属する全てのスレッドが await を呼ぶまで、パーティに属する全てのスレッドが待機する

CountDownLatch は、 countDown が所定の回数呼ばれるまで await しているスレッドだけが待機する (countDown を呼んだスレッドは待たなくていい)

CyclicBarrier は、パーティに属する全てのスレッドが所定の場所に揃うのを待つことが目的

CountDownLatch は、待機終了のイベントが発火されるまで3スレッドを待たせることが目的

请参考

    • Java並行処理プログラミング ―その「基盤」と「最新API」を究める― | Brian Goetz, Joshua Bloch, Doug Lea |本 | 通販 | Amazon

 

    • CountDownLatch (Java SE 17 & JDK 17)

 

    • CyclicBarrier (Java SE 17 & JDK 17)

 

    • Semaphore (Java SE 17 & JDK 17)

 

    • Phaser (Java SE 17 & JDK 17)

 

    Exchanger (Java SE 17 & JDK 17)
严格来说,这段文字是关于解释 acquire 和 release 不需要在同一个线程中调用,但是我理解的是维护许可数量的控制也是同一个主题。↩主线程的部分还剩下。↩

直到调用 countDown 指定的次数为止。↩

广告
将在 10 秒后关闭
bannerAds