前言

这篇文章主要介绍 Flink 的环境准备。Flink 支持用 Scala,Java,Python(PyFlink)等语言来编写 Job 程序。接下来的一些系列实践都会用 Scala 来编写代码。

打开搜索引擎,搜索关键词 Flink Download,进入到Apache Download Mirros的页面,选择清华源的链接地址下载二进制程序压缩包。

cd /mnt
# 下载程序
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.11.3/flink-1.11.3-bin-scala_2.12.tgz
# 解压程序
tar -zvxf flink-1.11.3-bin-scala_2.12.tgz
# 打开vim,编辑文件
vim ~/.bash_profile
# 追加 PATH 变量,保存退出
export PATH=$PATH:/mnt/flink-1.11.2/bin

# 执行flink就可以在任何目录下执行flink来提交,取消Job程序,更加详细的了解flink程序的参数
root@debian:/mnt/flink-1.11.2# flink
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:

使用命令行工具 tree -d 展示 Flink 的目录结构

root@debian:/mnt/flink-1.11.2# tree -d
.
├── bin         # 二进制程序目录
├── conf        # 配置文件
├── examples    # 案例,demo   ├── batch   # 批处理   ├── gelly
│   ├── python
│      └── table
│          └── batch
│   ├── streaming
│   └── table
├── lib         # 依赖库
├── licenses    # 许可
├── log         # 日志
├── opt
│   └── python
└── plugins     # 插件
    ├── external-resource-gpu
    ├── metrics-datadog
    ├── metrics-graphite
    ├── metrics-influx
    ├── metrics-jmx
    ├── metrics-prometheus
    ├── metrics-slf4j
    └── metrics-statsd

初学者暂时不需要考虑性能调优之类的配置(conf 目录),知道如何启动和关闭 Flink,将 Job 程序提交到 Flink 就可以了。

# 启动flink
sh bin/start-cluster.sh 
# 关闭flink
sh bin/stop-cluster.sh
# 提交flink程序,一个批处理程序,计算单词的个数
bin/flink run examples/batch/WordCount.jar
# 或者指定输入源和输出文件
bin/flink run examples/batch/WordCount.jar --input /tmp/input.txt --output /tmp/output.txt

如下是 Demo 程序 WordCount 的源码,这篇文章就不详细介绍什么是 Flink,怎么实践一个自己的 Flink 程序。

package org.apache.flink.examples.scala.wordcount

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.examples.java.wordcount.util.WordCountData

object WordCount {

  def main(args: Array[String]) {

    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)
    val text =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
      } else {
        println("Executing WordCount example with default input data set.")
        println("Use --input to specify file input.")
        env.fromCollection(WordCountData.WORDS)
      }

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      counts.print()
    }

  }
}

程序启动之后,我们可以打开 Flink 的 Web 面板 http://127.0.0.1:8081。后台可以查看正在运行的 Job,已经完成的 Job,任务管理,后台提交 Job 程序。

总结

  1. 介绍 Flink 的目录结构,程序启动,关闭,已经如何将自己的 Job 程序提交至 Flink 运行。
  2. Flink Web Dashboard 的介绍,可视化页面方便我们了解正在运行的 Job,已经完成的 Job(已经完成的列表不是持久化),参数配置。方便我们上上次 Job 程序,提交 Job 程序,取消 Job 程序。
  3. 简单的介绍了一下提交 WordCount 程序的案例。

推荐阅读: