Flink安装

操作系统AlmaLinux9.1。

Flink的运行一般分为三种模式,即local、Standalone和On Yarn。因Standalone HA和on yarn模式都依赖Hadoop,所以本次只安装local和standalone。

安装java

为了运行Flink,只需提前安装好 Java 11

1
2
yum install java-11-openjdk-devel
java -version

Local

对于 Local 模式来说,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster。

1
2
3
4
5
6
7
8
9
10
11
12
wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz
tar -xzvf flink-1.15.3-bin-scala_2.12.tgz -C /usr/local/
ln -s /usr/local/flink-1.15.3/ /usr/local/flink
sed -i '/^rest.bind-address: /crest.bind-address: 0.0.0.0' /usr/local/flink/conf/flink-conf.yaml
cd /usr/local/flink/bin
./start-cluster.sh # 启动服务

firewall-cmd --permanent --add-port=8081/tcp
systemctl reload firewalld

# 停止集群命令
./bin/stop-cluster.sh

通过浏览器访问http://192.168.146.129:8081 访问 Flink Dashboard,dashboard没有账号密码,如果要设置账号密码需要借助nginx或者 apache httpd

运行 demo

1
2
3
/usr/local/flink/bin/flink run  /usr/local/flink/examples/batch/WordCount.jar
# 指定输入输出
/usr/local/flink/bin/flink run /usr/local/flink/examples/batch/WordCount.jar --input input.txt --output out.txt

执行完成在 Dashboard 就可以看到相应的已经完成的Jobs。

Standalone

Standalone模式顾名思义,是在本地集群上调度执行,不依赖于外部调度机制例如YARN。

服务器规划如下,三台机器分别安装好jdk

服务器节点 用途
node129 JobManager,TaskManager
node130 TaskManager
node131 TaskManager

在 node129 上进行如下安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz
tar -xzvf flink-1.15.3-bin-scala_2.12.tgz -C /usr/local/
ln -s /usr/local/flink-1.15.3/ /usr/local/flink

# 修改masters配置文件,该文件用于指定主节点及其web端口,表示集群的JobManager
echo 'node129:8081' > /usr/local/flink/conf/masters
# 修改slaves文件,用于指定从节点,表示集群中的TaskManager
echo -e 'node129\nnode130\nnode131' > /usr/local/flink/conf/workers

# 可以查看修改后的文件内容,去除注释和空行
sed '/^#/d;/^$/d' /usr/local/flink/conf/flink-conf.yaml

tee > /usr/local/flink/conf/flink-conf.yaml <<EOF
jobmanager.rpc.address: node129
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 192.168.146.129 # 所有worker节点按照实际IP地址修改
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
jobmanager.execution.failover-strategy: region
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
EOF

# 分发flink程序
scp -r /usr/local/flink-1.16.0 root@node130:/usr/local
scp -r /usr/local/flink-1.16.0 root@node131:/usr/local

# 添加防火墙
firewall-cmd --permanent --add-port=8081/tcp
firewall-cmd --permanent --add-port=6123/tcp
systemctl reload firewalld

/usr/local/flink/bin/start-cluster.sh # 启动服务

命令执行后会要求输入node129、node130和node131节点的密码,输入后自动启动三台服务器的TaskManager。启动后服务器进程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
jps
# node129
# 14643 Jps
# 14548 TaskManagerRunner
# 13773 StandaloneSessionClusterEntrypoint

# node130
# 7457 TaskManagerRunner
# 7529 Jps

# node131
# 5684 TaskManagerRunner
# 5755 Jps

测试

1
/usr/local/flink/bin/flink run  /usr/local/flink/examples/batch/WordCount.jar

通过http://192.168.146.129:8081访问web界面如下,从图中可以看到有3个TaskManager,一共6个Slot 。

至此,standalone模式已成功安装。

这里只是集群模式而已,在实际场景中,我们一般需要配置为HA,防止Jobmanager突然挂掉,导致整个集群或者任务执行失败的情况发生。