前提条件

  1. 知道如何配置 flink 环境,如果不了解的同学可以看看这篇文章Flink 大数据实践之环境准备
  2. 知道如何使用 IDEA 等编辑器,构建 pom.xml 等文件,知道如何打包生成 jar。
  3. 有一定的 Scala 基础

背景及解决问题

  1. 分析用户请求日志,统计出每小时中接口的错误码,每个小时的日志压缩包大概有 4G,解压之后大概有 40~50G。
  2. 如果这样的需求要用代码实现就显得麻烦,编码,调试,执行生成结果会耗费大量的时间。如果对 flink/spark 这样的大数据处理引擎比较了解的话,就十分的轻松。
  3. 日志是 JSON 格式的,需要提取出 JSON 中的pathmethodextra.errno,然后对其进行数据聚合。

编码分析数据,提取日志,定义日志结构

日志的大致结构如下,其他数据做了脱敏处理。并不是所有的字段我们都需要,使用注解可以忽略@JsonIgnoreProperties(ignoreUnknown = true)@JsonProperty("path") 让程序知道 JSON 的名称就是 path。

{
    "method": "GET",
    "path": "/api/ping",
    "extra": {
        "errno": 500
    }
}

提取日志中我们需要的字段

package com.work.file

import java.beans.BeanProperty

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonProperty}

@JsonIgnoreProperties(ignoreUnknown = true)
case class
Extra(
       @JsonProperty("errno") @BeanProperty errno: Long,
     )

@JsonIgnoreProperties(ignoreUnknown = true)
case class
Log(
     @JsonProperty("path") @BeanProperty path: String,
     @JsonProperty("method") @BeanProperty method: String,
     @JsonProperty("extra") @BeanProperty extra: Extra
   )

处理程序,获取数据源,处理数据,聚合之后的数据 Sink 至文件

  1. 程序的大体意思通过注释的形式写出来了
  2. Flink 处理数据的过程是:Source -> Process -> Sink,可以分为三个步骤
  3. Source 和 Sink 可以是不同的,支持文件系统、数据库、Redis、ElasticSearch 等等
  4. 透漏一点,后面讲到 Flink SQL 你就会发现其实几条 SQL 就可以解决我们的需求了。
package com.work.file

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.api.scala._


object Error {

  def main(args: Array[String]): Unit = {
    // 获取命令行参数
    val params = ParameterTool.fromArgs(args)
    // 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 将参数添加到 flink 的全局配置中
    env.getConfig.setGlobalJobParameters(params)
    // 获取输入源
    val text = env.readTextFile(params.get("input"))

    // 定义一个 JSON 解析的 mapper
    val mapper = new ObjectMapper
    // 按行切分日志
    val split = text.flatMap {
      _.split("\n")
    }
    val counter = split
      .map {
        value => {
          try {
            // 解析日志,截取日志是因为日志面前还有一些时间,非标准 JSON 结构
            val node = mapper.readValue(value.substring(value.indexOf('{')), classOf[Log])
            Some((node.method, node.path, node.extra.errno), 1)
          } catch {
            // 解析日志可能会出现问题,需要我们捕获异常,返回一个 Opiton 类型
            case _: Exception => None
          }
        }
      }
      .filter(x => x.nonEmpty)    // 过滤为 None 的集
      .map(x => x.get)            // 提取 option 中的数据
      .groupBy(0)    // 对第一个列分组 (node.method, node.path, node.extra.errno)
      .sum(1)                     // 并求和,累加到第二列

    // 数据 Sink 到文件中
    counter.writeAsCsv(params.get("output"))
    // 执行 Job
    env.execute("api_error_stat")
  }
}
  1. 将程序打 Jar 包
  2. 提交 Jar 包给 Flink,Flink 会在后台运行 Job,直到数据处理完毕
  3. 如果中途需要结束程序,可以通过 cancel 参数来中断 Job
# 生成Jar包
mvn clean scala:compile compile package

# 提交Job
flink run /mnt/flink-test/out/artifacts/api_error_stat.jar  \
  --input /tmp/request-2020110205 \
  --output /tmp/a.csv

# 此时会生成 Job ID:847f41fd98ae052485f9e26648918491,每个job都不一样
# 执行如下命令可以取消正在执行的Job
flink cancel 847f41fd98ae052485f9e26648918491

总结

  1. 了解 Flink 的工作原理,及流程。数据处理阶段其实可以简单的想成数据集合的聚合运算等
  2. 需要了解一下 mvn 的工程管理,将程序打包
  3. 将输入源解析成我们想要的数据结构,这篇文章需要我们了解 JSON 的解码
  4. 程序还有很大的优化空间,如果性能调优,分区等,会在后面的章节中一一介绍
  5. 多思考,多看文档,多实践

推荐阅读: