无情 @ 2020-04-19 10:40:09 阅读(319)



一、什么是source


     sourceflink面向流计算的数据源头,所有的数据都是通过source进入我们的程序内部,详见下图source所在整个流程位置.

   


二、如何使用一个source


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamdataStream= env.addSource(SourceFunctionfunction);

从上面的代码来看,使用一个source非常的简单,创建一个环境对象env,然后将SourceFunction传入addSource.



三、SourceFunction的源码

     SourceFunction是Flink中所有流数据源的基本接口(源码注释里面有说明)


   1 SourceFunction接口继承关系


     SourceFunction接口继承了Function接口,其实Function接口就是一个空接口(java里面的基本功能是:实现类似多重继承的功能)


Function是一个实现自定义函数的基础接口(The base interface for all user-defined functions) ,既然是所以类的基础类,那么我们的SourceFunction继承当然也需要继承它.



2 SourceFunction接口内部方法




SourceFunction里面有run、cancel接口和一个内部接口SourceContext,  

   2.1 :  run方法的入参数是SourceContext,run方法是数据的接受入口.即对接一个外部数据源然后emit元素形成stream(大部分情况下会通过在该方法里运行一个while循环的形式来产生stream)

   2.2 :  cancel方法:取消一个source. 比较典型的是将run方法里面while(run) 甚至为run=false. 取消一个source,也即将run中的循环emit元素的行为终止


  2.3 : SourceContext是摄取源数据和水位线的的接口.

collect方法不指定时间

collectWithTimestamp 自定义EventTime时间






四、flink内置的Source方法




介绍几个常用source


1RichSourceFunction(抽象类) : 他是一个抽象类继承了AbstractRichFunction ,用于实现可访问上下文信息的并行数据源的基类

2ParallelSourceFunction(接口):   source并行接口


常用的kafkasource就属于并行的


3RichParallelSourceFunction : 继承AbstractRichFunction,并且实现RichSourceFunction (此类具有并行功能).


4SocketTextStreamFunction :数据源是socket \n或者\r作为分隔符






五、自定义source


   SourceFunction.java类里面有一个demo,代码如下



import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * @author zhuhuipei
 * @Description:
 * @date 2020-04-18
 * @time 17:21
 */
public class DemoSource implements SourceFunction, CheckpointedFunction {
    private long count = 0L;
    private volatile boolean isRunning = true;
    private transient ListStatecheckPointedCount;
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.checkPointedCount.clear();
        this.checkPointedCount.add(count);
        System.out.println("snapshotState>>>>>>>");
    }
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.out.println("initializeState>>>>>>>");
        this.checkPointedCount = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        if (context.isRestored()) {
            for (Long count : this.checkPointedCount.get()) {
                this.count = count;
            }
        }
    }
    @Override
    public void run(SourceContext ctx) throws Exception {
        while (isRunning && count < 500) {
            Thread.sleep(1000);
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }
    @Override
    public void cancel() {
        System.out.println("cancel>>>>>>>");
        isRunning = false;
    }
}


需要实现SourceFunction接口,如果需要checkpoint还需要实现CheckpointedFunction接口