Review script tooling (#526)
* Creating new review tooling helpers

* Haven't landed this yet...

* Exposing buck in mock tests going to expired

* Cleaner flow, bugfixes

* Fixing qualifications, adding reviews left

* Updating example to new examine script

* Black

* missed a line

* Black fix
JackUrb authored Aug 27, 2021
1 parent 1c4f5c0 commit f0f88da
Showing 4 changed files with 311 additions and 51 deletions.
59 changes: 9 additions & 50 deletions examples/simple_static_task/examine_results.py
@@ -4,33 +4,10 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.


from mephisto.abstractions.databases.local_database import LocalMephistoDB
from mephisto.tools.data_browser import DataBrowser as MephistoDataBrowser
from mephisto.tools.examine_utils import run_examine_or_review, print_results
from mephisto.data_model.worker import Worker
from mephisto.data_model.unit import Unit

db = LocalMephistoDB()
mephisto_data_browser = MephistoDataBrowser(db=db)

DO_REVIEW = True

units = mephisto_data_browser.get_units_for_task_name(input("Input task name: "))

tasks_to_show = input("Tasks to see? (a)ll/(u)nreviewed: ")
if tasks_to_show in ["all", "a"]:
DO_REVIEW = False
else:
units = [u for u in units if u.get_status() == "completed"]
print(
"You will be reviewing actual tasks with this flow. Tasks that you either Accept or Pass "
"will be paid out to the worker, while rejected tasks will not. Passed tasks will be "
"specially marked such that you can leave them out of your dataset. \n"
"When you pass on a task, the script gives you an option to disqualify the worker "
"from future tasks by assigning a qualification. If provided, this worker will no "
"longer be able to work on tasks where the set --block-qualification shares the same name.\n"
"You should only reject tasks when it is clear the worker has acted in bad faith, and "
"didn't actually do the task. Prefer to pass on tasks that were misunderstandings."
)


def format_for_printing_data(data):
@@ -58,28 +35,10 @@ def format_for_printing_data(data):
return f"-------------------\n{metadata_string}{inputs_string}{output_string}"


disqualification_name = None
for unit in units:
print(format_for_printing_data(mephisto_data_browser.get_data_from_unit(unit)))
if DO_REVIEW:
keep = input("Do you want to accept this work? (a)ccept, (r)eject, (p)ass: ")
if keep == "a":
unit.get_assigned_agent().approve_work()
elif keep == "r":
reason = input("Why are you rejecting this work?")
unit.get_assigned_agent().reject_work(reason)
elif keep == "p":
# General best practice is to accept borderline work and then disqualify
# the worker from working on more of these tasks
agent = unit.get_assigned_agent()
agent.soft_reject_work()
should_soft_block = input(
"Do you want to soft block this worker? (y)es/(n)o: "
)
if should_soft_block.lower() in ["y", "yes"]:
if disqualification_name == None:
disqualification_name = input(
"Please input the qualification name you are using to soft block for this task: "
)
worker = agent.get_worker()
worker.grant_qualification(disqualification_name, 1)
def main():
db = LocalMephistoDB()
run_examine_or_review(db, format_data_for_printing)


if __name__ == "__main__":
main()
6 changes: 5 additions & 1 deletion mephisto/abstractions/providers/mock/mock_unit.py
@@ -61,7 +61,11 @@ def launch(self, task_url: str) -> None:

def expire(self) -> float:
"""Expiration is immediate on Mocks"""
self.db.update_unit(self.db_id, status=AssignmentState.EXPIRED)
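# Only mark the unit EXPIRED if it hasn't already expired or completed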
if self.get_status() not in [
AssignmentState.EXPIRED,
AssignmentState.COMPLETED,
]:
self.db.update_unit(self.db_id, status=AssignmentState.EXPIRED)
self.datastore.set_unit_expired(self.db_id, True)
return 0.0

7 changes: 7 additions & 0 deletions mephisto/tools/README.md
@@ -13,6 +13,13 @@ It has three usable methods at the moment:
- `get_units_for_task_name`: This will go through all task runs that share the given `task_name`, and collect their units in the same manner as `get_units_for_run_id`.
- `get_data_from_unit`: When given a `Unit` that is in a terminal state, this method will return data about that `Unit`, including the Mephisto id of the worker, the status of the work, the data saved by this `Unit`, and the start and end times for when the worker produced the data.
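
For instance, a minimal sketch of pulling task data with these methods (the task name `"my-task-name"` is a placeholder):

```python
from mephisto.abstractions.databases.local_database import LocalMephistoDB
from mephisto.tools.data_browser import DataBrowser

db = LocalMephistoDB()
data_browser = DataBrowser(db=db)

# Gather every unit ever launched under this task name, then print the saved
# data for the ones that reached a terminal (completed) state.
for unit in data_browser.get_units_for_task_name("my-task-name"):
    if unit.get_status() == "completed":
        print(data_browser.get_data_from_unit(unit))
```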

## `examine_utils.py`
This file contains a number of helper functions that are useful for running reviews over Mephisto data. We provide a high-level script for doing a 'review-by-worker' style evaluation, as well as breakout helper functions that could be useful in alternate review flows. A minimal usage sketch follows the list below.
- `run_examine_by_worker`: This function takes a function `format_data_for_printing` that consumes the result of `MephistoDataBrowser.get_data_from_unit` and returns a string to display for review. It optionally takes `task_name`, `block_qualification`, and `approve_qualification` arguments. If `task_name` is provided, the review runs without prompting the user for these options.
- `print_results`: This function takes a `MephistoDB`, a task name, and a display function `format_data_for_printing`, along with optional `start` and `end` indices, and prints the selected results to stdout.
- `format_worker_stats`: Takes in a worker id and a set of previous worker stats, and returns those stats in the format `(accepted_count | total_rejected_count (soft_rejected_count) / total_count)`.
- `prompt_for_options`: Prompts the user for `task_name`, `block_qualification`, and `approve_qualification`, skipping any value already provided as an argument. Returns these values after confirming with the user, converting blank responses to `None`.
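
A minimal usage sketch, mirroring the updated `examples/simple_static_task/examine_results.py`: it wires a custom formatter into `run_examine_or_review` (also defined in this file), which chooses between the review and examine flows. The `format_my_data` formatter is a placeholder for your own display logic.

```python
from mephisto.abstractions.databases.local_database import LocalMephistoDB
from mephisto.tools.examine_utils import run_examine_or_review


def format_my_data(data):
    # `data` is the dict returned by MephistoDataBrowser.get_data_from_unit;
    # return whatever string should be shown for each unit under review.
    return str(data)


def main():
    db = LocalMephistoDB()
    # Asks whether to (r)eview or (e)xamine, then runs the chosen flow.
    run_examine_or_review(db, format_my_data)


if __name__ == "__main__":
    main()
```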

## `scripts.py`
This file contains a few helper methods for running scripts relying on the `MephistoDB`. They are as follows:
- `get_db_from_config`: This method takes in a hydra-produced `DictConfig` containing a `MephistoConfig` (such as a `RunScriptConfig`), and returns an initialized `MephistoDB` compatible with the configuration. Right now this exclusively leverages the `LocalMephistoDB`.
290 changes: 290 additions & 0 deletions mephisto/tools/examine_utils.py
@@ -0,0 +1,290 @@
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Utilities specifically for running examine scripts. Example usage can be
seen in the examine results scripts in the examples directory.
"""

from mephisto.tools.data_browser import DataBrowser
from mephisto.data_model.worker import Worker
from mephisto.operations.utils import find_or_create_qualification
import traceback

from typing import TYPE_CHECKING, Optional, Tuple, Callable, Dict, Any, List

if TYPE_CHECKING:
from mephisto.abstractions.database import MephistoDB
from mephisto.data_model.unit import Unit


def _get_and_format_data(
data_browser: "DataBrowser",
format_data_for_printing: Callable[[Dict[str, Any]], str],
unit: "Unit",
) -> str:
"""
Safely wrapped function to extract the display data string for a specific unit.
Catches and prints any exceptions.
"""
formatted = "Error formatting data, see above..."
try:
data = data_browser.get_data_from_unit(unit)
try:
formatted = format_data_for_printing(data)
except Exception as e:
print(f"Unexpected error formatting data for {unit}: {e}")
# Print the full exception, as this could be user error on the
# formatting function
traceback.print_exc()
except Exception as e:
print(f"Unexpected error getting data for {unit}: {e}")
return formatted


def print_results(
db: "MephistoDB",
task_name: str,
format_data_for_printing: Callable[[Dict[str, Any]], str],
start: Optional[int] = None,
end: Optional[int] = None,
) -> None:
"""
Write results from the task with the given task name to stdout, from index `start` to `end`
"""
data_browser = DataBrowser(db=db)
units = data_browser.get_units_for_task_name(task_name)
if end is None:
end = len(units)
if start is None:
start = 0
units.reverse()
for unit in units[start:end]:
print(_get_and_format_data(data_browser, format_data_for_printing, unit))
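# Example (my_format_fn is a placeholder formatter): print_results(db, "my-task", my_format_fn, end=5)
# prints five units; note the unit list is reversed before slicing.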


def prompt_for_options(
task_name: Optional[str] = None,
block_qualification: Optional[str] = None,
approve_qualification: Optional[str] = None,
) -> Tuple[str, Optional[str], Optional[str]]:
"""
Utility to request common user options for examine scripts.
Leave `block_qualification` or `approve_qualification` as empty strings
to skip their respective prompt.
"""
if task_name is None:
task_name = input("Input task name: ")
if block_qualification is None:
block_qualification = input(
"If you'd like to soft-block workers, you'll need a block qualification. "
"Leave blank otherwise.\nEnter block qualification: "
)
if approve_qualification is None:
approve_qualification = input(
"If you'd like to qualify high-quality workers, you'll need an approve "
"qualification. Leave blank otherwise.\nEnter approve qualification: "
)
if len(block_qualification.strip()) == 0:
block_qualification = None
if len(approve_qualification.strip()) == 0:
approve_qualification = None
input(
"Starting review with following params:\n"
f"Task name: {task_name}\n"
f"Blocking qualification: {block_qualification}\n"
f"Approve qualification: {approve_qualification}\n"
"Press enter to continue... "
)
return task_name, block_qualification, approve_qualification


def get_worker_stats(units: List["Unit"]) -> Dict[str, Dict[str, List["Unit"]]]:
"""
Traverse a list of units and create a mapping from worker id
to their units, grouped by their current status
"""
previous_work_by_worker = {}
for unit in units:
w_id = unit.worker_id
if w_id not in previous_work_by_worker:
previous_work_by_worker[w_id] = {
"accepted": [],
"soft_rejected": [],
"rejected": [],
}
previous_work_by_worker[w_id][unit.get_status()].append(unit)
return previous_work_by_worker


def format_worker_stats(
worker_id: str, previous_work_by_worker: Dict[str, Dict[str, List["Unit"]]]
) -> str:
"""
When given a worker id and a list of worker stats, return a string
containing the proportion of accepted to rejected work.
"""
prev_work = previous_work_by_worker.get(worker_id)
if prev_work is None:
return "(First time worker!)"
accepted_work = len(prev_work["accepted"])
soft_rejected_work = len(prev_work["soft_rejected"])
rejected_work = len(prev_work["rejected"])
return f"({accepted_work} | {rejected_work + soft_rejected_work}({soft_rejected_work}) / {accepted_work + soft_rejected_work + rejected_work})"


def run_examine_by_worker(
db: "MephistoDB",
format_data_for_printing: Callable[[Dict[str, Any]], str],
task_name: Optional[str] = None,
block_qualification: Optional[str] = None,
approve_qualification: Optional[str] = None,
):
"""
Basic script for reviewing work, grouped by worker for convenience. First gets
the required information to run a review, then walks through each worker's
completed units, prompting for an accept/reject/pass decision on each.
"""
data_browser = DataBrowser(db=db)

# Get initial arguments
if task_name is None:
task_name, block_qualification, approve_qualification = prompt_for_options(
task_name, block_qualification, approve_qualification
)

tasks = db.find_tasks(task_name=task_name)
assert len(tasks) >= 1, f"No task found under name {task_name}"

print(
"You will be reviewing actual tasks with this flow. Tasks that you either Accept or Pass "
"will be paid out to the worker, while rejected tasks will not. Passed tasks will be "
"specially marked such that you can leave them out of your dataset. \n"
"You may enter the option in caps to apply it to the rest of the units for a given worker."
)
if block_qualification is not None:
created_block_qual = find_or_create_qualification(db, block_qualification)
print(
"When you pass or reject a task, the script gives you an option to disqualify the worker "
"from future tasks by assigning a qualification. If provided, this worker will no "
"longer be able to work on tasks where the set --block-qualification shares the same name "
f"you provided above: {block_qualification}\n"
)
if approve_qualification is not None:
created_approve_qual = find_or_create_qualification(db, approve_qualification)
print(
"You may use this script to establish a qualified worker pool by granting the provided "
f"approve qualification {approve_qualification} to workers you think understand the task "
"well. This will be provided as an option for workers you (A)pprove all on. "
"Future tasks can use this qual as a required qualification, as described in the "
"common qualification flows document."
)
print(
"**************\n"
"You should only reject tasks when it is clear the worker has acted in bad faith, and "
"didn't actually do the task. Prefer to pass on tasks that were misunderstandings.\n"
"**************\n"
)

units = data_browser.get_units_for_task_name(task_name)

others = [u for u in units if u.get_status() != "completed"]
units = [u for u in units if u.get_status() == "completed"]
reviews_left = len(units)
previous_work_by_worker = get_worker_stats(others)

# Determine allowed options
options = ["a", "p", "r"]
options_string = "Do you want to accept this work? (a)ccept, (r)eject, (p)ass:"

units_by_worker: Dict[str, List["Unit"]] = {}

for u in units:
w_id = u.worker_id
if w_id not in units_by_worker:
units_by_worker[w_id] = []
units_by_worker[w_id].append(u)

# Run the review
for w_id, w_units in units_by_worker.items():
worker = Worker(db, w_id)
worker_name = worker.worker_name
apply_all_decision = None
reason = None
for idx, unit in enumerate(w_units):

print(
f"Reviewing for worker {worker_name}, ({idx+1}/{len(w_units)}), "
f"Previous {format_worker_stats(w_id, previous_work_by_worker)} "
f"(total remaining: {reviews_left})"
)
reviews_left -= 1
print(format_data_for_printing(data_browser.get_data_from_unit(unit)))
if apply_all_decision is not None:
decision = apply_all_decision
else:
decision = input(
"Do you want to accept this work? (a)ccept, (r)eject, (p)ass: "
)
while decision.lower() not in options:
decision = input(
"Decision must be one of a, p, r. Use CAPS to apply to all remaining for worker: "
)

agent = unit.get_assigned_agent()

if decision.lower() == "a":
agent.approve_work()
if decision == "A" and approve_qualification is not None:
should_special_qualify = input(
"Do you want to approve qualify this worker? (y)es/(n)o: "
)
if should_special_qualify.lower() in ["y", "yes"]:
worker.grant_qualification(approve_qualification, 1)
elif decision.lower() == "p":
agent.soft_reject_work()
if apply_all_decision is None and block_qualification is not None:
should_soft_block = input(
"Do you want to soft block this worker? (y)es/(n)o: "
)
if should_soft_block.lower() in ["y", "yes"]:
worker.grant_qualification(block_qualification, 1)
else: # decision = 'r'
if apply_all_decision is None:
reason = input("Why are you rejecting this work? ")
should_block = input(
"Do you want to hard block this worker? (y)es/(n)o: "
)
if should_block.lower() in ["y", "yes"]:
block_reason = input("Why permanently block this worker? ")
worker.block_worker(block_reason)
unit.get_assigned_agent().reject_work(reason)

if decision.lower() != decision:
apply_all_decision = decision.lower()


def run_examine_or_review(
db: "MephistoDB",
format_data_for_printing: Callable[[Dict[str, Any]], str],
) -> None:
do_review = input(
"Do you want to (r)eview, or (e)xamine data? Default "
"examine. Can put e <end> or e <start> <end> to choose "
"how many to view\n"
)
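# Example responses: "r" starts the worker-by-worker review flow; "e 10" examines
# the first 10 results (the unit list is reversed first); "e 5 15" examines the
# slice from index 5 to 15; a blank response defaults to examining the first 15.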

if do_review.lower().startswith("r"):
run_examine_by_worker(db, format_data_for_printing)
else:
start = 0
end = 15
opts = do_review.split(" ")
if len(opts) == 2:
end = int(opts[1])
elif len(opts) == 3:
start = int(opts[1])
end = int(opts[2])
task_name = input("Input task name: ")
print_results(db, task_name, format_data_for_printing, start=start, end=end)
