Flink为了能够处理有边界的数据集和无边界的数据集,提供了对应的DataSet API和DataStream API。我们可以开发对应的Java程序或者Scala程序来完成相应的功能。下面举例了一些DataSet API中的基本的算子。

下面我们通过具体的代码来为大家演示每个算子的作用。
1、Map、FlatMap与MapPartition
//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList<String> data = new ArrayList<String>();data.add("I love Beijing");data.add("I love China");data.add("Beijing is the capital of China");DataSource<String> text = env.fromCollection(data);DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() { public List<String> map(String data) throws Exception { String[] words = data.split(" "); //创建一个List List<String> result = new ArrayList<String>(); for(String w:words){ result.add(w); } return result; }});mapData.print();System.out.println("*****************************************");DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String data, Collector<String> collection) throws Exception { String[] words = data.split(" "); for(String w:words){ collection.collect(w); } }});flatMapData.print();System.out.println("*****************************************");/* new MapPartitionFunction<String, String> 第一个String:表示分区中的数据元素类型 第二个String:表示处理后的数据元素类型*/DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() { public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception { //针对分区进行操作的好处是:比如要进行数据库的操作,一个分区只需要创建一个Connection //values中保存了一个分区的数据 Iterator<String> it = values.iterator(); while (it.hasNext()) { String next = it.next(); String[] split = next.split(" "); for (String word : split) { out.collect(word); } } //关闭链接 }});mapPartitionData.print();
2、Filter与Distinct
//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList<String> data = new ArrayList<String>();data.add("I love Beijing");data.add("I love China");data.add("Beijing is the capital of China");DataSource<String> text = env.fromCollection(data);DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String data, Collector<String> collection) throws Exception { String[] words = data.split(" "); for(String w:words){ collection.collect(w); } }});//去掉重复的单词flatMapData.distinct().print();System.out.println("*********************");//选出长度大于3的单词flatMapData.filter(new FilterFunction<String>() { public boolean filter(String word) throws Exception { int length = word.length(); return length>3?true:false; }}).print();
3、Join操作
//获取运行的环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID 姓名ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID 姓名 所在的程序DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);table1.join(table2).where(0).equalTo(0)/*第一个Tuple2<Integer,String>:表示第一张表 * 第二个Tuple2<Integer,String>:表示第二张表 * Tuple3<Integer,String, String>:多表join连接查询后的返回结果 */ .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String, String>>() { public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2) throws Exception { return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1); } }).print();
4、笛卡尔积
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID 姓名ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID 姓名 所在的程序DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);//生成笛卡尔积table1.cross(table2).print();
5、First-N
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//这里的数据是:员工姓名、薪水、部门号DataSet<Tuple3<String, Integer,Integer>> grade = env.fromElements(new Tuple3<String, Integer,Integer>("Tom",1000,10), new Tuple3<String, Integer,Integer>("Mary",1500,20), new Tuple3<String, Integer,Integer>("Mike",1200,30), new Tuple3<String, Integer,Integer>("Jerry",2000,10));//按照插入顺序取前三条记录grade.first(3).print();System.out.println("**********************");//先按照部门号排序,在按照薪水排序grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();System.out.println("**********************");//按照部门号分组,求每组的第一条记录grade.groupBy(2).first(1).print();
6、外链接操作
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID 姓名ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID 姓名 所在的程序DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);//左外连接table1.leftOuterJoin(table2).where(0).equalTo(0) .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2) throws Exception { // 左外连接表示等号左边的信息会被包含 if(table2 == null){ return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null); }else{ return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1); } } }).print();System.out.println("***********************************");//右外连接table1.rightOuterJoin(table2).where(0).equalTo(0) .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2) throws Exception { //右外链接表示等号右边的表的信息会被包含 if(table1 == null){ return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1); }else{ return new Tuple3<Integer, String, String>(table2.f0,table1.f1,table2.f1); } } }).print();System.out.println("***********************************");//全外连接table1.fullOuterJoin(table2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2) throws Exception { if(table1 == null){ return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1); }else if(table2 == null){ return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null); }else{ return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1); } } }).print();
原文转载:http://www.shaoqun.com/a/480834.html
airwallex:https://www.ikjzd.com/w/1011
马士基集团:https://www.ikjzd.com/w/1296
芒果店长:https://www.ikjzd.com/w/1533
Flink为了能够处理有边界的数据集和无边界的数据集,提供了对应的DataSetAPI和DataStreamAPI。我们可以开发对应的Java程序或者Scala程序来完成相应的功能。下面举例了一些DataSetAPI中的基本的算子。下面我们通过具体的代码来为大家演示每个算子的作用。1、Map、FlatMap与MapPartition//获取运行环境ExecutionEnvironmentenv=E
reddit:https://www.ikjzd.com/w/180
kk馆:https://www.ikjzd.com/w/1713
为什么有些商人去了非洲就不想再回来?:https://www.ikjzd.com/home/111241
东亚、东南亚、中东电商消费者消费习惯:https://www.ikjzd.com/home/11261
口罩卖断货,却要裁员1500人,3M究竟是家什么样的公司?:https://www.ikjzd.com/home/115466
No comments:
Post a Comment