As an infrastructure engineer, I tried aggregating access logs with Spark Streaming and Kafka

    • Real-time aggregation of Nginx access logs, the infrastructure-engineer way

    • Most of the samples out there are in Scala, so I wrote it in Scala (my first Scala)

    • The trendy pipeline: Nginx => Fluentd => Kafka => Spark Streaming

    • Forgive the messy Scala

    • Basically just the WordCount sample, fiddled with a bit

    For now, let's just try aggregating

Sample logs

I used this to generate logs in LTSV format, modifying only the last line.

puts "time:#{Time.at(now).strftime('%d/%b/%Y:%H:%M:%S %z')}\thost:#{record['host']}\tforwardedfor:#{record['host']}\treq:#{record['method']} #{record['path']} HTTP/1.1\tstatus:#{record['code']}\tsize:#{record['size']}\treferer:#{record['referer']}\tua:#{record['agent']}"
time:22/Dec/2016:18:07:56 +0900 host:164.81.181.112     forwardedfor:164.81.181.112     req:GET /category/office HTTP/1.1      status:200      size:124        referer:/item/games/3481        ua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3
time:22/Dec/2016:18:07:59 +0900 host:196.93.44.211      forwardedfor:196.93.44.211      req:GET /category/electronics?from=10 HTTP/1.1 status:200      size:136        referer:/category/electronics   ua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1
time:22/Dec/2016:18:08:02 +0900 host:20.171.223.57      forwardedfor:20.171.223.57      req:GET /category/finance HTTP/1.1     status:200      size:78 referer:/category/office        ua:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)
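Each line is LTSV, i.e. tab-separated label:value pairs, so it maps naturally onto a map of label to value. As a small preview, here is a rough sketch of parsing a single line with ltsv4s, the same library the Spark job further down uses (the object name LtsvCheck is just for illustration):

import com.github.seratch.ltsv4s._

object LtsvCheck {
  def main(args: Array[String]): Unit = {
    // Shortened version of the first sample line above, with the tabs written as \t
    val line = "time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124"
    // LTSV.parseLine turns one LTSV line into a map keyed by the labels
    val record = LTSV.parseLine(line)
    println(record("status")) // 200
    println(record("req"))    // GET /category/office HTTP/1.1
  }
}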

Fluentd => Kafka

Logs are sent to Kafka with fluent-plugin-kafka.

    • The whole message is pushed into Kafka as a String

    • Parsing is done in Spark rather than in Fluentd (I don't know which is better, but I have the impression Scala is faster, and it doubles as practice)

Install fluent-plugin-kafka with td-agent-gem install fluent-plugin-kafka, and put the following into td-agent.conf.

<match **>
  @type kafka
  brokers 10.0.0.65:9092
  zookeeper 10.0.0.65:2181
  default_topic nginx
</match>

Preparing Kafka

Download and install it.

$ cd /opt
$ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
$ tar xfvz kafka_2.10-0.10.1.0.tgz
$ ln -s kafka_2.10-0.10.1.0 kafka
$ cd kafka

Start ZooKeeper the usual way.

$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 
$ jps -v |grep zookeeper
3839 QuorumPeerMain -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties

Start Kafka the usual way

$ ./bin/kafka-server-start.sh -daemon config/server.properties
$ jps -v |grep Kafka
28603 Kafka -Xmx1G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties

Try sending a message.

Start a consumer that prints to the console

$ ./bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --zookeeper 10.0.0.65:2181 --topic nginx

Feed some data through Fluentd and watch

$ head sample.log | /opt/td-agent/embedded/bin/fluent-cat --none data.nginx

If the messages show up on the consumer's console, it is working.

{"message":"time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tforwardedfor:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124\treferer:/item/games/3481\tua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3","timestamp":"2016-12-26T11:05:07+0900"}
{"message":"time:22/Dec/2016:18:07:59 +0900\thost:196.93.44.211\tforwardedfor:196.93.44.211\treq:GET /category/electronics?from=10 HTTP/1.1\tstatus:200\tsize:136\treferer:/category/electronics\tua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:05:07+0900"}

Preparing Spark

I have written about this here before, so I will skip it.
I just downloaded and extracted spark-2.0.0.tgz.

Setting up the Scala environment

Install Scala and sbt.

    • sbt 0.13.12

    • scala 2.11.6-6

The apt packages can be downloaded from https://dl.bintray.com/sbt/debian.

$ apt install scala=2.11.6-6 sbt=0.13.12

Create the directories

$ mkdir -p sample/src/main/scala/

Create build.sbt

$ cat sample/build.sbt 
import scala.util.Properties

name := "Test"
version := "1.0"
scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.0.0"
libraryDependencies += "net.liftweb" % "lift-json_2.11" % "3.0.1"
libraryDependencies += "com.github.seratch" % "ltsv4s_2.11" % "1.0.+"

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

As for assemblyMergeStrategy, I hit various errors when running assembly, so I copy-pasted these rules (I don't really understand them).

A plugin needs to be added in order to use assembly.

$ mkdir -p sample/project/
$ cat sample/project/plugins.sbt 
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

The Kafka code below just consumes and prints.

More or less straight from the sample

package com.test.spark

import java.util.Date
import java.util.Calendar
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWorker {
  def main(args: Array[String]) {
    // zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
    if (args.length < 5) {
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads, sec) = args
    val secSleep    = sec.toInt
    val topicMap    = topics.split(",").map((_, numThreads.toInt)).toMap
    val sparkConf   = new SparkConf().setAppName("KafkaWorker")
    val ssc         = new StreamingContext(sparkConf, Seconds(secSleep))
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    ssc.checkpoint("checkpoint")

    kafkaStream.foreachRDD{ rdd => 
      println("### Start %s ###".format(Calendar.getInstance.getTime.toString))
      rdd.foreach(print)
      println("### END %s ###\n".format(Calendar.getInstance.getTime.toString))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Build

$ cd sample/
$ sbt assembly # lots of warnings show up, but they are just about merging, so ignore them
$ ll target/scala-2.11/Test-assembly-1.0.jar 
-rw-r--r-- 1 root root 112868840 Dec 26 11:24 target/scala-2.11/Test-assembly-1.0.jar

Run the jar on Spark

As in the source, the arguments are, in order: zkQuorum (127.0.0.1:2181), group (test), topics (nginx), numThreads (2), and the batch interval in seconds.

$ cd /opt/spark
$ ./bin/spark-submit --class com.test.spark.KafkaWorker  /root/sample/target/scala-2.11/Test-assembly-1.0.jar 127.0.0.1:2181 test nginx 2 5
### Start Mon Dec 26 11:31:05 JST 2016 ###
### END Mon Dec 26 11:31:05 JST 2016 ###

### Start Mon Dec 26 11:31:10 JST 2016 ###
### END Mon Dec 26 11:31:10 JST 2016 ###

Confirmed that the batch runs roughly every 5 seconds.

Feed in some data and watch

$ head sample.log | /opt/td-agent/embedded/bin/fluent-cat --none data.nginx
### Start Mon Dec 26 11:32:20 JST 2016 ###
{"message":"time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tforwardedfor:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124\treferer:/item/games/3481\tua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:07:59 +0900\thost:196.93.44.211\tforwardedfor:196.93.44.211\treq:GET /category/electronics?from=10 HTTP/1.1\tstatus:200\tsize:136\treferer:/category/electronics\tua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:02 +0900\thost:20.171.223.57\tforwardedfor:20.171.223.57\treq:GET /category/finance HTTP/1.1\tstatus:200\tsize:78\treferer:/category/office\tua:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:06 +0900\thost:212.159.169.49\tforwardedfor:212.159.169.49\treq:GET /item/computers/2268 HTTP/1.1\tstatus:200\tsize:139\treferer:/item/networking/248\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:07 +0900\thost:140.69.110.95\tforwardedfor:140.69.110.95\treq:GET /category/books HTTP/1.1\tstatus:200\tsize:109\treferer:-\tua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:09 +0900\thost:172.18.127.139\tforwardedfor:172.18.127.139\treq:GET /category/electronics HTTP/1.1\tstatus:200\tsize:135\treferer:-\tua:Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:12 +0900\thost:120.222.102.169\tforwardedfor:120.222.102.169\treq:POST /search/?c=Computers+Electronics HTTP/1.1\tstatus:200\tsize:128\treferer:-\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:116.150.211.139\tforwardedfor:116.150.211.139\treq:GET /category/electronics HTTP/1.1\tstatus:200\tsize:55\treferer:-\tua:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; GTB7.2; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C)","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:192.42.23.199\tforwardedfor:192.42.23.199\treq:GET /category/networking HTTP/1.1\tstatus:200\tsize:59\treferer:-\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:220.84.166.98\tforwardedfor:220.84.166.98\treq:GET /category/toys HTTP/1.1\tstatus:200\tsize:124\treferer:/item/office/4833\tua:Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)","timestamp":"2016-12-26T11:32:16+0900"}
### END Mon Dec 26 11:32:20 JST 2016 ###

The data is pulled from Kafka and printed correctly.

Now let's actually aggregate.

Every 5 seconds, aggregate requests by the first level of the request path, excluding things like icons, JavaScript, and error responses; the filter is status == 200 and size >= 100.
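Concretely, for the first sample record the req field is "GET /category/office HTTP/1.1": the second space-separated token is the path, and the second slash-separated token is the first level, which is what gets counted. A quick illustration of those string operations (the same thing reqToPath and the split in the code below do; the object name PathCheck is just for illustration):

object PathCheck {
  def main(args: Array[String]): Unit = {
    val req = "GET /category/office HTTP/1.1"
    val path = req.split(" ")(1)        // "/category/office"
    // index 0 is the empty string before the leading "/", so index 1 is the first level
    val firstLevel = path.split("/")(1) // "category"
    println(firstLevel)
  }
}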


package com.test.spark

import java.util.HashMap
import java.util.Date
import java.util.Calendar
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.util.parsing.json.JSON
import net.liftweb._
import net.liftweb.json._
import com.github.seratch.ltsv4s._

object KafkaWorker {
  case class FluentEvent(
      timestamp: String,
      message: String
    )

  def main(args: Array[String]) {
    // zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
    if (args.length < 5) {
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads, sec) = args
    val secSleep    = sec.toInt
    val topicMap    = topics.split(",").map((_, numThreads.toInt)).toMap
    val sparkConf   = new SparkConf().setAppName("KafkaWorker")
    val ssc         = new StreamingContext(sparkConf, Seconds(secSleep))
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    ssc.checkpoint("checkpoint")

    val nginxStream = kafkaStream.map(convertFluentToMap(_))
    // Keep only requests with status 200 and a response of at least 100 bytes, and extract the first level of the path
    // (when the condition is false, the map yields Unit, which shows up as "not match" in the output below)
    val pathsStream = nginxStream.map{nginxRecord =>
      if (nginxRecord("size").toInt >= 100 && nginxRecord("status").toInt == 200 ){
        reqToPath(nginxRecord("req")).split("/")(1)
      }
    }
    // Count per path
    val countPath = pathsStream.map((_, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(secSleep))
      .map{case (path, count) => (count, path)}
      .transform(_.sortByKey(false))

    // OutPut
    countPath.foreachRDD{ rdd => 
      println("### Start %s ###".format(Calendar.getInstance.getTime.toString))
      val path = rdd.take(10) 
      path.foreach{case (count, tag) => 
        tag match {
          case tag: String => println("%s count (%s)".format(count, tag))
          case _ => println("%s count not match".format(count))
        }
      }
      println("### END %s ###\n".format(Calendar.getInstance.getTime.toString))
    }

    ssc.start()
    ssc.awaitTermination()
  }
  def parseNginxLtsv(record: String) = { LTSV.parseLine(record) }
  def parseFluentJson(record: String) = {
    implicit val formats = DefaultFormats
    parse(record).extract[FluentEvent].message
  }
  def convertFluentToMap(record: String) = { parseNginxLtsv(parseFluentJson(record)) }
  def reqToPath(record: String) = { record.split(" ")(1) }
}

Give it a run

$ sbt assembly
$ cd /opt/spark
$ ./bin/spark-submit --class com.test.spark.KafkaWorker  /root/sample/target/scala-2.11/Test-assembly-1.0.jar 127.0.0.1:2181 test nginx 2 5
### Start Mon Dec 26 12:32:25 JST 2016 ###                                      
879 count not match
285 count (category)
190 count (item)
48 count (search)
### END Mon Dec 26 12:32:25 JST 2016 ###

### Start Mon Dec 26 12:32:30 JST 2016 ###
802 count not match
267 count (category)
175 count (item)
38 count (search)
### END Mon Dec 26 12:32:30 JST 2016 ###

### Start Mon Dec 26 12:32:35 JST 2016 ###
895 count not match
321 count (category)
181 count (item)
53 count (search)
### END Mon Dec 26 12:32:35 JST 2016 ###

If I find the time, I would like to run performance comparisons against things like Fluentd's datacounter and Norikra.
