An implementation example of a LINE BOT server for practical use

In short

For a LINE BOT server that can actually be run in production, asynchronous processing is required, as yoichiro6642 writes at the reference URL below.
Reference URL: A LINE BOT server architecture that can safely handle large volumes of messages

(Figure: Sequence Diagram)

With the above as the goal, I wrote a framework for a LINE BOT server that can withstand a fairly large volume of messages in a small-scale environment.
The last part, "reducing the number of API calls" (by specifying multiple MIDs in a single message send), is not implemented.
The environment I used is as follows.

    • OS : CentOS 7.2.1511 x86_64
    • BOT Server : Node.js v6.2.0
    • Queue : MongoDB v3.2.6
    • Dispatcher & jobWorker : Python 2.7.5

Amazon API Gateway + Lambda + DynamoDB was one option, but I wanted to see whether a lightweight Dispatcher and jobWorker could be built with Node.js + MongoDB + Python to keep the overhead down.

RabbitMQ, memcached, Redis and the like were also candidates; the reasons for choosing MongoDB are as follows.

    • I wanted a trigger that kicks off processing when something is added to the queue, rather than polling.
    • If MongoDB is used as a single master (it does not have to be a single node), the oplog becomes available, and watching the oplog can serve as that trigger (see the sketch after this list).
    • In any case, a fast DB is needed to store and look up information for each accepted MID.
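The oplog-tailing trigger is the heart of this choice. Below is a minimal sketch of the idea with pymongo, assuming the capped queue collection line_bot.recvq that is created later; the dispatcher further below builds its thread dispatch on top of exactly this loop.

# Minimal sketch of the oplog-as-trigger idea (full version in the dispatcher below).
import pymongo
from pymongo.cursor import CursorType

client = pymongo.MongoClient("127.0.0.1", 27017)
oplog = client.local.oplog['$main']   # master/slave oplog (use local.oplog.rs for replica sets)
last = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))['ts']

while True:
    # Tailable cursor: blocks until new oplog entries arrive
    cursor = oplog.find({'ts': {'$gt': last}},
                        cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
    while cursor.alive:
        for doc in cursor:
            if doc['ns'] == 'line_bot.recvq' and doc['op'] == 'i':
                print(doc['o'])       # doc['o'] is the document inserted into the queue
            last = doc['ts']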

基本知識

    • CentOS 7

 

    • Node.js

 

    • MongoDB, MongoDB.oplog

 

    Python

Implementation example

Preparing MongoDB

The parts I specifically configured are replication and oplogSizeMB.

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.line_bot.log
storage:
  dbPath: /var/lib/mongo/line_bot
  journal:
    enabled: true
processManagement:
  fork: false  # fork and run in background
  pidFilePath: /var/run/mongodb/mongod.line_bot.pid  # location of pidfile
net:
  port: 27017
  bindIp: 127.0.0.1  # Listen to local interface only, comment to listen on all interfaces.
replication:
  oplogSizeMB: 3072

Starting MongoDB

Start it in master mode.

$ mongod --master -f mongod.line_bot.conf
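Before wiring anything to the oplog, it can be worth confirming that mongod really is running in master mode and that the oplog collection exists. A small check with pymongo, assuming the host and port from the config above:

# Sanity check: master mode is on and the master/slave oplog exists.
import pymongo

client = pymongo.MongoClient("127.0.0.1", 27017)
print(client.admin.command('isMaster'))        # should report "ismaster": True
print(client.local.collection_names())         # should include "oplog.$main"
print(client.local['oplog.$main'].options())   # capped collection sized by oplogSizeMB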

Creating the collection

I made it a capped collection so that I do not have to worry about it growing.

#!/bin/bash -v
mongo --port=27017 <<EOF
use line_bot;
db.createCollection("recvq", {
  capped: true,
  size: 1048576000 // 1GB
});
EOF
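The same capped collection can also be created from pymongo instead of the mongo shell, which may be handy if the setup is scripted. A sketch, using the same names and size as above:

# Create the capped queue collection from Python (equivalent to the shell script above).
import pymongo

client = pymongo.MongoClient("127.0.0.1", 27017)
db = client.line_bot
if "recvq" not in db.collection_names():
    db.create_collection("recvq", capped=True, size=1048576000)  # ~1 GB, same size as above
print(db["recvq"].options())   # {'capped': True, 'size': 1048576000}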

BOT server (Node.js)

frontDesk.js receives messages from the LINE server and returns a response immediately.

// Settings of this program
var httpsPort = 443;
var allowPath = "/callback";
var httpsOpt  = {
    "caKey"  : "/etc/letsencrypt/live/xxx/privkey.pem",
    "caCert" : "/etc/letsencrypt/live/xxx/fullchain.pem",
    "caCa"   : "/etc/letsencrypt/live/xxx/chain.pem"
};
local = {};
local['channelSecret'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";

// Settings of the MongoDB
var mongoHost = "127.0.0.1";
var mongoPort = 27017;
var mongoDb   = "line_bot";
var mongoCol  = "recvq";

var express= require('express'),
bodyParser = require('body-parser'),
log4js     = require('log4js'),
https      = require('https'),
fs         = require('fs'),
mongo      = require('mongodb'),
path       = require('path');

var accept = require(__dirname+'/accept');
var app    = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

// MongoDB
var MongoClient = require('mongodb').MongoClient, assert = require('assert');
var mongoUrl = 'mongodb://'+mongoHost + ":" + mongoPort + "/" + mongoDb;
set_col(local, mongoUrl, function(rc, local, mongoUrl) {
    if (!rc) {
        console.log("set_col.rc:"+rc);
        local.db.close();
        process.exit(1);
    }
    console.log("Connected succesfully to "+mongoUrl);
});

// handle a request
app.post(allowPath, function(req, res, next) {
    local['acceptTime'] = new Date().getTime();  // record accept time(ms)

    // response ASAP
    res.status(200).send("OK");
    res.end();

    accept.post_callback(req, res, next);  // Handle the request
});

// server certificate authority
var httpsOpt = {
    key:  fs.readFileSync(httpsOpt.caKey),
    cert: fs.readFileSync(httpsOpt.caCert),
    ca:   fs.readFileSync(httpsOpt.caCa)
};
// listen port
var httpsServer = https.createServer(httpsOpt, app);
httpsServer.listen(httpsPort, function() {
    console.log('Listening on port '+httpsPort+'...'); 
}).on('error', function(err) {
    if (err.errno === 'EADDRINUSE') {
        console.log('This program is already running.');
    } else {
        console.log(err);
    }
    process.exit(1);
});

function set_col(local, url, callback) {
    // Use connect method to connect to the MongoServer
    MongoClient.connect(url, function(err, db) {
        if (err) {
            console.log("MongoDB connection error."); console.log(err);
            process.exit(1);
        }
        local['db'] = db;

        local.db.collection(mongoCol, function(err, collection) {
            if (err) {
                console.log("MongoDB collection error."); console.log(err);
                process.exit(1);
            }
            local.db['collection'] = collection;
            callback(true, local, url);
        });
    });
}

Then accept.js verifies the signature and inserts the messages into MongoDB.

var crypto = require('crypto');
var assert = require('assert');

exports.post_callback = function(req, res) {
    // Check that the signature header exists
    if ((! req.headers) || (! req.headers["x-line-channelsignature"])) {
        console.log("400. Bad Request. The request does not have a x-line-channelsignature");
        return;
    }

    // Check that the request has a result
    if ((! req.body) ||
        (! req.body['result'])) {
        console.log("400. Bad Request. The request does not have result");
        return;
    }
    var result_num = req.body.result.length;

    // Compute the HMAC-SHA256 of the HTTP body with the channelSecret and take the base64 digest
    var body_str = new Buffer(JSON.stringify(req.body), 'utf8');
    computedSignature = crypto.createHmac("sha256",local['channelSecret']).update(body_str).digest("base64");

    // Compare the signatures to verify authenticity
    if (req.headers["x-line-channelsignature"] != computedSignature) {
        console.log("400. Bad Request. The x-line-channelsignature is wrong.");
        return;
    }

    // Record the time the request was accepted
    for (var i=0; i<Object.keys(req.body.result).length; i++) {
        req.body.result[i]['acceptTime'] = local['acceptTime'];
    }

    // Insert the messages into MongoDB
    local.db.collection.insertMany(req.body.result, function(err, r) {
        assert.equal(null, err);
        assert.equal(result_num, r.insertedCount);

        toQueueTime = new Date().getTime() - local['acceptTime'];
        console.log("necessary time to store to queue: "+toQueueTime+" ms");

        return;
    });

}
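To exercise frontDesk.js and accept.js without real LINE traffic, a locally signed request can be sent. The sketch below is hypothetical (the URL, channel secret and message fields are placeholders); it relies on the fact that accept.js recomputes the signature from JSON.stringify(req.body), so the body is sent as compact, ASCII-only JSON so that the re-serialized string matches what was signed.

# Hypothetical local test: POST a callback with a valid X-Line-ChannelSignature.
import base64
import hashlib
import hmac
import json
import requests

channelSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"   # must match frontDesk.js
url = "https://localhost/callback"                    # placeholder URL

body = {"result": [{
    "content": {"contentType": 1, "from": "u0123456789abcdef", "text": "hello"},
    "eventType": "00000000000000000000"               # dummy value; accept.js does not check it
}]}
# Compact separators so JSON.stringify(req.body) on the Node side reproduces the same string.
bodyStr = json.dumps(body, separators=(',', ':'))

signature = base64.b64encode(
    hmac.new(channelSecret.encode('utf-8'), bodyStr.encode('utf-8'),
             hashlib.sha256).digest()).decode('utf-8')

r = requests.post(url, data=bodyStr,
                  headers={"Content-Type": "application/json",
                           "X-Line-ChannelSignature": signature},
                  verify=False)   # skip certificate verification when testing against localhost
print(r.status_code)
print(r.text)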

Dispatcher & jobWorker

I implemented these with Python multithreading.
The jobWorker threads are created and then wait() on a threading.Event().
The trigger thread watches ts in the oplog and is kicked whenever something is added to the queue.
It hands the queue entry it has read to an idle jobWorker thread and set()s that thread's Event to start the jobWorker's processing.

I paid attention to threads referencing and updating the shared lists and variables, and managed to handle that without locks, but when multiple threads accessed the LINE API server at the same time I got a concurrent-connection error. So in jobWorker I take an exclusive lock with acquire() around each access to the LINE API server. The documentation says nothing about this, so I set the concurrency to 1 and the access interval to 100 ms. Also, I am a beginner at Python multithreading, so please point out anything I have gotten wrong.
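The serialization described above boils down to one lock per API host plus a minimum interval between calls. A distilled sketch of the pattern that apiServer() in jobWorker.py (further below) implements:

# Distilled sketch of the per-host lock + minimum-interval pattern used by apiServer() below.
import threading
import time

ACCESS_INTERVAL_MS = 100
apiLock = {"trialbot-api.line.me": threading.Lock()}   # one lock per destination host
lastAccessMs = {"trialbot-api.line.me": 0}              # last access time per host (ms)

def call_api(host, do_request):
    with apiLock[host]:                                 # concurrency per host = 1
        waitMs = ACCESS_INTERVAL_MS - (int(time.time() * 1000) - lastAccessMs[host])
        if waitMs > 0:
            time.sleep(waitMs / 1000.0)                 # keep >= 100 ms between calls
        result = do_request()                           # the actual HTTP request
        lastAccessMs[host] = int(time.time() * 1000)
        return result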

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Settings of this program
NumOfThread       = 20
searchInterval    = 100000  # uSec
mainSleepInterval = 60      # Sec

# Settings of the MongoDB
mongoHost = "127.0.0.1";
mongoPort = 27017;
mongoDb   = "line_bot";
mongoCol  = "recvq";

import os,os.path
import sys
import threading
import time
import json
import pymongo
from   pymongo.cursor import CursorType
from   datetime import datetime
import datetime
import jobWorker

usleep = lambda x: time.sleep(x/1000000.0)  # sleep for x microseconds


##### worker thread
def workerThread(tt):
    tno = str(tt[0])
    while True:
        tt[2].clear()  # clear the Event and wait until it is set again
        tt[3] = 'w'
        tt[2].wait()
        if verbose:  # done waiting; start processing
            print '\nworker['+tno+']: wake up'

        # call the actual processing function here
        jobWorker.jobWorker(verbose, tno, tt[4]['o'])


##### MongoDB trigger thread
def TriggerMongo(t, tchain, last, searchInterval, host, port, db, col):
    dbCol = db + '.' + col
    c = pymongo.MongoClient(host, port)
    # Oplog for master/slave mode.
    oplog = c.local.oplog['$main']
    # For replica sets, use this instead:
    #oplog = c.local.oplog.rs

    first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
    ts = first['ts']

    while True:
        cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
        while cursor.alive:
            for doc in cursor:
                # Periodically {h:0,ts:Timestamp(nn.., 1),o:{},v:2,ns:'',op:'n'} is returned,
                # but op:'n' is merely informational. Ignore it.
                if doc['ns']==dbCol and doc['op']!='n':
                    # look for an idle worker thread
                    i = tchain[last]
                    while t[i][3] != 'w':
                        i = tchain[i]
                        if i == tchain[last]:  # searched one full circle
                            usleep(searchInterval)

                    t[i][4] = doc  # store the data in the idle thread's t[n][4]
                    t[i][3] = 'r'
                    t[i][2].set()  # tell t[n] to start processing
                    last = i
                # Work with doc here
                ts = doc['ts']
        print "got out of the while cursor.alive loop"


#######################################################################

# Check of the parameter
verbose = False
if len(sys.argv)==2 and sys.argv[1]=='-v':
    verbose = True
elif len(sys.argv)!=1:
    print "Usage: %s [-v]" % (sys.argv[0],)
    quit()

# Build the worker thread management data & create the worker threads
# [ThreadNo, ThreadObj, EventObj, status, data passed to the thread]
#   (status ' ': preparing, 'w': waiting/idle, 'r': running)
t = [ [0 for i in range(5)] for i in range(NumOfThread)]
for i in range(NumOfThread):
    t[i][0] = i                  # Thread No.
    t[i][2] = threading.Event()  # create the Event object
    t[i][3] = ' '                # status
    # create the worker thread
    t[i][1] = threading.Thread(name='worker['+str(i)+']', target=workerThread,
                               args=(t[i],))
    t[i][1].setDaemon(True)

# Thread list of circulation
tc = [0 for i in range(NumOfThread)]  # each value is the next thread No.
for i in range(NumOfThread):
    tc[i] = i+1
tc[i] = 0  # make a list of circulation

lastThread = i  # last thread used; the next one to use is thread tc[lastThread]

# start the worker threads
for i in range(NumOfThread):
    t[i][1].start()

# wait until all worker threads have reached the waiting ('w') state
while any(t[i][3] == ' ' for i in range(NumOfThread)):
    usleep(100)  # poll every 0.1 ms

# create the MongoDB trigger thread
t_mongo = threading.Thread(name='t_mongo', target=TriggerMongo, args=(t,tc,lastThread,searchInterval,mongoHost,mongoPort,mongoDb,mongoCol,))
t_mongo.setDaemon(True)
t_mongo.start()  # start

# main thread
while True:
    time.sleep(mainSleepInterval)

jobWorker.py is the thread that does the actual work. It is an example that simply replies according to the contentType of what was sent.
Note that how to get the sender's MID (from) differs depending on opType.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note of caution:
# - This program is one of the threads.
# - Must not do exit(),quit()
# - Please use only return()

# Settings of the LINE API Server
lineApiHost      = "trialbot-api_line_me"
accessIntervalMS = 100  # ms
getProfilesUrl   = "https://trialbot-api.line.me/v1/profiles"
postUrl          = "https://trialbot-api.line.me/v1/events"
getContentUrl    = "https://trialbot-api.line.me/v1/bot/message/$messageId/content"
header =  {
    "Content-Type"                : "application/json; charser=UTF-8",
    "X-Line-ChannelID"            : "xxxxxxxxxx",
    "X-Line-ChannelSecret"        : "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Line-Trusted-User-With-ACL": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
postBodyTemplate = {
    "to"          : [],
    "toChannel"   : 1383378250,
    "eventType"   : "138311608800106203",
    "content"     : {}
}

import threading
import time
import datetime
import json

usleep = lambda x: time.sleep(x/1000000.0)  # sleep for x microseconds

# Locking to avoid simultaneous access to the LINE API server from multiple jobWorker threads
globalLock = {}            # one lock per destination host
globalLastAccessTime = {}  # last access time per destination host
loadTime = int(time.time()*1000)

def jobWorker(verbose, mynum, recvBody):
    global globalLock
    global globalLastAccessTime

    # Initialize the per-host lock
    if not globalLock.has_key(lineApiHost):
        globalLock[lineApiHost] = threading.Lock()
    # Initialize the per-host last access time
    if not globalLastAccessTime.has_key(lineApiHost):
        globalLastAccessTime[lineApiHost] = loadTime

    if verbose:
        recvBody['_id'] = 'ObjectId("'+str(recvBody['_id'])+'")'
        print 'worker['+mynum+'] recvBody: '+str(int(time.time()*1000)-recvBody['acceptTime'])+' ms to here from accept'
        print recvBody

    opType = recvBody['content'].get('opType')

    # blocked from user
    if opType == 8:
        # Delete the blocking user's MID (recvBody['content']['params'][0]) from user management
        print 'please delete user "'+recvBody['content']['params'][0]+'" from management data.'
        return

    # Copy the POST body template
    postBody = {}
    postBody['to'] = ''
    postBody['toChannel'] = postBodyTemplate['toChannel']
    postBody['eventType'] = postBodyTemplate['eventType']
    postBody['content'] = {}

    # Reply destination
    if opType==4:  # New user
        postBody['to'] = [ recvBody['content']['params'][0] ]
    else:
        postBody['to'] = [ recvBody['content']['from'] ]

    # New user
    if opType==4:
        # Get the user's profile
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = u'ようこそ!'
        # The profile should be added to the user management DB keyed by the user's MID
        print 'please add '+userProfile['contacts'][0]['displayName']+'\'s profile to user management db.'
        print json.dumps(userProfile, sort_keys = True, indent=4)

    # Handle the message according to its contentType
    contentType = recvBody['content'].get('contentType')
    if opType != 4:          # keep the welcome text set above for new users
        resText = ''
    if contentType == 1:  # Text
        resText = u'はい、'+recvBody['content']['text']+u'、ですね。'
    elif contentType == 2:  # Image
        resText = u'写真ですね...'
    elif contentType == 3:  # Video
        resText = u'動画ですね...'
    elif contentType == 4:  # Audio
        resText = u'ボイスメッセージですね...'
    elif contentType == 7:  # Location
        resText = u'位置情報ですね...'
        if verbose:
            print recvBody['content']['text'].encode('utf-8')
            print recvBody['content']['location']['title'].encode('utf-8')
            print recvBody['content']['location']['address'].encode('utf-8')
    elif contentType == 8:  # Sticker
        resText = u'スタンプですね'
        if verbose:
            print recvBody['content']['contentMetadata']['STKTXT'].encode('utf-8')
    elif contentType == 10: # Contact
        # For a Contact (contentType==10), get the profile for the mid in contentMetadata
        resText = recvBody['content']['contentMetadata']['displayName']+u'さんの連絡先ですね'
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, recvBody['content']['contentMetadata']['mid'], accessIntervalMS)
        contactProfile = json.loads(result.text)
        if verbose:
            print '\ncontactProfile: ' + str(contactProfile)

    # Send the reply message
    if resText:
        # Get the user's profile (ideally stored in the user DB at registration and fetched from there as needed)
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = userProfile['contacts'][0]['displayName'] + u'さん、' + resText
        if verbose:
            print '\nprofile: ' + str(userProfile)

        # The reply is text (contentType=1); Image, Video, Audio, Location, Sticker, multiple messages, and rich messages can also be sent
        postBody['content'] = {
            'contentType': 1,
            'toType'     : 1,
            'text'       : resText
        }
        if verbose:
            print '\nworker['+mynum+'] ' + postUrl
            print 'worker['+mynum+'] postHeader: ' + json.dumps(header, sort_keys = True, indent = 4)
            print 'worker['+mynum+'] postBody: ' + json.dumps(postBody, sort_keys = True, indent = 4)

        # Send the message
        r = apiServer(verbose, mynum, 'post', lineApiHost, postUrl, header, postBody, accessIntervalMS)

    return


# Access the LINE API server
def apiServer(verbose, mynum, method, host, url, header, body, accessIntervalMS):
    import requests
    global globalLock
    global globalLastAccessTime

    globalLock[host].acquire()  # Lock

    # To keep a fixed interval between accesses to the LINE API server, how much longer do we need to wait?
    currentTime = int(time.time()*1000)
    remain = accessIntervalMS - (currentTime - globalLastAccessTime[host])

    if verbose:
        print 'worker['+mynum+'] time since last access(ms): '+str(currentTime - globalLastAccessTime[host])
        print 'worker['+mynum+'] remain='+str(remain)+' ms'

    # wait accessIntervalMS from last access
    if remain > 0:
        usleep(remain*1000)

    if method=='get':
        if body:
            payload = { 'mids': body }
            r = requests.get(url, params=payload, headers=header)
        else:
            if verbose:
                print url, header
            r = requests.get(url, headers=header)
    else:
        r = requests.post(url, data=json.dumps(body), headers=header)

    if verbose and r.status_code!=200:
        print 'worker['+mynum+'] HTTP status code: ' + str(r.status_code)
        print 'worker['+mynum+'] response: ' + r.text

    globalLastAccessTime[host] = int(time.time()*1000)

    globalLock[host].release()  # release
    return r

Summary

I think this gives a lightweight queue mechanism that can handle a large volume of messages, plus a framework for the dispatcher and workers.
On a fresh 64-bit CentOS 7 install the system-wide limit on the number of threads is 30118, but creating 5000 threads already failed (not that anywhere near that many are needed).
A mechanism like this is useful not only for a BOT server but also, for example, for distributing large volumes of mail efficiently across multiple SMTP servers.

If the jobWorker is turned into an application in another language, I think this framework can still be used by making the worker a microservice or by switching to inter-process communication over pipes. To balance load with this mechanism, MongoDB can be moved to a separate server or sharded, and the stages from "4. concurrent message" onward can be placed on other machines. To spread the jobWorkers out even further, making them microservices or using some other request-brokering mechanism would be best.
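For instance, if the jobWorker were rewritten in another language, the dispatcher could hand each queued document to it through a pipe. A hypothetical sketch (the ./jobWorker command is an assumption, reading one JSON document per line on stdin; non-JSON types such as ObjectId and Timestamp would need converting first):

# Hypothetical sketch: pass queue entries to an external worker process over a pipe.
import json
import subprocess

worker = subprocess.Popen(["./jobWorker"],          # assumed external worker program
                          stdin=subprocess.PIPE,
                          universal_newlines=True)

def dispatch(doc):
    # one JSON document per line on the worker's stdin
    worker.stdin.write(json.dumps(doc) + "\n")
    worker.stdin.flush()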
