尝试使用适用于Oracle Database 21c的高性能队列技术”Transactional Event Queues (TEQ)”
交易事件队列(TEQ)是什么?
TEQ 的主要特点是以表格形式管理队列,并且继承了 Oracle Database 传统上具备的数据一致性、性能和可用性特点,可以应用于排队功能。此外,由于支持 Kafka 的 Java 客户端,可以将 Kafka 的机制替换为 TEQ。
除了其他要点外,我从手册中摘取了一些内容。包括事务事件队列和高级排队的概述。
-
- Oracle Transactional Event Queues (TEQ)は、データベース統合型のメッセージ・キューイング機能
-
- キューごとに複数のイベント・ストリームを保持する高性能のパーティション化された実装
-
- TEQはデータベース表に実装されるため、運用上の利点である高可用性、スケーラビリティ、および信頼性のすべてがキュー・データにも当てはまる
-
- 標準のデータベース機能(リカバリ、再起動、セキュリティなど)がサポート
-
- のデータベース表と同様に、キュー表もインポートおよびエクスポート可能
-
- 高可用性に対応するためにOracle Data Guardをサポート
-
- エンキューおよびデキューは、データベース内にあることにより、分散トランザクションを必要とせずにデータベース・トランザクションに組み入れ可能
-
- 標準のSQLを使用してメッセージを問い合せ可能
- 利用可能なすべてのSQLテクノロジ(インメモリー・ラッチや表索引など)が使用可能
我尝试通过PL/SQL通过TEQ进行入队和出队操作
首先,创建一个用于TEQ的用户,并授予权限。
-- ユーザ作成
CREATE USER tequsr IDENTIFIED BY tequsr ;
-- 権限付与
GRANT CREATE SESSION TO tequsr;
GRANT RESOURCE TO tequsr;
GRANT UNLIMITED TABLESPACE TO tequsr;
GRANT AQ_USER_ROLE TO tequsr;
GRANT EXECUTE ON DBMS_AQ TO tequsr;
GRANT EXECUTE ON DBMS_AQADM TO tequsr;
GRANT EXECUTE ON DBMS_AQIN TO tequsr;
使用创建的用户登录数据库,并创建TEQ。
本次将创建用于存储数据库对象的队列。
有关每个过程参数,请参阅下面的手册。
CREATE_TRANSACTIONAL_EVENT_QUEUE过程。
CREATE type Message_type as object (subject VARCHAR2(30), text VARCHAR2(80));
/
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'objType_TEQ',
storage_clause =>null,
multiple_consumers =>true,
max_retries =>10,
comment =>'ObjectType for TEQ',
queue_payload_type =>'Message_type',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=> 'objType_TEQ', enqueue =>TRUE, dequeue=> True);
END;
/
在中国只需要一种选择的情况下,用中文转述如下:
选择以下类型,其中数据类型由“queue_payload_type”指定。
-
- Message_type
-
- RAW
-
- JSON
- JMS
接下来创建订阅者。
添加订阅者过程。
AQ$_AGENT类型。
BEGIN
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'objType_TEQ',
subscriber => sys.aq$_agent('teqBasicObjSubscriber', null, 0),
rule => 'correlation = ''teqBasicObjSubscriber''');
END;
/
因为已经做好了预先准备,所以现在将消息入队。
这次将尝试将时钟信息作为消息入队。
入队程序。
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message Message_type;
dateinfo VARCHAR(20);
BEGIN
SELECT TO_CHAR(sysdate,'YYYY-MM-DD HH24:MI:SS') INTO dateinfo FROM dual;
message := Message_type('TIME', dateinfo);
message_properties.correlation := 'teqBasicObjSubscriber';
DBMS_AQ.ENQUEUE(
queue_name => 'objType_TEQ',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
COMMIT;
END;
/
将消息出队。
出队程序。
SET SERVEROUTPUT ON
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message Message_type;
BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.consumer_name := 'teqBasicObjSubscriber';
DBMS_AQ.DEQUEUE(
queue_name => 'objType_TEQ',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
DBMS_OUTPUT.PUT_LINE('Message: ' || message.subject || ' ... ' || message.text );
COMMIT;
END;
/
执行上述操作后,将会在标准输出中输出以下执行结果。
Message: TIME ... 2022-04-21 09:00:24
将最新创建的TEQ相关对象删除并恢复到原来的状态。
-- subscriberの削除
BEGIN
DBMS_AQADM.REMOVE_SUBSCRIBER(
queue_name => 'objType_TEQ',
subscriber => sys.aq$_agent('teqBasicObjSubscriber', null ,0));
END;
/
-- キューの停止
Execute DBMS_AQADM.STOP_QUEUE (queue_name => 'objType_TEQ');
-- キューの削除
Execute DBMS_AQADM.DROP_TRANSACTIONAL_EVENT_QUEUE(queue_name =>'objType_TEQ', force=> TRUE);
-- 検証用に作成したオブジェクト・タイプ「Message_type」の削除
DROP TYPE Message_type;
总结 jié)
我已经介绍了TEQ的概述和简单的使用步骤。
如果您已经在使用Oracle Database,您无需引入其他产品即可引入队列。
此外,TEQ可以替代分布式事务机制,使体系结构更加简单。
TEQ也可以在免费的Oracle Database 21c XE中使用,所以请随意试用!