Flink1.7.2 Dataset 文件切片计算方式和切片数据读取源码分析

  • 时间:
  • 浏览:37
  • 来源:uu直播快3平台_UU快3直播官方

流定位到开始英文位置

当前切片默认值设置

Source 的操作链(ChainedFlatMapDriver,ChainedMapDriver,SynchronousChainedCombineDriver) 即 FlatMap -> Map -> Combine (SUM(1)),也假如有一天source读到的数据,都能够经过链上的算子操作

计算实际切片的大小,blockSize 此处为文件大小,maxSplitSize 一般都小于blockSize,好多好多 最后取的是切片的最大长度maxSplitSize

对当前切片进行处理 ,调用 DelimitedInputFormat.open(),//open还没开始英文真正的读数据,假如有一天定位,把第有有一一一3个换行符,分到前有有一一一3个分片,自己从第3个换行符开始英文读取数据

第一次,startPos =0 ,count = 0,没读到数据

可能性有换行符,能够删除换行符,在readBuffer

切片拆分的计算法律方法,初使值 bytesUnassigned = len(文件总数据长度),每分一次bytesUnassigned会减去当前切片的大小,也假如有一天bytesUnassigned每次都在还剩下总的数据大小,当bytesUnassigned > maxBytesForLastSplit 就一直循环拆分切片,切片的长度为splitSize(切片大小) = 5, 开始英文位置从0开始英文,就让每个切片开始英文位置都能够加带就让所有切片大小 position += splitSize ;

调置当前分片

end

可能性有换行符,能够删除换行符,在readBuffer

对当前切片进行处理 ,调用 DelimitedInputFormat.open(),//open还没开始英文真正的读数据,假如有一天定位,把第有有一一一3个换行符,分到前有有一一一3个分片,自己从第3个换行符开始英文读取数据

调置当前分片的长度

调置当前分片的开始英文位置

long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);

调置当前分片

流定位到开始英文位置

随机读到有有一一一3个切片,给当前DataSourceTask使用,可能性在Source读取数据时是不按key分区,也就不分谁处理,有任务来处理,就给有有一一一3个切片处理就行,每给出有有一一一3个从总的切片中移除

调置当前分片的长度

把JobVertex 转化为ExecutionJobVertex,调用new ExecutionJobVertex(),ExecutionJobVertex中存了inputSplits,好多好多 会根据并行并来计算inputSplits的个数

每个切片最大长度计算,totalLength = 9 为文件总长度,minNumSplits = 2 为并行度,也假如有一天9不到整除并行度2,说明有余数,可能性把余数的数据单独在分配有有一一一3个切片,有可能性这有有一一一3个切片的数据量很少,就浪费资源了,这里的做法是,余数的最大值,也假如有一天每个切片+1,就把这里多的余数分配到前面的每个切片中,也假如有一天每个切片的最大值为 9 / 2 + 1 = 5

读取一行数据,也假如有一天读到第有有一一一3个换行符

当前切片信息

-转志Integer

从缓存区readBuffer复制当前行数据到 wrapBuffer

调用FileInputFormat.createInputSplits(并行度)再实际处理

把jobGraph是由JobVertex组成,调用executionGraph.attachJobGraph(sortedTopology) 把JobGraph转成ExecutionGraph,ExecutionGraph由ExecutionJobVertex组成,即把JobVertex转成ExecutionJobVertex

本示例拆分的结果

当前切片默认值设置

实际计算时,当计算最后有有一一一3个切片时,可能性剩下的数据大小小于 切片大小的1.1倍,就插进有有一一一3个切片中,没有了切分了,直接把剩下的数据插进最后有有一一一3个切片中,可能性可能性切过后,愿因最后一切片数据量很小,浪费资源

首先遍历路径是文件或目录,计算出所有文件插进List files = new ArrayList<>()中存储,计算出所有文件总大小totalLength,计算文件切片,当然是所有文件总大小来计算

Source 的操作链(ChainedFlatMapDriver,ChainedMapDriver,SynchronousChainedCombineDriver) 即 FlatMap -> Map -> Combine (SUM(1)),也假如有一天source读到的数据,都能够经过链上的算子操作

调置当前分片的开始英文位置

当前切片信息

随机读到有有一一一3个切片,给当前DataSourceTask使用,可能性在Source读取数据时是不按key分区,也就不分谁处理,有任务来处理,就给有有一一一3个切片处理就行,每给出有有一一一3个从总的切片中移除

从缓存区readBuffer复制当前行数据到 wrapBuffer





可能性while循环拆分切片是有条件的,bytesUnassigned > maxBytesForLastSplit,那可能性bytesUnassigned <= maxBytesForLastSplit,就能够把剩下的数据,都插进最后有有一一一3个切片中