-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPrestoSource.scala
376 lines (303 loc) · 14.5 KB
/
PrestoSource.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
package build.unstable.sonicd.source
import akka.actor._
import akka.http.scaladsl.model.HttpHeader.ParsingResult
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.actor.ActorPublisher
import build.unstable.sonicd.model.JsonProtocol._
import build.unstable.sonicd.model._
import build.unstable.sonicd.source.http.HttpSupervisor
import build.unstable.sonicd.source.http.HttpSupervisor.HttpRequestCommand
import build.unstable.sonicd.{BuildInfo, SonicdConfig}
import build.unstable.tylog.Variation
import spray.json._
import scala.collection.immutable.Seq
import scala.concurrent.duration.{Duration, _}
/** Presto protocol model, request headers and spray-json formats used by the Presto source. */
object Presto {

  /**
   * Name of the supervisor child actor for the cluster at `masterUrl`.
   *
   * The url is URL-encoded because actor path elements may not contain characters such as `/`
   * (e.g. from a `http://` prefix). Letters, digits, `.` and `-` are left untouched by the
   * encoding, so plain host names keep their readable form.
   */
  def getSupervisorName(masterUrl: String): String =
    s"presto_${java.net.URLEncoder.encode(masterUrl, "UTF-8")}"

  /**
   * Factories for `X-Presto-*` request headers. `SOURCE` is a fixed header identifying this
   * sonicd build; every other member takes the header value as its argument.
   */
  object Headers {
    /** Parses `name: value` into an [[HttpHeader]], failing with a descriptive error if invalid. */
    private def mkHeader(name: String, value: String): HttpHeader =
      HttpHeader.parse(name, value) match {
        case ParsingResult.Ok(header, _) ⇒ header
        case ParsingResult.Error(info) ⇒
          // values such as the user name can come from the request, so report what was rejected
          throw new IllegalArgumentException(s"invalid presto header '$name': ${info.formatPretty}")
      }

    val USER = mkHeader("X-Presto-User", _: String)
    val SOURCE = mkHeader("X-Presto-Source", "sonicd/" + BuildInfo.version + "/" + BuildInfo.commit)
    val CATALOG = mkHeader("X-Presto-Catalog", _: String)
    val SCHEMA = mkHeader("X-Presto-Schema", _: String)
    val TZONE = mkHeader("X-Presto-Time-Zone", _: String)
    val LANG = mkHeader("X-Presto-Language", _: String)
    val SESSION = mkHeader("X-Presto-Session", _: String)
    val SETSESSION = mkHeader("X-Presto-Set-Session", _: String)
    val CLEARSESSION = mkHeader("X-Presto-Clear-Session", _: String)
    val TID = mkHeader("X-Presto-Transaction-Id", _: String)
    val STARTEDTID = mkHeader("X-Presto-Started-Transaction-Id", _: String)
    val CLEARTID = mkHeader("X-Presto-Clear-Transaction-Id", _: String)
    val STATE = mkHeader("X-Presto-Current-State", _: String)
    val MAXW = mkHeader("X-Presto-Max-Wait", _: String)
    val MAXS = mkHeader("X-Presto-Max-Size", _: String)
    val TAID = mkHeader("X-Presto-Task-Instance-Id", _: String)
    val PID = mkHeader("X-Presto-Page-Sequence-Id", _: String)
    val PAGENEXT = mkHeader("X-Presto-Page-End-Sequence-Id", _: String)
    val BUFCOMPLETE = mkHeader("X-Presto-Buffer-Complete", _: String)
  }

  /**
   * Statement progress counters, decoded via `statementStatsJsonFormat`.
   * Row, byte and time totals are `Long`: spray-json's `Int` reader silently truncates numbers
   * beyond the `Int` range, which cumulative counters of large queries can exceed.
   */
  case class StatementStats(state: String,
                            scheduled: Boolean,
                            nodes: Int,
                            totalSplits: Int,
                            queuedSplits: Int,
                            runningSplits: Int,
                            completedSplits: Int,
                            userTimeMillis: Long,
                            cpuTimeMillis: Long,
                            processedRows: Long,
                            processedBytes: Long)

  case class ErrorLocation(lineNumber: Int, columnNumber: Int)

  case class FailureInfo(message: String,
                         stack: Vector[String],
                         errorLocation: Option[ErrorLocation])

  case class ErrorMessage(message: String,
                          errorCode: Int,
                          errorName: String,
                          errorType: String,
                          failureInfo: FailureInfo)

  case class QueryStats(elapsedTime: String,
                        queuedTime: String,
                        totalTasks: Int,
                        runningTasks: Int,
                        completedTasks: Int)

  case class Data(rows: Option[Vector[Vector[JsValue]]])

  case class Root(source: Data, columns: Vector[String])

  case class Plan(root: Root, distribution: String)

  case class OutputStage(state: String,
                         plan: Plan,
                         types: Vector[String])

  /** Column name and Presto type name; the JSON key for `_type` is `type`. */
  case class ColMeta(name: String, _type: String)

  /** One page of statement results; `setTraceId` replaces `id` with the given trace id. */
  case class QueryResults(id: String,
                          infoUri: String,
                          partialCancelUri: Option[String],
                          nextUri: Option[String],
                          columns: Option[Vector[ColMeta]],
                          data: Option[Vector[Vector[JsValue]]],
                          stats: StatementStats,
                          error: Option[ErrorMessage],
                          updateType: Option[String],
                          updateCount: Option[Long]) extends HttpSupervisor.Traceable {
    override def setTraceId(newId: String): HttpSupervisor.Traceable =
      this.copy(id = newId)
  }

  case class ErrorCode(code: Int, name: String)

  val USER_ERROR = "USER_ERROR"
  val SYNTAX_ERROR = "SYNTAX_ERROR"

  class PrestoError(msg: String) extends Exception(msg)

  // Formats are immutable: they are declared in dependency order because the derived formats
  // below resolve the earlier ones implicitly when they are constructed.
  implicit val statementStatsJsonFormat: RootJsonFormat[StatementStats] = jsonFormat11(StatementStats.apply)
  implicit val queryStatsJsonFormat: RootJsonFormat[QueryStats] = jsonFormat5(QueryStats.apply)

  // hand-written because the JSON key `type` is a reserved word in Scala
  implicit val colMetaJsonFormat: RootJsonFormat[ColMeta] = new RootJsonFormat[ColMeta] {
    override def write(obj: ColMeta): JsValue =
      JsObject("name" -> JsString(obj.name), "type" -> JsString(obj._type))

    override def read(json: JsValue): ColMeta = {
      val f = json.asJsObject.fields
      ColMeta(f("name").convertTo[String], f("type").convertTo[String])
    }
  }

  implicit val dataJsonFormat: RootJsonFormat[Data] = jsonFormat1(Data.apply)
  implicit val rootJsonFormat: RootJsonFormat[Root] = jsonFormat2(Root.apply)
  implicit val planJsonFormat: RootJsonFormat[Plan] = jsonFormat2(Plan.apply)
  implicit val outputStageJsonFormat: RootJsonFormat[OutputStage] = jsonFormat3(OutputStage.apply)
  implicit val errorLocation: RootJsonFormat[ErrorLocation] = jsonFormat2(ErrorLocation.apply)
  implicit val failureInfoFormat: RootJsonFormat[FailureInfo] = jsonFormat3(FailureInfo.apply)
  implicit val errorMessageJsonFormat: RootJsonFormat[ErrorMessage] = jsonFormat5(ErrorMessage.apply)
  implicit val queryResultsJsonFormat: RootJsonFormat[QueryResults] = jsonFormat10(QueryResults.apply)
}
/**
 * Data source that runs `query` against a Presto cluster.
 *
 * Reads `url` (required) and `port` (optional, default 8889) from the query configuration and
 * routes requests through a [[PrestoSupervisor]] child of `actorContext`, named after `url` and
 * created on first use.
 */
class PrestoSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends DataSource(query, actorContext, context) {

  def prestoSupervisorProps(masterUrl: String, masterPort: Int): Props =
    Props(classOf[PrestoSupervisor], masterUrl, masterPort)

  val masterUrl: String = getConfig[String]("url")
  val masterPort: Int = getOption[Int]("port").getOrElse(8889)
  val supervisorName: String = Presto.getSupervisorName(masterUrl)

  /** Returns the child actor named `name`, starting a supervisor under that same name if absent. */
  def getSupervisor(name: String): ActorRef =
    actorContext.child(name).getOrElse {
      actorContext.actorOf(prestoSupervisorProps(masterUrl, masterPort), name)
    }

  lazy val handlerProps: Props = {
    //if no presto supervisor has been initialized yet for this presto cluster, initialize one
    val prestoSupervisor = getSupervisor(supervisorName)
    // NOTE(review): assumes traceId has been assigned before handlerProps is built — TODO confirm
    Props(classOf[PrestoPublisher], query.traceId.get, query.query, prestoSupervisor,
      SonicdConfig.PRESTO_WATERMARK, SonicdConfig.PRESTO_MAX_RETRIES,
      SonicdConfig.PRESTO_RETRYIN, SonicdConfig.PRESTO_RETRY_MULTIPLIER, context)
  }
}
/**
 * HTTP supervisor for one Presto cluster (`masterUrl`:`port`).
 *
 * Results are decoded with [[Presto.queryResultsJsonFormat]], the `X-Presto-Source` header is
 * supplied as the extra header, and the cancel request for a result is a DELETE on that
 * result's `partialCancelUri` (none when the result carries no such uri).
 */
class PrestoSupervisor(val masterUrl: String, val port: Int)
  extends HttpSupervisor[Presto.QueryResults] {

  lazy val debug: Boolean = false

  lazy val extraHeaders: Seq[HttpHeader] = Seq(Presto.Headers.SOURCE)

  lazy val httpEntityTimeout: FiniteDuration = SonicdConfig.PRESTO_HTTP_ENTITY_TIMEOUT

  lazy val poolSettings: ConnectionPoolSettings =
    ConnectionPoolSettings(SonicdConfig.PRESTO_CONNECTION_POOL_SETTINGS)

  implicit lazy val jsonFormat: RootJsonFormat[Presto.QueryResults] =
    Presto.queryResultsJsonFormat

  override def cancelRequestFromResult(t: Presto.QueryResults): Option[HttpRequest] =
    for (cancelUri <- t.partialCancelUri)
      yield HttpRequest(method = HttpMethods.DELETE, uri = cancelUri)
}
/**
 * Actor publisher that runs one Presto statement through `supervisor` and streams the outcome
 * downstream as [[SonicMessage]]s.
 *
 * Lifecycle (`receive` → `materialized` → `terminating`):
 *  1. on the first downstream `Request` it enqueues `QueryProgress.Started` and sends
 *     `queryCommand` (a POST of `query` to the statement endpoint) to `supervisor`;
 *  2. each [[Presto.QueryResults]] page becomes type metadata (once), a progress update and one
 *     [[OutputChunk]] per row, all queued in `buffer`; the page's `nextUri` is fetched while the
 *     buffer is empty or holds fewer than `watermark` messages;
 *  3. on FINISHED, FAILED, an unexpected state, an HTTP failure or the supervisor terminating,
 *     it flushes the buffer, emits a [[DoneWithQueryExecution]] and completes.
 *
 * @param traceId         trace id attached to every command sent to `supervisor`
 * @param query           SQL text, sent as the body of the statement POST
 * @param supervisor      actor executing the HTTP requests; this publisher handles
 *                        `QueryResults` and `Status.Failure` messages
 * @param watermark       if > 0, pages are fetched ahead while fewer than this many messages are buffered
 * @param maxRetries      maximum number of re-submissions for the retriable error codes
 * @param retryIn         base delay before re-submitting
 * @param retryMultiplier if > 0, the delay of attempt `n` is `retryIn * retryMultiplier * n`
 * @param ctx             request context; its user (default "sonicd") is sent as `X-Presto-User`
 */
class PrestoPublisher(traceId: String, query: String,
                      supervisor: ActorRef,
                      watermark: Int,
                      maxRetries: Int,
                      retryIn: FiniteDuration,
                      retryMultiplier: Int,
                      ctx: RequestContext)
  extends ActorPublisher[SonicMessage] with SonicdLogging {

  import Presto._
  import akka.stream.actor.ActorPublisherMessage._
  import context.dispatcher

  /** Sent to self when a scheduled retry is due, so the retry runs on this actor's thread. */
  private case object RetryQuery

  //in case this publisher never gets subscribed to
  override def subscriptionTimeout: Duration = 1.minute

  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    debug(log, "stopping presto publisher of '{}'", traceId)
    retryScheduled.foreach(c ⇒ if (!c.isCancelled) c.cancel())
    context unwatch supervisor
  }

  @throws[Exception](classOf[Exception])
  override def preStart(): Unit = {
    debug(log, "starting presto publisher of '{}'", traceId)
    // the resulting Terminated message is handled in `materialized` and ignored in `terminating`
    context watch supervisor
  }

  /* HELPERS */

  /** Emits buffered messages while the stream is active and downstream has demand. */
  def tryPushDownstream(): Unit =
    while (isActive && totalDemand > 0 && buffer.nonEmpty) {
      onNext(buffer.dequeue())
    }

  /**
   * Requests the next page if the last page had a `nextUri` and the buffer is empty or below the
   * watermark. Clearing `lastQueryResults` keeps at most one page request outstanding, because it
   * is only set again when the next page arrives.
   */
  def tryPullUpstream(): Unit =
    for {
      last <- lastQueryResults
      nextUri <- last.nextUri
      if buffer.isEmpty || shouldQueryAhead
    } {
      supervisor ! HttpRequestCommand(traceId, HttpRequest(HttpMethods.GET, nextUri, headers = headers))
      lastQueryResults = None
    }

  def shouldQueryAhead: Boolean = watermark > 0 && buffer.length < watermark

  /**
   * Builds [[TypeMetadata]] with a placeholder JSON value per column, chosen by its Presto type.
   * Parameterized types (e.g. `varchar(10)`, `array(bigint)`, `map(varchar,bigint)`) are matched
   * on their base name. Unknown types are logged and mapped to an empty string.
   */
  def getTypeMetadata(v: Vector[ColMeta]): TypeMetadata = {
    TypeMetadata(v.map { case ColMeta(name, prestoType) ⇒
      val baseType = prestoType.takeWhile(_ != '(').trim
      val placeholder: JsValue = baseType match {
        case "boolean" ⇒ JsBoolean(false)
        case "bigint" | "integer" | "smallint" | "tinyint" ⇒ JsNumber(0L)
        case "double" | "real" ⇒ JsNumber(0d)
        case "varchar" | "char" | "time" | "date" | "timestamp" |
             "time with time zone" | "timestamp with time zone" ⇒ JsString("")
        case "varbinary" ⇒ JsArray(JsNumber(0))
        case "array" ⇒ JsArray.empty
        case "json" | "map" ⇒ JsObject(Map.empty[String, JsValue])
        case _ ⇒
          warning(log, "could not map type {}", prestoType)
          JsString("")
      }
      (name, placeholder)
    })
  }

  val submitUri = s"/${SonicdConfig.PRESTO_APIV}/statement"

  val headers: scala.collection.immutable.Seq[HttpHeader] =
    Seq(Headers.USER(ctx.user.map(_.user).getOrElse("sonicd")))

  val queryCommand = {
    val entity: RequestEntity = query
    val httpRequest = HttpRequest.apply(HttpMethods.POST, submitUri, entity = entity, headers = headers)
    HttpRequestCommand(traceId, httpRequest)
  }

  val units = Some("splits")

  /* STATE */

  var bufferedMeta: Boolean = false
  val buffer = scala.collection.mutable.Queue.empty[SonicMessage]
  var lastQueryResults: Option[QueryResults] = None
  var retryScheduled: Option[Cancellable] = None
  var retried = 0
  var callType: CallType = ExecuteStatement
  var completedSplits = 0

  /* BEHAVIOUR */

  /** Flushes the buffer and, once it is drained and downstream has demand, emits `done` and completes. */
  private def tryComplete(done: DoneWithQueryExecution): Unit = {
    tryPushDownstream()
    if (buffer.isEmpty && isActive && totalDemand > 0) {
      onNext(done)
      onCompleteThenStop()
    }
  }

  /**
   * Final state: the outcome `done` is decided and only the remaining buffer must be flushed.
   * Building this `Receive` already attempts completion once; later `Request`s retry it.
   */
  def terminating(done: DoneWithQueryExecution): Receive = {
    tryComplete(done)
    commonReceive orElse {
      case Request(_) ⇒ tryComplete(done)
      // the outcome is already decided, so the supervisor stopping now is ignored
      case Terminated(ref) if ref == supervisor ⇒ ()
    }
  }

  def materialized: Receive = commonReceive orElse {
    case Request(_) ⇒
      tryPushDownstream()
      tryPullUpstream()

    case RetryQuery ⇒
      retryScheduled = None
      runStatement(callType, queryCommand)

    case Terminated(ref) if ref == supervisor ⇒
      val e = new Exception(s"presto supervisor ${supervisor.path} terminated")
      trace(log, traceId, callType, Variation.Failure(e), "presto supervisor {} terminated", supervisor.path)
      context.become(terminating(DoneWithQueryExecution.error(e)))

    case r: QueryResults ⇒
      handleQueryResults(r)
      tryPushDownstream()

    case Status.Failure(e) ⇒
      trace(log, traceId, callType, Variation.Failure(e), "something went wrong with the http request")
      context.become(terminating(DoneWithQueryExecution.error(e)))
  }

  /** Translates one page of results into buffered messages and decides the next step. */
  private def handleQueryResults(r: QueryResults): Unit = {
    debug(log, "received query results of query '{}'", r.id)
    lastQueryResults = Some(r)

    //extract type metadata
    if (!bufferedMeta) r.columns.foreach { cols ⇒
      buffer.enqueue(getTypeMetadata(cols))
      bufferedMeta = true
    }

    val splits = r.stats.completedSplits - completedSplits
    val totalSplits = Some(r.stats.totalSplits.toDouble)
    completedSplits = r.stats.completedSplits

    r.stats.state match {
      case "QUEUED" | "PLANNING" ⇒
        buffer.enqueue(QueryProgress(QueryProgress.Waiting, splits, totalSplits, units))
        enqueueRows(r)
        tryPullUpstream()
      case "RUNNING" | "STARTING" | "FINISHING" ⇒
        buffer.enqueue(QueryProgress(QueryProgress.Running, splits, totalSplits, units))
        enqueueRows(r)
        tryPullUpstream()
      case "FINISHED" ⇒
        enqueueRows(r)
        trace(log, traceId, callType, Variation.Success, r.stats.state)
        context.become(terminating(done = DoneWithQueryExecution.success))
      case "FAILED" ⇒
        handleFailure(r)
      case state ⇒
        val msg = s"unexpected query state from presto $state"
        val e = new Exception(msg)
        trace(log, traceId, callType, Variation.Failure(e), msg)
        context.become(terminating(DoneWithQueryExecution.error(e)))
    }
  }

  /** Buffers every row of the page as an [[OutputChunk]]. */
  private def enqueueRows(r: QueryResults): Unit =
    r.data.foreach(_.foreach(row ⇒ buffer.enqueue(OutputChunk(row))))

  /** Retries retriable failures (up to `maxRetries`) and terminates with an error otherwise. */
  private def handleFailure(r: QueryResults): Unit = r.error match {
    case Some(error) ⇒
      val e = new Exception(error.message)
      trace(log, traceId, callType, Variation.Failure(e),
        "query status is FAILED: {}", error)
      error.errorCode match {
        // presto-main/src/main/java/com/facebook/presto/operator/HttpPageBufferClient.java
        // PAGE_TRANSPORT_TIMEOUT | REMOTE_TASK_ERROR | REMOTE_TASK_MISMATCH
        case 65540 | 65542 | 65544 if retried < maxRetries ⇒
          debug(log, "error code is {}. retrying..", error.errorCode)
          scheduleRetry()
        case _ ⇒
          debug(log, "error code is {}, skipping retry", error.errorCode)
          context.become(terminating(DoneWithQueryExecution.error(e)))
      }
    case None ⇒
      val msg = "query status is FAILED but presto returned no error"
      val e = new Exception(msg)
      trace(log, traceId, callType, Variation.Failure(e), msg)
      context.become(terminating(DoneWithQueryExecution.error(e)))
  }

  /**
   * Re-submits the statement after `retryIn` (scaled by `retryMultiplier * retried` when the
   * multiplier is positive). The retry arrives as a message to self, so `runStatement` runs on
   * the actor's thread rather than on the scheduler's.
   */
  private def scheduleRetry(): Unit = {
    retried += 1
    callType = RetryStatement(retried)
    // progress deltas must be computed against the new attempt's counters, and the failed
    // attempt's nextUri must not be followed
    completedSplits = 0
    lastQueryResults = None
    val delay = if (retryMultiplier > 0) retryIn * retryMultiplier * retried else retryIn
    retryScheduled = Some(context.system.scheduler.scheduleOnce(delay, self, RetryQuery))
  }

  /** Sends `post` to the supervisor, tracing it as an attempt of `callType`. */
  def runStatement(callType: CallType, post: HttpRequestCommand): Unit = {
    trace(log, traceId, callType, Variation.Attempt,
      "send query to supervisor in path {}", supervisor.path)
    supervisor ! post
  }

  /** Downstream cancellation, handled in every state. */
  def commonReceive: Receive = {
    case Cancel ⇒
      log.debug("client canceled")
      onComplete()
      context.stop(self)
  }

  def receive: Receive = commonReceive orElse {
    case SubscriptionTimeoutExceeded ⇒
      log.info(s"no subscriber in within subs timeout $subscriptionTimeout")
      onCompleteThenStop()
    //first time client requests
    case Request(_) ⇒
      buffer.enqueue(QueryProgress(QueryProgress.Started, 0, None, None))
      runStatement(callType, queryCommand)
      tryPushDownstream()
      context.become(materialized)
  }
}