尝试使用适用于Oracle Database 21c的高性能队列技术”Transactional Event Queues (TEQ)”

交易事件队列(TEQ)是什么?

TEQ 的主要特点是以表格形式管理队列,并且继承了 Oracle Database 传统上具备的数据一致性、性能和可用性特点,可以应用于排队功能。此外,由于支持 Kafka 的 Java 客户端,可以将 Kafka 的机制替换为 TEQ。

除了其他要点外,我从手册中摘取了一些内容。包括事务事件队列和高级排队的概述。

    • Oracle Transactional Event Queues (TEQ)は、データベース統合型のメッセージ・キューイング機能

 

    • キューごとに複数のイベント・ストリームを保持する高性能のパーティション化された実装

 

    • TEQはデータベース表に実装されるため、運用上の利点である高可用性、スケーラビリティ、および信頼性のすべてがキュー・データにも当てはまる

 

    • 標準のデータベース機能(リカバリ、再起動、セキュリティなど)がサポート

 

    • のデータベース表と同様に、キュー表もインポートおよびエクスポート可能

 

    • 高可用性に対応するためにOracle Data Guardをサポート

 

    • エンキューおよびデキューは、データベース内にあることにより、分散トランザクションを必要とせずにデータベース・トランザクションに組み入れ可能

 

    • 標準のSQLを使用してメッセージを問い合せ可能

 

    利用可能なすべてのSQLテクノロジ(インメモリー・ラッチや表索引など)が使用可能

我尝试通过PL/SQL通过TEQ进行入队和出队操作

首先,创建一个用于TEQ的用户,并授予权限。

-- ユーザ作成
CREATE USER tequsr IDENTIFIED BY tequsr ;

-- 権限付与
GRANT CREATE SESSION TO tequsr;
GRANT RESOURCE TO tequsr;
GRANT UNLIMITED TABLESPACE TO tequsr;
GRANT AQ_USER_ROLE TO tequsr;
GRANT EXECUTE ON DBMS_AQ TO tequsr;
GRANT EXECUTE ON DBMS_AQADM TO tequsr;
GRANT EXECUTE ON DBMS_AQIN TO tequsr;

使用创建的用户登录数据库,并创建TEQ。
本次将创建用于存储数据库对象的队列。
有关每个过程参数,请参阅下面的手册。
CREATE_TRANSACTIONAL_EVENT_QUEUE过程。

CREATE type Message_type as object (subject VARCHAR2(30), text VARCHAR2(80));
/

BEGIN
 DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
     queue_name         =>'objType_TEQ',
     storage_clause     =>null,
     multiple_consumers =>true,
     max_retries        =>10,
     comment            =>'ObjectType for TEQ',
     queue_payload_type =>'Message_type',
     queue_properties   =>null,
     replication_mode   =>null);
 DBMS_AQADM.START_QUEUE (queue_name=> 'objType_TEQ', enqueue =>TRUE, dequeue=> True);
END;
/

在中国只需要一种选择的情况下,用中文转述如下:
选择以下类型,其中数据类型由“queue_payload_type”指定。

    • Message_type

 

    • RAW

 

    • JSON

 

    JMS

接下来创建订阅者。
添加订阅者过程。
AQ$_AGENT类型。

BEGIN
DBMS_AQADM.ADD_SUBSCRIBER(
     queue_name => 'objType_TEQ',
     subscriber => sys.aq$_agent('teqBasicObjSubscriber', null, 0),
     rule => 'correlation = ''teqBasicObjSubscriber''');
END;
/

因为已经做好了预先准备,所以现在将消息入队。
这次将尝试将时钟信息作为消息入队。
入队程序。

DECLARE
 enqueue_options     dbms_aq.enqueue_options_t;
 message_properties  dbms_aq.message_properties_t;
 message_handle      RAW(16);
 message             Message_type;
 dateinfo            VARCHAR(20);

BEGIN
 SELECT TO_CHAR(sysdate,'YYYY-MM-DD HH24:MI:SS') INTO dateinfo FROM dual;
 message := Message_type('TIME', dateinfo);
 message_properties.correlation := 'teqBasicObjSubscriber';

 DBMS_AQ.ENQUEUE(
     queue_name           => 'objType_TEQ',
     enqueue_options      => enqueue_options,
     message_properties   => message_properties,
     payload              => message,
     msgid                => message_handle);
    COMMIT;
END;
/

将消息出队。

出队程序。

SET SERVEROUTPUT ON

DECLARE
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             Message_type;

BEGIN
    dequeue_options.dequeue_mode  := DBMS_AQ.REMOVE;
    dequeue_options.wait          := DBMS_AQ.NO_WAIT;
    dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
    dequeue_options.consumer_name := 'teqBasicObjSubscriber';

    DBMS_AQ.DEQUEUE(
        queue_name         => 'objType_TEQ',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle);

    DBMS_OUTPUT.PUT_LINE('Message: ' || message.subject || ' ... ' || message.text );
    COMMIT;
END;
/

执行上述操作后,将会在标准输出中输出以下执行结果。

Message: TIME ... 2022-04-21 09:00:24

将最新创建的TEQ相关对象删除并恢复到原来的状态。

-- subscriberの削除
BEGIN
DBMS_AQADM.REMOVE_SUBSCRIBER(
     queue_name => 'objType_TEQ',
     subscriber => sys.aq$_agent('teqBasicObjSubscriber', null ,0));
END;
/

-- キューの停止
Execute DBMS_AQADM.STOP_QUEUE (queue_name => 'objType_TEQ');

-- キューの削除
Execute DBMS_AQADM.DROP_TRANSACTIONAL_EVENT_QUEUE(queue_name =>'objType_TEQ', force=> TRUE);

-- 検証用に作成したオブジェクト・タイプ「Message_type」の削除
DROP TYPE Message_type;

总结 jié)

我已经介绍了TEQ的概述和简单的使用步骤。
如果您已经在使用Oracle Database,您无需引入其他产品即可引入队列。
此外,TEQ可以替代分布式事务机制,使体系结构更加简单。
TEQ也可以在免费的Oracle Database 21c XE中使用,所以请随意试用!

广告
将在 10 秒后关闭
bannerAds