Trying out Apache Spark's Scala shell

Overview

This is day 18 of the Scala Advent Calendar 2014. Sorry for the delay.
I am designing a system that parses application logs, indexes them into Elasticsearch, and visualizes them in Kibana. Since a large volume of logs needs to be parsed quickly and in near real time, I decided to try Spark, and this post summarizes what I found.
Test environment

- Ubuntu 14.04.1 LTS
- Java 1.8.0_20
- Scala 2.10.4
- Spark 1.1.1
See the official documentation for details such as how to install Spark.

Parsing an Apache access log

Assume the LogFormat below (%h: client IP, %l: identd, %u: remote user, %t: request time, %>s: final status, %X: connection status, %T: time taken in seconds, %B: response bytes, %r: request line).
LogFormat "%h %l %u %t %>s %X %T %B \"%r\" \"%{Referer}i\" \"%{User-Agent}i\""
Concretely, a log line looks like this:
192.168.1.10 - - [20/Oct/2014:09:00:03 +0900] 200 + 0 1194 "POST /hoge/hoge HTTP/1.1" "http://hoge.co.jp/hoge/view?aaa=1234" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.202 Safari/535.1"
Writing the parser

The parser is shown below. With these files in place, running the sbt package command produces a jar file.
package parser

import model.AccessLog
import util.parsing.combinator._
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

object AccessLogParser extends RegexParsers {
  override val whiteSpace = """[ \t]+""".r

  def eol: Parser[Any] = """\r?\n""".r ^^ { s => "" }

  def line: Parser[AccessLog] =
    ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent <~ rest ^^ {
      case ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent =>
        AccessLog(ipAddress, ident, user, time, status, bytes, elapsedTime, method, uri, version, referrer, userAgent)
    }

  def ipAddress: Parser[String] = """[^ ]+""".r
  // A word or "-" (a character class like [(?:\w+)-] would match only a single character)
  def ident: Parser[String] = """(?:\w+|-)""".r
  def user: Parser[String] = """(?:\w+|-)""".r
  def time: Parser[String] = "[" ~> """\S+ [^ \]]+""".r <~ "]" ^^ { convertToIso8601(_) }
  def status: Parser[Int] = """\d+""".r ^^ { _.toInt }
  def connStatus: Parser[String] = """\S+""".r
  def elapsedTime: Parser[Int] = """\d+""".r ^^ { _.toInt }
  def bytes: Parser[Int] = """[^ ]+""".r ^^ { case "-" => 0; case s => s.toInt }
  def method: Parser[String] = "\"" ~> """[A-Z]+""".r
  def uri: Parser[String] = """\S+""".r
  def version: Parser[String] = """[^ "]+""".r <~ "\""
  def referrer: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
  def userAgent: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
  def rest: Parser[String] = """[^$]*""".r

  def parse(log: String): ParseResult[AccessLog] = parseAll(line, log)

  def convertToIso8601(strDate: String) = {
    val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
    df.setTimeZone(TimeZone.getTimeZone("UTC"))
    val date = df.parse(strDate)
    // yyyy and HH, not YYYY and hh: YYYY is the week-based year and hh is the 1-12 clock
    df.applyPattern("yyyy-MM-dd'T'HH:mm:ss Z")
    df.format(date).replace(" ", "")
  }
}
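As a quick sanity check of the date conversion above, here is the same logic as a standalone sketch; the helper name `toIso8601` is mine, not part of the project:

```scala
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

// Standalone version of convertToIso8601: Apache %t timestamp in, ISO 8601 (UTC) out
def toIso8601(strDate: String): String = {
  val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
  df.setTimeZone(TimeZone.getTimeZone("UTC"))
  val date = df.parse(strDate)
  df.applyPattern("yyyy-MM-dd'T'HH:mm:ss Z")
  df.format(date).replace(" ", "")
}

// 09:00 JST (+0900) is midnight UTC
println(toIso8601("20/Oct/2014:09:00:03 +0900")) // 2014-10-20T00:00:03+0000
```

Note that reusing the same SimpleDateFormat instance with applyPattern keeps the parse and format time zones consistent.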
case class AccessLog(
  ipAddress: String = "",
  ident: String = "",
  user: String = "",
  time: String = "",
  status: Int = 0,
  bytes: Int = 0,
  elapsedTime: Int = 0,
  method: String = "",
  uri: String = "",
  version: String = "",
  referrer: String = "",
  userAgent: String = "")
build.sbt:

name := "LogParser"

version := "1.0"

scalaVersion := "2.10.4"
Running it in Spark's REPL

Start Spark's REPL with four cores. Incidentally, the jar file built in the previous step was placed under /pathToSparkHome/jar.
$ cd /pathToSparkHome/
$ ./bin/spark-shell --master local[4] --jars jar/logparser_2.10-1.0.jar
Parse a log file of roughly 1 GB stored at /pathToAccesslog/accesslog: count the valid records, then count the requests whose status code is 400.
scala> import parser._
import parser._
scala> val file = sc.textFile("/pathToAccesslog/accesslog")
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(32728) called with curMem=0, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.1 MB)
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(4959) called with curMem=32728, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.8 KB, free 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35772 (size: 4.8 KB, free: 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/19 10:54:22 INFO SparkContext: Created broadcast 0 from textFile at <console>:15
file: org.apache.spark.rdd.RDD[String] = /home/ysksuzuki/access.log MappedRDD[1] at textFile at <console>:15
scala> val records = file.map{ line =>
| AccessLogParser.parse(line).getOrElse(AccessLog())
| }.filter(!_.ipAddress.isEmpty)
records: org.apache.spark.rdd.RDD[parser.AccessLog] = FilteredRDD[3] at filter at <console>:19
scala> records.count
(omitted)
res0: Long = 2767874
scala> records.filter(_.status == 400).count
(omitted)
res1: Long = 93
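For reference, the same parse/filter/count pipeline can be sketched on a plain Scala collection without Spark. The regex below is my own simplified stand-in for AccessLogParser that extracts only the IP and status, not the project's actual parser:

```scala
case class MiniLog(ipAddress: String = "", status: Int = 0)

// Simplified stand-in for AccessLogParser.parse: IP address and status code only
val Line = """^(\S+) \S+ \S+ \[[^\]]+\] (\d+) .*""".r
def parse(line: String): MiniLog = line match {
  case Line(ip, status) => MiniLog(ip, status.toInt)
  case _                => MiniLog() // unparsable lines become empty records
}

val lines = Seq(
  "192.168.1.10 - - [20/Oct/2014:09:00:03 +0900] 200 + 0 1194",
  "not an access log line",
  "192.168.1.11 - - [20/Oct/2014:09:00:04 +0900] 400 + 0 10"
)

// Same shape as the RDD version: parse, drop failures, count
val records = lines.map(parse).filter(!_.ipAddress.isEmpty)
println(records.size)                   // 2
println(records.count(_.status == 400)) // 1
```

Swapping `lines` for an RDD from sc.textFile gives exactly the code used in the REPL session above, since RDDs expose the same map/filter/count vocabulary.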
Results

Parsing 1 GB of access logs took about 30 seconds. This was only a test on a single machine in a local environment, but I feel it gave me a taste of part of what Spark can do.
I had also wanted to index the data into Elasticsearch using elasticsearch-hadoop, but it did not work as I expected, so I gave up for now. I would like to keep investigating this.