Skip to content

Commit

Permalink
[BEAM-13016] Remove avro-python3 dependency from Beam (#15900)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnandInguva authored Nov 11, 2021
1 parent 8fb22a8 commit e9ebaa4
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 581 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
* Upgraded the GCP Libraries BOM version to 24.0.0 and associated dependencies ([BEAM-11205](
https://issues.apache.org/jira/browse/BEAM-11205)). For Google Cloud client library versions set by this BOM,
see [this table](https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/24.0.0/artifact_details.html).
* Removed avro-python3 dependency in AvroIO. Fastavro has already been our Avro library of choice on Python 3. Boolean use_fastavro is left for api compatibility, but will have no effect.([BEAM-13016](https://github.com/apache/beam/pull/15900)).

## Breaking Changes

Expand Down
36 changes: 19 additions & 17 deletions sdks/python/apache_beam/examples/avro_bitcoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import argparse
import logging

from avro.schema import Parse
from fastavro.schema import parse_schema

import apache_beam as beam
from apache_beam.io.avroio import ReadFromAvro
Expand Down Expand Up @@ -84,23 +84,26 @@ def process(self, elem):
}]


SCHEMA = Parse(
'''
{
SCHEMA = parse_schema({
"namespace": "example.avro",
"type": "record",
"name": "Transaction",
"fields": [
{"name": "transaction_id", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "block_id", "type": "string"},
{"name": "previous_block", "type": "string"},
{"name": "num_inputs", "type": "int"},
{"name": "num_outputs", "type": "int"},
{"name": "sum_output", "type": "long"}
]
}
''')
"fields": [{
"name": "transaction_id", "type": "string"
}, {
"name": "timestamp", "type": "long"
}, {
"name": "block_id", "type": "string"
}, {
"name": "previous_block", "type": "string"
}, {
"name": "num_inputs", "type": "int"
}, {
"name": "num_outputs", "type": "int"
}, {
"name": "sum_output", "type": "long"
}]
})


def run(argv=None):
Expand Down Expand Up @@ -140,7 +143,7 @@ def run(argv=None):

# Read the avro file[pattern] into a PCollection.
records = \
p | 'read' >> ReadFromAvro(opts.input, use_fastavro=opts.use_fastavro)
p | 'read' >> ReadFromAvro(opts.input)

measured = records | 'scan' >> beam.ParDo(BitcoinTxnCountDoFn())

Expand All @@ -150,7 +153,6 @@ def run(argv=None):
opts.output,
schema=SCHEMA,
codec=('deflate' if opts.compress else 'null'),
use_fastavro=opts.use_fastavro
)

result = p.run()
Expand Down
97 changes: 45 additions & 52 deletions sdks/python/apache_beam/examples/fastavro_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

"""End-to-end test for Avro IO's fastavro support.
Writes a configurable number of records to a temporary location with each of
{avro,fastavro}, then reads them back in, joins the two read datasets, and
verifies they have the same elements.
Writes a configurable number of records to a temporary location with fastavro,
then reads them back in from source, joins the generated records and records
that are read from the source, and verifies they have the same elements.
Usage:
Expand Down Expand Up @@ -51,7 +53,6 @@
import uuid

import pytest
from avro.schema import Parse
from fastavro import parse_schema

from apache_beam.io.avroio import ReadAllFromAvro
Expand All @@ -78,6 +79,18 @@ def record(i):
}


def assertEqual(l, r):
if l != r:
raise BeamAssertException('Assertion failed: %s == %s' % (l, r))


def check(element):
assert element['color'] in COLORS
assert element['label'] in LABELS
assertEqual(
sorted(element.keys()), ['color', 'label', 'number', 'number_str'])


class FastavroIT(unittest.TestCase):

SCHEMA_STRING = '''
Expand All @@ -102,6 +115,7 @@ def setUp(self):
def test_avro_it(self):
num_records = self.test_pipeline.get_option('records')
num_records = int(num_records) if num_records else 1000000
fastavro_output = '/'.join([self.output, 'fastavro'])

# Seed a `PCollection` with indices that will each be FlatMap'd into
# `batch_size` records, to avoid having a too-large list in memory at
Expand All @@ -123,65 +137,44 @@ def batch_indices(start):
| 'expand-batches' >> FlatMap(batch_indices) \
| 'create-records' >> Map(record)

fastavro_output = '/'.join([self.output, 'fastavro'])
avro_output = '/'.join([self.output, 'avro'])

# pylint: disable=expression-not-assigned
records_pcoll \
| 'write_fastavro' >> WriteToAvro(
fastavro_output,
parse_schema(json.loads(self.SCHEMA_STRING)),
use_fastavro=True
)
result = self.test_pipeline.run()
result.wait_until_finish()
fastavro_pcoll = self.test_pipeline \
| 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
| 'read-fastavro' >> ReadAllFromAvro()

# pylint: disable=expression-not-assigned
records_pcoll \
| 'write_avro' >> WriteToAvro(
avro_output,
Parse(self.SCHEMA_STRING),
use_fastavro=False
)
mapped_fastavro_pcoll = fastavro_pcoll | "map_fastavro" >> Map(
lambda x: (x['number'], x))
mapped_record_pcoll = records_pcoll | "map_record" >> Map(
lambda x: (x['number'], x))

def validate_record(elem):
v = elem[1]

def assertEqual(l, r):
if l != r:
raise BeamAssertException('Assertion failed: %s == %s' % (l, r))

assertEqual(sorted(v.keys()), ['fastavro', 'record_pcoll'])
record_pcoll_values = v['record_pcoll']
fastavro_values = v['fastavro']
assertEqual(record_pcoll_values, fastavro_values)
assertEqual(len(record_pcoll_values), 1)

{
"record_pcoll": mapped_record_pcoll, "fastavro": mapped_fastavro_pcoll
} | CoGroupByKey() | Map(validate_record)

result = self.test_pipeline.run()
result.wait_until_finish()
assert result.state == PipelineState.DONE

with TestPipeline(is_integration_test=True) as fastavro_read_pipeline:

fastavro_records = \
fastavro_read_pipeline \
| 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
| 'read-fastavro' >> ReadAllFromAvro(use_fastavro=True) \
| Map(lambda rec: (rec['number'], rec))

avro_records = \
fastavro_read_pipeline \
| 'create-avro' >> Create(['%s*' % avro_output]) \
| 'read-avro' >> ReadAllFromAvro(use_fastavro=False) \
| Map(lambda rec: (rec['number'], rec))

def check(elem):
v = elem[1]

def assertEqual(l, r):
if l != r:
raise BeamAssertException('Assertion failed: %s == %s' % (l, r))

assertEqual(sorted(v.keys()), ['avro', 'fastavro'])
avro_values = v['avro']
fastavro_values = v['fastavro']
assertEqual(avro_values, fastavro_values)
assertEqual(len(avro_values), 1)

# pylint: disable=expression-not-assigned
{
'avro': avro_records,
'fastavro': fastavro_records
} \
| CoGroupByKey() \
| Map(check)

self.addCleanup(delete_files, [self.output])
self.addCleanup(delete_files, [self.output])
assert result.state == PipelineState.DONE


Expand Down
Loading

0 comments on commit e9ebaa4

Please sign in to comment.