
Spark HyperLogLog Functions


HyperLogLog background

Precise distinct counts are expensive to compute in Spark because:

  1. When large cardinalities are involved, precise distinct counts require a lot of memory/IO, and

  2. Every row of data has to be processed for every query because distinct counts cannot be reaggregated

HyperLogLog (HLL) can address both problems. Memory use is reduced via the tight binary sketch representation and the data can be pre-aggregated because HLL binary sketches--unlike distinct counts--are mergeable. It is unfortunate that Spark's HLL implementation does not expose the binary HLL sketches, which makes its usefulness rather limited: it addresses (1) but not (2) above.


spark-alchemy's approach to approximate counting

The HLL Spark native functions in spark-alchemy provide two key benefits:

  1. They expose HLL sketches as binary columns, enabling 1,000+x speedups in approximate distinct count computation via pre-aggregation.

  2. They enable interoperability at the HLL sketch level with other data processing systems. We use an open-source HLL library with an independent storage specification and built-in support for Postgres-compatible databases and even JavaScript. This allows Spark to serve as a universal data (pre-)processing platform for systems that require fast query turnaround times, e.g., portals & dashboards.

spark-alchemy allows selecting between two underlying HLL implementations:

  • The Aggregate Knowledge (AGKN or AGGREGATE_KNOWLEDGE) implementation is compatible with Postgres.

  • The StreamLib (STRM or STREAM_LIB) implementation uses an improved cardinality estimation algorithm (HLL++) that corrects some bias and improves accuracy.

spark-alchemy provides a richer set of HLL functions than either Spark or BigQuery. The full list of HLL functions is:

  • hll_cardinality(hll_sketch[, implementation]): returns the cardinality (distinct count) of items in the set represented by the HLL sketch.

    • implementation: here and everywhere else, determines the underlying HLL implementation to use (one of AGKN/AGGREGATE_KNOWLEDGE/STRM/STREAM_LIB). See "Default Implementation" below for other ways to configure which implementation is used.
  • hll_intersect_cardinality(hll_sketch, hll_sketch[, implementation]): computes a merged (unioned) sketch and uses the fact that |A intersect B| = (|A| + |B|) - |A union B| to estimate the intersection cardinality of the two sketches (see the example after this list).

  • hll_init(column[, relativeSD[, implementation]]): creates an HLL sketch (a binary column) for each value in column. hll_init() is designed for use outside of aggregation (GROUP BY) as a privacy protection tool (because it hashes potentially sensitive IDs) and/or to prepare granular data for re-aggregation. Instead of hll_init(), you will typically use hll_init_agg().

    • relativeSD: here and everywhere else, determines the expected relative error of the cardinality computation. The smaller the error, the larger the binary sketch. The default value is 0.05 (5%), the same default used by Spark's approx_count_distinct().
    • The name relativeSD stands for relative standard deviation: the confidence interval for the error is one standard deviation (SD) around the mean. In a true normal (Gaussian) distribution, 68% of values fall within one standard deviation of the mean; for HLL, Google suggests using a 65% confidence interval (CI) for 1 SD instead of 68%. The relative error at a 95% CI (2 SD) will be twice as large and at 99% (3 SD; again, Google suggests a slightly more conservative value than the 99.7% of a true Gaussian) four times as large.
    • See HLL in Spark for background on the relationship between precision and computation cost. Sometimes, it is worth using high precision and suffering the slow computation as long as the data can be pre-aggregated as follow-on re-aggregation will perform much faster on the existing aggregates.
  • hll_init_agg(column[, relativeSD[, implementation]]): like hll_init(), it creates an HLL sketch, but it is designed for use with aggregation (GROUP BY), i.e., it creates one sketch for all values in a group. Logically equivalent to hll_merge(hll_init(...)).

  • hll_init_collection(array_or_map_column[, relativeSD[, implementation]]): creates an HLL sketch for each value in the column, which has to be an array or a map. Having collection versions of HLL functions is a performance optimization, eliminating the need to explode & re-group data.

  • hll_init_collection_agg(array_or_map_column[, relativeSD[, implementation]]): like hll_init_collection(), but designed for use with aggregation (GROUP BY), i.e., it creates a sketch for all values in the arrays/maps of a group. Logically equivalent to hll_merge(hll_init_collection(...)).

  • hll_merge(hll_sketch[, implementation]): merges HLL sketches during an aggregation operation.

  • hll_row_merge([implementation,] hll_sketches*): merges multiple sketches in one row into a single field.

  • hll_convert(hll_sketch, from, to): Converts between sketch types. Currently the only conversion supported is StreamLib to Aggregate Knowledge—this can be used to load existing StreamLib sketches into Postgres.

    • NOTE: converted sketches SHOULD NOT be merged with non-converted sketches of the same type—it will result in double counting of all values that are present in both a converted and "native" sketch.
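
As an end-to-end illustration of several of the functions above, and of the inclusion-exclusion identity behind hll_intersect_cardinality(), here is a minimal Scala sketch. It assumes the Scala functions exposed by com.swoop.alchemy.spark.expressions.hll.functions._ mirror the SQL signatures listed above.

import com.swoop.alchemy.spark.expressions.hll.functions._

// Two overlapping ID sets: A = [0, 75000), B = [25000, 100000), so |A intersect B| = 50,000
val a = spark.range(0, 75000).agg(hll_init_agg('id).as("hll_a"))
val b = spark.range(25000, 100000).agg(hll_init_agg('id).as("hll_b"))

a.crossJoin(b).select(
  hll_cardinality('hll_a).as("card_a"),                            // ~75,000
  hll_cardinality('hll_b).as("card_b"),                            // ~75,000
  hll_cardinality(hll_row_merge('hll_a, 'hll_b)).as("card_union"), // ~100,000
  hll_intersect_cardinality('hll_a, 'hll_b).as("card_intersect")   // ~50,000 = 75,000 + 75,000 - 100,000
).show(false)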

Using HLL functions

Default Implementation

You can change the default HLL implementation via a Spark session setting (the key is defined by the IMPLEMENTATION_CONFIG_KEY constant in com.swoop.alchemy.spark.expressions.hll):

spark.conf.set("com.swoop.alchemy.hll.implementation", "AGGREGATE_KNOWLEDGE")

If the config value is unset, the default implementation is StreamLib.
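
For example, using only the standard Spark configuration API and the key shown above:

// Switch the session-wide default to the Postgres-compatible implementation
spark.conf.set("com.swoop.alchemy.hll.implementation", "AGGREGATE_KNOWLEDGE")

// Read the current setting back; None means the StreamLib default will be used
spark.conf.getOption("com.swoop.alchemy.hll.implementation")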

From SparkSQL

For maximum performance and flexibility, spark-alchemy implements HLL functionality as native Spark functions. Similar to user-defined functions, native functions require registration using the following Scala command.

// Register spark-alchemy HLL functions for use from SparkSQL
com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark)

NOTE: OSS Spark has an extension mechanism for automatically registering native functions (SPARK-25560). Unfortunately, this and other Spark extensions, such as custom optimization rules, don't work by default on Databricks, where we do much of our Spark processing, which is why we have not prioritized implementing this capability. PRs are welcome. (To learn more about Spark extensions, see this presentation.)
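
Once registration has run, the functions are callable by name from plain SQL. A quick smoke test (illustrative only) might look like this:

com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark)

// range(100000) is SparkSQL's built-in table-valued function; acntd should be close to 100,000
spark.sql("select hll_cardinality(hll_init_agg(id)) as acntd from range(100000)").show(false)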

From Scala

import com.swoop.alchemy.spark.expressions.hll.functions._

Example set up

The following examples assume we are working with 100,000 distinct IDs from 0..99,999 in a dataframe with a single column id. SparkSQL examples assume this data is available as a table/view called ids. You can create it using

spark.range(100000).createOrReplaceTempView("ids")

Basic approximate counting

Let's compute the distinct count of IDs using exact counting, Spark's built-in approximate counting function and spark-alchemy's functions. We'll look at the output at different precisions.

You will note that for basic approximate counts, spark-alchemy requires composing two functions (hll_cardinality(hll_init_agg(...))) where Spark needs only one (approx_count_distinct(...)). The reason will become apparent in the next section.

SparkSQL

select
    -- exact distinct count
    count(distinct id) as cntd,
    -- Spark's HLL implementation with default 5% precision
    approx_count_distinct(id) as anctd_spark_default,
    -- approximate distinct count with default 5% precision
    hll_cardinality(hll_init_agg(id)) as acntd_default,
    -- approximate distinct counts with custom precision
    map(
        0.005, hll_cardinality(hll_init_agg(id, 0.005)),
        0.020, hll_cardinality(hll_init_agg(id, 0.020)),
        0.050, hll_cardinality(hll_init_agg(id, 0.050)),
        0.100, hll_cardinality(hll_init_agg(id, 0.100))
    ) as acntd
from ids

Scala

import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._

spark.range(100000).select(
  // exact distinct count
  countDistinct('id).as("cntd"),
  // Spark's HLL implementation with default 5% precision
  approx_count_distinct('id).as("anctd_spark_default"),
  // approximate distinct count with default 5% precision
  hll_cardinality(hll_init_agg('id)).as("acntd_default"),
  // approximate distinct counts with custom precision
  map(
    Seq(0.005, 0.02, 0.05, 0.1).flatMap { error =>
      lit(error) :: hll_cardinality(hll_init_agg('id, error)) :: Nil
    }: _*
  ).as("acntd")
).show(false)

Output

+------+-------------------+-------------+-------------------------------------------------------------+
|cntd  |anctd_spark_default|acntd_default|acntd                                                        |
+------+-------------------+-------------+-------------------------------------------------------------+
|100000|95546              |98566        |[0.005 -> 99593, 0.02 -> 98859, 0.05 -> 98566, 0.1 -> 106476]|
+------+-------------------+-------------+-------------------------------------------------------------+

High-performance distinct counts for analytics

The following example shows how to use the pre-aggregate -> re-aggregate -> finalize pattern for high-performance distinct counting. We will calculate approximate distinct counts for odd vs. even (modulo 2) IDs in three steps:

  1. Pre-aggregate the data modulo 10 using hll_init_agg().
  2. Re-aggregate modulo 2 using hll_merge().
  3. Produce a final result using hll_cardinality().

In a production environment, the pre-aggregates from step (1) and, in the case of very large data, re-aggregations at various granularities would be computed and persisted so that final reports can be created without reprocessing the raw data that feeds step (1).

SparkSQL

with
-- pre-aggregates contain a binary HLL sketch column
pre_aggregate as (
  select
    id % 10 as id_mod10,
    hll_init_agg(id) as hll_id
  from ids
  group by id % 10
),
-- HLL sketch columns can be re-aggregated using hll_merge() just like counts can be re-aggregated with sum().
-- Note that you cannot combine distinct counts with sum(); this is where HLL sketches shine.
aggregate as (
  select
    id_mod10 % 2 as id_mod2,
    hll_merge(hll_id) as hll_id
  from pre_aggregate
  group by id_mod10 % 2
)
-- When a final result has to be produced, use hll_cardinality() on the HLL sketches
select
  id_mod2,
  hll_cardinality(hll_id) as acntd
from aggregate
order by id_mod2

Scala

spark.range(100000)
  // pre-aggregate
  .groupBy(('id % 10).as("id_mod10"))
  .agg(hll_init_agg('id).as("hll_id"))
  // reaggregate
  .groupBy(('id_mod10 % 2).as("id_mod2"))
  .agg(hll_merge('hll_id).as("hll_id"))
  // final report
  .select('id_mod2, hll_cardinality('hll_id).as("acntd"))
  .orderBy('id_mod2)
  .show(false)

Output

+-------+-----+
|id_mod2|acntd|
+-------+-----+
|0      |47305|
|1      |53156|
+-------+-----+
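
The persistence step mentioned above is not shown in the example. A minimal sketch of it could look like the following (the table name ids_preagg_mod10 is just a placeholder):

import com.swoop.alchemy.spark.expressions.hll.functions._

// Persist the pre-aggregate once (10 rows, each with a binary HLL sketch)
spark.range(100000)
  .groupBy(('id % 10).as("id_mod10"))
  .agg(hll_init_agg('id).as("hll_id"))
  .write.mode("overwrite").saveAsTable("ids_preagg_mod10")

// Later, reports re-aggregate the 10 persisted sketch rows instead of the 100,000 raw rows
spark.table("ids_preagg_mod10")
  .groupBy(('id_mod10 % 2).as("id_mod2"))
  .agg(hll_cardinality(hll_merge('hll_id)).as("acntd"))
  .orderBy('id_mod2)
  .show(false)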

Approximate counting of collection elements

Let's change the previous example such that, instead of grouping the IDs modulo 10 into HLL sketches, we collect them in arrays.

SparkSQL

with
-- Rather than grouping the modulo 10 IDs into binary sketches, collect them in arrays
grouped as (
  select
    id % 10 as id_mod10,
    collect_list(id) as ids
  from ids
  group by id % 10
),
-- For aggregation, use hll_init_collection_agg() to create HLL sketches
aggregate as (
  select
    id_mod10 % 2 as id_mod2,
    hll_init_collection_agg(ids) as hll_id
  from grouped
  group by id_mod10 % 2
)
-- When a final result has to be produced, use hll_cardinality() as before
select
  id_mod2,
  hll_cardinality(hll_id) as acntd
from aggregate
order by id_mod2

Scala

import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._

spark.range(100000)
  // group into arrays
  .groupBy(('id % 10).as("id_mod10"))
  .agg(collect_list('id).as("ids"))
  // aggregate
  .groupBy(('id_mod10 % 2).as("id_mod2"))
  .agg(hll_init_collection_agg('ids).as("hll_id"))
  // final report
  .select('id_mod2, hll_cardinality('hll_id).as("acntd"))
  .orderBy('id_mod2)
  .show(false)

Output

+-------+-----+
|id_mod2|acntd|
+-------+-----+
|0      |47305|
|1      |53156|
+-------+-----+
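
For completeness, the non-aggregating hll_init_collection() builds one sketch per row rather than one per group. A minimal sketch of the difference (again assuming the Scala functions mirror the SQL signatures listed earlier):

import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._

spark.range(5)
  .select(array('id, 'id + 1, 'id).as("ids"))  // each array holds 3 elements, one duplicated
  .select(hll_cardinality(hll_init_collection('ids)).as("acntd_per_row"))
  .show(false)  // each row reports ~2 distinct elements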

Binding HLL sketch precision

Imagine having to build HLL sketches for various columns using the same precision, and having to do this over and over in many different notebook cells. Wouldn't it be nice not to have to keep typing the precision every time you use hll_init_* in your transformations? Some of us at Swoop thought so, and we added BoundHLL for exactly this purpose. In the following, slightly longer, example we compute approximate distinct counts for odd vs. even IDs using different precisions.

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._
import com.swoop.alchemy.spark.expressions.hll.BoundHLL

def preAggregateIds(error: Double)(ds: Dataset[_]) = {
  val hll = BoundHLL(error)
  import hll._ // imports hll_init_* versions bound to error
  ds.toDF("id")
    .groupBy(('id % 2).as("id_mod"))
    .agg(hll_init_agg('id).as("hll_id"))
    .withColumn("error", lit(error))
}

val ids = spark.range(100000)

Seq(0.005, 0.01, 0.02, 0.05, 0.1)
  .map(error => ids.transform(preAggregateIds(error)))
  .reduce(_ union _)
  .groupBy('error).pivot('id_mod)
  .agg(hll_cardinality(hll_merge('hll_id)).as("acntd"))
  .orderBy('error)
  .show(false)
+-----+-----+-----+
|error|0    |1    |
+-----+-----+-----+
|0.005|49739|49908|
|0.01 |49740|49662|
|0.02 |51024|49712|
|0.05 |47305|53156|
|0.1  |52324|56113|
+-----+-----+-----+

You can use the import hll._ pattern in a notebook cell to bind all hll_init_*() functions in all notebook cells to the precision provided to BoundHLL, but keep two things in mind: another import, e.g., com.swoop.alchemy.spark.expressions.hll.functions._, can override the bound import, and if you attach the notebook to a new Spark/REPL session, you have to re-run the import. For these reasons, we typically recommend that notebook-level precision binding happens in a single cell, e.g.,

import com.swoop.alchemy.spark.expressions.hll.functions._
import com.swoop.alchemy.spark.expressions.hll.BoundHLL

val hll = BoundHLL(0.02) // Spark default is 0.05
import hll._

When you bind the precision of hll_init_*(), attempting to use a version with an explicit precision will generate a compiler error. This is by design. To keep things consistent, we also bind the precision of Spark's own approx_count_distinct().

BoundHLL.apply() takes an optional implicit Implementation, which allows you to specify the HLL implementation used.
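
For example, the following illustrative snippet relies on the bound approx_count_distinct() brought in by import hll._, as described above:

import com.swoop.alchemy.spark.expressions.hll.BoundHLL

val hll = BoundHLL(0.02)
import hll._

// approx_count_distinct uses the bound 2% precision rather than Spark's 0.05 default
spark.range(100000)
  .select(approx_count_distinct('id).as("acntd_spark_bound"))
  .show(false)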

The space cost of precision

HLL sketch size depends on the desired precision and is independent of data size. A simple rule of thumb is that a 2x increase in HLL cardinality estimation precision requires a 4x increase in the size of HLL sketches: the relative error is roughly proportional to 1/sqrt(m), where m is the number of HLL registers, so halving the error requires roughly quadrupling m and hence the sketch size.

spark.range(100000)
  .select(map(
    Seq(0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1).flatMap { error =>
      lit(error) :: length(hll_init_agg('id, error)) :: Nil
    }: _*
  ).as("lengths"))
  .select(explode('lengths).as("error" :: "sketch_size_in_bytes" :: Nil))
  .show(false)
+-----+--------------------+
|error|sketch_size_in_bytes|
+-----+--------------------+
|0.005|43702               |
|0.01 |10933               |
|0.02 |2741                |
|0.03 |1377                |
|0.04 |693                 |
|0.05 |353                 |
|0.06 |353                 |
|0.07 |181                 |
|0.08 |181                 |
|0.09 |181                 |
|0.1  |96                  |
+-----+--------------------+

Python Interoperability

The script below lays out a simple way to register and use spark-alchemy HLL functions through PySpark. Note that this is just an example, and can be expanded upon greatly to mirror HLLFunctions.

from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark import SparkContext
from pyspark.sql.functions import expr, col, array


def register_hll_functions(spark: SparkSession):
    """
    Registers the spark-alchemy HLL functions with the given Spark session so they can be used via expr()
    """
    spark._jvm.com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark._jsparkSession)


def hll_init(c: Column) -> Column:
    """
    Creates a composable HLL sketch for each row
    """
    return expr('hll_init({})'.format(unsafe_sql(c)))


def hll_init_collection(c: Column) -> Column:
    """
    Creates a composable HLL sketch for each input row. Column must be a collection type (array, map), and collection
    elements are treated as individual values.
    """
    return expr('hll_init_collection({})'.format(unsafe_sql(c)))


def hll_init_agg(c: Column) -> Column:
    """
    Combines all input into a single HLL sketch
    """
    return expr('hll_init_agg({})'.format(unsafe_sql(c)))


def hll_init_collection_agg(c: Column) -> Column:
    """
    Combines all input into a single HLL sketch. Column must be a collection type (array, map), and collection
    elements are treated as individual values.
    """
    return expr('hll_init_collection_agg({})'.format(unsafe_sql(c)))


def hll_merge(c: Column) -> Column:
    """
    Aggregates all sketches into a single merged sketch that represents the union of the constituents.
    """
    return expr('hll_merge({})'.format(unsafe_sql(c)))


def hll_row_merge(*c: Column) -> Column:
    """
    Merges multiple "sketches" in one row into a single field.
    """
    return expr('hll_row_merge({})'.format(', '.join(list(map(unsafe_sql, c)))))


def hll_cardinality(c: Column) -> Column:
    """
    Returns the estimated cardinality of an HLL "sketch"
    """
    return expr('hll_cardinality({})'.format(unsafe_sql(c)))


def hll_intersect_cardinality(left: Column, right: Column) -> Column:
    """
    Returns the estimated intersection cardinality of the binary representations produced by
    HyperLogLog. Computes a merged (unioned) sketch and uses the fact that |A intersect B| = (|A| + |B|) - |A union B|.
    Returns null if both sketches are null, but 0 if only one is
    """
    return expr('hll_intersect_cardinality({})'.format(', '.join(list(map(unsafe_sql, [left, right])))))


def unsafe_sql(c: Column) -> str:
    """
    Renders a Column as a SQL expression string by reaching into the underlying JVM Column (PySpark internals)
    """
    return c._jc.expr().sql()


if __name__ == '__main__':
    spark = SparkSession.builder.appName('foo').getOrCreate()
    sc = spark.sparkContext

    # Register the spark-alchemy HLL functions so expr() can resolve them
    register_hll_functions(spark)

    ids = spark \
        .range(100000) \
        .toDF('primary_id') \
        .withColumn('secondary_ids', array(col('primary_id') + 1, col('primary_id') + 2, col('primary_id') + 3))

    # primary_id cardinality
    ids.select(hll_cardinality(hll_init_agg(col('primary_id'))).alias('primary_ids_count')).show()

    # Alternatively, you can do
    ids.select(hll_cardinality(hll_merge(hll_init(col('primary_id')))).alias('primary_ids_count')).show()

    # secondary_ids cardinality per row
    ids.select(
        col('*'),
        hll_cardinality(hll_init_collection(col('secondary_ids'))).alias('secondary_ids_count_per_row'),
    ).show()

    # secondary_ids cardinality across all rows
    ids.select(
        hll_cardinality(hll_init_collection_agg(col('secondary_ids'))).alias('secondary_ids_count'),
    ).show()

    # Alternatively, you can do
    ids.select(
        hll_cardinality(hll_merge(hll_init_collection(col('secondary_ids')))).alias('secondary_ids_count'),
    ).show()

    # For each row, compute |primary_id union secondary_ids|
    ids.select(
        col('*'),
        hll_cardinality(
            hll_row_merge(hll_init(col('primary_id')), hll_init_collection(col('secondary_ids')))
        ).alias('primary_and_secondary_id_count')
    ).show()

    # For each row, compute |primary_id intersect secondary_ids|
    ids.select(
        col('*'),
        hll_intersect_cardinality(
            hll_init(col('primary_id')),
            hll_init_collection(col('secondary_ids'))
        ).alias('primary_and_secondary_intersect_count')
    ).show()

    sc.stop()

Run the script using spark-submit with the comma-separated URLs of the requisite JARs:

spark-submit --jars <url_prefix>/spark-alchemy-x.y.z.jar,<url_prefix>/hll-x.y.z.jar num_ids.py

Alternatively, you can assemble a fat JAR of spark-alchemy (see the README for instructions on how to do this):

spark-submit --jars <url_prefix>/spark-alchemy-assembly-x.y.z.jar num_ids.py

PostgreSQL Interoperability

If the Aggregate Knowledge implementation is used, the binary sketches can be prepared in Spark and then loaded into Postgres and re-aggregated/counted there.

An example of loading sketches using the JDBC connector can be found in the test suite. See the Aggregate Knowledge documentation for information about serializing the binary data out as hex to load in Postgres via other methods.
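
For orientation, a minimal, hypothetical sketch of writing AGKN sketches out through Spark's standard JDBC data source might look like the snippet below. The connection details and table name are placeholders, and loading the resulting bytea values into Postgres's hll type may require additional casting, as described in the Aggregate Knowledge documentation and the test suite example mentioned above.

import com.swoop.alchemy.spark.expressions.hll.functions._

// Use the Postgres-compatible implementation for this session (see "Default Implementation")
spark.conf.set("com.swoop.alchemy.hll.implementation", "AGGREGATE_KNOWLEDGE")

// Pre-aggregate and write the binary sketches; all JDBC options below are placeholders
spark.range(100000)
  .groupBy(('id % 10).as("id_mod10"))
  .agg(hll_init_agg('id).as("hll_id"))
  .write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "id_sketches")
  .option("user", "postgres")
  .option("password", "changeme")
  .mode("append")
  .save()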

Note: it is not currently possible to add values from within Postgres to sketches prepared in Spark, as this would require exactly replicating the hash functions that were used when building the sketches.