Skip to content

Commit

Permalink
feat(livy): connect livy.
Browse files Browse the repository at this point in the history
  • Loading branch information
locona committed Feb 4, 2020
1 parent 0db6885 commit 087d9c8
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"python.pythonPath": "/Users/locona/.local/share/virtualenvs/livy-tyJWaNGI/bin/python"
}
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.DEFAULT_GOAL := pyspark

# `pyspark` names a command to run, not a file to build, so declare it phony;
# otherwise a file or directory called `pyspark` would make this target a no-op.
.PHONY: pyspark
pyspark:
	@python pyspark.py
13 changes: 13 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
requests = "*"
flake8 = "*"

[requires]
python_version = "3.7"
93 changes: 93 additions & 0 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import requests
from pprint import pprint

# Base URL of the local Livy server.
host = 'http://localhost:8998'
# Headers sent with the requests below; request bodies are declared as JSON.
headers = {'Content-Type': 'application/json'}


def delete_all():
    """Delete every session currently listed by the Livy server.

    Fetches ``GET /sessions``, then issues ``DELETE /sessions/<id>`` for each
    entry, printing the URL being deleted and the JSON body of the reply.
    """
    r = requests.get(host + '/sessions', headers=headers)
    sessions = r.json()['sessions']
    for sess in sessions:
        session_id = sess["id"]
        # f-strings interpolate with {name}; the previous "${session_id}"
        # left a literal "$" in the path (e.g. /sessions/$0), so the DELETE
        # never addressed a real session. Build the URL from `host` as well
        # so the server address is defined in exactly one place.
        session_url = f"{host}/sessions/{session_id}"
        print(session_url)
        r = requests.delete(session_url, headers=headers)
        pprint(r.json())


# Script entry point: runs unconditionally (there is no __main__ guard).
delete_all()
39 changes: 39 additions & 0 deletions livy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from pprint import pprint
import json, pprint, requests, textwrap

# Walk-through of the Livy REST API: create a Spark (Scala) session, submit
# two statements, and print the server's replies.
#
# NOTE: the import lines above this block end with `import ... pprint ...`,
# which rebinds the name `pprint` to the *module*. Calling bare `pprint(...)`
# would therefore raise "TypeError: 'module' object is not callable", so every
# call below goes through `pprint.pprint`.

# Base URL of the local Livy server (scheme was misspelled as "htp").
host = 'http://localhost:8998'
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}

# Create the session; its URL is built from the Location response header.
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)

session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)
pprint.pprint(r.json())
pprint.pprint(r.headers)

# Submit a trivial statement to the session.
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

# Submit a Monte Carlo estimate of Pi written in Scala.
data = {
    'code': textwrap.dedent("""
        val NUM_SAMPLES = 100000;
        val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
          val x = Math.random();
          val y = Math.random();
          if (x*x + y*y < 1) 1 else 0
        }.reduce(_ + _);
        println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
        """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

# Fetch the statement just created via its Location header.
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())
56 changes: 56 additions & 0 deletions pyspark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import requests
import textwrap
import sys

from pprint import pprint

# Base URL of the local Livy server.
host = 'http://localhost:8998'
# Headers sent with every request below; request bodies are declared as JSON.
headers = {'Content-Type': 'application/json'}


def session():
    """Create a PySpark session on the Livy server and print its URL.

    Posts ``{'kind': 'pyspark'}`` to ``/sessions``, prints the creation
    response, fetches the new session through the ``location`` response
    header, prints that too, and finally prints the session URL so it can be
    passed to this script as its first argument.
    """
    data = {'kind': 'pyspark'}
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    pprint(r.json())

    session_url = host + r.headers['location']
    r = requests.get(session_url, headers=headers)
    pprint(r.json())
    # f-strings interpolate with {name}; the previous "${session_url}" printed
    # a stray "$" in front of the URL, which broke copy-pasting it.
    print(f"session_url: {session_url}")


def run(session_url):
    """Submit two PySpark statements to an existing Livy session.

    Args:
        session_url: Full URL of the session, e.g. the value printed by
            ``session()``.

    First posts the trivial statement ``1 + 1``, then a Monte Carlo estimate
    of Pi, printing the JSON reply to each submission.
    """
    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    # f-strings interpolate with {name}; "${...}" printed a stray "$".
    print(f"statements_url: {statements_url}")
    # The reply used to be parsed and thrown away; show it like the others.
    pprint(r.json())

    # The snippet runs remotely inside the session. `range` and the
    # parenthesised single-argument `print(...)` work on both Python 2 and
    # Python 3 interpreters, unlike `xrange` / the `print` statement.
    data = {
        "code": textwrap.dedent("""
            import random
            NUM_SAMPLES = 100000
            def sample(p):
                x, y = random.random(), random.random()
                return 1 if x*x + y*y < 1 else 0

            count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
            print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
            """)
    }

    print(f"statements_url: {statements_url}")
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)

    print()
    pprint(r.json())


if __name__ == '__main__':
    # Echo the raw argument vector, then dispatch: with a session URL as the
    # first argument run the statements, otherwise create a new session.
    argv = sys.argv
    print(argv)
    if len(argv) != 1:
        run(argv[1])
    else:
        session()
57 changes: 57 additions & 0 deletions spark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import json
import requests
import textwrap
import sys

from pprint import pprint

# Base URL of the local Livy server.
host = 'http://localhost:8998'
# Headers sent with every request below; request bodies are declared as JSON.
headers = {'Content-Type': 'application/json'}


def session():
    """Create a Spark (Scala) session on the Livy server and print its URL.

    Posts ``{'kind': 'spark'}`` to ``/sessions``, prints the creation
    response, fetches the new session through the ``location`` response
    header, prints that too, and finally prints the session URL so it can be
    passed to this script as its first argument.
    """
    data = {'kind': 'spark'}
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    pprint(r.json())

    session_url = host + r.headers['location']
    r = requests.get(session_url, headers=headers)
    pprint(r.json())
    # f-strings interpolate with {name}; the previous "${session_url}" printed
    # a stray "$" in front of the URL, which broke copy-pasting it.
    print(f"session_url: {session_url}")


def run(session_url):
    """Submit two Scala statements to an existing Livy session.

    Args:
        session_url: Full URL of the session, e.g. the value printed by
            ``session()``.

    First posts the trivial statement ``1 + 1``, then a Monte Carlo estimate
    of Pi. The second statement is fetched again through the ``location``
    header of the submission response and its JSON is printed.
    """
    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    # f-strings interpolate with {name}; "${...}" printed a stray "$".
    print(f"statements_url: {statements_url}")
    # The reply used to be parsed and thrown away; show it like the others.
    pprint(r.json())

    data = {
        'code': textwrap.dedent("""
            val NUM_SAMPLES = 100000;
            val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
              val x = Math.random();
              val y = Math.random();
              if (x*x + y*y < 1) 1 else 0
            }.reduce(_ + _);
            println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
            """)
    }

    print(f"statements_url: {statements_url}")
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)

    statement_url = host + r.headers['location']
    print(statement_url)
    r = requests.get(statement_url, headers=headers)
    pprint(r.json())


if __name__ == '__main__':
    # Echo the raw argument vector, then dispatch: with a session URL as the
    # first argument run the statements, otherwise create a new session.
    argv = sys.argv
    print(argv)
    if len(argv) != 1:
        run(argv[1])
    else:
        session()
13 changes: 13 additions & 0 deletions statement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import requests
from pprint import pprint

# Base URL of the local Livy server.
host = 'http://localhost:8998'
# Headers sent with the request below; the body type is declared as JSON.
headers = {'Content-Type': 'application/json'}


def delete_all(session_id=0):
    """Print the statements of one Livy session.

    Despite its name, this function deletes nothing: it only performs
    ``GET /sessions/<session_id>/statements`` and pretty-prints the JSON
    reply. The name is kept so existing callers continue to work.

    Args:
        session_id: Id of the session to inspect. Defaults to ``0``, the
            session that was previously hard-coded in the URL.
    """
    # Build the URL from `host` so the server address lives in one place.
    statement_url = f"{host}/sessions/{session_id}/statements"
    r = requests.get(statement_url, headers=headers)
    pprint(r.json())

# Script entry point: runs unconditionally (there is no __main__ guard).
delete_all()

0 comments on commit 087d9c8

Please sign in to comment.