使用Smolder在实时中充分利用电子医疗记录
实时使用Smolder、Apache Spark™和Delta Lake进行的EHR分析——Databricks博客的翻译。
在上一篇文章中,我们看到了处理电子健康记录(EHR)中获得的患者数据的两种不同工作流程。这些工作流程侧重于对EHR数据的历史批量提取。然而,在现实世界中,数据不断地被输入到EHR中。在许多重要的预测性健康分析中,如败血症预测和急诊室拥堵检测,我们需要处理流入EHR的医疗数据。
实际上,电子健康记录(EHR)中的数据是如何流入的呢?大多数情况下,存在几条路径需要通过以改善数据。在某些EHR实施中,数据首先以近实时的形式到达NoSQL风格的操作性存储。第二天,这些NoSQL存储的新数据会从操作性存储中被规范化并转移到具有维度的SQL存储中。其他EHR实施中可能使用不同的数据库实现,但数据直到成为最终的“历史”EHR记录可以用于批处理分析之前仍然会有延迟。为了实时分析医疗数据,需要访问推动式的HL7消息源或拉取式的FHIR API端点。这些消息源和端点包含各种健康信息数据。例如,入院/出院/转院(ADT)消息用于追踪患者在不同单位之间的移动时间,订购录入(ORM)用于表示分配,更改和取消患者的氧气供应或特定实验室检测等指示的消息。通过组合这些源和资源,可以获得对患者和医院的全面视图。
在这篇文章的剩余部分中,我们将使用Smolder库来实时分析EHR数据,以确定需要高级治疗的患者。首先,我们将深入研究HL7v2标准,了解其工作原理和意义。然后,我们将讨论指导Smolder开发的设计原则。最后,我们将展示如何使用Apache Spark的结构化流API和Smolder库,实时将数据加载到Delta Lake中。
处理HL7消息
HL7v2 迈向第七代
HL7是医疗健康可互操作性的国际标准组织Health Level 7的简称。该组织负责定义用于医疗健康数据效果的REST API和JSON模式的FHIR医疗健康数据交换标准,以及基于XML的医疗文档结构(CDA和C-CDA)标准,同时也维护原始的HL7v2标准。这些标准具有不同的目的。FHIR定义了在应用程序开发中有用的API端点,C-CDA定义了最佳的患者历史信息交换标准,而HL7v2是一种用于捕捉实时更新电子病历中状态和数据的消息语言。在应用程序开发中,最近FHIR备受关注,但由于HL7v2广泛应用,对于分析使用而言,HL7v2通常是最合适的标准。许多机构目前都使用HL7 feed来利用旧系统。因此,不需要等待所有系统支持FHIR,就可以深入大量数据并生成丰富的分析基础。此外,HL7 feed本身通常基于TCP/IP的数据流,因此非常适合事件驱动的流式架构。
MSH|^~\&|||||20201020150800.739+0000||ADT^A03^ADT_A03|11374301|P|2.4
EVN|A03|20170820074419-0500
PID|||d40726da-9b7a-49eb-9eeb-e406708bbb60||Heller^Keneth||||||140 Pacocha Way Suite 52^^Northampton^Massachusetts^^USA
PV1||a|^^^COOLEY DICKINSON HOSPITAL INC THE|ambulatory||||49318f80-bd8b-3fc7-a096-ac43088b0c12^THE^COOLEY||||||||||||||||||||||||||||||||||||20170820074419-05\
00
这段代码是一个HL7消息的样例。该消息是ADT_A03消息,提供了关于出院患者的信息。我们是通过开源的Synthea医疗记录模拟器生成的这些数据。
由于到目前为止已经熟悉了HL7v2,接下来我们来看一下HL7v2消息的内容。图中展示了一个多行、以竖线分隔的单个HL7v2消息。还存在XML版本的规范。FHIR是基于JSON的规范。每个消息类型和段类型的模式都在HL7v2规范中确定。每行消息段都有一个“段描述符”,该描述符表示该段的模式是什么。在解析上述ADT_A03消息时,包含有关患者ID信息的“PID”段标识符以及包含有关即将出院患者访问信息的“PV1”段。在患者ID段中,第二个字段是患者的ID,第四个字段是指定。
那我们该如何处理这些数据呢?我们曾经在许多客户使用Apache Spark的流处理功能将HL7数据导入到Databricks中提供帮助。在过去,我们见过客户将管道连接到自己的EHR与Apache Kafka™️、Kinesis、EventHubs等流服务之间。然后,我们将这些管道连接到能够生成文本数据框架的Apache Spark上。最后,我们使用手写的解析器或像HAPI这样的低级别库来解析这些文本。
这种方法在一些客户那里取得了成功,但是却需要依赖手写解析库或者像HAPI这样的库,这带来了一些问题。数据湖的一个重要优点是可以推迟流水线验证。通常,我们将其采用生态系统的原始状态,并存储在铜层中。这样一来,您就可以灵活地保持观察到的历史数据,而无需针对数据的大小和形态做出任何选择。
在处理面临业务的使用案例之前,验证和分析过去的数据是特别有用的。例如,考虑验证初级医疗提供者(PCP)领域的情况。如果发现医疗系统错误地交换了护理团队的另一个外科医生和PCP,您可能希望对所有这些记录进行过去的修改。根据设计,Smolder遵循这种场景的范例。
不同于HAPI,Smolder会将消息分解为适当的HL7消息结构,而不会对其进行以上的验证。这样可以将数据注入到银色层并查询,同时可以捕获观察到的数据。
Smolder是一个用于处理HL7的开源库,它是基于Apache Spark的设计。
我们选择以下方法来开发Smolder,旨在提供易于使用的系统,实现处理HL7v2消息的近实时延迟,并使数据科学和可视化的大规模生态系统能够访问数据。
1行のコードでHL7メッセージをデータフレームに変換: データフレームは、Pandas、R、Sparkに関係なく、データサイエンスで広く利用されており、SQLのように広くアクセスされている宣言型プログラミングフレームワークを通じて使用することができます。1行のコードでHL7メッセージをデータフレームにロードできるのであれば、HL7メッセージを取り扱うことのできる後段の場所を劇的に増やすことができます。
メッセージからデータを抽出するためにシンプルかつ宣言型のAPIを活用: HAPIのようなライブラリはHL7v2メッセージを取り扱うためのAPIを提供しますが、これらのAPIは複雑であり、非常にオブジェクト指向で、HL7v2メッセージングフォーマットに関する多くの知識を必要とします。代わりに、1行のSQLライクの関数を人々に提供できるのであれば、彼らは新しく複雑なAPIを学ぶ必要なしにHL7メッセージのデータを理解することができるようになります。
ソースに関係なしにHL7メッセージに対して一貫性のあるスキーマとセマンティクスを提供: Smolderでは、HL7v2メッセージの直接取り込みと、Apache Kafkaのようなオープンソースツールや、AWSのKinesisやAzureのEventHubsのようなクラウド固有サービスからの別のストリーミングソースからやってくるHL7v2メッセージテキストの取り込みの両方をサポートしています。ソースに関係なく、メッセージは常に同じスキーマにパーシングされます。Apache Sparkの構造化ストリーミングセマンティクスと組み合わせることで、バッチやストリーミングデータ処理を通じて同じように容易に検証実行できるポータブルかつプラットフォーム中立なコードを実現することができます。
在最終結果中,通過這種方法,Smolder 成為了一個容易學習和使用的輕量級庫,並支持對大量 HL7 消息的嚴格 SLA。然後,我們將看一下 Smolder 的 API 詳細信息以及構建醫院入院模式分析儀表板的方法。
使用Smolder进行HL7消息解析。
通过使用Apache Spark™的结构化流API,可以利用Spark SQL API的扩展来处理流数据。结合Smolder库,可以使用Smolder来批量读取原始的HL7v2消息,或者使用Smolder解析来自其他流数据源的HL7v2消息文本,将HL7v2消息加载到数据框中。例如,如果存在应加载的消息批处理,可以简单地调用hl7阅读器。
scala> val df = spark.read.format("hl7").load("path/to/hl7/messages")
df: org.apache.spark.sql.DataFrame = [message: string, segments: array<struct<id:string,fields:array>>]
返回的模式包含一个消息列,其中包含消息的标头。消息段被嵌套在segments列中,并且是一个包含两个嵌套字段的数组。对于段落,它是一个包含字符串id(例如患者识别段的PID)和段字段数组。
另外,您可以使用Smolder在原始的消息文本中。这可以在HL7消息最初到达中间源(如Kafka流)时发生。为了做到这一点,您可以使用Smolder的parse_hl7_message帮助函数。首先,从包含HL7消息文本的数据框开始。
scala> val textMessageDf = ...
textMessageDf: org.apache.spark.sql.DataFrame = [value: string]
scala> textMessageDf.show()
+--------------------+
| value|
+--------------------+
|MSH|^~\&|||||2020...|
+--------------------+
接下来,从com.databricks.labs.smolder.functions对象中导入parse_hl7_message消息,并将其应用于要解析的列。
scala> import com.databricks.labs.smolder.functions.parse_hl7_message
import com.databricks.labs.smolder.functions.parse_hl7_message
scala> val parsedDf = textMessageDf.select(parse_hl7_message($"value").as("message"))
parsedDf: org.apache.spark.sql.DataFrame = [message: struct>>>]
通过这种方式,您可以获得与HL7数据源相同的模式。
使用Smolder从HL7v2消息段中提取数据
Smolder提供了一个方便使用的模式(schema)来处理HL7消息,同时还提供了位于com.databricks.labs.smolder.functions的帮助函数(helper function)来从消息段(segment)中提取子字段(subfield)。例如,假设我们想从患者ID(PID)段的第5个字段中提取患者的姓名。我们可以使用segment_field函数来实现这一目的。
scala> import com.databricks.labs.smolder.functions.segment_field
import com.databricks.labs.smolder.functions.segment_field
scala> val nameDf = df.select(segment_field("PID", 4).alias("name"))
nameDf: org.apache.spark.sql.DataFrame = [name: string]
scala> nameDf.show()
+-------------+
| name|
+-------------+
|Heller^Keneth|
+-------------+
如果要获取患者的名字,可以使用subfield函数。
scala> import com.databricks.labs.smolder.functions.subfield
import com.databricks.labs.smolder.functions.subfield
scala> val firstNameDf = nameDf.select(subfield($"name", 1).alias("firstname"))
firstNameDf: org.apache.spark.sql.DataFrame = [firstname: string]
scala> firstNameDf.show()
+---------+
|firstname|
+---------+
| Keneth|
+---------+
ブロンズレイヤーには生のメッセージフィード(ADP、ORM、ORUのフィードごとのテーブルなど)が格納されます。
シルバーレイヤーでは後段のアプリケーション(時間経過を示す患者レコード、病院のリソースに関する集計など)で有用なテーブルにこの情報が集約されます。
ゴールドレイヤーにはアプリケーションレベルのデータ(病棟、病院レベルの専有率などによる病院混雑のアラートシステムなど)が格納されます。
为什么选择在Delta Lake上构建呢?首先,Delta是一种开放格式,可以轻松地从许多分析系统中进行访问,可以通过Apache Spark和数据仓库系统(如Synapse)来访问数据科学生态系统。此外,Delta Lake被设计为支持级联流,意味着数据可以从青铜层流式传输到银层,最终到达黄金层。此外,Delta Lake提供了许多优化表格以改善查询性能的技术方法。例如,您可能希望加快对患者ID和就诊日期等经常被查询的字段的查询速度,您可以利用之前讨论过的Z-ordering技术。Delta Lake支持多维数据聚类的Z-ordering,以提供这两种查询模式的优异性能。
让我们试着使用Smolder来开始构建健康数据湖。
在这篇文章中,我们介绍了一种名为Smolder的基于Apache 2许可证的库,用于从EHR加载患者数据。您可以从项目文档开始阅读,并创建一个fork来为代码做出贡献。如果您想了解如何存储和处理医疗数据集使用Delta Lake,可以下载一本关于处理真实世界医疗数据集的免费电子书。您也可以使用本文介绍的笔记本来启动免费试用。
Databricks 免费试用
敦睦大数据免费试用