使用PostgreSQL进行批量更新插入操作

从PostgreSQL 9.5版本开始引入了ON CONFLICT特性,使得可以进行Upsert(插入或更新)操作,然而无法对多行数据进行批量操作。

[2020.08 更新]
根据评论的指示增加了补充说明。
据说可以使用在 ON CONFLICT … DO UPDATE 中使用的 EXCLUDED 子句,以便在 VALUES 中指定多行来进行批量 Upsert。(未经检验)
[补充说明结束]

似乎可以通过使用CTE而不使用ON CONFLICT来实现以下方式的批量插入或更新。

WITH
-- write the new values
n(ip,visits,clicks) AS (
  VALUES ('192.168.1.1',2,12),
         ('192.168.1.2',6,18),
         ('192.168.1.3',3,4)
),
-- update existing rows
upsert AS (
  UPDATE page_views o
  SET visits=n.visits, clicks=n.clicks
  FROM n WHERE o.ip = n.ip
  RETURNING o.ip
)
-- insert missing rows
INSERT INTO page_views (ip,visits,clicks)
SELECT n.ip, n.visits, n.clicks FROM n
WHERE n.ip NOT IN (
  SELECT ip FROM upsert
)

通过使用CartoDB可以实现更快的数据更新速度 — CARTO博客

使用Python进行批量更新插入

我使用Python + Pandas + asyncpg编写了一个将CSV文件内容直接批量更新到PostgreSQL的程序。

下面是需要注意的事项。

注意事项如下。

    • Pandas DataFrame のカラム名と PostgreSQL テーブルのカラム名が完全に一致している必要がある

 

    複合ユニークキーを持つテーブルに対応させるために最後の INSERT 部分が相関サブクエリになっているので、大きなデータに対して実行するとものすごく遅い
import asyncio
import asyncpg
import pandas as pd


async def bulk_upsert(conn, df: pd.DataFrame, table: str, key_columns: list):
    assert len(df) > 0
    assert len(key_columns) > 0
    value_columns = df.iloc[:0].drop(key_columns, axis=1).columns.to_list()
    assert len(value_columns) > 0
    n = len(df.columns)
    statements = ','.join(['(' + ','.join(['$' + str(i * n + j + 1) for j in range(n)]) + ')' for i in range(len(df))])
    query = f"""
        WITH
        -- write the new values
        n({','.join(df.columns)}) AS (VALUES {statements}),
        -- update existing rows
        upsert AS (
          UPDATE {table} AS o
          SET {', '.join([f'{col}=n.{col}' for col in value_columns])}
          FROM n
          WHERE {' AND '.join([f'o.{col} = n.{col}' for col in key_columns])}
          RETURNING {', '.join([f'o.{col}' for col in key_columns])}
        )
        -- insert missing rows
        INSERT INTO {table} ({', '.join(df.columns)})
        SELECT {', '.join([f'n.{col}' for col in df.columns])}
        FROM n
        WHERE (
          SELECT COUNT(1)
          FROM upsert AS u
          WHERE {' AND '.join([f'u.{col} = n.{col}' for col in key_columns])}
        ) = 0
        """
    values = sum([list(row) for row in df.values], [])  # flatten
    await conn.execute(query, *values)


async def main():
    conn = await asyncpg.connect('postgresql://postgres@localhost/')
    await bulk_upsert(conn, pd.read_csv('users.csv'), 'users', ['id'])
    await conn.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
广告
将在 10 秒后关闭
bannerAds