% Appendices: source code of the programs used in experiments, and contents of the attached CD.
\chapter{Source code of programs used in experiments}\label{appendixa}
\section{Single source shortest paths}
\begin{Verbatim}[label=Shortest paths - SparkDatalog]
val query = """ declare Path(int v, int dist aggregate Min).
Path(x, d) :- s == """ + sourceId + """ , Edge(s, x, d).
Path(x, d) :- Path(y, da), Edge(y, x, db), d = da + db."""
val resultDatabase = database.datalog(query)
\end{Verbatim}
\vspace{0cm}
\begin{Verbatim}[label=Shortest paths - Spark]
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => \{ // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) \{
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
\} else \{
Iterator.empty
\}
\},
(a,b) => math.min(a,b)) // Merge Message
val shortestPaths = sssp.vertices.map(v => (v._1, v._2))
\end{Verbatim}
\section{Triangle counting}
\begin{Verbatim}[label=Triangle counting - SparkDatalog]
val query = """ |declare Triangle(int v, int w, int u).
|declare Total(int a, int b aggregate Sum).
|Triangle(x, y, z) :- Edge(x, y), x < y, Edge (y, z), y < z, Edge(x, z).
|Total(a, c) :- Triangle(x, y, z), a = 1, c = 1. """.stripMargin
val resultDatabase: Database = database.datalog(query)
\end{Verbatim}
\vspace{0cm}
\begin{Verbatim}[label=Triangle counting - Spark]
val canonicalEdges = edgesRdd.filter(Function.tupled(_ < _)).distinct().cache()
val swappedEdgesRdd = canonicalEdges.map(_.swap)
val pathOf2 = swappedEdgesRdd.join(canonicalEdges).map( \{ case (y, (x, z)) => (x, (y, z)) \} )
val triangle = pathOf2.join(canonicalEdges)
.filter(\{ case (x, ((y, z), zp)) => z == zp \})
.map(\{ case (x, ((y, z), zp)) => (x, y, z) \})
val count = triangle.count()
\end{Verbatim}
\section{Connected components}
\begin{Verbatim}[label=Connected components - SparkDatalog]
val query = """ |declare Component(int n, int component aggregate Min).
|declare ComponentId(int n).
|Component(n, i) :- Node(n), i = n.
|Component(n, i) :- Component(p, i), Edge(p, n).
|ComponentId(id) :- Component(x, id). """.stripMargin
val resultDatabase = database.datalog(query)
\end{Verbatim}
\vspace{0.4cm}
\begin{Verbatim}[label=Connected components - Spark]
val initialGraph = graph.mapVertices((id, _) => id.toInt)
val connectedComponents = initialGraph.pregel(Int.MaxValue)(
(id, cmp, newCmp) => math.min(cmp, newCmp), // Vertex Program
triplet => \{ // Send Message
if (triplet.srcAttr < triplet.dstAttr) \{
Iterator((triplet.dstId, triplet.srcAttr))
\} else \{
Iterator.empty
\}
\},
(a, b) => math.min(a, b)) // Merge Message
\end{Verbatim}
\chapter{Contents of the attached CD}\label{appendixb}
The attached CD contains the implementation of the SparkDatalog extension for Spark described in this thesis and the results of the experiments. The repository containing the implementation is also published at \emph{https://github.com/marekrogala/sparkdatalog}. The CD has the following structure:
\begin{itemize}
\item \emph{implementation/}
\begin{itemize}
\item \emph{parsergen/} --- parser generator files,
\item \emph{sparkdatalog/} --- SparkDatalog extension source code,
\end{itemize}
\item \emph{experiments/} --- results of experiments,
\item \emph{thesis/} --- this thesis in electronic version.
\end{itemize}