Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Blueprint Mixins, create ScreenTaskRequired mixin #566

Merged
merged 8 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/common_qualification_flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The `onboarding_qualification` is shared between all task runs that use the same
You can also set up tasks that are only available to workers that have passed an existing onboarding (potentially for tasks that don't have their own onboarding), or use the onboarding failure list as a block list for a future task. Both examples are shown below:

```python
from mephisto.abstractions.blueprint import OnboardingRequired
from mephisto.abstractions.blueprints.mixins.onboarding_required import OnboardingRequired
from mephisto.data_model.qualification import QUAL_EQUAL, QUAL_NOT_EXIST, make_qualification_dict

ONBOARDING_QUALIFICATION_NAME = "TEST_ONBOARDING_QUAL_NAME"
Expand Down
95 changes: 22 additions & 73 deletions mephisto/abstractions/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
Dict,
Any,
Type,
ClassVar,
Union,
Iterable,
Callable,
Expand All @@ -31,6 +30,7 @@
AgentShutdownError,
)
from mephisto.data_model.constants.assignment_state import AssignmentState
import types

if TYPE_CHECKING:
from mephisto.data_model.agent import Agent, OnboardingAgent
Expand All @@ -39,7 +39,6 @@
from mephisto.data_model.unit import Unit
from mephisto.data_model.packet import Packet
from mephisto.data_model.worker import Worker
from argparse import _ArgumentGroup as ArgumentGroup

from mephisto.operations.logger_core import get_logger

Expand All @@ -49,15 +48,6 @@
@dataclass
class BlueprintArgs:
_blueprint_type: str = MISSING
onboarding_qualification: str = field(
default=MISSING,
metadata={
"help": (
"Specify the name of a qualification used to block workers who fail onboarding, "
"Empty will skip onboarding."
)
},
)
block_qualification: str = field(
default=MISSING,
metadata={
Expand All @@ -73,11 +63,7 @@ class SharedTaskState:
be passed as Hydra args, like functions and objects
"""

onboarding_data: Dict[str, Any] = field(default_factory=dict)
task_config: Dict[str, Any] = field(default_factory=dict)
validate_onboarding: Callable[[Any], bool] = field(
default_factory=lambda: (lambda x: True)
)
qualifications: List[Any] = field(default_factory=list)
worker_can_do_unit: Callable[["Worker", "Unit"], bool] = field(
default_factory=lambda: (lambda worker, unit: True)
Expand Down Expand Up @@ -546,70 +532,33 @@ def get_task_end(self) -> Optional[float]:
return 0.0


class OnboardingRequired(object):
class BlueprintMixin(ABC):
"""
Compositional class for blueprints that may have an onboarding step
Base class for compositional mixins for blueprints
"""

@staticmethod
def get_failed_qual(qual_name: str) -> str:
"""Returns the wrapper for a qualification to represent failing an onboarding"""
return qual_name + "-failed"

def init_onboarding_config(
@abstractmethod
def init_mixin_config(
self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
):
self.onboarding_qualification_name: Optional[str] = args.blueprint.get(
"onboarding_qualification", None
)
self.onboarding_data = shared_state.onboarding_data
self.use_onboarding = self.onboarding_qualification_name is not None
self.onboarding_qualification_id = None
if self.onboarding_qualification_name is not None:
db = task_run.db
found_qualifications = db.find_qualifications(
self.onboarding_qualification_name
)
if len(found_qualifications) == 0:
self.onboarding_qualification_id = db.make_qualification(
self.onboarding_qualification_name
)
else:
self.onboarding_qualification_id = found_qualifications[0].db_id

# We need to keep a separate qualification for failed onboarding
# to push to a crowd provider in order to prevent workers
# who have failed from being shown our task
self.onboarding_failed_name = self.get_failed_qual(
self.onboarding_qualification_name
)
found_qualifications = db.find_qualifications(self.onboarding_failed_name)
if len(found_qualifications) == 0:
self.onboarding_failed_id = db.make_qualification(
self.onboarding_failed_name
)
else:
self.onboarding_failed_id = found_qualifications[0].db_id

def get_onboarding_data(self, worker_id: str) -> Dict[str, Any]:
"""
If the onboarding task on the frontend requires any specialized data, the blueprint
should provide it for the user.
) -> None:
"""Method to initialize any required attributes to make this mixin function"""
raise NotImplementedError()

As onboarding qualifies a worker for all tasks from this blueprint, this should
generally be static data that can later be evaluated against.
"""
return self.onboarding_data
@classmethod
@abstractmethod
def assert_task_args(
cls, args: "DictConfig", shared_state: "SharedTaskState"
) -> None:
"""Method to validate the incoming args and throw if something won't work"""
raise NotImplementedError()

def validate_onboarding(
self, worker: "Worker", onboarding_agent: "OnboardingAgent"
) -> bool:
"""
Check the incoming onboarding data and evaluate if the worker
has passed the qualification or not. Return True if the worker
has qualified.
"""
return True
@classmethod
@abstractmethod
def get_mixin_qualifications(
cls, args: "DictConfig", shared_state: "SharedTaskState"
) -> List[Dict[str, Any]]:
"""Method to provide any required qualifications to make this mixin function"""
raise NotImplementedError()


class Blueprint(ABC):
Expand Down
8 changes: 8 additions & 0 deletions mephisto/abstractions/blueprints/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ A blueprint is able to create a container that handles any shared data that is i
- `worker_can_do_unit`: A function that takes in a `Worker` and a `Unit`, and should return a boolean representing if the worker is eligible to work on that particular unit.
- `on_unit_submitted`: A function that takes in a `Unit` after a `TaskRunner` ends, and is able to do any automatic post-processing operations on that unit that a Mephisto user may want.

## `Blueprint` Mixins
Blueprints sometimes share component functionality that may be useful across a multitude of tasks. We capture this shared functionality in mixins. Mephisto is able to recognize certain mixins in order to complete additional operations; however, custom mixins may also help cut down on boilerplate in common `run_task.py` scripts. As your tasks mature, we suggest utilizing blueprint mixins to share common workflows and design patterns you observe.
### `OnboardingRequired`
This mixin allows for blueprints that require people to complete an onboarding task _before_ they're even able to start on their first task. Usually this is useful for providing task context, and then quizzing workers to see if they understand what's provided. Tasks using this mixin will activate onboarding mode for new `Worker`s whenever the `mephisto.blueprint.onboarding_qualification` hydra argument is provided.
### `ScreenTaskRequired`
This mixin allows for blueprints that require people to complete a _test_ version of the real task the first time a worker does the task. This allows you to validate workers on a run of the real task, either on your actual data (when providing `SharedTaskState.generate_screening_unit_data=False`) or on test data that is easier to validate against (when providing a generator to `SharedTaskState.generate_screening_unit_data`). The screening tasks should be the same as your standard task, just easier to validate. You **do pay** for screening tasks, and as such we ask that you set `mephisto.blueprint.max_screening_units` to put a cap on how many screening units you want to launch.
Copy link
Contributor

@pringshia pringshia Sep 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really like this API! ❤️

screening_data_factory could be another possible name here... just throwing out ideas



## Implementations
### `StaticBlueprint`
The `StaticBlueprint` class allows a replication of the interface that MTurk provides, being able to take a snippet of `HTML` and a `.csv` file and deploy tasks that fill templates of the `HTML` with values from the `.csv`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

from mephisto.abstractions.blueprint import (
Blueprint,
OnboardingRequired,
BlueprintArgs,
SharedTaskState,
)
from mephisto.abstractions.blueprints.mixins.onboarding_required import (
OnboardingRequired,
OnboardingSharedState,
)
from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig
from mephisto.data_model.assignment import InitializationData
Expand Down Expand Up @@ -49,7 +52,7 @@


@dataclass
class SharedStaticTaskState(SharedTaskState):
class SharedStaticTaskState(SharedTaskState, OnboardingSharedState):
static_task_data: Iterable[Any] = field(default_factory=list)


Expand Down Expand Up @@ -105,7 +108,10 @@ class StaticBlueprint(Blueprint, OnboardingRequired):
SharedStateClass = SharedStaticTaskState

def __init__(
self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
self,
task_run: "TaskRun",
args: "DictConfig",
shared_state: "SharedStaticTaskState",
):
super().__init__(task_run, args, shared_state)
self.init_onboarding_config(task_run, args, shared_state)
Expand Down Expand Up @@ -144,7 +150,7 @@ def __init__(
pass

@classmethod
def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
def assert_task_args(cls, args: DictConfig, shared_state: "SharedStaticTaskState"):
"""Ensure that the data can be properly loaded"""
blue_args = args.blueprint
if blue_args.get("data_csv", None) is not None:
Expand Down Expand Up @@ -196,16 +202,3 @@ def data_generator() -> Iterable["InitializationData"]:
)
for d in self._initialization_data_dicts
]

def validate_onboarding(
self, worker: "Worker", onboarding_agent: "OnboardingAgent"
) -> bool:
"""
Check the incoming onboarding data and evaluate if the worker
has passed the qualification or not. Return True if the worker
has qualified.
"""
data = onboarding_agent.state.get_data()
return self.shared_state.validate_onboarding(
data
) # data["outputs"].get("success", True)
5 changes: 5 additions & 0 deletions mephisto/abstractions/blueprints/mixins/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
144 changes: 144 additions & 0 deletions mephisto/abstractions/blueprints/mixins/onboarding_required.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import (
Optional,
Dict,
List,
Any,
Callable,
TYPE_CHECKING,
)

from mephisto.abstractions.blueprint import BlueprintMixin
from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig
from mephisto.data_model.qualification import make_qualification_dict, QUAL_NOT_EXIST
from mephisto.operations.utils import find_or_create_qualification

if TYPE_CHECKING:
from mephisto.abstractions.blueprint import SharedTaskState
from mephisto.data_model.agent import OnboardingAgent
from mephisto.data_model.task_run import TaskRun
from mephisto.data_model.worker import Worker
from argparse import _ArgumentGroup as ArgumentGroup


@dataclass
class OnboardingRequiredArgs:
    """
    Blueprint arguments contributed by the OnboardingRequired mixin.

    The single option names the qualification that gates onboarding; it has
    no default value (MISSING), and per its help text leaving it empty skips
    onboarding altogether.
    """

    onboarding_qualification: str = field(
        default=MISSING,
        metadata=dict(
            help=(
                "Specify the name of a qualification used to block workers who fail onboarding, "
                "Empty will skip onboarding."
            ),
        ),
    )


def _accept_all_onboarding(x: Any) -> bool:
    """Default onboarding validator: approve whatever data it is handed."""
    return True


@dataclass
class OnboardingSharedState:
    """
    Shared-state fields used by blueprints that support onboarding.

    These hold values (arbitrary data and a callable) that are supplied from
    a run script rather than from command line style arguments.
    """

    # Data made available to the onboarding task; every instance gets its
    # own fresh, empty dict unless one is supplied.
    onboarding_data: Dict[str, Any] = field(default_factory=dict)
    # Callable that receives the submitted onboarding data and returns True
    # when the worker passes. The default accepts everything.
    validate_onboarding: Callable[[Any], bool] = field(
        default_factory=lambda: _accept_all_onboarding
    )


class OnboardingRequired(BlueprintMixin):
    """
    Compositional class for blueprints that may have an onboarding step

    Onboarding is considered enabled whenever the blueprint args provide an
    `onboarding_qualification` name. Two qualifications are then tracked:
    one under the given name, and a companion "<name>-failed" qualification
    used to keep workers who failed onboarding away from the task.
    """

    def init_mixin_config(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to initialize any required attributes to make this mixin function"""
        self.init_onboarding_config(task_run, args, shared_state)

    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to validate the incoming args and throw if something won't work"""
        # Is there any validation that should be done on the onboarding qualification name?
        return

    @classmethod
    def get_mixin_qualifications(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> List[Dict[str, Any]]:
        """
        Method to provide any required qualifications to make this mixin function

        Returns an empty list when no onboarding qualification is configured,
        otherwise a single qualification dict requiring that the worker does
        NOT hold the "<name>-failed" qualification.
        """
        onboarding_qualification_name: Optional[str] = args.blueprint.get(
            "onboarding_qualification", None
        )
        if onboarding_qualification_name is None:
            # Not using an onboarding qualification
            return []
        return [
            # We need to keep a separate qualification for failed onboarding
            # to push to a crowd provider in order to prevent workers
            # who have failed from being shown our task
            make_qualification_dict(
                cls.get_failed_qual(onboarding_qualification_name),
                QUAL_NOT_EXIST,
                None,
            )
        ]

    @staticmethod
    def get_failed_qual(qual_name: str) -> str:
        """Returns the wrapper for a qualification to represent failing an onboarding"""
        return qual_name + "-failed"

    def init_onboarding_config(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """
        Read the onboarding settings and set up the onboarding attributes.

        Always sets `onboarding_qualification_name`, `onboarding_data`,
        `use_onboarding`, `onboarding_qualification_id`,
        `onboarding_failed_name` and `onboarding_failed_id`. The id and
        failed-name attributes stay None when onboarding is not in use, so
        they can be read safely without first checking `use_onboarding`.

        `shared_state` must expose an `onboarding_data` attribute.
        """
        qual_name: Optional[str] = args.blueprint.get(
            "onboarding_qualification", None
        )
        self.onboarding_qualification_name: Optional[str] = qual_name
        self.onboarding_data = shared_state.onboarding_data
        self.use_onboarding = qual_name is not None
        # Give every onboarding attribute a defined value up front, so the
        # disabled path does not leave some of them missing entirely.
        self.onboarding_qualification_id = None
        self.onboarding_failed_name: Optional[str] = None
        self.onboarding_failed_id = None
        if qual_name is None:
            return

        db = task_run.db
        # NOTE(review): find_or_create_qualification presumably returns the
        # qualification's db id, creating the entry if absent - confirm
        # against mephisto.operations.utils.
        self.onboarding_qualification_id = find_or_create_qualification(
            db,
            qual_name,
        )
        # The separate "failed" qualification is what blocks workers who
        # did not pass onboarding from being shown the task.
        self.onboarding_failed_name = self.get_failed_qual(qual_name)
        self.onboarding_failed_id = find_or_create_qualification(
            db, self.onboarding_failed_name
        )

    def get_onboarding_data(self, worker_id: str) -> Dict[str, Any]:
        """
        If the onboarding task on the frontend requires any specialized data, the blueprint
        should provide it for the user.

        As onboarding qualifies a worker for all tasks from this blueprint, this should
        generally be static data that can later be evaluated against.
        """
        return self.onboarding_data

    def validate_onboarding(
        self, worker: "Worker", onboarding_agent: "OnboardingAgent"
    ) -> bool:
        """
        Check the incoming onboarding data and evaluate if the worker
        has passed the qualification or not. Return True if the worker
        has qualified.

        By default we use the validate_onboarding provided in a run_task,
        and all onboarding tasks should allow run_task to specify additional
        or entirely override what's provided in a blueprint.
        """
        # NOTE(review): self.shared_state is not set by this mixin; it is
        # presumably provided by the Blueprint class this is mixed into.
        data = onboarding_agent.state.get_data()
        return self.shared_state.validate_onboarding(data)
Loading