From 07a785f5319de83522b0d51cff70c04d6d3b0e3c Mon Sep 17 00:00:00 2001
From: Paul Landes
Date: Mon, 18 Dec 2017 20:03:58 -0600
Subject: [PATCH 1/3] handle array types

---
 src/clojure/flambo/sql.clj | 25 +++++++++++++++----------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/src/clojure/flambo/sql.clj b/src/clojure/flambo/sql.clj
index 8b3dcfe..041b3b7 100644
--- a/src/clojure/flambo/sql.clj
+++ b/src/clojure/flambo/sql.clj
@@ -310,19 +310,24 @@ Clojure maps with each map created from its respective row."}
 
 * **:name** the name of the column
 * **:type** the (keyword) type of the column; defaults to `:string`
-* **:nullable** boolean of whether the column is nullable; defaults to `true`"
+* **:nullable?** boolean of whether the column is nullable; defaults to `true`
+* **:array-type** like **:type**, but makes the column an array type with all
+  elements of the array of this (keyword) type"
   [defs]
   (let [metadata (org.apache.spark.sql.types.Metadata/empty)]
     (->> defs
-         (clojure.core/map
-          (fn [{:keys [name type nullable?]
-                :or {type :string
-                     nullable? true}}]
-            (->> type
-                 clojure.core/name
-                 (format "\"%s\"")
-                 DataType/fromJson
-                 (#(StructField. name % nullable? metadata)))))
+         (map (fn [{:keys [name type nullable? array-type]
+                    :or {type :string
+                         nullable? true}}]
+                (let [json (if array-type
+                             (-> "{\"type\":\"array\",\"elementType\":\"%s\",\"containsNull\":false}"
+                                 (format (clojure.core/name array-type))
+                                 DataType/fromJson)
+                             (->> type
+                                  clojure.core/name
+                                  (format "\"%s\"")
+                                  DataType/fromJson))]
+                  (StructField. name json nullable? metadata))))
         (into-array StructField)
         StructType.)))
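For reference, a minimal usage sketch of the new `:array-type` option. The enclosing function's `defn` line falls outside this hunk, so `build-schema` below is a hypothetical name for flambo's schema-building helper; the type keywords map to Spark SQL's JSON type names:

```clojure
(require '[flambo.sql :as sql])

;; `build-schema` is a hypothetical name for the helper patched above;
;; its defn line is not visible in this hunk.
(def schema
  (sql/build-schema
   [{:name "id" :type :integer :nullable? false}
    {:name "label"}                           ; defaults: nullable string
    {:name "scores" :array-type :double}]))   ; array column, double elements
```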
From e69058086d3a1a0d59b244a638c6cf7402cda733 Mon Sep 17 00:00:00 2001
From: Paul Landes
Date: Mon, 18 Dec 2017 20:32:26 -0600
Subject: [PATCH 2/3] scala -> clojure conversions; context creation
 convenience funcs; utility

---
 src/clojure/flambo/api.clj     | 66 ++++++++++++++++++++++++++++++++++
 src/clojure/flambo/context.clj | 54 ++++++++++++++++++++++++++++
 src/clojure/flambo/sql.clj     | 46 ++++++++++--------------
 3 files changed, 139 insertions(+), 27 deletions(-)
 create mode 100644 src/clojure/flambo/context.clj

diff --git a/src/clojure/flambo/api.clj b/src/clojure/flambo/api.clj
index 19ba65f..ebb9e7d 100644
--- a/src/clojure/flambo/api.clj
+++ b/src/clojure/flambo/api.clj
@@ -117,6 +117,72 @@
     (conj! v (into [] (._2 t)))
     (persistent! v)))
 
+(defn tuple
+  "Create a `scala.Tuple2` from **a** and **b** respectively."
+  [a b]
+  (scala.Tuple2. a b))
+
+(defn array?
+  "Return whether or not **a** is a Java array type."
+  [a]
+  (= \[ (-> a .getClass .getName (.charAt 0))))
+
+(defsparkfn
+  ^{:doc "Coerce `org.apache.spark.sql.Row` objects into
+Clojure maps with each map created from its respective row."}
+  row->map [^org.apache.spark.sql.Row row]
+  (let [n (.length row)
+        schema (.schema row)
+        fields (if schema (.fieldNames schema))]
+    (loop [i 0 m (transient {})]
+      (if (< i n)
+        (recur (inc i)
+               (let [coln (if fields
+                            (nth fields i)
+                            (Integer/toString i))]
+                 (assoc! m (keyword coln) (.get row i))))
+        (persistent! m)))))
+
+(defn unwrap
+  "Coerce a Scala datatype into a Clojure data structure.  The function
+  recursively coerces the data types until only Clojure and Java data types
+  remain.  The function recurses at most **depth-left** levels deep.
+
+  See [Scala conversions](http://docs.scala-lang.org/overviews/collections/conversions-between-java-and-scala-collections.html)"
+  ([obj] (unwrap obj (Integer/MAX_VALUE)))
+  ([obj depth-left]
+   (let [np (dec depth-left)]
+     (cond (<= depth-left 0) obj
+
+           (nil? obj) nil
+
+           (or (array? obj) (sequential? obj) (instance? java.util.Collection obj))
+           (clojure.core/map #(unwrap % np) obj)
+
+           (instance? scala.Tuple2 obj) (unwrap (untuple obj) np)
+
+           (instance? org.apache.spark.sql.Row obj) (unwrap (row->map obj) np)
+
+           (map? obj) (zipmap (keys obj) (clojure.core/map #(unwrap % np) (vals obj)))
+
+           (instance? scala.collection.Map obj)
+           (unwrap (scala.collection.JavaConversions/mapAsJavaMap obj) np)
+
+           (instance? scala.collection.Set obj)
+           (unwrap (scala.collection.JavaConversions/setAsJavaSet obj) np)
+
+           (instance? scala.collection.Iterator obj)
+           (unwrap (scala.collection.JavaConversions/asJavaIterator obj) np)
+
+           (instance? scala.collection.SeqLike obj)
+           (unwrap (vec (scala.collection.JavaConversions/asJavaCollection obj)) np)
+
+           ;; useful for data frames
+           (= (.getClass obj) (Class/forName "[Ljava.io.Serializable;"))
+           (into [] obj)
+
+           true obj))))
+
 (defn- ftruthy?
   [f]
   (fn [x] (u/truthy? (f x))))

diff --git a/src/clojure/flambo/context.clj b/src/clojure/flambo/context.clj
new file mode 100644
index 0000000..7b200c7
--- /dev/null
+++ b/src/clojure/flambo/context.clj
@@ -0,0 +1,54 @@
+(ns flambo.context
+  (:import [org.apache.spark SparkContext])
+  (:require [clojure.tools.logging :as log]
+            [flambo.api :as f]
+            [flambo.conf :as conf]))
+
+(def ^:dynamic *master*
+  "Used when creating Spark contexts in [[create-context]].  This can be a
+  URL or `yarn` for a YARN cluster.  By default this does not need to be set
+  as long as the `spark-submit` job is given the `--deploy-mode` option."
+  nil)
+
+(def ^:dynamic *app-name*
+  "The application name given to [[create-context]]."
+  "flambo")
+
+(defonce ^:private context-inst (atom nil))
+
+(defn databricks-cluster?
+  "Return whether or not we're running in a Databricks cluster."
+  []
+  (contains? (System/getProperties) "databricks.serviceName"))
+
+(defn- create-context
+  "Create a Spark context using the URL in [[*master*]].  By default, this
+  creates a YARN cluster context."
+  []
+  (log/infof "creating spark context")
+  (if (databricks-cluster?)
+    (-> (SparkContext/getOrCreate)
+        f/spark-context)
+    (-> (conf/spark-conf)
+        (conf/app-name *app-name*)
+        ((if *master*
+           #(conf/master % *master*)
+           identity))
+        f/spark-context)))
+
+(defn context
+  "Return the (single) per-JVM Spark context.  [[*master*]] is the master URL
+  (defaults to a YARN cluster) and is consulted only on the first call to
+  this function.
+
+  See [[close-context]]."
+  []
+  (swap! context-inst #(or % (create-context))))
+
+(defn close-context
+  "Stop, close, and clean up the Spark context.
+
+  See [[context]]."
+  []
+  (let [ctx @context-inst]
+    (and ctx (.stop ctx)))
+  (reset! context-inst nil))
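A sketch of the intended call pattern for the new namespace, assuming a Spark distribution on the classpath; `local[*]` is only an illustrative master URL:

```clojure
(require '[flambo.context :as ctx]
         '[flambo.api :as f])

;; *master* and *app-name* are consulted only on the first call to `context`,
;; after which the instance is cached in the namespace's private atom.
(binding [ctx/*master* "local[*]"]
  (let [sc (ctx/context)]
    (f/collect (f/parallelize sc [1 2 3]))))

;; stop the cached context and clear the cache
(ctx/close-context)
```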
diff --git a/src/clojure/flambo/sql.clj b/src/clojure/flambo/sql.clj
index 041b3b7..6c9c4c3 100644
--- a/src/clojure/flambo/sql.clj
+++ b/src/clojure/flambo/sql.clj
@@ -210,21 +210,7 @@
         (recur (inc i) (conj! v (.get row i)))
         (persistent! v)))))
 
-(defsparkfn
-  ^{:doc "Coerce an `org.apache.spark.sql SparkSession.Row` objects into
-Clojure maps with each map created from its respective row."}
-  row->map [^org.apache.spark.sql.Row row]
-  (let [n (.length row)
-        schema (.schema row)
-        fields (if schema (.fieldNames schema))]
-    (loop [i 0 m (transient {})]
-      (if (< i n)
-        (recur (inc i)
-               (let [coln (if fields
-                            (nth fields i)
-                            (Integer/toString i))]
-                 (assoc! m (keyword coln) (.get row i))))
-        (persistent! m)))))
+(def row->map f/row->map)
 
 (defsparkfn
   ^{:doc "Coerce a Scala iterator into a Clojure sequence"}
@@ -316,18 +302,19 @@
   [defs]
   (let [metadata (org.apache.spark.sql.types.Metadata/empty)]
     (->> defs
-         (map (fn [{:keys [name type nullable? array-type]
-                    :or {type :string
-                         nullable? true}}]
-                (let [json (if array-type
-                             (-> "{\"type\":\"array\",\"elementType\":\"%s\",\"containsNull\":false}"
-                                 (format (clojure.core/name array-type))
-                                 DataType/fromJson)
-                             (->> type
-                                  clojure.core/name
-                                  (format "\"%s\"")
-                                  DataType/fromJson))]
-                  (StructField. name json nullable? metadata))))
+         (clojure.core/map
+          (fn [{:keys [name type nullable? array-type]
+                :or {type :string
+                     nullable? true}}]
+            (let [json (if array-type
+                         (-> "{\"type\":\"array\",\"elementType\":\"%s\",\"containsNull\":false}"
+                             (format (clojure.core/name array-type))
+                             DataType/fromJson)
+                         (->> type
+                              clojure.core/name
+                              (format "\"%s\"")
+                              DataType/fromJson))]
+              (StructField. name json nullable? metadata))))
         (into-array StructField)
         StructType.)))
@@ -373,6 +360,11 @@ See [[query]] for **opts** details."
        f/collect
        clojure.pprint/print-table))
 
+(defn field-value
+  "Return the column value of field **column-name** for **row**."
+  [^Row row column-name]
+  (.get row (.fieldIndex row column-name)))
+
 (def show (memfn show))
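Taken together, a sketch of pulling query results back into Clojure data with the helpers this patch wires up; `sql-context` is assumed to be an existing `SQLContext` with a table `foo` already registered:

```clojure
(require '[flambo.api :as f]
         '[flambo.sql :as sql])

;; `sql-context` is assumed to be an existing SQLContext.
(let [rows (f/collect (sql/sql sql-context "SELECT * FROM foo"))]
  ;; one Clojure map per row, keyed by column name
  (doall (clojure.core/map f/row->map rows))
  ;; a single column of a single Row, looked up by field name
  (sql/field-value (first rows) "col2")
  ;; recursively coerce any remaining Scala values into Clojure data
  (f/unwrap rows))
```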
[f]
   (= (type f) :serializable.fn/serializable-fn))
@@ -56,7 +60,7 @@
     (gen-class
      :name ~new-class-sym
      :extends flambo.function.AbstractFlamboFunction
-     :implements [~(mk-sym "org.apache.spark.api.java.function.%s" clazz)]
+     :implements [~(mk-sym (if (re-find #"^UDF" (name clazz)) "org.apache.spark.sql.api.java.%s" "org.apache.spark.api.java.function.%s") clazz)]
      :prefix ~prefix-sym
      :init ~'init
      :state ~'state
@@ -100,3 +104,6 @@
 (gen-function MapPartitionsFunction map-partitions-function)
 (gen-function MapGroupsFunction map-groups-function)
 (gen-function FlatMapGroupsFunction flat-map-groups-function)
+(gen-function UDF1 udf)
+(gen-function UDF2 udf2)
+(gen-function UDF3 udf3)

diff --git a/test/flambo/sql_test.clj b/test/flambo/sql_test.clj
index 1b03054..1253f13 100644
--- a/test/flambo/sql_test.clj
+++ b/test/flambo/sql_test.clj
@@ -2,11 +2,11 @@
   (:use midje.sweet)
   (:require [flambo.api :as f]
             [flambo.conf :as conf]
-            [flambo.sql :as sql])
+            [flambo.sql :as sql]
+            [flambo.function :as func])
   (:import [org.apache.spark.sql functions RelationalGroupedDataset Column]
            [org.apache.spark.sql.expressions WindowSpec]
-           [org.apache.spark.sql.types DataTypes IntegerType StringType]
-
+           [org.apache.spark.sql.types DataTypes IntegerType StringType]
            ))
 
 (facts
 
         test-data-2 ["{\"col1\":4,\"col2\":\"a\"}" "{\"col1\":4,\"col2\":\"a\"}" "{\"col1\":6,\"col2\":\"a\"}"]
         test-df-2 (sql/json-rdd c (f/parallelize sc test-data-2))
         _ (sql/register-data-frame-as-table c test-df "foo")
-        _ (sql/register-data-frame-as-table c test-df-2 "bar")]
-
+        _ (sql/register-data-frame-as-table c test-df-2 "bar")
+        _ (-> c .udf (.register "UC" (func/udf (f/fn [x] (.toUpperCase x))) DataTypes/StringType))]
   (fact "with-sql-context gives us a SQLContext"
         (class c) => org.apache.spark.sql.SQLContext)
 
@@ -37,7 +37,10 @@
         (sql/columns df) => ["year" "make" "model" "comment" "blank"]))
 
   (fact "SQL queries work"
-        (f/count (sql/sql c "SELECT * FROM foo WHERE col2 = 'a'")) => 2)
+        (f/count (sql/sql c "SELECT * FROM foo WHERE col2 = 'a'")) => 2)
+
+  (fact "SQL UDF queries work"
+        (f/count (sql/sql c "SELECT * FROM foo WHERE UC(col2) = 'A'")) => 2)
 
   (fact "table-names gets all tables"
         (sql/table-names c) => (just ["foo" "bar"] :in-any-order))
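Finally, a sketch of the new UDF support outside the test harness, mirroring the registration in the test above; `sql-context` is assumed to be an existing `SQLContext` with a table `foo` registered:

```clojure
(require '[flambo.api :as f]
         '[flambo.sql :as sql]
         '[flambo.function :as func])
(import '[org.apache.spark.sql.types DataTypes])

;; register a one-argument UDF named UC, exactly as the new test does
(-> sql-context
    .udf
    (.register "UC"
               (func/udf (f/fn [x] (.toUpperCase x)))
               DataTypes/StringType))

;; and call it from SQL
(sql/sql sql-context "SELECT * FROM foo WHERE UC(col2) = 'A'")
```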