Source code for whatstk.whatsapp.parser

"""Parser utils."""

import os
import re
from datetime import datetime
from urllib.request import urlopen

import pandas as pd

from whatstk.utils.exceptions import RegexError, HFormatError
from whatstk.utils.utils import COLNAMES_DF
from whatstk.whatsapp.auto_header import extract_header_from_text

regex_simplifier = {
    '%Y': r'(?P<year>\d{2,4})',
    '%y': r'(?P<year>\d{2,4})',
    '%m': r'(?P<month>\d{1,2})',
    '%d': r'(?P<day>\d{1,2})',
    '%H': r'(?P<hour>\d{1,2})',
    '%I': r'(?P<hour>\d{1,2})',
    '%M': r'(?P<minutes>\d{2})',
    '%S': r'(?P<seconds>\d{2})',
    '%P': r'(?P<ampm>[AaPp].? ?[Mm].?)',
    '%p': r'(?P<ampm>[AaPp].? ?[Mm].?)',
    '%name': fr'(?P<{COLNAMES_DF.USERNAME}>[^:]*)'

[docs]def df_from_txt_whatsapp(filepath, auto_header=True, hformat=None, encoding='utf-8'): """Load chat as a DataFrame. Args: filepath (str): Path to the file. Accepted sources are: * Local file, e.g. 'path/to/file.txt'. * URL to a remote hosted file, e.g. ''. * Link to Google Drive file, e.g. 'gdrive://35gKKrNk-i3t05zPLyH4_P1rPdOmKW9NZ'. The format is expected to be 'gdrive://[FILE-ID]'. Note that in order to load a file from Google Drive you first need to run :func:`gdrive_init <whatstk.utils.gdrive.gdrive_init>`. auto_header (bool, optional): Detect header automatically. If False, ``hformat`` is required. hformat (str, optional): :ref:`Format of the header <The header format>`, e.g. ``'[%y-%m-%d %H:%M:%S] - %name:'``. Use following keywords: - ``'%y'``: for year (``'%Y'`` is equivalent). - ``'%m'``: for month. - ``'%d'``: for day. - ``'%H'``: for 24h-hour. - ``'%I'``: for 12h-hour. - ``'%M'``: for minutes. - ``'%S'``: for seconds. - ``'%P'``: for "PM"/"AM" or "p.m."/"a.m." characters. - ``'%name'``: for the username. Example 1: For the header '12/08/2016, 16:20 - username:' we have the ``'hformat='%d/%m/%y, %H:%M - %name:'``. Example 2: For the header '2016-08-12, 4:20 PM - username:' we have ``hformat='%y-%m-%d, %I:%M %P - %name:'``. encoding (str, optional): Encoding to use for UTF when reading/writing (ex. 'utf-8'). `List of Python standard encodings < html#standard-encodings>`_. Returns: WhatsAppChat: Class instance with loaded and parsed chat. .. seealso:: * :func:`WhatsAppChat.from_source <whatstk.whatsapp.objects.WhatsAppChat.from_source>` * :func:`extract_header_from_text <whatstk.whatsapp.auto_header.extract_header_from_text>` * :func:`gdrive_init <whatstk.utils.gdrive.gdrive_init>` """ # Read local file text = _str_from_txt(filepath, encoding) # Build dataframe df = _df_from_str(text, auto_header, hformat) return df
[docs]def generate_regex(hformat): r"""Generate regular expression from hformat. Args: hformat (str): Simplified syntax for the header, e.g. ``'%y-%m-%d, %H:%M:%S - %name:'``. Returns: str: Regular expression corresponding to the specified syntax. Example: Generate regular expression corresponding to ``'hformat=%y-%m-%d, %H:%M:%S - %name:'``. .. code-block:: python >>> from whatstk.whatsapp.parser import generate_regex >>> generate_regex('%y-%m-%d, %H:%M:%S - %name:') ('(?P<year>\\d{2,4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2}), (?P<hour>\\d{1,2}):(?P<minutes>\\d{2}):(? P<seconds>\\d{2}) - (?P<username>[^:]*): ', '(?P<year>\\d{2,4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2}), (? P<hour>\\d{1,2}):(?P<minutes>\\d{2}):(?P<seconds>\\d{2}) - ') """ items = re.findall(r'\%\w*', hformat) for i in items: hformat = hformat.replace(i, regex_simplifier[i]) hformat = hformat + ' ' hformat_x = hformat.split('(?P<username>[^:]*)')[0] return hformat, hformat_x
def _str_from_txt(filepath: str, encoding='utf-8'): """Read text content as string. Args: filepath (str): Path to file. Accepted: local file, url (http://...), Google Drive file (gdrive://[file-id]). encoding (str, optional): Encoding to use for UTF when reading/writing (ex. ‘utf-8’). `List of Python standard encodings < html#standard-encodings>`_. Raises: FileNotFoundError: [description] Returns: str: File content as a string. """ # Read local file if os.path.isfile(filepath) and os.access(filepath, os.R_OK): with open(filepath, 'r', encoding=encoding) as f: text = # Read file from URL elif filepath.lower().startswith('http'): with urlopen(filepath) as response: # noqa text = text = text.decode(encoding) elif filepath.startswith("gdrive"): from whatstk.utils.gdrive import _load_str_from_file_id file_id = filepath.replace("gdrive://", "") text = _load_str_from_file_id(file_id) else: raise FileNotFoundError(f"File {filepath} was not found locally or remotely. Please check it exists.") return text def _df_from_str(text, auto_header=True, hformat=None): # Get hformat if hformat: # Bracket is reserved character in RegEx, add backslash before them. hformat = hformat.replace('[', r'\[').replace(']', r'\]') if not hformat and auto_header: hformat = extract_header_from_text(text) if not hformat: raise RuntimeError("Header automatic extraction failed. Please specify the format manually by setting" " input argument `hformat`. Report this issue so that automatic header detection support" " for your header format is added:") elif not (hformat or auto_header): raise ValueError("If auto_header is False, hformat can't be None.") # Generate regex for given hformat r, r_x = generate_regex(hformat=hformat) # Parse chat to DataFrame try: df = _parse_chat(text, r) except RegexError: raise HFormatError("hformat '{}' did not match the provided text. No match was found".format(hformat)) df = _remove_alerts_from_df(r_x, df) df = _add_schema(df) return df def _parse_chat(text, regex): """Parse chat using given regex. Args: text (str) Whole log chat text. regex (str): Regular expression Returns: pandas.DataFrame: DataFrame with messages sent by users, index is the date the messages was sent. Raises: RegexError: When provided regex could not match the text. """ result = [] headers = list(re.finditer(regex, text)) for i in range(len(headers)): try: line_dict = _parse_line(text, headers, i) except KeyError: raise RegexError("Could not match the provided regex with provided text. No match was found.") result.append(line_dict) df_chat = pd.DataFrame.from_records(result) df_chat = df_chat[[COLNAMES_DF.DATE, COLNAMES_DF.USERNAME, COLNAMES_DF.MESSAGE]] return df_chat def _add_schema(df): """Add default chat schema to df. Args: df (pandas.DataFrame): Chat dataframe. Returns: pandas.DataFrame: Chat dataframe with correct dtypes. """ df = df.astype({ COLNAMES_DF.DATE: 'datetime64[ns]', COLNAMES_DF.USERNAME: pd.StringDtype(), COLNAMES_DF.MESSAGE: pd.StringDtype() }) return df def _parse_line(text, headers, i): """Get date, username and message from the i:th intervention. Args: text (str): Whole log chat text. headers (list): All headers. i (int): Index denoting the message number. Returns: dict: i:th date, username and message. """ result_ = headers[i].groupdict() if 'ampm' in result_: hour = int(result_['hour']) mode = result_.get('ampm').lower() if hour == 12 and mode == 'am': hour = 0 elif hour != 12 and mode == 'pm': hour += 12 else: hour = int(result_['hour']) # Check format of year. If year is 2-digit represented we add 2000 if len(result_['year']) == 2: year = int(result_['year']) + 2000 else: year = int(result_['year']) if 'seconds' not in result_: date = datetime(year, int(result_['month']), int(result_['day']), hour, int(result_['minutes'])) else: date = datetime(year, int(result_['month']), int(result_['day']), hour, int(result_['minutes']), int(result_['seconds'])) username = result_[COLNAMES_DF.USERNAME] message = _get_message(text, headers, i) line_dict = { COLNAMES_DF.DATE: date, COLNAMES_DF.USERNAME: username, COLNAMES_DF.MESSAGE: message } return line_dict def _remove_alerts_from_df(r_x, df): """Try to get rid of alert/notification messages. Args: r_x (str): Regular expression to detect whatsapp warnings. df (pandas.DataFrame): DataFrame with all interventions. Returns: pandas.DataFrame: Fixed version of input dataframe. """ df_new = df.copy() df_new.loc[:, COLNAMES_DF.MESSAGE] = df_new[COLNAMES_DF.MESSAGE].apply(lambda x: _remove_alerts_from_line(r_x, x)) return df_new def _remove_alerts_from_line(r_x, line_df): """Remove line content that is not desirable (automatic alerts etc.). Args: r_x (str): Regula expression to detect WhatsApp warnings. line_df (str): Message sent as string. Returns: str: Cleaned message string. """ if, line_df): return line_df[, line_df).start()] else: return line_df def _get_message(text, headers, i): """Get i:th message from text. Args: text (str): Whole log chat text. headers (list): All headers. i (int): Index denoting the message number. Returns: str: i:th message. """ msg_start = headers[i].end() msg_end = headers[i + 1].start() if i < len(headers) - 1 else headers[i].endpos msg = text[msg_start:msg_end].strip() return msg