Trying out Apache Spark's Scala shell

Summary

This is the day 18 entry of the Scala Advent Calendar 2014. Sorry for the delay.

I am designing a system that parses application logs, registers them into Elasticsearch, and visualizes them in Kibana. Since a large volume of logs needs to be parsed quickly and in near real time, I decided to try Spark, and this post summarizes what I found.

Test environment

    • Ubuntu 14.04.1 LTS
    • Java 1.8.0_20
    • Scala 2.10.4
    • Spark 1.1.1

Please refer to the official documentation for how to install Spark and other setup details.

Parsing Apache access logs

Assume the LogFormat is as follows.

LogFormat "%h %l %u %t %>s %X %T %B \"%r\" \"%{Referer}i\" \"%{User-Agent}i\""

Concretely, a log line looks like this:

192.168.1.10 - - [20/Oct/2014:09:00:03 +0900] 200 + 0 1194 "POST /hoge/hoge HTTP/1.1" "http://hoge.co.jp/hoge/view?aaa=1234" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.202 Safari/535.1"
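
As a reference annotation of my own (not part of the original post), the LogFormat directives correspond to the sample line roughly as follows; the request line %r is further split by the parser below into method, URI, and HTTP version.

// How the LogFormat directives line up with the sample line above
// (standard Apache meanings; values taken from the sample).
val sampleFields = Seq(
  "%h (remote host)"       -> "192.168.1.10",
  "%l (identd logname)"    -> "-",
  "%u (remote user)"       -> "-",
  "%t (request time)"      -> "20/Oct/2014:09:00:03 +0900",
  "%>s (final status)"     -> "200",
  "%X (connection status)" -> "+",
  "%T (elapsed seconds)"   -> "0",
  "%B (response bytes)"    -> "1194",
  "%r (request line)"      -> "POST /hoge/hoge HTTP/1.1"
)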

Writing the parser

The parser is shown below. Running the sbt package command against this project produces a jar file.


package parser

import model.AccessLog
import util.parsing.combinator._
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

object AccessLogParser extends RegexParsers {

  override val whiteSpace = """[ \t]+""".r

  def eol: Parser[Any] = """\r?\n""".r ^^ { s => "" }
  def line: Parser[AccessLog] =
    ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent <~ rest ^^ {
      case ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent =>
        AccessLog(ipAddress, ident, user, time, status, bytes, elapsedTime, method, uri, version, referrer, userAgent)
    }

  def ipAddress: Parser[String] = """[^ ]+""".r
  // %l (identd) and %u (remote user) are either a name or "-"
  def ident: Parser[String] = """(?:\w+|-)""".r
  def user: Parser[String] = """(?:\w+|-)""".r
  def time: Parser[String] = "[" ~> """\S+ [^ \]]+""".r <~ "]" ^^ { convertToIso8601(_) }
  def status: Parser[Int] = """\d+""".r ^^ { _.toInt }
  def connStatus: Parser[String] = """\S+""".r
  def elapsedTime: Parser[Int] = """\d+""".r ^^ { _.toInt }
  def bytes: Parser[Int] = """[^ ]+""".r ^^ { case "-" => 0; case s => s.toInt }
  def method: Parser[String] = "\"" ~> """[A-Z]+""".r
  def uri: Parser[String] = """\S+""".r
  def version: Parser[String] = """[^ "]+""".r <~ "\""
  def referrer: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
  def userAgent: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
  def rest: Parser[String] = """[^$]*""".r

  def parse(log: String): ParseResult[AccessLog]  = parseAll(line, log)

  def convertToIso8601(strDate: String) = {
    val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
    df.setTimeZone(TimeZone.getTimeZone("UTC"))
    val date = df.parse(strDate)
    // re-format as ISO 8601 in UTC ("yyyy" = calendar year, "HH" = 24-hour clock)
    df.applyPattern("yyyy-MM-dd'T'HH:mm:ss Z")
    df.format(date).replace(" ", "")
  }
}

// AccessLog lives in its own file under the model package (cf. `import model.AccessLog` above).
package model

case class AccessLog(
  ipAddress: String = "",
  ident: String = "",
  user: String = "",
  time: String = "",
  status: Int = 0,
  bytes: Int = 0,
  elapsedTime: Int = 0,
  method: String = "",
  uri: String = "",
  version: String = "",
  referrer: String = "",
  userAgent: String = "")


The build.sbt is as follows:

name := "LogParser"

version := "1.0"

scalaVersion := "2.10.4"
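
Before moving on to Spark, here is a minimal sketch of how the parser could be exercised on the sample line above. This check is my own addition; the object name and the expectations in the comments are illustrative only.

import parser.AccessLogParser

// Hypothetical sanity check, runnable with `sbt run`.
object ParserCheck extends App {
  val sample = "192.168.1.10 - - [20/Oct/2014:09:00:03 +0900] 200 + 0 1194 " +
    "\"POST /hoge/hoge HTTP/1.1\" \"http://hoge.co.jp/hoge/view?aaa=1234\" " +
    "\"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.202 Safari/535.1\""

  AccessLogParser.parse(sample) match {
    case AccessLogParser.Success(log, _) =>
      // Expect ipAddress = "192.168.1.10", status = 200, uri = "/hoge/hoge", ...
      println(log)
    case failure =>
      // In the Spark job below, unparseable lines fall back to an empty AccessLog instead.
      println(s"parse failed: $failure")
  }
}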

Running it in the Spark REPL

Here I start the Spark REPL with 4 cores.
Incidentally, the jar file built in the previous step has been placed under /pathToSparkHome/jar.

$ cd /pathToSparkHome/
$ ./bin/spark-shell --master local[4] --jars jar/logparser_2.10-1.0.jar

Parse a log file of roughly 1 GB stored at /pathToAccesslog/accesslog.
Count the valid records, and then count the requests whose status code is 400.

scala> import parser._
import parser._

scala> val file = sc.textFile("/pathToAccesslog/accesslog")
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(32728) called with curMem=0, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.1 MB)
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(4959) called with curMem=32728, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.8 KB, free 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35772 (size: 4.8 KB, free: 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/19 10:54:22 INFO SparkContext: Created broadcast 0 from textFile at <console>:15
file: org.apache.spark.rdd.RDD[String] = /home/ysksuzuki/access.log MappedRDD[1] at textFile at <console>:15

scala> val records = file.map{ line =>
     | AccessLogParser.parse(line).getOrElse(AccessLog())
     | }.filter(!_.ipAddress.isEmpty)
records: org.apache.spark.rdd.RDD[parser.AccessLog] = FilteredRDD[3] at filter at <console>:19

scala> records.count

(omitted)

res0: Long = 2767874

scala> records.filter(_.status == 400).count

(omitted)

res1: Long = 93
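
One aside that is not part of the session above: records feeds two separate actions, so caching it would keep Spark from re-reading and re-parsing the 1 GB file for the second count.

// Same pipeline as above, but kept in memory across the two actions.
val records = file.map { line =>
  AccessLogParser.parse(line).getOrElse(AccessLog())
}.filter(!_.ipAddress.isEmpty)

records.cache()                           // equivalent to persist(StorageLevel.MEMORY_ONLY)

records.count                             // first action parses the file and populates the cache
records.filter(_.status == 400).count     // second action reuses the cached, already-parsed RDD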

Results

Parsing roughly 1 GB of access logs took about 30 seconds. This was only tested on a single local machine, but I feel I got a glimpse of part of what Spark can do.

I originally wanted to use Spark-hadoop to register the data into Elasticsearch, but it did not work the way I expected, so I gave up for now. I would like to keep looking into this.
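
For reference, this is roughly the shape of the call I was aiming for, assuming that what I called Spark-hadoop above is elasticsearch-hadoop's Spark support (the org.elasticsearch.spark package). The Elasticsearch node address and the "accesslog/log" index/type name here are made-up placeholders, and this sketch is untested.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._          // elasticsearch-hadoop: adds saveToEs to RDDs

import parser.AccessLogParser
import model.AccessLog

val conf = new SparkConf()
  .setAppName("AccessLogToEs")
  .set("es.nodes", "localhost:9200")      // assumed Elasticsearch endpoint

val sc = new SparkContext(conf)

val records = sc.textFile("/pathToAccesslog/accesslog")
  .map(line => AccessLogParser.parse(line).getOrElse(AccessLog()))
  .filter(!_.ipAddress.isEmpty)

// Case classes are serialized to JSON documents and indexed into "accesslog/log".
records.saveToEs("accesslog/log")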
