Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13016] Remove avro-python3 dependency from Beam #15900

Merged
merged 12 commits into from
Nov 11, 2021
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* Upgraded the GCP Libraries BOM version to 24.0.0 and associated dependencies ([BEAM-11205](
https://issues.apache.org/jira/browse/BEAM-11205)). For Google Cloud client library versions set by this BOM,
see [this table](https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/24.0.0/artifact_details.html).
* Removed avro-python3 support and made fastavro the default. The boolean `use_fastavro` option no longer has any effect ([BEAM-13016](https://github.com/apache/beam/pull/15900)).
AnandInguva marked this conversation as resolved.
Show resolved Hide resolved

## Breaking Changes

Expand Down
36 changes: 19 additions & 17 deletions sdks/python/apache_beam/examples/avro_bitcoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import argparse
import logging

from avro.schema import Parse
from fastavro.schema import parse_schema

import apache_beam as beam
from apache_beam.io.avroio import ReadFromAvro
Expand Down Expand Up @@ -84,23 +84,26 @@ def process(self, elem):
}]


SCHEMA = Parse(
'''
{
SCHEMA = parse_schema({
"namespace": "example.avro",
"type": "record",
"name": "Transaction",
"fields": [
{"name": "transaction_id", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "block_id", "type": "string"},
{"name": "previous_block", "type": "string"},
{"name": "num_inputs", "type": "int"},
{"name": "num_outputs", "type": "int"},
{"name": "sum_output", "type": "long"}
]
}
''')
"fields": [{
"name": "transaction_id", "type": "string"
}, {
"name": "timestamp", "type": "long"
}, {
"name": "block_id", "type": "string"
}, {
"name": "previous_block", "type": "string"
}, {
"name": "num_inputs", "type": "int"
}, {
"name": "num_outputs", "type": "int"
}, {
"name": "sum_output", "type": "long"
}]
})


def run(argv=None):
Expand Down Expand Up @@ -140,7 +143,7 @@ def run(argv=None):

# Read the avro file[pattern] into a PCollection.
records = \
p | 'read' >> ReadFromAvro(opts.input, use_fastavro=opts.use_fastavro)
p | 'read' >> ReadFromAvro(opts.input)

measured = records | 'scan' >> beam.ParDo(BitcoinTxnCountDoFn())

Expand All @@ -150,7 +153,6 @@ def run(argv=None):
opts.output,
schema=SCHEMA,
codec=('deflate' if opts.compress else 'null'),
use_fastavro=opts.use_fastavro
)

result = p.run()
Expand Down
27 changes: 3 additions & 24 deletions sdks/python/apache_beam/examples/fastavro_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import uuid

import pytest
from avro.schema import Parse
from fastavro import parse_schema

from apache_beam.io.avroio import ReadAllFromAvro
Expand Down Expand Up @@ -124,22 +123,11 @@ def batch_indices(start):
| 'create-records' >> Map(record)

fastavro_output = '/'.join([self.output, 'fastavro'])
avro_output = '/'.join([self.output, 'avro'])

# pylint: disable=expression-not-assigned
records_pcoll \
| 'write_fastavro' >> WriteToAvro(
fastavro_output,
parse_schema(json.loads(self.SCHEMA_STRING)),
use_fastavro=True
)

# pylint: disable=expression-not-assigned
records_pcoll \
| 'write_avro' >> WriteToAvro(
avro_output,
Parse(self.SCHEMA_STRING),
use_fastavro=False
)

result = self.test_pipeline.run()
Expand All @@ -151,13 +139,7 @@ def batch_indices(start):
fastavro_records = \
fastavro_read_pipeline \
| 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
| 'read-fastavro' >> ReadAllFromAvro(use_fastavro=True) \
| Map(lambda rec: (rec['number'], rec))

avro_records = \
fastavro_read_pipeline \
| 'create-avro' >> Create(['%s*' % avro_output]) \
| 'read-avro' >> ReadAllFromAvro(use_fastavro=False) \
| 'read-fastavro' >> ReadAllFromAvro() \
AnandInguva marked this conversation as resolved.
Show resolved Hide resolved
| Map(lambda rec: (rec['number'], rec))

def check(elem):
Expand All @@ -167,15 +149,12 @@ def assertEqual(l, r):
if l != r:
raise BeamAssertException('Assertion failed: %s == %s' % (l, r))

assertEqual(sorted(v.keys()), ['avro', 'fastavro'])
avro_values = v['avro']
assertEqual(sorted(v.keys()), ['fastavro'])
fastavro_values = v['fastavro']
assertEqual(avro_values, fastavro_values)
assertEqual(len(avro_values), 1)
assertEqual(len(fastavro_values), 1)

# pylint: disable=expression-not-assigned
{
'avro': avro_records,
'fastavro': fastavro_records
} \
| CoGroupByKey() \
Expand Down
Loading