无情 @ 2020-04-07 20:15:31 阅读(93)
Flink


一、主要概念


Flink 整个系统主要由两个组件组成,分别为 JobManager 和 TaskManager,Flink 架构也遵循 Master - Slave 架构设计原则,JobManager 为 Master 节点,TaskManager 为 Worker (Slave)节点


JobManager: JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理


TaskManager: TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。

多个任务和 Task 之间通过 TaskSlot 方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有效管理



JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端




二、环境准备


三台机器

hadoop001

master (jobManager)

slave (TaskManagerRunner)

hadoop002

master jobManager

slave (TaskManagerRunner)

hadoop003


slave  (TaskManagerRunner)


flink版本1.10:下载 http://mirror.bit.edu.cn/apache/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz  


软件和环境要求


  1、Java 1.8.x或更高版本(3台)

  2、ssh相互免登录

  3、三台安装zookeeper(flink本身自带了zk 跟人建议还是独立安装)



三、配置修改

 

  flink-conf.yaml


 high-availability: zookeeper

 high-availability.zookeeper.quorum: hadoop001:2181,hadoop002:2181,hadoop003:2181

 high-availability.cluster-id: /cluster_one

 high-availability.zookeeper.path.root: /flink


masters

   hadoop001:8081

   hadoop002:8081


slaves

hadoop001

hadoop002

hadoop003


备注:配置好的flink目录同步到另外两台机器


四、启动


在其中一台机器上启动


./bin/start-cluster.sh 


打开web界面 http://hadoop001:8081/



五、提交一个任务




将开发好的jar提交到该任务上吗几个运行



测试代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * 需求:实时的wordcount
 * 往端口中发送数据,实时的计算数据
 */
public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        //1.定义连接端口
        final int port = 9999;
        //2.创建执行环境对象
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //3.得到套接字对象(指定:主机、端口、分隔符)
        DataStreamSource<String> text = env.socketTextStream("hadoop001", port, "\n");
        //4.解析数据,统计数据-单词计数 hello lz hello world
        DataStream<WordWithCount> windowCounts = text.flatMap((FlatMapFunction<String, WordWithCount>) (s, collector) -> {
            //按照空白符进行切割
            for (String word : s.split("\\s")) {
                //<单词,1>
                collector.collect(new WordWithCount(word, 1L));
            }
        })
                //按照key进行分组
                .keyBy("word")
                //设置窗口的时间长度 5秒一次窗口 1秒计算一次
                .timeWindow(Time.seconds(3))
                //聚合,聚合函数
                .reduce((ReduceFunction<WordWithCount>) (a, b) -> {
                    //按照key聚合
                    return new WordWithCount(a.word, a.count + b.count);
                });
        //5.打印可以设置并发度
        windowCounts.print().setParallelism(1);
        //6.执行程序
        env.execute("Socket window WordCount");
    }
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {
        }
        public WordWithCount(String word, long count){
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString(){
            return word + " : " + count;
        }
    }
}

hadoop001上执行

nc -lk -p 9999



下载地址: http://mirror.bit.edu.cn/apache/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz