# Source code for pyobs.modules.utils.telegram

import asyncio
import io
import logging
from enum import Enum
from functools import partial
from inspect import Parameter
from pprint import pprint
from typing import Any, Optional, List

from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import CallbackQueryHandler, CallbackContext

from pyobs.modules import Module
from pyobs.events import LogEvent, Event

log = logging.getLogger(__name__)


class TelegramUserState(Enum):
    """Conversation state of a single Telegram user.

    The state is kept in the per-user ``user_data`` of the Telegram context and
    decides how the next text message or button click is interpreted.

    All values are plain integers. Earlier revisions accidentally declared the
    first five members with a trailing comma, which turned their values into
    one-element tuples while ``LOG_LEVEL`` was an int.
    """

    # no conversation in progress
    IDLE = 0
    # "/start" was issued, next text message is expected to be the password
    AUTH = 1
    # "/exec" was issued, waiting for a click on a module button
    EXEC_MODULE = 2
    # module chosen, waiting for a click on a method button
    EXEC_METHOD = 3
    # method chosen, collecting parameter values from text messages
    EXEC_PARAMS = 4
    # "/loglevel" was issued, waiting for a click on a log level button
    LOG_LEVEL = 5


class Telegram(Module):
    """A telegram bot.

    The bot offers the commands ``/start`` (log in with the configured
    password), ``/exec`` (pick a module and a method via inline buttons and
    enter its parameters as text messages), ``/modules`` (list modules) and
    ``/loglevel`` (choose which log events are forwarded to the chat).

    Known users and their log levels are kept in ``bot_data["storage"]`` and
    persisted to ``/pyobs/telegram.yaml`` via the module's VFS.
    """

    __module__ = "pyobs.modules.utils"

    def __init__(self, token: str, password: str, allow_new_users: bool = True, **kwargs: Any):
        """Initialize a new bot.

        Args:
            token: The telegram API token.
            password: Password for users to log in.
            allow_new_users: Whether new users are allowed to connect.

        """
        Module.__init__(self, **kwargs)
        from telegram.ext import Updater

        # store
        self._token = token
        self._password = password
        self._allow_new_users = allow_new_users
        self._updater: Optional[Updater] = None
        self._message_queue = asyncio.Queue()
        self._loop = None

        # running method calls; the event loop only keeps weak references to
        # tasks, so we hold them here until they are done
        self._call_tasks = set()

        # get log levels: map every named level (e.g. "INFO") to its number,
        # skipping the unnamed ones that getLevelName reports as "Level N"
        self._log_levels = {
            logging.getLevelName(x): x for x in range(1, 101) if not logging.getLevelName(x).startswith("Level")
        }

        # thread
        self.add_background_task(self._log_sender_thread)

    async def open(self) -> None:
        """Open module.

        Creates the updater, registers all command/message/button handlers,
        loads the stored users, starts polling and subscribes to log events.
        """
        from telegram.ext import CommandHandler, MessageHandler, Filters, Updater

        await Module.open(self)

        # remember the loop so that the (non-async) telegram handlers can
        # schedule coroutines on it
        self._loop = asyncio.get_running_loop()

        # get dispatcher
        self._updater = Updater(token=self._token)
        dispatcher = self._updater.dispatcher  # type: ignore

        # add command handler
        dispatcher.add_handler(CommandHandler("start", self._command_start))
        dispatcher.add_handler(CommandHandler("exec", self._command_exec))
        dispatcher.add_handler(CommandHandler("modules", self._command_modules))
        dispatcher.add_handler(CommandHandler("loglevel", self._command_loglevel))

        # add text handler
        echo_handler = MessageHandler(Filters.text & (~Filters.command), self._process_message)
        dispatcher.add_handler(echo_handler)

        # add callback handler for buttons
        dispatcher.add_handler(CallbackQueryHandler(self._handle_buttons))

        # load storage file
        try:
            dispatcher.bot_data["storage"] = await self.vfs.read_yaml("/pyobs/telegram.yaml")
        except FileNotFoundError:
            dispatcher.bot_data["storage"] = {}

        # start polling
        self._updater.start_polling(poll_interval=0.1)

        # listen to log events
        await self.comm.register_event(LogEvent, self._process_log_entry)

    async def close(self) -> None:
        """Close module and stop the telegram updater, if one was created."""
        await Module.close(self)

        # stop telegram
        if self._updater is not None:
            self._updater.stop()

    async def _save_storage(self, context: CallbackContext) -> None:
        """Save storage file.

        Args:
            context: Telegram context.
        """
        await self.vfs.write_yaml("/pyobs/telegram.yaml", context.bot_data["storage"])

    @staticmethod
    def _is_user_authorized(context: CallbackContext, user_id: int) -> bool:
        """Is user authorized?

        Args:
            context: Telegram context.
            user_id: ID of user.

        Returns:
            Whether user is known and authorized to give commands.
        """
        s = context.bot_data["storage"]
        return "users" in s and user_id in s["users"]

    def _store_user(self, context: CallbackContext, user_id: int, name: str) -> None:
        """Store new user in auth database.

        Args:
            context: Telegram context.
            user_id: ID of user.
            name: Name of user.
        """
        # add user
        s = context.bot_data["storage"]
        if "users" not in s:
            s["users"] = {}
        s["users"][user_id] = {"name": name, "loglevel": None}

        # persist on the module's event loop
        asyncio.run_coroutine_threadsafe(self._save_storage(context), self._loop)

    def _command_start(self, update: Update, context: CallbackContext) -> None:
        """Handle /start command.

        Args:
            update: Message to process.
            context: Telegram context.

        Raises:
            ValueError: If the context carries no user data.
        """
        if context.user_data is None:
            raise ValueError("No user data in context.")

        # is user already known?
        if self._is_user_authorized(context, update.message.from_user.id):
            # welcome him back
            context.bot.send_message(
                chat_id=update.effective_chat.id, text="Welcome back %s!" % update.message.from_user.first_name
            )
            return

        # do we allow for new users?
        if self._allow_new_users:
            # go to AUTH state
            context.bot.send_message(chat_id=update.effective_chat.id, text="Password?")
            context.user_data["state"] = TelegramUserState.AUTH
        else:
            # show message
            context.bot.send_message(chat_id=update.effective_chat.id, text="No new users allowed in the system.")

    def _command_exec(self, update: Update, context: CallbackContext) -> None:
        """Handle /exec command.

        Args:
            update: Message to process.
            context: Telegram context.

        Raises:
            ValueError: If the context carries no user data.
        """
        if context.user_data is None:
            raise ValueError("No user data in context.")

        # not logged in?
        if not self._is_user_authorized(context, update.message.from_user.id):
            context.bot.send_message(chat_id=update.effective_chat.id, text="Not logged in, use /start.")
            return

        # create buttons for all modules
        keyboard = [[InlineKeyboardButton(c, callback_data=c)] for c in self.comm.clients] + [
            [InlineKeyboardButton("Cancel", callback_data="cancel")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        update.message.reply_text("Please choose module:", reply_markup=reply_markup)

        # go to EXEC_MODULE state
        context.user_data["state"] = TelegramUserState.EXEC_MODULE

    @staticmethod
    def _reset_state(context: CallbackContext) -> None:
        """Reset state.

        Args:
            context: Telegram context.

        Raises:
            ValueError: If the context carries no user data.
        """
        if context.user_data is None:
            raise ValueError("No user data in context.")
        context.user_data["state"] = TelegramUserState.IDLE
        context.user_data["method"] = None
        context.user_data["params"] = []
        context.user_data["exec_query"] = None

    def _handle_buttons(self, update: Update, context: CallbackContext) -> None:
        """Handle click on buttons.

        Only schedules :meth:`_handle_buttons_async` on the module's event loop.

        Args:
            update: Message to process.
            context: Telegram context.
        """
        asyncio.run_coroutine_threadsafe(self._handle_buttons_async(update, context), self._loop)

    async def _handle_buttons_async(self, update: Update, context: CallbackContext) -> None:
        """Handle click on buttons.

        Args:
            update: Message to process.
            context: Telegram context.

        Raises:
            ValueError: If the context carries no user data.
        """
        if context.user_data is None:
            raise ValueError("No user data in context.")

        # get query
        query = update.callback_query

        # not logged in?
        if not self._is_user_authorized(context, query.from_user.id):
            query.edit_message_text(text="Not logged in, use /start.")
            return

        # CallbackQueries need to be answered, even if no notification to the user is needed
        # Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
        query.answer()

        # cancel?
        if query.data == "cancel":
            # reset state
            self._reset_state(context)
            # remove markup
            query.edit_message_text("Canceled.")
            # state is IDLE now, nothing else to do
            return

        # what state are we in? A user without any stored state (e.g. a click on
        # an old button) has no "state" key, so don't index directly.
        state = context.user_data.get("state")

        if state == TelegramUserState.EXEC_MODULE:
            # get proxy for selected module
            proxy = await self.proxy(query.data)

            # show buttons for all methods of the module
            keyboard = [
                [InlineKeyboardButton(m, callback_data="%s.%s" % (query.data, m))] for m in proxy.method_names
            ] + [[InlineKeyboardButton("Cancel", callback_data="cancel")]]
            reply_markup = InlineKeyboardMarkup(keyboard)
            query.edit_message_text(text="Chose method in %s:" % query.data)
            query.edit_message_reply_markup(reply_markup)

            # change to EXEC_METHOD state
            context.user_data["state"] = TelegramUserState.EXEC_METHOD

        elif state == TelegramUserState.EXEC_METHOD:
            # init command
            context.user_data["method"] = query.data
            context.user_data["params"] = []
            context.user_data["exec_query_message"] = update.callback_query.message.message_id
            context.user_data["exec_query_chat"] = update.callback_query.message.chat_id

            # only leave a cancel button
            keyboard = [[InlineKeyboardButton("Cancel", callback_data="cancel")]]
            reply_markup = InlineKeyboardMarkup(keyboard)
            query.edit_message_text(text="Executing %s..." % query.data)
            query.edit_message_reply_markup(reply_markup)

            # go to EXEC_PARAMS state
            context.user_data["state"] = TelegramUserState.EXEC_PARAMS

            # see whether this command runs without parameters; we are already
            # on the event loop, so the coroutine can be awaited directly
            await self._handle_params_async(update, context)

        elif state == TelegramUserState.LOG_LEVEL:
            # set log level
            s = context.bot_data["storage"]
            s["users"][query.from_user.id]["loglevel"] = query.data
            await self._save_storage(context)

            # change to IDLE state
            query.edit_message_text(text="Changed log level to %s." % query.data)
            context.user_data["state"] = TelegramUserState.IDLE

    def _handle_params(self, update: Update, context: CallbackContext) -> None:
        """Handle input of params when in EXEC_PARAMS state.

        Only schedules :meth:`_handle_params_async` on the module's event loop.

        Args:
            update: Message to process.
            context: Telegram context.
        """
        asyncio.run_coroutine_threadsafe(self._handle_params_async(update, context), self._loop)

    @staticmethod
    def _cast_param(param: Parameter, text: str) -> Any:
        """Convert text entered by the user into the type a parameter asks for.

        Args:
            param: Parameter the value is meant for.
            text: Text as typed by the user.

        Returns:
            The converted value, or ``text`` itself if the parameter has no
            usable (callable) annotation.

        Raises:
            ValueError: If the text cannot be converted.
            TypeError: If the annotation cannot be called with a string.
        """
        annotation = param.annotation

        # no annotation at all, or something we cannot call (e.g. a string annotation)
        if annotation is Parameter.empty or not callable(annotation):
            return text

        # bool("False") would be True, so parse booleans explicitly
        if annotation is bool:
            lowered = text.strip().lower()
            if lowered in ("true", "1", "yes", "y", "on"):
                return True
            if lowered in ("false", "0", "no", "n", "off"):
                return False
            raise ValueError("Not a boolean value: %s" % text)

        return annotation(text)

    async def _handle_params_async(self, update: Update, context: CallbackContext) -> None:
        """Handle input of params when in EXEC_PARAMS state.

        Appends the value from the incoming text message (if any) to the
        collected parameters. Once all parameters are there the method call is
        started, otherwise the user is asked for the next value.

        Args:
            update: Message to process.
            context: Telegram context.

        Raises:
            ValueError: If the context carries no user data.
        """
        if context.user_data is None:
            raise ValueError("No user data in context.")

        # wrong state?
        if context.user_data.get("state") != TelegramUserState.EXEC_PARAMS:
            return

        # get proxy and method signature; the stored name is <module>.<method>
        full_name = context.user_data["method"]
        module, method = full_name.rsplit(".", 1)
        proxy = await self.proxy(module)
        signature = proxy.signature(method)

        # get list of parameters the user has to provide
        params = [
            param for name, param in signature.parameters.items() if name not in ("self", "args", "kwargs")
        ]
        received = context.user_data["params"]

        # append param, if new one was received
        if update.message is not None and len(received) < len(params):
            try:
                value = self._cast_param(params[len(received)], update.message.text)
            except (TypeError, ValueError):
                # tell the user and fall through to asking for the same parameter again
                context.bot.send_message(
                    chat_id=update.effective_chat.id, text="Could not parse value, please try again."
                )
            else:
                received.append(value)

        # got enough params?
        nparams = len(received)
        if nparams >= len(params):
            # ID for command, counted per user
            context.user_data["ncalls"] = context.user_data.get("ncalls", 0) + 1
            call_id = context.user_data["ncalls"]

            # remove cancel button, and show command
            msg = "Executing %s..." % full_name
            context.bot.edit_message_text(
                text=msg,
                message_id=context.user_data["exec_query_message"],
                chat_id=context.user_data["exec_query_chat"],
            )

            # send message
            formatted = ['"%s"' % p if isinstance(p, str) else str(p) for p in received]
            command = full_name + "(" + ", ".join(formatted) + ")"
            context.bot.send_message(chat_id=update.effective_chat.id, text="Executing #%d:\n%s" % (call_id, command))

            # start call and keep a reference to the task until it has finished
            task = asyncio.create_task(
                self._call_method(context, update.effective_chat.id, call_id, full_name, received)
            )
            self._call_tasks.add(task)
            task.add_done_callback(self._call_tasks.discard)

            # reset (installs a fresh params list, the task keeps the old one)
            self._reset_state(context)

        else:
            # no, print next one
            next_param: Parameter = params[nparams]

            # format it
            message = "Value for " + next_param.name
            annotation = next_param.annotation
            if annotation is not Parameter.empty:
                # not every annotation (e.g. typing constructs) has a __name__
                message += ": " + getattr(annotation, "__name__", str(annotation))
            if next_param.default is not Parameter.empty:
                message += " = " + str(next_param.default)
            message += "?"

            # send it
            context.bot.send_message(chat_id=update.effective_chat.id, text=message)

    async def _call_method(
        self, context: CallbackContext, chat_id: int, call_id: int, method: str, params: List[Any]
    ) -> None:
        """Call a method on another module and report the outcome to the chat.

        Args:
            context: Telegram context.
            chat_id: Telegram chat ID.
            call_id: ID of command.
            method: Method to call as <module>.<method>
            params: List of parameters.
        """
        module, method_name = method.rsplit(".", 1)

        try:
            # get proxy and call it
            proxy = await self.proxy(module)
            func = getattr(proxy, method_name)
            response = await func(*params)
        except Exception as e:
            # this runs as a detached task, so nobody else would ever see the error
            log.exception("Call #%d of %s failed.", call_id, method)
            context.bot.send_message(chat_id=chat_id, text="Failed #%d:\n%s" % (call_id, e))
            return

        # set message
        if response is None:
            message = "Finished #%d." % call_id
        else:
            # format response
            with io.StringIO() as sio:
                pprint(response, stream=sio, indent=2)
                message = "Finished #%d:\n%s" % (call_id, sio.getvalue())

        # send reply
        context.bot.send_message(chat_id=chat_id, text=message)

    def _process_message(self, update: Update, context: CallbackContext) -> None:
        """Handle normal text messages, e.g. for login or method parameters.

        Args:
            update: Telegram message.
            context: Telegram context.

        Raises:
            ValueError: If the context carries no user data.
        """
        if context.user_data is None:
            raise ValueError("No user data in context.")

        # what state is user in? Users that never issued a command have none.
        state = context.user_data.get("state")

        if state == TelegramUserState.AUTH:
            # AUTH, so we expect a password. Is it valid?
            if update.message.text == self._password:
                # Yes, successful AUTH
                context.bot.send_message(chat_id=update.effective_chat.id, text="AUTH successful.")
                context.user_data["state"] = TelegramUserState.IDLE
                self._store_user(context, update.message.from_user.id, update.message.from_user.first_name)
            else:
                context.bot.send_message(chat_id=update.effective_chat.id, text="AUTH failed, try again.")

        elif state == TelegramUserState.EXEC_PARAMS:
            # we're expecting params, so handle them
            self._handle_params(update, context)

    def _command_modules(self, update: Update, context: CallbackContext) -> None:
        """Handle /modules command that shows list of modules.

        Args:
            update: Message to process.
            context: Telegram context.
        """
        # not logged in?
        if not self._is_user_authorized(context, update.message.from_user.id):
            context.bot.send_message(chat_id=update.effective_chat.id, text="Not logged in, use /start.")
            return

        # list all modules
        message = "Available modules:\n" + "\n".join(["- " + c for c in self.comm.clients])
        context.bot.send_message(chat_id=update.effective_chat.id, text=message)

    def _command_loglevel(self, update: Update, context: CallbackContext) -> None:
        """Handle /loglevel command that sets the log level

        Args:
            update: Message to process.
            context: Telegram context.

        Raises:
            ValueError: If the context carries no user data.
        """
        if context.user_data is None:
            raise ValueError("No user data in context.")

        # not logged in?
        if not self._is_user_authorized(context, update.message.from_user.id):
            context.bot.send_message(chat_id=update.effective_chat.id, text="Not logged in, use /start.")
            return

        # set state
        context.user_data["state"] = TelegramUserState.LOG_LEVEL

        # get current level
        s = context.bot_data["storage"]
        current_level = s["users"][update.message.from_user.id]["loglevel"]

        # create buttons for all log levels
        keyboard = [[InlineKeyboardButton(level, callback_data=level)] for level in self._log_levels] + [
            [InlineKeyboardButton("Cancel", callback_data="cancel")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        update.message.reply_text(
            "Current log level: %s\nPlease choose new log level:" % current_level, reply_markup=reply_markup
        )

    async def _process_log_entry(self, entry: Event, sender: str) -> bool:
        """Process a new log entry.

        Queues the entry for every stored user whose log level is at or below
        the level of the entry.

        Args:
            entry: The log event.
            sender: Name of sender.

        Returns:
            False if the event is not a LogEvent, True otherwise.

        Raises:
            ValueError: If the updater has not been created yet.
        """
        if not isinstance(entry, LogEvent):
            return False

        # get numerical value for log level
        level = self._log_levels[entry.level]

        # build log message
        message = "(%s) %s: %s" % (entry.level, sender, entry.message)

        # get storage
        if self._updater is None:
            raise ValueError("No update initialised.")
        s = self._updater.dispatcher.bot_data["storage"]  # type: ignore

        # loop users; a fresh storage has no "users" entry yet
        for user_id, user in s.get("users", {}).items():
            # get user log level; users without a (valid) level get 100, i.e. nothing is sent
            user_level = self._log_levels.get(user["loglevel"], 100)

            # is the log entry level at least the user's level?
            if level >= user_level:
                # queue message
                self._message_queue.put_nowait((user_id, message))

        return True

    async def _log_sender_thread(self) -> None:
        """Background task that sends queued log messages to the users.

        The blocking ``send_message`` call runs in the default executor. On any
        failure the message is put back into the queue after a pause of ten
        seconds.
        """
        loop = asyncio.get_running_loop()

        while True:
            # get next entry
            user_id, message = await self._message_queue.get()

            # send message
            try:
                if self._updater is None:
                    raise ValueError("No update initialised.")
                await loop.run_in_executor(None, partial(self._updater.bot.send_message, chat_id=user_id, text=message))

            except Exception:
                # something went wrong, sleep a little and queue message again
                await asyncio.sleep(10)
                await self._message_queue.put((user_id, message))


__all__ = ["Telegram"]