Flink Batch Processing from 0 to 1


Abstract: I. DataSet API: Data Sources. Flink ships with a large number of ready-made source methods. You can also define your own source: implement the SourceFunction interface for a source without parallelism, or implement the ParallelSourceFunction interface or extend RichParallelSourceFunction for a source with parallelism.

I. DataSet API: Data Sources

Introduction:

Flink ships with a large number of ready-made source methods. You can also define your own source: implement the SourceFunction interface to build a source without parallelism, or implement the ParallelSourceFunction interface or extend RichParallelSourceFunction to build a source with parallelism.
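As a rough sketch (not from the original post), a custom non-parallel source for the DataStream API might look like the following; for a parallel source you would implement ParallelSourceFunction or extend RichParallelSourceFunction instead:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// hypothetical example: a source that emits an increasing counter once per second
public class CountSource implements SourceFunction<Long> {
    private volatile boolean running = true;
    private long count = 0;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            ctx.collect(count++); // emit one element
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false; // called by Flink when the job is cancelled
    }
}

// usage (assumed): env.addSource(new CountSource())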

Types:
File-based

readTextFile(path): reads a text file according to the TextInputFormat rules, line by line, and returns each line.

Collection-based

fromCollection(Collection): creates a data stream from a Java Collection; all elements in the collection must be of the same type.

Code examples:
1. fromCollection

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// implicit conversions
import org.apache.flink.api.scala._

object StreamingFromCollectionScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = List(10, 15, 20)
    val text = env.fromCollection(data)
    // add 1 to each element that reaches map
    val num = text.map(_ + 1)
    num.print().setParallelism(1)
    env.execute("StreamingFromCollectionScala")
  }
}

The same kind of collection source used for a batch word count in the Java DataSet API:

package xuwei.tech.batch;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCountJava {
    public static void main(String[] args) throws Exception {
        List<String> data = Arrays.asList("hello you", "hello me");
        String outPath = "D:\\data\\result";
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // build a DataSet from the collection
        DataSet<String> text = env.fromCollection(data);
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
2. readTextFile

package xuwei.tech.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchWordCountJava {
    public static void main(String[] args) throws Exception {
        String inputPath = "D:\\data\\file";
        String outPath = "D:\\data\\result";
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the file contents
        DataSource<String> text = env.readTextFile(inputPath);
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

II. DataSet API: Transformations

Introduction:

  1. Map: takes one element and returns one element; useful for cleaning or converting data (Map, Filter, and Reduce are sketched right after this list)

  2. FlatMap: takes one element and returns zero, one, or more elements

  3. MapPartition: like Map, but processes one partition of data per call [recommended over Map when the processing needs a third-party resource such as a connection]

  4. Filter: applies a predicate to each element; elements that satisfy the condition are kept

  5. Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call, and returns the new value

  6. Aggregate: sum, max, min, and so on

  7. Distinct: returns the dataset with duplicates removed, e.g. data.distinct()

  8. Join: inner join

  9. OuterJoin: outer join

  10. Cross: computes the Cartesian product of two datasets

  11. Union: returns the union of two datasets; the element types must match

  12. First-n: returns the first N elements of the dataset

  13. Sort Partition: sorts all partitions of the dataset locally; chain sortPartition() calls to sort on multiple fields
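Map, Filter, Reduce, and Aggregate have no dedicated demo below, so here is a minimal sketch in the same style as the demos that follow (the class name and sample data are made up for illustration):

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchDemoMapFilterReduce {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5);
        DataSet<Integer> result = data
                // Map: one element in, one element out
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) throws Exception {
                        return value + 1;
                    }
                })
                // Filter: keep only the elements that satisfy the predicate
                .filter(new FilterFunction<Integer>() {
                    @Override
                    public boolean filter(Integer value) throws Exception {
                        return value > 2;
                    }
                })
                // Reduce: combine elements pairwise into a single value
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer a, Integer b) throws Exception {
                        return a + b;
                    }
                });
        result.print(); // prints 18 = 3 + 4 + 5 + 6
    }
}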

Code examples:
1. broadcast (broadcast variables)

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * broadcast variable
 *
 * Requirement:
 * Flink reads the users' names from the data source, and in the end
 * each user's name must be printed together with their age.
 *
 * Analysis:
 * The age has to be looked up inside the intermediate map step,
 * so the name-to-age dataset is best distributed as a broadcast variable.
 *
 * Note: if several operators need the same dataset, the broadcast variable
 * must be registered separately on each of those operators.
 */
public class BatchDemoBroadcast {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1: prepare the data to broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zs", 18));
        broadData.add(new Tuple2<>("ls", 20));
        broadData.add(new Tuple2<>("ww", 17));
        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);

        // 1.1: convert the broadcast data into a map: key = user name, value = age
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });

        // source data
        DataSource<String> data = env.fromElements("zs", "ls", "ww");

        // note: a RichMapFunction is required to access the broadcast variable
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<>();
            HashMap<String, Integer> allMap = new HashMap<>();

            /**
             * open() runs only once, so it is the right place for
             * initialization work such as fetching the broadcast variable.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 3: fetch the broadcast data
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map);
                }
            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName"); // 2: register the broadcast dataset

        result.print();
    }
}
2. IntCounter (accumulator)

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

/**
 * Global accumulator (counter)
 *
 * Requirement: count how many records the map function processed.
 *
 * Note: the accumulator value is only available after the job has finished.
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoCounter {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            // 1: create the accumulator
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: register the accumulator
                getRuntimeContext().addAccumulator("num-lines", this.numLines);
            }

            //int sum = 0;
            @Override
            public String map(String value) throws Exception {
                // with parallelism 1 a plain field-based sum would work, but with
                // several parallel subtasks a plain per-subtask sum is no longer correct
                //sum++;
                //System.out.println("sum:" + sum);
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);
        //result.print();
        result.writeAsText("d:\\data\\count10");
        JobExecutionResult jobResult = env.execute("counter");
        // 3: read the accumulator result
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:" + num);
    }
}
3. cross (Cartesian product)

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;
import java.util.ArrayList;

/**
 * Cartesian product
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoCross {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> data1 = new ArrayList<>();
        data1.add("zs");
        data1.add("ww");

        ArrayList<Integer> data2 = new ArrayList<>();
        data2.add(1);
        data2.add(2);

        DataSource<String> text1 = env.fromCollection(data1);
        DataSource<Integer> text2 = env.fromCollection(data2);
        CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);
        cross.print();
    }
}
4. registerCachedFile (Distributed Cache)

package xuwei.tech.batch.batchAPI;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * Distributed Cache
 */
public class BatchDemoDisCache {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1: register a file; HDFS or S3 paths also work
        env.registerCachedFile("d:\\data\\file\\a.txt", "a.txt");

        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: fetch the cached file
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.out.println("line:" + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // dataList can be used here
                return value;
            }
        });
        result.print();
    }
}
5. distinct

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.util.Collector;
import java.util.ArrayList;

public class BatchDemoDistinct {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");

        DataSource<String> text = env.fromCollection(data);
        FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.toLowerCase().split("\\W+");
                for (String word : split) {
                    System.out.println("word:" + word);
                    out.collect(word);
                }
            }
        });

        flatMapData.distinct() // deduplicate over the whole dataset
                .print();
    }
}
6. sorting (first-n)

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;

/**
 * Get the first N elements of a dataset
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoFirstN {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(2, "zs"));
        data.add(new Tuple2<>(4, "ls"));
        data.add(new Tuple2<>(3, "ww"));
        data.add(new Tuple2<>(1, "xw"));
        data.add(new Tuple2<>(1, "aw"));
        data.add(new Tuple2<>(1, "mw"));

        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        // first 3 records, in insertion order
        text.first(3).print();
        System.out.println("==============================");

        // group by the first field, take the first 2 records of each group
        text.groupBy(0).first(2).print();
        System.out.println("==============================");

        // group by the first field, sort within each group by the second field
        // (ascending), take the first 2 records of each group
        text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
        System.out.println("==============================");

        // no grouping: sort globally (first field ascending, second field
        // descending) and take the first 3 records
        text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3).print();
    }
}
7. join

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;

public class BatchDemoJoin {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // tuple2: id -> name
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        // tuple2: id -> city
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(3, "guangzhou"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        text1.join(text2)
                .where(0)   // index of the key field in the first dataset
                .equalTo(0) // index of the key field in the second dataset
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
                            throws Exception {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();
        System.out.println("==================================");

        // note: using map here gives the same result as with above
        /*text1.join(text2)
                .where(0)
                .equalTo(0)
                .map(new MapFunction<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception {
                        return new Tuple3<>(value.f0.f0, value.f0.f1, value.f1.f1);
                    }
                }).print();*/
    }
}
8. outerJoin

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;

/**
 * Outer joins: left, right, and full.
 */
public class BatchDemoOuterJoin {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // tuple2: id -> name
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        // tuple2: id -> city
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(4, "guangzhou"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        // left outer join; note: the second tuple may be null
        text1.leftOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "null");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                }).print();
        System.out.println("=============================================================================");

        // right outer join; note: the first tuple may be null
        text1.rightOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "null", second.f1);
                        }
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();
        System.out.println("=============================================================================");

        // full outer join; note: both first and second may be null
        text1.fullOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "null", second.f1);
                        } else if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "null");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                }).print();
    }
}
9. union

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;

/**
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoUnion {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "lili"));
        data2.add(new Tuple2<>(2, "jack"));
        data2.add(new Tuple2<>(3, "jessic"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        UnionOperator<Tuple2<Integer, String>> union = text1.union(text2);
        union.print();
    }
}

III. DataSet API: Partitioning

Introduction:
  1. Rebalance: repartitions the dataset evenly to eliminate data skew

  2. Hash-Partition: partitions the dataset by the hash of the given key: partitionByHash()

  3. Range-Partition: partitions the dataset into ranges of the given key: partitionByRange()

  4. Custom Partitioning: custom partitioning rules; implement the Partitioner interface and call partitionCustom(partitioner, "someKey") or partitionCustom(partitioner, 0) (see the sketch after this list)
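Custom partitioning has no demo below, so here is a minimal sketch (the class names and sample data are made up for illustration):

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

public class BatchDemoCustomPartition {
    // route each record to a subtask based on its key
    public static class MyPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return key % numPartitions;
        }
    }

    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Tuple2<Integer, String>> data = env.fromElements(
                new Tuple2<>(1, "hello1"), new Tuple2<>(2, "hello2"), new Tuple2<>(3, "hello3"));
        // partition on field 0 with the custom rule, then print
        data.partitionCustom(new MyPartitioner(), 0).print();
    }
}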

Code examples:
1. partitionByHash / partitionByRange

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;

/**
 * Hash-Partition and Range-Partition
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoHashRangePartition {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "hello1"));
        data.add(new Tuple2<>(2, "hello2"));
        data.add(new Tuple2<>(2, "hello3"));
        data.add(new Tuple2<>(3, "hello4"));
        data.add(new Tuple2<>(3, "hello5"));
        data.add(new Tuple2<>(3, "hello6"));
        data.add(new Tuple2<>(4, "hello7"));
        data.add(new Tuple2<>(4, "hello8"));
        data.add(new Tuple2<>(4, "hello9"));
        data.add(new Tuple2<>(4, "hello10"));
        data.add(new Tuple2<>(5, "hello11"));
        data.add(new Tuple2<>(5, "hello12"));
        data.add(new Tuple2<>(5, "hello13"));
        data.add(new Tuple2<>(5, "hello14"));
        data.add(new Tuple2<>(5, "hello15"));
        data.add(new Tuple2<>(6, "hello16"));
        data.add(new Tuple2<>(6, "hello17"));
        data.add(new Tuple2<>(6, "hello18"));
        data.add(new Tuple2<>(6, "hello19"));
        data.add(new Tuple2<>(6, "hello20"));
        data.add(new Tuple2<>(6, "hello21"));

        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        /*text.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                Iterator<Tuple2<Integer, String>> it = values.iterator();
                while (it.hasNext()) {
                    Tuple2<Integer, String> next = it.next();
                    System.out.println("current thread id:" + Thread.currentThread().getId() + "," + next);
                }
            }
        }).print();*/

        text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                Iterator<Tuple2<Integer, String>> it = values.iterator();
                while (it.hasNext()) {
                    Tuple2<Integer, String> next = it.next();
                    System.out.println("current thread id:" + Thread.currentThread().getId() + "," + next);
                }
            }
        }).print();
    }
}
2. mapPartition

package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;

/**
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoMapPartition {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");
        DataSource<String> text = env.fromCollection(data);

        /*text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // open a database connection -- note: this opens one connection per record
                // process the record
                // close the connection
                return value;
            }
        });*/

        DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                // open a database connection -- note: only one connection per partition
                // values holds the data of one partition
                Iterator<String> it = values.iterator();
                while (it.hasNext()) {
                    String next = it.next();
                    String[] split = next.split("\\W+");
                    for (String word : split) {
                        out.collect(word);
                    }
                }
                // close the connection
            }
        });
        mapPartitionData.print();
    }
}

IV. DataSet API: Data Sinks

Introduction:
  1. writeAsText(): writes elements line by line as strings obtained by calling each element's toString() method (see the sketch after this list)

  2. writeAsCsv(): writes tuples as comma-separated value files; the row and field delimiters are configurable, and each field value comes from the object's toString() method

  3. print(): prints the toString() value of each element to standard output or standard error
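writeAsText() has no dedicated demo below, so here is a minimal sketch (the class name and output path are made up for illustration):

package xuwei.tech.batch;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchSinkWriteAsText {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "c");
        // each element is written on its own line using its toString() value
        data.writeAsText("D:\\data\\textResult").setParallelism(1);
        env.execute("write as text");
    }
}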

Code:
1. writeAsCsv

package xuwei.tech.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchWordCountJava {
    public static void main(String[] args) throws Exception {
        String inputPath = "D:\\data\\file";
        String outPath = "D:\\data\\result";
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the file contents
        DataSource<String> text = env.readTextFile(inputPath);
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

Dedicated to sharing material on big data, machine-learning algorithms, and AI with anyone who needs it. I hope you find the content useful; feel free to forward and follow.


See also: flink流處理從0到1 (Flink Stream Processing from 0 to 1)
