Skip to content

Commit

Permalink
[EH/SB] ran black (#38210)
Browse files Browse the repository at this point in the history
* ran black

* fix pylint

* black sb

* pylint

* spacing
  • Loading branch information
l0lawrence authored Oct 31, 2024
1 parent b45b0ef commit 7212809
Show file tree
Hide file tree
Showing 369 changed files with 11,862 additions and 14,123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
*,
amqp_transport: AmqpTransport,
max_buffer_length: int,
max_wait_time: float = 1
max_wait_time: float = 1,
):
self._buffered_queue: queue.Queue = queue.Queue()
self._max_buffer_len = max_buffer_length
Expand All @@ -59,9 +59,7 @@ def start(self):
self._running = True
if self._max_wait_time:
self._last_send_time = time.time()
self._check_max_wait_time_future = self._executor.submit(
self.check_max_wait_time_worker
)
self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker)

def stop(self, flush=True, timeout_time=None, raise_error=False):
self._running = False
Expand All @@ -80,9 +78,7 @@ def stop(self, flush=True, timeout_time=None, raise_error=False):
try:
self._check_max_wait_time_future.result(remain_timeout)
except Exception as exc: # pylint: disable=broad-except
_LOGGER.warning(
"Partition %r stopped with error %r", self.partition_id, exc
)
_LOGGER.warning("Partition %r stopped with error %r", self.partition_id, exc)
self._producer.close()

def put_events(self, events, timeout_time=None):
Expand All @@ -102,9 +98,7 @@ def put_events(self, events, timeout_time=None):
# flush the buffer
self.flush(timeout_time=timeout_time)
if timeout_time and time.time() > timeout_time:
raise OperationTimeoutError(
"Failed to enqueue events into buffer due to timeout."
)
raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.")
with self._lock:
try:
# add single event into current batch
Expand Down Expand Up @@ -157,9 +151,7 @@ def flush(self, timeout_time=None, raise_error=True):
_LOGGER.info("Partition %r is sending.", self.partition_id)
self._producer.send(
batch,
timeout=timeout_time - time.time()
if timeout_time
else None,
timeout=timeout_time - time.time() if timeout_time else None,
)
_LOGGER.info(
"Partition %r sending %r events succeeded.",
Expand All @@ -184,14 +176,10 @@ def flush(self, timeout_time=None, raise_error=True):
finally:
self._cur_buffered_len -= len(batch)
else:
_LOGGER.info(
"Partition %r fails to flush due to timeout.", self.partition_id
)
_LOGGER.info("Partition %r fails to flush due to timeout.", self.partition_id)
if raise_error:
raise OperationTimeoutError(
"Failed to flush {!r} within {}".format(
self.partition_id, timeout_time
)
"Failed to flush {!r} within {}".format(self.partition_id, timeout_time)
)
break
# after finishing flushing, reset cur batch and put it into the buffer
Expand All @@ -202,9 +190,7 @@ def check_max_wait_time_worker(self):
while self._running:
if self._cur_buffered_len > 0:
now_time = time.time()
_LOGGER.info(
"Partition %r worker is checking max_wait_time.", self.partition_id
)
_LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id)
# flush the partition if the producer is running beyond the waiting time
# or the buffer is at max capacity
if (now_time - self._last_send_time > self._max_wait_time) or (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
amqp_transport: AmqpTransport,
max_buffer_length: int = 1500,
max_wait_time: float = 1,
executor: Optional[Union[ThreadPoolExecutor, int]] = None
executor: Optional[Union[ThreadPoolExecutor, int]] = None,
):
self._buffered_producers: Dict[str, BufferedProducer] = {}
self._partition_ids: List[str] = partitions
Expand Down Expand Up @@ -62,20 +62,14 @@ def _get_partition_id(self, partition_id, partition_key):
if partition_id:
if partition_id not in self._partition_ids:
raise ConnectError(
"Invalid partition {} for the event hub {}".format(
partition_id, self._eventhub_name
)
"Invalid partition {} for the event hub {}".format(partition_id, self._eventhub_name)
)
return partition_id
if isinstance(partition_key, str):
return self._partition_resolver.get_partition_id_by_partition_key(
partition_key
)
return self._partition_resolver.get_partition_id_by_partition_key(partition_key)
return self._partition_resolver.get_next_partition_id()

def enqueue_events(
self, events, *, partition_id=None, partition_key=None, timeout_time=None
):
def enqueue_events(self, events, *, partition_id=None, partition_key=None, timeout_time=None):
pid = self._get_partition_id(partition_id, partition_key)
with self._lock:
try:
Expand All @@ -90,7 +84,7 @@ def enqueue_events(
executor=self._executor,
max_wait_time=self._max_wait_time,
max_buffer_length=self._max_buffer_length,
amqp_transport = self._amqp_transport,
amqp_transport=self._amqp_transport,
)
buffered_producer.start()
self._buffered_producers[pid] = buffered_producer
Expand All @@ -105,9 +99,7 @@ def flush(self, timeout_time=None):
futures.append(
(
pid,
self._executor.submit(
producer.flush, timeout_time=timeout_time
),
self._executor.submit(producer.flush, timeout_time=timeout_time),
)
)

Expand All @@ -123,9 +115,7 @@ def flush(self, timeout_time=None):
_LOGGER.info("Flushing all partitions succeeded")
return

_LOGGER.warning(
"Flushing all partitions partially failed with result %r.", exc_results
)
_LOGGER.warning("Flushing all partitions partially failed with result %r.", exc_results)
raise EventDataSendError(
message="Flushing all partitions partially failed, failed partitions are {!r}"
" Exception details are {!r}".format(exc_results.keys(), exc_results)
Expand Down Expand Up @@ -166,9 +156,7 @@ def close(self, *, flush=True, timeout_time=None, raise_error=False):
if raise_error:
raise EventHubError(
message="Stopping all partitions partially failed, failed partitions are {!r}"
" Exception details are {!r}".format(
exc_results.keys(), exc_results
)
" Exception details are {!r}".format(exc_results.keys(), exc_results)
)

if not self._existing_executor:
Expand All @@ -182,6 +170,4 @@ def get_buffered_event_count(self, pid):

@property
def total_buffered_event_count(self):
return sum(
(self.get_buffered_event_count(pid) for pid in self._buffered_producers)
)
return sum((self.get_buffered_event_count(pid) for pid in self._buffered_producers))
Original file line number Diff line number Diff line change
Expand Up @@ -109,136 +109,46 @@ def compute_hash(data, init_val=0, init_val2=0):

p = 0 # string offset
while lenpos > 12:
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
a &= 0xFFFFFFFF
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
b &= 0xFFFFFFFF
c += (
ord(data[p + 8])
+ (ord(data[p + 9]) << 8)
+ (ord(data[p + 10]) << 16)
+ (ord(data[p + 11]) << 24)
)
c += ord(data[p + 8]) + (ord(data[p + 9]) << 8) + (ord(data[p + 10]) << 16) + (ord(data[p + 11]) << 24)
c &= 0xFFFFFFFF
a, b, c = mix(a, b, c)
p += 12
lenpos -= 12

if lenpos == 12:
c += (
ord(data[p + 8])
+ (ord(data[p + 9]) << 8)
+ (ord(data[p + 10]) << 16)
+ (ord(data[p + 11]) << 24)
)
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
c += ord(data[p + 8]) + (ord(data[p + 9]) << 8) + (ord(data[p + 10]) << 16) + (ord(data[p + 11]) << 24)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 11:
c += ord(data[p + 8]) + (ord(data[p + 9]) << 8) + (ord(data[p + 10]) << 16)
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 10:
c += ord(data[p + 8]) + (ord(data[p + 9]) << 8)
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 9:
c += ord(data[p + 8])
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 8:
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 7:
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 6:
b += (ord(data[p + 5]) << 8) + ord(data[p + 4])
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 5:
b += ord(data[p + 4])
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 4:
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 3:
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16)
if lenpos == 2:
Expand Down
Loading

0 comments on commit 7212809

Please sign in to comment.