GVKun编程网logo

[源码分析] 从FlatMap用法到Flink的内部实现(flink flatmap原理)

15

以上就是给各位分享[源码分析]从FlatMap用法到Flink的内部实现,其中也会对flinkflatmap原理进行解释,同时本文还将给你拓展AndroidLayoutInflater.inflate

以上就是给各位分享[源码分析] 从FlatMap用法到Flink的内部实现,其中也会对flink flatmap原理进行解释,同时本文还将给你拓展Android LayoutInflater.inflate源码分析、Collection 源码分析(四):LinkedHashMap 源码分析、Flask源码分析二:路由内部实现原理、Flask源码分析(三)Flask的请求上下文等相关知识,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

[源码分析] 从FlatMap用法到Flink的内部实现(flink flatmap原理)

[源码分析] 从FlatMap用法到Flink的内部实现

0x00 摘要

本文将从FlatMap概念和如何使用开始入手,深入到Flink是如何实现FlatMap。希望能让大家对这个概念有更深入的理解。

0x01 Map vs FlatMap

首先我们先从概念入手。

自从响应式编程慢慢壮大以来,这两个单词现在越来越被大家熟悉了。前端能见到它们的身影,后台也能见到;安卓里面有,iOS也有。很多兄弟刚遇到它们时候是懵圈的,搞不清楚之间的区别。下面我就给大家简单讲解下。

map

它把数组流中的每一个值,使用所提供的函数执行一遍,一一对应。得到与元素个数相同的数组流。然后返回这个新数据流。

flatMap

flat是扁平的意思。所以这个操作是:先映射(map),再拍扁(join)。

flatMap输入可能是多个子数组流。所以flatMap先针对 每个子数组流的每个元素进行映射操作。然后进行扁平化处理,最后汇集所有进行扁平化处理的结果集形成一个新的列表(扁平化简而言之就是去除所有的修饰)。

flatMap与map另外一个不一样的地方就是传入的函数在处理完后返回值必须是List。

实例

比如拿到一个文本文件之后,我们是按行读取,按行处理。如果要对每一行的单词数进行计数,那么应该选择Map方法,如果是统计词频,就应该选择flatMap方法。

如果还不清楚,可以看看下面这个例子:

梁山新进一批好马,准备给每个马军头领配置一批。于是得到函数以及头领名单如下:

函数 = ( 头领 => 头领 + 好马 )
五虎将 = List(关胜、林冲、秦明、呼延灼、董平 )
八骠骑 = List(花荣、徐宁、杨志、索超、张清、朱仝、史进、穆弘 )

// Map函数的例子
利用map函数,我们可以得到 五虎将马军

五虎将马军 = 五虎将.map( 头领 => 头领 + 好马 )
结果是 List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马 )

// flatMap函数的例子
但是为了得到统一的马军,则可以用flatMap:

马军头领 = List(五虎将,八骠骑)
马军 = 马军头领.flatMap( 头领 => 头领 + 好马 ) 

结果就是:List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马,花荣 + 马、徐宁 + 马、杨志 + 马、索超 + 马、张清 + 马、朱仝 + 马、史进 + 马、穆弘 + 马 )

现在大家应该清楚了吧。接下来看看几个FlatMap的实例。

Scala语言的实现

Scala本身对于List类型就有map和flatMap操作。举例如下:

val names = List("Alice","James","Apple")
val strings = names.map(x => x.toupperCase)
println(strings)
// 输出 List(ALICE,JAMES,APPLE)

val chars = names.flatMap(x=> x.toupperCase())
println(chars)
// 输出 List(A,L,I,C,E,J,A,M,S,P,E)

Flink的例子

以上是scala语言层面的实现。下面我们看看Flink框架是如何使用FlatMap的。

网上常见的一个Flink应用的例子:

//加载数据源
val source = env.fromElements("china is the best country","beijing is the capital of china")

//转化处理数据
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)

Flink源码中的例子

case class WordWithCount(word: String,count: Long)

val text = env.socketTextStream(host,port,'\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w,1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")

windowCounts.print()

上面提到的都是简单的使用,如果有复杂需求,在Flink中,我们可以通过继承FlatMapFunction和RichFlatMapFunction来自定义算子。

函数类FlatMapFunction

对于不涉及到状态的使用,可以直接继承 FlatMapFunction,其定义如下:

@Public
@FunctionalInterface
public interface FlatMapFunction<T,O> extends Function,Serializable {
	void flatMap(T value,Collector<O> out) throws Exception;
}

如何自定义算子呢,这个可以直接看看Flink中的官方例子

// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String,String> {
  @Override
  public void flatMap(String value,Collector<String> out) {
    for (String token : value.split("\\W")) {
      out.collect(token);
    }
  }
}

// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());

Rich函数类RichFlatMapFunction

对于涉及到状态的情况,用户可以使用继承 RichFlatMapFunction 类的方式来实现UDF。

RichFlatMapFunction属于Flink的Rich函数类。从名称上来看,这种函数类在普通的函数类上增加了Rich前缀,比如RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函数类,Rich函数类增加了:

  • open()方法:Flink在算子调用前会执行这个方法,可以用来进行一些初始化工作。
  • close()方法:Flink在算子最后一次调用结束后执行这个方法,可以用来释放一些资源。
  • getRuntimeContext方法:获取运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据

FlatMap对应的RichFlatMapFunction如下:

@Public
public abstract class RichFlatMapFunction<IN,OUT> extends AbstractRichFunction implements FlatMapFunction<IN,OUT> {
	@Override
	public abstract void flatMap(IN value,Collector<OUT> out) throws Exception;
}

其基类 AbstractRichFunction 如下,可以看到主要是和运行时上下文建立了联系,并且有初始化和退出操作

@Public
public abstract class AbstractRichFunction implements RichFunction,Serializable {
  
	private transient RuntimeContext runtimeContext;

	@Override
	public void setRuntimeContext(RuntimeContext t) {
		this.runtimeContext = t;
	}

	@Override
	public RuntimeContext getRuntimeContext() {
			return this.runtimeContext;
	}

	@Override
	public IterationRuntimeContext getIterationRuntimeContext() {
    if (this.runtimeContext instanceof IterationRuntimeContext) {
			return (IterationRuntimeContext) this.runtimeContext;
		} 
	}

	@Override
	public void open(Configuration parameters) throws Exception {}

	@Override
	public void close() throws Exception {}
}

如何最好的使用? 当然还是官方文档和例子最靠谱。

因为涉及到状态,所以如果使用,你必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们),状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。 根据不同的状态类型,可以创建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。 但是我们也会看到一个例子。RichFunctionRuntimeContext 提供如下方法:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

下面是一个 FlatMapFunction 的例子,展示了如何将这些部分组合起来:

class CountwindowAverage extends RichFlatMapFunction[(Long,Long),(Long,Long)] {

  private var sum: ValueState[(Long,Long)] = _

  override def flatMap(input: (Long,out: Collector[(Long,Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before,it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L,0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1,currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2,emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1,newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long,Long)]("average",createTypeinformation[(Long,Long)])
    )
  }
}

object ExampleCountwindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L,3L),(1L,5L),7L),4L),2L)
  )).keyBy(_._1)
    .flatMap(new CountwindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

这个例子实现了一个简单的计数窗口。 我们把元组的第一个元素当作 key(在示例中都 key 都是 “1”)。 该函数将出现的次数以及总和存储在 “ValueState” 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。 请注意,我们会为每个不同的 key(元组中第一个元素)保存一个单独的值。

0x03 从Flink源码入手看FlatMap实现

FlatMap从Flink编程模型角度讲属于一个算子,用来对数据流或者数据集进行转换。从框架角度说,FlatMap是怎么实现的呢? 或者说FlatMap是怎么从用户代码转换到Flink运行时呢 ?

1. DataSet

首先说说 DataSet相关这套系统中FlatMap的实现。

请注意,DataSteam对应的那套系统中,operator名字都是带着Stream的,比如StreamOperator。

DataSet

val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) 这段代码调用的就是DataSet中的API。具体如下:

public abstract class DataSet<T> {
  
	public <R> FlatMapOperator<T,R> flatMap(FlatMapFunction<T,R> flatMapper) {
    
		String callLocation = Utils.getCallLocationName();
    
		Typeinformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper,getType(),callLocation,true);
		return new FlatMapOperator<>(this,resultType,clean(flatMapper),callLocation);
	}
}

FlatMapOperator

可以看出,flatMap @ DataSet 主要就是生成了一个FlatMapOperator,这个可以理解为是逻辑算子。其定义如下:

public class FlatMapOperator<IN,OUT> extends SingleInputUdfOperator<IN,OUT,FlatMapOperator<IN,OUT>> {

	protected final FlatMapFunction<IN,OUT> function;
	protected final String defaultName;

	public FlatMapOperator(DataSet<IN> input,Typeinformation<OUT> resultType,FlatMapFunction<IN,OUT> function,String defaultName) {
		super(input,resultType);
		this.function = function;
		this.defaultName = defaultName;
	}

	@Override
	protected FlatMapFunction<IN,OUT> getFunction() {
		return function;
	}

  // 这个translatetoDataFlow就是生成计划(Plan)的关键代码
	@Override
	protected FlatMapOperatorBase<IN,OUT>> translatetoDataFlow(Operator<IN> input) {
		String name = getName() != null ? getName() : "FlatMap at " + defaultName;
		// create operator
		FlatMapOperatorBase<IN,OUT>> po = new FlatMapOperatorBase<IN,OUT>>(function,new UnaryOperatorinformation<IN,OUT>(getInputType(),getResultType()),name);
		// set input
		po.setInput(input);
		// set parallelism
		if (this.getParallelism() > 0) {
			// use specified parallelism
			po.setParallelism(this.getParallelism());
		} else {
			// if no parallelism has been specified,use parallelism of input operator to enable chaining
			po.setParallelism(input.getParallelism());
		}
		return po;
	}
}

FlatMapOperator的基类如下:

public abstract class SingleInputUdfOperator<IN,O extends SingleInputUdfOperator<IN,O>> extends SingleInputoperator<IN,O> implements UdfOperator<O> {

}

// Base class for operations that operates on a single input data set.
public abstract class SingleInputoperator<IN,O extends SingleInputoperator<IN,O>> extends Operator<OUT,O> {
  	private final DataSet<IN> input;
}

生成计划

DataSet API所编写的批处理程序跟DataStream API所编写的流处理程序在生成作业图(JobGraph)之前的实现差别很大。流处理程序是生成流图(StreamGraph),而批处理程序是生成计划(Plan)并由优化器对其进行优化并生成优化后的计划(Optimizedplan)。

计划(Plan)以数据流(dataflow)的形式来表示批处理程序,但它只是批处理程序最初的表示,在一个批处理程序生成作业图之前,计划还会被进行优化以产生更高效的方案。Plan不同于流图(StreamGraph),它以sink为入口,因为一个批处理程序可能存在若干个sink,所以Plan采用集合来存储它。另外Plan还封装了批处理作业的一些基本属性:jobId、jobName以及defaultParallelism等。

生成Plan的核心部件是算子翻译器(OperatorTranslation),createProgramPlan方法通过它来”翻译“出计划,核心代码如下

public class OperatorTranslation {
  
   // 接收每个需遍历的DataSink对象,然后将其转换成GenericDataSinkBase对象
   public Plan translatetoPlan(List<DataSink<?>> sinks,String jobName) {
       List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
       //遍历sinks集合
       for (DataSink<?> sink : sinks) {
            //将翻译生成的GenericDataSinkBase加入planSinks集合*,对每个sink进行”翻译“
            planSinks.add(translate(sink));
        }
       //以planSins集合构建Plan对象
       Plan p = new Plan(planSinks);
       p.setJobName(jobName);
       return p;
    }

	private <I,O> org.apache.flink.api.common.operators.Operator<O>    translateSingleInputoperator(SingleInputoperator<?,?,?> op) {
    //会调用到 FlatMapOperator 的 translatetoDataFlow
	org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translatetoDataFlow(input);    
  }
  
}

FlatMapOperatorBase就是生成的plan中的一员。

public class FlatMapOperatorBase<IN,FT extends FlatMapFunction<IN,OUT>> extends SingleInputoperator<IN,FT> {
	@Override
	protected List<OUT> executeOnCollections(List<IN> input,RuntimeContext ctx,ExecutionConfig executionConfig) throws Exception {
		FlatMapFunction<IN,OUT> function = userFunction.getUserCodeObject();
		
		FunctionUtils.setFunctionRuntimeContext(function,ctx);
		FunctionUtils.openFunction(function,parameters);

		ArrayList<OUT> result = new ArrayList<OUT>(input.size());

		TypeSerializer<IN> inSerializer = getoperatorInfo().getInputType().createSerializer(executionConfig);
		TypeSerializer<OUT> outSerializer = getoperatorInfo().getoutputType().createSerializer(executionConfig);

		copyingListCollector<OUT> resultCollector = new copyingListCollector<OUT>(result,outSerializer);

		for (IN element : input) {
			IN incopy = inSerializer.copy(element);
			function.flatMap(incopy,resultCollector);
		}

		FunctionUtils.closeFunction(function);

		return result;
	}
}

而最后优化时候,则FlatMapOperatorBase会被优化成FlatMapNode。

public class GraphCreatingVisitor implements Visitor<Operator<?>> {
	public boolean preVisit(Operator<?> c) {
    else if (c instanceof FlatMapOperatorBase) {
			n = new FlatMapNode((FlatMapOperatorBase<?,?>) c);
		}
  }
}

自此,FlatMap就被组合到 DataSet的 Optimizedplan 中。下一步Flink会依据Optimizedplan来生成 JobGraph。

作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。至此就完成了从用户业务代码到Flink运行系统的转化。

在运行状态下,如果上游有数据流入,则FlatMap这个算子就会发挥作用。

2. DataStream

对于DataStream,则是另外一套体系结构。首先我们找一个使用DataStream的例子看看。

//获取数据: 从socket中获取
val textDataStream = env.socketTextStream("127.0.0.1",8888,'\n')
val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1))

//groupby: 按照指定的字段聚合
val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1))
windowDstram.sum("count").print()

上面例子中,flatMap 调用的是DataStream中的API,具体如下:

public class DataStream<T> {
  
	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T,R> flatMapper) {
    //clean函数用来移除FlatMapFunction类对象的外部类部分,这样就可以进行序列化
    //getType用来获取类对象的输出类型
		Typeinformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),Utils.getCallLocationName(),true);
		return flatMap(flatMapper,outType);
	}
  
  // 构建了一个StreamFlatMap的Operator
	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T,R> flatMapper,Typeinformation<R> outputType) {
		return transform("Flat Map",outputType,new StreamFlatMap<>(clean(flatMapper)));
	}  
  
  // 依次调用下去
	@PublicEvolving
	public <R> SingleOutputStreamOperator<R> transform(
			String operatorName,Typeinformation<R> outTypeInfo,OneInputStreamOperator<T,R> operator) {
		return doTransform(operatorName,outTypeInfo,SimpleOperatorFactory.of(operator));
	}
  
	protected <R> SingleOutputStreamOperator<R> doTransform(
			String operatorName,StreamOperatorFactory<R> operatorFactory) {
		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getoutputType();
    // 构建Transform对象,Transform对象中包含其上游Transform对象,这样上游下游就串成了一个Transform链。
		OneInputTransformation<T,R> resultTransform = new OneInputTransformation<>(
				this.transformation,operatorName,operatorFactory,environment.getParallelism());
		@SuppressWarnings({"unchecked","rawtypes"})
		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment,resultTransform);
    // 将这Transform对象放入env的transform对象列表。
		getExecutionEnvironment().addOperator(resultTransform);
    // 返回流
		return returnStream;
	}  
}

上面源码中的几个概念需要澄清。

Transformation:首先,FlatMap在FLink编程模型中是算子API,在DataStream中会生成一个Transformation,即逻辑算子。

逻辑算子Transformation最后会对应到物理算子Operator,这个概念对应的就是StreamOperator

StreamOperator:DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(user-defined Funtion)的调用方式。

processElement()方法也是UDF的逻辑被调用的地方,例如FlatMapFunction里的flatMap()方法。

public class StreamFlatMap<IN,OUT>
		extends AbstractUdfStreamOperator<OUT,OUT>>
		implements OneInputStreamOperator<IN,OUT> {

	private transient TimestampedCollector<OUT> collector;

	public StreamFlatMap(FlatMapFunction<IN,OUT> flatMapper) {
		super(flatMapper);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();
		collector = new TimestampedCollector<>(output);
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		collector.setTimestamp(element);
    // 调用用户定义的FlatMap
		userFunction.flatMap(element.getValue(),collector);
	}
}

我们可以看到,StreamFlatMap继承了AbstractUdfStreamOperator,从而间接继承了StreamOperator。

public abstract class AbstractStreamOperator<OUT>
		implements StreamOperator<OUT>,SetupableStreamOperator<OUT>,Serializable {
}

StreamOperator是根接口。对于 Streaming 来说所有的算子都继承自 StreamOperator。继承了StreamOperator的扩展接口则有OneInputStreamOperator,TwoInputStreamOperator。实现了StreamOperator的抽象类有AbstractStreamOperator以及它的子类AbstractUdfStreamOperator。

从 API 到 逻辑算子 Transformation,再到 物理算子Operator,就生成了 StreamGraph。下一步Flink会依据StreamOperator来生成 JobGraph。

作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。至此就完成了从用户业务代码到Flink运行系统的转化。

0x04 参考

Flink中richfunction的一点小作用

【浅显易懂】scala中map与flatMap的区别

Working with State

flink简单应用: scala编写wordcount

【Flink】Flink基础之实现WordCount程序(Java与Scala版本)

Flink进阶教程:以flatMap为例,如何进行算子自定义

Flink运行时之批处理程序生成计划

Android LayoutInflater.inflate源码分析

Android LayoutInflater.inflate源码分析

LayoutInflater.inflate源码详解

LayoutInflater的inflate方法相信大家都不陌生,在Fragment的onCreateView中或者在BaseAdapter的getView方法中我们都会经常用这个方法来实例化出我们需要的View.

假设我们有一个需要实例化的布局文件menu_item.xml:

<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
  android:layout_width="match_parent"
  android:layout_height="wrap_content"
  android:orientation="vertical">

  <TextView
    android:id="@+id/id_menu_title_tv"
    android:layout_width="match_parent"
    android:layout_height="300dp"
    android:gravity="center_vertical"
    android:textColor="@android:color/black"
    android:textSize="16sp"
    android:text="@string/menu_item"/>
</LinearLayout>

我们想在BaseAdapter的getView()方法中对其进行实例化,其实例化的方法有三种,分别是:

2个参数的方法:

convertView = mInflater.inflate(R.layout.menu_item,null);

3个参数的方法(attachToRoot=false):

convertView = mInflater.inflate(R.layout.menu_item,parent,false);

3个参数的方法(attachToRoot=true):

convertView = mInflater.inflate(R.layout.menu_item,true);

究竟我们应该用哪个方法进行实例化View,这3个方法又有什么区别呢?如果有同学对三个方法的区别还不是特别清楚,那么就和我一起从源码的角度来分析一下这个问题吧.

源码

inflate

我们先来看一下两个参数的inflate方法,源码如下:

public View inflate(@LayoutRes int resource,@Nullable ViewGroup root) {
  return inflate(resource,root,root != null);
}

从代码我们看出,其实两个参数的inflate方法根据父布局parent是否为null作为第三个参数来调用三个参数的inflate方法,三个参数的inflate方法源码如下:

public View inflate(@LayoutRes int resource,@Nullable ViewGroup root,boolean attachToRoot) {
  // 获取当前应用的资源集合
  final Resources res = getContext().getResources();
  // 获取指定资源的xml解析器
  final XmlResourceParser parser = res.getLayout(resource);
  try {
    return inflate(parser,attachToRoot);
  } finally {
    // 返回View之前关闭parser资源
    parser.close();
  }
}

这里需要解释一下,我们传入的资源布局id是无法直接实例化的,需要借助XmlResourceParser.

而XmlResourceParser是借助Android的pull解析方法是解析布局文件的.继续跟踪inflate方法源码:

public View inflate(XmlPullParser parser,boolean attachToRoot) {
  synchronized (mConstructorArgs) {
    // 获取上下文对象,即LayoutInflater.from传入的Context.
    final Context inflaterContext = mContext;
    // 根据parser构建XmlPullAttributes.
    final AttributeSet attrs = Xml.asAttributeSet(parser);
    // 保存之前的Context对象.
    Context lastContext = (Context) mConstructorArgs[0];
    // 赋值为传入的Context对象.
    mConstructorArgs[0] = inflaterContext;
    // 注意,默认返回的是父布局root.
    View result = root;

    try {
      // 查找xml的开始标签.
      int type;
      while ((type = parser.next()) != XmlPullParser.START_TAG &&
          type != XmlPullParser.END_DOCUMENT) {
        // Empty
      }

      // 如果没有找到有效的开始标签,则抛出InflateException异常.
      if (type != XmlPullParser.START_TAG) {
        throw new InflateException(parser.getPositionDescription()
            + ": No start tag found!");
      }

      // 获取控件名称.
      final String name = parser.getName();

      // 特殊处理merge标签
      if (TAG_MERGE.equals(name)) {
        if (root == null || !attachToRoot) {
          throw new InflateException("<merge /> can be used only with a valid "
              + "ViewGroup root and attachToRoot=true");
        }

        rInflate(parser,inflaterContext,attrs,false);
      } else {
        // 实例化我们传入的资源布局的view
        final View temp = createViewFromTag(root,name,attrs);
        ViewGroup.LayoutParams params = null;

        // 如果传入的parent不为空.
        if (root != null) {
          if (DEBUG) {
            System.out.println("Creating params from root: " +
                root);
          }
          // 创建父类型的LayoutParams参数.
          params = root.generateLayoutParams(attrs);
          if (!attachToRoot) {
            // 如果实例化的View不需要添加到父布局上,则直接将根据父布局生成的params参数设置
            // 给它即可.
            temp.setLayoutParams(params);
          }
        }

        // 递归的创建当前布局的所有控件
        rInflateChildren(parser,temp,true);

        // 如果传入的父布局不为null,且attachToRoot为true,则将实例化的View加入到父布局root中
        if (root != null && attachToRoot) {
          root.addView(temp,params);
        }

        // 如果父布局为null或者attachToRoot为false,则将返回值设置成我们实例化的View
        if (root == null || !attachToRoot) {
          result = temp;
        }
      }

    } catch (XmlPullParserException e) {
      InflateException ex = new InflateException(e.getMessage());
      ex.initCause(e);
      throw ex;
    } catch (Exception e) {
      InflateException ex = new InflateException(
          parser.getPositionDescription()
              + ": " + e.getMessage());
      ex.initCause(e);
      throw ex;
    } finally {
      // Don't retain static reference on context.
      mConstructorArgs[0] = lastContext;
      mConstructorArgs[1] = null;
    }

    Trace.traceEnd(Trace.TRACE_TAG_VIEW);

    return result;
  }
}

上述代码中的关键部分我已经加入了中文注释.从上述代码中我们还可以发现,我们传入的布局文件是通过createViewFromTag来实例化每一个子节点的.

createViewFromTag

函数源码如下:

/**
 * 方便调用5个参数的方法,ignoreThemeAttr的值为false.
 */
private View createViewFromTag(View parent,String name,Context context,AttributeSet attrs) {
  return createViewFromTag(parent,context,false);
}

View createViewFromTag(View parent,AttributeSet attrs,boolean ignoreThemeAttr) {
  if (name.equals("view")) {
    name = attrs.getAttributeValue(null,"class");
  }

  // Apply a theme wrapper,if allowed and one is specified.
  if (!ignoreThemeAttr) {
    final TypedArray ta = context.obtainStyledAttributes(attrs,ATTRS_THEME);
    final int themeResId = ta.getResourceId(0,0);
    if (themeResId != 0) {
      context = new ContextThemeWrapper(context,themeResId);
    }
    ta.recycle();
  }

  // 特殊处理“1995”这个标签(ps: 平时我们写xml布局文件时基本没有使用过).
  if (name.equals(TAG_1995)) {
    // Let's party like it's 1995!
    return new BlinkLayout(context,attrs);
  }

  try {
    View view;
    if (mFactory2 != null) {
      view = mFactory2.onCreateView(parent,attrs);
    } else if (mFactory != null) {
      view = mFactory.onCreateView(name,attrs);
    } else {
      view = null;
    }

    if (view == null && mPrivateFactory != null) {
      view = mPrivateFactory.onCreateView(parent,attrs);
    }

    if (view == null) {
      final Object lastContext = mConstructorArgs[0];
      mConstructorArgs[0] = context;
      try {
        if (-1 == name.indexOf('.')) {
          view = onCreateView(parent,attrs);
        } else {
          view = createView(name,null,attrs);
        }
      } finally {
        mConstructorArgs[0] = lastContext;
      }
    }

    return view;
  } catch (InflateException e) {
    throw e;

  } catch (ClassNotFoundException e) {
    final InflateException ie = new InflateException(attrs.getPositionDescription()
        + ": Error inflating class " + name);
    ie.initCause(e);
    throw ie;

  } catch (Exception e) {
    final InflateException ie = new InflateException(attrs.getPositionDescription()
        + ": Error inflating class " + name);
    ie.initCause(e);
    throw ie;
  }
}

在createViewFromTag方法中,最终是通过createView方法利用反射来实例化view控件的.

createView

public final View createView(String name,String prefix,AttributeSet attrs)
  throws ClassNotFoundException,InflateException {
  // 以View的name为key,查询构造函数的缓存map中是否已经存在该View的构造函数.
  Constructor<? extends View> constructor = sConstructorMap.get(name);
  Class<? extends View> clazz = null;

  try {
    // 构造函数在缓存中未命中
    if (constructor == null) {
      // 通过类名去加载控件的字节码
      clazz = mContext.getClassLoader().loadClass(prefix != null ? (prefix + name) : name).asSubClass(View.class);
      // 如果有自定义的过滤器并且加载到字节码,则通过过滤器判断是否允许加载该View
      if (mFilter != null && clazz != null) {
        boolean allowed = mFilter.onLoadClass(clazz);
        if (!allowed) {
          failNotAllowed(name,prefix,attrs);
        }
      }
      // 得到构造函数
      constructor = clazz.getConstructor(mConstructorSignature);
      constructor.setAccessible(true);
      // 缓存构造函数
      sConstructorMap.put(name,constructor);
    } else {
      if (mFilter != null) {
        // 过滤的map是否包含了此类名
        Boolean allowedState = mFilterMap.get(name);
        if (allowedState == null) {
          // 重新加载类的字节码
          clazz = mContext.getClassLoader().loadClass(prefix != null ? (prefix + name) : name).asSubclass(View.class);
          boolean allowed = clazz != null && mFilter.onLoadClass(clazz);
          mFilterMap.put(name,allowed);
          if (!allowed) {
            failNotAllowed(name,attrs);
          }
        } else if (allowedState.equals(Boolean.FALSE)) {
          failNotAllowed(name,attrs);
        }
      }
    }

    // 实例化类的参数数组(mConstructorArgs[0]为Context,[1]为View的属性)
    Object[] args = mConstructorArgs;
    args[1] = attrs;
    // 通过构造函数实例化View
    final View view = constructor.newInstance(args);
    if (View instanceof ViewStub) {
      final ViewStub viewStub = (ViewStub) view;
      viewStub.setLayoutInflater(cloneInContext((Context)args[0]))
    }
    return view;
  } catch (NoSunchMethodException e) {
    // ......
  } catch (ClassNotFoundException e) {
    // ......
  } catch (Exception e) {
    // ......
  } finally {
    // ......
  }
}

总结

通过学习了inflate函数源码,我们再回过头去看BaseAdapter的那三种方法,我们可以得出的结论是:

第一种方法使用不够规范,且会导致实例化View的LayoutParams属性失效.(ps: 即layout_width和layout_height等参数失效,因为源码中这种情况的LayoutParams为null).

第二种是最正确,也是最标准的写法.

第三种由于attachToRoot为true,所以返回的View其实是父布局ListView,这显然不是我们想要实例化的View.因此,第三种写法是错误的.

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

Collection 源码分析(四):LinkedHashMap 源码分析

Collection 源码分析(四):LinkedHashMap 源码分析

点开 LinkedHashMap 的源码 可以看到 继承 HashMap<K,V> 实现 Map 接口

我们注意到 在源码开头描述中有一段话

都实现了 Map 接口 并且区别在于 linkedHashMap 是一个双端队列组成

可以看见比 HashMap 的 Entry 多了两个参数 before,after

也是使用的 Hash''Map 的 Entry 构造器

包括实例化 LinkedHashMap 的时候调用的也是父类(HashMap)的构造器,put 方法,remove 方法都是走的 HashMap 的方法 可以看到 LinkedHashMap 的内存结构与 HashMap 是一样的 唯一区别就是他的链表 

HashMap 的 Entry 链表 只有一个 next 参数 相当于单向链表

而 LinkedHashMap 的链表有 after,before 两个参数 相当于双向链表

可以看出 在做大量查找操作的时候  LinkedHashMap 的效率要比 HashMap 的高 其他基本无区别

做增加操作基本等量 也就多了两个 “指针” 赋值 注意此处的指针 并不是真正的指针

Flask源码分析二:路由内部实现原理

Flask源码分析二:路由内部实现原理

前言

Flask是目前为止我最喜欢的一个Python Web框架了,为了更好的掌握其内部实现机制,这两天准备学习下Flask的源码,将由浅入深跟大家分享下,其中Flask版本为1.1.1。

上次了解了Flask服务的启动流程,今天我们来看下路由的内部实现机理。

Flask系列文章:

  1. Flask开发初探
  2. Flask源码分析一:服务启动

关于路由

所谓路由,就是处理请求URL和函数之间关系的程序。

Flask中也是对URL规则进行统一管理的,创建URL规则有两种方式:

  1. 使用@app.route修饰器,并传入URL规则作为参数,将函数绑定到URL,这个过程便将一个函数注册为路由,这个函数则被称为视图函数。
  2. 使用app.add_url_rule()。

在开始阅读源码之前,我是有这几点疑问的?

  1. 注册路由的过程是什么?
  2. Flask内部是如何进行URL规则管理的?
  3. 一个视图函数绑定多个URL内部是如何实现的?
  4. 动态URL是如何进行视图函数匹配的呢?
  5. 匹配路由的过程是怎样的呢?

那就让我们带着这几点疑问一起去学习源码吧!

正文

注册路由

首先,route()装饰器:

def route(self, rule, **options):       

        def decorator(f):
            endpoint = options.pop("endpoint", None)
            self.add_url_rule(rule, endpoint, f, **options)
            return f

        return decorator

route()有两个参数,rule表示url规则。该函数对参数进行处理之后,调用方法add_url_role(),这里也就验证了两种注册路由的方法等价。我们来看下代码:

def add_url_rule(
        self,
        rule,
        endpoint=None,
        view_func=None,
        provide_automatic_options=None,
        **options
    ):
        
        if endpoint is None:
            endpoint = _endpoint_from_view_func(view_func)
        options["endpoint"] = endpoint
        methods = options.pop("methods", None)

        # if the methods are not given and the view_func object knows its
        # methods we can use that instead.  If neither exists, we go with
        # a tuple of only ``GET`` as default.
        if methods is None:
            methods = getattr(view_func, "methods", None) or ("GET",)
        if isinstance(methods, string_types):
            raise TypeError(
                "Allowed methods have to be iterables of strings, "
                ''for example: @app.route(..., methods=["POST"])''
            )
        methods = set(item.upper() for item in methods)

        # Methods that should always be added
        required_methods = set(getattr(view_func, "required_methods", ()))

        # starting with Flask 0.8 the view_func object can disable and
        # force-enable the automatic options handling.
        if provide_automatic_options is None:
            provide_automatic_options = getattr(
                view_func, "provide_automatic_options", None
            )

        if provide_automatic_options is None:
            if "OPTIONS" not in methods:
                provide_automatic_options = True
                required_methods.add("OPTIONS")
            else:
                provide_automatic_options = False

        # Add the required methods now.
        methods |= required_methods

        rule = self.url_rule_class(rule, methods=methods, **options)
        rule.provide_automatic_options = provide_automatic_options

        self.url_map.add(rule)
        if view_func is not None:
            old_func = self.view_functions.get(endpoint)
            if old_func is not None and old_func != view_func:
                raise AssertionError(
                    "View function mapping is overwriting an "
                    "existing endpoint function: %s" % endpoint
                )
            self.view_functions[endpoint] = view_func

入参包括:

  1. rule: url规则
  2. endpoint : 要注册规则的endpoint,默认是视图函数的名儿
  3. view_func: 视图函数
  4. provide_automatic_options: 请求方法是否添加OPTIONS方法的一个标志
  5. options: 关于请求处理的一些方法等

可以看到,add_url_rule()首先进行参数处理,包括:

  1. endpoint默认为视图函数的name
  2. url请求的方法默认为GET
  3. 若请求方法中没有设置OPTIONS,添加该方法。

在处理完所有的参数后,将该URL规则写入url_map(创建好Rule对象,并添加到Map对象中),将视图函数写入view_function字典中。

其中,url_map 是werkzeug.routing:Map 类的对象,rule是 werkzeug.routing:Rule 类的对象,也就是Flask的核心路由逻辑是在werkzeug中实现的

werkzeug

werkzeug是使用Python编写的一个WSGI工具集,werkzeug.routing模块主要用于url解析。

Rule类

Rule类继承自RuleFactory类,一个Rule实例代表一个URL模式,一个WSGI应用会处理很多个不同的URL模式,与此同时产生很多个Rule实例,这些实例将作为参数传给Map类。

Map类

Map类构造的实例存储所有的url规则,解析并匹配请求对应的视图函数。

路由匹配

在应用初始化的过程中,会注册所有的路由规则,可以调用(app.url_map)查看,当服务收到URL请求时,就需要进行路由匹配,以找到对应的视图函数,对应的流程和原理是什么呢?

当用户请求进入Flask应用时,调用Flask类的wsgi_app方法:

def wsgi_app(self, environ, start_response):
    
    ctx = self.request_context(environ)
    error = None
    try:
        try:
            ctx.push()
            response = self.full_dispatch_request()
        except Exception as e:
            error = e
            response = self.handle_exception(e)
        except:  # noqa: B001
            error = sys.exc_info()[1]
            raise
        return response(environ, start_response)
    finally:
        if self.should_ignore_error(error):
            error = None
        ctx.auto_pop(error)

该函数的处理过程包括:

  1. 创建RequestContext对象,在对象初始化的过程中调用app.create_url_adapter()方法,将请求参数environ传给Map对象创建MapAdapter对象,保存在url_adapter字段中
  2. 将RequestContext对象推入_request_ctx_stack栈中
  3. 通过RequestContext的match_request方法,调用MapAdapter对象的match方法找到匹配的Rule并解析出参数,保存在request的url_rule和view_args字段中
  4. 调用full_dispatch_request()

接下来我们看下full_dispatch_request方法:

def full_dispatch_request(self):
    self.try_trigger_before_first_request_functions()
    try:
        request_started.send(self)
        rv = self.preprocess_request()
        if rv is None:
            rv = self.dispatch_request()
    except Exception as e:
        rv = self.handle_user_exception(e)
    return self.finalize_request(rv)

可以看到,重点执行dispatch_request():

def dispatch_request(self):
    req = _request_ctx_stack.top.request
    if req.routing_exception is not None:
        self.raise_routing_exception(req)
    rule = req.url_rule
    # if we provide automatic options for this URL and the
    # request came with the OPTIONS method, reply automatically
    if (
        getattr(rule, "provide_automatic_options", False)
        and req.method == "OPTIONS"
    ):
        return self.make_default_options_response()
    # otherwise dispatch to the handler for that endpoint
    return self.view_functions[rule.endpoint](**req.view_args)

处理的过程是:获取请求对象的request,找到对应的endpoint,继而从view_functions中找到对应的视图函数,传递请求参数,视图函数处理内部逻辑并返回,完成一次请求分发。

以上,就是Flask路由的内部实现原理。

Flask源码分析(三)Flask的请求上下文

Flask源码分析(三)Flask的请求上下文

flask是如何管理上下文的

1.什么是上下文

通俗的说,就是一个容器,存储了一些与程序运行相关的参数变量等,这些内容支持着程序的运行。

在flask中谈论的上下文为两个:请求上下文和应用上下文。比如常用的gsessionrequest,属于请求上下文,其内容只在各自的请求中有效。而current_app就是应用上下文。flask引入应用上下文的概念是为了更好的支持多应用开发。

2.flask是如何管理上下文的

2.1 继续从上篇文章falsk是如何处理请求的接着说。上篇文章说到wsgi_app时,提到了调用self.request_context方法时会创建请求上下文对象。

def wsgi_app(self, environ, start_response):
    # 创建当前请求的上下文空间
    ctx = self.request_context(environ)
    error = None
    try:
        try:
            # 将上下文压入栈中
            ctx.push()
            response = self.full_dispatch_request()
        except Exception as e:
            error = e
            response = self.handle_exception(e)
        except:  # noqa: B001
            error = sys.exc_info()[1]
            raise
        return response(environ, start_response)
    finally:
        if self.should_ignore_error(error):
            error = None
        # 将当前上下文从栈空间弹出
        ctx.auto_pop(error)

2.2 那就先看一下self.request_context执行了什么(已删减部分源码)。内部首先是实例化了RequestContext这个类(这个名字很明显吧),实例化方法内部还实例化了request_class这个类(继承自BaseRequest),返回了请求对象,这个请求对象包含了请求的相关信息。RequestContext类中实现了两个重要的方法:push和pop方法。看到这里,依稀明白了请求上下文的处理流程。

上面wsgi_app中提到的full_dispatch_request方法在处理请求时,会到_request_ctx_stack取栈顶的请求上下文(可继续看源码,内容太多,就不贴出来了),对请求处理结束返回相应的响应对象后,再调用auto_pop(内部调用pop)将请求上下文从栈空间弹出。

flask是支持多线程和协程的,比如多线程访问时,flask是如何保证取请求上下文不会取到同一个呢?

def request_context(self, environ):
    # self即当前app,environ是请求的参数
    return RequestContext(self, environ)

class RequestContext(object):

    def __init__(self, app, environ, request=None, session=None):
        self.app = app
        if request is None:
            request = app.request_class(environ)
        self.request = request
        self._implicit_app_ctx_stack = []

        self.preserved = False

        
    def push(self):
        """Binds the request context to the current context."""
         # 取_request_ctx_stack栈顶的请求上下文
        top = _request_ctx_stack.top
        # 如果某次异常,会导致上次请求上下文没有正常弹出,这里确保栈顶没有请求上下文
        if top is not None and top.preserved:
            top.pop(top._preserved_exc)
         # 取_app_ctx_stack栈顶的应用上下文
        app_ctx = _app_ctx_stack.top
        # 确保当前请求上下文在这个应用上下文内
        if app_ctx is None or app_ctx.app != self.app:
            app_ctx = self.app.app_context()
            app_ctx.push()
            self._implicit_app_ctx_stack.append(app_ctx)
        else:
            self._implicit_app_ctx_stack.append(None)

        if hasattr(sys, "exc_clear"):
            sys.exc_clear()
         # 将自身压入_request_ctx_stack栈中
        _request_ctx_stack.push(self)


    def pop(self, exc=_sentinel):
        app_ctx = self._implicit_app_ctx_stack.pop()

        try:
            clear_request = False
            if not self._implicit_app_ctx_stack:
                self.preserved = False
                self._preserved_exc = None
                if exc is _sentinel:
                    exc = sys.exc_info()[1]
                self.app.do_teardown_request(exc)

                if hasattr(sys, "exc_clear"):
                    sys.exc_clear()

                request_close = getattr(self.request, "close", None)
                if request_close is not None:
                    request_close()
                clear_request = True
        finally:
            # 弹出当前请求上下文
            rv = _request_ctx_stack.pop()

            if clear_request:
                rv.request.environ["werkzeug.request"] = None

            # Get rid of the app as well if necessary.
            if app_ctx is not None:
                app_ctx.pop(exc)
             # 确保弹出的上下文空间是自身
            assert rv is self, "Popped wrong request context. (%r instead of %r)" % (
                rv,
                self,
            )

2.3 答案是使用线程或协程的唯一标识,即get_ident这个函数。来看_request_ctx_stack源码,_request_ctx_stack是个全局变量,一开始实例化Flask类时,就会实例化LocalStack这个类并且导入了这个变量。

调用push方法时,会触发self._local的__getattr__方法,如果self._local没有存储当前线程或协程的唯一标识,会触发自身的__setattr__方法,然后将当前请求上下文存储到这个__storage__属性中,这样就保证并发请求时,正确使用对应的上下文啦。

_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()

try:
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
        from _thread import get_ident

class Local(object):
    __slots__ = ("__storage__", "__ident_func__")

    def __init__(self):
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", get_ident)

    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

            
class LocalStack(object):

    def __init__(self):
        self._local = Local()

    def push(self, obj):
        """Pushes a new item to the stack"""
        # 调用self._local对象中的__getattr__方法
        rv = getattr(self._local, "stack", None)
        if rv is None:
            # 调用self._local对象中的__setattr__方法,设置当前线程或协程唯一标识
            self._local.stack = rv = []
        # 压入栈中当前的请求上下文
        # 最终self._local中__storage__的内容类似为:
        # {241253254325: {''stack'': RequestContext}}
        rv.append(obj)
        return rv

    def pop(self):
        pass

    @property
    def top(self):
        pass

关于[源码分析] 从FlatMap用法到Flink的内部实现flink flatmap原理的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于Android LayoutInflater.inflate源码分析、Collection 源码分析(四):LinkedHashMap 源码分析、Flask源码分析二:路由内部实现原理、Flask源码分析(三)Flask的请求上下文的相关信息,请在本站寻找。

本文标签: