diff --git a/CHANGES.md b/CHANGES.md index a25c1e82194c..bd425d74b6a9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -75,6 +75,7 @@ * Upgraded the GCP Libraries BOM version to 24.0.0 and associated dependencies ([BEAM-11205]( https://issues.apache.org/jira/browse/BEAM-11205)). For Google Cloud client library versions set by this BOM, see [this table](https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/24.0.0/artifact_details.html). +* Removed avro-python3 dependency in AvroIO. Fastavro has already been our Avro library of choice on Python 3. Boolean use_fastavro is left for API compatibility, but will have no effect ([BEAM-13016](https://github.com/apache/beam/pull/15900)). ## Breaking Changes diff --git a/sdks/python/apache_beam/examples/avro_bitcoin.py b/sdks/python/apache_beam/examples/avro_bitcoin.py index 9b851a8c6e38..0b4c555548eb 100644 --- a/sdks/python/apache_beam/examples/avro_bitcoin.py +++ b/sdks/python/apache_beam/examples/avro_bitcoin.py @@ -29,7 +29,7 @@ import argparse import logging -from avro.schema import Parse +from fastavro.schema import parse_schema import apache_beam as beam from apache_beam.io.avroio import ReadFromAvro @@ -84,23 +84,26 @@ def process(self, elem): }] -SCHEMA = Parse( - ''' - { +SCHEMA = parse_schema({ "namespace": "example.avro", "type": "record", "name": "Transaction", - "fields": [ - {"name": "transaction_id", "type": "string"}, - {"name": "timestamp", "type": "long"}, - {"name": "block_id", "type": "string"}, - {"name": "previous_block", "type": "string"}, - {"name": "num_inputs", "type": "int"}, - {"name": "num_outputs", "type": "int"}, - {"name": "sum_output", "type": "long"} - ] - } - ''') + "fields": [{ + "name": "transaction_id", "type": "string" + }, { + "name": "timestamp", "type": "long" + }, { + "name": "block_id", "type": "string" + }, { + "name": "previous_block", "type": "string" + }, { + "name": "num_inputs", "type": "int" + }, { + "name": "num_outputs", "type": "int" + 
}, { + "name": "sum_output", "type": "long" + }] +}) def run(argv=None): @@ -140,7 +143,7 @@ def run(argv=None): # Read the avro file[pattern] into a PCollection. records = \ - p | 'read' >> ReadFromAvro(opts.input, use_fastavro=opts.use_fastavro) + p | 'read' >> ReadFromAvro(opts.input) measured = records | 'scan' >> beam.ParDo(BitcoinTxnCountDoFn()) @@ -150,7 +153,6 @@ def run(argv=None): opts.output, schema=SCHEMA, codec=('deflate' if opts.compress else 'null'), - use_fastavro=opts.use_fastavro ) result = p.run() diff --git a/sdks/python/apache_beam/examples/fastavro_it_test.py b/sdks/python/apache_beam/examples/fastavro_it_test.py index f25db8ee3faf..f37656e897bd 100644 --- a/sdks/python/apache_beam/examples/fastavro_it_test.py +++ b/sdks/python/apache_beam/examples/fastavro_it_test.py @@ -17,9 +17,11 @@ """End-to-end test for Avro IO's fastavro support. -Writes a configurable number of records to a temporary location with each of -{avro,fastavro}, then reads them back in, joins the two read datasets, and -verifies they have the same elements. +Writes a configurable number of records to a temporary location with fastavro, +then reads them back in from source, joins the generated records and records +that are read from the source, and verifies they have the same elements. 
+ + Usage: @@ -51,7 +53,6 @@ import uuid import pytest -from avro.schema import Parse from fastavro import parse_schema from apache_beam.io.avroio import ReadAllFromAvro @@ -78,6 +79,18 @@ def record(i): } +def assertEqual(l, r): + if l != r: + raise BeamAssertException('Assertion failed: %s == %s' % (l, r)) + + +def check(element): + assert element['color'] in COLORS + assert element['label'] in LABELS + assertEqual( + sorted(element.keys()), ['color', 'label', 'number', 'number_str']) + + class FastavroIT(unittest.TestCase): SCHEMA_STRING = ''' @@ -102,6 +115,7 @@ def setUp(self): def test_avro_it(self): num_records = self.test_pipeline.get_option('records') num_records = int(num_records) if num_records else 1000000 + fastavro_output = '/'.join([self.output, 'fastavro']) # Seed a `PCollection` with indices that will each be FlatMap'd into # `batch_size` records, to avoid having a too-large list in memory at @@ -123,65 +137,44 @@ def batch_indices(start): | 'expand-batches' >> FlatMap(batch_indices) \ | 'create-records' >> Map(record) - fastavro_output = '/'.join([self.output, 'fastavro']) - avro_output = '/'.join([self.output, 'avro']) - # pylint: disable=expression-not-assigned records_pcoll \ | 'write_fastavro' >> WriteToAvro( fastavro_output, parse_schema(json.loads(self.SCHEMA_STRING)), - use_fastavro=True ) + result = self.test_pipeline.run() + result.wait_until_finish() + fastavro_pcoll = self.test_pipeline \ + | 'create-fastavro' >> Create(['%s*' % fastavro_output]) \ + | 'read-fastavro' >> ReadAllFromAvro() - # pylint: disable=expression-not-assigned - records_pcoll \ - | 'write_avro' >> WriteToAvro( - avro_output, - Parse(self.SCHEMA_STRING), - use_fastavro=False - ) + mapped_fastavro_pcoll = fastavro_pcoll | "map_fastavro" >> Map( + lambda x: (x['number'], x)) + mapped_record_pcoll = records_pcoll | "map_record" >> Map( + lambda x: (x['number'], x)) + + def validate_record(elem): + v = elem[1] + + def assertEqual(l, r): + if l != r: + raise 
BeamAssertException('Assertion failed: %s == %s' % (l, r)) + + assertEqual(sorted(v.keys()), ['fastavro', 'record_pcoll']) + record_pcoll_values = v['record_pcoll'] + fastavro_values = v['fastavro'] + assertEqual(record_pcoll_values, fastavro_values) + assertEqual(len(record_pcoll_values), 1) + + { + "record_pcoll": mapped_record_pcoll, "fastavro": mapped_fastavro_pcoll + } | CoGroupByKey() | Map(validate_record) result = self.test_pipeline.run() result.wait_until_finish() - assert result.state == PipelineState.DONE - with TestPipeline(is_integration_test=True) as fastavro_read_pipeline: - - fastavro_records = \ - fastavro_read_pipeline \ - | 'create-fastavro' >> Create(['%s*' % fastavro_output]) \ - | 'read-fastavro' >> ReadAllFromAvro(use_fastavro=True) \ - | Map(lambda rec: (rec['number'], rec)) - - avro_records = \ - fastavro_read_pipeline \ - | 'create-avro' >> Create(['%s*' % avro_output]) \ - | 'read-avro' >> ReadAllFromAvro(use_fastavro=False) \ - | Map(lambda rec: (rec['number'], rec)) - - def check(elem): - v = elem[1] - - def assertEqual(l, r): - if l != r: - raise BeamAssertException('Assertion failed: %s == %s' % (l, r)) - - assertEqual(sorted(v.keys()), ['avro', 'fastavro']) - avro_values = v['avro'] - fastavro_values = v['fastavro'] - assertEqual(avro_values, fastavro_values) - assertEqual(len(avro_values), 1) - - # pylint: disable=expression-not-assigned - { - 'avro': avro_records, - 'fastavro': fastavro_records - } \ - | CoGroupByKey() \ - | Map(check) - - self.addCleanup(delete_files, [self.output]) + self.addCleanup(delete_files, [self.output]) assert result.state == PipelineState.DONE diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index e861d533ad67..613917ea73be 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -43,16 +43,9 @@ Avro file. 
""" # pytype: skip-file - -import io import os -import zlib from functools import partial -import avro -from avro import io as avroio -from avro import datafile -from avro.schema import Parse from fastavro.read import block_reader from fastavro.write import Writer @@ -140,15 +133,12 @@ def __init__( splitting the input into bundles. validate (bool): flag to verify that the files exist during the pipeline creation time. - use_fastavro (bool); when set, use the `fastavro` library for IO, which - is significantly faster, and is now the default. + use_fastavro (bool): This flag is left for API backwards compatibility + and no longer has an effect. Do not use. """ super().__init__() self._source = _create_avro_source( - file_pattern, - min_bundle_size, - validate=validate, - use_fastavro=use_fastavro) + file_pattern, min_bundle_size, validate=validate) def expand(self, pvalue): return pvalue.pipeline | Read(self._source) @@ -180,16 +170,14 @@ def __init__( splitting the input into bundles. desired_bundle_size: the desired size in bytes, to be considered when splitting the input into bundles. - use_fastavro (bool); when set, use the `fastavro` library for IO, which - is significantly faster, and is now the default. + use_fastavro (bool): This flag is left for API backwards compatibility + and no longer has an effect. Do not use. with_filename: If True, returns a Key Value with the key being the file name and the value being the actual data. If False, it only returns the data. """ source_from_file = partial( - _create_avro_source, - min_bundle_size=min_bundle_size, - use_fastavro=use_fastavro) + _create_avro_source, min_bundle_size=min_bundle_size) self._read_all_files = filebasedsource.ReadAllFiles( True, CompressionTypes.AUTO, @@ -205,74 +193,6 @@ def expand(self, pvalue): class _AvroUtils(object): - @staticmethod - def read_meta_data_from_file(f): - """Reads metadata from a given Avro file. - - Args: - f: Avro file to read. 
- Returns: - a tuple containing the codec, schema, and the sync marker of the Avro - file. - - Raises: - ValueError: if the file does not start with the byte sequence defined in - the specification. - """ - if f.tell() > 0: - f.seek(0) - decoder = avroio.BinaryDecoder(f) - header = avroio.DatumReader().read_data( - datafile.META_SCHEMA, datafile.META_SCHEMA, decoder) - if header.get('magic') != datafile.MAGIC: - raise ValueError( - 'Not an Avro file. File header should start with %s but' - 'started with %s instead.' % (datafile.MAGIC, header.get('magic'))) - - meta = header['meta'] - - if datafile.CODEC_KEY in meta: - codec = meta[datafile.CODEC_KEY] - else: - codec = b'null' - - schema_string = meta[datafile.SCHEMA_KEY].decode('utf-8') - sync_marker = header['sync'] - - return codec, schema_string, sync_marker - - @staticmethod - def read_block_from_file(f, codec, schema, expected_sync_marker): - """Reads a block from a given Avro file. - - Args: - f: Avro file to read. - codec: The codec to use for block-level decompression. - Supported codecs: 'null', 'deflate', 'snappy' - schema: Avro Schema definition represented as JSON string. - expected_sync_marker: Avro synchronization marker. If the block's sync - marker does not match with this parameter then ValueError is thrown. - Returns: - A single _AvroBlock. - - Raises: - ValueError: If the block cannot be read properly because the file doesn't - match the specification. - """ - offset = f.tell() - decoder = avroio.BinaryDecoder(f) - num_records = decoder.read_long() - block_size = decoder.read_long() - block_bytes = decoder.read(block_size) - sync_marker = decoder.read(len(expected_sync_marker)) - if sync_marker != expected_sync_marker: - raise ValueError( - 'Unexpected sync marker (actual "%s" vs expected "%s"). ' - 'Maybe the underlying avro file is corrupted?' 
% - (sync_marker, expected_sync_marker)) - size = f.tell() - offset - return _AvroBlock(block_bytes, num_records, codec, schema, offset, size) - @staticmethod def advance_file_past_next_sync_marker(f, sync_marker): buf_size = 10000 @@ -296,131 +216,15 @@ def advance_file_past_next_sync_marker(f, sync_marker): data = f.read(buf_size) -def _create_avro_source( - file_pattern=None, min_bundle_size=0, validate=False, use_fastavro=True): +def _create_avro_source(file_pattern=None, min_bundle_size=0, validate=False): return \ _FastAvroSource( file_pattern=file_pattern, min_bundle_size=min_bundle_size, validate=validate - ) \ - if use_fastavro \ - else \ - _AvroSource( - file_pattern=file_pattern, - min_bundle_size=min_bundle_size, - validate=validate ) -class _AvroBlock(object): - """Represents a block of an Avro file.""" - def __init__( - self, block_bytes, num_records, codec, schema_string, offset, size): - # Decompress data early on (if needed) and thus decrease the number of - # parallel copies of the data in memory at any given time during block - # iteration. - self._decompressed_block_bytes = self._decompress_bytes(block_bytes, codec) - self._num_records = num_records - self._schema = Parse(schema_string) - self._offset = offset - self._size = size - - def size(self): - return self._size - - def offset(self): - return self._offset - - @staticmethod - def _decompress_bytes(data, codec): - if codec == b'null': - return data - elif codec == b'deflate': - # zlib.MAX_WBITS is the window size. '-' sign indicates that this is - # raw data (without headers). See zlib and Avro documentations for more - # details. - return zlib.decompress(data, -zlib.MAX_WBITS) - elif codec == b'snappy': - # Snappy is an optional avro codec. - # See Snappy and Avro documentation for more details. - try: - import snappy - except ImportError: - raise ValueError('python-snappy does not seem to be installed.') - - # Compressed data includes a 4-byte CRC32 checksum which we verify. 
- # We take care to avoid extra copies of data while slicing large objects - # by use of a memoryview. - result = snappy.decompress(memoryview(data)[:-4]) - avroio.BinaryDecoder(io.BytesIO(data[-4:])).check_crc32(result) - return result - else: - raise ValueError('Unknown codec: %r' % codec) - - def num_records(self): - return self._num_records - - def records(self): - decoder = avroio.BinaryDecoder(io.BytesIO(self._decompressed_block_bytes)) - - writer_schema = self._schema - reader_schema = self._schema - reader = avroio.DatumReader(writer_schema, reader_schema) - - current_record = 0 - while current_record < self._num_records: - yield reader.read(decoder) - current_record += 1 - - -class _AvroSource(filebasedsource.FileBasedSource): - """A source for reading Avro files. - - ``_AvroSource`` is implemented using the file-based source framework available - in module 'filebasedsource'. Hence please refer to module 'filebasedsource' - to fully understand how this source implements operations common to all - file-based sources such as file-pattern expansion and splitting into bundles - for parallel processing. - """ - def read_records(self, file_name, range_tracker): - next_block_start = -1 - - def split_points_unclaimed(stop_position): - if next_block_start >= stop_position: - # Next block starts at or after the suggested stop position. Hence - # there will not be split points to be claimed for the range ending at - # suggested stop position. - return 0 - - return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN - - range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed) - - start_offset = range_tracker.start_position() - if start_offset is None: - start_offset = 0 - - with self.open_file(file_name) as f: - codec, schema_string, sync_marker = \ - _AvroUtils.read_meta_data_from_file(f) - - # We have to start at current position if previous bundle ended at the - # end of a sync marker. 
- start_offset = max(0, start_offset - len(sync_marker)) - f.seek(start_offset) - _AvroUtils.advance_file_past_next_sync_marker(f, sync_marker) - - next_block_start = f.tell() - - while range_tracker.try_claim(next_block_start): - block = _AvroUtils.read_block_from_file( - f, codec, schema_string, sync_marker) - next_block_start = block.offset() + block.size() - for record in block.records(): - yield record - - class _FastAvroSource(filebasedsource.FileBasedSource): """A source for reading Avro files using the `fastavro` library. @@ -491,8 +295,6 @@ def __init__( only this argument is specified and num_shards, shard_name_template, and file_name_suffix use default values. schema: The schema to use (dict). - If using with avro-python3 via use_fastavro=False, provide parsed schema - as returned by avro.schema.Parse(). codec: The codec to use for block-level compression. Any string supported by the Avro specification is accepted (for example 'null'). file_name_suffix: Suffix for the files written. @@ -510,8 +312,8 @@ def __init__( is '-SSSSS-of-NNNNN' if None is passed as the shard_name_template. mime_type: The MIME type to use for the produced files, if the filesystem supports specifying MIME types. - use_fastavro (bool); when set, use the `fastavro` library for IO, which - is significantly faster, and is now the default. + use_fastavro (bool): This flag is left for API backwards compatibility + and no longer has an effect. Do not use. Returns: A WriteToAvro transform usable for writing. 
@@ -523,8 +325,7 @@ def __init__( file_name_suffix, num_shards, shard_name_template, - mime_type, - use_fastavro) + mime_type) def expand(self, pcoll): return pcoll | beam.io.iobase.Write(self._sink) @@ -540,31 +341,20 @@ def _create_avro_sink( file_name_suffix, num_shards, shard_name_template, - mime_type, - use_fastavro): - if use_fastavro: - if "class \'avro.schema" in str(type(schema)): - raise ValueError( - 'You are using Avro IO with fastavro (default with Beam on ' - 'Python 3), but supplying a schema parsed by avro-python3. ' - 'Please change the schema to a dict.') - return _FastAvroSink( - file_path_prefix, - schema, - codec, - file_name_suffix, - num_shards, - shard_name_template, - mime_type) - else: - return _AvroSink( - file_path_prefix, - schema, - codec, - file_name_suffix, - num_shards, - shard_name_template, - mime_type) + mime_type): + if "class \'avro.schema" in str(type(schema)): + raise ValueError( + 'You are using Avro IO with fastavro (default with Beam on ' + 'Python 3), but supplying a schema parsed by avro-python3. ' + 'Please change the schema to a dict.') + return _FastAvroSink( + file_path_prefix, + schema, + codec, + file_name_suffix, + num_shards, + shard_name_template, + mime_type) class _BaseAvroSink(filebasedsink.FileBasedSink): @@ -598,17 +388,6 @@ def display_data(self): return res -class _AvroSink(_BaseAvroSink): - """A sink for avro files using Avro. """ - def open(self, temp_path): - file_handle = super().open(temp_path) - return avro.datafile.DataFileWriter( - file_handle, avro.io.DatumWriter(), self._schema, self._codec) - - def write_record(self, writer, value): - writer.append(value) - - class _FastAvroSink(_BaseAvroSink): """A sink for avro files using FastAvro. 
""" def open(self, temp_path): diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index dcd1cf76d4c8..7b8171006b8d 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -26,11 +26,6 @@ import hamcrest as hc -import avro -import avro.datafile -from avro.datafile import DataFileWriter -from avro.io import DatumWriter -from avro.schema import Parse from fastavro.schema import parse_schema from fastavro import writer @@ -121,7 +116,7 @@ def _write_pattern(self, num_files, return_filenames=False): def _run_avro_test( self, pattern, desired_bundle_size, perform_splitting, expected_result): - source = _create_avro_source(pattern, use_fastavro=self.use_fastavro) + source = _create_avro_source(pattern) if perform_splitting: assert desired_bundle_size @@ -158,7 +153,6 @@ def test_source_display_data(self): _create_avro_source( file_name, validate=False, - use_fastavro=self.use_fastavro ) dd = DisplayData.create_from(source) @@ -174,8 +168,7 @@ def test_read_display_data(self): read = \ avroio.ReadFromAvro( file_name, - validate=False, - use_fastavro=self.use_fastavro) + validate=False) dd = DisplayData.create_from(read) # No extra avro parameters for AvroSource. 
@@ -188,14 +181,7 @@ def test_read_display_data(self): def test_sink_display_data(self): file_name = 'some_avro_sink' sink = _create_avro_sink( - file_name, - self.SCHEMA, - 'null', - '.end', - 0, - None, - 'application/x-avro', - use_fastavro=self.use_fastavro) + file_name, self.SCHEMA, 'null', '.end', 0, None, 'application/x-avro') dd = DisplayData.create_from(sink) expected_items = [ @@ -211,8 +197,7 @@ def test_sink_display_data(self): def test_write_display_data(self): file_name = 'some_avro_sink' - write = avroio.WriteToAvro( - file_name, self.SCHEMA, use_fastavro=self.use_fastavro) + write = avroio.WriteToAvro(file_name, self.SCHEMA) dd = DisplayData.create_from(write) expected_items = [ DisplayDataItemMatcher('schema', str(self.SCHEMA)), @@ -226,12 +211,12 @@ def test_write_display_data(self): def test_read_reentrant_without_splitting(self): file_name = self._write_data() - source = _create_avro_source(file_name, use_fastavro=self.use_fastavro) + source = _create_avro_source(file_name) source_test_utils.assert_reentrant_reads_succeed((source, None, None)) def test_read_reantrant_with_splitting(self): file_name = self._write_data() - source = _create_avro_source(file_name, use_fastavro=self.use_fastavro) + source = _create_avro_source(file_name) splits = [split for split in source.split(desired_bundle_size=100000)] assert len(splits) == 1 source_test_utils.assert_reentrant_reads_succeed( @@ -252,7 +237,7 @@ def test_split_points(self): sync_interval = 16000 file_name = self._write_data(count=num_records, sync_interval=sync_interval) - source = _create_avro_source(file_name, use_fastavro=self.use_fastavro) + source = _create_avro_source(file_name) splits = [split for split in source.split(desired_bundle_size=float('inf'))] assert len(splits) == 1 @@ -312,7 +297,7 @@ def test_read_with_splitting_pattern(self): def test_dynamic_work_rebalancing_exhaustive(self): def compare_split_points(file_name): - source = _create_avro_source(file_name, 
use_fastavro=self.use_fastavro) + source = _create_avro_source(file_name) splits = [ split for split in source.split(desired_bundle_size=float('inf')) ] @@ -340,17 +325,14 @@ def test_corrupted_file(self): f.write(corrupted_data) corrupted_file_name = f.name - source = _create_avro_source( - corrupted_file_name, use_fastavro=self.use_fastavro) + source = _create_avro_source(corrupted_file_name) with self.assertRaisesRegex(ValueError, r'expected sync marker'): source_test_utils.read_from_source(source, None, None) def test_read_from_avro(self): path = self._write_data() with TestPipeline() as p: - assert_that( - p | avroio.ReadFromAvro(path, use_fastavro=self.use_fastavro), - equal_to(self.RECORDS)) + assert_that(p | avroio.ReadFromAvro(path), equal_to(self.RECORDS)) def test_read_all_from_avro_single_file(self): path = self._write_data() @@ -358,7 +340,7 @@ def test_read_all_from_avro_single_file(self): assert_that( p \ | Create([path]) \ - | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro), + | avroio.ReadAllFromAvro(), equal_to(self.RECORDS)) def test_read_all_from_avro_many_single_files(self): @@ -369,7 +351,7 @@ def test_read_all_from_avro_many_single_files(self): assert_that( p \ | Create([path1, path2, path3]) \ - | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro), + | avroio.ReadAllFromAvro(), equal_to(self.RECORDS * 3)) def test_read_all_from_avro_file_pattern(self): @@ -378,7 +360,7 @@ def test_read_all_from_avro_file_pattern(self): assert_that( p \ | Create([file_pattern]) \ - | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro), + | avroio.ReadAllFromAvro(), equal_to(self.RECORDS * 5)) def test_read_all_from_avro_many_file_patterns(self): @@ -389,7 +371,7 @@ def test_read_all_from_avro_many_file_patterns(self): assert_that( p \ | Create([file_pattern1, file_pattern2, file_pattern3]) \ - | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro), + | avroio.ReadAllFromAvro(), equal_to(self.RECORDS * 10)) def 
test_read_all_from_avro_with_filename(self): @@ -399,8 +381,7 @@ def test_read_all_from_avro_with_filename(self): assert_that( p \ | Create([file_pattern]) \ - | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro, - with_filename=True), + | avroio.ReadAllFromAvro(with_filename=True), equal_to(result)) def test_sink_transform(self): @@ -410,12 +391,12 @@ def test_sink_transform(self): # pylint: disable=expression-not-assigned p \ | beam.Create(self.RECORDS) \ - | avroio.WriteToAvro(path, self.SCHEMA, use_fastavro=self.use_fastavro) + | avroio.WriteToAvro(path, self.SCHEMA,) with TestPipeline() as p: # json used for stable sortability readback = \ p \ - | avroio.ReadFromAvro(path + '*', use_fastavro=self.use_fastavro) \ + | avroio.ReadFromAvro(path + '*', ) \ | beam.Map(json.dumps) assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS])) @@ -430,55 +411,19 @@ def test_sink_transform_snappy(self): | avroio.WriteToAvro( path, self.SCHEMA, - codec='snappy', - use_fastavro=self.use_fastavro) + codec='snappy') with TestPipeline() as p: # json used for stable sortability readback = \ p \ - | avroio.ReadFromAvro(path + '*', use_fastavro=self.use_fastavro) \ + | avroio.ReadFromAvro(path + '*') \ | beam.Map(json.dumps) assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS])) -@unittest.skipIf( - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test requires that Beam depends on avro-python3>=1.9 or newer. 
' - 'See: BEAM-6522.') -class TestAvro(AvroBase, unittest.TestCase): - def __init__(self, methodName='runTest'): - super().__init__(methodName) - self.use_fastavro = False - self.SCHEMA = Parse(self.SCHEMA_STRING) - - def _write_data( - self, - directory=None, - prefix=tempfile.template, - codec='null', - count=len(RECORDS), - sync_interval=avro.datafile.SYNC_INTERVAL): - old_sync_interval = avro.datafile.SYNC_INTERVAL - try: - avro.datafile.SYNC_INTERVAL = sync_interval - with tempfile.NamedTemporaryFile(delete=False, - dir=directory, - prefix=prefix) as f: - writer = DataFileWriter(f, DatumWriter(), self.SCHEMA, codec=codec) - len_records = len(self.RECORDS) - for i in range(count): - writer.append(self.RECORDS[i % len_records]) - writer.close() - self._temp_files.append(f.name) - return f.name - finally: - avro.datafile.SYNC_INTERVAL = old_sync_interval - - class TestFastAvro(AvroBase, unittest.TestCase): def __init__(self, methodName='runTest'): super().__init__(methodName) - self.use_fastavro = True self.SCHEMA = parse_schema(json.loads(self.SCHEMA_STRING)) def _write_data( diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 52cd980a7a4a..f41629b41197 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -791,7 +791,7 @@ def _get_project(self): def _create_source(self, path, schema): if not self.use_json_exports: - return create_avro_source(path, use_fastavro=True) + return create_avro_source(path) else: return TextSource( path, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py index ab4eb09b8654..eca208d26612 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py @@ -13,13 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. -# -import json + import logging import unittest -import fastavro -from avro.schema import Parse +from fastavro.schema import parse_schema from apache_beam.io.gcp import bigquery_avro_tools from apache_beam.io.gcp import bigquery_tools @@ -36,156 +34,77 @@ def test_convert_bigquery_schema_to_avro_schema(self): ] fields = [ - bigquery.TableFieldSchema( - name="number", type="INTEGER", mode="REQUIRED"), - bigquery.TableFieldSchema( - name="species", type="STRING", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="quality", type="FLOAT"), # default to NULLABLE - bigquery.TableFieldSchema( - name="grade", type="FLOAT64"), # default to NULLABLE - bigquery.TableFieldSchema( - name="quantity", type="INTEGER"), # default to NULLABLE - bigquery.TableFieldSchema( - name="dependents", type="INT64"), # default to NULLABLE - bigquery.TableFieldSchema( - name="birthday", type="TIMESTAMP", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="birthdayMoney", type="NUMERIC", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="flighted", type="BOOL", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="flighted2", type="BOOLEAN", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="sound", type="BYTES", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="anniversaryDate", type="DATE", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="anniversaryDatetime", type="DATETIME", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="anniversaryTime", type="TIME", mode="NULLABLE"), - bigquery.TableFieldSchema( - name="scion", type="RECORD", mode="NULLABLE", fields=subfields), - bigquery.TableFieldSchema( - name="family", type="STRUCT", mode="NULLABLE", fields=subfields), - bigquery.TableFieldSchema( - name="associates", type="RECORD", mode="REPEATED", fields=subfields), - bigquery.TableFieldSchema( - name="geoPositions", type="GEOGRAPHY", mode="NULLABLE"), + 
bigquery.TableFieldSchema( + name="number", type="INTEGER", mode="REQUIRED"), + bigquery.TableFieldSchema( + name="species", type="STRING", mode="NULLABLE"), + bigquery.TableFieldSchema(name="quality", + type="FLOAT"), # default to NULLABLE + bigquery.TableFieldSchema(name="grade", + type="FLOAT64"), # default to NULLABLE + bigquery.TableFieldSchema(name="quantity", + type="INTEGER"), # default to NULLABLE + bigquery.TableFieldSchema(name="dependents", + type="INT64"), # default to NULLABLE + bigquery.TableFieldSchema( + name="birthday", type="TIMESTAMP", mode="NULLABLE"), + bigquery.TableFieldSchema( + name="birthdayMoney", type="NUMERIC", mode="NULLABLE"), + bigquery.TableFieldSchema( + name="flighted", type="BOOL", mode="NULLABLE"), + bigquery.TableFieldSchema( + name="flighted2", type="BOOLEAN", mode="NULLABLE"), + bigquery.TableFieldSchema(name="sound", type="BYTES", mode="NULLABLE"), + bigquery.TableFieldSchema( + name="anniversaryDate", type="DATE", mode="NULLABLE"), + bigquery.TableFieldSchema( + name="anniversaryDatetime", type="DATETIME", mode="NULLABLE"), + bigquery.TableFieldSchema( + name="anniversaryTime", type="TIME", mode="NULLABLE"), + bigquery.TableFieldSchema( + name="scion", type="RECORD", mode="NULLABLE", fields=subfields), + bigquery.TableFieldSchema( + name="family", type="STRUCT", mode="NULLABLE", fields=subfields), + bigquery.TableFieldSchema( + name="associates", type="RECORD", mode="REPEATED", + fields=subfields), + bigquery.TableFieldSchema( + name="geoPositions", type="GEOGRAPHY", mode="NULLABLE"), ] table_schema = bigquery.TableSchema(fields=fields) avro_schema = bigquery_avro_tools.get_record_schema_from_dict_table_schema( "root", bigquery_tools.get_dict_table_schema(table_schema)) - # Test that schema can be parsed correctly by fastavro - fastavro.parse_schema(avro_schema) - - # Test that schema can be parsed correctly by avro - parsed_schema = Parse(json.dumps(avro_schema)) - - self.assertEqual( - 
parsed_schema.field_map["number"].type, Parse(json.dumps("long"))) - self.assertEqual( - parsed_schema.field_map["species"].type, - Parse(json.dumps(["null", "string"]))) - self.assertEqual( - parsed_schema.field_map["quality"].type, - Parse(json.dumps(["null", "double"]))) - self.assertEqual( - parsed_schema.field_map["grade"].type, - Parse(json.dumps(["null", "double"]))) - self.assertEqual( - parsed_schema.field_map["quantity"].type, - Parse(json.dumps(["null", "long"]))) - self.assertEqual( - parsed_schema.field_map["dependents"].type, - Parse(json.dumps(["null", "long"]))) - self.assertEqual( - parsed_schema.field_map["birthday"].type, - Parse( - json.dumps( - ["null", { - "type": "long", "logicalType": "timestamp-micros" - }]))) - self.assertEqual( - parsed_schema.field_map["birthdayMoney"].type, - Parse( - json.dumps([ - "null", - { - "type": "bytes", - "logicalType": "decimal", - "precision": 38, - "scale": 9 - } - ]))) - self.assertEqual( - parsed_schema.field_map["flighted"].type, - Parse(json.dumps(["null", "boolean"]))) - self.assertEqual( - parsed_schema.field_map["flighted2"].type, - Parse(json.dumps(["null", "boolean"]))) - self.assertEqual( - parsed_schema.field_map["sound"].type, - Parse(json.dumps(["null", "bytes"]))) - self.assertEqual( - parsed_schema.field_map["anniversaryDate"].type, - Parse(json.dumps(["null", { - "type": "int", "logicalType": "date" - }]))) - self.assertEqual( - parsed_schema.field_map["anniversaryDatetime"].type, - Parse(json.dumps(["null", "string"]))) - self.assertEqual( - parsed_schema.field_map["anniversaryTime"].type, - Parse( - json.dumps(["null", { - "type": "long", "logicalType": "time-micros" - }]))) - self.assertEqual( - parsed_schema.field_map["geoPositions"].type, - Parse(json.dumps(["null", "string"]))) - - for field in ("scion", "family"): - self.assertEqual( - parsed_schema.field_map[field].type, - Parse( - json.dumps([ - "null", - { - "type": "record", - "name": field, - "fields": [ - { - "type": ["null", 
"string"], - "name": "species", - }, - ], - "doc": "Translated Avro Schema for {}".format(field), - "namespace": "apache_beam.io.gcp.bigquery.root.{}".format( - field), - } - ]))) - - self.assertEqual( - parsed_schema.field_map["associates"].type, - Parse( - json.dumps({ - "type": "array", - "items": { - "type": "record", - "name": "associates", - "fields": [ - { - "type": ["null", "string"], - "name": "species", - }, - ], - "doc": "Translated Avro Schema for associates", - "namespace": "apache_beam.io.gcp.bigquery.root.associates", - } - }))) + parsed_schema = parse_schema(avro_schema) + self.assertEqual(type(parsed_schema), dict) + # names: key -> name, value -> different types allowed + names = { + "number": 4, + "species": 2, + "quality": 2, + "grade": 2, + "quantity": 2, + "dependents": 2, + "birthday": 2, + "birthdayMoney": 2, + "flighted": 2, + "flighted2": 2, + "sound": 2, + "anniversaryDate": 2, + "anniversaryDatetime": 2, + "anniversaryTime": 2, + "scion": 2, + "family": 2, + "associates": 2, + "geoPositions": 2, + } + # simple test case to check if the schema is parsed right. 
+ fields = parsed_schema["fields"] + for i in range(len(fields)): + field_ = fields[i] + assert 'name' in field_ and field_['name'] in names + self.assertEqual(len(field_['type']), names[field_['name']]) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py index 25ffd3596662..22aacff044a8 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py @@ -264,7 +264,7 @@ def _get_bq_metadata(self): def _create_source(self, path, schema): if not self.use_json_exports: - return _create_avro_source(path, use_fastavro=True) + return _create_avro_source(path) else: return _TextSource( path, diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 36cd2c26f7c3..8fbea274bc07 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -553,13 +553,6 @@ def run_pipeline(self, pipeline, options): else: debug_options.add_experiment('use_staged_dataflow_worker_jar') - # Make Dataflow workers use FastAvro on Python 3 unless use_avro experiment - # is set. Note that use_avro is only interpreted by the Dataflow runner - # at job submission and is not interpreted by Dataflow service or workers, - # which by default use avro library unless use_fastavro experiment is set. 
- if not debug_options.lookup_experiment('use_avro'): - debug_options.add_experiment('use_fastavro') - self.job = apiclient.Job(options, self.proto_pipeline) # Dataflow Runner v1 requires output type of the Flatten to be the same as diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index b9be5d423284..dd1de55c628b 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -515,16 +515,6 @@ def test_dataflow_worker_jar_flag_adds_use_staged_worker_jar_experiment(self): self.assertIn('beam_fn_api', experiments_for_job) self.assertIn('use_staged_dataflow_worker_jar', experiments_for_job) - def test_use_fastavro_experiment_is_added_on_py3_and_onwards(self): - remote_runner = DataflowRunner() - - with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p: - p | ptransform.Create([1]) # pylint: disable=expression-not-assigned - - self.assertTrue( - remote_runner.job.options.view_as(DebugOptions).lookup_experiment( - 'use_fastavro', False)) - def test_use_fastavro_experiment_is_not_added_when_use_avro_is_present(self): remote_runner = DataflowRunner() self.default_properties.append('--experiment=use_avro') diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 46ca68c75c4d..471eef06002c 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -36,7 +36,7 @@ # Update this version to the next version whenever there is a change that will # require changes to legacy Dataflow worker execution environment. 
-BEAM_CONTAINER_VERSION = 'beam-master-20211015' +BEAM_CONTAINER_VERSION = 'beam-master-20211103' # Update this version to the next version whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20211015' diff --git a/sdks/python/apache_beam/testing/datatype_inference.py b/sdks/python/apache_beam/testing/datatype_inference.py index c8236ab1d642..b68f5ec4a125 100644 --- a/sdks/python/apache_beam/testing/datatype_inference.py +++ b/sdks/python/apache_beam/testing/datatype_inference.py @@ -17,11 +17,10 @@ # pytype: skip-file import array -import json from collections import OrderedDict import numpy as np -from avro.schema import Parse +from fastavro import parse_schema from apache_beam.typehints import trivial_inference from apache_beam.typehints import typehints @@ -71,15 +70,13 @@ def infer_typehints_schema(data): return column_types -def infer_avro_schema(data, use_fastavro=False): +def infer_avro_schema(data): """For internal use only; no backwards-compatibility guarantees. Infer avro schema for tabular data. Args: data (List[dict]): A list of dictionaries representing rows in a table. - use_fastavro (bool): A flag indicating whether the schema should be - constructed using fastavro. Returns: An avro schema object. 
@@ -112,11 +109,7 @@ def typehint_to_avro_type(value): "type": "record", "fields": avro_fields } - if use_fastavro: - from fastavro import parse_schema - return parse_schema(schema_dict) - else: - return Parse(json.dumps(schema_dict)) + return parse_schema(schema_dict) def infer_pyarrow_schema(data): diff --git a/sdks/python/apache_beam/testing/datatype_inference_test.py b/sdks/python/apache_beam/testing/datatype_inference_test.py index d59adbf81ff2..445ebedb4c4e 100644 --- a/sdks/python/apache_beam/testing/datatype_inference_test.py +++ b/sdks/python/apache_beam/testing/datatype_inference_test.py @@ -127,7 +127,6 @@ def nullify_avro_schema(schema): def get_collumns_in_order(test_data): """Get a list of columns while trying to maintain original order. - .. note:: Columns which do not apear until later rows are added to the end, even if they preceed some columns which have already been added. @@ -175,24 +174,12 @@ def test_infer_pyarrow_schema(self, _, data, schema): pyarrow_schema = datatype_inference.infer_pyarrow_schema(data) self.assertEqual(pyarrow_schema, schema) - @parameterized.expand([(d["name"], d["data"], d["avro_schema"]) - for d in TEST_DATA]) - def test_infer_avro_schema(self, _, data, schema): - schema = schema.copy() # Otherwise, it would be mutated by `.pop()` - avro_schema = datatype_inference.infer_avro_schema(data, use_fastavro=False) - avro_schema = avro_schema.to_json() - fields1 = avro_schema.pop("fields") - fields2 = schema.pop("fields") - self.assertDictEqual(avro_schema, schema) - for field1, field2 in zip(fields1, fields2): - self.assertDictEqual(field1, field2) - @parameterized.expand([(d["name"], d["data"], d["avro_schema"]) for d in TEST_DATA]) def test_infer_fastavro_schema(self, _, data, schema): from fastavro import parse_schema schema = parse_schema(schema) - avro_schema = datatype_inference.infer_avro_schema(data, use_fastavro=True) + avro_schema = datatype_inference.infer_avro_schema(data) fields1 = avro_schema.pop("fields") 
fields2 = schema.pop("fields") self.assertDictEqual(avro_schema, schema) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 225a1f4b7fe9..37dc1207184e 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -125,7 +125,6 @@ def get_version(): REQUIRED_PACKAGES = [ # Avro 1.9.2 for python3 was broken. The issue was fixed in version 1.9.2.1 - 'avro-python3>=1.8.1,!=1.9.2,<1.10.0', 'crcmod>=1.7,<2.0', # dataclasses backport for python_version<3.7. No version bound because this # is Python standard since Python 3.7 and each Python version is compatible