我尝试进行了Equalum更新验证:3
这一次是有点番外篇的内容…
最近,我們在進行Equalum的推廣活動時,收到了很多關於Equalum的優勢的問題。Equalum本身也強調了「實時的虛幻」,因此往往會把話題集中在這一點上,但對於那些在相關領域有經驗的人來說,通常會給予「不太可能吧!」這樣冷靜的反應。
实际上,如果所有事情都能在CPU的内部总线或主板上完成,那么可能实现非常接近这种理想状态的世界。但从实际角度来看,您所面对的IT市场环境下,在这种化学实验室的理想状况下实现,可以说是“从某种意义上讲是不可能的”。
在下面我想用现实情况来解释Equalum的优势和优点是什么。
实时性和低延迟
基本上,计算机的处理性能建立在使用的CPU性能、内存容量和性能、内部总线性能、存储性能以及网络性能之上。当然,整合并创造高度附加值的软件的存在也非常重要,但总的来说,软件无法超越硬件性能…这也是一种命运(软件从硬件角度来看,可以说是“瓶颈的根源”)。
Equalum的方法是通过将CDC和其周边的情况偏移(如时间戳驱动)作为消息处理,以选择轻量且微小的行动来获得最大效果的策略,与许多先前和现有的相似产品有所不同。
所以选择作为“要素技术”的就是kafka和SPARK。
Equalum开发团队准确评估和判断了在这些云计算和分布式处理系统中具备实绩和声誉的解决方案的优点和缺点。前者通过Equalum自身开发的相关模块的协同效应实现了高附加值,而后者(尤其是kafka和SPARK领域一直存在的至少一次的问题)得到了解决。对于在技术上可行的事物,Equalum提供了下文提到的Exactly Once选项。
由于Equalum担任集成SQL事件消息代理的角色,我们能够实现一种机制,即时且高效地将源端生成的SQL消息传递到目标端,实现了卓越的性能。这种数据专用的消息处理与一般的查询、判断、选择结果并将其自动处理在下游方面的方法有所不同,可以说这就是Kafka作为消息处理工具的特点。
基于先进的CDC技术,实现了Exactly Once的实现
正如您所知,CDC概念的历史悠久,已经在市场上以多种方法得以实施并得到广泛应用。Equalum的优点在于如何高效、准确地处理CDC(在许多情况下,需要以人类无法直接阅读的二进制格式积累日志),这成为了处理能力的“首要优势和劣势”,而其在这一领域的应用经验基于业界顶级技术和知识,支撑着企业和组织活动的数据管道。(最近据说已被用作支持大型医疗制药公司的最先进药品开发的数据基础设施)
此外,我认为在某种意义上,这是它最大的优势,正是因为在单独的kafka环境周围,一直存在“企业级”的问题,在“端到端”的“Exactly Once”的实现方面,Equalum已经实现了通信状态的变化,以及处理状态出现问题的情况下,它可以充分利用“整体的处理协同功能”,确保用户数据的一致性。(可以通过CDC兼容的结构化数据库和通过kafka连接器进行连接来使用)各种云原生的数据解决方案正在市场上得到应用,但在企业现场,我认为电子账簿类型的数据操作仍然主要是通过结构化数据库实现的。对于这些机制,我们可以通过“利用他们标准化具备的功能”以及以“尽可能中立的外部机制”来支持高级数据利用和数据战略,这就是Equalum的潜力和附加值。
可以构建基于最新云技术的无需编程的数据通信环境。
好了,我已经详细阐述了前言,那么现在让我们考虑一下,如果要实际投入使用这些产品的性能、功能以及在实地使用的综合环境,将需要多少开发成本(包括人力和时间)呢?
无论如何,可能有人可以从零开始开发一个定制的系统,但如果要具体实施,需要有具备相应技术水平的人力配置来估算成本,另外还要考虑到在过程中发生的规格变更或新增需求等造成的额外成本,因此在许多情况下,“嗯,我们可以在运营中想办法应付一下……”可能是一个更常见的选择。
在Equalum的情况下,作为在引入和部署环境后需要注意的事项,可以通过检查环境的计算能力是否足够来应对可能超出预期的情况,并以简单地添加集群配置的方式进行处理。此外,在现场看来,它还可以进行数据流的变更和添加,无需进行Java编程,而是可以按需自助操作。最近,一些关键词如数据民主化和Data Ops开始出现,但通过利用Equalum,也许可以“意外地轻松”实现这些。我认为不仅仅是将数据存储在电子账簿上,还要考虑如何运营数据,这也是一个有趣的想法。
其实,在之前的帖子中,我曾计划写一篇关于旧MemSQL时代的kafka管道的介绍,为此我匆忙开始学习kafka的编程、结构和配置,并希望顺利地撰写文章发布。但由于现实生活和时间的牵绊,我无奈地放弃了这个计划,成为了过去的悲伤经历。
然而,数年后…
使用Equalum可以让任何人在瞬间成为高级的Kafka使用者,并且能够在不编写代码的情况下,以高速高效的方式从数据库进行CDC处理环境的搭建。在上次的失败后,得知了Equalum的存在,并经历了一番曲折后,最终通过完全依赖他人的力量实现了对上次失败的报复,一旦熟悉了这种轻松的环境后,再也回不到那个曾经对星云的热情的时期了…(苦笑)
我将介绍几个实验结果。
首先,通过能够在边缘端完整处理数据的环境(即使网络连接中断,前端仍能负责处理数据的机制),利用Equalum的CDC流式传输技术来同步数据,并在此过程中自动将数据分类到最终形式的数据库中,实现数据的落地。
实际上,我们可以在验证环境中的每个设置的店铺上,针对相应的MySQL数据库使用Python创建销售数据(类似的)的连续生成和插入。这样一来,Equalum的CDC流会立即启动,从每个店铺的数据中提取和预处理数据以进行即时同步,然后将其用于用途特定的综合数据库,并可将这种情况可视化。
尽管视频只剪切了起始部分,但通过Equalum也能够实现足够的实时分类和可视化。我认为您可以理解这一点(当然,SingleStore的I/O性能也起到了帮助作用…)。
接下来,作为本次Equalum更新验证系列的最后一项实验,我想进行CDC数据流的多层次构建实验。
过去的设想是Equalum在数据生成方的数据库上进行一些类似防火墙的工作,以一种方式来处理目标方产生的大量SQL事务对原始数据的影响,以减轻其负担。我认为这种机制几乎可以应对大部分情况,但在这个版本升级中,我们将验证是否可以正常使用的复制功能,这也是一种类似BCP的结构。
这是将最早插入数据的上段和中段的MySQL情况可视化的结果,下段的两个部分使用Equalum的CDC流式传输在第二层的MySQL中进行整合,利用了本次版本中可用的MySQL复制功能,将情况复制至最终目标的SingleStore,并进行了可视化(通过销售方向进行聚合处理并可视化)。
看起来一切顺利,达到了预期的结果。
顺便提一下,这次为了可视化所创建的Python代码如下所示。
# coding: utf-8
#
# 多段デモの経過状況可視化
#
import pymysql.cursors
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
%matplotlib
# 日本語フォントの設定
from matplotlib import rcParams
rcParams['font.family'] = 'sans-serif'
rcParams['font.sans-serif'] = ['Hiragino Maru Gothic Pro', 'Yu Gothic', 'Meirio', 'Takao', 'IPAexGothic', 'IPAPGothic', 'VL PGothic', 'Noto Sans CJK JP']
# SQL文で使う情報設定
# 各店舗データを可視化(商品名で集計表示)
SQL00 = "SELECT SUM(Units) as UU, Product FROM S0_CDC_Table GROUP BY Product ORDER BY UU DESC "
SQL01 = "SELECT SUM(Units) as UU, Product FROM S1_CDC_Table GROUP BY Product ORDER BY UU DESC "
SQL02 = "SELECT SUM(Units) as UU, Product FROM S2_CDC_Table GROUP BY Product ORDER BY UU DESC "
SQL03 = "SELECT SUM(Units) as UU, Product FROM S3_CDC_Table GROUP BY Product ORDER BY UU DESC "
# 各統括データを可視化(カテゴリ別の売り上げで集計表示)
SQL10 = "SELECT SUM(Payment) as PP , Category FROM M0_CDC_Table GROUP BY Category ORDER BY PP DESC "
SQL11 = "SELECT SUM(Payment) as PP , Category FROM M1_CDC_Table GROUP BY Category ORDER BY PP DESC "
# 共通関数の定義(可視化グラフの表示)
def chart_draw(db,cursor,sql, ax, Color, Title1,Title2, scale, offset):
i = 0
Tmp_Data = []
Data_Label = []
Data_Data = []
cursor.execute(sql)
db.commit()
for Query_Data in cursor.fetchall():
for item in Query_Data.values():
Tmp_Data.append(item)
for start in range(0, len(Tmp_Data), offset):
Data_Data.append(Tmp_Data[i]/scale)
Data_Label.append(Tmp_Data[i + 1])
i = i + offset
y_pos = np.arange(len(Data_Label))
ax.barh(y_pos, Data_Data, color = Color)
ax.set_yticks(y_pos)
ax.set_yticklabels(Data_Label)
ax.invert_yaxis() # labels read top-to-bottom
ax.set_xlabel(Title1, fontsize = 12)
ax.set_title(Title2, fontsize = 16)
try:
# 使用変数の初期化
Counter = 0 # 処理回数のカウント用
Time_Wait = 0.5 # 描画間隔の調整用
Size1 = 2 # カテゴリ・配送センター別のデータ抽出用オフセット
# 可視化処理関連の準備
fig = plt.figure(figsize=(14,8))
gs = gridspec.GridSpec(3,2)
fig.subplots_adjust(hspace=0.6, wspace=0.4)
# add_subplot()でグラフを描画する領域を追加する.引数は行,列,場所
# 酒類の受注状況 MySQL1
ax1 = fig.add_subplot(gs[0,0])
# 家電の受注状況 MySQL1
ax2 = fig.add_subplot(gs[0,1])
# 書籍の受注状況 MySQL1
ax3 = fig.add_subplot(gs[1,0])
# 雑貨の受注状況 MySQL1
ax4 = fig.add_subplot(gs[1,1])
# 複製された集約DB1の状況 SingleStore
ax5 = fig.add_subplot(gs[2,0])
# 複製された集約DB2の状況 SingleStore
ax6 = fig.add_subplot(gs[2,1])
# 1段目のMySQLとの接続
db1 = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='xxxxxxxx',
password='zzzzzzzz',
db='xxxxxxxx',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
# 3段目のSingleStoreとの接続
db3 = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='xxxxxxxx',
password='zzzzzzzz',
db='xxxxxxxx',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
# 無限ループで対応(停止はSTOPボタン(■)で行う)
while True:
with db1.cursor() as cursor1, db3.cursor() as cursor3:
# クエリ関連の初期化設定
cursor1.arraysize = 1000
cursor3.arraysize = 1000
chart_draw(db1,cursor1,SQL00, ax1, 'Green', '受注数', '酒類取り扱い状況(1段目のMySQL)', 1, Size1)
chart_draw(db1,cursor1,SQL01, ax2, 'Red', '受注数', '家電取り扱い状況(1段目のMySQL)', 1, Size1)
chart_draw(db1,cursor1,SQL02, ax3, 'Blue', '受注数', '書籍取り扱い状況(1段目のMySQL)', 1, Size1)
chart_draw(db1,cursor1,SQL03, ax4, 'Yellow', '受注数', '雑貨取り扱い状況(1段目のMySQL)', 1, Size1)
chart_draw(db3,cursor3,SQL10, ax5, 'Magenta', 'カテゴリ別の総売上(単位:千円)', '統括データベース1の状況(SingleStore)', 1000, Size1)
chart_draw(db3,cursor3,SQL11, ax6, 'Cyan', 'カテゴリ別の総売上(単位:千円)', '統括データベース2の状況(SingleStore)', 1000, Size1)
fig.tight_layout()
# 画面更新までの待ち時間(適宜調整)
plt.pause(Time_Wait)
# 表示を初期化
ax1.cla()
ax2.cla()
ax3.cla()
ax4.cla()
ax5.cla()
ax6.cla()
Counter = Counter + 1
except KeyboardInterrupt:
db1.close()
db3.close()
print('!!!!! 割り込み発生 !!!!!')
finally:
print('処理の終了')
print(str(Counter) + "回の処理を実行しました")
最终……
在迎接5G时代和各种连接-X社会到来的同时,我们认为各种数据流动也将悄然而又戏剧性地引发一种革命。此外,在这个世界中,“数据流通过程”和“操作过程”也必须具备对时代速度和变化的灵活应对能力。Equalum在几次更新验证中展示出了成为新一代数据平台的潜力,并通过与周边环境高度协作,支持实现“新型数据驱动环境”下的“新型DX”。另外,在现有的BI环境已经陷入“前进困境”或陷入瓶颈状态的情况下,我们认为通过将Equalum部署为SQL事务桥接和防火墙,能够大幅度改善数据利用和数据操作机制。
将数据流通路径控制在可持续发展的解决方案中。这将为周边和新兴数据业务创造巨大优势和潜力,并使Equalum成为您在选择”下一步”时能”根据您的需要选择”的”杀手级解决方案”,对吧?(在日本国内,似乎正在正式成立外销(SI)代理商……)
非常令人匆匆而忙碌的介绍与验证工作,但这次就先告一段落!感谢大家一直以来的陪伴,真心谢谢大家。
感谢
此实验使用Equalum公司的官方版本(V2.23)进行。
在此向提供这个宝贵机会的Equalum公司表示感谢,并请注意,如果本内容与Equalum公司官方网站上公开的内容有所不同,请以Equalum公司的信息为准。