diff --git a/README.md b/README.md index 4469c0d..c43d0ec 100644 --- a/README.md +++ b/README.md @@ -40,11 +40,14 @@ The main reason why I decided to fork the original `heyoo` library is that it us In this fork I added app events (to listen to incoming messages) and implemented an easier way to send/receive messages and media by using the `Message` object. The API version is always up to date and I'm always adding new features and fixing bugs. +I also added a full asyncronous version of the library, which is faster and more efficient than the syncronous one. + I fixed some bugs and added many features, however the library can still be improved. ### Supported features: - Listening to events (messages, media, etc.) +- Async version available - Sending messages - Sending messages from different numbers individually - Marking messages as read diff --git a/async_test.py b/async_test.py new file mode 100644 index 0000000..04972c0 --- /dev/null +++ b/async_test.py @@ -0,0 +1,142 @@ +from whatsapp import WhatsApp, Message, AsyncWhatsApp +from dotenv import load_dotenv +from os import getenv +import asyncio + +load_dotenv() +token = getenv("GRAPH_API_TOKEN") +phone_number_id = {1: getenv("TEST_PHONE_NUMBER_ID")} +dest_phone_number = getenv("DEST_PHONE_NUMBER") + +app = AsyncWhatsApp(token=token, phone_number_id=phone_number_id, + update_check=False) +loop = asyncio.get_event_loop() +msg = app.create_message(to=dest_phone_number, content="Hello world") + +async def run_test(): + + # test every single method and print the result + print("Testing send_text") + v = await msg.send() + # print(v) + # await asyncio.sleep(1) + # print("Getting request status") + # print(v.result()) + + + print("Testing send_reply_button") + v = await app.send_reply_button( + button={ + "type": "button", + "body": { + "text": "This is a test button" + }, + "action": { + "buttons": [ + { + "type": "reply", + "reply": { + "id": "b1", + "title": "This is button 1" + } + }, + { + "type": "reply", + "reply": { + "id": "b2", + "title": "this is button 2" + } + } + ] + } + }, + recipient_id=dest_phone_number, + sender=1 + + ) + print(f"send_reply_button: {v}") + + + print("Creating button") + v = app.create_button( + button={ + "header": { + "type": "text", + "text": "your-header-content" + }, + "body": { + "text": "your-text-message-content" + }, + "footer": { + "text": "your-footer-content" + }, + "action": { + "button": "cta-button-content", + "sections": [ + { + "title": "your-section-title-content", + "rows": [ + { + "id": "unique-row-identifier", + "title": "row-title-content", + "description": "row-description-content", + } + ] + }, + ] + } + }, + ) + print(f"create_button: {v}") + print("sending button") + v = await app.send_button({ + "header": "Header Testing", + "body": "Body Testing", + "footer": "Footer Testing", + "action": { + "button": "cta-button-content", + "sections": [ + { + "title": "your-secti", + "rows": [ + { + "id": "unique-row-", + "title": "row-title-", + "description": "row-description-", + } + ] + }, + ] + } + }, dest_phone_number, sender=1) + print(f"send_button: {v}") + print("sending image") + v = await app.send_image( + "https://file-examples.com/storage/feb05093336710053a32bc1/2017/10/file_example_JPG_1MB.jpg", dest_phone_number, sender=1) + print(f"send_image: {v}") + print("sending video") + v = await app.send_video( + "https://file-examples.com/storage/feb05093336710053a32bc1/2017/04/file_example_MP4_480_1_5MG.mp4", dest_phone_number, sender=1) + print(f"send_video: {v}") + print("sending audio") + v = await app.send_audio( + "https://file-examples.com/storage/feb05093336710053a32bc1/2017/11/file_example_MP3_1MG.mp3", dest_phone_number, sender=1) + print(f"send_audio: {v}") + print("sending document") + v = await app.send_document( + "https://file-examples.com/storage/feb05093336710053a32bc1/2017/10/file-example_PDF_1MB.pdf", dest_phone_number, sender=1) + print(f"send_document: {v}") + print("sending location") + v = await app.send_location("37.7749", "-122.4194", "test", + "test, test", dest_phone_number, sender=1) + print(f"send_location: {v}") + print("sending template") + v = await app.send_template( + "hello_world", dest_phone_number, sender=1) + print(f"send_template: {v}") # this returns error if the phone number is not a test phone number + + await asyncio.sleep(10) + + + +asyncio.run(run_test()) \ No newline at end of file diff --git a/whatsapp/__init__.py b/whatsapp/__init__.py index 1bf7545..2418da8 100644 --- a/whatsapp/__init__.py +++ b/whatsapp/__init__.py @@ -4,6 +4,9 @@ from __future__ import annotations import requests import logging +import json +import asyncio +import aiohttp from bs4 import BeautifulSoup from fastapi import FastAPI, HTTPException, Request from uvicorn import run as _run @@ -15,7 +18,14 @@ from .ext._media import upload_media, query_media_url, download_media, delete_media from .ext._buttons import send_button, create_button, send_reply_button from .ext._static import is_message, get_mobile, get_author, get_name, get_message, get_message_id, get_message_type, get_message_timestamp, get_audio, get_delivery, get_document, get_image, get_sticker, get_interactive_response, get_location, get_video, changed_field -import json + +from .async_ext._property import authorized as async_authorized +from .async_ext._send_others import send_custom_json as async_send_custom_json, send_contacts as async_send_contacts +from .async_ext._message import send_template as async_send_template +from .async_ext._send_media import send_image as async_send_image, send_video as async_send_video, send_audio as async_send_audio, send_location as async_send_location, send_sticker as async_send_sticker, send_document as async_send_document +from .async_ext._media import upload_media as async_upload_media, query_media_url as async_query_media_url, download_media as async_download_media, delete_media as async_delete_media +from .async_ext._buttons import send_button as async_send_button, create_button as async_create_button, send_reply_button as async_send_reply_button +from .async_ext._static import is_message as async_is_message, get_mobile as async_get_mobile, get_author as async_get_author, get_name as async_get_name, get_message as async_get_message, get_message_id as async_get_message_id, get_message_type as async_get_message_type, get_message_timestamp as async_get_message_timestamp, get_audio as async_get_audio, get_delivery as async_get_delivery, get_document as async_get_document, get_image as async_get_image, get_sticker as async_get_sticker, get_interactive_response as async_get_interactive_response, get_location as async_get_location, get_video as async_get_video, changed_field as async_changed_field class WhatsApp(object): @@ -132,8 +142,8 @@ async def verify_endpoint(r: Request): if r.query_params.get("hub.verify_token") == self.verify_token: logging.debug("Webhook verified successfully") challenge = r.query_params.get("hub.challenge") - self.verification_handler(challenge) - self.other_handler(challenge) + await self.verification_handler(challenge) + await self.other_handler(challenge) return int(challenge) logging.error("Webhook Verification failed - token mismatch") await self.verification_handler(False) @@ -202,9 +212,9 @@ async def hook(r: Request): send_template = send_template send_custom_json = send_custom_json send_contacts = send_contacts - authorized = property(authorized) + def create_message(self, **kwargs) -> Message: """ Create a message object @@ -248,6 +258,238 @@ def run(self, host: str = "localhost", port: int = 5000, **options): _run(self.app, host=host, port=port, **options) +class AsyncWhatsApp(WhatsApp): + def __init__(self, token: str = "", phone_number_id: dict = "", logger: bool = True, update_check: bool = True, verify_token: str = "", debug: bool = True, version: str = "latest"): + """ + Initialize the WhatsApp Object + + Args: + token[str]: Token for the WhatsApp cloud API obtained from the Facebook developer portal + phone_number_id[str]: Phone number id for the WhatsApp cloud API obtained from the developer portal + logger[bool]: Whether to enable logging or not (default: True) + """ + + # Check if the version is up to date + logging.getLogger(__name__).addHandler(logging.NullHandler()) + + self.l = phone_number_id + + if isinstance(phone_number_id, dict): + # use first phone number id as default + phone_number_id = phone_number_id[list(phone_number_id.keys())[0]] + + elif phone_number_id == "": + logging.error("Phone number ID not provided") + raise ValueError( + "Phone number ID not provided but required") + elif isinstance(phone_number_id, str): + logging.critical( + "The phone number ID should be a dictionary of phone numbers and their IDs. Using strings is deprecated.") + raise ValueError( + "Phone number ID should be a dictionary of phone numbers and their IDs") + else: + pass + + self.VERSION = VERSION # package version + r = requests.get("https://developers.facebook.com/docs/graph-api/changelog/").text + + # dynamically get the latest version of the API + if version == "latest": + soup = BeautifulSoup(r, features="html.parser") + t1 = soup.findAll("table") + + def makeversion(table): + result = [] + allrows = table.findAll('tr') + for row in allrows: + result.append([]) + allcols = row.findAll('td') + for col in allcols: + thestrings = [(s) for s in col.findAll(text=True)] + thetext = ''.join(thestrings) + result[-1].append(thetext) + return result[0][1] + self.LATEST = makeversion(t1[0]) # latest version of the API + else: + self.LATEST = version + + if update_check is True: + latest = str(requests.get( + "https://pypi.org/pypi/whatsapp-python/json").json()["info"]["version"]) + if self.VERSION != latest: + try: + version_int = int(self.VERSION.replace(".", "")) + except: + version_int = 0 + try: + latest_int = int(latest.replace(".", "")) + except: + latest_int = 0 + # this is to avoid the case where the version is 1.0.10 and the latest is 1.0.2 (possible if user is using the github version) + if version_int < latest_int: + if version_int == 0: + logging.critical( + f"There was an error while checking for updates, please check for updates manually. This may be due to the version being a post-release version (e.g. 1.0.0.post1) or a pre-release version (e.g. 1.0.0a1). READ THE CHANGELOG BEFORE UPDATING. NEW VERSIONS MAY BREAK YOUR CODE IF NOT PROPERLY UPDATED.") + else: + logging.critical( + f"Whatsapp-python is out of date. Please update to the latest version {latest}. READ THE CHANGELOG BEFORE UPDATING. NEW VERSIONS MAY BREAK YOUR CODE IF NOT PROPERLY UPDATED.") + + if token == "": + logging.error("Token not provided") + raise ValueError("Token not provided but required") + if phone_number_id == "": + logging.error("Phone number ID not provided") + raise ValueError( + "Phone number ID not provided but required") + self.token = token + self.phone_number_id = phone_number_id + self.base_url = f"https://graph.facebook.com/{self.LATEST}" + self.url = f"{self.base_url}/{phone_number_id}/messages" + self.verify_token = verify_token + + async def base(*args): + pass + self.message_handler = base + self.other_handler = base + self.verification_handler = base + self.headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.token}" + } + if logger is False: + logging.disable(logging.INFO) + logging.disable(logging.ERROR) + if debug is False: + logging.disable(logging.DEBUG) + logging.disable(logging.ERROR) + + self.app = FastAPI() + + # Verification handler has 1 argument: challenge (str | bool): str if verification is successful, False if not + + @self.app.get("/") + async def verify_endpoint(r: Request): + if r.query_params.get("hub.verify_token") == self.verify_token: + logging.debug("Webhook verified successfully") + challenge = r.query_params.get("hub.challenge") + await self.verification_handler(challenge) + await self.other_handler(challenge) + return int(challenge) + logging.error("Webhook Verification failed - token mismatch") + await self.verification_handler(False) + await self.other_handler(False) + return {"success": False} + + @self.app.post("/") + async def hook(r: Request): + try: + # Handle Webhook Subscriptions + data = await r.json() + if data is None: + return {"success": False} + data_str = json.dumps(data, indent=4) + # log the data received only if the log level is debug + logging.debug(f"Received webhook data: {data_str}") + + changed_field = self.changed_field(data) + if changed_field == "messages": + new_message = self.is_message(data) + if new_message: + msg = Message(instance=self, data=data) + await self.message_handler(msg) + await self.other_handler(msg) + return {"success": True} + except Exception as e: + logging.error(f"Error parsing message: {e}") + raise HTTPException(status_code=500, detail={ + "success": False, + "error": str(e) + }) + + # all the files starting with _ are imported here, and should not be imported directly. + + is_message = staticmethod(is_message) + get_mobile = staticmethod(get_mobile) + get_name = staticmethod(get_name) + get_message = staticmethod(get_message) + get_message_id = staticmethod(get_message_id) + get_message_type = staticmethod(get_message_type) + get_message_timestamp = staticmethod(get_message_timestamp) + get_audio = staticmethod(get_audio) + get_delivery = staticmethod(get_delivery) + get_document = staticmethod(get_document) + get_image = staticmethod(get_image) + get_sticker = staticmethod(get_sticker) + get_interactive_response = staticmethod(get_interactive_response) + get_location = staticmethod(get_location) + get_video = staticmethod(get_video) + changed_field = staticmethod(changed_field) + get_author = staticmethod(get_author) + + send_button = async_send_button + create_button = async_create_button + send_reply_button = async_send_reply_button + send_image = async_send_image + send_video = async_send_video + send_audio = async_send_audio + send_location = async_send_location + send_sticker = async_send_sticker + send_document = async_send_document + upload_media = async_upload_media + query_media_url = async_query_media_url + download_media = async_download_media + delete_media = async_delete_media + send_template = async_send_template + send_custom_json = async_send_custom_json + send_contacts = async_send_contacts + authorized = property(async_authorized) + + + + def create_message(self, **kwargs) -> AsyncMessage: + """ + Create a message object + + Args: + data[dict]: The message data + content[str]: The message content + to[str]: The recipient + rec_type[str]: The recipient type (individual/group) + """ + return AsyncMessage(**kwargs, instance=self) + + def on_message(self, handler: function): + """ + Set the handler for incoming messages + + Args: + handler[function]: The handler function + """ + self.message_handler = handler + + def on_event(self, handler: function): + """ + Set the handler for other events + + Args: + handler[function]: The handler function + """ + self.other_handler = handler + + def on_verification(self, handler: function): + """ + Set the handler for verification + + Args: + handler[function]: The handler function + """ + self.verification_handler = handler + + def run(self, host: str = "localhost", port: int = 5000, **options): + _run(self.app, host=host, port=port, **options) + + + class Message(object): # type: ignore def __init__(self, id: int = None, data: dict = {}, instance: WhatsApp = None, content: str = "", to: str = "", rec_type: str = "individual"): @@ -397,3 +639,175 @@ def react(self, emoji: str) -> dict: logging.info(f"Status code: {r.status_code}") logging.debug(f"Response: {r.json()}") return r.json() + + + +class AsyncMessage(object): + # type: ignore + def __init__(self, id: int = None, data: dict = {}, instance: WhatsApp = None, content: str = "", to: str = "", rec_type: str = "individual"): + self.instance = instance + self.url = self.instance.url + self.headers = self.instance.headers + + try: + self.id = instance.get_message_id(data) + except: + self.id = id + try: + self.type = self.instance.get_message_type(data) + except: + self.type = "text" + self.data = data + self.rec = rec_type + self.to = to + try: + self.content = content if content != "" else self.instance.get_message( + data) + except: + self.content = content + try: + self.name = self.instance.get_name(data) + except: + self.name = None + + if self.type == "image": + try: + self.image = self.instance.get_image(data) + except: + self.image = None + if self.type == "sticker": + try: + self.sticker = self.instance.get_sticker(data) + except: + self.sticker = None + elif self.type == "video": + try: + self.video = self.instance.get_video(data) + except: + self.video = None + elif self.type == "audio": + try: + self.audio = self.instance.get_audio(data) + except: + pass + elif self.type == "document": + try: + self.document = self.instance.get_document(data) + except: + pass + elif self.type == "location": + try: + self.location = self.instance.get_location(data) + except: + pass + elif self.type == "interactive": + try: + self.interactive = self.instance.get_interactive_response(data) + except: + pass + + + async def reply(self, reply_text: str = "", preview_url: bool = True) -> asyncio.Future: + if self.data == {}: + return {"error": "No data provided"} + author = self.instance.get_author(self.data) + payload = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": str(author), + "type": "text", + "context": {"message_id": self.id}, + "text": {"preview_url": preview_url, "body": reply_text}, + } + logging.info(f"Replying to {self.id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(self.url, headers=self.headers, json=payload) as response: + if response.status == 200: + logging.info(f"Message sent to {self.instance.get_author(self.data)}") + return await response.json() + logging.info(f"Message not sent to {self.instance.get_author(self.data)}") + logging.info(f"Status code: {response.status}") + logging.error(f"Response: {await response.json()}") + return await response.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + + async def mark_as_read(self) -> asyncio.Future: + + payload = { + "messaging_product": "whatsapp", + "status": "read", + "message_id": self.id, + } + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(f"{self.instance.url}", headers=self.instance.headers, json=payload) as response: + if response.status == 200: + logging.info(await response.json()) + return await response.json() + else: + logging.error(await response.json()) + return await response.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + async def send(self, sender=None, preview_url: bool = True) -> asyncio.Future: + try: + sender = dict(self.instance.l)[sender] + + except: + sender = self.instance.phone_number_id + + if sender == None: + sender = self.instance.phone_number_id + + url = f"https://graph.facebook.com/{self.instance.LATEST}/{sender}/messages" + data = { + "messaging_product": "whatsapp", + "recipient_type": self.rec, + "to": self.to, + "type": "text", + "text": {"preview_url": preview_url, "body": self.content}, + } + logging.info(f"Sending message to {self.to}") + async def call(): + print("sending") + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as response: + if response.status == 200: + logging.info(f"Message sent to {self.to}") + return await response.json() + logging.info(f"Message not sent to {self.to}") + logging.info(f"Status code: {response.status}") + logging.error(f"Response: {await response.json()}") + return await response.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + async def react(self, emoji: str) -> asyncio.Future: + data = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": self.to, + "type": "reaction", + "reaction": {"message_id": self.id, "emoji": emoji}, + } + logging.info(f"Reacting to {self.id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(self.url, headers=self.headers, json=data) as response: + if response.status == 200: + logging.info(f"Reaction sent to {self.to}") + return await response.json() + logging.info(f"Reaction not sent to {self.to}") + logging.info(f"Status code: {response.status}") + logging.debug(f"Response: {await response.json()}") + return await response.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) + return f diff --git a/whatsapp/async_ext/_buttons.py b/whatsapp/async_ext/_buttons.py new file mode 100644 index 0000000..b1b66ac --- /dev/null +++ b/whatsapp/async_ext/_buttons.py @@ -0,0 +1,118 @@ +import logging +import aiohttp +import asyncio +from typing import Dict, Any + + +def create_button(self, button: Dict[Any, Any]) -> Dict[Any, Any]: + """ + Method to create a button object to be used in the send_message method. + + This is method is designed to only be used internally by the send_button method. + + Args: + button[dict]: A dictionary containing the button data + """ + data = {"type": "list", "action": button.get("action")} + if button.get("header"): + data["header"] = {"type": "text", "text": button.get("header")} + if button.get("body"): + data["body"] = {"text": button.get("body")} + if button.get("footer"): + data["footer"] = {"text": button.get("footer")} + if button.get("type"): + data["type"] = button.get("type") + return data + + +async def send_button(self, button: Dict[Any, Any], recipient_id: str, sender=None) -> asyncio.Future: + """ + Sends an interactive buttons message to a WhatsApp user + + Args: + button[dict]: A dictionary containing the button data(rows-title may not exceed 20 characters) + recipient_id[str]: Phone number of the user with country code wihout + + + check https://github.com/Neurotech-HQ/whatsapp#sending-interactive-reply-buttons for an example. + """ + try: + sender = dict(self.l)[sender] + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "interactive", + "interactive": self.create_button(button), + } + logging.info(f"Sending buttons to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Buttons sent to {recipient_id}") + return await r.json() + logging.info(f"Buttons not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + + +async def send_reply_button( + self, button: Dict[Any, Any], recipient_id: str, sender=None +) -> asyncio.Future: + """ + Sends an interactive reply buttons[menu] message to a WhatsApp user + + Args: + button[dict]: A dictionary containing the button data + recipient_id[str]: Phone number of the user with country code wihout + + + Note: + The maximum number of buttons is 3, more than 3 buttons will rise an error. + """ + try: + sender = dict(self.l)[sender] + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + if len(button["action"]["buttons"]) > 3: + raise ValueError("The maximum number of buttons is 3.") + + data = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": recipient_id, + "type": "interactive", + "interactive": button, + } + logging.info(f"Sending buttons to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Buttons sent to {recipient_id}") + return await r.json() + logging.info(f"Buttons not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f diff --git a/whatsapp/async_ext/_media.py b/whatsapp/async_ext/_media.py new file mode 100644 index 0000000..6bb36c7 --- /dev/null +++ b/whatsapp/async_ext/_media.py @@ -0,0 +1,169 @@ +import logging +import aiohttp +import asyncio +import os +import mimetypes +from requests_toolbelt.multipart.encoder import MultipartEncoder +from typing import Union, Dict, Any + + +async def upload_media(self, media: str, sender=None) -> asyncio.Future: + """ + Uploads a media to the cloud api and returns the id of the media + + Args: + media[str]: Path of the media to be uploaded + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.upload_media("/path/to/media") + + REFERENCE: https://developers.facebook.com/docs/whatsapp/cloud-api/reference/media# + """ + try: + sender = dict(self.l)[sender] + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + form_data = { + "file": ( + media, + open(os.path.realpath(media), "rb"), + mimetypes.guess_type(media)[0], + ), + "messaging_product": "whatsapp", + "type": mimetypes.guess_type(media)[0], + } + form_data = MultipartEncoder(fields=form_data) + headers = self.headers.copy() + headers["Content-Type"] = form_data.content_type + logging.info(f"Content-Type: {form_data.content_type}") + logging.info(f"Uploading media {media}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post( + f"https://graph.facebook.com/{self.LATEST}/{sender}/media", + headers=headers, + data=form_data, + ) as r: + if r.status == 200: + logging.info(f"Media {media} uploaded") + return await r.json() + logging.info(f"Error uploading media {media}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + + +async def delete_media(self, media_id: str) -> asyncio.Future: + """ + Deletes a media from the cloud api + + Args: + media_id[str]: Id of the media to be deleted + """ + logging.info(f"Deleting media {media_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.delete( + f"https://graph.facebook.com/{self.LATEST}/{media_id}", + headers=self.headers, + ) as r: + if r.status == 200: + logging.info(f"Media {media_id} deleted") + return await r.json() + logging.info(f"Error deleting media {media_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def query_media_url(self, media_id: str) -> asyncio.Future: + """ + Query media url from media id obtained either by manually uploading media or received media + + Args: + media_id[str]: Media id of the media + + Returns: + str: Media url + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.query_media_url("media_id") + """ + + logging.info(f"Querying media url for {media_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.get( + f"https://graph.facebook.com/{self.LATEST}/{media_id}", + headers=self.headers, + ) as r: + if r.status == 200: + logging.info(f"Media url for {media_id} queried") + return await r.json() + logging.info(f"Error querying media url for {media_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + + +async def download_media( + self, media_url: str, mime_type: str, file_path: str = "temp" +) -> asyncio.Future: + """ + Download media from media url obtained either by manually uploading media or received media + + Args: + media_url[str]: Media url of the media + mime_type[str]: Mime type of the media + file_path[str]: Path of the file to be downloaded to. Default is "temp" + Do not include the file extension. It will be added automatically. + + Returns: + str: Media url + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.download_media("media_url", "image/jpeg") + >>> whatsapp.download_media("media_url", "video/mp4", "path/to/file") #do not include the file extension + """ + logging.info(f"Downloading media from {media_url}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.get(media_url, headers=self.headers) as r: + if r.status == 200: + logging.info(f"Media downloaded from {media_url}") + extension = mime_type.split("/")[1] + save_file_here = ( + f"{file_path}.{extension}" if file_path else f"temp.{extension}" + ) + with open(save_file_here, "wb") as f: + f.write(await r.read()) + return save_file_here + logging.info(f"Error downloading media from {media_url}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f \ No newline at end of file diff --git a/whatsapp/async_ext/_message.py b/whatsapp/async_ext/_message.py new file mode 100644 index 0000000..58f69e1 --- /dev/null +++ b/whatsapp/async_ext/_message.py @@ -0,0 +1,164 @@ +import logging +import aiohttp +import asyncio + + +async def react(self, emoji: str) -> asyncio.Future: + data = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": self.sender, + "type": "reaction", + "reaction": {"message_id": self.id, "emoji": emoji}, + } + logging.info(f"Reacting to {self.id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(self.url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Reacted to {self.id}") + return await r.json() + logging.info(f"Reaction not sent to {self.id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def send_template(self, template: str, recipient_id: str, components: str = None, lang: str = "en_US", sender = None) -> asyncio.Future: + """ + Sends a template message to a WhatsApp user, Template messages can either be; + 1. Text template + 2. Media based template + 3. Interactive template + You can customize the template message by passing a dictionary of components. + You can find the available components in the documentation. + https://developers.facebook.com/docs/whatsapp/cloud-api/guides/send-message-templates + Args: + template[str]: Template name to be sent to the user + recipient_id[str]: Phone number of the user with country code wihout + + lang[str]: Language of the template message + components[list]: List of components to be sent to the user # \ + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.send_template("hello_world", "5511999999999", lang="en_US")) + """ + try: + sender = dict(self.l)[sender] + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "template", + "template": { + "name": template, + "language": {"code": lang}, + "components": components, + }, + } + logging.info(f"Sending template to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Template sent to {recipient_id}") + return await r.json() + logging.info(f"Template not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +# MESSAGE() + +async def reply(self, reply_text: str = "", preview_url: bool = True) -> asyncio.Future: + if self.data == {}: + return {"error": "No data provided"} + author = self.instance.get_author(self.data) + payload = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": str(author), + "type": "text", + "context": {"message_id": self.id}, + "text": {"preview_url": preview_url, "body": reply_text}, + } + logging.info(f"Replying to {self.id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(self.url, headers=self.headers, json=payload) as r: + if r.status == 200: + logging.info(f"Replied to {self.id}") + return await r.json() + logging.info(f"Reply not sent to {self.id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def mark_as_read(self) -> asyncio.Future: + payload = { + "messaging_product": "whatsapp", + "status": "read", + "message_id": self.id, + } + + logging.info(f"Marking message {self.id} as read") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(self.url, headers=self.headers, json=payload) as r: + if r.status == 200: + logging.info(f"Message {self.id} marked as read") + return await r.json() + logging.info(f"Error marking message {self.id} as read") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def send(self, preview_url: bool = True) -> asyncio.Future: + url = f"https://graph.facebook.com/{self.LATEST}/{self.sender}/messages" + data = { + "messaging_product": "whatsapp", + "recipient_type": self.rec, + "to": self.to, + "type": "text", + "text": {"preview_url": preview_url, "body": self.content}, + } + logging.info(f"Sending message to {self.to}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Message sent to {self.to}") + return await r.json() + logging.info(f"Message not sent to {self.to}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f diff --git a/whatsapp/async_ext/_property.py b/whatsapp/async_ext/_property.py new file mode 100644 index 0000000..f3501a0 --- /dev/null +++ b/whatsapp/async_ext/_property.py @@ -0,0 +1,5 @@ +import requests + + +def authorized(self) -> bool: + return requests.get(self.url, headers=self.headers).status_code != 401 diff --git a/whatsapp/async_ext/_send_media.py b/whatsapp/async_ext/_send_media.py new file mode 100644 index 0000000..20bb565 --- /dev/null +++ b/whatsapp/async_ext/_send_media.py @@ -0,0 +1,377 @@ +import logging +import aiohttp +import asyncio + + +async def send_location(self, lat: str, long: str, name: str, address: str, recipient_id: str, sender = None) -> asyncio.Future: + """ + Sends a location message to a WhatsApp user + + Args: + lat[str]: Latitude of the location + long[str]: Longitude of the location + name[str]: Name of the location + address[str]: Address of the location + recipient_id[str]: Phone number of the user with country code wihout + + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.send_location("-23.564", "-46.654", "My Location", "Rua dois, 123", "5511999999999") + """ + try: + sender = dict(self.l)[sender] + + + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "location", + "location": { + "latitude": lat, + "longitude": long, + "name": name, + "address": address, + }, + } + logging.info(f"Sending location to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Location sent to {recipient_id}") + return await r.json() + logging.info(f"Location not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def send_image( + self, + image: str, + recipient_id: str, + recipient_type: str = "individual", + caption: str = "", + link: bool = True, + sender = None +) -> asyncio.Future: + """ + Sends an image message to a WhatsApp user + + There are two ways to send an image message to a user, either by passing the image id or by passing the image link. + Image id is the id of the image uploaded to the cloud api. + + Args: + image[str]: Image id or link of the image + recipient_id[str]: Phone number of the user with country code wihout + + recipient_type[str]: Type of the recipient, either individual or group + caption[str]: Caption of the image + link[bool]: Whether to send an image id or an image link, True means that the image is an id, False means that the image is a link + + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.send_image("https://i.imgur.com/Fh7XVYY.jpeg", "5511999999999") + """ + try: + sender = dict(self.l)[sender] + + + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + if link: + data = { + "messaging_product": "whatsapp", + "recipient_type": recipient_type, + "to": recipient_id, + "type": "image", + "image": {"link": image, "caption": caption}, + } + else: + data = { + "messaging_product": "whatsapp", + "recipient_type": recipient_type, + "to": recipient_id, + "type": "image", + "image": {"id": image, "caption": caption}, + } + logging.info(f"Sending image to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Image sent to {recipient_id}") + return await r.json() + logging.info(f"Image not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def send_sticker(self, sticker: str, recipient_id: str, recipient_type: str = "individual", link: bool = True, sender = None) -> asyncio.Future: + """ + Sends a sticker message to a WhatsApp user + + There are two ways to send a sticker message to a user, either by passing the image id or by passing the sticker link. + Sticker id is the id of the sticker uploaded to the cloud api. + + Args: + sticker[str]: Sticker id or link of the sticker + recipient_id[str]: Phone number of the user with country code wihout + + recipient_type[str]: Type of the recipient, either individual or group + link[bool]: Whether to send an sticker id or an sticker link, True means that the sticker is an id, False means that the image is a link + + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.send_sticker("170511049062862", "5511999999999", link=False) + """ + try: + sender = dict(self.l)[sender] + + + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + if link: + data = { + "messaging_product": "whatsapp", + "recipient_type": recipient_type, + "to": recipient_id, + "type": "sticker", + "sticker": {"link": sticker}, + } + else: + data = { + "messaging_product": "whatsapp", + "recipient_type": recipient_type, + "to": recipient_id, + "type": "sticker", + "sticker": {"id": sticker}, + } + logging.info(f"Sending sticker to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Sticker sent to {recipient_id}") + return await r.json() + logging.info(f"Sticker not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def send_audio(self, audio: str, recipient_id: str, link: bool = True, sender = None) -> asyncio.Future: + """ + Sends an audio message to a WhatsApp user + Audio messages can either be sent by passing the audio id or by passing the audio link. + + Args: + audio[str]: Audio id or link of the audio + recipient_id[str]: Phone number of the user with country code wihout + + link[bool]: Whether to send an audio id or an audio link, True means that the audio is an id, False means that the audio is a link + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.send_audio("https://www.soundhelix.com/examples/mp3/SoundHelix-Song-1.mp3", "5511999999999") + """ + try: + sender = dict(self.l)[sender] + + + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + if link: + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "audio", + "audio": {"link": audio}, + } + else: + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "audio", + "audio": {"id": audio}, + } + logging.info(f"Sending audio to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Audio sent to {recipient_id}") + return await r.json() + logging.info(f"Audio not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def send_video( + self, video: str, recipient_id: str, caption: str = "", link: bool = True, sender = None +) -> asyncio.Future: + """ " + Sends a video message to a WhatsApp user + Video messages can either be sent by passing the video id or by passing the video link. + + Args: + video[str]: Video id or link of the video + recipient_id[str]: Phone number of the user with country code wihout + + caption[str]: Caption of the video + link[bool]: Whether to send a video id or a video link, True means that the video is an id, False means that the video is a link + + example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.send_video("https://www.youtube.com/watch?v=dQw4w9WgXcQ", "5511999999999") + """ + try: + sender = dict(self.l)[sender] + + + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + if link: + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "video", + "video": {"link": video, "caption": caption}, + } + else: + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "video", + "video": {"id": video, "caption": caption}, + } + logging.info(f"Sending video to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Video sent to {recipient_id}") + return await r.json() + logging.info(f"Video not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + + +async def send_document( + self, document: str, recipient_id: str, caption: str = "", link: bool = True, sender = None) -> asyncio.Future: + """ " + Sends a document message to a WhatsApp user + Document messages can either be sent by passing the document id or by passing the document link. + + Args: + document[str]: Document id or link of the document + recipient_id[str]: Phone number of the user with country code wihout + + caption[str]: Caption of the document + link[bool]: Whether to send a document id or a document link, True means that the document is an id, False means that the document is a link + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.send_document("https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", "5511999999999") + """ + try: + sender = dict(self.l)[sender] + + + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + if link: + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "document", + "document": {"link": document, "caption": caption}, + } + else: + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "document", + "document": {"id": document, "caption": caption}, + } + + logging.info(f"Sending document to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Document sent to {recipient_id}") + return await r.json() + logging.info(f"Document not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f \ No newline at end of file diff --git a/whatsapp/async_ext/_send_others.py b/whatsapp/async_ext/_send_others.py new file mode 100644 index 0000000..18c6a88 --- /dev/null +++ b/whatsapp/async_ext/_send_others.py @@ -0,0 +1,120 @@ +from typing import Any, Dict, List +import aiohttp +import asyncio +import logging + + +async def send_custom_json(self, data: dict, recipient_id: str = "", sender=None) -> asyncio.Future: + """ + Sends a custom json to a WhatsApp user. This can be used to send custom objects to the message endpoint. + + Args: + data[dict]: Dictionary that should be send + recipient_id[str]: Phone number of the user with country code wihout + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.send_custom_json({ + "messaging_product": "whatsapp", + "type": "audio", + "audio": {"id": audio}}, "5511999999999") + """ + try: + sender = dict(self.l)[sender] + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + + if recipient_id: + if "to" in data.keys(): + data_recipient_id = data["to"] + logging.info( + f"Recipient Id is defined in data ({data_recipient_id}) and recipient_id parameter ({recipient_id})" + ) + else: + data["to"] = recipient_id + + logging.info(f"Sending custom json to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Custom json sent to {recipient_id}") + return await r.json() + logging.info(f"Custom json not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f + + +async def send_contacts( + self, contacts: List[Dict[Any, Any]], recipient_id: str, sender=None +) -> asyncio.Future: + """send_contacts + + Send a list of contacts to a user + + Args: + contacts(List[Dict[Any, Any]]): List of contacts to send + recipient_id(str): Phone number of the user with country code wihout + + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> contacts = [{ + "addresses": [{ + "street": "STREET", + "city": "CITY", + "state": "STATE", + "zip": "ZIP", + "country": "COUNTRY", + "country_code": "COUNTRY_CODE", + "type": "HOME" + }, + .... + } + ] + + REFERENCE: https://developers.facebook.com/docs/whatsapp/cloud-api/reference/messages#contacts-object + """ + try: + sender = dict(self.l)[sender] + + except: + sender = self.phone_number_id + + if sender == None: + sender = self.phone_number_id + + url = f"https://graph.facebook.com/{self.LATEST}/{sender}/messages" + + data = { + "messaging_product": "whatsapp", + "to": recipient_id, + "type": "contacts", + "contacts": contacts, + } + logging.info(f"Sending contacts to {recipient_id}") + async def call(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=self.headers, json=data) as r: + if r.status == 200: + logging.info(f"Contacts sent to {recipient_id}") + return await r.json() + logging.info(f"Contacts not sent to {recipient_id}") + logging.info(f"Status code: {r.status}") + logging.info(f"Response: {await r.json()}") + return await r.json() + + f = asyncio.ensure_future(call()) + await asyncio.sleep(.001) # make asyncio run the task + return f \ No newline at end of file diff --git a/whatsapp/async_ext/_static.py b/whatsapp/async_ext/_static.py new file mode 100644 index 0000000..123a439 --- /dev/null +++ b/whatsapp/async_ext/_static.py @@ -0,0 +1,329 @@ +from typing import Any, Dict, Union + + +@staticmethod +def is_message(data: Dict[Any, Any]) -> bool: + """is_message checks if the data received from the webhook is a message. + + Args: + data (Dict[Any, Any]): The data received from the webhook + + Returns: + bool: True if the data is a message, False otherwise + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + return True + else: + return False + + +@staticmethod +def get_mobile(data: Dict[Any, Any]) -> Union[str, None]: + """ + Extracts the mobile number of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + str: The mobile number of the sender + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> mobile = whatsapp.get_mobile(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "contacts" in data: + return data["contacts"][0]["wa_id"] + + +@staticmethod +def get_name(data: Dict[Any, Any]) -> Union[str, None]: + """ + Extracts the name of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + str: The name of the sender + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> mobile = whatsapp.get_name(data) + """ + contact = data["entry"][0]["changes"][0]["value"] + if contact: + return contact["contacts"][0]["profile"]["name"] + + +@staticmethod +def get_message(data: Dict[Any, Any]) -> Union[str, None]: + """ + Extracts the text message of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + str: The text message received from the sender + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> message = message.get_message(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + return data["messages"][0]["text"]["body"] + + +@staticmethod +def get_message_id(data: Dict[Any, Any]) -> Union[str, None]: + """ + Extracts the message id of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + str: The message id of the sender + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> message_id = whatsapp.get_message_id(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + return data["messages"][0]["id"] + + +@staticmethod +def get_message_timestamp(data: Dict[Any, Any]) -> Union[str, None]: + """ " + Extracts the timestamp of the message from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + str: The timestamp of the message + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.get_message_timestamp(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + return data["messages"][0]["timestamp"] + + +@staticmethod +def get_interactive_response(data: Dict[Any, Any]) -> Union[Dict, None]: + """ + Extracts the response of the interactive message from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + dict: The response of the interactive message + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> response = whatsapp.get_interactive_response(data) + >>> interactive_type = response.get("type") + >>> message_id = response[interactive_type]["id"] + >>> message_text = response[interactive_type]["title"] + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + if "interactive" in data["messages"][0]: + return data["messages"][0]["interactive"] + + +@staticmethod +def get_location(data: Dict[Any, Any]) -> Union[Dict, None]: + """ + Extracts the location of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + + Returns: + dict: The location of the sender + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.get_location(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + if "location" in data["messages"][0]: + return data["messages"][0]["location"] + + +@staticmethod +def get_image(data: Dict[Any, Any]) -> Union[Dict, None]: + """ " + Extracts the image of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + dict: The image_id of an image sent by the sender + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> image_id = whatsapp.get_image(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + if "image" in data["messages"][0]: + return data["messages"][0]["image"] + +@staticmethod +def get_sticker(data: Dict[Any, Any]) -> Union[Dict, None]: + """ " + Extracts the sticker of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + dict: The sticker_id of an sticker sent by the sender + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> sticker_id = whatsapp.get_sticker(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + if "sticker" in data["messages"][0]: + return data["messages"][0]["sticker"] + +@staticmethod +def get_document(data: Dict[Any, Any]) -> Union[Dict, None]: + """ " + Extracts the document of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + Returns: + dict: The document_id of an image sent by the sender + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> document_id = whatsapp.get_document(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + if "document" in data["messages"][0]: + return data["messages"][0]["document"] + + +@staticmethod +def get_audio(data: Dict[Any, Any]) -> Union[Dict, None]: + """ + Extracts the audio of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + + Returns: + dict: The audio of the sender + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.get_audio(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + if "audio" in data["messages"][0]: + return data["messages"][0]["audio"] + + +@staticmethod +def get_video(data: Dict[Any, Any]) -> Union[Dict, None]: + """ + Extracts the video of the sender from the data received from the webhook. + + Args: + data[dict]: The data received from the webhook + + Returns: + dict: Dictionary containing the video details sent by the sender + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.get_video(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + if "video" in data["messages"][0]: + return data["messages"][0]["video"] + + +@staticmethod +def get_message_type(data: Dict[Any, Any]) -> Union[str, None]: + """ + Gets the type of the message sent by the sender from the data received from the webhook. + + + Args: + data [dict]: The data received from the webhook + + Returns: + str: The type of the message sent by the sender + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.get_message_type(data) + """ + data = data["entry"][0]["changes"][0]["value"] + if "messages" in data: + return data["messages"][0]["type"] + + +@staticmethod +def get_delivery(data: Dict[Any, Any]) -> Union[Dict, None]: + """ + Extracts the delivery status of the message from the data received from the webhook. + Args: + data [dict]: The data received from the webhook + + Returns: + dict: The delivery status of the message and message id of the message + """ + data = data["entry"][0]["changes"][0]["value"] + if "statuses" in data: + return data["statuses"][0]["status"] + + +@staticmethod +def changed_field(data: Dict[Any, Any]) -> str: + """ + Helper function to check if the field changed in the data received from the webhook. + + Args: + data [dict]: The data received from the webhook + + Returns: + str: The field changed in the data received from the webhook + + Example: + >>> from whatsapp import WhatsApp + >>> whatsapp = WhatsApp(token, phone_number_id) + >>> whatsapp.changed_field(data) + """ + return data["entry"][0]["changes"][0]["field"] + + +@staticmethod +def get_author(data: Dict[Any, Any]) -> Union[str, None]: + try: + return data["entry"][0]["changes"][0]["value"]["messages"][0]["from"] + except Exception: + return None diff --git a/whatsapp/constants.py b/whatsapp/constants.py index 4768a42..a4e8b50 100644 --- a/whatsapp/constants.py +++ b/whatsapp/constants.py @@ -1,4 +1,7 @@ # This file contains the constants used in the project. # The VERSION constant is used to store the version of the project - it's not only used in the __init__.py file, but also in the pyproject.toml file. +import asyncio + VERSION = "4.0.2" +LOOP = asyncio.get_event_loop() \ No newline at end of file