From 2915e3cd726090ecdc1e9dfb8c06f289d38cc592 Mon Sep 17 00:00:00 2001 From: Sven Kreiss Date: Sat, 12 Nov 2022 17:30:29 +0100 Subject: [PATCH 1/7] major lint run (#164) --- .github/workflows/deploy.yml | 8 +- .github/workflows/tests.yml | 12 +- .pylintrc | 2 +- pysparkling/fileio/fs/file_system.py | 32 ++--- pysparkling/rdd.py | 2 +- pysparkling/sql/casts.py | 9 +- pysparkling/sql/column.py | 2 +- pysparkling/sql/conf.py | 3 +- pysparkling/sql/dataframe.py | 20 ++-- pysparkling/sql/internal_utils/writers.py | 6 +- pysparkling/sql/readwriter.py | 4 +- pysparkling/sql/session.py | 4 +- pysparkling/sql/tests/test_write.py | 2 +- pysparkling/sql/types.py | 139 ++++++++++------------ pysparkling/stat_counter.py | 6 +- pysparkling/storagelevel.py | 8 +- pysparkling/tests/profile_textfile.py | 2 +- pysparkling/tests/test_multiprocessing.py | 4 +- pysparkling/tests/test_textFile.py | 95 +++++++-------- pysparkling/utils.py | 9 +- scripts/benchmark_csv.py | 2 +- scripts/tcpperf_client.py | 7 +- scripts/tcpperf_plot.py | 2 +- scripts/tcpperf_server.py | 4 +- setup.py | 4 +- 25 files changed, 184 insertions(+), 204 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 98fcff084..b2957e58c 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -19,11 +19,11 @@ jobs: name: Build Python source distribution runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 with: fetch-depth: 0 - - uses: actions/setup-python@v2 + - uses: actions/setup-python@v4 name: Install Python with: python-version: '3.7' @@ -31,7 +31,7 @@ jobs: - name: Build sdist run: python setup.py sdist - - uses: actions/upload-artifact@v2 + - uses: actions/upload-artifact@v3 with: path: dist/*.tar.gz @@ -43,7 +43,7 @@ jobs: # alternatively, to publish when a GitHub Release is created, use the following rule: if: github.event_name == 'release' && github.event.action == 'published' steps: - - uses: 
actions/download-artifact@v2 + - uses: actions/download-artifact@v3 with: name: artifact path: dist diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index bb4353256..a83ab748f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,22 +9,22 @@ jobs: strategy: matrix: os: [ ubuntu-latest, macos-latest, windows-latest ] - python: [ 3.6, 3.7, 3.8, 3.9 ] + python: [ 3.7, 3.8, 3.9 ] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 with: fetch-depth: 0 - name: Set up Python ${{ matrix.python }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python }} - name: Install run: | - python -m pip install --upgrade pip - python -m pip install -e ".[tests,scripts]" --use-deprecated=legacy-resolver + python -m pip install --upgrade pip setuptools + python -m pip install -e ".[tests,scripts]" - name: Print environment run: | @@ -45,7 +45,7 @@ jobs: - name: Install SQL Dependencies run: | - python -m pip install -e ".[sql]" --use-deprecated=legacy-resolver + python -m pip install -e ".[sql]" - name: Lint if: matrix.python == '3.9' diff --git a/.pylintrc b/.pylintrc index d913d9d0c..72a4d7947 100644 --- a/.pylintrc +++ b/.pylintrc @@ -3,7 +3,7 @@ variable-rgx=[a-z0-9_]{1,30}$ good-names=log -disable=invalid-name,unused-argument,too-few-public-methods,no-self-use,missing-docstring,logging-format-interpolation,too-many-instance-attributes,duplicate-code,too-many-public-methods,too-many-arguments,protected-access,too-many-lines +disable=invalid-name,unused-argument,too-few-public-methods,missing-docstring,logging-format-interpolation,too-many-instance-attributes,duplicate-code,too-many-public-methods,too-many-arguments,protected-access,too-many-lines,missing-timeout,unnecessary-lambda-assignment [FORMAT] max-line-length=119 diff --git a/pysparkling/fileio/fs/file_system.py b/pysparkling/fileio/fs/file_system.py index 7bb0ad799..a9fc4d45d 100644 --- 
a/pysparkling/fileio/fs/file_system.py +++ b/pysparkling/fileio/fs/file_system.py @@ -1,4 +1,6 @@ +import io import logging +import typing as t log = logging.getLogger(__name__) @@ -8,55 +10,56 @@ class FileSystem: :param str file_name: File name. """ - def __init__(self, file_name): - self.file_name = file_name + def __init__(self, file_name: str): + self.file_name: str = file_name @staticmethod - def resolve_filenames(expr): + def resolve_filenames(expr: str) -> t.List[str]: """Resolve the given glob-like expression to filenames. :rtype: list """ log.error('Cannot resolve: %s', expr) + raise NotImplementedError @staticmethod - def resolve_content(expr): + def resolve_content(expr: str) -> t.List[str]: """Return all the files matching expr or in a folder matching expr :rtype: list """ log.error('Cannot resolve: %s', expr) + raise NotImplementedError - def exists(self): + def exists(self) -> bool: """Check whether the given file_name exists. :rtype: bool """ log.warning('Could not determine whether %s exists due to unhandled scheme.', self.file_name) + raise NotImplementedError - def load(self): - """Load a file to a stream. - - :rtype: io.BytesIO - """ + def load(self) -> io.BytesIO: + """Load a file to a stream.""" log.error('Cannot load: %s', self.file_name) + raise NotImplementedError - def load_text(self, encoding='utf8', encoding_errors='ignore'): + def load_text(self, encoding: str = 'utf8', encoding_errors: str = 'ignore') -> io.StringIO: """Load a file to a stream. :param str encoding: Text encoding. :param str encoding_errors: How to handle encoding errors. - - :rtype: io.StringIO """ log.error('Cannot load: %s', self.file_name) + raise NotImplementedError - def dump(self, stream): + def dump(self, stream: io.BytesIO): """Dump a stream to a file. :param io.BytesIO stream: Input tream. """ log.error('Cannot dump: %s', self.file_name) + raise NotImplementedError def make_public(self, recursive=False): """Make the file public (only on some file systems). 
@@ -65,3 +68,4 @@ def make_public(self, recursive=False): :rtype: FileSystem """ log.warning('Cannot make %s public.', self.file_name) + raise NotImplementedError diff --git a/pysparkling/rdd.py b/pysparkling/rdd.py index eca05e426..3cbcafe7c 100644 --- a/pysparkling/rdd.py +++ b/pysparkling/rdd.py @@ -1840,7 +1840,7 @@ def takeSample(self, withReplacement, num, seed=None): maxSampleSize = sys.maxsize - int(numStDev * math.sqrt(sys.maxsize)) if num > maxSampleSize: raise ValueError( - "Sample size cannot be greater than %d." % maxSampleSize) + f"Sample size cannot be greater than {maxSampleSize}.") fraction = RDD._computeFractionForSampleSize(num, initial_count, withReplacement) samples = self.sample(withReplacement, fraction, seed).collect() diff --git a/pysparkling/sql/casts.py b/pysparkling/sql/casts.py index 076a589cc..06b318f13 100644 --- a/pysparkling/sql/casts.py +++ b/pysparkling/sql/casts.py @@ -73,12 +73,11 @@ def cast_map(value, from_type, options): if sub_value is not None else None)) for key, sub_value in value.items() ] - return "[{0}]".format( - ", ".join("{0} ->{1}".format( - casted_key, - f" {casted_value}" if casted_value is not None else "" - ) for casted_key, casted_value in casted_values) + joined_values = ", ".join( + f"{casted_key} ->{(' ' + casted_value) if casted_value is not None else ''}" + for casted_key, casted_value in casted_values ) + return f"[{joined_values}]" def cast_sequence(value, from_type, options): diff --git a/pysparkling/sql/column.py b/pysparkling/sql/column.py index 42f15e337..3ee8bd945 100644 --- a/pysparkling/sql/column.py +++ b/pysparkling/sql/column.py @@ -461,7 +461,7 @@ def alias(self, *alias, **kwargs): """ metadata = kwargs.pop('metadata', None) - assert not kwargs, 'Unexpected kwargs where passed: %s' % kwargs + assert not kwargs, f'Unexpected kwargs where passed: {kwargs}' if metadata: # pylint: disable=W0511 diff --git a/pysparkling/sql/conf.py b/pysparkling/sql/conf.py index 8cbe566f1..553133f4d 100644 --- 
a/pysparkling/sql/conf.py +++ b/pysparkling/sql/conf.py @@ -21,8 +21,7 @@ def unset(self, key): def _checkType(self, obj, identifier): if not isinstance(obj, str): - raise TypeError("expected %s '%s' to be a string (was '%s')" % - (identifier, obj, type(obj).__name__)) + raise TypeError(f"expected {identifier} '{obj}' to be a string (was '{type(obj).__name__}')") def isModifiable(self, key): raise NotImplementedError("pysparkling does not support yet this feature") diff --git a/pysparkling/sql/dataframe.py b/pysparkling/sql/dataframe.py index 7e5e27fe9..6cfaff353 100644 --- a/pysparkling/sql/dataframe.py +++ b/pysparkling/sql/dataframe.py @@ -7,8 +7,7 @@ from .internal_utils.joins import CROSS_JOIN, JOIN_TYPES from .internals import CUBE_TYPE, InternalGroupedDataFrame, ROLLUP_TYPE from .types import ( - _check_series_convert_timestamps_local_tz, ByteType, FloatType, IntegerType, IntegralType, ShortType, - TimestampType + _check_series_convert_timestamps_local_tz, ByteType, FloatType, IntegerType, IntegralType, ShortType, TimestampType ) from .utils import AnalysisException, IllegalArgumentException, require_minimum_pandas_version @@ -187,7 +186,8 @@ def show(self, n=20, truncate=True, vertical=False): print(self._jdf.showString(n, int(truncate), vertical)) def __repr__(self): - return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) + values = ", ".join(f"{c1}: {c2}" for c1, c2 in self.dtypes) + return f"DataFrame[{values}]" def checkpoint(self, eager=True): raise NotImplementedError("Streaming is not supported in PySparkling") @@ -510,7 +510,7 @@ def sample(self, withReplacement=None, fraction=None, seed=None): raise TypeError( "withReplacement (optional), fraction (required) and seed (optional)" " should be a bool, float and number; however, " - "got [%s]." 
% ", ".join(argtypes)) + f"got [{', '.join(argtypes)}].") if is_withReplacement_omitted_args: if fraction is not None: @@ -829,7 +829,7 @@ def _sort_cols(cols, kwargs): elif isinstance(ascending, list): cols = [jc if asc else jc.desc() for asc, jc in zip(ascending, cols)] else: - raise TypeError("ascending can only be boolean or list, but got %s" % type(ascending)) + raise TypeError(f"ascending can only be boolean or list, but got {type(ascending)}") if kwargs: raise TypeError(f"Unrecognized arguments: {kwargs}") @@ -961,7 +961,7 @@ def __getitem__(self, item): return self.select(*item) if isinstance(item, int): return Column(FieldAsExpression(self._jdf.bound_schema[item])) - raise TypeError("unexpected item type: %s" % type(item)) + raise TypeError(f"unexpected item type: {type(item)}") def __getattr__(self, name): if name.startswith("_"): @@ -1452,7 +1452,7 @@ def approxQuantile(self, col, probabilities, relativeError): [[2.0, 2.0, 5.0]] """ if not isinstance(col, (str, list, tuple)): - raise ValueError("col should be a string, list or tuple, but got %r" % type(col)) + raise ValueError(f"col should be a string, list or tuple, but got {repr(type(col))}") isStr = isinstance(col, str) @@ -1463,7 +1463,7 @@ def approxQuantile(self, col, probabilities, relativeError): for c in col: if not isinstance(c, str): - raise ValueError("columns should be strings, but got %r" % type(c)) + raise ValueError(f"columns should be strings, but got {repr(type(c))}") if not isinstance(probabilities, (list, tuple)): raise ValueError("probabilities should be a list or tuple") @@ -1618,8 +1618,8 @@ def toDF(self, *cols): def transform(self, func): result = func(self) - assert isinstance(result, DataFrame), "Func returned an instance of type [%s], " \ - "should have been DataFrame." 
% type(result) + assert isinstance(result, DataFrame), ( + f"Func returned an instance of type [{type(result)}], should have been DataFrame.") return result def toPandas(self): diff --git a/pysparkling/sql/internal_utils/writers.py b/pysparkling/sql/internal_utils/writers.py index a48c8156e..0e8fe7538 100644 --- a/pysparkling/sql/internal_utils/writers.py +++ b/pysparkling/sql/internal_utils/writers.py @@ -173,7 +173,7 @@ def save(self): success_path = os.path.join(output_path, "_SUCCESS") - with open(success_path, "w"): + with open(success_path, "w", encoding="utf8"): pass def preformat(self, row, schema): @@ -302,7 +302,7 @@ def write(self, items, ref_value, schema): # - escape # - escapeQuotes - with open(file_path, "w") as f: + with open(file_path, "w", encoding="utf8") as f: writer = csv.writer( f, delimiter=self.sep, @@ -367,6 +367,6 @@ def write(self, items, ref_value, schema): if not os.path.exists(partition_folder): os.makedirs(partition_folder) - with open(file_path, "a") as f: + with open(file_path, "a", encoding="utf8") as f: f.writelines(items) return len(items) diff --git a/pysparkling/sql/readwriter.py b/pysparkling/sql/readwriter.py index 77f9a486f..352e5b8c5 100644 --- a/pysparkling/sql/readwriter.py +++ b/pysparkling/sql/readwriter.py @@ -125,8 +125,8 @@ def option(self, key, value): return self def options(self, **options): - for k in options: - self._jwrite.option(k, options[k]) + for k, v in options.items(): + self._jwrite.option(k, v) return self def partitionBy(self, *cols): diff --git a/pysparkling/sql/session.py b/pysparkling/sql/session.py index babd55d99..5ba8c4ebd 100644 --- a/pysparkling/sql/session.py +++ b/pysparkling/sql/session.py @@ -148,7 +148,7 @@ def _createFromRDD(self, rdd, schema, samplingRatio): schema = struct elif not isinstance(schema, StructType): - raise TypeError("schema should be StructType or list or None, but got: %s" % schema) + raise TypeError(f"schema should be StructType or list or None, but got: {schema}") # 
convert python objects to sql data rdd = rdd.map(schema.toInternal) @@ -174,7 +174,7 @@ def _createFromLocal(self, data, schema): schema = struct elif not isinstance(schema, StructType): - raise TypeError("schema should be StructType or list or None, but got: %s" % schema) + raise TypeError(f"schema should be StructType or list or None, but got: {schema}") # convert python objects to sql data data = [schema.toInternal(row) for row in data] diff --git a/pysparkling/sql/tests/test_write.py b/pysparkling/sql/tests/test_write.py index 88ce6c4de..a9ab97b65 100644 --- a/pysparkling/sql/tests/test_write.py +++ b/pysparkling/sql/tests/test_write.py @@ -18,7 +18,7 @@ def get_folder_content(folder_path): relative_path = root[len(folder_path):] for file in files: file_path = os.path.join(root, file) - with open(file_path, 'r') as file_content: + with open(file_path, 'r', encoding='utf8') as file_content: folder_content[os.path.join(relative_path, file)] = file_content.readlines() return folder_content diff --git a/pysparkling/sql/types.py b/pysparkling/sql/types.py index 3b6574e05..8e1e027e5 100644 --- a/pysparkling/sql/types.py +++ b/pysparkling/sql/types.py @@ -203,13 +203,13 @@ def __init__(self, precision=10, scale=0): self.hasPrecisionInfo = True # this is public API def simpleString(self): - return "decimal(%d,%d)" % (self.precision, self.scale) + return f"decimal({self.precision:d},{self.scale:d})" def jsonValue(self): - return "decimal(%d,%d)" % (self.precision, self.scale) + return f"decimal({self.precision:d},{self.scale:d})" def __repr__(self): - return "DecimalType(%d,%d)" % (self.precision, self.scale) + return f"DecimalType({self.precision:d},{self.scale:d})" class DoubleType(FractionalType): @@ -276,16 +276,15 @@ def __init__(self, elementType, containsNull=True): False """ assert isinstance(elementType, DataType), \ - "elementType %s should be an instance of %s" % (elementType, DataType) + f"elementType {elementType} should be an instance of {DataType}" 
self.elementType = elementType self.containsNull = containsNull def simpleString(self): - return 'array<%s>' % self.elementType.simpleString() + return f'array<{self.elementType.simpleString()}>' def __repr__(self): - return "ArrayType(%s,%s)" % (self.elementType, - str(self.containsNull).lower()) + return f"ArrayType({self.elementType},{str(self.containsNull).lower()})" def jsonValue(self): return {"type": self.typeName(), @@ -332,19 +331,18 @@ def __init__(self, keyType, valueType, valueContainsNull=True): False """ assert isinstance(keyType, DataType), \ - "keyType %s should be an instance of %s" % (keyType, DataType) + f"keyType {keyType} should be an instance of {DataType}" assert isinstance(valueType, DataType), \ - "valueType %s should be an instance of %s" % (valueType, DataType) + f"valueType {valueType} should be an instance of {DataType}" self.keyType = keyType self.valueType = valueType self.valueContainsNull = valueContainsNull def simpleString(self): - return 'map<%s,%s>' % (self.keyType.simpleString(), self.valueType.simpleString()) + return f'map<{self.keyType.simpleString()},{self.valueType.simpleString()}>' def __repr__(self): - return "MapType(%s,%s,%s)" % (self.keyType, self.valueType, - str(self.valueContainsNull).lower()) + return f"MapType({self.keyType},{self.valueType},{str(self.valueContainsNull).lower()})" def jsonValue(self): return {"type": self.typeName(), @@ -394,19 +392,18 @@ def __init__(self, name, dataType, nullable=True, metadata=None): False """ assert isinstance(dataType, DataType), \ - "dataType %s should be an instance of %s" % (dataType, DataType) - assert isinstance(name, str), "field name %s should be string" % name + f"dataType {dataType} should be an instance of {DataType}" + assert isinstance(name, str), f"field name {name} should be string" self.name = name self.dataType = dataType self.nullable = nullable self.metadata = metadata or {} def simpleString(self): - return '%s:%s' % (self.name, 
self.dataType.simpleString()) + return f'{self.name}:{self.dataType.simpleString()}' def __repr__(self): - return "StructField(%s,%s,%s)" % (self.name, self.dataType, - str(self.nullable).lower()) + return f"StructField({self.name},{self.dataType},{str(self.nullable).lower()})" def jsonValue(self): return {"name": self.name, @@ -431,7 +428,7 @@ def toInternal(self, obj): def fromInternal(self, obj): return self.dataType.fromInternal(obj) - def typeName(self): + def typeName(self): # pylint: disable=arguments-differ raise TypeError( "StructField does not have typeName. " "Use typeName on its type explicitly instead.") @@ -549,7 +546,7 @@ def __getitem__(self, key): raise TypeError('StructType keys should be strings, integers or slices') def simpleString(self): - return 'struct<%s>' % (','.join(f.simpleString() for f in self)) + return f"struct<{','.join(f.simpleString() for f in self)}>" def treeString(self): """ @@ -614,8 +611,7 @@ def _dump_array(data_type): return '\n'.join(txt) def __repr__(self): - return ("StructType(List(%s))" % - ",".join(str(field) for field in self)) + return f"StructType(List({','.join(str(field) for field in self)}))" def jsonValue(self): return {"type": self.typeName(), @@ -656,7 +652,7 @@ def toInternal(self, obj): if hasattr(obj, "__dict__"): d = obj.__dict__ return tuple(d.get(n) for n in self.names) - raise ValueError("Unexpected tuple %r with StructType" % obj) + raise ValueError(f"Unexpected tuple {obj} with StructType") def to_serialized_internal(self, obj): # Only calling toInternal function for fields that need conversion @@ -675,7 +671,7 @@ def to_serialized_internal(self, obj): d = obj.__dict__ return tuple(f.toInternal(d.get(n)) if c else d.get(n) for n, f, c in zip(self.names, self.fields, self._needConversion)) - raise ValueError("Unexpected tuple %r with StructType" % obj) + raise ValueError(f"Unexpected tuple {obj} with StructType") def fromInternal(self, obj): if obj is None: @@ -794,7 +790,7 @@ def jsonValue(self): 
schema = { "type": "udt", "class": self.scalaUDT(), - "pyClass": "%s.%s" % (self.module(), type(self).__name__), + "pyClass": f"{self.module()}.{type(self).__name__}", "sqlType": self.sqlType().jsonValue() } else: @@ -889,14 +885,15 @@ def _parse_datatype_json_value(json_value): return DecimalType() if _FIXED_DECIMAL.match(json_value): m = _FIXED_DECIMAL.match(json_value) + assert m is not None return DecimalType(int(m.group(1)), int(m.group(2))) - raise ValueError("Could not parse datatype: %s" % json_value) + raise ValueError(f"Could not parse datatype: {json_value}") tpe = json_value["type"] if tpe in _all_complex_types: return _all_complex_types[tpe].fromJson(json_value) if tpe == 'udt': return UserDefinedType.fromJson(json_value) - raise ValueError("not supported type: %s" % tpe) + raise ValueError(f"not supported type: {tpe}") # Mapping Python types to Spark SQL DataType @@ -969,17 +966,17 @@ def _int_size_to_type(size): } # compute array typecode mappings for signed integer types -for _typecode in _array_signed_int_typecode_ctype_mappings: - _size = ctypes.sizeof(_array_signed_int_typecode_ctype_mappings[_typecode]) * 8 +for _typecode, mapped_ctype in _array_signed_int_typecode_ctype_mappings.items(): + _size = ctypes.sizeof(mapped_ctype) * 8 _dt = _int_size_to_type(_size) if _dt is not None: _array_type_mappings[_typecode] = _dt # compute array typecode mappings for unsigned integer types -for _typecode in _array_unsigned_int_typecode_ctype_mappings: +for _typecode, mapped_ctype in _array_unsigned_int_typecode_ctype_mappings.items(): # JVM does not have unsigned types, so use signed types that is at least 1 # bit larger to store - _size = ctypes.sizeof(_array_unsigned_int_typecode_ctype_mappings[_typecode]) * 8 + 1 + _size = ctypes.sizeof(mapped_ctype) * 8 + 1 _dt = _int_size_to_type(_size) if _dt is not None: _array_type_mappings[_typecode] = _dt @@ -1013,7 +1010,7 @@ def _infer_type(obj): try: return _infer_schema(obj) except TypeError as e: - raise 
TypeError("not supported type: %s" % type(obj)) from e + raise TypeError(f"not supported type: {type(obj)}") from e def _infer_struct_type(obj): @@ -1030,7 +1027,7 @@ def _infer_struct_type(obj): if isinstance(obj, array): if obj.typecode in _array_type_mappings: return ArrayType(_array_type_mappings[obj.typecode](), False) - raise TypeError("not supported type: array(%s)" % obj.typecode) + raise TypeError(f"not supported type: array({obj.typecode})") return None @@ -1041,22 +1038,22 @@ def _infer_schema(row, names=None): elif isinstance(row, (tuple, list)): if hasattr(row, "__fields__"): # Row - items = zip(row.__fields__, tuple(row)) + items = zip(row.__fields__, tuple(row)) # type: ignore elif hasattr(row, "_fields"): # namedtuple # noinspection PyProtectedMember - items = zip(row._fields, tuple(row)) + items = zip(row._fields, tuple(row)) # type: ignore else: if names is None: - names = ['_%d' % i for i in range(1, len(row) + 1)] + names = [f'_{i}' for i in range(1, len(row) + 1)] elif len(names) < len(row): - names.extend('_%d' % i for i in range(len(names) + 1, len(row) + 1)) + names.extend(f'_{i}' for i in range(len(names) + 1, len(row) + 1)) items = zip(names, row) elif hasattr(row, "__dict__"): # object items = sorted(row.__dict__.items()) else: - raise TypeError("Can not infer schema for type: %s" % type(row)) + raise TypeError(f"Can not infer schema for type: {type(row)}") fields = [StructField(k, _infer_type(v), True) for k, v in items] return StructType(fields) @@ -1101,10 +1098,10 @@ def _get_null_fields(field, prefix=""): def _merge_type(a, b, name=None): if name is None: new_msg = lambda msg: msg - new_name = lambda n: "field %s" % n + new_name = lambda n: f"field {n}" else: - new_msg = lambda msg: "%s: %s" % (name, msg) - new_name = lambda n: "field %s in %s" % (n, name) + new_msg = lambda msg: f"{name}: {msg}" + new_name = lambda n: f"field {n} in {name}" if isinstance(a, NullType): return b @@ -1113,7 +1110,7 @@ def _merge_type(a, b, name=None): 
if type(a) is not type(b): # pylint: disable=W0511 # TODO: type cast (such as int -> long) - raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b)))) + raise TypeError(new_msg(f"Can not merge type {type(a)} and {type(b)}")) # same type if isinstance(a, StructType): @@ -1129,11 +1126,11 @@ def _merge_type(a, b, name=None): if isinstance(a, ArrayType): return ArrayType(_merge_type(a.elementType, b.elementType, - name='element in array %s' % name), True) + name=f'element in array {name}'), True) if isinstance(a, MapType): - return MapType(_merge_type(a.keyType, b.keyType, name='key of map %s' % name), - _merge_type(a.valueType, b.valueType, name='value of map %s' % name), + return MapType(_merge_type(a.keyType, b.keyType, name=f'key of map {name}'), + _merge_type(a.valueType, b.valueType, name=f'value of map {name}'), True) return a @@ -1202,7 +1199,7 @@ def convert_dict(obj): elif hasattr(obj, "__dict__"): # object d = obj.__dict__ else: - raise TypeError("Unexpected obj type: %s" % type(obj)) + raise TypeError(f"Unexpected obj type: {type(obj)}") if convert_fields: return tuple(convert(d.get(name)) for name, convert in zip(names, converters)) @@ -1286,10 +1283,10 @@ def _make_type_verifier(dataType, nullable=True, name=None): if name is None: new_msg = lambda msg: msg - new_name = lambda n: "field %s" % n + new_name = lambda n: f"field {n}" else: - new_msg = lambda msg: "%s: %s" % (name, msg) - new_name = lambda n: "field %s in %s" % (n, name) + new_msg = lambda msg: f"{name}: {msg}" + new_name = lambda n: f"field {n} in {name}" def verify_nullability(obj): if obj is None: @@ -1302,14 +1299,13 @@ def verify_nullability(obj): def assert_acceptable_types(obj): assert _type in _acceptable_types, \ - new_msg("unknown datatype: %s for object %r" % (dataType, obj)) + new_msg(f"unknown datatype: {dataType} for object {repr(obj)}") def verify_acceptable_types(obj): # subclass of them can not be fromInternal in JVM convertible_types = 
tuple(_acceptable_types[_type]) if not isinstance(obj, convertible_types): - raise TypeError(new_msg("%s can not accept object %r in type %s" - % (dataType, obj, type(obj)))) + raise TypeError(new_msg(f"{dataType} can not accept object {repr(obj)} in type {type(obj)}")) verify_value = get_verifier( dataType, @@ -1340,7 +1336,7 @@ def no_check(value): def verify_udf(obj): if not (hasattr(obj, '__UDT__') and obj.__UDT__ == dataType): - raise ValueError(new_msg("%r is not an instance of type %r" % (obj, dataType))) + raise ValueError(new_msg(f"{repr(obj)} is not an instance of type {repr(dataType)}")) field_verifier(dataType.toInternal(obj)) verifier = verify_udf @@ -1349,7 +1345,7 @@ def verify_byte(obj): assert_acceptable_types(obj) verify_acceptable_types(obj) if obj < -128 or obj > 127: - raise ValueError(new_msg("object of ByteType out of range, got: %s" % obj)) + raise ValueError(new_msg(f"object of ByteType out of range, got: {obj}")) verifier = verify_byte elif isinstance(dataType, ShortType): @@ -1357,7 +1353,7 @@ def verify_short(obj): assert_acceptable_types(obj) verify_acceptable_types(obj) if obj < -32768 or obj > 32767: - raise ValueError(new_msg("object of ShortType out of range, got: %s" % obj)) + raise ValueError(new_msg(f"object of ShortType out of range, got: {obj}")) verifier = verify_short elif isinstance(dataType, IntegerType): @@ -1365,7 +1361,7 @@ def verify_integer(obj): assert_acceptable_types(obj) verify_acceptable_types(obj) if obj < -2147483648 or obj > 2147483647: - raise ValueError(new_msg("object of IntegerType out of range, got: %s" % obj)) + raise ValueError(new_msg(f"object of IntegerType out of range, got: {obj}")) verifier = verify_integer elif isinstance(dataType, ArrayType): @@ -1389,7 +1385,7 @@ def verify_default(obj): def get_array_verifier(dataType, name, assert_acceptable_types, verify_acceptable_types): element_verifier = _make_type_verifier( - dataType.elementType, dataType.containsNull, name="element in array %s" % name) 
+ dataType.elementType, dataType.containsNull, name=f"element in array {name}") def verify_array(obj): assert_acceptable_types(obj) @@ -1423,8 +1419,7 @@ def verify_struct(obj): elif isinstance(obj, (tuple, list)): if len(obj) != len(verifiers): raise ValueError( - new_msg("Length of object (%d) does not match with " - "length of fields (%d)" % (len(obj), len(verifiers)))) + new_msg(f"Length of object ({len(obj)}) does not match with length of fields ({len(verifiers)})")) for v, (_, verifier) in zip(obj, verifiers): verifier(v) elif hasattr(obj, "__dict__"): @@ -1432,16 +1427,15 @@ def verify_struct(obj): for f, verifier in verifiers: verifier(d.get(f)) else: - raise TypeError(new_msg("StructType can not accept object %r in type %s" - % (obj, type(obj)))) + raise TypeError(new_msg(f"StructType can not accept object {repr(obj)} in type {type(obj)}")) return verify_struct def get_map_verifier(dataType, name, assert_acceptable_types, verify_acceptable_types): - key_verifier = _make_type_verifier(dataType.keyType, False, name="key of map %s" % name) + key_verifier = _make_type_verifier(dataType.keyType, False, name=f"key of map {name}") value_verifier = _make_type_verifier( - dataType.valueType, dataType.valueContainsNull, name="value of map %s" % name) + dataType.valueType, dataType.valueContainsNull, name=f"value of map {name}") def verify_map(obj): assert_acceptable_types(obj) @@ -1486,7 +1480,7 @@ def create_row(fields, values, metadata=None): :return: pysparkling.sql.Row """ new_row = tuple.__new__(Row, values) - new_row.__fields__ = tuple(fields) + new_row.__fields__ = tuple(fields) # pylint: disable=attribute-defined-outside-init new_row.set_metadata(metadata) return new_row @@ -1587,8 +1581,7 @@ def __contains__(self, item): def __call__(self, *args): """create new Row object""" if len(args) > len(self): - raise ValueError("Can not create Row with fields %s, expected %d values " - "but got %s" % (self, len(self), args)) + raise ValueError(f"Can not create Row 
with fields {self}, expected {len(self)} values but got {args}") return create_row(self, args) def __getitem__(self, item): @@ -1631,16 +1624,16 @@ def __reduce__(self): def __repr__(self): """Printable representation of Row used in Python REPL.""" if hasattr(self, "__fields__"): - return "Row(%s)" % ", ".join("%s=%r" % (k, v) - for k, v in zip(self.__fields__, tuple(self))) - return "" % ", ".join(self) + fields = ', '.join(f"{k}={repr(v)}" for k, v in zip(self.__fields__, tuple(self))) + return f"Row({fields})" + return f"" def set_grouping(self, grouping): # This method is specific to Pysparkling and should not be used by # user of the library who wants compatibility with PySpark if self._metadata is None: self.set_metadata({}) - self._metadata["grouping"] = grouping + self._metadata["grouping"] = grouping # type: ignore return self def set_input_file_name(self, input_file_name): @@ -1648,7 +1641,7 @@ def set_input_file_name(self, input_file_name): # user of the library who wants compatibility with PySpark if self._metadata is None: self.set_metadata({}) - self._metadata["input_file_name"] = input_file_name + self._metadata["input_file_name"] = input_file_name # type: ignore return self def set_metadata(self, metadata): @@ -1693,7 +1686,7 @@ def _check_series_localize_timestamps(s, timezone): try: # pandas is an optional dependency # pylint: disable=import-outside-toplevel - from pandas.api.types import is_datetime64tz_dtype + from pandas.api.types import is_datetime64tz_dtype # type: ignore except ImportError as e: raise Exception("require_minimum_pandas_version() was not called") from e tz = timezone or _get_local_timezone() @@ -1733,7 +1726,7 @@ def _check_series_convert_timestamps_internal(s, timezone): try: # pandas is an optional dependency # pylint: disable=import-outside-toplevel - from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype # type: ignore except 
ImportError as e: raise Exception("require_minimum_pandas_version() was not called") from e @@ -1792,7 +1785,7 @@ def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone): # pandas is an optional dependency # pylint: disable=import-outside-toplevel import pandas as pd - from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype # type: ignore except ImportError as e: raise Exception("require_minimum_pandas_version() was not called") from e diff --git a/pysparkling/stat_counter.py b/pysparkling/stat_counter.py index fb38b40c5..8a6deff25 100644 --- a/pysparkling/stat_counter.py +++ b/pysparkling/stat_counter.py @@ -138,8 +138,10 @@ def sampleStdev(self): return sqrt(self.sampleVariance()) def __repr__(self): - return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" % - (self.count(), self.mean(), self.stdev(), self.max(), self.min())) + return ( + f"(count: {self.count()}, mean: {self.mean()}, stdev: {self.stdev()}, max: {self.max()}, " + f"min: {self.min()})" + ) PercentileStats = namedtuple("PercentileStats", ["value", "g", "delta"]) diff --git a/pysparkling/storagelevel.py b/pysparkling/storagelevel.py index 02a9b91c7..744b994e4 100644 --- a/pysparkling/storagelevel.py +++ b/pysparkling/storagelevel.py @@ -37,8 +37,10 @@ def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1): self.replication = replication def __repr__(self): - return "StorageLevel(%s, %s, %s, %s, %s)" % ( - self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, self.replication) + return ( + f"StorageLevel({self.useDisk}, {self.useMemory}, {self.useOffHeap}, {self.deserialized}, " + f"{self.replication})" + ) def __str__(self): result = "" @@ -46,7 +48,7 @@ def __str__(self): result += "Memory " if self.useMemory else "" result += "OffHeap " if self.useOffHeap else "" result += "Deserialized " if self.deserialized else "Serialized " - result += "%sx 
Replicated" % self.replication + result += f"{self.replication}x Replicated" return result diff --git a/pysparkling/tests/profile_textfile.py b/pysparkling/tests/profile_textfile.py index d36a706c2..58f3bbe0d 100644 --- a/pysparkling/tests/profile_textfile.py +++ b/pysparkling/tests/profile_textfile.py @@ -7,7 +7,7 @@ @profile def main(): - tempFile = tempfile.NamedTemporaryFile(delete=True) + tempFile = tempfile.NamedTemporaryFile(delete=True) # pylint: disable=consider-using-with tempFile.close() sc = pysparkling.Context() diff --git a/pysparkling/tests/test_multiprocessing.py b/pysparkling/tests/test_multiprocessing.py index 1d94ce562..dbe36d40b 100644 --- a/pysparkling/tests/test_multiprocessing.py +++ b/pysparkling/tests/test_multiprocessing.py @@ -42,7 +42,7 @@ def lazy_execution_test(self): class Multiprocessing(unittest.TestCase): def setUp(self): - self.pool = multiprocessing.Pool(4) + self.pool = multiprocessing.Pool(4) # pylint: disable=consider-using-with self.sc = pysparkling.Context(pool=self.pool, serializer=cloudpickle.dumps, deserializer=pickle.loads) @@ -66,7 +66,7 @@ def square_op(x): class MultiprocessingWithoutCloudpickle(unittest.TestCase): def setUp(self): - self.pool = multiprocessing.Pool(4) + self.pool = multiprocessing.Pool(4) # pylint: disable=consider-using-with self.sc = pysparkling.Context(pool=self.pool) def test_basic(self): diff --git a/pysparkling/tests/test_textFile.py b/pysparkling/tests/test_textFile.py index ec43f4c64..f3f1197cd 100644 --- a/pysparkling/tests/test_textFile.py +++ b/pysparkling/tests/test_textFile.py @@ -1,9 +1,8 @@ -import logging import os +import pathlib import pickle import random import sys -import tempfile import unittest import pytest @@ -156,74 +155,66 @@ def test_http_textFile(): assert 'TGCTGCGGTGAATGCGTTCCCGGGTCT' in myrdd.collect() -def test_saveAsTextFile(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - Context().parallelize(range(10)).saveAsTextFile(tempFile.name) - 
with open(tempFile.name, 'r') as f: +def test_saveAsTextFile(tmp_path: pathlib.Path): + filename = str(tmp_path / 'textfile.txt') + Context().parallelize(range(10)).saveAsTextFile(filename) + with open(filename, 'r', encoding='utf8') as f: r = f.readlines() print(r) assert '5\n' in r -def test_saveAsTextFile_tar(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - Context().parallelize(range(10)).saveAsTextFile(tempFile.name + '.tar') - read_rdd = Context().textFile(tempFile.name + '.tar') +def test_saveAsTextFile_tar(tmp_path: pathlib.Path): + filename = str(tmp_path / 'textfile.txt.tar') + Context().parallelize(range(10)).saveAsTextFile(filename) + read_rdd = Context().textFile(filename) print(read_rdd.collect()) assert '5' in read_rdd.collect() @unittest.skipIf(hasattr(sys, 'pypy_version_info'), 'skip on pypy') -def test_saveAsTextFile_targz(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - Context().parallelize(range(10)).saveAsTextFile(tempFile.name + '.tar.gz') - read_rdd = Context().textFile(tempFile.name + '.tar.gz') +def test_saveAsTextFile_targz(tmp_path: pathlib.Path): + filename = str(tmp_path / 'textfile.txt.tar.gz') + Context().parallelize(range(10)).saveAsTextFile(filename) + read_rdd = Context().textFile(filename) print(read_rdd.collect()) assert '5' in read_rdd.collect() -def test_saveAsTextFile_tarbz2(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - Context().parallelize(range(10)).saveAsTextFile(tempFile.name + '.tar.bz2') - read_rdd = Context().textFile(tempFile.name + '.tar.bz2') +def test_saveAsTextFile_tarbz2(tmp_path: pathlib.Path): + filename = str(tmp_path / 'textfile.txt.tar.bz2') + Context().parallelize(range(10)).saveAsTextFile(filename) + read_rdd = Context().textFile(filename) print(read_rdd.collect()) assert '5' in read_rdd.collect() -def test_saveAsTextFile_gz(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - 
Context().parallelize(range(10)).saveAsTextFile(tempFile.name + '.gz') - read_rdd = Context().textFile(tempFile.name + '.gz') +def test_saveAsTextFile_gz(tmp_path: pathlib.Path): + filename = str(tmp_path / 'textfile.txt.gz') + Context().parallelize(range(10)).saveAsTextFile(filename) + read_rdd = Context().textFile(filename) assert '5' in read_rdd.collect() -def test_saveAsTextFile_zip(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - Context().parallelize(range(10)).saveAsTextFile(tempFile.name + '.zip') - read_rdd = Context().textFile(tempFile.name + '.zip') +def test_saveAsTextFile_zip(tmp_path: pathlib.Path): + filename = str(tmp_path / 'textfile.txt.zip') + Context().parallelize(range(10)).saveAsTextFile(filename) + read_rdd = Context().textFile(filename) print(read_rdd.collect()) assert '5' in read_rdd.collect() -def test_saveAsTextFile_bz2(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - Context().parallelize(range(10)).saveAsTextFile(tempFile.name + '.bz2') - read_rdd = Context().textFile(tempFile.name + '.bz2') +def test_saveAsTextFile_bz2(tmp_path: pathlib.Path): + filename = str(tmp_path / 'textfile.txt.bz2') + Context().parallelize(range(10)).saveAsTextFile(filename) + read_rdd = Context().textFile(filename) assert '5' in read_rdd.collect() -def test_saveAsTextFile_lzma(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - Context().parallelize(range(10)).saveAsTextFile(tempFile.name + '.lzma') - read_rdd = Context().textFile(tempFile.name + '.lzma') +def test_saveAsTextFile_lzma(tmp_path: pathlib.Path): + filename = str(tmp_path / 'textfile.txt.lzma') + Context().parallelize(range(10)).saveAsTextFile(filename) + read_rdd = Context().textFile(filename) assert '5' in read_rdd.collect() @@ -260,33 +251,27 @@ def test_pyspark_compatibility_txt(): kv = Context().textFile( f'{LOCAL_TEST_PATH}/pyspark/key_value.txt').collect() print(kv) - assert u"('a', 1)" in kv and u"('b', 2)" 
in kv and len(kv) == 2 + assert "('a', 1)" in kv and "('b', 2)" in kv and len(kv) == 2 def test_pyspark_compatibility_bz2(): kv = Context().textFile( f'{LOCAL_TEST_PATH}/pyspark/key_value.txt.bz2').collect() print(kv) - assert u"a\t1" in kv and u"b\t2" in kv and len(kv) == 2 + assert "a\t1" in kv and "b\t2" in kv and len(kv) == 2 def test_pyspark_compatibility_gz(): kv = Context().textFile( f'{LOCAL_TEST_PATH}/pyspark/key_value.txt.gz').collect() print(kv) - assert u"a\t1" in kv and u"b\t2" in kv and len(kv) == 2 + assert "a\t1" in kv and "b\t2" in kv and len(kv) == 2 -def test_local_regex_read(): +def test_local_regex_read(tmp_path: pathlib.Path): # was not working before 0.3.19 - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - Context().parallelize(range(30), 30).saveAsTextFile(tempFile.name) - d = Context().textFile(tempFile.name + '/part-0000*').collect() + filename = str(tmp_path / 'textfile.txt') + Context().parallelize(range(30), 30).saveAsTextFile(filename) + d = Context().textFile(filename + '/part-0000*').collect() print(d) assert len(d) == 10 - - -if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) - test_local_regex_read() diff --git a/pysparkling/utils.py b/pysparkling/utils.py index 465cb4c2e..a4d800b8b 100644 --- a/pysparkling/utils.py +++ b/pysparkling/utils.py @@ -228,12 +228,11 @@ def format_cell(value): if isinstance(value, Row): return f"[{', '.join(format_cell(sub_value) for sub_value in value)}]" if isinstance(value, dict): - return "[{0}]".format( - ", ".join( - f"{format_cell(key)} -> {format_cell(sub_value)}" - for key, sub_value in value.items() - ) + values = ", ".join( + f"{format_cell(key)} -> {format_cell(sub_value)}" + for key, sub_value in value.items() ) + return f"[{values}]" return str(value) diff --git a/scripts/benchmark_csv.py b/scripts/benchmark_csv.py index 6e68e9e61..9bb279b02 100644 --- a/scripts/benchmark_csv.py +++ b/scripts/benchmark_csv.py @@ -8,7 +8,7 @@ def 
create_csv(filename, lines=10000000, columns=12): - with open(filename, 'w') as f: + with open(filename, 'w', encoding='utf8') as f: column_names = ','.join(ascii_uppercase[i] for i in range(columns)) f.write(f'{column_names}\n') diff --git a/scripts/tcpperf_client.py b/scripts/tcpperf_client.py index 9cec6fc62..cd1fc2b0d 100644 --- a/scripts/tcpperf_client.py +++ b/scripts/tcpperf_client.py @@ -66,13 +66,12 @@ def r(self): return (s, v) def text(self): - return 'sensor{}|{}\n'.format(*self.r()).encode('utf8') + s, v = self.r() + return f'sensor{s}|{v}\n'.encode('utf8') def json(self): s, v = self.r() - return (json.dumps({ - f'sensor{s}': v, - }) + '\n').encode('utf8') + return (json.dumps({f'sensor{s}': v}) + '\n').encode('utf8') def bello(self): # 5 bytes diff --git a/scripts/tcpperf_plot.py b/scripts/tcpperf_plot.py index 8a9d2c307..77a9d92e1 100644 --- a/scripts/tcpperf_plot.py +++ b/scripts/tcpperf_plot.py @@ -17,7 +17,7 @@ def __init__(self, filename, x_label=None, y_label=None): self.frame() def read(self): - with open(self.filename, 'r') as f: + with open(self.filename, 'r', encoding='utf8') as f: reader = csv.reader(f) try: diff --git a/scripts/tcpperf_server.py b/scripts/tcpperf_server.py index 70cb63c88..d1412bdb8 100644 --- a/scripts/tcpperf_server.py +++ b/scripts/tcpperf_server.py @@ -98,7 +98,7 @@ def kv_from_struct(b): s, v = struct.unpack('If', b) return f'sensor{s}', v - with open('tests/tcpperf_messages.csv', 'w') as f: + with open('tests/tcpperf_messages.csv', 'w', encoding='utf8') as f: f.write('# messages, hello, text, json, bello, struct\n') server_1k = Server(pause=2, values=1000, processes=5) for n in reversed(N_CONNECTIONS_1K): @@ -112,7 +112,7 @@ def kv_from_struct(b): ) f.write(', '.join(f'{d}' for d in data) + '\n') - with open('tests/tcpperf_connections.csv', 'w') as f: + with open('tests/tcpperf_connections.csv', 'w', encoding='utf8') as f: f.write('# messages, hello, text, json, bello, struct\n') server = Server() for n in 
reversed(N_CONNECTIONS): diff --git a/setup.py b/setup.py index 262d7a714..399a7f894 100644 --- a/setup.py +++ b/setup.py @@ -8,13 +8,12 @@ packages=find_packages(), license='MIT', description='Pure Python implementation of the Spark RDD interface.', - long_description=open('README.rst').read(), + long_description=open('README.rst', 'r', encoding='utf8').read(), author='pysparkling contributors', url='https://github.com/svenkreiss/pysparkling', install_requires=[ 'boto>=2.36.0', - 'future>=0.15', 'requests>=2.6.0', 'pytz>=2019.3', 'python-dateutil>=2.8.0' @@ -30,7 +29,6 @@ 'tests': [ 'backports.tempfile==1.0rc1', 'cloudpickle>=0.1.0', - 'futures>=3.0.1', 'pylint', 'pylzma', 'memory-profiler>=0.47', From fc48af971b2658ed54282a5da6c1a1142836287c Mon Sep 17 00:00:00 2001 From: Sven Kreiss Date: Sat, 12 Nov 2022 17:31:03 +0100 Subject: [PATCH 2/7] cleanup tests --- .github/workflows/tests.yml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a83ab748f..4201f17be 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -47,11 +47,6 @@ jobs: run: | python -m pip install -e ".[sql]" - - name: Lint - if: matrix.python == '3.9' - # https://github.com/PyCQA/pylint/issues/3882 - run: pylint pysparkling scripts --disable=fixme,unsubscriptable-object - - name: Lint if: matrix.python != '3.9' run: pylint pysparkling scripts --disable=fixme From e64da6231bb9a0990891e3996da8e6bf00658740 Mon Sep 17 00:00:00 2001 From: Sven Kreiss Date: Sat, 12 Nov 2022 17:50:31 +0100 Subject: [PATCH 3/7] boto optional (#165) --- README.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 6425e1d06..0239e42f1 100644 --- a/README.rst +++ b/README.rst @@ -40,7 +40,7 @@ Install .. 
code-block:: bash - pip install pysparkling[s3,hdfs,streaming] + python3 -m pip install "pysparkling[s3,hdfs,streaming]" `Documentation `_: diff --git a/setup.py b/setup.py index 399a7f894..15fee1a3a 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,6 @@ url='https://github.com/svenkreiss/pysparkling', install_requires=[ - 'boto>=2.36.0', 'requests>=2.6.0', 'pytz>=2019.3', 'python-dateutil>=2.8.0' @@ -21,6 +20,7 @@ extras_require={ 'hdfs': ['hdfs>=2.0.0'], 'performance': ['matplotlib>=1.5.3'], + 's3': ['boto>=2.36.0'], 'streaming': ['tornado>=4.3'], 'sql': [ 'numpy', From 1ddf590b5363b85b25e93456486ced780f178a83 Mon Sep 17 00:00:00 2001 From: Sven Kreiss Date: Sun, 13 Nov 2022 18:04:20 +0100 Subject: [PATCH 4/7] make requests optional (#166) --- README.rst | 2 +- setup.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index 0239e42f1..893a876b2 100644 --- a/README.rst +++ b/README.rst @@ -40,7 +40,7 @@ Install .. code-block:: bash - python3 -m pip install "pysparkling[s3,hdfs,streaming]" + python3 -m pip install "pysparkling[s3,hdfs,http,streaming]" `Documentation `_: diff --git a/setup.py b/setup.py index 15fee1a3a..396455763 100644 --- a/setup.py +++ b/setup.py @@ -13,12 +13,12 @@ url='https://github.com/svenkreiss/pysparkling', install_requires=[ - 'requests>=2.6.0', 'pytz>=2019.3', 'python-dateutil>=2.8.0' ], extras_require={ 'hdfs': ['hdfs>=2.0.0'], + 'http': ['requests>=2.6.0'], 'performance': ['matplotlib>=1.5.3'], 's3': ['boto>=2.36.0'], 'streaming': ['tornado>=4.3'], @@ -29,13 +29,14 @@ 'tests': [ 'backports.tempfile==1.0rc1', 'cloudpickle>=0.1.0', + 'isort', 'pylint', 'pylzma', 'memory-profiler>=0.47', 'pycodestyle', 'pytest', 'pytest-cov', - 'isort', + 'requests>=2.6.0', 'tornado>=4.3', ], 'scripts': [ From 14c202d32476c8322d2e5ba6bdc833a1270d8ab4 Mon Sep 17 00:00:00 2001 From: Sven Kreiss Date: Sun, 13 Nov 2022 19:16:47 +0100 Subject: [PATCH 5/7] test Python 3.10 and 3.11 (#167) * properly 
test for exception * move profile_textfile.py --- .github/workflows/tests.yml | 10 +++++----- pysparkling/broadcast.py | 4 ---- pysparkling/rdd.py | 14 ++++++++------ pysparkling/tests/test_broadcast.py | 17 +++++++++++++++++ .../tests => scripts}/profile_textfile.py | 0 5 files changed, 30 insertions(+), 15 deletions(-) create mode 100644 pysparkling/tests/test_broadcast.py rename {pysparkling/tests => scripts}/profile_textfile.py (100%) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4201f17be..9c1522f18 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,7 +9,7 @@ jobs: strategy: matrix: os: [ ubuntu-latest, macos-latest, windows-latest ] - python: [ 3.7, 3.8, 3.9 ] + python: [ 3.7, 3.8, 3.9, "3.10", "3.11" ] steps: - uses: actions/checkout@v3 @@ -36,12 +36,12 @@ jobs: run: | isort . --check --diff - - name: Test pysparkling.rdd - run: python -m pytest --pyargs pysparkling.rdd -vv + - name: Test pysparkling/rdd.py + run: python -m pytest pysparkling/rdd.py -vv - - name: Test pysparkling.tests + - name: Test pysparkling/tests if: matrix.os == 'ubuntu-latest' # because of timing sensitivity in stream tests - run: python -m pytest --pyargs pysparkling.tests -vv + run: python -m pytest pysparkling/tests -vv - name: Install SQL Dependencies run: | diff --git a/pysparkling/broadcast.py b/pysparkling/broadcast.py index 8e7f06851..3798e3ee0 100644 --- a/pysparkling/broadcast.py +++ b/pysparkling/broadcast.py @@ -34,10 +34,6 @@ class Broadcast: [1, 2, 3, 4, 5] >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect() [1, 2, 3, 4, 5, 1, 2, 3, 4, 5] - >>> b.value += [1] - Traceback (most recent call last): - ... 
- AttributeError: can't set attribute """ def __init__(self, sc=None, value=None): self._value = value diff --git a/pysparkling/rdd.py b/pysparkling/rdd.py index 3cbcafe7c..303088d6d 100644 --- a/pysparkling/rdd.py +++ b/pysparkling/rdd.py @@ -1209,13 +1209,15 @@ def reduce(self, f): """ _empty = object() + def f_without_empty(a, b): + if a is _empty: + return b + if b is _empty: + return a + return f(a, b) + def reducer(values): - try: - return functools.reduce(f, (v for v in values if v is not _empty)) - except TypeError as e: - if e.args[0] == "reduce() of empty sequence with no initial value": - return _empty - raise e + return functools.reduce(f_without_empty, values, _empty) result = self.context.runJob( self, diff --git a/pysparkling/tests/test_broadcast.py b/pysparkling/tests/test_broadcast.py new file mode 100644 index 000000000..a6055d7f2 --- /dev/null +++ b/pysparkling/tests/test_broadcast.py @@ -0,0 +1,17 @@ +import unittest + +import pysparkling + + +class BroadcastTest(unittest.TestCase): + def setUp(self) -> None: + self.context = pysparkling.Context() + + def testSimple(self): + b = self.context.broadcast([1, 2, 3, 4, 5]) + self.assertEqual(b.value, [1, 2, 3, 4, 5]) + + def testAppendFails(self): + b = self.context.broadcast([1, 2, 3, 4, 5]) + with self.assertRaises(AttributeError): + b.value += [1] # type: ignore diff --git a/pysparkling/tests/profile_textfile.py b/scripts/profile_textfile.py similarity index 100% rename from pysparkling/tests/profile_textfile.py rename to scripts/profile_textfile.py From d22966f8e70193daf70eae641be9e6d9d3eb24e1 Mon Sep 17 00:00:00 2001 From: Sven Kreiss Date: Sun, 13 Nov 2022 19:28:07 +0100 Subject: [PATCH 6/7] update info --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 396455763..1ede7b8c9 100644 --- a/setup.py +++ b/setup.py @@ -53,10 +53,11 @@ 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent', 'Programming Language :: 
Python', - 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: Implementation :: PyPy', ] ) From 431df12873bd9cf12af5f085cd7e283aabdcf097 Mon Sep 17 00:00:00 2001 From: Sven Kreiss Date: Sun, 13 Nov 2022 19:39:01 +0100 Subject: [PATCH 7/7] v0.6.2 --- HISTORY.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/HISTORY.rst b/HISTORY.rst index 35830e7e1..e5c1ee38b 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -1,7 +1,10 @@ Changelog ========= -* `master `_ +* `main `_ +* `v0.6.2 `_ (2022-11-13) + * make dependencies optional: boto, requests + * compatibility * `v0.6.0 `_ (2019-07-13) * Broadcast, Accumulator and AccumulatorParam by @alexprengere * support for increasing partition numbers in coalesce and repartition by @tools4origins