Appearance
Flink 大数据实践之环境准备
前言
这篇文章主要介绍 Flink 的环境准备。Flink 支持用 Scala,Java,Python(PyFlink)等语言来编写 Job 程序。接下来的一些系列实践都会用 Scala 来编写代码。
Flink 环境安装
打开搜索引擎,搜索关键词 Flink Download,进入到Apache Download Mirros
的页面,选择清华源的链接地址下载二进制程序压缩包。
bash
cd /mnt
# 下载程序
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.11.3/flink-1.11.3-bin-scala_2.12.tgz
# 解压程序
tar -zvxf flink-1.11.3-bin-scala_2.12.tgz
将 Flink 添加到环境变量中
bash
# 打开vim,编辑文件
vim ~/.bash_profile
# 追加 PATH 变量,保存退出
export PATH=$PATH:/mnt/flink-1.11.2/bin
# 执行flink就可以在任何目录下执行flink来提交,取消Job程序,更加详细的了解flink程序的参数
root@debian:/mnt/flink-1.11.2# flink
./flink <ACTION> [OPTIONS] [ARGUMENTS]
The following actions are available:
Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
Flink 启动和关闭,提交 Job 程序
使用命令行工具 tree -d
展示 Flink 的目录结构
bash
root@debian:/mnt/flink-1.11.2# tree -d
.
├── bin # 二进制程序目录
├── conf # 配置文件
├── examples # 案例,demo
│ ├── batch # 批处理
│ ├── gelly
│ ├── python
│ │ └── table
│ │ └── batch
│ ├── streaming
│ └── table
├── lib # 依赖库
├── licenses # 许可
├── log # 日志
├── opt
│ └── python
└── plugins # 插件
├── external-resource-gpu
├── metrics-datadog
├── metrics-graphite
├── metrics-influx
├── metrics-jmx
├── metrics-prometheus
├── metrics-slf4j
└── metrics-statsd
初学者暂时不需要考虑性能调优之类的配置(conf 目录),知道如何启动和关闭 Flink,将 Job 程序提交到 Flink 就可以了。
bash
# 启动flink
sh bin/start-cluster.sh
# 关闭flink
sh bin/stop-cluster.sh
# 提交flink程序,一个批处理程序,计算单词的个数
bin/flink run examples/batch/WordCount.jar
# 或者指定输入源和输出文件
bin/flink run examples/batch/WordCount.jar --input /tmp/input.txt --output /tmp/output.txt
Flink WordCount 源码
如下是 Demo 程序 WordCount 的源码,这篇文章就不详细介绍什么是 Flink,怎么实践一个自己的 Flink 程序。
scala
package org.apache.flink.examples.scala.wordcount
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.examples.java.wordcount.util.WordCountData
object WordCount {
def main(args: Array[String]) {
val params: ParameterTool = ParameterTool.fromArgs(args)
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
val text =
if (params.has("input")) {
env.readTextFile(params.get("input"))
} else {
println("Executing WordCount example with default input data set.")
println("Use --input to specify file input.")
env.fromCollection(WordCountData.WORDS)
}
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ")
env.execute("Scala WordCount Example")
} else {
println("Printing result to stdout. Use --output to specify output path.")
counts.print()
}
}
}
Flink Web 页面
程序启动之后,我们可以打开 Flink 的 Web 面板 http://127.0.0.1:8081
。后台可以查看正在运行的 Job,已经完成的 Job,任务管理,后台提交 Job 程序。
总结
- 介绍 Flink 的目录结构,程序启动,关闭,已经如何将自己的 Job 程序提交至 Flink 运行。
- Flink Web Dashboard 的介绍,可视化页面方便我们了解正在运行的 Job,已经完成的 Job(已经完成的列表不是持久化),参数配置。方便我们上上次 Job 程序,提交 Job 程序,取消 Job 程序。
- 简单的介绍了一下提交 WordCount 程序的案例。