Skip to content

Commit

Permalink
Task review app - added creating Unit Review to Agent logic
Browse files Browse the repository at this point in the history
  • Loading branch information
meta-paul committed Oct 3, 2023
1 parent c9489d9 commit 259739a
Show file tree
Hide file tree
Showing 17 changed files with 286 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ function ReviewFrontend({ reviewData }) {
<div>
<Instructions taskData={{}} />
<div>
{reviewData["final_submission"]["annotations"].map((_d, idx) => (
{reviewData["outputs"]["final_submission"]["annotations"].map((_d, idx) => (
<ReviewAnnotationCanvas
index={idx}
key={"Annotation-" + String(idx)}
Expand Down
54 changes: 54 additions & 0 deletions mephisto/abstractions/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class EntryDoesNotExistException(MephistoDBException):
)
GET_GRANTED_QUALIFICATION_LATENCY = DATABASE_LATENCY.labels(method="get_granted_qualification")
REVOKE_QUALIFICATION_LATENCY = DATABASE_LATENCY.labels(method="revoke_qualification")
NEW_UNIT_REVIEW_LATENCY = DATABASE_LATENCY.labels(method="new_unit_review")
UPDATE_UNIT_REVIEW_LATENCY = DATABASE_LATENCY.labels(method="update_unit_review")


class MephistoDB(ABC):
Expand Down Expand Up @@ -1045,6 +1047,58 @@ def revoke_qualification(self, qualification_id: str, worker_id: str) -> None:
"""
return self._revoke_qualification(qualification_id=qualification_id, worker_id=worker_id)

def _new_unit_review(
self,
unit_id: int,
task_id: int,
worker_id: int,
status: str,
feedback: Optional[str] = None,
tips: Optional[str] = None,
) -> None:
"""new_unit_review implementation"""
raise NotImplementedError()

@NEW_UNIT_REVIEW_LATENCY.time()
def new_unit_review(
self,
unit_id: int,
task_id: int,
worker_id: int,
status: str,
feedback: Optional[str] = None,
tips: Optional[str] = None,
) -> None:
"""Create unit review"""
return self._new_unit_review(unit_id, task_id, worker_id, status, feedback, tips)

@abstractmethod
def _update_unit_review(
self,
unit_id: int,
qualification_id: int,
worker_id: int,
value: Optional[int] = None,
revoke: bool = False,
) -> None:
"""update_unit_review implementation"""
raise NotImplementedError()

@UPDATE_UNIT_REVIEW_LATENCY.time()
def update_unit_review(
self,
unit_id: int,
qualification_id: int,
worker_id: int,
value: Optional[int] = None,
revoke: bool = False,
) -> None:
"""
Update the given unit review with the given parameters if possible,
raise appropriate exception otherwise.
"""
return self._update_unit_review(unit_id, qualification_id, worker_id, value, revoke)

# File/blob manipulation methods

@abstractmethod
Expand Down
86 changes: 86 additions & 0 deletions mephisto/abstractions/databases/local_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,92 @@ def _find_onboarding_agents(
for r in rows
]

def _new_unit_review(
self,
unit_id: int,
task_id: int,
worker_id: int,
status: str,
feedback: Optional[str] = None,
tips: Optional[str] = None,
) -> None:
"""Create unit review"""

with self.table_access_condition:
conn = self._get_connection()
c = conn.cursor()
c.execute(
"""
INSERT INTO unit_review (
unit_id,
worker_id,
task_id,
status,
feedback,
tips
) VALUES (?, ?, ?, ?, ?, ?);
""",
(
unit_id,
worker_id,
task_id,
status,
feedback,
tips,
),
)
conn.commit()

def _update_unit_review(
self,
unit_id: int,
qualification_id: int,
worker_id: int,
value: Optional[int] = None,
revoke: bool = False,
) -> None:
"""
Update the given unit review with the given parameters if possible,
raise appropriate exception otherwise.
"""
with self.table_access_condition:
conn = self._get_connection()
c = conn.cursor()

c.execute(
"""
SELECT * FROM unit_review
WHERE (unit_id = ?) AND (worker_id = ?)
ORDER BY created_at ASC;
""",
(unit_id, worker_id),
)
results = c.fetchall()
if not results:
raise EntryDoesNotExistException(
f"`unit_review` was not created for this `unit_id={unit_id}`"
)

latest_unit_review_id = results[-1]["id"]

c.execute(
"""
UPDATE unit_review
SET
updated_qualification_id = ?,
updated_qualification_value = ?,
revoked_qualification_id = ?
WHERE id = ?;
""",
(
qualification_id if not revoke else None,
value,
qualification_id if revoke else None,
latest_unit_review_id,
),
)
conn.commit()

# File/blob manipulation methods

def _assert_path_in_domain(self, path_key: str) -> None:
Expand Down
29 changes: 27 additions & 2 deletions mephisto/abstractions/providers/mock/mock_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,45 @@ def get_live_update(self, timeout=None) -> Optional[Dict[str, Any]]:
self.datastore.agent_data[self.db_id]["acts"].append(act)
return act

def approve_work(self) -> None:
def approve_work(
self,
feedback: Optional[str] = None,
tips: Optional[str] = None,
skip_unit_review: bool = False,
) -> None:
"""
Approve the work done on this specific Unit
Mock Units
"""
self.update_status(AgentState.STATUS_APPROVED)

def reject_work(self, reason) -> None:
if not skip_unit_review:
unit = self.get_unit()
self.db.new_unit_review(
unit_id=int(unit.db_id),
task_id=int(unit.task_id),
worker_id=int(unit.worker_id),
status=AgentState.STATUS_APPROVED,
feedback=feedback,
tips=tips,
)

def reject_work(self, feedback: Optional[str] = None) -> None:
"""
Reject the work done on this specific Unit
"""
self.update_status(AgentState.STATUS_REJECTED)

unit = self.get_unit()
self.db.new_unit_review(
unit_id=int(unit.db_id),
task_id=int(unit.task_id),
worker_id=int(unit.worker_id),
status=AgentState.STATUS_REJECTED,
feedback=feedback,
)

def mark_done(self):
"""No need to tell mock crowd provider about doneness"""
pass
Expand Down
31 changes: 28 additions & 3 deletions mephisto/abstractions/providers/mturk/mturk_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ def attempt_to_reconcile_submitted_data(self, mturk_hit_id: str):

# Required functions for Agent Interface

def approve_work(self) -> None:
def approve_work(
self,
feedback: Optional[str] = None,
tips: Optional[str] = None,
skip_unit_review: bool = False,
) -> None:
"""Approve the work done on this specific Unit"""
if self.get_status() == AgentState.STATUS_APPROVED:
logger.info(f"Approving already approved agent {self}, skipping")
Expand All @@ -110,15 +115,35 @@ def approve_work(self) -> None:
approve_work(client, self._get_mturk_assignment_id(), override_rejection=True)
self.update_status(AgentState.STATUS_APPROVED)

def reject_work(self, reason) -> None:
if not skip_unit_review:
unit = self.get_unit()
self.db.new_unit_review(
unit_id=int(unit.db_id),
task_id=int(unit.task_id),
worker_id=int(unit.worker_id),
status=AgentState.STATUS_APPROVED,
feedback=feedback,
tips=tips,
)

def reject_work(self, feedback: Optional[str] = None) -> None:
"""Reject the work done on this specific Unit"""
if self.get_status() == AgentState.STATUS_APPROVED:
logger.warning(f"Cannot reject {self}, it is already approved")
return
client = self._get_client()
reject_work(client, self._get_mturk_assignment_id(), reason)
reject_work(client, self._get_mturk_assignment_id(), feedback)
self.update_status(AgentState.STATUS_REJECTED)

unit = self.get_unit()
self.db.new_unit_review(
unit_id=int(unit.db_id),
task_id=int(unit.task_id),
worker_id=int(unit.worker_id),
status=AgentState.STATUS_REJECTED,
feedback=feedback,
)

def mark_done(self) -> None:
"""
MTurk agents are marked as done on the side of MTurk, so if this agent
Expand Down
4 changes: 2 additions & 2 deletions mephisto/abstractions/providers/mturk/mturk_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,10 +610,10 @@ def approve_work(client: MTurkClient, assignment_id: str, override_rejection: bo
)


def reject_work(client: MTurkClient, assignment_id: str, reason: str) -> None:
def reject_work(client: MTurkClient, assignment_id: str, feedback: Optional[str] = None) -> None:
"""reject work for a given assignment through the mturk client"""
try:
client.reject_assignment(AssignmentId=assignment_id, RequesterFeedback=reason)
client.reject_assignment(AssignmentId=assignment_id, RequesterFeedback=feedback)
except Exception as e:
logger.exception(
f"Rejecting MTurk assignment failed, likely because it has auto-approved. Details:{e}",
Expand Down
35 changes: 30 additions & 5 deletions mephisto/abstractions/providers/prolific/prolific_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ def new_from_provider_data(

return super().new_from_provider_data(db, worker, unit, provider_data)

def approve_work(self) -> None:
def approve_work(
self,
feedback: Optional[str] = None,
tips: Optional[str] = None,
skip_unit_review: bool = False,
) -> None:
"""Approve the work done on this specific Unit"""
logger.debug(f"{self.log_prefix}Approving work")

Expand All @@ -125,9 +130,20 @@ def approve_work(self) -> None:

self.update_status(AgentState.STATUS_APPROVED)

def soft_reject_work(self) -> None:
if not skip_unit_review:
unit = self.get_unit()
self.db.new_unit_review(
unit_id=int(unit.db_id),
task_id=int(unit.task_id),
worker_id=int(unit.worker_id),
status=AgentState.STATUS_APPROVED,
feedback=feedback,
tips=tips,
)

def soft_reject_work(self, feedback: Optional[str] = None) -> None:
"""Mark as soft rejected on Mephisto and approve Worker on Prolific"""
super().soft_reject_work()
super().soft_reject_work(feedback=feedback)

client = self._get_client()
prolific_study_id = self.unit.get_prolific_study_id()
Expand All @@ -144,7 +160,7 @@ def soft_reject_work(self) -> None:
f"has been soft rejected"
)

def reject_work(self, reason) -> None:
def reject_work(self, feedback: Optional[str] = None) -> None:
"""Reject the work done on this specific Unit"""
logger.debug(f"{self.log_prefix}Rejecting work")

Expand Down Expand Up @@ -174,11 +190,20 @@ def reject_work(self, reason) -> None:
logger.debug(
f"{self.log_prefix}"
f'Work for Study "{prolific_study_id}" completed by worker "{worker_id}" '
f"has been rejected. Reason: {reason}"
f"has been rejected. Reason: {feedback}"
)

self.update_status(AgentState.STATUS_REJECTED)

unit = self.get_unit()
self.db.new_unit_review(
unit_id=int(unit.db_id),
task_id=int(unit.task_id),
worker_id=int(unit.worker_id),
status=AgentState.STATUS_REJECTED,
feedback=feedback,
)

def mark_done(self) -> None:
"""
Prolific agents are marked as done on the side of Prolific, so if this agent
Expand Down
4 changes: 1 addition & 3 deletions mephisto/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,12 @@ def metrics_cli(args):
@click.option("-h", "--host", type=(str), default="127.0.0.1")
@click.option("-p", "--port", type=(int), default=5000)
@click.option("-d", "--debug", type=(bool), default=None)
@click.option("-P", "--provider", type=(str), default=PROLIFIC_PROVIDER_TYPE)
@pass_script_info
def review_app(
info,
host,
port,
debug,
provider,
):
"""
Launch a local review server.
Expand All @@ -438,7 +436,7 @@ def review_app(
show_server_banner(get_env(), debug, info.app_import_path, eager_loading)

# Init App
app = create_app(provider=provider, debug=debug)
app = create_app(debug=debug)

# Run Flask server
run_simple(
Expand Down
Loading

0 comments on commit 259739a

Please sign in to comment.