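Getting started with real-time analytics using Apache Pinot

Introduction

This is the Day 9 article of the RetailAI Adventurers Advent Calendar 2023.
Yesterday's article was "Can a CNN recognize images generated by image-generation AI?" by @atsukish.
In this article, I use Apache Pinot to aggregate sample purchase data and display the results on a dashboard.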

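What is Apache Pinot?

First, a brief explanation of Apache Pinot.
Apache Pinot is a real-time distributed OLAP datastore designed to answer OLAP queries with low latency.
For example, a supermarket generates purchase data every time a customer buys something, and Apache Pinot can aggregate that data in real time.
Supermarket purchase data is huge, but Apache Pinot can process it on the fly so that the results can be shown on a dashboard in an easy-to-read form. This lets managers and marketers follow key information such as sales and customer preferences in real time.
Uber is reportedly one of its well-known users.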

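What we will build

Now let's actually aggregate data with Pinot and show the result on a dashboard.
This time I use sample purchase data and display total sales grouped by store and product division.

Pinot provides a client library for Go, so a Go client pulls the aggregated data from Pinot, a REST API returns it to the caller, and the dashboard displays it. The dashboard is built with streamlit, which @yoshitake_tatsuhiro introduced in the Day 4 article of the RetailAI Adventurers Advent Calendar 2023.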

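Trying it out

The implementation details are listed below.

Preparing the containers

I start everything with docker-compose.
The compose file defines the required Pinot components and the Go client.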

version: "3.7"
services:
  pinot-zookeeper:
    image: zookeeper:latest
    container_name: pinot-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:latest
    command: "StartController -zkAddress pinot-zookeeper:2181"
    container_name: pinot-controller
    restart: unless-stopped
    ports:
      - "9000:9000"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms1G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-controller.log"
    depends_on:
      - pinot-zookeeper
    volumes:
      - ./config:/config
      - ./data:/data
  pinot-broker:
    image: apachepinot/pinot:latest
    command: "StartBroker -zkAddress pinot-zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-broker"
    ports:
      - "8099:8099"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-broker.log"
    depends_on:
      - pinot-controller
    volumes:
      - ./config:/config
      - ./data:/data
  pinot-server:
    image: apachepinot/pinot:latest
    command: "StartServer -zkAddress pinot-zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-server"
    ports:
      - "8098:8098"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-server.log"
    depends_on:
      - pinot-broker
    volumes:
      - ./config:/config
      - ./data:/data
  app:
    image: pinot-client-go
    container_name: pinot_app
    build:
      context: .
      dockerfile: ./Dockerfile
      target: debug
    volumes:
      - .:/app
    ports:
      - 8000:8000
    restart: always
    depends_on:
      - pinot-server
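
The Pinot components take a little while to come up after the containers start, so it can help to wait before registering tables. The following is a minimal sketch, not part of the original setup: it assumes the stack was started with `docker-compose up -d` and polls the controller's `/health` endpoint on the port 9000 published in the compose file. The helper names `controller_is_healthy` and `wait_until` are made up for this illustration.

```python
import time
import urllib.error
import urllib.request


def controller_is_healthy(base_url="http://localhost:9000", timeout=3.0):
    """Return True if the controller's /health endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or a non-2xx answer: not ready yet.
        return False


def wait_until(check, attempts=30, delay=2.0, sleep=time.sleep):
    """Call check() up to `attempts` times, sleeping `delay` seconds between tries."""
    for i in range(attempts):
        if check():
            return True
        if i < attempts - 1:
            sleep(delay)
    return False
```

Calling `wait_until(controller_is_healthy)` returns `True` as soon as the controller answers, or `False` after all attempts are used up.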

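Preparing the data and the table

Pinot can ingest data in two ways: online, which handles streaming data from sources such as Apache Kafka, and offline, which batch-loads data from storage services such as Google Cloud Storage. This time I use the offline method. To do that, I define a schema and prepare a CSV file of sample data.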

{
  "schemaName": "sales",
  "dimensionFieldSpecs": [
    {
      "name": "date",
      "dataType": "STRING"
    },
    {
      "name": "storeCode",
      "dataType": "INT"
    },
    {
      "name": "store",
      "dataType": "STRING"
    },
    {
      "name": "divisionCode",
      "dataType": "INT"
    },
    {
      "name": "division",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "totalPrice",
      "dataType": "INT"
    }
  ]
}

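The sample data consists of the date, store code, store name, division code, division name, and sales amount. I prepared about 100,000 records this time.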

date,storeCode,store,divisionCode,division,totalPrice
20230814,2,Kumamoto,2,味噌,610
20230925,6,Kagoshima,4,油,436
20231027,6,Kagoshima,4,油,938
20230907,1,Saga,0,米,961
20231018,0,Fukuoka,2,味噌,997
...
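
The article does not show how the sample file was produced, so here is one possible way to generate data in the same shape. This is a sketch under assumptions: the store and division names are taken from the API response shown later in this article, while the date range (2023-08-01 to 2023-10-31), the price range (1 to 1000), and the function names are guesses chosen only to resemble the sample rows above.

```python
import csv
import io
import random
from datetime import date, timedelta

# Index in each list is the code (storeCode / divisionCode).
STORES = ["Fukuoka", "Saga", "Kumamoto", "Oita", "Miyazaki", "Nagasaki", "Kagoshima"]
DIVISIONS = ["米", "卵", "味噌", "肉", "油", "野菜"]
HEADER = ["date", "storeCode", "store", "divisionCode", "division", "totalPrice"]


def generate_rows(n, start=date(2023, 8, 1), days=92, seed=0):
    """Yield n random purchase rows; date and price ranges are assumptions."""
    rng = random.Random(seed)
    for _ in range(n):
        day = start + timedelta(days=rng.randrange(days))
        store_code = rng.randrange(len(STORES))
        division_code = rng.randrange(len(DIVISIONS))
        yield [day.strftime("%Y%m%d"), store_code, STORES[store_code],
               division_code, DIVISIONS[division_code], rng.randint(1, 1000)]


def write_csv(out, n, **kwargs):
    """Write the header plus n generated rows to the file-like object `out`."""
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(HEADER)
    writer.writerows(generate_rows(n, **kwargs))
```

For example, `write_csv(open("data/sales.csv", "w", encoding="utf-8", newline=""), 100_000)` would write about 100,000 records; the path `data/sales.csv` is hypothetical and only needs to sit under the directory mounted as `/data`.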

table.json holds the table settings, such as the replication factor and indexes.

{
  "tableName": "sales",
  "segmentsConfig": {
    "replication": "1",
    "schemaName": "sales"
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "loadMode": "MMAP"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableType": "OFFLINE",
  "metadata": {}
}
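
The schema and table config have to be registered with the controller before data can be loaded. The article does not show that step, so the following is only a sketch of one way to do it: posting both JSON documents to the controller REST API (`POST /schemas`, then `POST /tables`). The file paths passed to `register` and the helper names are assumptions for illustration.

```python
import json
import urllib.request


def build_post(base_url, path, payload):
    """Build a JSON POST request for the Pinot controller (nothing is sent yet)."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}{path}", data=body, method="POST",
        headers={"Content-Type": "application/json"})


def register(schema_path, table_path, base_url="http://localhost:9000"):
    """Upload the schema first, then the table config that refers to it."""
    for path, file_path in (("/schemas", schema_path), ("/tables", table_path)):
        with open(file_path, encoding="utf-8") as f:
            req = build_post(base_url, path, json.load(f))
        with urllib.request.urlopen(req) as resp:
            print(path, resp.status)
```

A call such as `register("config/schema.json", "config/table.json")` would then create the `sales` schema and table; `schema.json` is a hypothetical file name, since the article only names table.json.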

job-spec.yml is required to generate the segments and push them to Pinot.

executionFrameworkSpec:
  name: "standalone"
  segmentGenerationJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner"
  segmentTarPushJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner"
  segmentUriPushJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner"
jobType: SegmentCreationAndTarPush
inputDirURI: "/data/"
includeFileNamePattern: "glob:**/*.csv"
outputDirURI: "/opt/pinot/data/sales/segments/"
overwriteOutput: true
pushJobSpec:
  pushFileNamePattern: "glob:**/*.tar.gz"
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: "csv"
  className: "org.apache.pinot.plugin.inputformat.csv.CSVRecordReader"
  configClassName: "org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig"
tableSpec:
  tableName: "sales"
  schemaURI: "http://localhost:9000/tables/sales/schema"
  tableConfigURI: "http://localhost:9000/tables/sales"
pinotClusterSpecs:
  - controllerURI: "http://localhost:9000"

Running it

[Image: Screenshot 2023-12-08 13.21.46.png]

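Pinot consists of several components, one of which is the controller. The controller mainly manages the other components and also serves a web UI, which the compose file above publishes on port 9000. You can open that UI and try sending queries.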
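
The same kind of query can also be sent programmatically. The article's own implementation uses the Go client, so the following is not that code; it is a rough Python sketch that posts SQL to the broker's `/query/sql` endpoint (port 8099 in the compose file) and turns the `resultTable` of the response into a list of dictionaries. The SQL text, the `total` alias, and the function names are my assumptions about what such a query could look like.

```python
import json
import urllib.request

# Total sales per store and division; LIMIT is explicit because Pinot
# returns only 10 rows when no limit is given.
SUMMARY_SQL = (
    "SELECT storeCode, store, divisionCode, division, "
    "SUM(totalPrice) AS total "
    "FROM sales "
    "GROUP BY storeCode, store, divisionCode, division "
    "LIMIT 100"
)


def rows_as_dicts(broker_response):
    """Convert the broker's resultTable into a list of {column: value} dicts."""
    table = broker_response.get("resultTable") or {}
    columns = table.get("dataSchema", {}).get("columnNames", [])
    return [dict(zip(columns, row)) for row in table.get("rows", [])]


def query_pinot(sql, broker_url="http://localhost:8099"):
    """POST the SQL to the broker and return the rows as dictionaries."""
    req = urllib.request.Request(
        f"{broker_url}/query/sql",
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST")
    with urllib.request.urlopen(req) as resp:
        return rows_as_dicts(json.load(resp))
```

`query_pinot(SUMMARY_SQL)` would then return one dictionary per store and division pair.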

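Next, I want to show the data on the dashboard.
The detailed code is in the repository, but in short I prepared an endpoint that returns a response like the following.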

curl -X GET http://localhost:8000/sales/summaries
{"data":[{"store_code":1,"store":"Saga","division_code":1,"division":"卵","total_price":1483670},{"store_code":1,"store":"Saga","division_code":0,"division":"米","total_price":1536817},{"store_code":6,"store":"Kagoshima","division_code":3,"division":"肉","total_price":1558020},{"store_code":5,"store":"Nagasaki","division_code":4,"division":"油","total_price":1535054},{"store_code":3,"store":"Oita","division_code":5,"division":"野菜","total_price":1586730},{"store_code":4,"store":"Miyazaki","division_code":2,"division":"味噌","total_price":1537278},{"store_code":5,"store":"Nagasaki","division_code":3,"division":"肉","total_price":1563557},{"store_code":1,"store":"Saga","division_code":2,"division":"味噌","total_price":1552472},{"store_code":1,"store":"Saga","division_code":5,"division":"野菜","total_price":1534440},{"store_code":2,"store":"Kumamoto","division_code":2,"division":"味噌","total_price":1561910},{"store_code":3,"store":"Oita","division_code":4,"division":"油","total_price":1529267},{"store_code":6,"store":"Kagoshima","division_code":5,"division":"野菜","total_price":1534618},{"store_code":5,"store":"Nagasaki","division_code":5,"division":"野菜","total_price":1588164},{"store_code":2,"store":"Kumamoto","division_code":0,"division":"米","total_price":1501309},{"store_code":3,"store":"Oita","division_code":0,"division":"米","total_price":1597682},{"store_code":0,"store":"Fukuoka","division_code":3,"division":"肉","total_price":1567797},{"store_code":6,"store":"Kagoshima","division_code":4,"division":"油","total_price":1575007},{"store_code":3,"store":"Oita","division_code":1,"division":"卵","total_price":1551815},{"store_code":1,"store":"Saga","division_code":4,"division":"油","total_price":1560918},{"store_code":2,"store":"Kumamoto","division_code":1,"division":"卵","total_price":1552825},{"store_code":6,"store":"Kagoshima","division_code":0,"division":"米","total_price":1491855},{"store_code":4,"store":"Miyazaki","division_code":3,"division":"肉","total_price":1568506},{"store_code":6
,"store":"Kagoshima","division_code":1,"division":"卵","total_price":1574272},{"store_code":2,"store":"Kumamoto","division_code":4,"division":"油","total_price":1569607},{"store_code":-2147483648,"store":"null","division_code":-2147483648,"division":"null","total_price":0},{"store_code":0,"store":"Fukuoka","division_code":4,"division":"油","total_price":1529407},{"store_code":4,"store":"Miyazaki","division_code":5,"division":"野菜","total_price":1542876},{"store_code":1,"store":"Saga","division_code":3,"division":"肉","total_price":1558422},{"store_code":5,"store":"Nagasaki","division_code":2,"division":"味噌","total_price":1505121},{"store_code":6,"store":"Kagoshima","division_code":2,"division":"味噌","total_price":1513347},{"store_code":0,"store":"Fukuoka","division_code":2,"division":"味噌","total_price":1513401},{"store_code":4,"store":"Miyazaki","division_code":4,"division":"油","total_price":1535423},{"store_code":0,"store":"Fukuoka","division_code":5,"division":"野菜","total_price":1593589},{"store_code":2,"store":"Kumamoto","division_code":5,"division":"野菜","total_price":1519526},{"store_code":5,"store":"Nagasaki","division_code":0,"division":"米","total_price":1476175},{"store_code":4,"store":"Miyazaki","division_code":1,"division":"卵","total_price":1537356},{"store_code":3,"store":"Oita","division_code":2,"division":"味噌","total_price":1590498},{"store_code":2,"store":"Kumamoto","division_code":3,"division":"肉","total_price":1578098},{"store_code":4,"store":"Miyazaki","division_code":0,"division":"米","total_price":1553047},{"store_code":3,"store":"Oita","division_code":3,"division":"肉","total_price":1562534},{"store_code":0,"store":"Fukuoka","division_code":0,"division":"米","total_price":1539771},{"store_code":0,"store":"Fukuoka","division_code":1,"division":"卵","total_price":1555064},{"store_code":5,"store":"Nagasaki","division_code":1,"division":"卵","total_price":1546240}]}
[Image: newplot.png]
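
One entry in the response above has `store_code` -2147483648 and `store` "null". Those values match Pinot's default null placeholders for INT and STRING dimension columns, which suggests that at least one ingested record had missing values, although the article does not say where it came from. As a sketch of how the dashboard side could cope with it, the following drops such entries and reshapes the rest into per-store totals that are easy to chart. The function names are hypothetical and this is not the article's streamlit code.

```python
import json

# Pinot's default null placeholder for INT dimension columns (Integer.MIN_VALUE).
INT_NULL = -2147483648


def drop_null_groups(records):
    """Remove entries whose store or division code is the null placeholder."""
    return [r for r in records
            if r["store_code"] != INT_NULL and r["division_code"] != INT_NULL]


def totals_by_store(records):
    """Pivot the flat records into {store: {division: total_price}}."""
    pivot = {}
    for r in records:
        pivot.setdefault(r["store"], {})[r["division"]] = r["total_price"]
    return pivot
```

Given the decoded response body as `payload`, `totals_by_store(drop_null_groups(payload["data"]))` yields a nested dictionary with one inner mapping per store.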

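Pitfalls

I ran into two problems while using Pinot.

① By default, query results are limited to 10 records.

For example, the following query returns only 10 rows by default, even if 100 records match.

select * from users   (equivalent to: select * from users limit 10)

So if you want more than 10 records back, specify limit explicitly.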
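
To avoid forgetting the limit, the client side can add one whenever a query lacks it. This is a deliberately naive sketch: the helper name `with_limit` and the default of 1000 are my own choices, and the regular expression only looks for the word `limit` followed by a number, so it would be fooled by that text appearing inside a string literal.

```python
import re

# Matches "limit <number>" anywhere in the statement, case-insensitively.
_LIMIT_RE = re.compile(r"\blimit\s+\d+", re.IGNORECASE)


def with_limit(sql, limit=1000):
    """Return sql unchanged if it already has a LIMIT, else append one."""
    stripped = sql.strip().rstrip(";")
    if _LIMIT_RE.search(stripped):
        return stripped
    return f"{stripped} LIMIT {limit}"
```

For instance, `with_limit("select * from users")` gives `select * from users LIMIT 1000`, while a query that already ends in `limit 50` is returned as is.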

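Someone else wondered why the default is 10 and asked the community. The explanation given was that Pinot is built on the assumption of very large data sets, so without a limit a query could accidentally fetch a huge amount of data and put load on the servers. The default limit exists to prevent that.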

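② Define the columns to aggregate in metricFieldSpecs.

When defining a table's schema, there is a field called metricFieldSpecs, which specifies the columns used in aggregations such as sums and counts. In my case, aggregation did not work correctly for a column that was not defined in metricFieldSpecs.

In this example I want the total purchase amount grouped by division and store, so totalPrice is defined in metricFieldSpecs.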
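
A small check before uploading the schema can catch this kind of mistake early. The sketch below, with hypothetical helper names, reads a schema like the one shown earlier and reports which of the columns you intend to aggregate are not listed under metricFieldSpecs.

```python
import json


def metric_columns(schema):
    """Return the set of column names declared under metricFieldSpecs."""
    return {spec["name"] for spec in schema.get("metricFieldSpecs", [])}


def missing_metrics(schema, aggregated):
    """Return the aggregated columns that are not declared as metrics, sorted."""
    return sorted(set(aggregated) - metric_columns(schema))
```

With the `sales` schema loaded via `json.load`, `missing_metrics(schema, ["totalPrice"])` returns an empty list, whereas a column declared only as a dimension would be reported.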

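Summary

With Apache Pinot, going from data aggregation to a dashboard turned out to be very easy! At Retail AI we collect and accumulate large amounts of data every day, such as customer purchase data and product order data. By analyzing and using this data well, we hope to achieve our company's goal of reducing "waste, futility, and hardship in distribution"!

Wishing everyone a happy New Year!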

References

Apache Pinot official site
