flink-日志文件批处理 - 聚合统计接口错误码
前提条件
- 知道如何配置 flink 环境,如果不了解的同学可以看看这篇文章Flink 大数据实践之环境准备
- 知道如何使用 IDEA 等编辑器,构建 pom.xml 等文件,知道如何打包生成 jar。
- 有一定的 Scala 基础
背景及解决问题
- 分析用户请求日志,统计出每小时中接口的错误码,每个小时的日志压缩包大概有 4G,解压之后大概有 40~50G。
- 如果这样的需求要用代码实现就显得麻烦,编码,调试,执行生成结果会耗费大量的时间。如果对 flink/spark 这样的大数据处理引擎比较了解的话,就十分的轻松。
- 日志是 JSON 格式的,需要提取出 JSON 中的
path
,method
,extra.errno
,然后对其进行数据聚合。
编码分析数据,提取日志,定义日志结构
日志的大致结构如下,其他数据做了脱敏处理。并不是所有的字段我们都需要,使用注解可以忽略@JsonIgnoreProperties(ignoreUnknown = true)
。@JsonProperty("path")
让程序知道 JSON 的名称就是 path。
{
"method": "GET",
"path": "/api/ping",
"extra": {
"errno": 500
}
}
提取日志中我们需要的字段
package com.work.file
import java.beans.BeanProperty
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonProperty}
@JsonIgnoreProperties(ignoreUnknown = true)
case class
Extra(
@JsonProperty("errno") @BeanProperty errno: Long,
)
@JsonIgnoreProperties(ignoreUnknown = true)
case class
Log(
@JsonProperty("path") @BeanProperty path: String,
@JsonProperty("method") @BeanProperty method: String,
@JsonProperty("extra") @BeanProperty extra: Extra
)
处理程序,获取数据源,处理数据,聚合之后的数据 Sink 至文件
- 程序的大体意思通过注释的形式写出来了
- Flink 处理数据的过程是:Source -> Process -> Sink,可以分为三个步骤
- Source 和 Sink 可以是不同的,支持文件系统、数据库、Redis、ElasticSearch 等等
- 透漏一点,后面讲到 Flink SQL 你就会发现其实几条 SQL 就可以解决我们的需求了。
package com.work.file
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.api.scala._
object Error {
def main(args: Array[String]): Unit = {
// 获取命令行参数
val params = ParameterTool.fromArgs(args)
// 获取执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 将参数添加到 flink 的全局配置中
env.getConfig.setGlobalJobParameters(params)
// 获取输入源
val text = env.readTextFile(params.get("input"))
// 定义一个 JSON 解析的 mapper
val mapper = new ObjectMapper
// 按行切分日志
val split = text.flatMap {
_.split("\n")
}
val counter = split
.map {
value => {
try {
// 解析日志,截取日志是因为日志面前还有一些时间,非标准 JSON 结构
val node = mapper.readValue(value.substring(value.indexOf('{')), classOf[Log])
Some((node.method, node.path, node.extra.errno), 1)
} catch {
// 解析日志可能会出现问题,需要我们捕获异常,返回一个 Opiton 类型
case _: Exception => None
}
}
}
.filter(x => x.nonEmpty) // 过滤为 None 的集
.map(x => x.get) // 提取 option 中的数据
.groupBy(0) // 对第一个列分组 (node.method, node.path, node.extra.errno)
.sum(1) // 并求和,累加到第二列
// 数据 Sink 到文件中
counter.writeAsCsv(params.get("output"))
// 执行 Job
env.execute("api_error_stat")
}
}
将 Scala 程序打 Jar 包,提交到 Flink 执行
- 将程序打 Jar 包
- 提交 Jar 包给 Flink,Flink 会在后台运行 Job,直到数据处理完毕
- 如果中途需要结束程序,可以通过 cancel 参数来中断 Job
# 生成Jar包
mvn clean scala:compile compile package
# 提交Job
flink run /mnt/flink-test/out/artifacts/api_error_stat.jar \
--input /tmp/request-2020110205 \
--output /tmp/a.csv
# 此时会生成 Job ID:847f41fd98ae052485f9e26648918491,每个job都不一样
# 执行如下命令可以取消正在执行的Job
flink cancel 847f41fd98ae052485f9e26648918491
总结
- 了解 Flink 的工作原理,及流程。数据处理阶段其实可以简单的想成数据集合的聚合运算等
- 需要了解一下 mvn 的工程管理,将程序打包
- 将输入源解析成我们想要的数据结构,这篇文章需要我们了解 JSON 的解码
- 程序还有很大的优化空间,如果性能调优,分区等,会在后面的章节中一一介绍
- 多思考,多看文档,多实践