Update scalafmt to 3.0.0 (#3953)
nevillelyh authored Aug 23, 2021
1 parent 00d46b6 commit 73d731d
Showing 129 changed files with 2,160 additions and 1,796 deletions.
3 changes: 1 addition & 2 deletions .scalafmt.conf
@@ -1,4 +1,4 @@
version = "2.7.5"
version = "3.0.0"
maxColumn = 100
lineEndings=preserve
binPack.literalArgumentLists = true
@@ -18,7 +18,6 @@ verticalMultiline {
newlineBeforeImplicitKW = true
}

-docstrings = JavaDoc
docstrings.oneline = fold
docstrings.style = Asterisk

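scalafmt 3.x replaces the single 2.x setting `docstrings = JavaDoc` with the `docstrings.*` family, and it rewraps Scaladoc text to `maxColumn` by default, which is what produces the comment rewraps throughout the rest of this commit. A rough sketch of the resulting configuration; the commented `docstrings.wrap` line is an assumption about the 3.x default, not part of this diff:

{{{
version = "3.0.0"
maxColumn = 100
docstrings.style = Asterisk // replaces the removed 2.x `docstrings = JavaDoc`
docstrings.oneline = fold   // collapse a docstring onto one line when it fits
# docstrings.wrap = yes     // assumed 3.x default; rewraps doc text at maxColumn
}}}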
2 changes: 1 addition & 1 deletion build.sbt
@@ -169,7 +169,7 @@ val commonSettings = Def
"com\\.spotify\\.scio\\.repl\\..*",
"com\\.spotify\\.scio\\.util\\.MultiJoin",
"com\\.spotify\\.scio\\.smb\\.util\\.SMBMultiJoin"
-) ++ (2 to 10).map(x => s"com\\.spotify\\.scio\\.sql\\.Query${x}")).mkString(";"),
+) ++ (2 to 10).map(x => s"com\\.spotify\\.scio\\.sql\\.Query$x")).mkString(";"),
coverageHighlighting := true,
licenses := Seq("Apache 2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")),
homepage := Some(url("https://github.com/spotify/scio")),
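The lone build.sbt reformat drops redundant braces around a simple interpolated identifier, presumably scalafmt's `RedundantBraces` rewrite; both forms produce the same string:

{{{
val x = 2
s"com\\.spotify\\.scio\\.sql\\.Query${x}" // braces are needed only for complex expressions
s"com\\.spotify\\.scio\\.sql\\.Query$x"   // equivalent, brace-free form
}}}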
29 changes: 15 additions & 14 deletions scio-avro/src/main/scala/com/spotify/scio/avro/AvroIO.scala
@@ -94,17 +94,17 @@ final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO
/**
* Get an SCollection for a Protobuf file.
*
- * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage
- * Avro's block file format.
+ * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's
+ * block file format.
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
sc.read(ObjectFileIO[T](path)(protoCoder))

/**
* Save this SCollection as a Protobuf file.
*
- * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage
- * Avro's block file format.
+ * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's
+ * block file format.
*/
override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
val metadata = params.metadata ++ ProtobufUtil.schemaMetadataOf[T]
@@ -147,8 +147,8 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
override def testId: String = s"AvroIO($path)"

/**
- * Get an SCollection of [[org.apache.avro.specific.SpecificRecord SpecificRecord]]
- * from an Avro file.
+ * Get an SCollection of [[org.apache.avro.specific.SpecificRecord SpecificRecord]] from an Avro
+ * file.
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val cls = ScioUtil.classOf[T]
@@ -157,8 +157,8 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
}

/**
- * Save this SCollection of [[org.apache.avro.specific.SpecificRecord SpecificRecord]] as
- * an Avro file.
+ * Save this SCollection of [[org.apache.avro.specific.SpecificRecord SpecificRecord]] as an Avro
+ * file.
*/
override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
val cls = ScioUtil.classOf[T]
@@ -190,7 +190,9 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
sc.applyTransform(t)
}

-/** Save this SCollection [[org.apache.avro.generic.GenericRecord GenericRecord]] as a Avro file. */
+/**
+ * Save this SCollection [[org.apache.avro.generic.GenericRecord GenericRecord]] as a Avro file.
+ */
override protected def write(
data: SCollection[GenericRecord],
params: WriteP
@@ -207,9 +209,9 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
}

/**
- * Given a parseFn, read [[org.apache.avro.generic.GenericRecord GenericRecord]]
- * and apply a function mapping [[GenericRecord => T]] before producing output.
- * This IO applies the function at the time of de-serializing Avro GenericRecords.
+ * Given a parseFn, read [[org.apache.avro.generic.GenericRecord GenericRecord]] and apply a
+ * function mapping [[GenericRecord => T]] before producing output. This IO applies the function at
+ * the time of de-serializing Avro GenericRecords.
*
* This IO doesn't define write, and should not be used to write Avro GenericRecords.
*/
@@ -223,8 +225,8 @@ final case class GenericRecordParseIO[T](path: String, parseFn: GenericRecord =>

/**
* Get an SCollection[T] by applying the [[parseFn]] on
- * [[org.apache.avro.generic.GenericRecord GenericRecord]]
- * from an Avro file.
+ * [[org.apache.avro.generic.GenericRecord GenericRecord]] from an Avro file.
*/
override protected def read(sc: ScioContext, params: Unit): SCollection[T] = {
val t = beam.AvroIO
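For orientation, a minimal sketch of how the ProtobufIO read and write paths above are typically exercised, assuming a hypothetical compiled protobuf class `MyMessage` and an in-scope `ScioContext` named `sc`:

{{{
import com.spotify.scio.avro._

// read: protobuf messages stored as Array[Byte] inside Avro block files
val msgs: SCollection[MyMessage] = sc.protobufFile[MyMessage]("gs://bucket/in/*.protobuf.avro")

// write: ProtobufUtil.schemaMetadataOf[T] metadata is attached automatically
msgs.saveAsProtobufFile("gs://bucket/out")
}}}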
@@ -35,8 +35,8 @@ final class GenericRecordSCollectionOps(private val self: SCollection[GenericRec
extends AnyVal {

/**
- * Save this SCollection of type
- * [[org.apache.avro.specific.SpecificRecord SpecificRecord]] as an Avro file.
+ * Save this SCollection of type [[org.apache.avro.specific.SpecificRecord SpecificRecord]] as an
+ * Avro file.
*/
def saveAsAvroFile(
path: String,
@@ -75,8 +75,8 @@ final class SpecificRecordSCollectionOps[T <: SpecificRecord](private val self:
extends AnyVal {

/**
- * Save this SCollection of type
- * [[org.apache.avro.specific.SpecificRecord SpecificRecord]] as an Avro file.
+ * Save this SCollection of type [[org.apache.avro.specific.SpecificRecord SpecificRecord]] as an
+ * Avro file.
*/
def saveAsAvroFile(
path: String,
@@ -114,8 +114,8 @@ final class ProtobufSCollectionOps[T <: Message](private val self: SCollection[T
/**
* Save this SCollection as a Protobuf file.
*
- * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage
- * Avro's block file format.
+ * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's
+ * block file format.
*/
def saveAsProtobufFile(
path: String,
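A short sketch of the save side of these ops, assuming `Account` is a generated Avro `SpecificRecord` class; the `schema = ` parameter name on the generic variant is an assumption:

{{{
import com.spotify.scio.avro._

specificData.saveAsAvroFile("gs://bucket/accounts") // SpecificRecord: schema comes from the class
genericData.saveAsAvroFile("gs://bucket/raw", schema = avroSchema) // GenericRecord: schema must be passed
}}}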
@@ -47,21 +47,20 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
self.read(GenericRecordIO(path, schema))

/**
- * Get an SCollection of type [[T]] for data stored in Avro format after applying
- * parseFn to map a serialized [[org.apache.avro.generic.GenericRecord GenericRecord]]
- * to type [[T]].
+ * Get an SCollection of type [[T]] for data stored in Avro format after applying parseFn to map a
+ * serialized [[org.apache.avro.generic.GenericRecord GenericRecord]] to type [[T]].
*
- * This API should be used with caution as the `parseFn` reads from a `GenericRecord` and hence
- * is not type checked.
+ * This API should be used with caution as the `parseFn` reads from a `GenericRecord` and hence is
+ * not type checked.
*
* This is intended to be used when attempting to read `GenericRecord`s without specifying a
- * schema (hence the writer schema is used to deserialize) and then directly converting
- * to a type [[T]] using a `parseFn`. This avoids creation of an intermediate
- * `SCollection[GenericRecord]` which can be in efficient because `Coder[GenericRecord]` is
- * inefficient without a known Avro schema.
+ * schema (hence the writer schema is used to deserialize) and then directly converting to a type
+ * [[T]] using a `parseFn`. This avoids creation of an intermediate `SCollection[GenericRecord]`
+ * which can be in efficient because `Coder[GenericRecord]` is inefficient without a known Avro
+ * schema.
*
- * Example usage:
- * This code reads Avro fields "id" and "name" and de-serializes only those two into CaseClass
+ * Example usage: This code reads Avro fields "id" and "name" and de-serializes only those two
+ * into CaseClass
*
* {{{
* val sColl: SCollection[CaseClass] =
@@ -75,8 +74,8 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
self.read(GenericRecordParseIO[T](path, parseFn))

/**
- * Get an SCollection of type [[org.apache.avro.specific.SpecificRecord SpecificRecord]]
- * for an Avro file.
+ * Get an SCollection of type [[org.apache.avro.specific.SpecificRecord SpecificRecord]] for an
+ * Avro file.
*/
def avroFile[T <: SpecificRecord: ClassTag: Coder](path: String): SCollection[T] =
self.read(SpecificRecordIO[T](path))
@@ -97,8 +96,8 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
/**
* Get an SCollection for a Protobuf file.
*
- * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage
- * Avro's block file format.
+ * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's
+ * block file format.
*/
def protobufFile[T <: Message: ClassTag](path: String): SCollection[T] =
self.read(ProtobufIO[T](path))
11 changes: 5 additions & 6 deletions scio-avro/src/main/scala/com/spotify/scio/avro/taps.scala
@@ -55,8 +55,8 @@ final case class SpecificRecordTap[T <: SpecificRecord: ClassTag: Coder](path: S
}

/**
- * Tap for reading [[org.apache.avro.generic.GenericRecord GenericRecord]] Avro files and applying
- * a parseFn to parse it to the given type [[T]]
+ * Tap for reading [[org.apache.avro.generic.GenericRecord GenericRecord]] Avro files and applying a
+ * parseFn to parse it to the given type [[T]]
*/
final case class GenericRecordParseTap[T: Coder](
path: String,
@@ -96,8 +96,7 @@ final case class AvroTaps(self: Taps) {
self.mkTap(s"Object file: $path", () => self.isPathDone(path), () => ObjectFileTap[T](path))

/**
- * Get a `Future[Tap[T]]` for [[org.apache.avro.generic.GenericRecord GenericRecord]] Avro
- * file.
+ * Get a `Future[Tap[T]]` for [[org.apache.avro.generic.GenericRecord GenericRecord]] Avro file.
*/
def avroFile(path: String, schema: Schema): Future[Tap[GenericRecord]] =
self.mkTap(
@@ -107,8 +106,8 @@ final case class AvroTaps(self: Taps) {
)

/**
- * Get a `Future[Tap[T]]` for
- * [[org.apache.avro.specific.SpecificRecord SpecificRecord]] Avro file.
+ * Get a `Future[Tap[T]]` for [[org.apache.avro.specific.SpecificRecord SpecificRecord]] Avro
+ * file.
*/
def avroFile[T <: SpecificRecord: ClassTag: Coder](path: String): Future[Tap[T]] =
self.mkTap(s"Avro: $path", () => self.isPathDone(path), () => SpecificRecordTap[T](path))
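Taps are deferred handles over data on disk; a minimal sketch of re-opening an Avro file in a new pipeline, assuming a generated `SpecificRecord` class `Account` and a `ScioContext` named `sc`:

{{{
val tap: Tap[Account] = SpecificRecordTap[Account]("gs://bucket/accounts/*.avro")
val accounts: SCollection[Account] = tap.open(sc) // read the tapped data back as an SCollection
}}}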
52 changes: 28 additions & 24 deletions scio-avro/src/main/scala/com/spotify/scio/avro/types/AvroType.scala
@@ -42,18 +42,22 @@ import scala.reflect.runtime.universe._
* RECORD Nested case class
* }}}
*
- * @groupname trait Traits for annotated types
- * @groupname annotation Type annotations
- * @groupname converters Converters
- * @groupname Ungrouped Other Members
+ * @groupname trait
+ *   Traits for annotated types
+ * @groupname annotation
+ *   Type annotations
+ * @groupname converters
+ *   Converters
+ * @groupname Ungrouped
+ *   Other Members
*/
object AvroType {

/**
* Macro annotation for an Avro schema.
*
- * Generate case classes for an Avro schema. Note that `schema` must be a single string literal
- * of the JSON schema with optional `.stripMargin` at the end. For example:
+ * Generate case classes for an Avro schema. Note that `schema` must be a single string literal of
+ * the JSON schema with optional `.stripMargin` at the end. For example:
*
* {{{
* @AvroType.fromSchema(
@@ -89,12 +93,12 @@ object AvroType {
/**
* Macro annotation for a path containing Avro files.
*
- * Generates case classes from a path which contains Avro files.
- * Path needs to represent a folder, hence it always needs to end with `/`.
- * Inside of the folder needs to exist at least one file matching `*.avro` glob.
+ * Generates case classes from a path which contains Avro files. Path needs to represent a folder,
+ * hence it always needs to end with `/`. Inside of the folder needs to exist at least one file
+ * matching `*.avro` glob.
*
- * Note that path must be a single string literal with optional `.stripMargin` at the end.
- * For example:
+ * Note that path must be a single string literal with optional `.stripMargin` at the end. For
+ * example:
*
* {{{
* @AvroType.fromPath("gs://myBucket/myFolder/")
Expand All @@ -105,10 +109,10 @@ object AvroType {
*
* {{{
* @AvroType.fromPath(
* """
* """
* | gs://myBucket/myFolder/
* | myLooooooooooooooooongPath/
* """.stripMargin)
* """.stripMargin)
* class MyRecord
* }}}
*
@@ -132,16 +136,16 @@ object AvroType {
/**
* Macro annotation for a file which contains Avro schema.
*
- * Generate case classes for an Avro schema. File can be either local or remote files.
- * For example file can be located on Google Cloud Storage (GCS):
+ * Generate case classes for an Avro schema. File can be either local or remote files. For example
+ * file can be located on Google Cloud Storage (GCS):
*
* {{{
* @AvroType.fromSchemaFile("gs://myBucket/myFolder/schema-file.avsc")
* class MyRecord
* }}}
*
- * For local files, you need to either provide absolute path,
- * or path relative to project root directory. For example:
+ * For local files, you need to either provide absolute path, or path relative to project root
+ * directory. For example:
*
* {{{
* @AvroType.fromSchemaFile("sub-project/src/main/avro/schema-file.avsc")
@@ -170,12 +174,12 @@ object AvroType {
* case class Result(name: Option[String] = None, score: Option[Double] = None)
* }}}
*
- * It is recommended that you define all of your fields as Option.
- * This way you could stop populating them in the future if you notice that you don't need them.
+ * It is recommended that you define all of your fields as Option. This way you could stop
+ * populating them in the future if you notice that you don't need them.
*
- * This macro doesn't help you with schema evolution.
- * It's up to you to follow the best practices on how to do evolution of your Avro schemas.
- * Rule of thumb is to only add new fields, without removing the old ones.
+ * This macro doesn't help you with schema evolution. It's up to you to follow the best practices
+ * on how to do evolution of your Avro schemas. Rule of thumb is to only add new fields, without
+ * removing the old ones.
* @group annotation
*/
@compileTimeOnly(
@@ -217,8 +221,8 @@ object AvroType {
def schemaOf[T: TypeTag]: Schema = SchemaProvider.schemaOf[T]

/**
- * Generate a converter function from [[org.apache.avro.generic.GenericRecord GenericRecord]]
- * to the given case class `T`.
+ * Generate a converter function from [[org.apache.avro.generic.GenericRecord GenericRecord]] to
+ * the given case class `T`.
* @group converters
*/
def fromGenericRecord[T]: GenericRecord => T =
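Putting the pieces together, a minimal sketch that pairs the converters above with the `@AvroType.toSchema` annotation (the companion of `fromSchema`, not shown in this diff); `toGenericRecord` is assumed to mirror `fromGenericRecord`:

{{{
@AvroType.toSchema
case class Result(name: Option[String] = None, score: Option[Double] = None)

val schema: Schema = AvroType.schemaOf[Result]                         // derived Avro schema
val from: GenericRecord => Result = AvroType.fromGenericRecord[Result]
val to: Result => GenericRecord = AvroType.toGenericRecord[Result]     // assumed mirror converter
}}}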
@@ -17,7 +17,7 @@

package com.spotify.scio.cassandra

-import java.math.{BigInteger, BigDecimal => JBigDecimal}
+import java.math.{BigDecimal => JBigDecimal, BigInteger}
import java.nio.ByteBuffer
import java.time.{Instant, LocalTime}
import java.util.{Date, UUID}
@@ -33,11 +33,11 @@ final case class CassandraIO[T](opts: CassandraOptions) extends ScioIO[T] {
/**
* Save this SCollection as a Cassandra table.
*
- * Cassandra `org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter` is used to perform bulk
- * writes for better throughput. The [[com.spotify.scio.values.SCollection SCollection]] is
- * grouped by the table partition key before written to the cluster. Therefore writes only
- * occur at the end of each window in streaming mode. The bulk writer writes to all nodes in a
- * cluster so remote nodes in a multi-datacenter cluster may become a bottleneck.
+ * Cassandra `org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter` is used to perform bulk writes
+ * for better throughput. The [[com.spotify.scio.values.SCollection SCollection]] is grouped by
+ * the table partition key before written to the cluster. Therefore writes only occur at the end
+ * of each window in streaming mode. The bulk writer writes to all nodes in a cluster so remote
+ * nodes in a multi-datacenter cluster may become a bottleneck.
*/
override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = {
val bulkOps = new BulkOperations(opts, params.parallelism)
@@ -39,8 +39,7 @@ package object cassandra {
)

/**
- * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Cassandra
- * methods.
+ * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Cassandra methods.
*/
implicit
class CassandraSCollection[T](@transient private val self: SCollection[T]) extends AnyVal {
@@ -50,16 +49,19 @@ package object cassandra {
*
* Cassandra `org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter` is used to perform bulk
* writes for better throughput. The [[com.spotify.scio.values.SCollection SCollection]] is
- * grouped by the table partition key before written to the cluster. Therefore writes only
- * occur at the end of each window in streaming mode. The bulk writer writes to all nodes in a
- * cluster so remote nodes in a multi-datacenter cluster may become a bottleneck.
+ * grouped by the table partition key before written to the cluster. Therefore writes only occur
+ * at the end of each window in streaming mode. The bulk writer writes to all nodes in a cluster
+ * so remote nodes in a multi-datacenter cluster may become a bottleneck.
*
* '''NOTE: this module is optimized for throughput in batch mode and not recommended for
* streaming mode.'''
*
- * @param opts Cassandra options
- * @param parallelism number of concurrent bulk writers, default to number of Cassandra nodes
- * @param f function to convert input data to values for the CQL statement
+ * @param opts
+ *   Cassandra options
+ * @param parallelism
+ *   number of concurrent bulk writers, default to number of Cassandra nodes
+ * @param f
+ *   function to convert input data to values for the CQL statement
*/
def saveAsCassandra(
opts: CassandraOptions,
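A rough usage sketch of `saveAsCassandra`; the `CassandraOptions` field order (keyspace, table, CQL statement, seed host) and the `Seq[Any]` shape of `f` are assumptions, not taken from this diff:

{{{
// hypothetical key-value pipeline; f maps each element to the CQL bind values
val opts = CassandraOptions(
  "my_keyspace",
  "my_table",
  "INSERT INTO my_keyspace.my_table (key, value) VALUES (?, ?)",
  "cassandra-seed.example.com"
)
keyValues.saveAsCassandra(opts)(kv => Seq(kv._1, kv._2))
}}}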