Various SQL, API, and context functionality additions #130

Open · wants to merge 3 commits into develop
66 changes: 66 additions & 0 deletions src/clojure/flambo/api.clj
@@ -117,6 +117,72 @@
(conj! v (into [] (._2 t)))
(persistent! v)))

(defn tuple
"Create a `scala.Tuple2` from **a** and **b** respectively."
[a b]
(scala.Tuple2. a b))

(defn array?
"Return whether or not **a** is a Java array type."
[a]
(= \[ (-> a .getClass .getName (.charAt 0))))
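A minimal REPL sketch of the two helpers above (assuming this namespace, flambo.api, is loaded; results are shown in comments):

;; build and destructure a Scala pair
(let [t (tuple :a 1)]
  [(._1 t) (._2 t)])              ;; => [:a 1]

;; Java arrays versus ordinary Clojure collections
(array? (long-array [1 2 3]))     ;; => true
(array? [1 2 3])                  ;; => false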

(defsparkfn
^{:doc "Coerce an `org.apache.spark.sql SparkSession.Row` objects into
Clojure maps with each map created from its respective row."}
row->map [^org.apache.spark.sql.Row row]
(let [n (.length row)
schema (.schema row)
fields (if schema (.fieldNames schema))]
(loop [i 0 m (transient {})]
(if (< i n)
(recur (inc i)
(let [coln (if fields
(nth fields i)
(Integer/toString i))]
(assoc! m (keyword coln) (.get row i))))
(persistent! m)))))
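A hedged usage sketch, assuming a `defsparkfn` value is invocable like an ordinary function and that `rows` stands in for a collected DataFrame result (e.g. the output of f/collect on a Dataset of Rows):

(clojure.core/map row->map rows)
;; => ({:col1 1, :col2 "a"} {:col1 2, :col2 "b"} ...)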

(defn unwrap
"Coerce a Scala datatype into a Clojure data structure. The function
recusively coerces the data types until only Clojure and Java data types.
The function only recurses **depth-left** iterations deep.

See [Scala conversions](http://docs.scala-lang.org/overviews/collections/conversions-between-java-and-scala-collections.html)"
([obj] (unwrap obj Integer/MAX_VALUE))
([obj depth-left]
(let [np (dec depth-left)]
(cond (<= depth-left 0) obj

(nil? obj) nil

(or (array? obj) (sequential? obj) (instance? java.util.Collection obj))
(clojure.core/map #(unwrap % np) obj)

(instance? scala.Tuple2 obj) (unwrap (untuple obj) np)

(instance? org.apache.spark.sql.Row obj) (unwrap (row->map obj) np)

(map? obj) (zipmap (keys obj) (clojure.core/map #(unwrap % np) (vals obj)))

(instance? scala.collection.Map obj)
(unwrap (scala.collection.JavaConversions/mapAsJavaMap obj) np)

(instance? scala.collection.Set obj)
(unwrap (scala.collection.JavaConversions/setAsJavaSet obj) np)

(instance? scala.collection.Iterator obj)
(unwrap (scala.collection.JavaConversions/asJavaIterator obj) np)

(instance? scala.collection.SeqLike obj)
(unwrap (vec (scala.collection.JavaConversions/asJavaCollection obj)) np)

;; useful for data frames
(= (.getClass obj) (Class/forName "[Ljava.io.Serializable;"))
(into [] obj)

:else obj))))
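A small sketch of the intended behavior (`some-scala-seq` is hypothetical; nested Scala and Spark values come back as plain Clojure data):

(unwrap (tuple 1 (tuple 2 3)))   ;; => (1 (2 3))
(unwrap nil)                     ;; => nil
(unwrap some-scala-seq 1)        ;; coerces only the outermost level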

(defn- ftruthy?
[f]
(fn [x] (u/truthy? (f x))))
54 changes: 54 additions & 0 deletions src/clojure/flambo/context.clj
@@ -0,0 +1,54 @@
(ns flambo.context
(:import [org.apache.spark SparkContext])
(:require [clojure.tools.logging :as log]
[flambo.api :as f]
[flambo.conf :as conf]))

(def ^:dynamic *master*
"Used when creating Spark contexts in [[create-context]]. This is can be a
URL or `yarn` for a yarn cluster. By default this is not necessary to set as
long as the `spark-submit` job is given the `--deploy-mode` option."
nil)

(def ^:dynamic *app-name*
"The application name given to [[create-context]]."
"flambo")

(defonce ^:private context-inst (atom nil))

(defn databricks-cluster?
"Return whether or not we're running in a databricks cluster."
[]
(contains? (System/getProperties) "databricks.serviceName"))

(defn- create-context
"Create a spark context using URL [*master*]. By default, this creates a
yarn cluster context."
[]
(log/infof "creating spark context")
(if (databricks-cluster?)
(-> (SparkContext/getOrCreate)
f/spark-context)
(-> (conf/spark-conf)
(conf/app-name *app-name*)
((if *master*
#(conf/master % *master*)
identity))
f/spark-context)))

(defn context
"Return the (single) JVM Spark context. [*master*] is the URL (defaults to a
yarn cluster) and used only on the first use of this function.

See [[close-context]]."
[]
(swap! context-inst #(or % (create-context))))

(defn close-context
"Stop and close and cleanup the Spark Context.

See [[context]]."
[]
(let [ctx @context-inst]
(and ctx (.stop ctx)))
(reset! context-inst nil))
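A sketch of the intended call pattern for this namespace; the local master is illustrative only, and under spark-submit the *master* binding can be omitted:

(binding [*app-name* "my-flambo-job"
          *master*   "local[*]"]
  (let [sc (context)]                         ;; created once per JVM
    (try
      (f/collect (f/parallelize sc [1 2 3]))
      (finally
        (close-context)))))                   ;; stops and clears the singleton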
11 changes: 9 additions & 2 deletions src/clojure/flambo/function.clj
@@ -20,7 +20,11 @@
FilterFunction
MapPartitionsFunction
MapGroupsFunction
FlatMapGroupsFunction]))
FlatMapGroupsFunction]
[org.apache.spark.sql.api.java
UDF1
UDF2
UDF3]))

(defn- serfn? [f]
(= (type f) :serializable.fn/serializable-fn))
@@ -56,7 +60,7 @@
(gen-class
:name ~new-class-sym
:extends flambo.function.AbstractFlamboFunction
:implements [~(mk-sym "org.apache.spark.api.java.function.%s" clazz)]
:implements [~(mk-sym (if (re-find #"^UDF" (name clazz)) "org.apache.spark.sql.api.java.%s" "org.apache.spark.api.java.function.%s") clazz)]
:prefix ~prefix-sym
:init ~'init
:state ~'state
@@ -100,3 +104,6 @@
(gen-function MapPartitionsFunction map-partitions-function)
(gen-function MapGroupsFunction map-groups-function)
(gen-function FlatMapGroupsFunction flat-map-groups-function)
(gen-function UDF1 udf)
(gen-function UDF2 udf2)
(gen-function UDF3 udf3)
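A hedged sketch of registering one of the new UDF wrappers against a SQLContext `c`, using the same ns aliases as the test file below; the return DataType must be given explicitly:

(-> c .udf
    (.register "PLUS"
               (func/udf2 (f/fn [a b] (+ a b)))
               org.apache.spark.sql.types.DataTypes/LongType))

;; after which (sql/sql c "SELECT PLUS(col1, col1) AS twice FROM foo")
;; can be run against the registered table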
41 changes: 19 additions & 22 deletions src/clojure/flambo/sql.clj
@@ -210,21 +210,7 @@
(recur (inc i) (conj! v (.get row i)))
(persistent! v)))))

(defsparkfn
^{:doc "Coerce an `org.apache.spark.sql SparkSession.Row` objects into
Clojure maps with each map created from its respective row."}
row->map [^org.apache.spark.sql.Row row]
(let [n (.length row)
schema (.schema row)
fields (if schema (.fieldNames schema))]
(loop [i 0 m (transient {})]
(if (< i n)
(recur (inc i)
(let [coln (if fields
(nth fields i)
(Integer/toString i))]
(assoc! m (keyword coln) (.get row i))))
(persistent! m)))))
(def row->map f/row->map)

(defsparkfn
^{:doc "Coerce an Scala interator into a Clojure sequence"}
@@ -310,19 +296,25 @@

* **:name** the name of the column
* **:type** the (keyword) type of the column; defaults to `:string`
* **:nullable** boolean of whether the column is nullable; defaults to `true`"
* **:nullable?** boolean of whether the column is nullable; defaults to `true`
* **:array-type** when given, the column is an array type whose elements all
have this (keyword) type"
[defs]
(let [metadata (org.apache.spark.sql.types.Metadata/empty)]
(->> defs
(clojure.core/map
(fn [{:keys [name type nullable?]
(fn [{:keys [name type nullable? array-type]
:or {type :string
nullable? true}}]
(->> type
clojure.core/name
(format "\"%s\"")
DataType/fromJson
(#(StructField. name % nullable? metadata)))))
(let [json (if array-type
(-> "{\"type\":\"array\",\"elementType\":\"%s\",\"containsNull\":false}"
(format (clojure.core/name array-type))
DataType/fromJson)
(->> type
clojure.core/name
(format "\"%s\"")
DataType/fromJson))]
(StructField. name json nullable? metadata))))
(into-array StructField)
StructType.)))
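A sketch of a column-definition vector using the new **:array-type** key (the column names are illustrative only):

[{:name "id"     :type :long :nullable? false}
 {:name "name"}                            ;; defaults to :string, nullable
 {:name "scores" :array-type :double}]     ;; becomes an array<double> column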

Expand Down Expand Up @@ -368,6 +360,11 @@ See [[query]] for **opts** details."
f/collect
clojure.pprint/print-table))

(defn field-value
"Return a column value for field **column-name** for **row**."
[^Row row column-name]
(.get row (.fieldIndex row column-name)))
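For example (a sketch; `row` is any org.apache.spark.sql.Row already in scope):

(field-value row "col2")   ;; => "a"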


(def show (memfn show))

15 changes: 9 additions & 6 deletions test/flambo/sql_test.clj
@@ -2,11 +2,11 @@
(:use midje.sweet)
(:require [flambo.api :as f]
[flambo.conf :as conf]
[flambo.sql :as sql])
[flambo.sql :as sql]
[flambo.function :as func])
(:import [org.apache.spark.sql functions RelationalGroupedDataset Column]
[org.apache.spark.sql.expressions WindowSpec]
[org.apache.spark.sql.types DataTypes IntegerType StringType]

[org.apache.spark.sql.types DataTypes IntegerType StringType]
))

(facts
@@ -21,8 +21,8 @@
test-data-2 ["{\"col1\":4,\"col2\":\"a\"}" "{\"col1\":4,\"col2\":\"a\"}" "{\"col1\":6,\"col2\":\"a\"}"]
test-df-2 (sql/json-rdd c (f/parallelize sc test-data-2))
_ (sql/register-data-frame-as-table c test-df "foo")
_ (sql/register-data-frame-as-table c test-df-2 "bar")]

_ (sql/register-data-frame-as-table c test-df-2 "bar")
_ (-> c .udf (.register "UC" (func/udf (f/fn [x] (.toUpperCase x))) DataTypes/StringType))]
(fact
"with-sql-context gives us a SQLContext"
(class c) => org.apache.spark.sql.SQLContext)
@@ -37,7 +37,10 @@
(sql/columns df) => ["year" "make" "model" "comment" "blank"]))

(fact "SQL queries work"
(f/count (sql/sql c "SELECT * FROM foo WHERE col2 = 'a'")) => 2)
(f/count (sql/sql c "SELECT * FROM foo WHERE col2 = 'a'")) => 2)

(fact "SQL UDF queries work"
(f/count (sql/sql c "SELECT * FROM foo WHERE UC(col2) = 'A'")) => 2)

(fact "table-names gets all tables"
(sql/table-names c) => (just ["foo" "bar"] :in-any-order))
Expand Down