This repository was archived by the owner on Feb 3, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy pathspark.py
226 lines (175 loc) · 7.43 KB
/
spark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""
Code that handles Spark configuration.
"""
import datetime
import time
import os
import json
import shutil
from subprocess import call, Popen, check_output
from typing import List
import azure.batch.models as batchmodels
from core import config
from install import pick_master
# Batch service client object exposed by the project's config module; every
# pool/node lookup in this file goes through it.
batch_client = config.batch_client
# Root of the Spark installation; the sbin/ scripts, conf/ files and jars/
# directory used below are all resolved relative to it.
spark_home = "/home/spark-current"
# Directory under spark_home where configuration files are written.
spark_conf_folder = os.path.join(spark_home, "conf")
def get_pool() -> batchmodels.CloudPool:
    """
    Fetch the pool identified by ``config.pool_id`` through the batch client.
    """
    pool = batch_client.pool.get(config.pool_id)
    return pool
def get_node(node_id: str) -> batchmodels.ComputeNode:
    """
    Fetch a single compute node of the configured pool.

    :param node_id: id of the node to look up inside ``config.pool_id``.
    """
    node = batch_client.compute_node.get(config.pool_id, node_id)
    return node
def list_nodes() -> List[batchmodels.ComputeNode]:
    """
    Return the compute nodes of the configured pool, exactly as handed back
    by the batch client's ``compute_node.list`` call.
    """
    # TODO use continuation token & verify against current/target dedicated of
    # pool
    nodes = batch_client.compute_node.list(config.pool_id)
    return nodes
def setup_connection():
    """
    Record the master node's IP address in the Spark conf folder.

    The master node is chosen by ``pick_master`` for the current pool and its
    IP address is written, followed by a newline, to ``<spark_conf_folder>/master``
    (the file is overwritten if it already exists).
    """
    master_node_id = pick_master.get_master_node_id(get_pool())
    master_node = get_node(master_node_id)

    master_config_file = os.path.join(spark_conf_folder, "master")

    # The context manager guarantees the handle is closed even if the write
    # (or the print) raises.
    with open(master_config_file, 'w') as master_file:
        print("Adding master node ip {0} to config file '{1}'".format(
            master_node.ip_address, master_config_file))
        master_file.write("{0}\n".format(master_node.ip_address))
def wait_for_master():
    """
    Block until the pool's master node is ready.

    Returns immediately when the current node (``config.node_id``) is itself
    the master. Otherwise the master node is polled every 10 seconds until
    its state is ``idle`` or ``running``.
    """
    print("Waiting for master to be ready.")
    master_node_id = pick_master.get_master_node_id(
        batch_client.pool.get(config.pool_id))

    # This node is the master: there is nothing to wait for.
    if master_node_id == config.node_id:
        return

    ready_states = [
        batchmodels.ComputeNodeState.idle,
        batchmodels.ComputeNodeState.running,
    ]

    while True:
        master_node = get_node(master_node_id)
        if master_node.state in ready_states:
            break

        # The timestamp must go through str.format(); passing it as a second
        # print() argument left the literal "{0}" in the output.
        print("{0} Still waiting on master".format(datetime.datetime.now()))
        time.sleep(10)
def start_spark_master():
    """
    Launch the Spark master on this node, then try to start the history server.

    The master is bound to this node's IP address and serves its web UI on
    ``config.spark_web_ui_port``. A failure while starting the history server
    is reported on stdout and otherwise ignored.
    """
    this_node = get_node(config.node_id)
    launcher = os.path.join(spark_home, "sbin", "start-master.sh")
    command = [
        launcher,
        "-h", this_node.ip_address,
        "--webui-port", str(config.spark_web_ui_port),
    ]

    print("Starting master with '{0}'".format(" ".join(command)))
    call(command)

    try:
        start_history_server()
    except Exception as err:
        # Best effort: the master is already running at this point.
        print("Failed to start history server with the following exception:")
        print(err)
def start_history_server():
    """
    Start the Spark history server if event logging is enabled.

    Reads ``conf/spark-defaults.conf`` under ``spark_home``. The server is
    started only when ``spark.eventLog.enabled`` is set to ``true``
    (case-insensitive); a missing file, a missing key or any other value
    leaves it stopped. When ``spark.eventLog.dir`` is also set, the log
    directory is prepared first.
    """
    spark_event_log_enabled_key = 'spark.eventLog.enabled'
    spark_event_log_directory_key = 'spark.eventLog.dir'
    path_to_spark_defaults_conf = os.path.join(spark_home, 'conf/spark-defaults.conf')
    properties = parse_configuration_file(path_to_spark_defaults_conf)

    # parse_configuration_file() returns None when the file cannot be read.
    if not properties:
        return

    # Checking only for the key's presence would also start the server for
    # "spark.eventLog.enabled false"; require an explicit true value.
    enabled_value = str(properties.get(spark_event_log_enabled_key, '')).strip().lower()
    if enabled_value != 'true':
        return

    if spark_event_log_directory_key in properties:
        configure_history_server_log_path(properties[spark_event_log_directory_key])

    exe = os.path.join(spark_home, "sbin", "start-history-server.sh")
    cmd = [exe]
    print("Starting history server")
    call(cmd)
def start_spark_worker():
    """
    Start a Spark worker on this node and attach it to the master.

    Waits for the master to be ready, launches ``start-slave.sh`` against
    ``spark://<master ip>:7077`` with the worker web UI on
    ``config.spark_worker_ui_port``, then starts the shuffle service.
    """
    wait_for_master()

    launcher = os.path.join(spark_home, "sbin", "start-slave.sh")

    pool = batch_client.pool.get(config.pool_id)
    master = get_node(pick_master.get_master_node_id(pool))
    master_url = "spark://{0}:7077".format(master.ip_address)

    command = [launcher, master_url,
               "--webui-port", str(config.spark_worker_ui_port)]
    print("Connecting to master with '{0}'".format(" ".join(command)))
    call(command)

    # enable the shuffle service on all slaves
    start_shuffle_service()
def start_shuffle_service():
    """
    Run ``sbin/start-shuffle-service.sh`` from the Spark installation.

    The call blocks until the start script itself exits.
    """
    exe = os.path.join(spark_home, "sbin", "start-shuffle-service.sh")
    print("Starting the shuffle service with {}".format(exe))
    # No shell is involved when subprocess receives an argument list, so a
    # trailing " &" is not a background operator: it would be handed to the
    # script as a literal argument. Invoke the script with no arguments.
    call([exe])
def copyfile(src, dest):
    """
    Copy ``src`` to ``dest`` and open up the copy's permissions.

    Missing parent directories of ``dest`` are created, and read/write/execute
    bits for everyone are added to the copied file. This is best effort: any
    failure is reported on stdout and swallowed, so callers can attempt to
    copy optional files.

    :param src: path of the file to copy.
    :param dest: path of the copy, either bare (relative to the current
        directory) or with a directory component.
    """
    try:
        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises, which used to abort the copy before it started.
        dest_dir = os.path.dirname(dest)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)

        shutil.copyfile(src, dest)
        os.chmod(dest, os.stat(dest).st_mode | 0o777)
    except Exception as e:
        print("Failed to copy", src)
        print(e)
def setup_conf():
    """
    Copy spark conf files to spark_home if they were uploaded, then install
    the ssh keys.
    """
    steps = (
        copy_spark_env,
        copy_core_site,
        copy_spark_defaults,
        copy_jars,
        setup_ssh_keys,
    )
    # The steps run strictly in the order listed above.
    for step in steps:
        step()
def setup_ssh_keys():
    """
    Install the ``id_rsa`` key pair from ``$DOCKER_WORKING_DIR`` into
    ``/root/.ssh``.

    Copying is best effort (see ``copyfile``): a missing key is reported on
    stdout and skipped.

    :raises KeyError: if ``DOCKER_WORKING_DIR`` is not set in the environment.
    """
    working_dir = os.environ['DOCKER_WORKING_DIR']
    pub_key_path_src = os.path.join(working_dir, 'id_rsa.pub')
    priv_key_path_src = os.path.join(working_dir, 'id_rsa')
    ssh_key_dest = '/root/.ssh'

    # exist_ok avoids the race between checking for the directory and
    # creating it.
    os.makedirs(ssh_key_dest, exist_ok=True)

    pub_key_path_dest = os.path.join(ssh_key_dest, os.path.basename(pub_key_path_src))
    priv_key_path_dest = os.path.join(ssh_key_dest, os.path.basename(priv_key_path_src))

    copyfile(pub_key_path_src, pub_key_path_dest)
    copyfile(priv_key_path_src, priv_key_path_dest)

    # copyfile() adds rwx for everyone to whatever it copies. A private key
    # must not be group/world accessible (ssh refuses to use such a key), so
    # restrict it to its owner when the copy succeeded.
    if os.path.isfile(priv_key_path_dest):
        os.chmod(priv_key_path_dest, 0o600)
def copy_spark_env():
    """
    Copy ``conf/spark-env.sh`` from ``$DOCKER_WORKING_DIR`` into spark_home.
    """
    relative_path = 'conf/spark-env.sh'
    copyfile(
        os.path.join(os.environ['DOCKER_WORKING_DIR'], relative_path),
        os.path.join(spark_home, relative_path),
    )
def copy_spark_defaults():
    """
    Copy ``conf/spark-defaults.conf`` from ``$DOCKER_WORKING_DIR`` into
    spark_home.
    """
    relative_path = 'conf/spark-defaults.conf'
    copyfile(
        os.path.join(os.environ['DOCKER_WORKING_DIR'], relative_path),
        os.path.join(spark_home, relative_path),
    )
def copy_core_site():
    """
    Copy ``conf/core-site.xml`` from ``$DOCKER_WORKING_DIR`` into spark_home.
    """
    relative_path = 'conf/core-site.xml'
    copyfile(
        os.path.join(os.environ['DOCKER_WORKING_DIR'], relative_path),
        os.path.join(spark_home, relative_path),
    )
def copy_jars():
    """
    Copy every entry of ``$DOCKER_WORKING_DIR/jars`` into ``spark_home/jars``.

    Listing or copying failures are printed and swallowed; a missing
    ``DOCKER_WORKING_DIR`` variable still raises ``KeyError``.
    """
    # Copy jars to $SPARK_HOME/jars
    jars_src_dir = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'jars')
    jars_dest_dir = os.path.join(spark_home, 'jars')

    try:
        for jar_name in os.listdir(jars_src_dir):
            source_path = os.path.join(jars_src_dir, jar_name)
            target_path = os.path.join(jars_dest_dir, jar_name)
            print("copy {} to {}".format(source_path, target_path))
            copyfile(source_path, target_path)
    except Exception as err:
        print("Failed to copy jar files with error:")
        print(err)
def parse_configuration_file(path_to_file: str):
    """
    Parse a whitespace-separated ``key value`` configuration file.

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored. The key is the first token of a line; the value is the rest of
    the line with surrounding whitespace removed, or ``''`` when the line
    holds only a key.

    :param path_to_file: path of the configuration file to read.
    :return: dict mapping each key to its value, or ``None`` if the file
        could not be read (the error is printed).
    """
    try:
        # The context manager closes the file even when reading fails.
        with open(path_to_file, 'r') as conf_file:
            properties = {}
            for raw_line in conf_file:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue

                # Split once only, so values that contain spaces are kept
                # whole, and a key-only line no longer raises IndexError and
                # discards everything parsed so far.
                parts = line.split(None, 1)
                properties[parts[0]] = parts[1] if len(parts) > 1 else ''
            return properties
    except (OSError, UnicodeDecodeError) as e:
        print("Failed to parse configuration file:", path_to_file, "with error:")
        print(e)
        return None
def configure_history_server_log_path(path_to_log_file):
    """
    Create the history server's event log directory when it is a local path.

    A location is treated as local when it starts with ``file:/``. The
    directory (with any missing parents) is created if it does not exist yet
    and is then made accessible to all users. Non-local locations and
    already existing directories are left untouched.

    :param path_to_log_file: value of ``spark.eventLog.dir``, for example
        ``file:/tmp/spark-events``.
    """
    print('Configuring spark history server log directory {}.'.format(path_to_log_file))

    local_scheme = 'file:'
    if not path_to_log_file.startswith(local_scheme + '/'):
        print('Skipping. The eventLog directory is not local.')
        return

    # Drop only the leading scheme; str.replace() would also strip any later
    # "file:" occurring inside the path itself.
    directory = path_to_log_file[len(local_scheme):]

    if os.path.exists(directory):
        print('Skipping. Directory {} already exists.'.format(directory))
        return

    print('Create direcotory {}.'.format(directory))
    os.makedirs(directory)

    # Make sure the directory can be accessed by all users
    os.chmod(directory, mode=0o777)