Skip to content

Commit

Permalink
Merge branch 'master' into update/auth-2.20.15
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial authored Mar 22, 2023
2 parents 5e57c0c + a2c4fd4 commit a6d918c
Show file tree
Hide file tree
Showing 30 changed files with 605 additions and 298 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ object HttpRequestMapperTest extends AirSpec {
case class NestedRequest(name: String, msg: String)
case class NestedRequest2(id: Int)

case class RequestValidator(message: String) {
if (message != "ok") {
throw RPCStatus.INVALID_REQUEST_U1.newException(s"Unexpected message: ${message}")
}
}

@RPC
trait MyApi {
def rpc1(p1: String): Unit = {}
Expand All @@ -39,8 +45,9 @@ object HttpRequestMapperTest extends AirSpec {
context: HttpContext[Request, Response, Future],
req: HttpRequest[Request]
): Unit = {}
def rpc8(p1: Int): Unit = {}
def rpc9(p1: Option[Int]): Unit = {}
def rpc8(p1: Int): Unit = {}
def rpc9(p1: Option[Int]): Unit = {}
def rpc10(r: RequestValidator): Unit = {}
}

trait MyApi2 {
Expand Down Expand Up @@ -90,15 +97,15 @@ object HttpRequestMapperTest extends AirSpec {

test("detect wrong parameter mapping") {
val r = findRoute("rpc1")
intercept[IllegalArgumentException] {
intercept[RPCException] {
val args = mapArgs(r, _.withJson("""{"p0":"hello"}"""))
warn(args)
}
}

test("forbid mapping a single primitive argument as a body") {
val r = findRoute("rpc1")
intercept[IllegalArgumentException] {
intercept[RPCException] {
// Note: This should work for Endpoint calls
val args = mapArgs(r, _.withContent("""hello"""))
warn(args)
Expand All @@ -113,7 +120,7 @@ object HttpRequestMapperTest extends AirSpec {

test("throw an exception when reading incompatible primitive arguments") {
val r = findRoute("rpc2")
intercept[IllegalArgumentException] {
intercept[RPCException] {
mapArgs(r, _.withJson("""{"p1":"hello","p2":"abc"}"""))
}
}
Expand Down Expand Up @@ -190,41 +197,49 @@ object HttpRequestMapperTest extends AirSpec {

test("throw an error on incompatible type") {
val r = findRoute("rpc8")
intercept[IllegalArgumentException] {
intercept[RPCException] {
mapArgs(r, _.withJson("""{"p1":"abc"}"""))
}
}

test("throw an error on incompatible type in query parameters") {
val r = findRoute("rpc8")
intercept[IllegalArgumentException] {
intercept[RPCException] {
mapArgs(r, { r => r.withUri(s"${r.uri}?p1=abc") })
}
}

test("throw an error on incompatible type in request body") {
val r = findRoute("rpc8")
intercept[IllegalArgumentException] {
intercept[RPCException] {
mapArgs(r, { r => r.withContent("abc") })
}
}

test("throw an error when mapping JSON [1] to Int") {
val r = findRoute("rpc8")
intercept[IllegalArgumentException] {
intercept[RPCException] {
val args = mapArgs(r, { r => r.withJson("""{"p1":[1]}""") })
warn(args)
}
}

test("throw an error on incompatible JSON [1] to Option[X]") {
val r = findRoute("rpc9")
intercept[IllegalArgumentException] {
intercept[RPCException] {
val args = mapArgs(r, { r => r.withJson("""{"p1":[1]}""") })
warn(args)
}
}

test("rpc10: throw an RPCException when serializing request object") {
val r = findRoute("rpc10")
val e = intercept[RPCException] {
mapArgs(r, { r => r.withJson("""{"r":{"message":"xxx"}}""") })
}
e.status shouldBe RPCStatus.INVALID_REQUEST_U1
}

test("construct objects using query parameters for GET") {
val r = findRoute("endpoint1")
val args = mapArgs(r, { r => r.withUri(s"${r.uri}?name=hello&msg=world") }, method = HttpMethod.GET)
Expand All @@ -239,7 +254,7 @@ object HttpRequestMapperTest extends AirSpec {

test("throw an error when incompatible input is found when constructing nested objects with GET") {
val r = findRoute("endpoint2")
intercept[IllegalArgumentException] {
intercept[RPCException] {
mapArgs(r, { r => r.withUri(s"${r.uri}?id=abc") }, method = HttpMethod.GET)
}
}
Expand Down
23 changes: 17 additions & 6 deletions airframe-log/.jvm/src/main/scala/wvlet/log/AsyncHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package wvlet.log
import java.io.Flushable
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.{logging => jl}

/**
* Logging using background thread
* Logging using a background thread
*/
class AsyncHandler(parent: jl.Handler) extends jl.Handler with Guard with AutoCloseable with Flushable {
private val executor = {
Executors.newCachedThreadPool(
Executors.newSingleThreadExecutor(
new ThreadFactory {
override def newThread(r: Runnable): Thread = {
val t = new Thread(r, "AirframeLogAsyncHandler")
Expand Down Expand Up @@ -47,7 +47,10 @@ class AsyncHandler(parent: jl.Handler) extends jl.Handler with Guard with AutoCl
val records = Seq.newBuilder[jl.LogRecord]
guard {
while (!queue.isEmpty) {
records += queue.pollFirst()
val record = queue.pollFirst()
if (record != null) {
records += record
}
}
}

Expand All @@ -63,13 +66,21 @@ class AsyncHandler(parent: jl.Handler) extends jl.Handler with Guard with AutoCl
}

override def close(): Unit = {
flush()

if (closed.compareAndSet(false, true)) {
flush()
// Wake up the poller thread
guard {
isNotEmpty.signalAll()
}
executor.shutdown()
executor.shutdownNow()
}
}

def closeAndAwaitTermination(timeout: Int = 10, timeUnit: TimeUnit = TimeUnit.MILLISECONDS): Unit = {
close()
while (!executor.awaitTermination(timeout, timeUnit)) {
Thread.sleep(timeUnit.toMillis(timeout))
}
}
}
20 changes: 13 additions & 7 deletions airframe-log/.jvm/src/main/scala/wvlet/log/LogEnv.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package wvlet.log
import java.io.{FileDescriptor, FileOutputStream, PrintStream}
import java.lang.management.ManagementFactory
import javax.management.{InstanceAlreadyExistsException, ObjectName, MBeanServer}
import wvlet.log.LogFormatter.SourceCodeLogFormatter

import java.io.PrintStream
import java.lang.management.ManagementFactory
import javax.management.{InstanceAlreadyExistsException, MBeanServer, ObjectName}

/**
*/
private[log] object LogEnv extends LogEnvBase {
override def isScalaJS: Boolean = false
override def defaultLogLevel: LogLevel = LogLevel.INFO

override lazy val defaultConsoleOutput: PrintStream = {
override val defaultConsoleOutput: PrintStream = {
// Note: In normal circumstances, using System.err here is fine, but
// System.err can be replaced with other implementation
// (e.g., airlift.Logging, which is used in Trino https://github.com/airlift/airlift/blob/master/log-manager/src/main/java/io/airlift/log/Logging.java),
// so we create a stderr stream explicitly here.
new PrintStream(new FileOutputStream(FileDescriptor.err))
// If that happens, we may need to create a stderr stream explicitly like this
// new PrintStream(new FileOutputStream(FileDescriptor.err))

// Use the standard System.err for sbtn native client
System.err
}
override def defaultHandler: java.util.logging.Handler = {
new ConsoleLogHandler(SourceCodeLogFormatter)
Expand All @@ -35,7 +39,6 @@ private[log] object LogEnv extends LogEnvBase {

// When class is an anonymous trait
if (name.contains("$anon$")) {
import collection.JavaConverters._
val interfaces = cl.getInterfaces
if (interfaces != null && interfaces.length > 0) {
// Use the first interface name instead of the anonymous name
Expand Down Expand Up @@ -75,6 +78,9 @@ private[log] object LogEnv extends LogEnvBase {
try {
Some(ManagementFactory.getPlatformMBeanServer)
} catch {
case e: ClassNotFoundException =>
// Pre-registered wvlet.log.AirframeLogManager might not be found when reloading the project in IntelliJ, so skip this error.
None
case e: Throwable =>
// Show an error once without using the logger itself
e.printStackTrace()
Expand Down
29 changes: 16 additions & 13 deletions airframe-log/.jvm/src/test/scala/wvlet/log/AsyncHandlerTest.scala
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
package wvlet.log

import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

import wvlet.log.LogFormatter.BareFormatter
import wvlet.log.LogFormatter.{BareFormatter, ThreadLogFormatter}
import wvlet.log.io.IOUtil._
import wvlet.log.io.Timer

/**
*/
class AsyncHandlerTest extends Spec with Timer {

test("start background thread") {
val buf = new BufferedLogHandler(BareFormatter)
withResource(new AsyncHandler(buf)) { h =>
val logger = Logger("wvlet.log.asynctest")
logger.addHandler(h)
val buf = new BufferedLogHandler(ThreadLogFormatter)
val l1 = "hello async logger"
val l2 = "log output will be processed in a background thread"

val handler = new AsyncHandler(buf)
withResource(handler) { h =>
val logger = Logger("internal.asynctest")
logger.resetHandler(h)
logger.setLogLevel(LogLevel.INFO)

val l1 = "hello async logger"
val l2 = "log output will be processed in a background thread"
logger.info(l1)
logger.warn(l2)
logger.debug(l1) // should be ignored

h.flush()

val logs = buf.logs
assert(logs.size == 2)
assert(logs(0) == l1)
assert(logs(1) == l2)
}

handler.closeAndAwaitTermination()
val logs = buf.logs
logs.size shouldBe 2
logs(0).contains(l1) shouldBe true
logs(1).contains(l2) shouldBe true
}

test("not block at the logging code") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ class ResourceTest extends AirSpec {
}

test("find resources from jar files") {
info("find files from a jar file")
debug("find files from a jar file")

val l = Resource.listResources("scala.io", { (s: String) => s.endsWith(".class") })
assert(l.size > 0)
for (each <- l) {
info(each)
debug(each)
assert(each.url.toString.contains("/scala/io"))
}
}
Expand Down
3 changes: 2 additions & 1 deletion airframe-log/src/main/scala/wvlet/log/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class BufferedLogHandler(formatter: LogFormatter) extends jl.Handler {
override def flush(): Unit = {}
override def publish(record: jl.LogRecord): Unit =
synchronized {
buf += formatter.format(record)
val log = formatter.format(record)
buf += log
}
override def close(): Unit = {
// do nothing
Expand Down
2 changes: 1 addition & 1 deletion airframe-log/src/test/scala/wvlet/log/LoggerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class LoggerTest extends Spec {
Logger.setDefaultFormatter(SourceCodeLogFormatter)
capture {
val l = Logger("org.sample")
info(s"logger name: ${l.getName}")
debug(s"logger name: ${l.getName}")
l.info("hello logger")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ object ParquetQueryPlanner extends LogSupport {
case _ =>
throw new IllegalArgumentException(s"Unknown column ${a.value}: ${op}")
}
case op @ NotEq(a: Identifier, l: Literal, _) =>
case op @ NotEq(a: Identifier, l: Literal, _, _) =>
findParameterType(a.value) match {
case Some(PrimitiveTypeName.INT32) =>
FilterApi.notEq(FilterApi.intColumn(a.value), java.lang.Integer.valueOf(l.stringValue))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ object TypeResolver extends LogSupport {
lookup(i.value).map(toResolvedAttribute(i.value, _))
case u @ UnresolvedAttribute(qualifier, name, _) =>
lookup(u.fullName).map(toResolvedAttribute(name, _).withQualifier(qualifier))
case a @ AllColumns(_, None, _) =>
case a @ AllColumns(qualifier, None, _) =>
// Resolve the inputs of AllColumn as ResolvedAttribute
// so as not to pull up too much details
val allColumns = resolvedAttributes.map {
Expand All @@ -471,7 +471,10 @@ object TypeResolver extends LogSupport {
case other =>
toResolvedAttribute(other.name, other)
}
List(a.copy(columns = Some(allColumns)))
List(a.copy(columns = Some((qualifier match {
case Some(q) => allColumns.filter(_.qualifier.contains(q))
case None => allColumns
}).map(_.withQualifier(None)))))
case _ =>
List(expr)
}
Expand Down
Loading

0 comments on commit a6d918c

Please sign in to comment.