[DF] Add first tutorial for distributed rdf
vepadulano committed Mar 31, 2021
1 parent 46401b9 commit 126c9c8
Showing 2 changed files with 87 additions and 0 deletions.
10 changes: 10 additions & 0 deletions tutorials/CMakeLists.txt
@@ -575,6 +575,16 @@ if(ROOT_pyroot_FOUND)
list(APPEND pyveto pyroot/pyroot004_NumbaDeclare.py)
endif()

# Rules specific to distributed RDataFrame with Pyspark
# Disable the distributed RDF tutorials if the PySpark dependencies were not checked in the environment first
if(NOT test_distrdf_pyspark)
list(APPEND pyveto dataframe/distrdf*)
endif()
# Use main Python executable to run in PySpark driver and executors
if(test_distrdf_pyspark)
list(APPEND ROOT_environ PYSPARK_PYTHON=${PYTHON_EXECUTABLE_Development_Main})
endif()

find_python_module(xgboost QUIET)
if(NOT PY_XGBOOST_FOUND OR NOT dataframe)
file(GLOB tmva_veto_py RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} tmva/tmva10*.py)
77 changes: 77 additions & 0 deletions tutorials/dataframe/distrdf001_spark_connection.py
@@ -0,0 +1,77 @@
## \file
## \ingroup tutorial_dataframe
## \notebook -draw
## Configure a Spark connection and fill two histograms in a distributed fashion.
##
## This tutorial shows the ingredients needed to set up the connection to a Spark
## cluster, namely a SparkConf object holding configuration parameters and a
## SparkContext object created with the desired options. After this initial
## setup, an RDataFrame with distributed capabilities is created and connected
## to the SparkContext instance. Finally, a couple of histograms are drawn from
## the created columns in the dataset.
##
## \macro_code
## \macro_image
##
## \date March 2021
## \author Vincenzo Eduardo Padulano
import pyspark
import ROOT

# Point RDataFrame calls to Spark RDataFrame object
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

# Set up the connection to Spark
# First create a dictionary whose keys are Spark-specific configuration
# parameters. In this tutorial we use the following configuration parameters:
#
# 1. spark.app.name: The name of the Spark application
# 2. spark.master: The Spark endpoint responsible for running the
#    application. With the syntax "local[4]" we tell Spark to run locally on
#    this machine using 4 cores, each running a separate process. By default,
#    a Spark application runs locally with as many concurrent processes as
#    there are available cores, which can also be written as "local[*]".
#
# If you have access to a remote cluster, substitute the endpoint URL of your
# Spark master, in the form "spark://HOST:PORT", as the value of
# `spark.master`. Depending on the resources available on your cluster you may
# also request more computing nodes or more cores per node with a similar
# configuration:
#
# sparkconf = pyspark.SparkConf().setAll(
#     {"spark.master": "spark://HOST:PORT",
#      "spark.executor.instances": <number_of_nodes>,
#      "spark.executor.cores": <cores_per_node>}.items())
#
# You can find all configuration options and more details in the official Spark
# documentation at https://spark.apache.org/docs/latest/configuration.html .

# Create a SparkConf object with all the desired Spark configuration parameters
sparkconf = pyspark.SparkConf().setAll(
    {"spark.app.name": "distrdf001_spark_connection",
     "spark.master": "local[4]"}.items())
# Create a SparkContext with the configuration stored in `sparkconf`
sparkcontext = pyspark.SparkContext(conf=sparkconf)

# Create an RDataFrame that will use Spark as a backend for computations
df = RDataFrame(1000, sparkcontext=sparkcontext)

# Set the random seed and define two columns of the dataset with random numbers.
ROOT.gRandom.SetSeed(1)
df_1 = df.Define("gaus", "gRandom->Gaus(10, 1)").Define("exponential", "gRandom->Exp(10)")

# Book a histogram for each column
h_gaus = df_1.Histo1D(("gaus", "Normal distribution", 50, 0, 30), "gaus")
h_exp = df_1.Histo1D(("exponential", "Exponential distribution", 50, 0, 30), "exponential")
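# Note: as with a local RDataFrame, booking the histograms is lazy; the
# distributed computation only runs when a result is first accessed (here,
# when DrawCopy is called below).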

# Plot the histograms side by side on a canvas
c = ROOT.TCanvas("distrdf001", "distrdf001", 1280, 800)
c.Divide(2, 1)
c.cd(1)
h_gaus.DrawCopy()
c.cd(2)
h_exp.DrawCopy()

# Save the canvas
c.SaveAs("distrdf001_spark_connection.png")
print("Saved figure to distrdf001_spark_connection.png")
