JavaのBlockingQueueの例を示します。
今日はJavaのBlockingQueueについて調べてみましょう。java.util.concurrent.BlockingQueueは、要素の取得と削除時にキューが非空になるのを待ち、要素の追加時にキューに空きスペースが利用可能になるのを待つ操作をサポートするJavaのキューです。
JavaのBlockingQueue
JavaのBlockingQueueは、nullの値を受け入れず、キューにnullの値を格納しようとするとNullPointerExceptionをスローします。JavaのBlockingQueueの実装はスレッドセーフです。 すべてのキュー操作は、内部のロックまたは他の形式の並行制御を使用して原子的です。 JavaのBlockingQueueインターフェースは、Javaコレクションフレームワークの一部であり、主にプロデューサー消費者問題の実装に使用されます。 BlockingQueueでは、プロデューサーが利用可能なスペースを待つ必要や、オブジェクトが消費者に利用可能になるのを待つ必要はありません。これは、BlockingQueueの実装クラスによって処理されます。 Javaは、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueueなど、いくつかのBlockingQueueの実装を提供しています。 BlockingQueueでプロデューサー消費者問題を実装する際には、ArrayBlockingQueueの実装を使用します。 以下に、いくつか重要なメソッドを示します。
- put(E e): This method is used to insert elements to the queue. If the queue is full, it waits for the space to be available.
- E take(): This method retrieves and remove the element from the head of the queue. If queue is empty it waits for the element to be available.
今は、JavaのBlockingQueueを使用してプロデューサーコンシューマー問題を実装しましょう。
JavaのBlockingQueueの例 – メッセージ
プロデューサーによって生成され、キューに追加される普通のJavaオブジェクトです。ペイロードまたはキューメッセージとしても呼ぶことができます。
package com.scdev.concurrency;
public class Message {
private String msg;
public Message(String str){
this.msg=str;
}
public String getMsg() {
return msg;
}
}
JavaのBlockingQueueの使用例 – 生産者
メッセージを作成し、それをキューに格納するプロデューサークラス。
package com.scdev.concurrency;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue<Message> queue;
public Producer(BlockingQueue<Message> q){
this.queue=q;
}
@Override
public void run() {
//produce messages
for(int i=0; i<100; i++){
Message msg = new Message(""+i);
try {
Thread.sleep(i);
queue.put(msg);
System.out.println("Produced "+msg.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//adding exit message
Message msg = new Message("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Java BlockingQueueの例 – コンシューマー
Exitメッセージが受信されると処理を終了するキューからメッセージを処理するConsumerクラス。
package com.scdev.concurrency;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<Message> queue;
public Consumer(BlockingQueue<Message> q){
this.queue=q;
}
@Override
public void run() {
try{
Message msg;
//consuming messages until exit message is received
while((msg = queue.take()).getMsg() !="exit"){
Thread.sleep(10);
System.out.println("Consumed "+msg.getMsg());
}
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
JavaのBlockingQueueの例 – サービス
最終的に、プロデューサとコンシューマのためにBlockingQueueサービスを作成する必要があります。このプロデューサとコンシューマのサービスは、固定サイズのBlockingQueueを作成し、プロデューサとコンシューマの両方で共有します。このサービスはプロデューサとコンシューマのスレッドを開始して終了します。
package com.scdev.concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerService {
public static void main(String[] args) {
//Creating BlockingQueue of size 10
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
上記のJava BlockingQueueの例プログラムの出力は、以下に示されています。
Producer and Consumer has been started
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 0
Produced 5
Consumed 1
Produced 6
Produced 7
Consumed 2
Produced 8
...
JavaのThread.sleepは、生産者と消費者がメッセージを遅延させて生成と消費を行うために使用されます。