As an infrastructure engineer, I tried aggregating access logs with Spark Streaming and Kafka
- Aggregate Nginx access logs in real time, infrastructure-engineer style
- Most of the samples out there are in Scala, so I wrote it in Scala (my first Scala)
- A trendy-feeling pipeline: Nginx => Fluentd => Kafka => Spark Streaming
- Please forgive the messy Scala
- Basically just the WordCount sample, fiddled with a bit
- First, just get an aggregation working
Sample logs
I prepared LTSV-format logs with this, changing only the last line:
puts "time:#{Time.at(now).strftime('%d/%b/%Y:%H:%M:%S %z')}\thost:#{record['host']}\tforwardedfor:#{record['host']}\treq:#{record['method']} #{record['path']} HTTP/1.1\tstatus:#{record['code']}\tsize:#{record['size']}\treferer:#{record['referer']}\tua:#{record['agent']}"
time:22/Dec/2016:18:07:56 +0900 host:164.81.181.112 forwardedfor:164.81.181.112 req:GET /category/office HTTP/1.1 status:200 size:124 referer:/item/games/3481 ua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3
time:22/Dec/2016:18:07:59 +0900 host:196.93.44.211 forwardedfor:196.93.44.211 req:GET /category/electronics?from=10 HTTP/1.1 status:200 size:136 referer:/category/electronics ua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1
time:22/Dec/2016:18:08:02 +0900 host:20.171.223.57 forwardedfor:20.171.223.57 req:GET /category/finance HTTP/1.1 status:200 size:78 referer:/category/office ua:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)
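Each of these lines is plain LTSV, so on the Spark side it becomes a simple Map[String, String]. A quick sketch with ltsv4s (the same parser the Spark code uses later), trimmed to a few fields:
import com.github.seratch.ltsv4s._

// One log line (trimmed to a few fields) parsed into a Map[String, String]
val line = "time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124"
val fields = LTSV.parseLine(line)
// fields("status") == "200", fields("req") == "GET /category/office HTTP/1.1"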
The Fluentd => Kafka part
Send the logs to Kafka with fluent-plugin-kafka.
- Push every message into Kafka as a plain String (a rough producer-side sketch follows the config below)
- Do the parsing in Spark rather than in Fluentd (no idea which is actually better, but I have the impression Scala is faster, and it doubles as practice)
Install it with "td-agent-gem install fluent-plugin-kafka" and put the following in td-agent.conf.
<match **>
@type kafka
brokers 10.0.0.65:9092
zookeeper 10.0.0.65:2181
default_topic nginx
</match>
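For reference, everything lands in Kafka as a plain String on the nginx topic; fluent-plugin-kafka is doing roughly the equivalent of the following hand-rolled producer (just a sketch; the broker address and payload here are placeholders for illustration):
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Send one Fluentd-style record as a single String message to the "nginx" topic.
// Broker address and payload are placeholders, not values the plugin requires.
val props = new Properties()
props.put("bootstrap.servers", "10.0.0.65:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("nginx", """{"message":"time:...\thost:...","timestamp":"..."}"""))
producer.close()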
Preparing Kafka
Download and install it.
$ cd /opt
$ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
$ tar xfvz kafka_2.10-0.10.1.0.tgz
$ ln -s kafka_2.10-0.10.1.0 kafka
$ cd kafka
Start ZooKeeper in the usual way.
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ jps -v |grep zookeeper
3839 QuorumPeerMain -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
Start Kafka in the usual way.
$ ./bin/kafka-server-start.sh -daemon config/server.properties
$ jps -v |grep Kafka
28603 Kafka -Xmx1G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
Let's try sending a message through.
Start a console consumer that prints incoming messages:
$ ./bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --zookeeper 10.0.0.65:2181 --topic nginx
Feed some data in through Fluentd and watch:
$ head sample.log | /opt/td-agent/embedded/bin/fluent-cat --none data.nginx
If the consumer prints the records to its console, it's working.
{"message":"time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tforwardedfor:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124\treferer:/item/games/3481\tua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3","timestamp":"2016-12-26T11:05:07+0900"}
{"message":"time:22/Dec/2016:18:07:59 +0900\thost:196.93.44.211\tforwardedfor:196.93.44.211\treq:GET /category/electronics?from=10 HTTP/1.1\tstatus:200\tsize:136\treferer:/category/electronics\tua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:05:07+0900"}
Preparing Spark
I've written about this here before, so I'll skip the details.
I just downloaded and extracted spark-2.0.0.tgz.
Setting up the Scala environment
Install Scala and sbt.
- sbt 0.13.12
- scala 2.11.6-6
The apt packages for sbt are available from https://dl.bintray.com/sbt/debian.
$ apt install scala=2.11.6-6 sbt=0.13.12
Create the directories.
$ mkdir -p sample/src/main/scala/
Create build.sbt.
$ cat sample/build.sbt
import scala.util.Properties
name := "Test"
version := "1.0"
scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.0.0"
libraryDependencies += "net.liftweb" % "lift-json_2.11" % "3.0.1"
libraryDependencies += "com.github.seratch" % "ltsv4s_2.11" % "1.0.+"
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf" => MergeStrategy.concat
  case "unwanted.txt" => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
For assemblyMergeStrategy, I hit various errors when running assembly, so I copy-pasted this from what I found (I don't really understand it).
To use assembly, the plugin needs to be added.
$ mkdir -p sample/project/
$ cat sample/project/plugins.sbt
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Code that just reads from Kafka and prints
Pretty much the sample as-is.
package com.test.spark

import java.util.Date
import java.util.Calendar

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWorker {
  def main(args: Array[String]) {
    // zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
    if (args.length < 5) {
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads, sec) = args
    val secSleep = sec.toInt
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val sparkConf = new SparkConf().setAppName("KafkaWorker")
    val ssc = new StreamingContext(sparkConf, Seconds(secSleep))
    // keep only the value (the message String) from each (key, value) pair
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    ssc.checkpoint("checkpoint")

    // print every record received in each batch
    kafkaStream.foreachRDD { rdd =>
      println("### Start %s ###".format(Calendar.getInstance.getTime.toString))
      rdd.foreach(print)
      println("### END %s ###\n".format(Calendar.getInstance.getTime.toString))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Build it.
$ cd sample/
$ sbt assembly # a ton of warnings show up, but they are just about merging, so ignore them
$ ll target/scala-2.11/Test-assembly-1.0.jar
-rw-r--r-- 1 root root 112868840 Dec 26 11:24 target/scala-2.11/Test-assembly-1.0.jar
Run the jar with Spark
As in the source, the arguments are, in order: zkQuorum (127.0.0.1:2181), group (test), topics (nginx), numThreads (2), and the batch interval in seconds.
$ cd /opt/spark
$ ./bin/spark-submit --class com.test.spark.KafkaWorker /root/sample/target/scala-2.11/Test-assembly-1.0.jar 127.0.0.1:2181 test nginx 2 5
### Start Mon Dec 26 11:31:05 JST 2016 ###
### END Mon Dec 26 11:31:05 JST 2016 ###
### Start Mon Dec 26 11:31:10 JST 2016 ###
### END Mon Dec 26 11:31:10 JST 2016 ###
I confirmed that the processing runs roughly every 5 seconds.
Feed in some data and watch:
$ head sample.log | /opt/td-agent/embedded/bin/fluent-cat --none data.nginx
### Start Mon Dec 26 11:32:20 JST 2016 ###
{"message":"time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tforwardedfor:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124\treferer:/item/games/3481\tua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:07:59 +0900\thost:196.93.44.211\tforwardedfor:196.93.44.211\treq:GET /category/electronics?from=10 HTTP/1.1\tstatus:200\tsize:136\treferer:/category/electronics\tua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:02 +0900\thost:20.171.223.57\tforwardedfor:20.171.223.57\treq:GET /category/finance HTTP/1.1\tstatus:200\tsize:78\treferer:/category/office\tua:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:06 +0900\thost:212.159.169.49\tforwardedfor:212.159.169.49\treq:GET /item/computers/2268 HTTP/1.1\tstatus:200\tsize:139\treferer:/item/networking/248\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:07 +0900\thost:140.69.110.95\tforwardedfor:140.69.110.95\treq:GET /category/books HTTP/1.1\tstatus:200\tsize:109\treferer:-\tua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:09 +0900\thost:172.18.127.139\tforwardedfor:172.18.127.139\treq:GET /category/electronics HTTP/1.1\tstatus:200\tsize:135\treferer:-\tua:Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:12 +0900\thost:120.222.102.169\tforwardedfor:120.222.102.169\treq:POST /search/?c=Computers+Electronics HTTP/1.1\tstatus:200\tsize:128\treferer:-\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:116.150.211.139\tforwardedfor:116.150.211.139\treq:GET /category/electronics HTTP/1.1\tstatus:200\tsize:55\treferer:-\tua:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; GTB7.2; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C)","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:192.42.23.199\tforwardedfor:192.42.23.199\treq:GET /category/networking HTTP/1.1\tstatus:200\tsize:59\treferer:-\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:220.84.166.98\tforwardedfor:220.84.166.98\treq:GET /category/toys HTTP/1.1\tstatus:200\tsize:124\treferer:/item/office/4833\tua:Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)","timestamp":"2016-12-26T11:32:16+0900"}
### END Mon Dec 26 11:32:20 JST 2016 ###
The data is being pulled from Kafka and printed correctly.
Now let's try actually aggregating.
Every 5 seconds, aggregate on the first level of the request path, excluding things like icons, JS, and error responses, so filter on status == 200 and size >= 100 bytes.
package com.test.spark

import java.util.HashMap
import java.util.Date
import java.util.Calendar

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.util.parsing.json.JSON
import net.liftweb._
import net.liftweb.json._
import com.github.seratch.ltsv4s._

object KafkaWorker {
  // shape of the JSON envelope that Fluentd sends
  case class FluentEvent(
    timestamp: String,
    message: String
  )

  def main(args: Array[String]) {
    // zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
    if (args.length < 5) {
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads, sec) = args
    val secSleep = sec.toInt
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val sparkConf = new SparkConf().setAppName("KafkaWorker")
    val ssc = new StreamingContext(sparkConf, Seconds(secSleep))
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    ssc.checkpoint("checkpoint")

    // JSON envelope => Map of LTSV fields
    val nginxStream = kafkaStream.map(convertFluentToMap(_))

    // Extract only the first path level for requests with status code 200 and
    // a response of at least 100 bytes (anything else falls through as Unit,
    // which shows up as "not match" in the output)
    val pathsStream = nginxStream.map { nginxRecord =>
      if (nginxRecord("size").toInt >= 100 && nginxRecord("status").toInt == 200) {
        reqToPath(nginxRecord("req")).split("/")(1)
      }
    }

    // count per path
    val countPath = pathsStream.map((_, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(secSleep))
      .map { case (path, count) => (count, path) }
      .transform(_.sortByKey(false))

    // output
    countPath.foreachRDD { rdd =>
      println("### Start %s ###".format(Calendar.getInstance.getTime.toString))
      val path = rdd.take(10)
      path.foreach { case (count, tag) =>
        tag match {
          case tag: String => println("%s count (%s)".format(count, tag))
          case _ => println("%s count not match".format(count))
        }
      }
      println("### END %s ###\n".format(Calendar.getInstance.getTime.toString))
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def parseNginxLtsv(record: String) = { LTSV.parseLine(record) }

  def parseFluentJson(record: String) = {
    implicit val formats = DefaultFormats
    parse(record).extract[FluentEvent].message
  }

  def convertFluentToMap(record: String) = { parseNginxLtsv(parseFluentJson(record)) }

  def reqToPath(record: String) = { record.split(" ")(1) }
}
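As a quick worked example of the helpers at the bottom: reqToPath takes the second space-separated token of the req field, and the extra split then takes the first path level, which is what gets counted:
// What the path extraction does for one record, step by step
val req  = "GET /category/office HTTP/1.1"
val path = req.split(" ")(1)   // "/category/office" (what reqToPath returns)
val top  = path.split("/")(1)  // "category"         (the first path level)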
Try running it.
$ sbt assembly
$ cd /opt/spark
$ ./bin/spark-submit --class com.test.spark.KafkaWorker /root/sample/target/scala-2.11/Test-assembly-1.0.jar 127.0.0.1:2181 test nginx 2 5
### Start Mon Dec 26 12:32:25 JST 2016 ###
879 count not match
285 count (category)
190 count (item)
48 count (search)
### END Mon Dec 26 12:32:25 JST 2016 ###
### Start Mon Dec 26 12:32:30 JST 2016 ###
802 count not match
267 count (category)
175 count (item)
38 count (search)
### END Mon Dec 26 12:32:30 JST 2016 ###
### Start Mon Dec 26 12:32:35 JST 2016 ###
895 count not match
321 count (category)
181 count (item)
53 count (search)
### END Mon Dec 26 12:32:35 JST 2016 ###
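By the way, the "count not match" rows are the records that fail the status/size check: the if has no else branch, so those records evaluate to Unit, and Unit gets counted as a key of its own. A sketch of one way to drop them instead, assuming the nginxStream and reqToPath from the code above:
// Filter first, then map, so non-matching records are simply dropped
// instead of being counted under "not match"
val pathsStream = nginxStream
  .filter(r => r("size").toInt >= 100 && r("status").toInt == 200)
  .map(r => reqToPath(r("req")).split("/")(1))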
If I find the time, I'd also like to try performance comparisons against Fluentd's DataCounter, Norikra, and the like.