[Scala] optimize scala collections serialization #765

Closed
Tracked by #682
chaokunyang opened this issue Jul 26, 2023 · 2 comments · Fixed by #1073
Labels
enhancement New feature or request good first issue Good for newcomers java

Comments

@chaokunyang
Collaborator

chaokunyang commented Jul 26, 2023

Is your feature request related to a problem? Please describe.

Scala has its own collections, which differ from the JDK collections, and Scala collections customize JDK serialization.

For example, scala.collection.immutable.List defines a custom writeReplace method.
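
One way to observe this substitution is sketched below. It relies on the JDK `ObjectOutputStream.replaceObject` hook, which is invoked after any class-defined `writeReplace` has already run, so the recorded class names are those of the objects actually written. The names `WriteReplaceProbe` and `RecordingStream` are made up for this illustration and are not part of Scala or Fury.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, for illustration only.
object WriteReplaceProbe {

  /** ObjectOutputStream that records the class name of every object it writes. */
  final class RecordingStream(out: OutputStream) extends ObjectOutputStream(out) {
    val seen: ArrayBuffer[String] = ArrayBuffer.empty[String]

    // Ask the JDK to call replaceObject for each object written to this stream.
    enableReplaceObject(true)

    // Called after writeReplace has been applied, so `obj` is the replacement.
    override protected def replaceObject(obj: AnyRef): AnyRef = {
      if (obj != null) seen += obj.getClass.getName
      obj
    }
  }

  /** Serializes `value` with JDK serialization, reads it back, and returns
    * the restored object together with the class names that were written. */
  def roundTrip(value: AnyRef): (AnyRef, List[String]) = {
    val bytes = new ByteArrayOutputStream()
    val out = new RecordingStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try (in.readObject(), out.seen.toList)
    finally in.close()
  }
}
```

Calling `WriteReplaceProbe.roundTrip(List(1, 2, 3))` should give back an equal list, while the recorded class names show a replacement object rather than the list cell class itself. Which replacement class appears depends on the Scala version.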

@chaokunyang
Collaborator Author

chaokunyang commented Nov 2, 2023

@pjfanning I found a simple way to optimize Scala collection serialization. Scala collections implement scala.collection.generic.DefaultSerializable:

trait DefaultSerializable extends Serializable { this: scala.collection.Iterable[_] =>
  protected[this] def writeReplace(): AnyRef = {
    val f: Factory[Any, Any] = this match {
      case it: scala.collection.SortedMap[_, _] => it.sortedMapFactory.sortedMapFactory[Any, Any](it.ordering.asInstanceOf[Ordering[Any]]).asInstanceOf[Factory[Any, Any]]
      case it: scala.collection.Map[_, _] => it.mapFactory.mapFactory[Any, Any].asInstanceOf[Factory[Any, Any]]
      case it: scala.collection.SortedSet[_] => it.sortedIterableFactory.evidenceIterableFactory[Any](it.ordering.asInstanceOf[Ordering[Any]])
      case it => it.iterableFactory.iterableFactory
    }
    new DefaultSerializationProxy(f, this)
  }
}

writeReplace returns a DefaultSerializationProxy for serialization:

@SerialVersionUID(3L)
final class DefaultSerializationProxy[A](factory: Factory[A, Any], @transient private[this] val coll: Iterable[A]) extends Serializable {

  @transient protected var builder: Builder[A, Any] = _

  private[this] def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    val k = coll.knownSize
    out.writeInt(k)
    var count = 0
    coll.foreach { x =>
      out.writeObject(x)
      count += 1
    }
    if(k >= 0) {
      if(count != k) throw new IllegalStateException(s"Illegal size $count of collection, expected $k")
    } else out.writeObject(SerializeEnd)
  }

  private[this] def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    builder = factory.newBuilder
    val k = in.readInt()
    if(k >= 0) {
      builder.sizeHint(k)
      var count = 0
      while(count < k) {
        builder += in.readObject().asInstanceOf[A]
        count += 1
      }
    } else {
      while (true) in.readObject match {
        case SerializeEnd => return
        case a => builder += a.asInstanceOf[A]
      }
    }
  }

  protected[this] def readResolve(): Any = builder.result()
}

We can implement a serializer in Fury for all DefaultSerializable subclasses to adapt to DefaultSerializationProxy.
DefaultSerializationProxy doesn't use generics in Scala; this can be done in Fury too.
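
A minimal sketch of that idea, assuming Scala 2.13 or later (for `scala.collection.Factory`): write the element count followed by the elements, then rebuild the collection on read through a builder obtained from a factory, as the known-size branch of DefaultSerializationProxy does. `BuilderBasedCodec` and `ElementCodec` are hypothetical names invented for this example and are not Fury's serializer API. The sketch always writes an exact size instead of handling the unknown-size case with an end marker.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.Factory

// Hypothetical per-element codec; stands in for whatever the framework
// would use to serialize individual elements.
trait ElementCodec[A] {
  def write(out: DataOutputStream, value: A): Unit
  def read(in: DataInputStream): A
}

object ElementCodec {
  implicit val intCodec: ElementCodec[Int] = new ElementCodec[Int] {
    def write(out: DataOutputStream, value: Int): Unit = out.writeInt(value)
    def read(in: DataInputStream): Int = in.readInt()
  }
}

// Hypothetical codec, for illustration only.
object BuilderBasedCodec {

  /** Writes the element count, then each element in iteration order. */
  def write[A](coll: Iterable[A])(implicit codec: ElementCodec[A]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(coll.size)
    coll.foreach(codec.write(out, _))
    out.flush()
    bytes.toByteArray
  }

  /** Reads the count, then feeds that many elements into a builder
    * created by `factory` and returns the built collection. */
  def read[A, C](data: Array[Byte], factory: Factory[A, C])(implicit codec: ElementCodec[A]): C = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val size = in.readInt()
    val builder = factory.newBuilder
    builder.sizeHint(size)
    var i = 0
    while (i < size) {
      builder += codec.read(in)
      i += 1
    }
    builder.result()
  }
}
```

For example, `BuilderBasedCodec.read[Int, Vector[Int]](BuilderBasedCodec.write(List(1, 2, 3)), Vector)` rebuilds the elements as a `Vector`. The target collection type is chosen by the factory passed to `read`, not by anything stored in the bytes; a real serializer would also need to record which factory to use.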

@chaokunyang
Collaborator Author

I'll try to build a PoC of this in an upcoming PR; it shouldn't take much work to adapt it to our existing collection serialization abstraction.
