博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink中的UDF函数类
阅读量:4096 次
发布时间:2019-05-25

本文共 3487 字,大约阅读时间需要 11 分钟。

实现UDF函数----更细粒度的控制流

一、函数类(Function Classes)

Flink暴露了所有的UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction,FilterFunction,ProcessFunction等等。

举例:实现FilterFunction接口

DataStream
flinkTweets = tweets.filter(new FlinkFilter());public static class FlinkFilter implements FilterFunction
{
@Override public boolean filter(String s) throws Exception {
return s.contains("flink"); }}

还可以将函数实现成匿名类

DataStream
flinkTweets = tweets.filter(new FilterFunction
(){
@Override public boolean filter(String s) throws Exception {
return s.contains("flink"); }});

我们filter的字符串“flink”还可以当作参考传进去

DataStream
tweets = env.readTextFile("INPUT_FILE");DataStream
flinkTweets = tweets.filter(new KeyWordFilter("flink")); public static class KeyWordFilter implements FilterFunction
{
private String keyWord; KeyWordFilter(String keyWord){
this.keyWord = keyWord; } @Override public boolean filter(String s) throws Exception {
return s.contains(this.keyWord); }}

二、匿名函数(Lambda Functions)

DataStream
tweets = env.readTextFile("INPUT_FILE");DataStream
flinkTweets = tweets.filter(tweet -> tweet.contains("flink"));

三、富函数(Rich Functions)

富函数是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

Rich Function有一个生命周期的概念。典型的生命周期方法有:

  • open():是Rich Function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用
  • close():是生命周期中的最后一个调用的方法,做一些清理工作
  • getRuntimeContext():提供了函数的RuntimeContext的一些信息,例如函数执行的并行度任务的名字,以及state状态
public class TransformTest5_RichFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从文件中读取数据 DataStream
inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt"); // 转换成SensorReading类型 // java8 中的lamda表达式 DataStream
dataStream = inputStream.map(line -> {
String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); DataStream
> resultStream = dataStream.map(new MyMapper()); resultStream.print(); env.execute(); } // 普通函数类 public static class MyMapper0 implements MapFunction
>{
@Override public Tuple2
map(SensorReading sensorReading) throws Exception { return new Tuple2<>(sensorReading.getId(), sensorReading.getId().length()); } } // 实现自定义富函数类 public static class MyMapper extends RichMapFunction
>{ @Override public Tuple2
map(SensorReading sensorReading) throws Exception { return new Tuple2<>(sensorReading.getId(), getRuntimeContext().getIndexOfThisSubtask()); } @Override public void open(Configuration parameters) throws Exception { // 初始化工作,一般是定义状态、或者建立数据库连接 System.out.println("open"); super.open(parameters); } @Override public void close() throws Exception { // 关闭连接或者清空状态 System.out.println("close"); super.close(); } }}

运行结果:

open和close方法执行 4 次,是因为有 4 个分区(默认),也可通过 env.setParallelism(num) 指定分区个数

openopenopenopen2> (sensor_1,1)4> (sensor_1,3)3> (sensor_4,2)2> (sensor_2,1)3> (sensor_1,2)4> (sensor_1,3)2> (sensor_3,1)closecloseclose1> (sensor_6,0)1> (sensor_7,0)close

转载地址:http://wyxii.baihongyu.com/

你可能感兴趣的文章
Leetcode 834. 树中距离之和 C++
查看>>
【机器学习】机器学习系统SysML 阅读表
查看>>
最小费用最大流 修改的dijkstra + Ford-Fulksonff算法
查看>>
最小费用流 Bellman-Ford与Dijkstra 模板
查看>>
实现高性能纠删码引擎 | 纠删码技术详解(下)
查看>>
scala(1)----windows环境下安装scala以及idea开发环境下配置scala
查看>>
zookeeper(3)---zookeeper API的简单使用(增删改查操作)
查看>>
zookeeper(4)---监听器Watcher
查看>>
zookeeper(2)---shell操作
查看>>
mapReduce(3)---入门示例WordCount
查看>>
hbase(3)---shell操作
查看>>
hbase(1)---概述
查看>>
hbase(5)---API示例
查看>>
SSM-CRUD(1)---环境搭建
查看>>
SSM-CRUD(2)---查询
查看>>
SSM-CRUD (3)---查询功能改造
查看>>
Nginx(2)---安装与启动
查看>>
springBoot(5)---整合servlet、Filter、Listener
查看>>
C++ 模板类型参数
查看>>
C++ 非类型模版参数
查看>>