diff --git a/README.md b/README.md index 814beac..31571de 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ The library is fully type-hinted. | [Segment Display 4x7 2.0](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Segment_Display_4x7_V2.html) |:heavy_check_mark:|:heavy_check_mark:| | [Temperature](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Temperature.html) |:heavy_check_mark:|:heavy_check_mark:| | [Temperature 2.0](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Temperature_V2.html) |:heavy_check_mark:|:heavy_check_mark:| +| [Thermocouple 2.0](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Thermocouple_V2.html) |:heavy_check_mark:|:heavy_check_mark:| ## Documentation The documentation is currently work in progress. The full documentation will be moved to diff --git a/examples/bricklet_thermocouple_v2.py b/examples/bricklet_thermocouple_v2.py new file mode 100755 index 0000000..aee60c4 --- /dev/null +++ b/examples/bricklet_thermocouple_v2.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# pylint: disable=duplicate-code +""" +An example to demonstrate most of the capabilities of the Tinkerforge Thermocouple Bricklet 2.0. +""" +import asyncio +import warnings +from decimal import Decimal + +from tinkerforge_async.bricklet_thermocouple_v2 import BrickletThermocoupleV2 +from tinkerforge_async.devices import BrickletWithMCU +from tinkerforge_async.ip_connection import IPConnectionAsync + + +async def process_callbacks(device: BrickletThermocoupleV2) -> None: + """Prints the callbacks (filtered by id) of the bricklet.""" + async for packet in device.read_events(): + print("Callback received", packet) + + +async def run_example_generic(bricklet: BrickletWithMCU) -> None: + """This is a demo of the generic features of the Tinkerforge bricklets with a microcontroller.""" + uid = await bricklet.read_uid() + print("Device uid:", uid) + await bricklet.write_uid(uid) + + print("SPI error count:", await bricklet.get_spitfp_error_count()) + + print("Current bootloader mode:", await bricklet.get_bootloader_mode()) + bootloader_mode = bricklet.BootloaderMode.FIRMWARE + print("Setting bootloader mode to", bootloader_mode, ":", await bricklet.set_bootloader_mode(bootloader_mode)) + + print("Disable status LED") + await bricklet.set_status_led_config(bricklet.LedConfig.OFF) + print("Current status:", await bricklet.get_status_led_config()) + await asyncio.sleep(1) + print("Enable status LED") + await bricklet.set_status_led_config(bricklet.LedConfig.SHOW_STATUS) + print("Current status:", await bricklet.get_status_led_config()) + + print("Get Chip temperature:", await bricklet.get_chip_temperature() - Decimal("273.15"), "°C") + + print("Reset Bricklet") + await bricklet.reset() + + +async def run_example(bricklet: BrickletThermocoupleV2) -> None: + """This is the actual demo. If the bricklet is found, this code will be run.""" + callback_task = asyncio.create_task(process_callbacks(bricklet)) + try: + print("Identity:", await bricklet.get_identity()) + + # Query the value + print("Get temperature:", await bricklet.get_temperature()) + print("Set callback period to", 1000, "ms") + print("Set threshold to >10 °C and wait for callbacks") + # We use a low temperature value on purpose, so that the callback will be triggered + await bricklet.set_temperature_callback_configuration( + period=1000, value_has_to_change=False, option=bricklet.ThresholdOption.GREATER_THAN, minimum=10, maximum=0 + ) + print("Temperature callback configuration:", await bricklet.get_temperature_callback_configuration()) + await asyncio.sleep(10.1) # Wait for 2-3 callbacks + print("Disable threshold callback") + await bricklet.set_temperature_callback_configuration() + print("Temperature callback configuration:", await bricklet.get_temperature_callback_configuration()) + + over_under, opencircuit = await bricklet.get_error_state() + print(f"Reading errors. Over-/Undervoltage: {over_under}, Open circuit: {opencircuit}") + + # Test the generic features of the bricklet. These are available with all + # new bricklets that have a microcontroller + await run_example_generic(bricklet) + finally: + callback_task.cancel() + + +async def shutdown(tasks: set[asyncio.Task]) -> None: + """Clean up by stopping all consumers""" + for task in tasks: + task.cancel() + await asyncio.gather(*tasks) + + +async def main() -> None: + """ + The main loop, that will spawn all callback handlers and wait until they are done. There are two callback handlers, + one waits for the bricklet to connect and runs the demo, the other handles messages sent by the bricklet. + """ + tasks = set() + try: + # Use the context manager of the ip connection. It will automatically do the cleanup. + async with IPConnectionAsync(host="127.0.0.1", port=4223) as connection: + await connection.enumerate() + # Read all enumeration replies, then start the example if we find the correct device + async for enumeration_type, device in connection.read_enumeration(): # pylint: disable=unused-variable + if isinstance(device, BrickletThermocoupleV2): + print(f"Found {device}, running example.") + tasks.add(asyncio.create_task(run_example(device))) + break + print(f"Found {device}, but not interested.") + + # Wait for run_example() to finish + await asyncio.gather(*tasks) + except ConnectionRefusedError: + print("Could not connect to server. Connection refused. Is the brick daemon up?") + except asyncio.CancelledError: + print("Stopped the main loop.") + raise # It is good practice to re-raise CancelledErrors + finally: + await shutdown(tasks) + + +# Report all mistakes managing asynchronous resources. +warnings.simplefilter("always", ResourceWarning) + +# Start the main loop and run the async loop forever. Turn off the debug parameter for production code. +try: + asyncio.run(main(), debug=True) +except KeyboardInterrupt: + print("Shutting down gracefully.") diff --git a/pyproject.toml b/pyproject.toml index 358a289..8450ea2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ classifiers = [ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Development Status :: 5 - Production/Stable", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", @@ -44,7 +45,7 @@ doc = [ ] test = [ - "mypy", "pylint", "pytest", + "mypy", "pylint", "pytest", "setuptools" ] [tool.pylint.'MESSAGES CONTROL'] diff --git a/tinkerforge_async/bricklet_ptc.py b/tinkerforge_async/bricklet_ptc.py index 0874d8b..306ab3a 100644 --- a/tinkerforge_async/bricklet_ptc.py +++ b/tinkerforge_async/bricklet_ptc.py @@ -33,6 +33,8 @@ class CallbackID(Enum): SENSOR_CONNECTED = 24 +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases _CallbackID = CallbackID @@ -73,13 +75,15 @@ class LineFilter(Enum): FREQUENCY_60HZ = 1 -_LineFilter = LineFilter # We need the alias for MyPy type hinting +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +_LineFilter = LineFilter @unique class WireMode(Enum): """ - Select the measurement setup. Use 3 or wires to eliminate most/all of the + Select the measurement setup. Use 3 or 4 wires to eliminate most/all the resistance of the wire. Use 3 or 4 wire setups when using PT100 and long cables. """ @@ -89,7 +93,9 @@ class WireMode(Enum): WIRE_4 = 4 -_WireMode = WireMode # We need the alias for MyPy type hinting +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +_WireMode = WireMode @unique @@ -102,6 +108,8 @@ class SensorType(Enum): PT_1000 = 1 +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases _SensorType = SensorType diff --git a/tinkerforge_async/bricklet_ptc_v2.py b/tinkerforge_async/bricklet_ptc_v2.py index db6d480..5f94b57 100644 --- a/tinkerforge_async/bricklet_ptc_v2.py +++ b/tinkerforge_async/bricklet_ptc_v2.py @@ -30,6 +30,8 @@ class CallbackID(Enum): SENSOR_CONNECTED = 18 +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases _CallbackID = CallbackID @@ -67,7 +69,9 @@ class LineFilter(Enum): FREQUENCY_60HZ = 1 -_LineFilter = LineFilter # We need the alias for MyPy type hinting +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +_LineFilter = LineFilter @unique @@ -82,6 +86,8 @@ class WireMode(Enum): WIRE_4 = 4 +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases _WireMode = WireMode # We need the alias for MyPy type hinting @@ -95,7 +101,9 @@ class SensorType(Enum): PT_1000 = 1 -_SensorType = SensorType # We need the alias for MyPy type hinting +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +_SensorType = SensorType class GetMovingAverageConfiguration(NamedTuple): diff --git a/tinkerforge_async/bricklet_thermocouple_v2.py b/tinkerforge_async/bricklet_thermocouple_v2.py new file mode 100644 index 0000000..cae4400 --- /dev/null +++ b/tinkerforge_async/bricklet_thermocouple_v2.py @@ -0,0 +1,389 @@ +""" +Module for the Tinkerforge Thermocouple Bricklet 2.0 +(https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Thermocouple_V2.html) implemented using Python asyncio. It does +the low-level communication with the Tinkerforge ip connection and also handles conversion of raw units to SI units. +""" +# pylint: disable=duplicate-code # Many sensors of different generations have a similar API +from __future__ import annotations + +from decimal import Decimal +from enum import Enum, unique +from typing import TYPE_CHECKING, AsyncGenerator, NamedTuple + +from .devices import AdvancedCallbackConfiguration, BrickletWithMCU, DeviceIdentifier, Event +from .devices import ThresholdOption as Threshold +from .devices import _FunctionID +from .ip_connection_helper import pack_payload, unpack_payload + +if TYPE_CHECKING: + from .ip_connection import IPConnectionAsync + + +@unique +class CallbackID(Enum): + """ + The callbacks available to this bricklet + """ + + TEMPERATURE = 4 + ERROR_STATE = 8 + + +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +_CallbackID = CallbackID + + +@unique +class FunctionID(_FunctionID): + """ + The function calls available to this bricklet + """ + + GET_TEMPERATURE = 1 + SET_TEMPERATURE_CALLBACK_CONFIGURATION = 2 + GET_TEMPERATURE_CALLBACK_CONFIGURATION = 3 + SET_CONFIGURATION = 5 + GET_CONFIGURATION = 6 + GET_ERROR_STATE = 7 + + +@unique +class Averaging(Enum): + """ + The number of values averaged before returning the measurement result. + """ + + AVERAGING_1 = 1 + AVERAGING_2 = 2 + AVERAGING_4 = 4 + AVERAGING_8 = 8 + AVERAGING_16 = 16 + + +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +_Averaging = Averaging + + +@unique +class ThermocoupleType(Enum): + """ + The type of thermocouple connected to the bricklet. + """ + + TYPE_B = 0 + TYPE_E = 1 + TYPE_J = 2 + TYPE_K = 3 + TYPE_N = 4 + TYPE_R = 5 + TYPE_S = 6 + TYPE_T = 7 + TYPE_G8 = 8 + TYPE_G32 = 9 + + +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +_ThermocoupleType = ThermocoupleType + + +class LineFilter(Enum): + """ + Selects the notch filter to filter out the mains frequency hum + """ + + FREQUENCY_50HZ = 0 + FREQUENCY_60HZ = 1 + + +# We need the alias for MyPy type hinting +# See https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +_LineFilter = LineFilter + + +class GetConfiguration(NamedTuple): + averaging: Averaging + thermocouple_type: ThermocoupleType + filter: LineFilter + + +class GetErrorState(NamedTuple): + """ + Tuple that contains the error states of the system. Over-/undervoltage indicates either a voltage above 3.3 V or + below 0 V and likely a defective thermocouple. An open circuit indicates a missing or defective thermocouple. + """ + + over_under: bool + open_circuit: bool + + +class BrickletThermocoupleV2(BrickletWithMCU): + """ + Measures temperature with a thermocouple. + """ + + DEVICE_IDENTIFIER = DeviceIdentifier.BRICKLET_THERMOCOUPLE_V2 + DEVICE_DISPLAY_NAME = "Thermocouple Bricklet 2.0" + + # Convenience imports, so that the user does not need to additionally import them + CallbackID = CallbackID + FunctionID = FunctionID + ThresholdOption = Threshold + ThermocoupleType = ThermocoupleType + + CALLBACK_FORMATS = {CallbackID.TEMPERATURE: "i", CallbackID.ERROR_STATE: "! !"} + + SID_TO_CALLBACK = { + 0: (CallbackID.TEMPERATURE,), + 1: (CallbackID.ERROR_STATE,), + } + + def __init__(self, uid, ipcon: IPConnectionAsync) -> None: + """ + Creates an object with the unique device ID *uid* and adds it to + the IP Connection *ipcon*. + """ + super().__init__(self.DEVICE_DISPLAY_NAME, uid, ipcon) + + self.api_version = (2, 0, 0) + + async def get_value(self, sid: int) -> Decimal | GetErrorState: + assert sid in (0, 1) + + if sid == 0: + return await self.get_temperature() + if sid == 1: + return await self.get_error_state() + raise ValueError(f"Invalid sid: {sid}. sid must be in (0, 1).") + + async def set_callback_configuration( # pylint: disable=too-many-arguments + self, + sid: int, + period: int = 0, + value_has_to_change: bool = False, + option: Threshold | int = Threshold.OFF, + minimum: float | Decimal | None = None, + maximum: float | Decimal | None = None, + response_expected: bool = True, + ) -> None: + minimum = Decimal("273.15") if minimum is None else minimum + maximum = Decimal("273.15") if maximum is None else maximum + + if sid == 0: + await self.set_temperature_callback_configuration( + period, value_has_to_change, option, minimum, maximum, response_expected + ) + + raise ValueError(f"Invalid sid: {sid}. sid must be in (0, ).") + + async def get_callback_configuration(self, sid: int) -> AdvancedCallbackConfiguration: + if sid == 0: + return await self.get_temperature_callback_configuration() + + raise ValueError(f"Invalid sid: {sid}. sid must be in (0, ).") + + async def get_temperature(self) -> Decimal: + """ + Returns the temperature of the thermocouple. The value is given in °C/100, + e.g. a value of 4223 means that a temperature of 42.23 °C is measured. + + If you want to get the temperature periodically, it is recommended + to use the :cb:`Temperature` callback and set the period with + :func:`Set Temperature Callback Configuration`. + + If you want to get the value periodically, it is recommended to use the + :cb:`Temperature` callback. You can set the callback configuration + with :func:`Set Temperature Callback Configuration`. + """ + _, payload = await self.ipcon.send_request( + device=self, function_id=FunctionID.GET_TEMPERATURE, response_expected=True + ) + return self.__value_to_si(unpack_payload(payload, "i")) + + async def set_temperature_callback_configuration( # pylint: disable=too-many-arguments + self, + period: int = 0, + value_has_to_change: bool = False, + option: Threshold | int = Threshold.OFF, + minimum: Decimal | int | float = Decimal("273.15"), + maximum: Decimal | int | float = Decimal("273.15"), + response_expected=True, + ) -> None: + """ + The period is the period with which the :cb:`Temperature` callback is triggered + periodically. A value of 0 turns the callback off. + + If the `value has to change`-parameter is set to true, the callback is only + triggered after the value has changed. If the value didn't change + within the period, the callback is triggered immediately on change. + + If it is set to false, the callback is continuously triggered with the period, + independent of the value. + + It is furthermore possible to constrain the callback with thresholds. + + The `option`-parameter together with min/max sets a threshold for the :cb:`Temperature` callback. + + The following options are possible: + + .. csv-table:: + :header: "Option", "Description" + :widths: 10, 100 + + "'x'", "Threshold is turned off" + "'o'", "Threshold is triggered when the value is *outside* the min and max values" + "'i'", "Threshold is triggered when the value is *inside* or equal to the min and max values" + "'<'", "Threshold is triggered when the value is smaller than the min value (max is ignored)" + "'>'", "Threshold is triggered when the value is greater than the min value (max is ignored)" + + If the option is set to 'x' (threshold turned off) the callback is triggered with the fixed period. + """ + if not isinstance(option, Threshold): + option = Threshold(option) + assert period >= 0 + + await self.ipcon.send_request( + device=self, + function_id=FunctionID.SET_TEMPERATURE_CALLBACK_CONFIGURATION, + data=pack_payload( + ( + int(period), + bool(value_has_to_change), + option.value.encode("ascii"), + self.__si_to_value(minimum), + self.__si_to_value(maximum), + ), + "I ! c i i", + ), + response_expected=response_expected, + ) + + async def get_temperature_callback_configuration(self) -> AdvancedCallbackConfiguration: + """ + Returns the callback configuration as set by :func:`Set Temperature Callback Configuration`. + """ + _, payload = await self.ipcon.send_request( + device=self, function_id=FunctionID.GET_TEMPERATURE_CALLBACK_CONFIGURATION, response_expected=True + ) + period, value_has_to_change, option, minimum, maximum = unpack_payload(payload, "I ! c i i") + option = Threshold(option) + minimum, maximum = self.__value_to_si(minimum), self.__value_to_si(maximum) + return AdvancedCallbackConfiguration(period, value_has_to_change, option, minimum, maximum) + + async def set_configuration( + self, + averaging: _Averaging, + thermocouple_type: _ThermocoupleType, + line_filter: _LineFilter, + response_expected: bool = True, + ) -> None: + """ + You can configure averaging size, thermocouple type and frequency + filtering. + + Available averaging sizes are 1, 2, 4, 8 and 16 samples. + + As thermocouple type you can use B, E, J, K, N, R, S and T. If you have a + different thermocouple or a custom thermocouple you can also use + G8 and G32. With these types the returned value will not be in °C/100, + it will be calculated by the following formulas: + + * G8: ``value = 8 * 1.6 * 2^17 * Vin`` + * G32: ``value = 32 * 1.6 * 2^17 * Vin`` + + where Vin is the thermocouple input voltage. + + The frequency filter can be either configured to 50Hz or to 60Hz. You should + configure it according to your utility frequency. + + The conversion time depends on the averaging and filter configuration, it can + be calculated as follows: + + * 60Hz: ``time = 82 + (samples - 1) * 16.67`` + * 50Hz: ``time = 98 + (samples - 1) * 20`` + """ + if not isinstance(averaging, Averaging): + averaging = Averaging(averaging) + if not isinstance(thermocouple_type, ThermocoupleType): + thermocouple_type = ThermocoupleType(thermocouple_type) + if not isinstance(line_filter, LineFilter): + line_filter = LineFilter(line_filter) + + await self.ipcon.send_request( + device=self, + function_id=FunctionID.SET_CONFIGURATION, + data=pack_payload((averaging.value, thermocouple_type.value, line_filter.value), "B B B"), + response_expected=response_expected, + ) + + async def get_configuration(self) -> GetConfiguration: + _, payload = await self.ipcon.send_request( + device=self, function_id=FunctionID.GET_CONFIGURATION, response_expected=True + ) + + averaging, thermocouple_type, line_filter = unpack_payload(payload, "B B B") + return GetConfiguration(Averaging(averaging), ThermocoupleType(thermocouple_type), LineFilter(line_filter)) + + async def get_error_state(self) -> GetErrorState: + """ + Returns the current error state. There are two possible errors: + + * Over/Under Voltage and + * Open Circuit. + + Over/Under Voltage happens for voltages below 0V or above 3.3V. In this case + it is very likely that your thermocouple is defective. An Open Circuit error + indicates that there is no thermocouple connected. + + You can use the :cb:`Error State` callback to automatically get triggered + when the error state changes. + """ + _, payload = await self.ipcon.send_request( + device=self, function_id=FunctionID.GET_ERROR_STATE, response_expected=True + ) + + over_under, open_circuit = unpack_payload(payload, "! !") + return GetErrorState(over_under, open_circuit) + + @staticmethod + def __value_to_si(value: int) -> Decimal: + """ + Convert to the sensor value to SI units + """ + return Decimal(value + 27315) / 100 + + @staticmethod + def __si_to_value(value: float | Decimal) -> int: + return int(value * 100) - 27315 + + async def read_events( + self, + events: tuple[int | _CallbackID, ...] | list[int | _CallbackID] | None = None, + sids: tuple[int, ...] | list[int] | None = None, + ) -> AsyncGenerator[Event, None]: + registered_events = set() + if events: + for event in events: + registered_events.add(self.CallbackID(event)) + if sids is not None: + for sid in sids: + for callback in self.SID_TO_CALLBACK.get(sid, []): + registered_events.add(callback) + + if events is None and sids is None: + registered_events = set(self.CALLBACK_FORMATS.keys()) + + async for header, payload in super()._read_events(): + try: + function_id = CallbackID(header.function_id) + except ValueError: + # Invalid header. Drop the packet. + continue + if function_id in registered_events: + value = unpack_payload(payload, self.CALLBACK_FORMATS[function_id]) + if function_id is CallbackID.TEMPERATURE: + yield Event(self, 0, function_id, self.__value_to_si(value)) + else: + yield Event(self, 1, function_id, value) diff --git a/tinkerforge_async/device_factory.py b/tinkerforge_async/device_factory.py index 85445c0..ef6085d 100644 --- a/tinkerforge_async/device_factory.py +++ b/tinkerforge_async/device_factory.py @@ -28,6 +28,7 @@ from .bricklet_segment_display_4x7_v2 import BrickletSegmentDisplay4x7V2 from .bricklet_temperature import BrickletTemperature from .bricklet_temperature_v2 import BrickletTemperatureV2 +from .bricklet_thermocouple_v2 import BrickletThermocoupleV2 if TYPE_CHECKING: from . import IPConnectionAsync @@ -83,3 +84,4 @@ def get(self, ipcon: IPConnectionAsync, device_id: DeviceIdentifier, uid: int, * device_factory.register(BrickletRS232V2) device_factory.register(BrickletTemperature) device_factory.register(BrickletTemperatureV2) +device_factory.register(BrickletThermocoupleV2) diff --git a/tinkerforge_async/devices.py b/tinkerforge_async/devices.py index 333e193..28a17f6 100644 --- a/tinkerforge_async/devices.py +++ b/tinkerforge_async/devices.py @@ -56,6 +56,7 @@ class DeviceIdentifier(Enum): BRICKLET_MOTION_DETECTOR_V2 = 292 BRICKLET_PTC_V2 = 2101 BRICKLET_RS232_V2 = 2108 + BRICKLET_THERMOCOUPLE_V2 = 2109 BRICKLET_IO_4_V2 = 2111 BRICKLET_TEMPERATURE_V2 = 2113 BRICKLET_BAROMETER_V2 = 2117