Skip to content

Commit

Permalink
fix ClassCastException and Type errors in mapVertices; fixes apache#41
Browse files Browse the repository at this point in the history
…and apache#46
  • Loading branch information
vasia committed Jan 6, 2015
1 parent d26445f commit 2729ae7
Showing 1 changed file with 35 additions and 16 deletions.
51 changes: 35 additions & 16 deletions src/main/java/flink/graphs/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public DataSet<Edge<K, EV>> getEdges() {
* @return a new graph
*/
public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper));
TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper,
keyType));
return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context);
}

Expand All @@ -115,19 +117,24 @@ private static final class ApplyMapperToVertexWithType<K extends Comparable<K> &
<Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, NV>> {

private MapFunction<Vertex<K, VV>, NV> innerMapper;
public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper) {
private transient TypeInformation<K> keyType;
public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper, TypeInformation<K> keyType) {
this.innerMapper = theMapper;
this.keyType = keyType;
}

public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
return new Vertex<K, NV>(value.f0, innerMapper.map(value));
}

@SuppressWarnings("unchecked")
@Override
public TypeInformation<Vertex<K, NV>> getProducedType() {
return new TupleTypeInfo<Vertex<K, NV>>(
((TupleTypeInfo<?>)(TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, null, null))).getTypeAt(0),
TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null));
TypeInformation<NV> valueType = TypeExtractor
.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null);
@SuppressWarnings("rawtypes")
TypeInformation<?> returnType = new TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType);
return (TypeInformation<Vertex<K, NV>>) returnType;
}
}

Expand All @@ -137,7 +144,9 @@ public TypeInformation<Vertex<K, NV>> getProducedType() {
* @return
*/
public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) {
DataSet<Edge<K, NV>> mappedEdges = edges.map(new ApplyMapperToEdgeWithType<K, EV, NV>(mapper));
TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
DataSet<Edge<K, NV>> mappedEdges = edges.map(new ApplyMapperToEdgeWithType<K, EV, NV>(mapper,
keyType));
return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
}

Expand All @@ -146,21 +155,25 @@ private static final class ApplyMapperToEdgeWithType<K extends Comparable<K> & S
<Edge<K, EV>, Edge<K, NV>>, ResultTypeQueryable<Edge<K, NV>> {

private MapFunction<Edge<K, EV>, NV> innerMapper;
private transient TypeInformation<K> keyType;

public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> theMapper) {
public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> theMapper, TypeInformation<K> keyType) {
this.innerMapper = theMapper;
this.keyType = keyType;
}

public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
return new Edge<K, NV>(value.f0, value.f1, innerMapper.map(value));
}

@SuppressWarnings("unchecked")
@Override
public TypeInformation<Edge<K, NV>> getProducedType() {
TypeInformation<K> keyType = ((TupleTypeInfo<?>)
(TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, null, null))).getTypeAt(0);
return new TupleTypeInfo<Edge<K, NV>>(keyType, keyType,
TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null));
TypeInformation<NV> valueType = TypeExtractor
.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null);
@SuppressWarnings("rawtypes")
TypeInformation<?> returnType = new TupleTypeInfo<Edge>(Edge.class, keyType, keyType, valueType);
return (TypeInformation<Edge<K, NV>>) returnType;
}
}

Expand Down Expand Up @@ -604,9 +617,10 @@ Graph<K, NullValue, EV> create(DataSet<Edge<K, EV>> edges, ExecutionEnvironment
public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper,
ExecutionEnvironment context) {
TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
DataSet<Vertex<K, VV>> vertices =
edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>())
.distinct().map(new ApplyMapperToVertexValuesWithType<K, VV>(mapper));
.distinct().map(new ApplyMapperToVertexValuesWithType<K, VV>(mapper, keyType));
return new Graph<K, VV, EV>(vertices, edges, context);
}

Expand All @@ -615,20 +629,25 @@ private static final class ApplyMapperToVertexValuesWithType<K extends Comparabl
<Tuple1<K>, Vertex<K, VV>>, ResultTypeQueryable<Vertex<K, VV>> {

private MapFunction<K, VV> innerMapper;
private transient TypeInformation<K> keyType;

public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> theMapper) {
public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> theMapper, TypeInformation<K> keyType) {
this.innerMapper = theMapper;
this.keyType = keyType;
}

public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
return new Vertex<K, VV>(value.f0, innerMapper.map(value.f0));
}

@SuppressWarnings("unchecked")
@Override
public TypeInformation<Vertex<K, VV>> getProducedType() {
return new TupleTypeInfo<Vertex<K, VV>>(
TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, null, null),
TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null));
TypeInformation<VV> valueType = TypeExtractor
.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null);
@SuppressWarnings("rawtypes")
TypeInformation<?> returnType = new TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType);
return (TypeInformation<Vertex<K, VV>>) returnType;
}
}

Expand Down

0 comments on commit 2729ae7

Please sign in to comment.