From ddac3a7009b9100873fcddfeb6da49b67507b6a4 Mon Sep 17 00:00:00 2001 From: Jack Urbanek Date: Fri, 17 Sep 2021 19:44:34 -0400 Subject: [PATCH 1/8] Creating the validator workflow --- mephisto/abstractions/blueprint.py | 99 ++++++++++++++++++++++ mephisto/abstractions/blueprints/README.md | 8 ++ mephisto/data_model/task_run.py | 7 +- mephisto/operations/operator.py | 1 + mephisto/operations/supervisor.py | 41 ++++++++- mephisto/operations/task_launcher.py | 37 ++++++++ 6 files changed, 188 insertions(+), 5 deletions(-) diff --git a/mephisto/abstractions/blueprint.py b/mephisto/abstractions/blueprint.py index bb5f642a1..f02496778 100644 --- a/mephisto/abstractions/blueprint.py +++ b/mephisto/abstractions/blueprint.py @@ -31,6 +31,7 @@ AgentShutdownError, ) from mephisto.data_model.constants.assignment_state import AssignmentState +import types if TYPE_CHECKING: from mephisto.data_model.agent import Agent, OnboardingAgent @@ -612,6 +613,104 @@ def validate_onboarding( return True +class ValidationRequired(object): + """ + Compositional class for blueprints that may have a first task to + qualify workers who have never attempted the task before + """ + + def init_validation_config( + self, + task_run: "TaskRun", + args: "DictConfig", + shared_state: "SharedTaskState", + ) -> None: + self.use_validation_task = args.blueprint.get("use_validation_task", False) + if not self.use_validation_task: + return + + # Runs that are using a qualification task should be able to assign + # a specially generated unit to unqualified workers + self.passed_qualification_name = args.blueprint.passed_qualification_name + self.failed_qualification_name = args.blueprint.block_qualification + self.generate_validation_unit_data = shared_state.generate_validation_unit_data + + find_or_create_qualification(task_run.db, self.passed_qualification_name) + find_or_create_qualification(task_run.db, self.failed_qualification_name) + + @classmethod + def assert_task_args(cls, args: "DictConfig", shared_state: "SharedTaskState"): + use_validation_task = args.blueprint.get("use_validation_task", False) + if not use_validation_task: + return + passed_qualification_name = args.blueprint.passed_qualification_name + failed_qualification_name = args.blueprint.block_qualification + assert args.task.allowed_concurrent == 1, ( + "Can only run this task type with one allowed task at a time per worker, to ensure validation " + "before moving into more tasks." + ) + assert ( + passed_qualification_name is not None + ), "Must supply an passed_qualification_name in Hydra args to use a qualification task" + assert ( + failed_qualification_name is not None + ), "Must supply an block_qualification in Hydra args to use a qualification task" + assert hasattr(shared_state, "generate_validation_unit_data"), ( + "You must supply a generate_validation_unit_data function in your SharedTaskState to use " + "qualification tasks." 
+ ) + generate_validation_unit_data = shared_state.generate_validation_unit_data + if generate_validation_unit_data is not False: + assert isinstance(generate_validation_unit_data, types.GeneratorType), ( + "Must provide a generator function to SharedTaskState.generate_validation_unit_data if " + "you want to generate validation tasks on the fly, or False if you can validate on any task " + ) + + def worker_needs_validation(self, worker: "Worker") -> bool: + """Workers that are able to access the task (not blocked) but are not passed need qualification""" + return worker.get_granted_qualification(self.passed_qualification_name) is None + + def should_generate_unit(self) -> bool: + return self.generate_validation_unit_data is not False + + def get_validation_unit_data(self) -> Optional[Dict[str, Any]]: + try: + return next(self.generate_validation_unit_data) + except StopIteration: + return None # No validation units left... + + @classmethod + def create_validation_function( + cls, args: "DictConfig", validate_unit: Callable[["Unit"], bool] + ): + """ + Takes in a validator function to determine if validation units are + passable, and returns a `on_unit_submitted` function to be used + in the SharedTaskState + """ + passed_qualification_name = args.blueprint.passed_qualification_name + failed_qualification_name = args.blueprint.block_qualification + + def _wrapped_validate(unit): + if unit.unit_index >= 0: + return # We only run validation on the validatable units + + validation_result = validate_unit(unit) + agent = unit.get_assigned_agent() + if validation_result is True: + agent.get_worker().grant_qualification(passed_qualification_name) + elif validation_result is False: + agent.get_worker().grant_qualification(failed_qualification_name) + + return _wrapped_validate + + @classmethod + def get_relevant_qualifications(cls, args: "DictConfig"): + """Creates the relevant task qualifications for this task""" + passed_qualification_name = args.blueprint.passed_qualification_name + failed_qualification_name = args.blueprint.block_qualification + + class Blueprint(ABC): """ Configuration class for the various parts of building, launching, diff --git a/mephisto/abstractions/blueprints/README.md b/mephisto/abstractions/blueprints/README.md index 0643aff65..9a0606fe2 100644 --- a/mephisto/abstractions/blueprints/README.md +++ b/mephisto/abstractions/blueprints/README.md @@ -31,6 +31,14 @@ A blueprint is able to create a container that handles any shared data that is i - `worker_can_do_unit`: A function that takes in a `Worker` and a `Unit`, and should return a boolean representing if the worker is eligible to work on that particular unit. - `on_unit_submitted`: A function that takes in a `Unit` after a `TaskRunner` ends, and is able to do any automatic post-processing operations on that unit that a Mephisto user may want. +## `Blueprint` Mixins +Blueprints sometimes share some component functionality that may be useful across a multitude of tasks. We capture these in mixins. Mephisto is able to recognize certain mixins in order to complete additional operations, however custom mixins may help cut down on boiler plate in common `run_task.py` scripts. As your tasks mature, we suggest utilizing blueprint mixins to share common workflows and design patterns you observe. +### `OnboardingRequired` +This mixin allows for blueprints that require people to complete an onboarding task _before_ they're even able to start on their first task. 
Usually this is useful for providing task context, and then quizzing workers to see if they understand what's provided. Tasks using this mixin will activate onboarding mode for new `Worker`s whenever the `mephisto.blueprint.onboarding_qualification` hydra argument is provided. +### `ValidationRequired` +This mixin allows for blueprints that require people to complete a _test_ version of the real task the first time a worker does the task. This allows you to validate workers on a run of the real task, either on your actual data (when providing `SharedTaskState.generate_validation_unit_data=False`) or on test data that you may more easily validate using (when providing a generator to `SharedTaskState.generate_validation_unit_data`). The tasks should be the same as your standard task, just able to be validated. If your generator runs out, Mephisto assumes that you've hit the limit of workers you're willing to validate for the task (because you do pay for the validation step). + + ## Implementations ### `StaticBlueprint` The `StaticBlueprint` class allows a replication of the interface that MTurk provides, being able to take a snippet of `HTML` and a `.csv` file and deploy tasks that fill templates of the `HTML` with values from the `.csv`. diff --git a/mephisto/data_model/task_run.py b/mephisto/data_model/task_run.py index d4ae2d871..9298c6cd1 100644 --- a/mephisto/data_model/task_run.py +++ b/mephisto/data_model/task_run.py @@ -144,7 +144,12 @@ def get_valid_units_for_worker(self, worker: "Worker") -> List["Unit"]: is_self_set = map(lambda u: u.worker_id == worker.db_id, unit_set) if not any(is_self_set): units += unit_set - valid_units = [u for u in units if u.get_status() == AssignmentState.LAUNCHED] + # Valid units must be launched and must not be special units (negative indices) + valid_units = [ + u + for u in units + if u.get_status() == AssignmentState.LAUNCHED and u.unit_index >= 0 + ] logger.debug(f"Found {len(valid_units)} available units") # Should load cached blueprint for SharedTaskState diff --git a/mephisto/operations/operator.py b/mephisto/operations/operator.py index fc5eb6636..243ce0773 100644 --- a/mephisto/operations/operator.py +++ b/mephisto/operations/operator.py @@ -278,6 +278,7 @@ def validate_and_run_config_or_die( ) launcher.create_assignments() launcher.launch_units(task_url) + job.task_launcher = launcher self._task_runs_tracked[task_run.db_id] = TrackedRun( task_run=task_run, diff --git a/mephisto/operations/supervisor.py b/mephisto/operations/supervisor.py index c1a3ed761..1ef019533 100644 --- a/mephisto/operations/supervisor.py +++ b/mephisto/operations/supervisor.py @@ -27,8 +27,13 @@ from mephisto.data_model.worker import Worker from mephisto.data_model.qualification import worker_is_qualified from mephisto.data_model.agent import Agent, OnboardingAgent -from mephisto.abstractions.blueprint import OnboardingRequired, AgentState +from mephisto.abstractions.blueprint import ( + OnboardingRequired, + ValidationRequired, + AgentState, +) from mephisto.operations.registry import get_crowd_provider_from_type +from mephisto.operations.task_launcher import TaskLauncher, VALIDATOR_UNIT_INDEX from mephisto.abstractions.channel import Channel, STATUS_CHECK_TIME from dataclasses import dataclass @@ -79,6 +84,7 @@ class Job: provider: "CrowdProvider" qualifications: List[Dict[str, Any]] registered_channel_ids: List[str] + task_launcher: Optional["TaskLauncher"] @dataclass @@ -155,6 +161,7 @@ def register_job( provider=provider, qualifications=qualifications, 
registered_channel_ids=[], + task_launcher=None, ) for channel in channels: channel_id = self.register_channel(channel, job) @@ -399,6 +406,8 @@ def _launch_and_run_unit( logger.exception(f"Cleaning up unit: {e}", exc_info=True) task_runner.cleanup_unit(unit) finally: + if unit.unit_index < 0: # Special units should always expire + unit.expire() task_runner.task_run.clear_reservation(unit) def _assign_unit_to_agent( @@ -452,7 +461,7 @@ def _assign_unit_to_agent( ] = agent_info # Launch individual tasks - if not channel_info.job.task_runner.is_concurrent: + if unit.unit_index < 0 or not channel_info.job.task_runner.is_concurrent: unit_thread = threading.Thread( target=self._launch_and_run_unit, args=(unit, agent_info, channel_info.job.task_runner), @@ -589,8 +598,7 @@ def _register_agent(self, packet: Packet, channel_info: ChannelInfo): ) ) logger.debug( - f"Found existing agent_registration_id {agent_registration_id}, " - f"had no valid units." + f"agent_registration_id {agent_registration_id}, had no valid units." ) return @@ -657,6 +665,31 @@ def _register_agent(self, packet: Packet, channel_info: ChannelInfo): agent_info.assignment_thread = onboard_thread onboard_thread.start() return + if isinstance(blueprint, ValidationRequired) and blueprint.use_validation_task: + if ( + blueprint.worker_needs_validation(worker) + and blueprint.should_generate_unit() + ): + validation_data = blueprint.get_validation_unit_data() + if validation_data is not None: + launcher = channel_info.job.task_launcher + units = [launcher.launch_validator_unit(validation_data)] + else: + self.message_queue.append( + Packet( + packet_type=PACKET_TYPE_PROVIDER_DETAILS, + sender_id=SYSTEM_CHANNEL_ID, + receiver_id=channel_info.channel_id, + data={ + "request_id": packet.data["request_id"], + "agent_id": None, + }, + ) + ) + logger.debug( + f"No validation units left for {agent_registration_id}." 
+ ) + return # Not onboarding, so just register directly self._assign_unit_to_agent(packet, channel_info, units) diff --git a/mephisto/operations/task_launcher.py b/mephisto/operations/task_launcher.py index 49c837d30..d43ea2459 100644 --- a/mephisto/operations/task_launcher.py +++ b/mephisto/operations/task_launcher.py @@ -35,6 +35,7 @@ UNIT_GENERATOR_WAIT_SECONDS = 10 ASSIGNMENT_GENERATOR_WAIT_SECONDS = 0.5 +VALIDATOR_UNIT_INDEX = -1 class GeneratorType(enum.Enum): @@ -70,6 +71,7 @@ def __init__( self.keep_launching_units: bool = False self.finished_generators: bool = False self.assignment_thread_done: bool = True + self.launch_url: Optional[str] = None self.unlaunched_units_access_condition = threading.Condition() if isinstance(self.assignment_data_iterable, types.GeneratorType): @@ -198,11 +200,46 @@ def _launch_limited_units(self, url: str) -> None: def launch_units(self, url: str) -> None: """launch any units registered by this TaskLauncher""" + self.launch_url = url self.units_thread = threading.Thread( target=self._launch_limited_units, args=(url,), name="unit-generator" ) self.units_thread.start() + def launch_validator_unit(self, unit_data: Dict[str, Any]) -> "Unit": + """Launch a validator unit, which should never return to the pool""" + assert ( + self.launch_url is not None + ), "Cannot launch a validator unit before launching others" + task_run = self.task_run + task_config = task_run.get_task_config() + assignment_id = self.db.new_assignment( + task_run.task_id, + task_run.db_id, + task_run.requester_id, + task_run.task_type, + task_run.provider_type, + task_run.sandbox, + ) + data = InitializationData(unit_data, [{}]) + assignment = Assignment.get(self.db, assignment_id) + assignment.write_assignment_data(data) + self.assignments.append(assignment) + unit_id = self.db.new_unit( + task_run.task_id, + task_run.db_id, + task_run.requester_id, + assignment_id, + VALIDATOR_UNIT_INDEX, + task_config.task_reward, + task_run.provider_type, + task_run.task_type, + task_run.sandbox, + ) + validator_unit = Unit.get(self.db, unit_id) + validator_unit.launch(self.launch_url) + return validator_unit + def get_assignments_are_all_created(self) -> bool: return self.assignment_thread_done From 475957cdc3aed6dff307138f1c4cd4b331fb281e Mon Sep 17 00:00:00 2001 From: Jack Urbanek Date: Mon, 20 Sep 2021 14:50:23 -0400 Subject: [PATCH 2/8] Fixing testing --- test/core/test_supervisor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/core/test_supervisor.py b/test/core/test_supervisor.py index a9240b2f2..e617b42da 100644 --- a/test/core/test_supervisor.py +++ b/test/core/test_supervisor.py @@ -118,6 +118,7 @@ def test_channel_operations(self): provider=self.provider, qualifications=[], registered_channel_ids=[], + task_launcher=self.launcher, ) channels = self.architect.get_channels( From a6f3205a3e7db200cd0f04002977b50f925cf3d2 Mon Sep 17 00:00:00 2001 From: Jack Urbanek Date: Mon, 20 Sep 2021 17:50:55 -0400 Subject: [PATCH 3/8] adding qualification note --- mephisto/abstractions/blueprint.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mephisto/abstractions/blueprint.py b/mephisto/abstractions/blueprint.py index f02496778..fd66b6092 100644 --- a/mephisto/abstractions/blueprint.py +++ b/mephisto/abstractions/blueprint.py @@ -709,6 +709,9 @@ def get_relevant_qualifications(cls, args: "DictConfig"): """Creates the relevant task qualifications for this task""" passed_qualification_name = args.blueprint.passed_qualification_name failed_qualification_name = 
args.blueprint.block_qualification + # Block_qualification is currently covered in mephisto core, and we don't need + # to invoke any viewership constraints for passed workers. + return [] class Blueprint(ABC): From 9f5765837039e5f91dca090eccaab80d36a0d155 Mon Sep 17 00:00:00 2001 From: Jack Urbanek Date: Wed, 22 Sep 2021 15:31:46 -0400 Subject: [PATCH 4/8] refactoring mixins --- docs/common_qualification_flows.md | 2 +- mephisto/abstractions/blueprint.py | 189 ++--------------- mephisto/abstractions/blueprints/README.md | 4 +- .../abstract/static_task/static_blueprint.py | 27 +-- .../blueprints/mixins/__init__.py | 5 + .../blueprints/mixins/onboarding_required.py | 144 +++++++++++++ .../blueprints/mixins/screen_task_required.py | 195 ++++++++++++++++++ .../blueprints/mock/mock_blueprint.py | 25 ++- .../parlai_chat/parlai_chat_blueprint.py | 14 +- .../static_html_task/static_html_blueprint.py | 10 - .../abstractions/test/architect_tester.py | 4 +- mephisto/data_model/task_run.py | 4 +- mephisto/operations/operator.py | 5 +- mephisto/operations/supervisor.py | 22 +- mephisto/operations/task_launcher.py | 10 +- mephisto/operations/utils.py | 15 +- .../architects/test_heroku_architect.py | 4 +- test/core/test_supervisor.py | 8 +- 18 files changed, 445 insertions(+), 242 deletions(-) create mode 100644 mephisto/abstractions/blueprints/mixins/__init__.py create mode 100644 mephisto/abstractions/blueprints/mixins/onboarding_required.py create mode 100644 mephisto/abstractions/blueprints/mixins/screen_task_required.py diff --git a/docs/common_qualification_flows.md b/docs/common_qualification_flows.md index bc5948dbe..1ebb19579 100644 --- a/docs/common_qualification_flows.md +++ b/docs/common_qualification_flows.md @@ -15,7 +15,7 @@ The `onboarding_qualification` is shared between all task runs that use the same You can also set up tasks that are only available to workers that have passed an existing onboarding (potentially for tasks that don't have their own onboarding), or use the onboarding failure list as a block list for a future task. Both examples are shown below: ```python -from mephisto.abstractions.blueprint import OnboardingRequired +from mephisto.abstractions.blueprints.mixins.onboarding_required import OnboardingRequired from mephisto.data_model.qualification import QUAL_EQUAL, QUAL_NOT_EXIST, make_qualification_dict ONBOARDING_QUALIFICATION_NAME = "TEST_ONBOARDING_QUAL_NAME" diff --git a/mephisto/abstractions/blueprint.py b/mephisto/abstractions/blueprint.py index fd66b6092..23f33a6be 100644 --- a/mephisto/abstractions/blueprint.py +++ b/mephisto/abstractions/blueprint.py @@ -13,7 +13,6 @@ Dict, Any, Type, - ClassVar, Union, Iterable, Callable, @@ -40,7 +39,6 @@ from mephisto.data_model.unit import Unit from mephisto.data_model.packet import Packet from mephisto.data_model.worker import Worker - from argparse import _ArgumentGroup as ArgumentGroup from mephisto.operations.logger_core import get_logger @@ -50,15 +48,6 @@ @dataclass class BlueprintArgs: _blueprint_type: str = MISSING - onboarding_qualification: str = field( - default=MISSING, - metadata={ - "help": ( - "Specify the name of a qualification used to block workers who fail onboarding, " - "Empty will skip onboarding." 
- ) - }, - ) block_qualification: str = field( default=MISSING, metadata={ @@ -74,11 +63,7 @@ class SharedTaskState: be passed as Hydra args, like functions and objects """ - onboarding_data: Dict[str, Any] = field(default_factory=dict) task_config: Dict[str, Any] = field(default_factory=dict) - validate_onboarding: Callable[[Any], bool] = field( - default_factory=lambda: (lambda x: True) - ) qualifications: List[Any] = field(default_factory=list) worker_can_do_unit: Callable[["Worker", "Unit"], bool] = field( default_factory=lambda: (lambda worker, unit: True) @@ -547,171 +532,33 @@ def get_task_end(self) -> Optional[float]: return 0.0 -class OnboardingRequired(object): +class BlueprintMixin(ABC): """ - Compositional class for blueprints that may have an onboarding step + Base class for compositional mixins for blueprints """ - @staticmethod - def get_failed_qual(qual_name: str) -> str: - """Returns the wrapper for a qualification to represent failing an onboarding""" - return qual_name + "-failed" - - def init_onboarding_config( + @abstractmethod + def init_mixin_config( self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState" - ): - self.onboarding_qualification_name: Optional[str] = args.blueprint.get( - "onboarding_qualification", None - ) - self.onboarding_data = shared_state.onboarding_data - self.use_onboarding = self.onboarding_qualification_name is not None - self.onboarding_qualification_id = None - if self.onboarding_qualification_name is not None: - db = task_run.db - found_qualifications = db.find_qualifications( - self.onboarding_qualification_name - ) - if len(found_qualifications) == 0: - self.onboarding_qualification_id = db.make_qualification( - self.onboarding_qualification_name - ) - else: - self.onboarding_qualification_id = found_qualifications[0].db_id - - # We need to keep a separate qualification for failed onboarding - # to push to a crowd provider in order to prevent workers - # who have failed from being shown our task - self.onboarding_failed_name = self.get_failed_qual( - self.onboarding_qualification_name - ) - found_qualifications = db.find_qualifications(self.onboarding_failed_name) - if len(found_qualifications) == 0: - self.onboarding_failed_id = db.make_qualification( - self.onboarding_failed_name - ) - else: - self.onboarding_failed_id = found_qualifications[0].db_id - - def get_onboarding_data(self, worker_id: str) -> Dict[str, Any]: - """ - If the onboarding task on the frontend requires any specialized data, the blueprint - should provide it for the user. - - As onboarding qualifies a worker for all tasks from this blueprint, this should - generally be static data that can later be evaluated against. - """ - return self.onboarding_data - - def validate_onboarding( - self, worker: "Worker", onboarding_agent: "OnboardingAgent" - ) -> bool: - """ - Check the incoming onboarding data and evaluate if the worker - has passed the qualification or not. Return True if the worker - has qualified. 
- """ - return True - - -class ValidationRequired(object): - """ - Compositional class for blueprints that may have a first task to - qualify workers who have never attempted the task before - """ - - def init_validation_config( - self, - task_run: "TaskRun", - args: "DictConfig", - shared_state: "SharedTaskState", ) -> None: - self.use_validation_task = args.blueprint.get("use_validation_task", False) - if not self.use_validation_task: - return - - # Runs that are using a qualification task should be able to assign - # a specially generated unit to unqualified workers - self.passed_qualification_name = args.blueprint.passed_qualification_name - self.failed_qualification_name = args.blueprint.block_qualification - self.generate_validation_unit_data = shared_state.generate_validation_unit_data - - find_or_create_qualification(task_run.db, self.passed_qualification_name) - find_or_create_qualification(task_run.db, self.failed_qualification_name) - - @classmethod - def assert_task_args(cls, args: "DictConfig", shared_state: "SharedTaskState"): - use_validation_task = args.blueprint.get("use_validation_task", False) - if not use_validation_task: - return - passed_qualification_name = args.blueprint.passed_qualification_name - failed_qualification_name = args.blueprint.block_qualification - assert args.task.allowed_concurrent == 1, ( - "Can only run this task type with one allowed task at a time per worker, to ensure validation " - "before moving into more tasks." - ) - assert ( - passed_qualification_name is not None - ), "Must supply an passed_qualification_name in Hydra args to use a qualification task" - assert ( - failed_qualification_name is not None - ), "Must supply an block_qualification in Hydra args to use a qualification task" - assert hasattr(shared_state, "generate_validation_unit_data"), ( - "You must supply a generate_validation_unit_data function in your SharedTaskState to use " - "qualification tasks." - ) - generate_validation_unit_data = shared_state.generate_validation_unit_data - if generate_validation_unit_data is not False: - assert isinstance(generate_validation_unit_data, types.GeneratorType), ( - "Must provide a generator function to SharedTaskState.generate_validation_unit_data if " - "you want to generate validation tasks on the fly, or False if you can validate on any task " - ) - - def worker_needs_validation(self, worker: "Worker") -> bool: - """Workers that are able to access the task (not blocked) but are not passed need qualification""" - return worker.get_granted_qualification(self.passed_qualification_name) is None - - def should_generate_unit(self) -> bool: - return self.generate_validation_unit_data is not False - - def get_validation_unit_data(self) -> Optional[Dict[str, Any]]: - try: - return next(self.generate_validation_unit_data) - except StopIteration: - return None # No validation units left... 
+ """Method to initialize any required attributes to make this mixin function""" + raise NotImplementedError() @classmethod - def create_validation_function( - cls, args: "DictConfig", validate_unit: Callable[["Unit"], bool] - ): - """ - Takes in a validator function to determine if validation units are - passable, and returns a `on_unit_submitted` function to be used - in the SharedTaskState - """ - passed_qualification_name = args.blueprint.passed_qualification_name - failed_qualification_name = args.blueprint.block_qualification - - def _wrapped_validate(unit): - if unit.unit_index >= 0: - return # We only run validation on the validatable units - - validation_result = validate_unit(unit) - agent = unit.get_assigned_agent() - if validation_result is True: - agent.get_worker().grant_qualification(passed_qualification_name) - elif validation_result is False: - agent.get_worker().grant_qualification(failed_qualification_name) - - return _wrapped_validate + @abstractmethod + def assert_task_args( + cls, args: "DictConfig", shared_state: "SharedTaskState" + ) -> None: + """Method to validate the incoming args and throw if something won't work""" + raise NotImplementedError() @classmethod - def get_relevant_qualifications(cls, args: "DictConfig"): - """Creates the relevant task qualifications for this task""" - passed_qualification_name = args.blueprint.passed_qualification_name - failed_qualification_name = args.blueprint.block_qualification - # Block_qualification is currently covered in mephisto core, and we don't need - # to invoke any viewership constraints for passed workers. - return [] + @abstractmethod + def get_mixin_qualifications( + cls, args: "DictConfig", shared_state: "SharedTaskState" + ) -> List[Dict[str, Any]]: + """Method to provide any required qualifications to make this mixin function""" + raise NotImplementedError() class Blueprint(ABC): diff --git a/mephisto/abstractions/blueprints/README.md b/mephisto/abstractions/blueprints/README.md index 9a0606fe2..2d36b12ee 100644 --- a/mephisto/abstractions/blueprints/README.md +++ b/mephisto/abstractions/blueprints/README.md @@ -35,8 +35,8 @@ A blueprint is able to create a container that handles any shared data that is i Blueprints sometimes share some component functionality that may be useful across a multitude of tasks. We capture these in mixins. Mephisto is able to recognize certain mixins in order to complete additional operations, however custom mixins may help cut down on boiler plate in common `run_task.py` scripts. As your tasks mature, we suggest utilizing blueprint mixins to share common workflows and design patterns you observe. ### `OnboardingRequired` This mixin allows for blueprints that require people to complete an onboarding task _before_ they're even able to start on their first task. Usually this is useful for providing task context, and then quizzing workers to see if they understand what's provided. Tasks using this mixin will activate onboarding mode for new `Worker`s whenever the `mephisto.blueprint.onboarding_qualification` hydra argument is provided. -### `ValidationRequired` -This mixin allows for blueprints that require people to complete a _test_ version of the real task the first time a worker does the task. 
This allows you to validate workers on a run of the real task, either on your actual data (when providing `SharedTaskState.generate_validation_unit_data=False`) or on test data that you may more easily validate using (when providing a generator to `SharedTaskState.generate_validation_unit_data`). The tasks should be the same as your standard task, just able to be validated. If your generator runs out, Mephisto assumes that you've hit the limit of workers you're willing to validate for the task (because you do pay for the validation step). +### `ScreenTaskRequired` +This mixin allows for blueprints that require people to complete a _test_ version of the real task the first time a worker does the task. This allows you to validate workers on a run of the real task, either on your actual data (when providing `SharedTaskState.generate_screening_unit_data=False`) or on test data that you may more easily validate using (when providing a generator to `SharedTaskState.generate_screening_unit_data`). The tasks should be the same as your standard task, just able to be easily validated. You **do pay** for screening tasks, and as such we ask you set `mephisto.blueprint.max_screening_units` to put a cap on how many screening units you want to launch. ## Implementations diff --git a/mephisto/abstractions/blueprints/abstract/static_task/static_blueprint.py b/mephisto/abstractions/blueprints/abstract/static_task/static_blueprint.py index ed31d9382..44f00c821 100644 --- a/mephisto/abstractions/blueprints/abstract/static_task/static_blueprint.py +++ b/mephisto/abstractions/blueprints/abstract/static_task/static_blueprint.py @@ -6,10 +6,13 @@ from mephisto.abstractions.blueprint import ( Blueprint, - OnboardingRequired, BlueprintArgs, SharedTaskState, ) +from mephisto.abstractions.blueprints.mixins.onboarding_required import ( + OnboardingRequired, + OnboardingSharedState, +) from dataclasses import dataclass, field from omegaconf import MISSING, DictConfig from mephisto.data_model.assignment import InitializationData @@ -49,7 +52,7 @@ @dataclass -class SharedStaticTaskState(SharedTaskState): +class SharedStaticTaskState(SharedTaskState, OnboardingSharedState): static_task_data: Iterable[Any] = field(default_factory=list) @@ -105,7 +108,10 @@ class StaticBlueprint(Blueprint, OnboardingRequired): SharedStateClass = SharedStaticTaskState def __init__( - self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState" + self, + task_run: "TaskRun", + args: "DictConfig", + shared_state: "SharedStaticTaskState", ): super().__init__(task_run, args, shared_state) self.init_onboarding_config(task_run, args, shared_state) @@ -144,7 +150,7 @@ def __init__( pass @classmethod - def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"): + def assert_task_args(cls, args: DictConfig, shared_state: "SharedStaticTaskState"): """Ensure that the data can be properly loaded""" blue_args = args.blueprint if blue_args.get("data_csv", None) is not None: @@ -196,16 +202,3 @@ def data_generator() -> Iterable["InitializationData"]: ) for d in self._initialization_data_dicts ] - - def validate_onboarding( - self, worker: "Worker", onboarding_agent: "OnboardingAgent" - ) -> bool: - """ - Check the incoming onboarding data and evaluate if the worker - has passed the qualification or not. Return True if the worker - has qualified. 
- """ - data = onboarding_agent.state.get_data() - return self.shared_state.validate_onboarding( - data - ) # data["outputs"].get("success", True) diff --git a/mephisto/abstractions/blueprints/mixins/__init__.py b/mephisto/abstractions/blueprints/mixins/__init__.py new file mode 100644 index 000000000..240697e32 --- /dev/null +++ b/mephisto/abstractions/blueprints/mixins/__init__.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 + +# Copyright (c) Facebook, Inc. and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. diff --git a/mephisto/abstractions/blueprints/mixins/onboarding_required.py b/mephisto/abstractions/blueprints/mixins/onboarding_required.py new file mode 100644 index 000000000..8498c1468 --- /dev/null +++ b/mephisto/abstractions/blueprints/mixins/onboarding_required.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 + +# Copyright (c) Facebook, Inc. and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from typing import ( + Optional, + Dict, + List, + Any, + Callable, + TYPE_CHECKING, +) + +from mephisto.abstractions.blueprint import BlueprintMixin +from dataclasses import dataclass, field +from omegaconf import MISSING, DictConfig +from mephisto.data_model.qualification import make_qualification_dict, QUAL_NOT_EXIST +from mephisto.operations.utils import find_or_create_qualification + +if TYPE_CHECKING: + from mephisto.abstractions.blueprint import SharedTaskState + from mephisto.data_model.agent import OnboardingAgent + from mephisto.data_model.task_run import TaskRun + from mephisto.data_model.worker import Worker + from argparse import _ArgumentGroup as ArgumentGroup + + +@dataclass +class OnboardingRequiredArgs: + onboarding_qualification: str = field( + default=MISSING, + metadata={ + "help": ( + "Specify the name of a qualification used to block workers who fail onboarding, " + "Empty will skip onboarding." + ) + }, + ) + + +@dataclass +class OnboardingSharedState: + onboarding_data: Dict[str, Any] = field(default_factory=dict) + validate_onboarding: Callable[[Any], bool] = field( + default_factory=lambda: (lambda x: True) + ) + + +class OnboardingRequired(BlueprintMixin): + """ + Compositional class for blueprints that may have an onboarding step + """ + + def init_mixin_config( + self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState" + ) -> None: + """Method to initialize any required attributes to make this mixin function""" + self.init_onboarding_config(task_run, args, shared_state) + + @classmethod + def assert_task_args( + cls, args: "DictConfig", shared_state: "SharedTaskState" + ) -> None: + """Method to validate the incoming args and throw if something won't work""" + # Is there any validation that should be done on the onboarding qualification name? 
+ return + + @classmethod + def get_mixin_qualifications( + cls, args: "DictConfig", shared_state: "SharedTaskState" + ) -> List[Dict[str, Any]]: + """Method to provide any required qualifications to make this mixin function""" + onboarding_qualification_name: Optional[str] = args.blueprint.get( + "onboarding_qualification", None + ) + if onboarding_qualification_name is None: + # Not using an onboarding qualification + return [] + return [ + # We need to keep a separate qualification for failed onboarding + # to push to a crowd provider in order to prevent workers + # who have failed from being shown our task + make_qualification_dict( + cls.get_failed_qual(onboarding_qualification_name), + QUAL_NOT_EXIST, + None, + ) + ] + + @staticmethod + def get_failed_qual(qual_name: str) -> str: + """Returns the wrapper for a qualification to represent failing an onboarding""" + return qual_name + "-failed" + + def init_onboarding_config( + self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState" + ): + self.onboarding_qualification_name: Optional[str] = args.blueprint.get( + "onboarding_qualification", None + ) + self.onboarding_data = shared_state.onboarding_data + self.use_onboarding = self.onboarding_qualification_name is not None + self.onboarding_qualification_id = None + if not self.use_onboarding: + return + + db = task_run.db + self.onboarding_qualification_id = find_or_create_qualification( + db, + self.onboarding_qualification_name, + ) + self.onboarding_failed_name = self.get_failed_qual( + self.onboarding_qualification_name + ) + self.onboarding_failed_id = find_or_create_qualification( + db, self.onboarding_failed_name + ) + + def get_onboarding_data(self, worker_id: str) -> Dict[str, Any]: + """ + If the onboarding task on the frontend requires any specialized data, the blueprint + should provide it for the user. + + As onboarding qualifies a worker for all tasks from this blueprint, this should + generally be static data that can later be evaluated against. + """ + return self.onboarding_data + + def validate_onboarding( + self, worker: "Worker", onboarding_agent: "OnboardingAgent" + ) -> bool: + """ + Check the incoming onboarding data and evaluate if the worker + has passed the qualification or not. Return True if the worker + has qualified. + + By default we use the validate_onboarding provided in a run_task, + and all onboarding tasks should allow run_task to specify additional + or entirely override what's provided in a blueprint. + """ + data = onboarding_agent.state.get_data() + return self.shared_state.validate_onboarding(data) diff --git a/mephisto/abstractions/blueprints/mixins/screen_task_required.py b/mephisto/abstractions/blueprints/mixins/screen_task_required.py new file mode 100644 index 000000000..f2eb8a40b --- /dev/null +++ b/mephisto/abstractions/blueprints/mixins/screen_task_required.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 + +# Copyright (c) Facebook, Inc. and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. 
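# --- Illustrative sketch (hypothetical, not part of this patch) ---
# OnboardingRequired.validate_onboarding in onboarding_required.py above now
# delegates to shared_state.validate_onboarding, so a run script can supply the
# onboarding check as a plain callable. The "outputs"/"success" keys below
# mirror the hint removed from static_blueprint.py
# (data["outputs"].get("success", True)); adjust them to whatever the
# onboarding frontend actually submits.
def my_validate_onboarding(onboarding_data):
    # Return True to pass the worker; returning False grants the
    # "<onboarding_qualification>-failed" qualification, hiding future units.
    outputs = onboarding_data.get("outputs") or {}
    return bool(outputs.get("success", False))

# shared_state.validate_onboarding = my_validate_onboarding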
+ +from typing import ( + Optional, + Dict, + Any, + Union, + Iterable, + Callable, + Tuple, + Generator, + TYPE_CHECKING, +) + +import types +from mephisto.abstractions.blueprint import BlueprintMixin +from dataclasses import dataclass, field +from omegaconf import MISSING, DictConfig +from mephisto.data_model.qualification import make_qualification_dict, QUAL_NOT_EXIST +from mephisto.operations.utils import find_or_create_qualification + + +if TYPE_CHECKING: + from mephisto.data_model.task_run import TaskRun + from mephisto.data_model.unit import Unit + from mephisto.data_model.packet import Packet + from mephisto.data_model.worker import Worker + from argparse import _ArgumentGroup as ArgumentGroup + + +@dataclass +class ScreenTaskRequiredArgs: + passed_qualification_name: str = field( + default=MISSING, + metadata={ + "help": ( + "Specify the name of a qualification used to designate " + "workers who have passed screening." + ) + }, + ) + max_screening_units: int = field( + default=MISSING, + metadata={ + "help": ( + "The maximum number of screening units that can be launched " + "with this batch, specified to limit the number of validations " + "you may need to pay out for." + ) + }, + ) + use_screening_task: bool = field( + default=MISSING, + metadata={"help": ("Whether or not to use a screening task in this run.")}, + ) + + +ScreenUnitDataGenerator = Generator[Dict[str, Any], None, None] + + +@dataclass +class ScreenTaskSharedState: + onboarding_data: Dict[str, Any] = field(default_factory=dict) + generate_screening_unit_data: Tuple[bool, ScreenUnitDataGenerator] = field( + default_factory=lambda: (lambda x: {}) + ) + + +class ScreenTaskRequired(BlueprintMixin): + """ + Compositional class for blueprints that may have a first task to + qualify workers who have never attempted the task before + """ + + def init_mixin_config( + self, + task_run: "TaskRun", + args: "DictConfig", + shared_state: "ScreenTaskSharedState", + ) -> None: + return self.init_screening_config(task_run, args, shared_state) + + def init_screening_config( + self, + task_run: "TaskRun", + args: "DictConfig", + shared_state: "ScreenTaskSharedState", + ) -> None: + self.use_screening_task = args.blueprint.get("use_screening_task", False) + if not self.use_screening_task: + return + + # Runs that are using a qualification task should be able to assign + # a specially generated unit to unqualified workers + self.passed_qualification_name = args.blueprint.passed_qualification_name + self.failed_qualification_name = args.blueprint.block_qualification + self.generate_screening_unit_data: Tuple[ + bool, ScreenUnitDataGenerator + ] = shared_state.generate_screening_unit_data + self.screening_units_launched = 0 + self.screening_unit_cap = args.blueprint.max_screening_units + + find_or_create_qualification(task_run.db, self.passed_qualification_name) + find_or_create_qualification(task_run.db, self.failed_qualification_name) + + @classmethod + def assert_task_args(cls, args: "DictConfig", shared_state: "SharedTaskState"): + use_screening_task = args.blueprint.get("use_screening_task", False) + if not use_screening_task: + return + passed_qualification_name = args.blueprint.passed_qualification_name + failed_qualification_name = args.blueprint.block_qualification + assert args.task.allowed_concurrent == 1, ( + "Can only run this task type with one allowed task at a time per worker, to ensure screening " + "before moving into more tasks." 
+ ) + assert ( + passed_qualification_name is not None + ), "Must supply an passed_qualification_name in Hydra args to use a qualification task" + assert ( + failed_qualification_name is not None + ), "Must supply an block_qualification in Hydra args to use a qualification task" + assert hasattr(shared_state, "generate_screening_unit_data"), ( + "You must supply a generate_screening_unit_data function in your SharedTaskState to use " + "qualification tasks." + ) + max_screening_units = args.blueprint.max_screening_units + assert max_screening_units is not None, ( + "You must supply a blueprint.max_screening_units argument to set the maximum number of " + "additional tasks you will pay out for the purpose of validating new workers. " + ) + generate_screening_unit_data = shared_state.generate_screening_unit_data + if generate_screening_unit_data is not False: + assert isinstance(generate_screening_unit_data, types.GeneratorType), ( + "Must provide a generator function to SharedTaskState.generate_screening_unit_data if " + "you want to generate screening tasks on the fly, or False if you can validate on any task " + ) + + def worker_needs_screening(self, worker: "Worker") -> bool: + """Workers that are able to access the task (not blocked) but are not passed need qualification""" + return worker.get_granted_qualification(self.passed_qualification_name) is None + + def should_generate_unit(self) -> bool: + return self.generate_screening_unit_data is not False + + def get_screening_unit_data(self) -> Optional[Dict[str, Any]]: + try: + if self.screening_units_launched > self.screening_unit_cap: + return None # Exceeded the cap on these units + else: + return next(self.generate_screening_unit_data) + except StopIteration: + return None # No screening units left... 
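# --- Illustrative sketch (hypothetical, not part of this patch) ---
# get_screening_unit_data() above pulls from
# SharedTaskState.generate_screening_unit_data with next(), and assert_task_args
# checks isinstance(..., types.GeneratorType) when one is provided, so a run
# script supplies a generator *object* rather than a function. The payload keys
# below are made up; they only need to match what the task frontend and the
# screen_unit validator expect.
def screening_data_generator():
    # One yielded dict of unit data per screening unit; once exhausted
    # (StopIteration), no further screening units are launched.
    known_answer_items = [
        {"text": "2 + 2", "expected_answer": "4"},
        {"text": "7 - 3", "expected_answer": "4"},
    ]
    for item in known_answer_items:
        yield item

# shared_state.generate_screening_unit_data = screening_data_generator()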
+ + @classmethod + def create_validation_function( + cls, args: "DictConfig", screen_unit: Callable[["Unit"], bool] + ): + """ + Takes in a validator function to determine if validation units are + passable, and returns a `on_unit_submitted` function to be used + in the SharedTaskState + """ + passed_qualification_name = args.blueprint.passed_qualification_name + failed_qualification_name = args.blueprint.block_qualification + + def _wrapped_validate(unit): + if unit.unit_index >= 0: + return # We only run validation on the validatable units + + validation_result = screen_unit(unit) + agent = unit.get_assigned_agent() + if validation_result is True: + agent.get_worker().grant_qualification(passed_qualification_name) + elif validation_result is False: + agent.get_worker().grant_qualification(failed_qualification_name) + + return _wrapped_validate + + @classmethod + def get_mixin_qualifications(cls, args: "DictConfig"): + """Creates the relevant task qualifications for this task""" + passed_qualification_name = args.blueprint.passed_qualification_name + failed_qualification_name = args.blueprint.block_qualification + return [ + make_qualification_dict( + cls.get_failed_qual(failed_qualification_name), + QUAL_NOT_EXIST, + None, + ) + ] diff --git a/mephisto/abstractions/blueprints/mock/mock_blueprint.py b/mephisto/abstractions/blueprints/mock/mock_blueprint.py index 3e106498c..f8c6d0bfb 100644 --- a/mephisto/abstractions/blueprints/mock/mock_blueprint.py +++ b/mephisto/abstractions/blueprints/mock/mock_blueprint.py @@ -6,10 +6,19 @@ from mephisto.abstractions.blueprint import ( Blueprint, - OnboardingRequired, BlueprintArgs, SharedTaskState, ) +from mephisto.abstractions.blueprints.mixins.onboarding_required import ( + OnboardingRequired, + OnboardingSharedState, + OnboardingRequiredArgs, +) +from mephisto.abstractions.blueprints.mixins.screen_task_required import ( + ScreenTaskRequired, + ScreenTaskSharedState, + ScreenTaskRequiredArgs, +) from dataclasses import dataclass, field from omegaconf import MISSING, DictConfig from mephisto.data_model.assignment import InitializationData @@ -35,7 +44,7 @@ @dataclass -class MockBlueprintArgs(BlueprintArgs): +class MockBlueprintArgs(BlueprintArgs, OnboardingRequiredArgs, ScreenTaskRequiredArgs): _blueprint_type: str = BLUEPRINT_TYPE num_assignments: int = field( default=MISSING, @@ -57,8 +66,15 @@ class MockBlueprintArgs(BlueprintArgs): ) +# Mock tasks right now inherit all mixins, this way we can test them. 
+# In the future, we'll likely want to compose mock tasks for mixin testing +@dataclass +class MockSharedState(SharedTaskState, OnboardingSharedState, ScreenTaskSharedState): + pass + + @register_mephisto_abstraction() -class MockBlueprint(Blueprint, OnboardingRequired): +class MockBlueprint(Blueprint, OnboardingRequired, ScreenTaskRequired): """Mock of a task type, for use in testing""" AgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState @@ -67,10 +83,11 @@ class MockBlueprint(Blueprint, OnboardingRequired): TaskRunnerClass: ClassVar[Type["TaskRunner"]] = MockTaskRunner ArgsClass: ClassVar[Type["BlueprintArgs"]] = MockBlueprintArgs supported_architects: ClassVar[List[str]] = ["mock"] + SharedStateClass: ClassVar[Type["SharedTaskState"]] = MockSharedState BLUEPRINT_TYPE = BLUEPRINT_TYPE def __init__( - self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState" + self, task_run: "TaskRun", args: "DictConfig", shared_state: "MockSharedState" ): super().__init__(task_run, args, shared_state) self.init_onboarding_config(task_run, args, shared_state) diff --git a/mephisto/abstractions/blueprints/parlai_chat/parlai_chat_blueprint.py b/mephisto/abstractions/blueprints/parlai_chat/parlai_chat_blueprint.py index 64c64023b..aa5e54eca 100644 --- a/mephisto/abstractions/blueprints/parlai_chat/parlai_chat_blueprint.py +++ b/mephisto/abstractions/blueprints/parlai_chat/parlai_chat_blueprint.py @@ -6,10 +6,13 @@ from mephisto.abstractions.blueprint import ( Blueprint, - OnboardingRequired, BlueprintArgs, SharedTaskState, ) +from mephisto.abstractions.blueprints.mixins.onboarding_required import ( + OnboardingRequired, + OnboardingSharedState, +) from dataclasses import dataclass, field from mephisto.data_model.assignment import InitializationData from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_agent_state import ( @@ -55,7 +58,7 @@ @dataclass -class SharedParlAITaskState(SharedTaskState): +class SharedParlAITaskState(SharedTaskState, OnboardingSharedState): frontend_task_opts: Dict[str, Any] = field(default_factory=dict) world_opt: Dict[str, Any] = field(default_factory=dict) onboarding_world_opt: Dict[str, Any] = field(default_factory=dict) @@ -144,7 +147,10 @@ class ParlAIChatBlueprint(Blueprint, OnboardingRequired): BLUEPRINT_TYPE = BLUEPRINT_TYPE def __init__( - self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState" + self, + task_run: "TaskRun", + args: "DictConfig", + shared_state: "SharedParlAITaskState", ): super().__init__(task_run, args, shared_state) self._initialization_data_dicts: List[Dict[str, Any]] = [] @@ -210,7 +216,7 @@ def __init__( @classmethod def assert_task_args( - cls, args: "DictConfig", shared_state: "SharedTaskState" + cls, args: "DictConfig", shared_state: "SharedParlAITaskState" ) -> None: """Ensure that arguments are properly configured to launch this task""" # Find world module diff --git a/mephisto/abstractions/blueprints/static_html_task/static_html_blueprint.py b/mephisto/abstractions/blueprints/static_html_task/static_html_blueprint.py index 8df2730dd..cb766d238 100644 --- a/mephisto/abstractions/blueprints/static_html_task/static_html_blueprint.py +++ b/mephisto/abstractions/blueprints/static_html_task/static_html_blueprint.py @@ -143,13 +143,3 @@ def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"): "Must use an onboarding validation function to use onboarding " "with static tasks." 
) - - def validate_onboarding( - self, worker: "Worker", onboarding_agent: "OnboardingAgent" - ) -> bool: - """ - Check the incoming onboarding data and evaluate if the worker - has passed the qualification or not. Return True if the worker - has qualified. - """ - return self.shared_state.validate_onboarding(onboarding_agent.state.get_data()) diff --git a/mephisto/abstractions/test/architect_tester.py b/mephisto/abstractions/test/architect_tester.py index 395c2ef1e..f52daf49b 100644 --- a/mephisto/abstractions/test/architect_tester.py +++ b/mephisto/abstractions/test/architect_tester.py @@ -15,13 +15,13 @@ from mephisto.data_model.task_run import TaskRun from mephisto.abstractions.test.utils import get_test_task_run from mephisto.abstractions.database import MephistoDB -from mephisto.abstractions.blueprint import SharedTaskState +from mephisto.abstractions.blueprints.mock.mock_blueprint import MockSharedState from mephisto.abstractions.blueprints.mock.mock_task_builder import MockTaskBuilder from mephisto.abstractions.databases.local_database import LocalMephistoDB from mephisto.operations.hydra_config import MephistoConfig from omegaconf import OmegaConf -EMPTY_STATE = SharedTaskState() +EMPTY_STATE = MockSharedState() class ArchitectTests(unittest.TestCase): diff --git a/mephisto/data_model/task_run.py b/mephisto/data_model/task_run.py index 9298c6cd1..12280f94b 100644 --- a/mephisto/data_model/task_run.py +++ b/mephisto/data_model/task_run.py @@ -206,10 +206,10 @@ def get_blueprint( args = self.args else: cache = True - if shared_state is None: - shared_state = SharedTaskState() BlueprintClass = get_blueprint_from_type(self.task_type) + if shared_state is None: + shared_state = BlueprintClass.SharedStateClass() if not cache: return BlueprintClass(self, args, shared_state) self.__blueprint = BlueprintClass(self, args, shared_state) diff --git a/mephisto/operations/operator.py b/mephisto/operations/operator.py index 243ce0773..d3b8a08ed 100644 --- a/mephisto/operations/operator.py +++ b/mephisto/operations/operator.py @@ -24,7 +24,10 @@ from mephisto.data_model.task_config import TaskConfig from mephisto.data_model.task_run import TaskRun from mephisto.data_model.requester import Requester -from mephisto.abstractions.blueprint import OnboardingRequired, SharedTaskState +from mephisto.abstractions.blueprint import SharedTaskState +from mephisto.abstractions.blueprints.mixins.onboarding_required import ( + OnboardingRequired, +) from mephisto.abstractions.database import MephistoDB, EntryDoesNotExistException from mephisto.data_model.qualification import make_qualification_dict, QUAL_NOT_EXIST from mephisto.operations.task_launcher import TaskLauncher diff --git a/mephisto/operations/supervisor.py b/mephisto/operations/supervisor.py index 1ef019533..b17fa59d9 100644 --- a/mephisto/operations/supervisor.py +++ b/mephisto/operations/supervisor.py @@ -27,13 +27,15 @@ from mephisto.data_model.worker import Worker from mephisto.data_model.qualification import worker_is_qualified from mephisto.data_model.agent import Agent, OnboardingAgent -from mephisto.abstractions.blueprint import ( +from mephisto.abstractions.blueprint import AgentState +from mephisto.abstractions.blueprints.mixins.onboarding_required import ( OnboardingRequired, - ValidationRequired, - AgentState, +) +from mephisto.abstractions.blueprints.mixins.screen_task_required import ( + ScreenTaskRequired, ) from mephisto.operations.registry import get_crowd_provider_from_type -from mephisto.operations.task_launcher import 
TaskLauncher, VALIDATOR_UNIT_INDEX +from mephisto.operations.task_launcher import TaskLauncher from mephisto.abstractions.channel import Channel, STATUS_CHECK_TIME from dataclasses import dataclass @@ -665,15 +667,15 @@ def _register_agent(self, packet: Packet, channel_info: ChannelInfo): agent_info.assignment_thread = onboard_thread onboard_thread.start() return - if isinstance(blueprint, ValidationRequired) and blueprint.use_validation_task: + if isinstance(blueprint, ScreenTaskRequired) and blueprint.use_screening_task: if ( - blueprint.worker_needs_validation(worker) + blueprint.worker_needs_screening(worker) and blueprint.should_generate_unit() ): - validation_data = blueprint.get_validation_unit_data() - if validation_data is not None: + screening_data = blueprint.get_screening_unit_data() + if screening_data is not None: launcher = channel_info.job.task_launcher - units = [launcher.launch_validator_unit(validation_data)] + units = [launcher.launch_screening_unit(screening_data)] else: self.message_queue.append( Packet( @@ -687,7 +689,7 @@ def _register_agent(self, packet: Packet, channel_info: ChannelInfo): ) ) logger.debug( - f"No validation units left for {agent_registration_id}." + f"No screening units left for {agent_registration_id}." ) return diff --git a/mephisto/operations/task_launcher.py b/mephisto/operations/task_launcher.py index d43ea2459..869637b57 100644 --- a/mephisto/operations/task_launcher.py +++ b/mephisto/operations/task_launcher.py @@ -35,7 +35,7 @@ UNIT_GENERATOR_WAIT_SECONDS = 10 ASSIGNMENT_GENERATOR_WAIT_SECONDS = 0.5 -VALIDATOR_UNIT_INDEX = -1 +SCREENING_UNIT_INDEX = -1 class GeneratorType(enum.Enum): @@ -206,11 +206,11 @@ def launch_units(self, url: str) -> None: ) self.units_thread.start() - def launch_validator_unit(self, unit_data: Dict[str, Any]) -> "Unit": - """Launch a validator unit, which should never return to the pool""" + def launch_screening_unit(self, unit_data: Dict[str, Any]) -> "Unit": + """Launch a screening unit, which should never return to the pool""" assert ( self.launch_url is not None - ), "Cannot launch a validator unit before launching others" + ), "Cannot launch a screening unit before launching others" task_run = self.task_run task_config = task_run.get_task_config() assignment_id = self.db.new_assignment( @@ -230,7 +230,7 @@ def launch_validator_unit(self, unit_data: Dict[str, Any]) -> "Unit": task_run.db_id, task_run.requester_id, assignment_id, - VALIDATOR_UNIT_INDEX, + SCREENING_UNIT_INDEX, task_config.task_reward, task_run.provider_type, task_run.task_type, diff --git a/mephisto/operations/utils.py b/mephisto/operations/utils.py index 0b61e0b44..039c9fb9c 100644 --- a/mephisto/operations/utils.py +++ b/mephisto/operations/utils.py @@ -183,17 +183,16 @@ def build_arg_list_from_dict(in_dict: Dict[str, Any]) -> List[str]: return arg_list -def find_or_create_qualification(db, qualification_name) -> None: +def find_or_create_qualification(db, qualification_name) -> str: """ Ensure the given qualification exists in the db, - creating it if it doesn't already + creating it if it doesn't already. 
Returns the id """ - from mephisto.abstractions.database import EntryAlreadyExistsException - - try: - db.make_qualification(qualification_name) - except EntryAlreadyExistsException: - pass # qualification already exists + found_qualifications = db.find_qualifications(qualification_name) + if len(found_qualifications) == 0: + return db.make_qualification(qualification_name) + else: + return found_qualifications[0].db_id def get_dict_from_field(in_field: Field) -> Dict[str, Any]: diff --git a/test/abstractions/architects/test_heroku_architect.py b/test/abstractions/architects/test_heroku_architect.py index 510d570fe..4ff808dd7 100644 --- a/test/abstractions/architects/test_heroku_architect.py +++ b/test/abstractions/architects/test_heroku_architect.py @@ -21,7 +21,7 @@ from omegaconf import OmegaConf from mephisto.operations.hydra_config import MephistoConfig -from mephisto.abstractions.blueprint import SharedTaskState +from mephisto.abstractions.blueprints.mock.mock_blueprint import MockSharedState # TODO(#104) these tests should be marked as nightly's rather than on every run? # Maybe with some kind of LONG TEST flag? Investigate @@ -42,7 +42,7 @@ def get_architect(self) -> HerokuArchitect: arch_args = HerokuArchitectArgs(heroku_team=None, use_hobby=False) args = OmegaConf.structured(MephistoConfig(architect=arch_args)) self.curr_architect = self.ArchitectClass( - self.db, args, SharedTaskState(), self.task_run, self.build_dir + self.db, args, MockSharedState(), self.task_run, self.build_dir ) return self.curr_architect diff --git a/test/core/test_supervisor.py b/test/core/test_supervisor.py index e617b42da..8a169954f 100644 --- a/test/core/test_supervisor.py +++ b/test/core/test_supervisor.py @@ -24,7 +24,6 @@ from mephisto.data_model.assignment import InitializationData from mephisto.data_model.task_run import TaskRun from mephisto.operations.supervisor import Supervisor, Job -from mephisto.abstractions.blueprint import SharedTaskState from mephisto.abstractions.architects.mock_architect import ( @@ -33,12 +32,15 @@ ) from mephisto.operations.hydra_config import MephistoConfig from mephisto.abstractions.providers.mock.mock_provider import MockProviderArgs -from mephisto.abstractions.blueprints.mock.mock_blueprint import MockBlueprintArgs +from mephisto.abstractions.blueprints.mock.mock_blueprint import ( + MockBlueprintArgs, + MockSharedState, +) from mephisto.data_model.task_config import TaskConfigArgs from omegaconf import OmegaConf -EMPTY_STATE = SharedTaskState() +EMPTY_STATE = MockSharedState() class BaseTestSupervisor: From 7469aa6d8b332956deb386e542e470150e8e8d54 Mon Sep 17 00:00:00 2001 From: Jack Urbanek Date: Wed, 22 Sep 2021 17:12:47 -0400 Subject: [PATCH 5/8] Add testing for ScreenTaskRequired --- .../blueprints/mixins/screen_task_required.py | 15 +- .../blueprints/mock/mock_blueprint.py | 2 + mephisto/operations/operator.py | 1 + mephisto/operations/supervisor.py | 15 +- test/core/test_supervisor.py | 208 +++++++++++++++++- 5 files changed, 231 insertions(+), 10 deletions(-) diff --git a/mephisto/abstractions/blueprints/mixins/screen_task_required.py b/mephisto/abstractions/blueprints/mixins/screen_task_required.py index f2eb8a40b..150c28b5e 100644 --- a/mephisto/abstractions/blueprints/mixins/screen_task_required.py +++ b/mephisto/abstractions/blueprints/mixins/screen_task_required.py @@ -54,7 +54,7 @@ class ScreenTaskRequiredArgs: }, ) use_screening_task: bool = field( - default=MISSING, + default=False, metadata={"help": ("Whether or not to use a screening task in this 
run.")}, ) @@ -62,11 +62,16 @@ class ScreenTaskRequiredArgs: ScreenUnitDataGenerator = Generator[Dict[str, Any], None, None] +def blank_generator(): + while True: + yield {} + + @dataclass class ScreenTaskSharedState: onboarding_data: Dict[str, Any] = field(default_factory=dict) generate_screening_unit_data: Tuple[bool, ScreenUnitDataGenerator] = field( - default_factory=lambda: (lambda x: {}) + default_factory=lambda: blank_generator() ) @@ -149,10 +154,12 @@ def should_generate_unit(self) -> bool: def get_screening_unit_data(self) -> Optional[Dict[str, Any]]: try: - if self.screening_units_launched > self.screening_unit_cap: + if self.screening_units_launched >= self.screening_unit_cap: return None # Exceeded the cap on these units else: - return next(self.generate_screening_unit_data) + data = next(self.generate_screening_unit_data) + self.screening_units_launched += 1 + return data except StopIteration: return None # No screening units left... diff --git a/mephisto/abstractions/blueprints/mock/mock_blueprint.py b/mephisto/abstractions/blueprints/mock/mock_blueprint.py index f8c6d0bfb..b69bfae2f 100644 --- a/mephisto/abstractions/blueprints/mock/mock_blueprint.py +++ b/mephisto/abstractions/blueprints/mock/mock_blueprint.py @@ -90,7 +90,9 @@ def __init__( self, task_run: "TaskRun", args: "DictConfig", shared_state: "MockSharedState" ): super().__init__(task_run, args, shared_state) + # TODO these can be done with self.mro() and using the mixin variant self.init_onboarding_config(task_run, args, shared_state) + self.init_screening_config(task_run, args, shared_state) def get_initialization_data(self) -> Iterable[InitializationData]: """ diff --git a/mephisto/operations/operator.py b/mephisto/operations/operator.py index d3b8a08ed..bd18c35f8 100644 --- a/mephisto/operations/operator.py +++ b/mephisto/operations/operator.py @@ -227,6 +227,7 @@ def validate_and_run_config_or_die( ) # Small hack for auto appending block qualification + # TODO we can use blueprint.mro() to discover BlueprintMixins and extract from there existing_qualifications = shared_state.qualifications if run_config.blueprint.get("block_qualification", None) is not None: existing_qualifications.append( diff --git a/mephisto/operations/supervisor.py b/mephisto/operations/supervisor.py index b17fa59d9..50e934807 100644 --- a/mephisto/operations/supervisor.py +++ b/mephisto/operations/supervisor.py @@ -35,7 +35,7 @@ ScreenTaskRequired, ) from mephisto.operations.registry import get_crowd_provider_from_type -from mephisto.operations.task_launcher import TaskLauncher +from mephisto.operations.task_launcher import TaskLauncher, SCREENING_UNIT_INDEX from mephisto.abstractions.channel import Channel, STATUS_CHECK_TIME from dataclasses import dataclass @@ -390,8 +390,8 @@ def _launch_and_run_unit( self, unit: "Unit", agent_info: "AgentInfo", task_runner: "TaskRunner" ): """Launch a thread to supervise the completion of an assignment""" + agent = agent_info.agent try: - agent = agent_info.agent assert isinstance( agent, Agent ), f"Can launch units for Agents, not OnboardingAgents, got {agent}" @@ -408,8 +408,15 @@ def _launch_and_run_unit( logger.exception(f"Cleaning up unit: {e}", exc_info=True) task_runner.cleanup_unit(unit) finally: - if unit.unit_index < 0: # Special units should always expire - unit.expire() + print("Run completed") + if unit.unit_index == SCREENING_UNIT_INDEX: + if agent.get_status() != AgentState.STATUS_COMPLETED: + print("But was not completed!") + blueprint = task_runner.task_run.get_blueprint( + 
args=task_runner.args + ) + blueprint.screening_units_launched -= 1 + unit.expire() task_runner.task_run.clear_reservation(unit) def _assign_unit_to_agent( diff --git a/test/core/test_supervisor.py b/test/core/test_supervisor.py index 8a169954f..f1709c91c 100644 --- a/test/core/test_supervisor.py +++ b/test/core/test_supervisor.py @@ -13,18 +13,22 @@ from typing import List +from mephisto.abstractions.blueprint import AgentState from mephisto.abstractions.blueprints.mock.mock_blueprint import MockBlueprint from mephisto.abstractions.blueprints.mock.mock_task_runner import MockTaskRunner from mephisto.abstractions.architects.mock_architect import MockArchitect from mephisto.abstractions.providers.mock.mock_provider import MockProvider from mephisto.abstractions.databases.local_database import LocalMephistoDB from mephisto.abstractions.databases.local_singleton_database import MephistoSingletonDB -from mephisto.operations.task_launcher import TaskLauncher +from mephisto.operations.task_launcher import TaskLauncher, SCREENING_UNIT_INDEX +from mephisto.abstractions.blueprints.mixins.screen_task_required import ( + ScreenTaskRequired, +) from mephisto.abstractions.test.utils import get_test_task_run from mephisto.data_model.assignment import InitializationData from mephisto.data_model.task_run import TaskRun from mephisto.operations.supervisor import Supervisor, Job - +from mephisto.operations.utils import find_or_create_qualification from mephisto.abstractions.architects.mock_architect import ( MockArchitect, @@ -849,6 +853,206 @@ def test_register_job_with_onboarding(self): sup.shutdown() self.assertTrue(channel_info.channel.is_closed()) + def test_register_job_with_screening(self): + """Test registering and running a job with screening""" + if self.DB_CLASS != MephistoSingletonDB: + return # TODO(#97) This test only works with singleton for now due to disconnect simulation + + # Handle baseline setup + sup = Supervisor(self.db) + self.sup = sup + PASSED_QUALIFICATION_NAME = "test_screening_qualification" + FAILED_QUALIFICATION_NAME = "failed_screening_qualification" + + # Register onboarding arguments for blueprint + task_run_args = self.task_run.args + task_run_args.blueprint.use_screening_task = True + task_run_args.blueprint.passed_qualification_name = PASSED_QUALIFICATION_NAME + task_run_args.blueprint.block_qualification = FAILED_QUALIFICATION_NAME + task_run_args.blueprint.max_screening_units = 2 + task_run_args.blueprint.timeout_time = 5 + task_run_args.blueprint.is_concurrent = False + self.task_run.get_task_config() + + def screen_unit(unit): + if unit.get_assigned_agent() is None: + return None # No real data to evaluate + + agent = unit.get_assigned_agent() + output = agent.state.get_data() + if output is None: + return None # no data to evaluate + + return output["success"] + + shared_state = MockSharedState() + shared_state.on_unit_submitted = ScreenTaskRequired.create_validation_function( + task_run_args, + screen_unit, + ) + + # Supervisor expects that blueprint setup has already occurred + blueprint = self.task_run.get_blueprint(task_run_args, shared_state) + + TaskRunnerClass = MockBlueprint.TaskRunnerClass + task_runner = TaskRunnerClass(self.task_run, task_run_args, shared_state) + job = sup.register_job(self.architect, task_runner, self.provider) + job.task_launcher = self.launcher + self.assertEqual(len(sup.channels), 1) + channel_info = list(sup.channels.values())[0] + self.assertIsNotNone(channel_info) + self.assertTrue(channel_info.channel.is_alive()) + channel_id = 
channel_info.channel_id + task_runner = channel_info.job.task_runner + self.assertIsNotNone(channel_id) + self.assertEqual( + len(self.architect.server.subs), + 1, + "MockServer doesn't see registered channel", + ) + self.assertIsNotNone( + self.architect.server.last_alive_packet, + "No alive packet received by server", + ) + sup.launch_sending_thread() + self.assertIsNotNone(sup.sending_thread) + + # Register workers + mock_worker_name = "MOCK_WORKER" + self.architect.server.register_mock_worker(mock_worker_name) + workers = self.db.find_workers(worker_name=mock_worker_name) + self.assertEqual(len(workers), 1, "Worker not successfully registered") + worker_1 = workers[0] + worker_id = workers[0].db_id + + mock_worker_name = "MOCK_WORKER_2" + self.architect.server.register_mock_worker(mock_worker_name) + workers = self.db.find_workers(worker_name=mock_worker_name) + worker_2 = workers[0] + worker_id_2 = worker_2.db_id + + mock_worker_name = "MOCK_WORKER_3" + self.architect.server.register_mock_worker(mock_worker_name) + workers = self.db.find_workers(worker_name=mock_worker_name) + worker_3 = workers[0] + worker_id_3 = worker_3.db_id + + self.assertEqual(len(task_runner.running_units), 0) + + # Register a screening agent successfully + mock_agent_details = "FAKE_ASSIGNMENT" + self.architect.server.register_mock_agent(worker_id, mock_agent_details) + agents = self.db.find_agents() + self.assertEqual(len(agents), 1, "No agent created for screening") + time.sleep(0.1) + last_packet = self.architect.server.last_packet + self.assertIsNotNone(last_packet) + self.assertEqual( + agents[0].get_unit().unit_index, + SCREENING_UNIT_INDEX, + "Agent not assigned screening unit", + ) + self.architect.server.last_packet = None + + # Register a second screening agent successfully + mock_agent_details = "FAKE_ASSIGNMENT2" + self.architect.server.register_mock_agent(worker_id_2, mock_agent_details) + agents = self.db.find_agents() + self.assertEqual(len(agents), 2, "No agent created for screening") + last_packet = None + time.sleep(0.1) + last_packet = self.architect.server.last_packet + self.assertIsNotNone(last_packet) + self.assertEqual( + agents[1].get_unit().unit_index, + SCREENING_UNIT_INDEX, + "Agent not assigned screening unit", + ) + self.architect.server.last_packet = None + + # Fail to register a third screening agent + mock_agent_details = "FAKE_ASSIGNMENT3" + self.architect.server.register_mock_agent(worker_id_3, mock_agent_details) + agents = self.db.find_agents() + self.assertEqual(len(agents), 2, "Third agent created, when 2 was max") + time.sleep(0.1) + last_packet = self.architect.server.last_packet + self.assertIsNotNone(last_packet) + self.assertIsNone( + last_packet["data"]["agent_id"], "worker assigned real agent id" + ) + self.architect.server.last_packet = None + + # Disconnect first agent + agents[0].update_status(AgentState.STATUS_DISCONNECT) + time.sleep(0.5) + + # Register third screening agent + mock_agent_details = "FAKE_ASSIGNMENT3" + self.architect.server.register_mock_agent(worker_id_3, mock_agent_details) + agents = self.db.find_agents() + self.assertEqual(len(agents), 3, "Third agent not created") + time.sleep(0.1) + last_packet = self.architect.server.last_packet + self.assertIsNotNone(last_packet) + self.assertIsNotNone( + last_packet["data"]["agent_id"], "worker not assigned real agent id" + ) + self.assertEqual( + agents[2].get_unit().unit_index, + SCREENING_UNIT_INDEX, + "Agent not assigned screening unit", + ) + self.architect.server.last_packet = None + + # Submit 
screening from the agent + validation_data = {"success": False} + self.architect.server.send_agent_act(agents[1].get_agent_id(), validation_data) + # Assert failed screening screening + start_time = time.time() + TIMEOUT_TIME = 5 + while time.time() - start_time < TIMEOUT_TIME: + if worker_2.is_qualified(FAILED_QUALIFICATION_NAME): + break + time.sleep(0.1) + self.assertLess( + time.time() - start_time, TIMEOUT_TIME, "Did not qualify in time" + ) + + # Submit screening from the agent + validation_data = {"success": True} + self.architect.server.send_agent_act(agents[2].get_agent_id(), validation_data) + # Assert successful screening screening + start_time = time.time() + TIMEOUT_TIME = 5 + while time.time() - start_time < TIMEOUT_TIME: + if worker_3.is_qualified(PASSED_QUALIFICATION_NAME): + break + time.sleep(0.1) + self.assertLess( + time.time() - start_time, TIMEOUT_TIME, "Did not qualify in time" + ) + + # Accept a real task, and complete it, from worker 3 + # Register a task agent successfully + mock_agent_details = "FAKE_ASSIGNMENT4" + self.architect.server.register_mock_agent(worker_id_3, mock_agent_details) + agents = self.db.find_agents() + self.assertEqual(len(agents), 4, "No agent created for task") + last_packet = None + time.sleep(0.1) + last_packet = self.architect.server.last_packet + self.assertIsNotNone(last_packet) + self.assertNotEqual( + agents[3].get_unit().unit_index, + SCREENING_UNIT_INDEX, + "Agent assigned screening unit", + ) + self.architect.server.last_packet = None + + sup.shutdown() + self.assertTrue(channel_info.channel.is_closed()) + # TODO(#97) handle testing for disconnecting in and out of tasks From 7cdc5af2ffaedd3f4c6456307e3bb05b6f79ead0 Mon Sep 17 00:00:00 2001 From: Jack Urbanek Date: Wed, 22 Sep 2021 19:05:25 -0400 Subject: [PATCH 6/8] More test consistency --- test/core/test_supervisor.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/test/core/test_supervisor.py b/test/core/test_supervisor.py index f1709c91c..a10bff819 100644 --- a/test/core/test_supervisor.py +++ b/test/core/test_supervisor.py @@ -945,14 +945,11 @@ def screen_unit(unit): agents = self.db.find_agents() self.assertEqual(len(agents), 1, "No agent created for screening") time.sleep(0.1) - last_packet = self.architect.server.last_packet - self.assertIsNotNone(last_packet) self.assertEqual( agents[0].get_unit().unit_index, SCREENING_UNIT_INDEX, "Agent not assigned screening unit", ) - self.architect.server.last_packet = None # Register a second screening agent successfully mock_agent_details = "FAKE_ASSIGNMENT2" @@ -961,14 +958,11 @@ def screen_unit(unit): self.assertEqual(len(agents), 2, "No agent created for screening") last_packet = None time.sleep(0.1) - last_packet = self.architect.server.last_packet - self.assertIsNotNone(last_packet) self.assertEqual( agents[1].get_unit().unit_index, SCREENING_UNIT_INDEX, "Agent not assigned screening unit", ) - self.architect.server.last_packet = None # Fail to register a third screening agent mock_agent_details = "FAKE_ASSIGNMENT3" @@ -976,12 +970,6 @@ def screen_unit(unit): agents = self.db.find_agents() self.assertEqual(len(agents), 2, "Third agent created, when 2 was max") time.sleep(0.1) - last_packet = self.architect.server.last_packet - self.assertIsNotNone(last_packet) - self.assertIsNone( - last_packet["data"]["agent_id"], "worker assigned real agent id" - ) - self.architect.server.last_packet = None # Disconnect first agent agents[0].update_status(AgentState.STATUS_DISCONNECT) @@ -993,17 +981,11 @@ def 
screen_unit(unit): agents = self.db.find_agents() self.assertEqual(len(agents), 3, "Third agent not created") time.sleep(0.1) - last_packet = self.architect.server.last_packet - self.assertIsNotNone(last_packet) - self.assertIsNotNone( - last_packet["data"]["agent_id"], "worker not assigned real agent id" - ) self.assertEqual( agents[2].get_unit().unit_index, SCREENING_UNIT_INDEX, "Agent not assigned screening unit", ) - self.architect.server.last_packet = None # Submit screening from the agent validation_data = {"success": False} @@ -1041,14 +1023,11 @@ def screen_unit(unit): self.assertEqual(len(agents), 4, "No agent created for task") last_packet = None time.sleep(0.1) - last_packet = self.architect.server.last_packet - self.assertIsNotNone(last_packet) self.assertNotEqual( agents[3].get_unit().unit_index, SCREENING_UNIT_INDEX, "Agent assigned screening unit", ) - self.architect.server.last_packet = None sup.shutdown() self.assertTrue(channel_info.channel.is_closed()) From 738c2622903715204facd08f4b6361cb56dce19b Mon Sep 17 00:00:00 2001 From: Jack Urbanek Date: Wed, 22 Sep 2021 19:07:35 -0400 Subject: [PATCH 7/8] Removing sneaky debug --- mephisto/operations/supervisor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/mephisto/operations/supervisor.py b/mephisto/operations/supervisor.py index 50e934807..80dfad2b1 100644 --- a/mephisto/operations/supervisor.py +++ b/mephisto/operations/supervisor.py @@ -408,10 +408,8 @@ def _launch_and_run_unit( logger.exception(f"Cleaning up unit: {e}", exc_info=True) task_runner.cleanup_unit(unit) finally: - print("Run completed") if unit.unit_index == SCREENING_UNIT_INDEX: if agent.get_status() != AgentState.STATUS_COMPLETED: - print("But was not completed!") blueprint = task_runner.task_run.get_blueprint( args=task_runner.args ) From e4515f5fa35c323a374db7d1c5ce39ac2bcf7152 Mon Sep 17 00:00:00 2001 From: Jack Urbanek Date: Fri, 24 Sep 2021 12:39:25 -0400 Subject: [PATCH 8/8] Naming changes, addressing comments --- mephisto/abstractions/blueprints/README.md | 2 +- .../blueprints/mixins/screen_task_required.py | 33 ++++++++++--------- mephisto/operations/task_launcher.py | 6 ++-- test/core/test_supervisor.py | 8 ++--- 4 files changed, 25 insertions(+), 24 deletions(-) diff --git a/mephisto/abstractions/blueprints/README.md b/mephisto/abstractions/blueprints/README.md index 2d36b12ee..a1eec09c1 100644 --- a/mephisto/abstractions/blueprints/README.md +++ b/mephisto/abstractions/blueprints/README.md @@ -36,7 +36,7 @@ Blueprints sometimes share some component functionality that may be useful acros ### `OnboardingRequired` This mixin allows for blueprints that require people to complete an onboarding task _before_ they're even able to start on their first task. Usually this is useful for providing task context, and then quizzing workers to see if they understand what's provided. Tasks using this mixin will activate onboarding mode for new `Worker`s whenever the `mephisto.blueprint.onboarding_qualification` hydra argument is provided. ### `ScreenTaskRequired` -This mixin allows for blueprints that require people to complete a _test_ version of the real task the first time a worker does the task. This allows you to validate workers on a run of the real task, either on your actual data (when providing `SharedTaskState.generate_screening_unit_data=False`) or on test data that you may more easily validate using (when providing a generator to `SharedTaskState.generate_screening_unit_data`). 
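To make the README guidance above concrete, here is a minimal sketch of how a run script might wire screening together, using the argument names from this patch (`use_screening_task`, `passed_qualification_name`, `block_qualification`, `max_screening_units`) and the renamed `screening_data_factory` introduced later in this same commit. Everything prefixed with `my_` is an illustrative assumption, not part of Mephisto, and the key checked in the validator depends entirely on what your task frontend submits.

```python
from typing import Any, Dict, Generator, Optional

from mephisto.abstractions.blueprints.mixins.screen_task_required import (
    ScreenTaskRequired,
)


def my_screening_data_factory() -> Generator[Dict[str, Any], None, None]:
    """Illustrative generator: yields the data for one screening unit per next()."""
    while True:
        yield {"expected_answer": "example"}


def my_screen_unit(unit) -> Optional[bool]:
    """Illustrative validator: True grants passed_qualification_name,
    False grants block_qualification, None means nothing to evaluate yet."""
    agent = unit.get_assigned_agent()
    if agent is None:
        return None  # no worker ever picked up this screening unit
    data = agent.state.get_data()
    if data is None:
        return None  # no submission to evaluate
    # "success" here mirrors the test's mock data; use your own submitted field
    return data.get("success") is True


def attach_screening(shared_state, task_args) -> None:
    """Attach screening to a SharedTaskState that carries the ScreenTaskSharedState
    fields, for a run configured with mephisto.blueprint.use_screening_task=true."""
    shared_state.screening_data_factory = my_screening_data_factory()
    shared_state.on_unit_submitted = ScreenTaskRequired.create_validation_function(
        task_args,  # the same DictConfig the run is launched with
        my_screen_unit,
    )
```

With this in place, workers who lack the passed qualification are routed to a screening unit first; as the test below demonstrates, a `True` result grants the passed qualification and a `False` result grants the block qualification, keeping failed workers out of later units.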
The tasks should be the same as your standard task, just able to be easily validated. You **do pay** for screening tasks, and as such we ask you set `mephisto.blueprint.max_screening_units` to put a cap on how many screening units you want to launch. +This mixin allows for blueprints that require people to complete a _test_ version of the real task the first time a worker does the task. This allows you to validate workers on a run of the real task, either on your actual data (when providing `SharedTaskState.screening_data_factory=False`) or on test data that you may more easily validate using (when providing a generator to `SharedTaskState.screening_data_factory`). The tasks should be the same as your standard task, just able to be easily validated. You **do pay** for screening tasks, and as such we ask you set `mephisto.blueprint.max_screening_units` to put a cap on how many screening units you want to launch. ## Implementations diff --git a/mephisto/abstractions/blueprints/mixins/screen_task_required.py b/mephisto/abstractions/blueprints/mixins/screen_task_required.py index 150c28b5e..b5c4f761a 100644 --- a/mephisto/abstractions/blueprints/mixins/screen_task_required.py +++ b/mephisto/abstractions/blueprints/mixins/screen_task_required.py @@ -70,7 +70,7 @@ def blank_generator(): @dataclass class ScreenTaskSharedState: onboarding_data: Dict[str, Any] = field(default_factory=dict) - generate_screening_unit_data: Tuple[bool, ScreenUnitDataGenerator] = field( + screening_data_factory: Tuple[bool, ScreenUnitDataGenerator] = field( default_factory=lambda: blank_generator() ) @@ -103,9 +103,9 @@ def init_screening_config( # a specially generated unit to unqualified workers self.passed_qualification_name = args.blueprint.passed_qualification_name self.failed_qualification_name = args.blueprint.block_qualification - self.generate_screening_unit_data: Tuple[ + self.screening_data_factory: Tuple[ bool, ScreenUnitDataGenerator - ] = shared_state.generate_screening_unit_data + ] = shared_state.screening_data_factory self.screening_units_launched = 0 self.screening_unit_cap = args.blueprint.max_screening_units @@ -120,8 +120,8 @@ def assert_task_args(cls, args: "DictConfig", shared_state: "SharedTaskState"): passed_qualification_name = args.blueprint.passed_qualification_name failed_qualification_name = args.blueprint.block_qualification assert args.task.allowed_concurrent == 1, ( - "Can only run this task type with one allowed task at a time per worker, to ensure screening " - "before moving into more tasks." + "Can only run this task type with one allowed concurrent unit at a time per worker, to ensure " + "screening before moving into real units." ) assert ( passed_qualification_name is not None @@ -129,20 +129,21 @@ def assert_task_args(cls, args: "DictConfig", shared_state: "SharedTaskState"): assert ( failed_qualification_name is not None ), "Must supply an block_qualification in Hydra args to use a qualification task" - assert hasattr(shared_state, "generate_screening_unit_data"), ( - "You must supply a generate_screening_unit_data function in your SharedTaskState to use " - "qualification tasks." + assert hasattr(shared_state, "screening_data_factory"), ( + "You must supply a screening_data_factory generator in your SharedTaskState to use " + "screening units, or False if you can screen on any tasks." 
) max_screening_units = args.blueprint.max_screening_units assert max_screening_units is not None, ( "You must supply a blueprint.max_screening_units argument to set the maximum number of " - "additional tasks you will pay out for the purpose of validating new workers. " + "additional units you will pay out for the purpose of screening new workers. Note that you " + "do pay for screening units, they are just like any other units." ) - generate_screening_unit_data = shared_state.generate_screening_unit_data - if generate_screening_unit_data is not False: - assert isinstance(generate_screening_unit_data, types.GeneratorType), ( - "Must provide a generator function to SharedTaskState.generate_screening_unit_data if " - "you want to generate screening tasks on the fly, or False if you can validate on any task " + screening_data_factory = shared_state.screening_data_factory + if screening_data_factory is not False: + assert isinstance(screening_data_factory, types.GeneratorType), ( + "Must provide a generator function to SharedTaskState.screening_data_factory if " + "you want to generate screening tasks on the fly, or False if you can screen on any task " ) def worker_needs_screening(self, worker: "Worker") -> bool: @@ -150,14 +151,14 @@ def worker_needs_screening(self, worker: "Worker") -> bool: return worker.get_granted_qualification(self.passed_qualification_name) is None def should_generate_unit(self) -> bool: - return self.generate_screening_unit_data is not False + return self.screening_data_factory is not False def get_screening_unit_data(self) -> Optional[Dict[str, Any]]: try: if self.screening_units_launched >= self.screening_unit_cap: return None # Exceeded the cap on these units else: - data = next(self.generate_screening_unit_data) + data = next(self.screening_data_factory) self.screening_units_launched += 1 return data except StopIteration: diff --git a/mephisto/operations/task_launcher.py b/mephisto/operations/task_launcher.py index 869637b57..9034ead4a 100644 --- a/mephisto/operations/task_launcher.py +++ b/mephisto/operations/task_launcher.py @@ -236,9 +236,9 @@ def launch_screening_unit(self, unit_data: Dict[str, Any]) -> "Unit": task_run.task_type, task_run.sandbox, ) - validator_unit = Unit.get(self.db, unit_id) - validator_unit.launch(self.launch_url) - return validator_unit + screening_unit = Unit.get(self.db, unit_id) + screening_unit.launch(self.launch_url) + return screening_unit def get_assignments_are_all_created(self) -> bool: return self.assignment_thread_done diff --git a/test/core/test_supervisor.py b/test/core/test_supervisor.py index a10bff819..b56511cb5 100644 --- a/test/core/test_supervisor.py +++ b/test/core/test_supervisor.py @@ -988,8 +988,8 @@ def screen_unit(unit): ) # Submit screening from the agent - validation_data = {"success": False} - self.architect.server.send_agent_act(agents[1].get_agent_id(), validation_data) + screening_data = {"success": False} + self.architect.server.send_agent_act(agents[1].get_agent_id(), screening_data) # Assert failed screening screening start_time = time.time() TIMEOUT_TIME = 5 @@ -1002,8 +1002,8 @@ def screen_unit(unit): ) # Submit screening from the agent - validation_data = {"success": True} - self.architect.server.send_agent_act(agents[2].get_agent_id(), validation_data) + screening_data = {"success": True} + self.architect.server.send_agent_act(agents[2].get_agent_id(), screening_data) # Assert successful screening screening start_time = time.time() TIMEOUT_TIME = 5
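A short note on the two modes that the `assert_task_args` checks above allow for `screening_data_factory`, sketched here for reference. The override strings are illustrative assumptions about how these blueprint arguments could be passed for a run, not output copied from Mephisto.

```python
# Mode 1: screen on specially generated units.
# screening_data_factory is a generator; each next() supplies the data for one
# screening unit, and launches stop once max_screening_units is reached or the
# generator is exhausted. These extra units are paid like any other unit.

# Mode 2: screen on ordinary live units.
# screening_data_factory = False; workers are validated on the first real unit
# they complete, so no additional units are generated.

# Illustrative Hydra-style overrides a screening-enabled run would need
# (adjust names and values to your own config layout):
SCREENING_OVERRIDES = [
    "mephisto.blueprint.use_screening_task=true",
    "mephisto.blueprint.passed_qualification_name=my_task_screening_passed",
    "mephisto.blueprint.block_qualification=my_task_screening_blocked",
    "mephisto.blueprint.max_screening_units=20",
    "mephisto.task.allowed_concurrent=1",  # screening must finish before more units
]
```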