Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging new commits. #12

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ jobs:
name: Build Python source distribution
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
with:
fetch-depth: 0

- uses: actions/setup-python@v2
- uses: actions/setup-python@v4
name: Install Python
with:
python-version: '3.7'

- name: Build sdist
run: python setup.py sdist

- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v3
with:
path: dist/*.tar.gz

Expand All @@ -43,7 +43,7 @@ jobs:
# alternatively, to publish when a GitHub Release is created, use the following rule:
if: github.event_name == 'release' && github.event.action == 'published'
steps:
- uses: actions/download-artifact@v2
- uses: actions/download-artifact@v3
with:
name: artifact
path: dist
Expand Down
25 changes: 10 additions & 15 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ jobs:
strategy:
matrix:
os: [ ubuntu-latest, macos-latest, windows-latest ]
python: [ 3.6, 3.7, 3.8, 3.9 ]
python: [ 3.7, 3.8, 3.9, "3.10", "3.11" ]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Set up Python ${{ matrix.python }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python }}

- name: Install
run: |
python -m pip install --upgrade pip
python -m pip install -e ".[tests,scripts]" --use-deprecated=legacy-resolver
python -m pip install --upgrade pip setuptools
python -m pip install -e ".[tests,scripts]"

- name: Print environment
run: |
Expand All @@ -36,21 +36,16 @@ jobs:
run: |
isort . --check --diff

- name: Test pysparkling.rdd
run: python -m pytest --pyargs pysparkling.rdd -vv
- name: Test pysparkling/rdd.py
run: python -m pytest pysparkling/rdd.py -vv

- name: Test pysparkling.tests
- name: Test pysparkling/tests
if: matrix.os == 'ubuntu-latest' # because of timing sensitivity in stream tests
run: python -m pytest --pyargs pysparkling.tests -vv
run: python -m pytest pysparkling/tests -vv

- name: Install SQL Dependencies
run: |
python -m pip install -e ".[sql]" --use-deprecated=legacy-resolver

- name: Lint
if: matrix.python == '3.9'
# https://github.com/PyCQA/pylint/issues/3882
run: pylint pysparkling scripts --disable=fixme,unsubscriptable-object
python -m pip install -e ".[sql]"

- name: Lint
if: matrix.python != '3.9'
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
variable-rgx=[a-z0-9_]{1,30}$
good-names=log

disable=invalid-name,unused-argument,too-few-public-methods,no-self-use,missing-docstring,logging-format-interpolation,too-many-instance-attributes,duplicate-code,too-many-public-methods,too-many-arguments,protected-access,too-many-lines
disable=invalid-name,unused-argument,too-few-public-methods,missing-docstring,logging-format-interpolation,too-many-instance-attributes,duplicate-code,too-many-public-methods,too-many-arguments,protected-access,too-many-lines,missing-timeout,unnecessary-lambda-assignment

[FORMAT]
max-line-length=119
Expand Down
5 changes: 4 additions & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
Changelog
=========

* `master <https://github.com/svenkreiss/pysparkling/compare/v0.6.0...master>`_
* `main <https://github.com/svenkreiss/pysparkling/compare/v0.6.2...main>`_
* `v0.6.2 <https://github.com/svenkreiss/pysparkling/compare/v0.6.0...v0.6.2>`_ (2019-11-13)
* make dependencies optional: boto, requests
* compatibility
* `v0.6.0 <https://github.com/svenkreiss/pysparkling/compare/v0.5.0...v0.6.0>`_ (2019-07-13)
* Broadcast, Accumulator and AccumulatorParam by @alexprengere
* support for increasing partition numbers in coalesce and repartition by @tools4origins
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Install

.. code-block:: bash

pip install pysparkling[s3,hdfs,streaming]
python3 -m pip install "pysparkling[s3,hdfs,http,streaming]"


`Documentation <https://pysparkling.trivial.io>`_:
Expand Down
4 changes: 0 additions & 4 deletions pysparkling/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ class Broadcast:
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.value += [1]
Traceback (most recent call last):
...
AttributeError: can't set attribute
"""
def __init__(self, sc=None, value=None):
self._value = value
Expand Down
32 changes: 18 additions & 14 deletions pysparkling/fileio/fs/file_system.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import io
import logging
import typing as t

log = logging.getLogger(__name__)

Expand All @@ -8,55 +10,56 @@ class FileSystem:

:param str file_name: File name.
"""
def __init__(self, file_name):
self.file_name = file_name
def __init__(self, file_name: str):
self.file_name: str = file_name

@staticmethod
def resolve_filenames(expr):
def resolve_filenames(expr: str) -> t.List[str]:
"""Resolve the given glob-like expression to filenames.

:rtype: list
"""
log.error('Cannot resolve: %s', expr)
raise NotImplementedError

@staticmethod
def resolve_content(expr):
def resolve_content(expr: str) -> t.List[str]:
"""Return all the files matching expr or in a folder matching expr

:rtype: list
"""
log.error('Cannot resolve: %s', expr)
raise NotImplementedError

def exists(self):
def exists(self) -> bool:
"""Check whether the given file_name exists.

:rtype: bool
"""
log.warning('Could not determine whether %s exists due to unhandled scheme.', self.file_name)
raise NotImplementedError

def load(self):
"""Load a file to a stream.

:rtype: io.BytesIO
"""
def load(self) -> io.BytesIO:
"""Load a file to a stream."""
log.error('Cannot load: %s', self.file_name)
raise NotImplementedError

def load_text(self, encoding='utf8', encoding_errors='ignore'):
def load_text(self, encoding: str = 'utf8', encoding_errors: str = 'ignore') -> io.StringIO:
"""Load a file to a stream.

:param str encoding: Text encoding.
:param str encoding_errors: How to handle encoding errors.

:rtype: io.StringIO
"""
log.error('Cannot load: %s', self.file_name)
raise NotImplementedError

def dump(self, stream):
def dump(self, stream: io.BytesIO):
"""Dump a stream to a file.

:param io.BytesIO stream: Input tream.
"""
log.error('Cannot dump: %s', self.file_name)
raise NotImplementedError

def make_public(self, recursive=False):
"""Make the file public (only on some file systems).
Expand All @@ -65,3 +68,4 @@ def make_public(self, recursive=False):
:rtype: FileSystem
"""
log.warning('Cannot make %s public.', self.file_name)
raise NotImplementedError
16 changes: 9 additions & 7 deletions pysparkling/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1209,13 +1209,15 @@ def reduce(self, f):
"""
_empty = object()

def f_without_empty(a, b):
if a is _empty:
return b
if b is _empty:
return a
return f(a, b)

def reducer(values):
try:
return functools.reduce(f, (v for v in values if v is not _empty))
except TypeError as e:
if e.args[0] == "reduce() of empty sequence with no initial value":
return _empty
raise e
return functools.reduce(f_without_empty, values, _empty)

result = self.context.runJob(
self,
Expand Down Expand Up @@ -1840,7 +1842,7 @@ def takeSample(self, withReplacement, num, seed=None):
maxSampleSize = sys.maxsize - int(numStDev * math.sqrt(sys.maxsize))
if num > maxSampleSize:
raise ValueError(
"Sample size cannot be greater than %d." % maxSampleSize)
f"Sample size cannot be greater than {maxSampleSize}.")

fraction = RDD._computeFractionForSampleSize(num, initial_count, withReplacement)
samples = self.sample(withReplacement, fraction, seed).collect()
Expand Down
9 changes: 4 additions & 5 deletions pysparkling/sql/casts.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,11 @@ def cast_map(value, from_type, options):
if sub_value is not None else None))
for key, sub_value in value.items()
]
return "[{0}]".format(
", ".join("{0} ->{1}".format(
casted_key,
f" {casted_value}" if casted_value is not None else ""
) for casted_key, casted_value in casted_values)
joined_values = ", ".join(
f"{casted_key} ->{(' ' + casted_value) if casted_value is not None else ''}"
for casted_key, casted_value in casted_values
)
return f"[{joined_values}]"


def cast_sequence(value, from_type, options):
Expand Down
2 changes: 1 addition & 1 deletion pysparkling/sql/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ def alias(self, *alias, **kwargs):
"""

metadata = kwargs.pop('metadata', None)
assert not kwargs, 'Unexpected kwargs where passed: %s' % kwargs
assert not kwargs, f'Unexpected kwargs where passed: {kwargs}'

if metadata:
# pylint: disable=W0511
Expand Down
3 changes: 1 addition & 2 deletions pysparkling/sql/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ def unset(self, key):

def _checkType(self, obj, identifier):
if not isinstance(obj, str):
raise TypeError("expected %s '%s' to be a string (was '%s')" %
(identifier, obj, type(obj).__name__))
raise TypeError(f"expected {identifier} '{obj}' to be a string (was '{type(obj).__name__}')")

def isModifiable(self, key):
raise NotImplementedError("pysparkling does not support yet this feature")
20 changes: 10 additions & 10 deletions pysparkling/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from .internal_utils.joins import CROSS_JOIN, JOIN_TYPES
from .internals import CUBE_TYPE, InternalGroupedDataFrame, ROLLUP_TYPE
from .types import (
_check_series_convert_timestamps_local_tz, ByteType, FloatType, IntegerType, IntegralType, ShortType,
TimestampType
_check_series_convert_timestamps_local_tz, ByteType, FloatType, IntegerType, IntegralType, ShortType, TimestampType
)
from .utils import AnalysisException, IllegalArgumentException, require_minimum_pandas_version

Expand Down Expand Up @@ -187,7 +186,8 @@ def show(self, n=20, truncate=True, vertical=False):
print(self._jdf.showString(n, int(truncate), vertical))

def __repr__(self):
return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
values = ", ".join(f"{c1}: {c2}" for c1, c2 in self.dtypes)
return f"DataFrame[{values}]"

def checkpoint(self, eager=True):
raise NotImplementedError("Streaming is not supported in PySparkling")
Expand Down Expand Up @@ -510,7 +510,7 @@ def sample(self, withReplacement=None, fraction=None, seed=None):
raise TypeError(
"withReplacement (optional), fraction (required) and seed (optional)"
" should be a bool, float and number; however, "
"got [%s]." % ", ".join(argtypes))
f"got [{', '.join(argtypes)}].")

if is_withReplacement_omitted_args:
if fraction is not None:
Expand Down Expand Up @@ -829,7 +829,7 @@ def _sort_cols(cols, kwargs):
elif isinstance(ascending, list):
cols = [jc if asc else jc.desc() for asc, jc in zip(ascending, cols)]
else:
raise TypeError("ascending can only be boolean or list, but got %s" % type(ascending))
raise TypeError(f"ascending can only be boolean or list, but got {type(ascending)}")

if kwargs:
raise TypeError(f"Unrecognized arguments: {kwargs}")
Expand Down Expand Up @@ -961,7 +961,7 @@ def __getitem__(self, item):
return self.select(*item)
if isinstance(item, int):
return Column(FieldAsExpression(self._jdf.bound_schema[item]))
raise TypeError("unexpected item type: %s" % type(item))
raise TypeError(f"unexpected item type: {type(item)}")

def __getattr__(self, name):
if name.startswith("_"):
Expand Down Expand Up @@ -1452,7 +1452,7 @@ def approxQuantile(self, col, probabilities, relativeError):
[[2.0, 2.0, 5.0]]
"""
if not isinstance(col, (str, list, tuple)):
raise ValueError("col should be a string, list or tuple, but got %r" % type(col))
raise ValueError(f"col should be a string, list or tuple, but got {repr(type(col))}")

isStr = isinstance(col, str)

Expand All @@ -1463,7 +1463,7 @@ def approxQuantile(self, col, probabilities, relativeError):

for c in col:
if not isinstance(c, str):
raise ValueError("columns should be strings, but got %r" % type(c))
raise ValueError(f"columns should be strings, but got {repr(type(c))}")

if not isinstance(probabilities, (list, tuple)):
raise ValueError("probabilities should be a list or tuple")
Expand Down Expand Up @@ -1618,8 +1618,8 @@ def toDF(self, *cols):

def transform(self, func):
result = func(self)
assert isinstance(result, DataFrame), "Func returned an instance of type [%s], " \
"should have been DataFrame." % type(result)
assert isinstance(result, DataFrame), (
f"Func returned an instance of type [{type(result)}], should have been DataFrame.")
return result

def toPandas(self):
Expand Down
6 changes: 3 additions & 3 deletions pysparkling/sql/internal_utils/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def save(self):

success_path = os.path.join(output_path, "_SUCCESS")

with open(success_path, "w"):
with open(success_path, "w", encoding="utf8"):
pass

def preformat(self, row, schema):
Expand Down Expand Up @@ -302,7 +302,7 @@ def write(self, items, ref_value, schema):
# - escape
# - escapeQuotes

with open(file_path, "w") as f:
with open(file_path, "w", encoding="utf8") as f:
writer = csv.writer(
f,
delimiter=self.sep,
Expand Down Expand Up @@ -367,6 +367,6 @@ def write(self, items, ref_value, schema):
if not os.path.exists(partition_folder):
os.makedirs(partition_folder)

with open(file_path, "a") as f:
with open(file_path, "a", encoding="utf8") as f:
f.writelines(items)
return len(items)
4 changes: 2 additions & 2 deletions pysparkling/sql/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ def option(self, key, value):
return self

def options(self, **options):
for k in options:
self._jwrite.option(k, options[k])
for k, v in options.items():
self._jwrite.option(k, v)
return self

def partitionBy(self, *cols):
Expand Down
Loading