Source code for whatstk.whatsapp.parser

"""Parser utils."""


import os
import re
from datetime import datetime
from urllib.request import urlopen
from typing import Optional, TYPE_CHECKING, Tuple, List, Dict

import pandas as pd

from whatstk.utils.exceptions import RegexError, HFormatError
from whatstk.utils.utils import COLNAMES_DF
from whatstk.whatsapp.auto_header import extract_header_from_text

if TYPE_CHECKING:  # pragma: no cover
    from whatstk.whatsapp.objects import WhatsAppChat  # pragma: no cover


regex_simplifier = {
    "%Y": r"(?P<year>\d{2,4})",
    "%y": r"(?P<year>\d{2,4})",
    "%m": r"(?P<month>\d{1,2})",
    "%d": r"(?P<day>\d{1,2})",
    "%H": r"(?P<hour>\d{1,2})",
    "%I": r"(?P<hour>\d{1,2})",
    "%M": r"(?P<minutes>\d{2})",
    "%S": r"(?P<seconds>\d{2})",
    "%P": r"(?P<ampm>[AaPp].? ?[Mm].?)",
    "%p": r"(?P<ampm>[AaPp].? ?[Mm].?)",
    "%name": rf"(?P<{COLNAMES_DF.USERNAME}>[^:]*)",
}


[docs]def df_from_txt_whatsapp( filepath: str, auto_header: bool = True, hformat: Optional[str] = None, encoding: str = "utf-8", ) -> "WhatsAppChat": """Load chat as a DataFrame. Args: filepath (str): Path to the file. Accepted sources are: * Local file, e.g. 'path/to/file.txt'. * URL to a remote hosted file, e.g. 'http://www.url.to/file.txt'. * Link to Google Drive file, e.g. 'gdrive://35gKKrNk-i3t05zPLyH4_P1rPdOmKW9NZ'. The format is expected to be 'gdrive://[FILE-ID]'. Note that in order to load a file from Google Drive you first need to run :func:`gdrive_init <whatstk.utils.gdrive.gdrive_init>`. auto_header (bool, optional): Detect header automatically. If False, ``hformat`` is required. hformat (str, optional): :ref:`Format of the header <The header format>`, e.g. ``'[%y-%m-%d %H:%M:%S] - %name:'``. Use following keywords: - ``'%y'``: for year (``'%Y'`` is equivalent). - ``'%m'``: for month. - ``'%d'``: for day. - ``'%H'``: for 24h-hour. - ``'%I'``: for 12h-hour. - ``'%M'``: for minutes. - ``'%S'``: for seconds. - ``'%P'``: for "PM"/"AM" or "p.m."/"a.m." characters. - ``'%name'``: for the username. Example 1: For the header '12/08/2016, 16:20 - username:' we have the ``'hformat='%d/%m/%y, %H:%M - %name:'``. Example 2: For the header '2016-08-12, 4:20 PM - username:' we have ``hformat='%y-%m-%d, %I:%M %P - %name:'``. encoding (str, optional): Encoding to use for UTF when reading/writing (ex. 'utf-8'). `List of Python standard encodings <https://docs.python.org/3/library/codecs. html#standard-encodings>`_. Returns: WhatsAppChat: Class instance with loaded and parsed chat. .. seealso:: * :func:`WhatsAppChat.from_source <whatstk.whatsapp.objects.WhatsAppChat.from_source>` * :func:`extract_header_from_text <whatstk.whatsapp.auto_header.extract_header_from_text>` * :func:`gdrive_init <whatstk.utils.gdrive.gdrive_init>` """ # Read local file text = _str_from_txt(filepath, encoding) # Build dataframe df = _df_from_str(text, auto_header, hformat) return df
[docs]def generate_regex(hformat: str) -> Tuple[str, str]: r"""Generate regular expression from hformat. Args: hformat (str): Simplified syntax for the header, e.g. ``'%y-%m-%d, %H:%M:%S - %name:'``. Returns: str: Regular expression corresponding to the specified syntax. Example: Generate regular expression corresponding to ``'hformat=%y-%m-%d, %H:%M:%S - %name:'``. .. code-block:: python >>> from whatstk.whatsapp.parser import generate_regex >>> generate_regex('%y-%m-%d, %H:%M:%S - %name:') ('(?P<year>\\d{2,4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2}), (?P<hour>\\d{1,2}):(?P<minutes>\\d{2}):(? P<seconds>\\d{2}) - (?P<username>[^:]*): ', '(?P<year>\\d{2,4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2}), (? P<hour>\\d{1,2}):(?P<minutes>\\d{2}):(?P<seconds>\\d{2}) - ') """ items = re.findall(r"\%\w*", hformat) for i in items: hformat = hformat.replace(i, regex_simplifier[i]) hformat = hformat + " " hformat_x = hformat.split("(?P<username>[^:]*)")[0] return hformat, hformat_x
def _str_from_txt(filepath: str, encoding: str = "utf-8") -> str: """Read text content as string. Args: filepath (str): Path to file. Accepted: local file, url (http://...), Google Drive file (gdrive://[file-id]). encoding (str, optional): Encoding to use for UTF when reading/writing (ex. ‘utf-8’). `List of Python standard encodings <https://docs.python.org/3/library/codecs. html#standard-encodings>`_. Raises: FileNotFoundError: [description] Returns: str: File content as a string. """ # Read local file if os.path.isfile(filepath) and os.access(filepath, os.R_OK): with open(filepath, "r", encoding=encoding) as f: text = f.read() # Read file from URL elif filepath.lower().startswith("http"): with urlopen(filepath) as response: # noqa text = response.read() text = text.decode(encoding) elif filepath.startswith("gdrive"): from whatstk.utils.gdrive import _load_str_from_file_id file_id = filepath.replace("gdrive://", "") text = _load_str_from_file_id(file_id) else: raise FileNotFoundError(f"File {filepath} was not found locally or remotely. Please check it exists.") return text def _df_from_str(text: str, auto_header: bool = True, hformat: Optional[str] = None) -> pd.DataFrame: # Get hformat if hformat: # Bracket is reserved character in RegEx, add backslash before them. hformat = hformat.replace("[", r"\[").replace("]", r"\]") if not hformat and auto_header: hformat = extract_header_from_text(text) if not hformat: raise RuntimeError( "Header automatic extraction failed. Please specify the format manually by setting" " input argument `hformat`. Report this issue so that automatic header detection support" " for your header format is added: https://github.com/lucasrodes/whatstk/issues." ) elif not (hformat or auto_header): raise ValueError("If auto_header is False, hformat can't be None.") # Generate regex for given hformat r, r_x = generate_regex(hformat=hformat) # Parse chat to DataFrame try: df = _parse_chat(text, r) except RegexError: raise HFormatError("hformat '{}' did not match the provided text. No match was found".format(hformat)) df = _remove_alerts_from_df(r_x, df) df = _add_schema(df) return df def _parse_chat(text: str, regex: str) -> pd.DataFrame: """Parse chat using given regex. Args: text (str) Whole log chat text. regex (str): Regular expression Returns: pandas.DataFrame: DataFrame with messages sent by users, index is the date the messages was sent. Raises: RegexError: When provided regex could not match the text. """ result = [] headers = list(re.finditer(regex, text)) for i in range(len(headers)): try: line_dict = _parse_line(text, headers, i) except KeyError: raise RegexError("Could not match the provided regex with provided text. No match was found.") result.append(line_dict) df_chat = pd.DataFrame.from_records(result) df_chat = df_chat[[COLNAMES_DF.DATE, COLNAMES_DF.USERNAME, COLNAMES_DF.MESSAGE]] return df_chat def _add_schema(df: pd.DataFrame) -> pd.DataFrame: """Add default chat schema to df. Args: df (pandas.DataFrame): Chat dataframe. Returns: pandas.DataFrame: Chat dataframe with correct dtypes. """ df = df.astype( { COLNAMES_DF.DATE: "datetime64[ns]", COLNAMES_DF.USERNAME: pd.StringDtype(), COLNAMES_DF.MESSAGE: pd.StringDtype(), } ) return df def _parse_line(text: str, headers: List[str], i: int) -> Dict[str, str]: """Get date, username and message from the i:th intervention. Args: text (str): Whole log chat text. headers (list): All headers. i (int): Index denoting the message number. Returns: dict: i:th date, username and message. """ result_ = headers[i].groupdict() if "ampm" in result_: hour = int(result_["hour"]) mode = result_.get("ampm").lower() if hour == 12 and mode == "am": hour = 0 elif hour != 12 and mode == "pm": hour += 12 else: hour = int(result_["hour"]) # Check format of year. If year is 2-digit represented we add 2000 if len(result_["year"]) == 2: year = int(result_["year"]) + 2000 else: year = int(result_["year"]) if "seconds" not in result_: date = datetime( year, int(result_["month"]), int(result_["day"]), hour, int(result_["minutes"]), ) else: date = datetime( year, int(result_["month"]), int(result_["day"]), hour, int(result_["minutes"]), int(result_["seconds"]), ) username = result_[COLNAMES_DF.USERNAME] message = _get_message(text, headers, i) line_dict = { COLNAMES_DF.DATE: date, COLNAMES_DF.USERNAME: username, COLNAMES_DF.MESSAGE: message, } return line_dict def _remove_alerts_from_df(r_x: str, df: pd.DataFrame) -> pd.DataFrame: """Try to get rid of alert/notification messages. Args: r_x (str): Regular expression to detect whatsapp warnings. df (pandas.DataFrame): DataFrame with all interventions. Returns: pandas.DataFrame: Fixed version of input dataframe. """ df_new = df.copy() df_new.loc[:, COLNAMES_DF.MESSAGE] = df_new[COLNAMES_DF.MESSAGE].apply(lambda x: _remove_alerts_from_line(r_x, x)) return df_new def _remove_alerts_from_line(r_x: str, line_df: str) -> str: """Remove line content that is not desirable (automatic alerts etc.). Args: r_x (str): Regula expression to detect WhatsApp warnings. line_df (str): Message sent as string. Returns: str: Cleaned message string. """ if re.search(r_x, line_df): return line_df[: re.search(r_x, line_df).start()] else: return line_df def _get_message(text: str, headers: List[str], i: int) -> str: """Get i:th message from text. Args: text (str): Whole log chat text. headers (list): All headers. i (int): Index denoting the message number. Returns: str: i:th message. """ msg_start = headers[i].end() msg_end = headers[i + 1].start() if i < len(headers) - 1 else headers[i].endpos msg = text[msg_start:msg_end].strip() return msg