使用ZeroMQ+NodeJS自己实现Docker容器的自动缩放功能
首先
我现在正在开发一种所谓的自助数据科学平台,类似于在常规的BI工具上添加了数据预处理和机器学习功能的产品。
UI是一个Web应用程序,但在后端需要异步运行R和Python程序,并且希望能够根据用户的负载来增加或减少计算资源。因此,我们使用Docker来执行R和Python的处理,并在Web服务端实现了对它们的简单控制功能。
构成图

-
- WebアプリケーションのサーバサイドはNodejs(Express)を使用
-
- WebサーバホストとワーカホストはVMを別にする
-
- データ処理を行う計算エンジン(Pythonで実装)はDockerコンテナ上で行う
-
- Webサーバと計算エンジンとのやり取りは、ZeroMQ上でJSONで行う
- データ本体の格納・アクセスにはサーバホストとワーカホスト両方から読み書きできる共有ファイルストレージ(NFS等)を利用
工作进程(负责使用Python进行数据处理的部分)。
构成图中写着“计算引擎”的部分是进行数据处理的工作进程。这里是一个实施选择的选项。
-
- データ処理のリクエストのたびにプロセスを生成・終了させる
- プロセスはデーモン的に稼働させたままリクエストのあるたびにデータ処理を行う
有两个选择,但我选择了后者。
我有以下的原因,即使是 Docker 容器,每个数据处理步骤都生成和删除容器可能会造成很大的负载,而且使用 ZeroMQ 可以在保持进程的同时轻松进行异步通信。
Web服务器和计算引擎之间的异步通信模型
在ZeroMQ的套接字中,有几种类型,如PUB/SUB,PUSH/PULL,REQ/REP等(如果你对它们的区别不太了解,可以在Qiita上找到一些很好的解释文章,可以从Qiita的标签上追溯一下)。
重要的文件
首先,在设计中有以下要求。
-
- コンテナ数は負荷に応じて変動するためDockerネットワーク上のIPアドレスがコンテナごとに変わる(ポートは固定しておける)
-
- ZeroMQのソケットを実行中に生成・消滅させるような設計は複雑すぎるのでダメ(予め決められた数・種類のソケットをプロセスの最初に作ってそれを使うだけで十分であるべき)
- Pythonのプロセスがコケたり手動でDockerコンテナを起動・停止したりしてもWebサーバ側がそれなりに柔軟に対応できる
为了满足这些要求,我设计了下面这个通信模型。
请用中文把以下内容改述一遍,只需提供一种选项:
套接字的构成和顺序

关于端口号,PUB/SUB绑定在5556,PUSH/PULL绑定在5557,这些是固定的。无论是Web服务器端还是容器端,都分别通过bind和connect进行连接。
我认为顺序图总体上看起来很自然,但有一个地方可能会感到不舒服,就是发送命令的部分使用了“PUB”套接字进行处理,但这正是这个设计的亮点。
在ZeroMQ中,要建立与对方的连接,当然需要指定对方的IP地址和端口。但是,根据上述要求,在容器自由增减的环境中,由于IP地址也可以自由增减,为了能够使用单播方式执行处理命令,Web服务器进程也必须为每个容器拥有自己的ZeroMQ套接字。
因此,首先在空闲的进程中的一段时间(在当前实现中为500毫秒)内进行募集(ping),工作进程会在自己的ID(即主机名或IP地址)之后进行回应(pong)。从回应中选择一个进程,并在数据处理命令的JSON中指定该ID,再像ping一样使用PUB套接字进行广播。
每个进程(包括正在执行处理的进程)都会接收到该命令,并在被分配到的情况下执行处理。在ZeroMQ的PUB/SUB模式中,SUB端可以轻松地过滤想要订阅的消息。因此,通过使用空格分隔消息字符串,将其首部的标记(主题名称)设置为工作进程ID,然后订阅两个主题:“自己”或“所有人”,即可。另外,当接收到消息时,正在执行处理的节点会在处理完成后读取该消息的命令,但因为主题不是自己的,所以可以顺利地跳过该消息。
通过在初始阶段进行500毫秒的ping等待和招募,无论是工作进程(容器)崩溃还是手动重新启动,Web服务器进程都不需要每次都去了解这些情况,这也是这个设计的关键之处。这个设计借鉴了旧式以太网LAN的CSMA(载波侦听多路访问)机制。
好的,那我们现在开始介绍实际的代码内容。
创建Docker容器
FROM python:2
WORKDIR /usr/src/app
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
build-essential \
libzmq-dev
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
RUN mkdir ./work
COPY . ./work
RUN useradd -u 1000 isobe
USER isobe
CMD [ "python", "./engine.py" ]
按照上述的Dockerfile编写需求(在requirements.txt中写入需要的库,如scikit-learn和pandas),使用docker命令创建镜像。
% docker run -d -v /var/data:/var/data my-engine
/var/data是挂载在(容器主机)虚拟机上的共享存储路径。
顺便提一下,开发过程中的一个问题是,即使用户名称相同,容器上的UserID与开发机器或虚拟机上的UserID可能不同。在这种情况下,如果在创建镜像时不使用useradd命令的-u选项来使容器内的UserID与之匹配,那么就无法在挂载的目录上进行读写操作,因为没有权限。
工作进程(Python)
import sys
import os
import json
import zmq
import subprocess
def get_container_id():
return os.uname()[1].strip()
def get_webserver_ip():
cmd = "ip route | awk 'NR==1 {print $3}'"
return subprocess.check_output(cmd,shell=True).strip()
if __name__=='__main__':
cid = get_container_id()
ip = get_webserver_ip()
context = zmq.Context()
sub = context.socket(zmq.SUB)
sub.connect('tcp://'+ip+':5556')
sub.setsockopt(zmq.SUBSCRIBE, '*')
sub.setsockopt(zmq.SUBSCRIBE, cid)
push = context.socket(zmq.PUSH)
push.connect('tcp://'+ip+':5557')
pong = json.dumps({
'cid': cid,
'type': 'pong'
})
push.send(pong)
while True:
print "wait"
s = sub.recv()
index = s.find(' ')
topic = s[:index]
msg = s[index+1:]
if msg=='ping':
push.send(pong)
continue
command = json.loads(msg)
以降、処理本体
这是将上面展示的序列图直接实施的版本。如果为Web服务器 VM 和 Worker VM 共享了 Docker 网络,那么可以使用上述 Python 代码中的 get_webserver_ip() 来简单地通过 ip route 命令获取 Web 服务器的 IP。
Web服务器在创建套接字时已经将其绑定到“*”上。(考虑到云基础设施的内部性质,更注重实现的简便性而非安全性)
在Web服务器端
下面将提供一种较长的实现方式。在nodejs中,控制Docker容器最常见的库似乎是dockerode。
就难点而言,这个JS模块只暴露了一个async方法,在使用该模块的Web服务端(Node-Express)来看,就像是一个同步的简单API,从而实现了隐藏这种复杂的异步处理(在这种情况下,Promise和ES2017的async/await非常方便)。
首先是使用此模块(Node-Express)的代码。调用工作进程(计算引擎)只需调用engine模块的方法并进行await操作。非常简单而清晰。
var express = require('express')
const router = express.Router()
var engine = require('./engine')
(中略)
router.post('/some_rest_api', async (req,res) => {
// Do some web parameter process,
// to make JSON as commands for engines.
var cmd = ...
try {
var result = await engine(cmd) // awesome!! what a beautiful implementation!!
var api_response_data = ... // some post process
res.send(api_response_data)
} catch (e) {
res.status(500)
res.send(e)
}
})
以下是engine的控制模块。
(目前只是复制粘贴了代码,但稍后我会写每个方法的说明。)
const zmq = require('zmq');
const pub = zmq.socket('pub');
const pull = zmq.socket('pull');
const moment = require('moment')
const Docker = require('dockerode')
const docker = new Docker({socketPath: '/var/run/docker.sock'});
const production = process.env.NODE_ENV=='production'
const config = {
INITIAL_WAIT: 1000,
MONITOR_INTERVAL: production ? 30000 : 10000, // milli seconds
PING_TIMEOUT: 500, // milli seconds
IDLE_LIMIT: 60, // seconds
MINIMUM_IDLE_WORKERS: production ? 3 : 1,
AUTO_SCALE: production
}
const workers = {}
const state_char = {
ready:'.',reserved:'o',running:'O',stopping:'x',
}
function show_workers_string() {
return Object.values(workers)
.sort((a,b)=>(a.discovered>b.discovered?+1:-1))
.reduce((ss,w)=>(ss+state_char[w.state]),'')
}
function show_workers() {
const ss = show_workers_string()
console.log(`workers: [${ss}]`)
}
pub.bind('tcp://*:5556',()=>{
console.log('engine(pub): bound')
setTimeout(init,config.INITIAL_WAIT)
monitor()
})
pull.bind('tcp://*:5557',()=>{
console.log('engine(pull): bound')
pull.on('message',handle_response)
})
async function start_worker() {
const container = await docker.createContainer({
Image: 'datarecipe-eng',
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
OpenStdin: false,
StdinOnce: false,
HostConfig: {
Binds: ['/var/data:/var/data']
}
})
await container.start()
}
async function stop_worker(cid) {
workers[cid].state = 'stopping'
const container = docker.getContainer(cid)
if (container) {
await container.stop()
await container.remove()
console.log(`conatiner ${cid} stopped/removed.`)
}
delete workers[cid]
}
function init() {
console.log('ping')
pub.send(['* ping'],0,()=>{
setTimeout(()=>{
const ready = Object.keys(workers).length
const shortage = config.MINIMUM_IDLE_WORKERS-ready
if (shortage>0 && config.AUTO_SCALE) {
console.log(`shortage ${shortage} workers.`)
for (var i=0; i<shortage; i++) {
start_worker()
}
}
},config.PING_TIMEOUT)
})
}
function monitor() {
setInterval(()=>{
const limit = moment().subtract(config.IDLE_LIMIT,'s')
const idle_workers = Object.entries(workers).filter(([cid,worker])=>{
return worker.state=='ready' && worker.moment<=limit
})
//console.log(`${idle_workers.length} idle workers.`)
const excess = idle_workers.length - config.MINIMUM_IDLE_WORKERS
if (excess>0 && config.AUTO_SCALE) {
console.log(`extra ${excess} workers.`)
for (var i=0; i<excess; i++) {
const [cid,worker] = idle_workers[i]
stop_worker(cid)
}
}
show_workers()
},config.MONITOR_INTERVAL)
}
function handle_response(msg) {
const info = JSON.parse(msg);
console.log(`engine(${info.cid}): ${info.type}`);
if (!(info.cid in workers)) {
workers[info.cid] = {cid:info.cid,discovered:moment()}
}
worker = workers[info.cid]
if (info.type=='pong') {
worker.state = 'ready'
if (worker.cb) {
const err = 'worker restarted'
worker.cb(err,null)
}
} else if (info.type=='start') {
worker.state = 'running'
} else if (info.type=='finish') {
if ('cb' in worker) {
worker.cb(info.error,info.result)
delete worker.cb
}
worker.state = 'ready'
}
worker.moment = moment()
show_workers()
}
function select_worker() {
return new Promise(cb=>{
const now = moment()
console.log('ping')
pub.send(['* ping'],0,()=>{
setTimeout(()=>{
const found = Object.values(workers)
.find(w=>(w.state=='ready' && w.moment>=now))
if (found) found.state = 'reserved'
cb(found)
},config.PING_TIMEOUT)
})
})
}
async function request(cmd) {
var worker = await select_worker()
if (!worker) {
if (config.AUTO_SCALE) {
console.log('not found ready worker, so create one.')
await start_worker()
worker = await select_worker()
}
if (!worker) {
const err = 'cannot assign worker.'
console.log(err)
throw err
}
}
console.log(`worker ${worker.cid}: selected`)
return await new Promise((ok,ng) => {
const msg = JSON.stringify(cmd)
pub.send([worker.cid+' '+msg],0,() => {
console.log('engine: sent ');
worker.cb = (err,ret) => {
if (err) ng(err); else ok(ret)
}
show_workers()
})
})
}
module.exports = request