使用pg模块在nodejs中从PostgreSQL/PostGIS数据库读取数据
首先
关于之前使用的矢量瓦片创建程序,由于需要升级nodejs版本,所以需要测试主要的nodejs模块是否能在nodejs v18上运行。
我打算在这篇文章中重新总结一下关于从PostgreSQL/PostGIS读取数据和以GeoJSON序列方式写入的方法,以便在这个机会上做些笔记。
这次的练习仓库在这里:https://github.com/ubukawa/ex-postgis
此外,目前正在運行的程式是使用 nodejs v.16 運行的,但儲存庫位於以下網址:https://github.com/unvt/produce-gsc-6。這個程式是基於 @hfu 先生的開發進行的。
请注意
这次的练习重点放在使用pg模块在PostgreSQL数据库中进行访问。省略了使用子进程将数据传输到矢量瓦片转换工具以及使用better-queue进行处理数量控制的部分。
实验环境
-
- Windows 10
-
- Windows PowerShell
-
- nodejs version 18.12.1
- npm version 8.19.2
进行实验
我创建了一个代码库。如前所述,它位于 https://github.com/ubukawa/ex-postgis。
请安装所需的npm模块。
npm init -y
npm install --save config pg hjson
尝试访问Postgresql数据库
首先,我们简单地访问了在配置文件中编写的关系图层(relatiosn)。我们需要按照以下方式准备konfig.default.hjson和test001.js文件,然后执行。
const config = require('config')
const { Pool, Query } = require('pg')
// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')
let pools = {}
for (relation of relations){
const [database, schema, view] = relation.split('::')
if(!pools[database]){
pools[database] = new Pool({
host: host,
user: dbUSer,
port: port,
password: dbPassword,
database: database
})
}
pools[database].connect(async (err, client, release) => {
if (err) throw err
//let sql = `SELECT count(*) FROM ${schema}.${view} `
let sql = `SELECT * FROM ${schema}.${view} limit 1`
let res = await client.query(sql)
console.log(res.rows) //rows contains sql response
await client.end()
release()
})
}
由于geom不是文本格式,因此即使看到它也无法理解其含义,但数据已经被正确读取。
有关test001.js的要点如下:
-
- Poolでデータベースへのアクセス情報を含んだオブジェクトを作れる。
-
- Poolクラスのconnectファンクションを使ってデータベースにアクセスし、データベースからの応答(client)に対する処理を決めていく。
- connectを非同期処理でやっているので、うまくawaitを使ったりする。pgの説明分を読むと、promiseでも処理ができるそうだ。
阅读专栏名称(属性列表),提取其信息。
我将调查专栏名称,并取得相关记录。并且,我会稍微处理位置信息的geom(以St_AsGeoJSON的形式读取)并取得它。
test002.js的内容如下所示。
const config = require('config')
const { Pool, Query } = require('pg')
// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')
let pools = {}
for (relation of relations){
const [database, schema, view] = relation.split('::')
if(!pools[database]){
pools[database] = new Pool({
host: host,
user: dbUSer,
port: port,
password: dbPassword,
database: database
})
}
pools[database].connect(async (err, client) => {
if (err) throw err
//Getting the list of columns, then adjust it
let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`
let cols = await client.query(sql)
cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom') //choose "rows", then its colum_names are listed, and geom is removed.
//we will add filter if needed
cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`)
//console.log(`columns used: ${cols}`)
// Then, we will get feature record.
await client.query('BEGIN')
sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`
cols = await client.query(sql)
console.log(cols.rows)
await client.query('COMMIT')
await client.end()
})
}
test002.js 的要点是:
-
- 最初のsqlでは地物ではなくて、information_schema.columnsにアクセスしてビューの持っている属性を調べています。
-
- client.queryで帰ってきた値のrowsをmapして、column_nameをアレーにしています。そしてそこからgeomを取り除き、ST_AsGeoJSONとして追加しています。
- そうしてとってきた属性リストを使って、client.query(`BEGIN`)をまって、地物をSELECTしていきます。ここのawait client.query(‘BEGIN’)とか、await client.query(‘COMMIT’)は、例えばここのページ( https://node-postgres.com/features/transactions )などに説明があります。
test002.js的问题是没有使用游标从Postgresql服务器获取数据,因此当数据量较大时,会一次性输出。
我想用鼠标进行操作,所以下一步我们来练习使用鼠标。
在使用SELECT之前使用CUR。
我创建了test003.js文件。重点如下所述。
-
- Postgresqlデータベースへsqlのリクエストをする前にcurというcursorを宣言しておきます。
-
- データの取り出しは、FETCH ${fetchSize} FROM curでカーソルから取り出す形となります。
-
- fetchSizeはコンフィグファイルで与えるようにします。
-
- データのとりだすためのファンクション(fetch)は別途独立して作っておき、データの取り出しと同時にGeoJSONSeqになるような整形をしておきます
実際の実装の現場では、modify.jsも使ってベクトルタイル変換用の情報(f.tippecanoe)も付与します。
われわれがPostGISから直接ST_AsMVTを使っていない理由がここです。ビューやその属性に応じて最大・最小ズームレベルの設定や、ベクトルタイルのレイヤ名を付けたい。あるいは、属性に応じて新しい属性を与えるとか、ソースとなるPostGISで調整するのではなくて、変換の途中で柔軟にベクトルタイルを作っていきたいと思っています。
connectの最後に接続を終わりにするとき、connectのrelease()だけでよいのか、client.end()にした方がよいのかよくわからない。client.end()を入れた方がデータ変換後に早くプロンプト画面に戻る気がする。
const config = require('config')
const { Pool, Query } = require('pg')
// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')
const fetchSize = config.get('fetchSize')
let pools = {}
const fetch = (client, database, view) =>{
return new Promise((resolve, reject) => {
let count = 0
let features = []
client.query(new Query(`FETCH ${fetchSize} FROM cur`))
.on('row', row => {
let f = {
type: 'Feature',
properties: row,
geometry: JSON.parse(row.st_asgeojson)
}
delete f.properties.st_asgeojson
f.properties._database = database
f.properties._view = view
count++
//f = modify(f)
if (f) features.push(f)
})
.on('error', err => {
console.error(err.stack)
reject()
})
.on('end', async () => {
for (f of features) {
try {
console.log(f)
} catch (e) {
throw e
}
}
resolve(count)
})
})
}
for (relation of relations){
var startTime = new Date()
const [database, schema, view] = relation.split('::')
if(!pools[database]){
pools[database] = new Pool({
host: host,
user: dbUSer,
port: port,
password: dbPassword,
database: database
})
}
pools[database].connect(async (err, client,release) => {
if (err) throw err
//Getting the list of columns, then adjust it
let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`
let cols = await client.query(sql)
cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom') //choose "rows", then its colum_names are listed, and geom is removed.
//we will add filter if needed
cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`)
//console.log(`columns used: ${cols}`)
// Then, we will get feature record.
await client.query('BEGIN')
sql = `
DECLARE cur CURSOR FOR
SELECT ${cols.toString()} FROM ${schema}.${view}`
cols = await client.query(sql)
//console.log(cols.rows)
try {
while (await fetch(client, database, view) !== 0) {}
} catch (e) { throw e }
await client.query(`COMMIT`)
//await client.end()
const endTime = new Date()
var diff = endTime.getTime() - startTime.getTime();
var workTime = diff / 1000
console.log(`workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`)
release()
})
}
此外,我們在途中計算了時間,以了解單個層面(視圖)的讀取大小對其產生的影響。該資料包含10,510筆記錄(線性數據)。
一般而言,如果获取数据大小较小,则流量会逐渐增加;如果获取数据大小较大,则会迅速读取。目前,我在实现中将fetch大小设置为30,000。
使用modify.js对从PostgreSQL中读取的数据进行加工处理
我会创建以下的modify.js。为了简单起见,modify.js已经被简化了,但在实际实现时可以根据每个层和条件进行修改。将test003.js复制为test004.js,并在中间添加一行f = modify(f)来引用modify模块。
const preProcess = (f) => {
f.tippecanoe = {
layer: 'other',
minzoom: 15,
maxzoom: 15
}
return f
}
const postProcess = (f) => {
delete f.properties['_database']
delete f.properties['_view']
return f
}
const layerEdit = {
unmap_popp_p: f => {
f.tippecanoe = {
layer: 'testLayer1-point',
minzoom: 3,
maxzoom: 6
}
//write someting to adjust properties, if needed
return f
},
unmap_bndl25_l: f => {
f.tippecanoe = {
layer: 'testLayer2-line',
minzoom: 4,
maxzoom: 5
}
//write someting to adjust properties, if needed
return f
}
}
module.exports = (f) => {
return postProcess(layerEdit[f.properties._view](preProcess(f)))
}
将数据写入文件
在实际的实现中,从PostgreSQL中读取的信息将直接传递给矢量图转换工具tippecanoe,而不需要将其记录为中间文件。在这里,为了Downstream的练习,我们将使用fs模块将信息写入文件进行练习。将test004.js复制为test005.js然后进行编辑。在config/default.hjson中指定输出文件夹。
npm install --save fs
mkdir outText
const config = require('config')
const { Pool, Query } = require('pg')
const modify = require('./modify.js')
const fs = require('fs')
// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')
const fetchSize = config.get('fetchSize')
const outTextDir = config.get('outTextDir')
let pools = {}
const noPressureWrite = (stream, f) => {
return new Promise((res) => {
if (stream.write(`\x1e${JSON.stringify(f)}\n`)){
res()
} else {
stream.once('drain', () => {
res()
})
}
})
}
const fetch = (client, database, view, stream) =>{
return new Promise((resolve, reject) => {
let count = 0
let features = []
client.query(new Query(`FETCH ${fetchSize} FROM cur`))
.on('row', row => {
let f = {
type: 'Feature',
properties: row,
geometry: JSON.parse(row.st_asgeojson)
}
delete f.properties.st_asgeojson
f.properties._database = database
f.properties._view = view
count++
f = modify(f)
if (f) features.push(f)
})
.on('error', err => {
console.error(err.stack)
reject()
})
.on('end', async () => {
for (f of features) {
try {
//console.log(f)
await noPressureWrite(stream, f)
} catch (e) {
throw e
}
}
stream.end()
resolve(count)
})
})
}
for (relation of relations){
var startTime = new Date()
const [database, schema, view] = relation.split('::')
const stream = fs.createWriteStream(`${outTextDir}/${database}-${schema}-${view}.txt`)
if(!pools[database]){
pools[database] = new Pool({
host: host,
user: dbUSer,
port: port,
password: dbPassword,
database: database
})
}
pools[database].connect(async (err, client,release) => {
if (err) throw err
//Getting the list of columns, then adjust it
let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`
let cols = await client.query(sql)
cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom') //choose "rows", then its colum_names are listed, and geom is removed.
//we will add filter if needed
cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`)
//console.log(`columns used: ${cols}`)
// Then, we will get feature record.
await client.query('BEGIN')
sql = `
DECLARE cur CURSOR FOR
SELECT ${cols.toString()} FROM ${schema}.${view}`
cols = await client.query(sql)
//console.log(cols.rows)
try {
while (await fetch(client, database, view, stream) !== 0) {}
} catch (e) { throw e }
await client.query(`COMMIT`)
//await client.end()
const endTime = new Date()
var diff = endTime.getTime() - startTime.getTime();
var workTime = diff / 1000
console.log(`workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`)
release()
})
}
总结
我在这里记录了使用npm的pg模块从PostgreSQL数据库中读取数据的方法。我还使用了一种处理由PostGIS管理的地理位置信息的方法,即St_AsGeoJSON。我们进行了从test001.js到test005.js的测试,每个都确认可以正常运行。
实际转换需要使用child_process或better-queue进行控制。此外,由于全球数据的批量转换很困难,因此还可以使用PostGIS的&&功能,按照瓦片区块逐个读取数据。学习每种技术是有效的。我希望今后能再次进行这些练习。
请提供更多上下文,以便我可以为您提供准确的汉语翻译。