Fix RF broadcast feature (rapidsai#3833)
Answers rapidsai#3820.

This PR fixes the broadcast feature of the Random Forest estimator. The weights used by the reduction step were generated incorrectly. For each chunk to be predicted, the correct weight is derived from the number of estimators trained by the specific worker holding that chunk. Previously, the code used the number of estimators held by each worker, in the order the workers were constructed, so the weights could be misaligned with the chunks they were applied to.
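The misalignment described above can be illustrated with a minimal sketch in plain Python. The worker names, the estimator counts and the helper `counts_for_chunk_holders` are hypothetical and are not part of cuML; only the lookup pattern (find the holder's position in the worker list, then read its estimator count) mirrors the change in this commit.

```python
# Hypothetical cluster: three workers, 10 estimators split unevenly.
workers = ["w0", "w1", "w2"]
n_estimators_per_worker = [5, 3, 2]  # in worker construction order

# Workers that hold the chunks to predict, in processing order.
# Assumption for illustration: this order differs from construction order.
chunk_holders = ["w2", "w0"]


def counts_for_chunk_holders(holders, workers, n_per_worker):
    """Return estimator counts aligned with the workers holding each chunk."""
    return [n_per_worker[workers.index(holder)] for holder in holders]


# Old behaviour: counts taken in construction order, regardless of holders.
old_counts = n_estimators_per_worker

# Fixed behaviour: one count per chunk holder, in the holders' order.
new_counts = counts_for_chunk_holders(
    chunk_holders, workers, n_estimators_per_worker
)
```

In this sketch the old list has three entries in construction order while only two workers hold chunks, whereas the new list has exactly one entry per chunk holder.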

Authors:
  - Victor Lafargue (https://github.com/viclafargue)

Approvers:
  - Dante Gama Dessavre (https://github.com/dantegd)

URL: rapidsai#3833
viclafargue authored May 10, 2021
1 parent b59be2e commit ce96f3d
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions python/cuml/dask/ensemble/base.py
@@ -50,7 +50,7 @@ def _create_model(self, model_func,
         self.client = get_client(client)
         if workers is None:
             # Default to all workers
-            workers = self.client.scheduler_info()['workers'].keys()
+            workers = list(self.client.scheduler_info()['workers'].keys())
         self.workers = workers
         self._set_internal_model(None)
         self.active_workers = list()
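The `list(...)` conversion matters because a later hunk calls `self.workers.index(worker)`, and a dictionary keys view has no `index` method. A minimal sketch of this, using a hand-written dictionary as a stand-in for the value returned by `scheduler_info()` (the addresses are made up):

```python
# Stand-in for client.scheduler_info(); the worker addresses are hypothetical.
info = {"workers": {"tcp://10.0.0.1:4000": {}, "tcp://10.0.0.2:4000": {}}}

keys_view = info["workers"].keys()
# A dict keys view does not provide .index().
view_has_index = hasattr(keys_view, "index")

# A list does, so the position of a worker can be looked up.
workers = list(keys_view)
position = workers.index("tcp://10.0.0.2:4000")
```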
@@ -130,6 +130,13 @@ def _fit(self, model, dataset, convert_dtype, broadcast_data):
                         workers=[worker],
                         pure=False)
                 )
+
+        self.n_active_estimators_per_worker = []
+        for worker in data.worker_to_parts.keys():
+            n = self.workers.index(worker)
+            n_est = self.n_estimators_per_worker[n]
+            self.n_active_estimators_per_worker.append(n_est)
+
         if len(self.workers) > len(self.active_workers):
             if self.ignore_empty_partitions:
                 curent_estimators = self.n_estimators / \
@@ -323,7 +330,7 @@ def apply_reduction(self, reduce, partial_infs, datatype, delayed):
         correct for this worker's predictions are weighted differently during
         reduction.
         """
-        workers_weights = np.array(self.n_estimators_per_worker)
+        workers_weights = np.array(self.n_active_estimators_per_worker)
         workers_weights = workers_weights[workers_weights != 0]
         workers_weights = workers_weights / workers_weights.sum()
         workers_weights = cp.array(workers_weights)
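The weight computation in this hunk drops zero counts and normalizes the rest so they sum to one. The following is a minimal sketch of that arithmetic in plain Python rather than NumPy/CuPy. The helper names, the sample counts and the way partial predictions are combined (`weighted_reduce`) are assumptions for illustration, not the cuML implementation.

```python
def normalized_weights(n_estimators):
    """Drop zero counts, then scale the remaining counts to sum to 1."""
    active = [n for n in n_estimators if n != 0]
    total = sum(active)
    return [n / total for n in active]


def weighted_reduce(partial_preds, weights):
    """Combine per-worker predictions row by row using the given weights."""
    n_rows = len(partial_preds[0])
    return [
        sum(w * preds[i] for w, preds in zip(weights, partial_preds))
        for i in range(n_rows)
    ]


# Hypothetical counts: the middle worker trained no estimators.
weights = normalized_weights([4, 0, 6])

# One list of predictions per worker with a non-zero count.
combined = weighted_reduce([[1.0, 2.0], [3.0, 4.0]], weights)
```

If the weights are not aligned with the workers that produced each partial prediction, the same arithmetic runs without error but produces the wrong weighted result, which is why the source of the counts matters.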