本文共 3487 字,大约阅读时间需要 11 分钟。
实现UDF函数----更细粒度的控制流
Flink暴露了所有的UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction,FilterFunction,ProcessFunction等等。
举例:实现FilterFunction接口
DataStreamflinkTweets = tweets.filter(new FlinkFilter());public static class FlinkFilter implements FilterFunction { @Override public boolean filter(String s) throws Exception { return s.contains("flink"); }}
还可以将函数实现成匿名类
DataStreamflinkTweets = tweets.filter(new FilterFunction (){ @Override public boolean filter(String s) throws Exception { return s.contains("flink"); }});
我们filter的字符串“flink”还可以当作参考传进去
DataStreamtweets = env.readTextFile("INPUT_FILE");DataStream flinkTweets = tweets.filter(new KeyWordFilter("flink")); public static class KeyWordFilter implements FilterFunction { private String keyWord; KeyWordFilter(String keyWord){ this.keyWord = keyWord; } @Override public boolean filter(String s) throws Exception { return s.contains(this.keyWord); }}
DataStreamtweets = env.readTextFile("INPUT_FILE");DataStream flinkTweets = tweets.filter(tweet -> tweet.contains("flink"));
富函数是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能
Rich Function有一个生命周期的概念。典型的生命周期方法有:
public class TransformTest5_RichFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从文件中读取数据 DataStreaminputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt"); // 转换成SensorReading类型 // java8 中的lamda表达式 DataStream dataStream = inputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); DataStream > resultStream = dataStream.map(new MyMapper()); resultStream.print(); env.execute(); } // 普通函数类 public static class MyMapper0 implements MapFunction >{ @Override public Tuple2 map(SensorReading sensorReading) throws Exception { return new Tuple2<>(sensorReading.getId(), sensorReading.getId().length()); } } // 实现自定义富函数类 public static class MyMapper extends RichMapFunction >{ @Override public Tuple2 map(SensorReading sensorReading) throws Exception { return new Tuple2<>(sensorReading.getId(), getRuntimeContext().getIndexOfThisSubtask()); } @Override public void open(Configuration parameters) throws Exception { // 初始化工作,一般是定义状态、或者建立数据库连接 System.out.println("open"); super.open(parameters); } @Override public void close() throws Exception { // 关闭连接或者清空状态 System.out.println("close"); super.close(); } }}
运行结果:
open和close方法执行 4 次,是因为有 4 个分区(默认),也可通过env.setParallelism(num)
指定分区个数 openopenopenopen2> (sensor_1,1)4> (sensor_1,3)3> (sensor_4,2)2> (sensor_2,1)3> (sensor_1,2)4> (sensor_1,3)2> (sensor_3,1)closecloseclose1> (sensor_6,0)1> (sensor_7,0)close
转载地址:http://wyxii.baihongyu.com/