This article focuses on nested lists that appear after reduceByKey in PySpark and is meant to serve as a detailed reference. We answer the common questions about nested lists after reduceByKey, and also cover Apache Spark reduceByKey in Java, combineByKey & groupByKey & sortByKey, and the problem of Eclipse not finding the reduceByKey operation when developing Spark programs.
Contents:
- (PySpark) Nested list after reduceByKey
- Apache Spark reduceByKey in Java
- combineByKey & groupByKey & sortByKey
- Eclipse cannot find the reduceByKey operation when developing Spark programs
(PySpark) Nested list after reduceByKey
I'm sure this is simple, but I haven't found anything related to it.
My code is simple:
...
stream = stream.map(mapper)
stream = stream.reduceByKey(reducer)
...
Nothing special, and the output looks something like this:
...
key1 value1
key2 [value2, value3]
key3 [[value4, value5], value6]
...
And so on. Sometimes I get a flat value (when there is only a single value for the key), and sometimes the nested lists can be very deep (three levels deep in my simple test data).
I tried searching the sources for something like "flat", but found only the flatMap method, which (as I understand it) is not what I need.
I don't know why these lists are nested. My guess is that they were processed by different workers and then combined without being flattened.
Of course I could write Python code to unroll and flatten such a list, but I don't think that should be necessary; I assume almost everyone wants flat output.
itertools.chain stops unrolling at the first non-iterable value it finds; in other words, it still needs some extra coding (see the previous paragraph).
So, how do I flatten the lists using PySpark's native methods?
Thanks
Answer 1
The problem here is your reduce function. For each key, reduceByKey calls your reduce function with pairs of values and expects it to produce a combined value of the same type.
For example, say I want to perform a word count. First I map each word to a (word, 1) pair, then reduceByKey(lambda x, y: x + y) sums the counts for each word. At the end I'm left with an RDD of (word, count) pairs.
Here is the example from the PySpark API documentation:
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
To understand why your example doesn't work, you can imagine the reduce function being applied like this:
reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ...
Based on your reduce function, it sounds like you may be trying to implement the built-in groupByKey operation, which groups each key with its list of values.
Also, take a look at combineByKey, a generalization of reduceByKey() that allows the reduce function's input and output types to differ (reduceByKey is implemented in terms of combineByKey).
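To make the difference concrete, here is a minimal PySpark sketch (the sample data and the local SparkContext are invented for illustration). A reducer that wraps its two arguments in a list nests deeper on every call, while groupByKey() returns one flat sequence of values per key:

from pyspark import SparkContext

sc = SparkContext("local[2]", "flatten-demo")
pairs = sc.parallelize([("k", 1), ("k", 2), ("k", 3), ("j", 4)])

# A list-building reducer nests, because the value it returns is fed back in
# as one of the inputs of the next call for the same key:
nested = pairs.reduceByKey(lambda x, y: [x, y]).collect()
# e.g. ('k', [[1, 2], 3]); the nesting depth depends on how the values get paired up

# groupByKey() collects one flat list of values per key instead:
flat = pairs.groupByKey().mapValues(list).collect()
# [('j', [4]), ('k', [1, 2, 3])]

print(nested)
print(flat)
sc.stop()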
Apache Spark reduceByKey in Java
I'm trying to understand how reduceByKey works in Spark, using Java as the programming language.
Say I have the sentence "I am who I am". I break the sentence into words and store them as a list: [I, am, who, I, am].
Now this function assigns a 1 to each word:
JavaPairRDD<String,Integer> ones = words.mapToPair(new PairFunction<String,String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) {
return new Tuple2<String,Integer>(s,1);
}
});
So the output looks like this:
(I,1)
(am,1)
(who,1)
(I,1)
(am,1)
Now, if I have 3 reducers running, each reducer will get a key and the values associated with that key:
reducer 1:
(I,1)
(I,1)
reducer 2:
(am,1)
(am,1)
reducer 3:
(who,1)
I would like to know:
a. What exactly happens in the function below.
b. What the parameters of new Function2<Integer, Integer, Integer> are.
c. Basically, how the JavaPairRDD is formed.
JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer,Integer,Integer>() {
    // Function2<Integer, Integer, Integer>: the first two type parameters are the input
    // types (two counts already accumulated for the same key), the last is the return type.
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2; // invoked repeatedly per key until a single total remains
    }
});
combineByKey & groupByKey & sortByKey
Grouping and partitioning
In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve overall performance. Much as a single-node program needs to choose the right data structure for a collection of records, a Spark program can control RDD partitioning to reduce communication overhead.
Partitioning is not helpful in all applications; for example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.
For example, sortByKey() and groupByKey() produce range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations such as map() cause the new RDD to lose the parent RDD's partitioning information, because such operations could in theory change each record's key.
Operations that benefit from partitioning
Many of Spark's operations involve shuffling data by key across nodes, and all of them benefit from partitioning. As of Spark 1.0, the operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().
For binary operations such as cogroup() and join(), pre-partitioning the data means that at least one of the RDDs (the one with a known partitioner) will not be shuffled. If both RDDs use the same partitioner and are cached on the same machines (for example, one was created from the other with mapValues(), so they share both keys and partitioning), or if one of them has not yet been computed, no cross-node shuffle happens at all.
Operations that affect partitioning
The operations that set a partitioner on their result RDD are: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent has a partitioner), and filter() (if the parent has a partitioner). All other operations produce a result with no particular partitioner.
Finally, for binary operations, the partitioner of the output depends on the parents' partitioners. By default the result is hash-partitioned, with as many partitions as the operation's level of parallelism. If one of the parent RDDs already has a partitioner, the result uses that partitioner; if both parents have one, the result uses the partitioner of the first parent.
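As a rough PySpark illustration of these rules (the data and names here are made up), pre-partitioning and caching the RDD that will be reused keeps it from being shuffled again during a join; join() sets a partitioner on its output, mapValues() preserves the parent's partitioner, and map() drops it:

from pyspark import SparkContext

sc = SparkContext("local[2]", "partitioning-demo")

# Pre-partition the side that will be joined repeatedly and keep it in memory.
users = sc.parallelize([(1, "alice"), (2, "bob"), (3, "carol")]).partitionBy(4).cache()
events = sc.parallelize([(1, "click"), (1, "view"), (2, "click")])

joined = users.join(events)  # only `events` still needs to be shuffled here

print(users.partitioner is not None)                          # True: set by partitionBy()
print(joined.partitioner is not None)                         # True: join() sets one on its result
print(users.mapValues(lambda v: v).partitioner is not None)   # True: mapValues() keeps the partitioner
print(users.map(lambda kv: kv).partitioner is None)           # True: map() discards it

sc.stop()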
1. combineByKey
test1
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.hive.com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CombineByKeyTest3 {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("WordCounter");
conf.set("spark.testing.memory", "2147480000");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create the RDD: 1) by reading external storage (for cluster environments) 2) from an in-memory collection
List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
data.add(new Tuple2<>("Cake", 2));
data.add(new Tuple2<>("Bread", 3));
data.add(new Tuple2<>("Cheese", 4));
data.add(new Tuple2<>("Milk", 1));
data.add(new Tuple2<>("Toast", 2));
data.add(new Tuple2<>("Bread", 2));
data.add(new Tuple2<>("Egg", 6));
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
}
});
mapRdd1.foreach(x->System.out.println(x));
/*
 * This can be implemented with either List or Iterable throughout
*/
// JavaPairRDD<String, List<Tuple2<Integer, Integer>>> results = mapRdd1.groupByKey();
// JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> results = mapRdd1.groupByKey();
JavaPairRDD<String, List<Tuple2<Integer, Integer>>> results = mapRdd1.combineByKey(
new Function<Tuple2<Integer,Integer>, List<Tuple2<Integer, Integer>>>() {
@Override
public List<Tuple2<Integer, Integer>> call(Tuple2<Integer, Integer> value) throws Exception {
List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
list.add(value);
return list;
}
},
new Function2<List<Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, List<Tuple2<Integer, Integer>>>() {
@Override
public List<Tuple2<Integer, Integer>> call(
List<Tuple2<Integer, Integer>> it,
Tuple2<Integer, Integer> value) throws Exception {
// List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
// it.forEach(list::add);
// list.add(value);
it.add(value);
return it;
}
},
new Function2<List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>>() {
@Override
public List<Tuple2<Integer, Integer>> call(
List<Tuple2<Integer, Integer>> it1,
List<Tuple2<Integer, Integer>> it2) throws Exception {
// List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
// it1.forEach(list::add);
// it2.forEach(list::add);
// return list;
it1.addAll(it2);
return it1;
}
});
results.foreach(x->System.out.println(x));
//In fact, distinct() is implemented on top of reduceByKey
// mapRdd1.distinct();
ctx.stop();
}
}
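The comment near the end of test1 says that distinct() is built on top of reduceByKey. A small PySpark sketch of that idea (the data is arbitrary):

from pyspark import SparkContext

sc = SparkContext("local[2]", "distinct-demo")
rdd = sc.parallelize([1, 2, 2, 3, 3, 3])

# Key each element on itself, keep one value per key, then drop the dummy value.
distinct_like = (rdd.map(lambda x: (x, None))
                    .reduceByKey(lambda a, b: a)
                    .map(lambda kv: kv[0]))

print(sorted(distinct_like.collect()))  # [1, 2, 3]
sc.stop()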
test2
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.hive.com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CombineByKeyTest2 {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.set("spark.testing.memory", "2147480000");
conf.setAppName("WordCounter");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create the RDD: 1) by reading external storage (for cluster environments) 2) from an in-memory collection
List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
data.add(new Tuple2<>("Cake", 2));
data.add(new Tuple2<>("Bread", 3));
data.add(new Tuple2<>("Cheese", 4));
data.add(new Tuple2<>("Milk", 1));
data.add(new Tuple2<>("Toast", 2));
data.add(new Tuple2<>("Bread", 2));
data.add(new Tuple2<>("Egg", 6));
int index="Code".hashCode() % 4;
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(4)).persist(StorageLevel.MEMORY_ONLY());
// JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.combineByKey(
// (value) -> new Tuple2<Integer, Integer>(value,1),
// (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
// (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
// new HashPartitioner(2),
// false,
// null
// );
// JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.aggregateByKey(
// new Tuple2<Integer, Integer>(0,0),
// (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
// (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2())
// );
// JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
// return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
// }
// });
// mapRdd1.foreach(System.out::println);
// JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd1.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
// return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
// }
// });
//results.foreach(System.out::println);
// results = mapRdd1.foldByKey(new Tuple2<Integer, Integer>(0, 0), new Function2<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
// return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
// }
// });
//results.foreach(System.out::println);
//Exercise: how could groupByKey be implemented with combineByKey?
// mapRdd.groupByKey().foreach(System.out::println);
Function<Integer, List<Integer>> createCombiner=new Function<Integer, List<Integer>>() {
@Override
public List<Integer> call(Integer arg0) throws Exception {
List<Integer>list=new ArrayList<Integer>();
list.add(arg0);
return list;
}
};
Function2<List<Integer>, Integer, List<Integer>> mergeValue=new Function2<List<Integer>, Integer, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> list, Integer value) throws Exception {
list.add(value);
return list;
}
};
Function2< List<Integer>,List<Integer> ,List<Integer> > mergeCombiners=new Function2<List<Integer>, List<Integer>, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> list1, List<Integer> list2) throws Exception {
List<Integer> list=new ArrayList<Integer>();
// list.addAll(list1);
// list.addAll(list2);
list1.forEach(list::add);
list2.forEach(list::add);
return list;
}
};
JavaPairRDD<String, List<Integer>> results=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners);
results.foreach(x->System.out.println(x));
JavaPairRDD<String, Integer> re=mapRdd.partitionBy(new HashPartitioner(2));
System.out.println(re.glom().collect());
//The fourth argument is the number of partitions; glom() shows the contents of each partition
JavaPairRDD<String, List<Integer>> results2=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, 2);
System.out.println(results2.glom().collect());
System.out.println(results2.getNumPartitions());
//The fourth argument can also be a custom partitioner
JavaPairRDD<String, List<Integer>> results3=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners,new HashPartitioner(3));
System.out.println(results3.glom().collect());
System.out.println(results3.getNumPartitions());
//Fourth argument: custom partitioner; fifth: whether to combine on the map side; sixth: serializer (null means the default serializer)
JavaPairRDD<String, List<Integer>> results4=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(3), true, null);
System.out.println(results4.glom().collect());
System.out.println(results4.getNumPartitions());
// mapRdd1.combineByKey(
// new Function<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
// @Override
// public Tuple2<Integer,Integer> call(Tuple2<Integer, Integer> arg0) throws Exception {
// return arg0;
// }
// },
//
// new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>[]>() {
// @Override
// public Tuple2<Integer, Integer>[] call(Tuple2<Integer, Integer> arg0, Integer arg1) throws Exception {
// return null;
// }
// },
// mergeCombiners);
//In fact, distinct() is implemented on top of reduceByKey
// mapRdd1.distinct();
ctx.stop();
}
}
2.group&join
groupByKey
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.hive.com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CombineByKeyTest {
@SuppressWarnings("serial")
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("WordCounter");
conf.set("spark.testing.memory", "5435657567560");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create the RDD: 1) by reading external storage (for cluster environments) 2) from an in-memory collection
List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
data.add(new Tuple2<>("Cake", 2));
data.add(new Tuple2<>("Bread", 3)); //<"Bread", <3,1>>
data.add(new Tuple2<>("Cheese", 4));
data.add(new Tuple2<>("Milk", 1));
data.add(new Tuple2<>("Toast", 2));
data.add(new Tuple2<>("Bread", 2));
data.add(new Tuple2<>("Egg", 6));
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
});
// JavaPairRDD<String, Integer> mapRdd=ctx.parallelizePairs(data,2);
mapRdd.groupByKey().foreach(x->System.out.println(x));
// JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.combineByKey(
// new Function<Integer, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
// return new Tuple2<Integer, Integer>(v1 ,1);
// }
// }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Integer v2) throws Exception {
// return new Tuple2<Integer, Integer>(v1._1() + v2, v1._2() + 1);
// }
// }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
// return new Tuple2<Integer, Integer>(v1._1() + v2._1(), v1._2() + v2._2());
// }
// });
JavaPairRDD<String, Tuple2<Integer, Integer>> result2s = mapRdd.combineByKey(
(Integer value) -> new Tuple2<Integer, Integer>(value,1),
(Tuple2<Integer, Integer> acc, Integer value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
new HashPartitioner(3),
true,
null
);
result2s.foreach(x->System.out.println(x));
JavaPairRDD<String, Tuple2<Integer, Integer>> results3 = mapRdd.aggregateByKey(
new Tuple2<Integer, Integer>(0,0),
(acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
(acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2())
);
results3.foreach(x->System.out.println(x));
JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
}
});
JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd1.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
}
});
// results.foreach(System.out::println);
results.foreach(x->System.out.println(x));
ctx.stop();
}
}
join
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.hive.com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CogroupApiTest {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.set("spark.testing.memory", "2147480000");
conf.setAppName("WordCounter");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create the RDD: 1) by reading external storage (for cluster environments) 2) from an in-memory collection
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2<>("Cake", 2));
data1.add(new Tuple2<>("Bread", 3));
data1.add(new Tuple2<>("Cheese", 4));
data1.add(new Tuple2<>("Milk", 1));
data1.add(new Tuple2<>("Toast", 2));
data1.add(new Tuple2<>("Bread", 2));
data1.add(new Tuple2<>("Egg", 6));
// JavaPairRDD<String, Integer> mapRdd1=ctx.parallelizePairs(data1);
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data1, 2);
JavaPairRDD<String, Integer> mapRdd1 = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
data2.add(new Tuple2<>("Cake", 2));
data2.add(new Tuple2<>("Bread", 3));
data2.add(new Tuple2<>("Cheese", 4));
data2.add(new Tuple2<>("Milk", 1));
data2.add(new Tuple2<>("Toast", 2));
JavaRDD<Tuple2<String, Integer>> rdd2 = ctx.parallelize(data2, 2);
JavaPairRDD<String, Integer> mapRdd2 = rdd2.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
//groupWith has the same effect as cogroup: (Bread,([3, 2],[3]))
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd3 = mapRdd1.cogroup(mapRdd2);
mapRdd3.foreach(x->System.out.println(x));
//join: (Bread,(3,3)), (Bread,(2,3)), (Cake,(2,2))
// JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd3 = mapRdd1.join(mapRdd2);
// mapRdd3.foreach(x->System.out.println(x));
//rightOuterJoin: (Bread,(Optional[3],3)), (Bread,(Optional[3],2)), (Cake,(Optional[2],2)); the left side may be Optional.empty
// JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> mapRdd3 = mapRdd2.rightOuterJoin(mapRdd1);
// mapRdd3.foreach(x->System.out.println(x));
//(Cheese,(4,Optional[4])), (Toast,(2,Optional[2])), (Egg,(6,Optional.empty))
// JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> mapRdd4 = mapRdd1.leftOuterJoin(mapRdd2);
// mapRdd4.foreach(x->System.out.println(x));
//fullOuterJoin: either side may be Optional.empty
// JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> mapRdd5 = mapRdd1.fullOuterJoin(mapRdd2);
// mapRdd5.foreach(x->System.out.println(x));
//groupWith has the same effect as cogroup: (Bread,([3, 2],[3]))
// JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd6 = mapRdd1.groupWith(mapRdd2);
// mapRdd6.foreach(x->System.out.println(x));
//join: (Bread,(3,3)), (Bread,(2,3)), (Cake,(2,2))
// JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd7=mapRdd1.join(mapRdd2);
// mapRdd7.foreach(x->System.out.println(x));
//union of the two pair RDDs; duplicate elements are not removed
// JavaPairRDD<String,Integer> mapRdd8=mapRdd1.union(mapRdd2);
// mapRdd8.foreach(x->System.out.println(x));
//subtractByKey: removes elements whose key also appears in the other RDD
// JavaPairRDD<String, Integer> mapRdd9=mapRdd1.subtractByKey(mapRdd2);
// mapRdd9.foreach(x->System.out.println(x));
//intersection: returns only tuples whose key and value both match
// JavaPairRDD<String, Integer> mapRdd10=mapRdd1.intersection(mapRdd2);
// mapRdd10.foreach(x->System.out.println(x));
}
}
3. sortByKey
test1
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
public class SortByKeyApi {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("WordCounter");
conf.set("spark.testing.memory", "2147480000");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create the RDD: 1) by reading external storage (for cluster environments) 2) from an in-memory collection
List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
data.add(new Tuple2<>("Cake", 2));
data.add(new Tuple2<>("Bread", 3));
data.add(new Tuple2<>("Cheese", 4));
data.add(new Tuple2<>("Milk", 1));
data.add(new Tuple2<>("Toast", 2));
data.add(new Tuple2<>("Bread", 2));
data.add(new Tuple2<>("Egg", 6));
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
//mapRdd.sortByKey().foreach(System.out::println);
mapRdd.sortByKey(false).foreach(x->System.out.println(x));
// mapRdd.sortByKey(new Comparator<Tuple2<String, Integer>>() {
// @Override
// public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
// return 0;
// }
// });
// mapRdd.f
// mapRdd.mapValues(x->x+1).foreach(x->System.out.println(x));
// mapRdd.flatMapValues(()->Arrays.asList(1,1,1));
ctx.stop();
}
}
test2
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class SortByKeyApiTest {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("WordCounter");
conf.set("spark.default.parallelism", "4");
conf.set("spark.testing.memory", "2147480000");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create the RDD: 1) by reading external storage (for cluster environments) 2) from an in-memory collection
List<Person> data1 =
new ArrayList<Person>();
data1.add(new Person("Cake",32));
data1.add(new Person("Bread",21));
data1.add(new Person("Smith",32));
data1.add(new Person("Hourse",21));
data1.add(new Person("Mary",32));
data1.add(new Person("Greey",21));
data1.add(new Person("Greey",21));
data1.add(new Person("Tom",32));
data1.add(new Person("Gao",21));
System.out.println(ctx.parallelize(data1).distinct().count());
// .sortBy(x->x, true, 2).foreach(x->System.out.println(x));
List<Tuple2<Person, Integer>> data =
new ArrayList<Tuple2<Person, Integer>>();
data.add(new Tuple2<Person, Integer>(new Person("Cake",32), 2));
data.add(new Tuple2<Person, Integer>(new Person("Bread",21), 3));
data.add(new Tuple2<Person, Integer>(new Person("Smith",32), 2));
data.add(new Tuple2<Person, Integer>(new Person("Hourse",21), 3));
data.add(new Tuple2<Person, Integer>(new Person("Mary",32), 2));
data.add(new Tuple2<Person, Integer>(new Person("Greey",21), 3));
data.add(new Tuple2<Person, Integer>(new Person("Greey",11), 3));
data.add(new Tuple2<Person, Integer>(new Person("Tom",32), 2));
data.add(new Tuple2<Person, Integer>(new Person("Gao",21), 3));
JavaPairRDD<Person, Integer> dataRdd = ctx.parallelizePairs(data);
dataRdd.sortByKey().foreach(x->System.out.println(x));
dataRdd.sortByKey(new Comparator<Person>() {
@Override
public int compare(Person o1, Person o2) {
int res = o1.name.compareTo(o2.name);
if(res == 0){
res = o1.age - o2.age;
}
return res;
}
});
ctx.close();
ctx.stop();
}
}
class Person implements Serializable, Comparable<Person>{
private static final long serialVersionUID = 1L;
public Person(String name, int age) {
super();
this.name = name;
this.age = age;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + age;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Person other = (Person) obj;
if (age != other.age)
return false;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}
String name;
int age;
@Override
public int compareTo(Person p) {
int res = this.name.compareTo(p.name);
if(res == 0){
res = this.age - p.age;
}
return res;
}
@Override
public String toString() {
return "Person [name=" + name + ", age=" + age + "]";
}
}
Eclipse cannot find the reduceByKey operation when developing Spark programs
Look at the code below. When developing a Spark word count in Eclipse, reduceByKey(_ + _) cannot be found because an import is missing.
Adding import org.apache.spark.SparkContext._ solves it; using import org.apache.spark._ directly also works.
package com.scala.spark.wordcount
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object WordCount2 extends App {
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]"); // master assumed to be local[*] here so the example runs; the original left it empty
val sc = new SparkContext(conf);
val lines = sc.textFile("/home/streamsadmin/data/WT06557-dex5sr");
// println(lines.count());
val b = lines.flatMap(_.split(","));
val c = b.map(x => (x, 1));
val d = c.reduceByKey(_ + _);
println(d.count())
}
That concludes today's discussion of nested lists after reduceByKey in PySpark. Thank you for reading. To learn more about Apache Spark reduceByKey in Java, combineByKey & groupByKey & sortByKey, or the problem of Eclipse not finding the reduceByKey operation, please search this site.