import asyncio
import json
from datetime import datetime
from typing import Optional, Tuple, Type

import aiohttp
from maubot import Plugin
from maubot.handlers import command, web
from mautrix.types import MessageEvent, MessageType, TextMessageEventContent, RoomID, RoomAlias
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper


class Config(BaseProxyConfig):
    """Plugin configuration: SpaceAPI URL, polling options, message templates."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every configuration key this plugin reads via ``helper``."""
        helper.copy("spaceapi")
        helper.copy("poll")
        helper.copy("messages")
        helper.copy('command_prefix')


class SpaceapiBot(Plugin):
    """Maubot plugin that reports the open/closed state from a SpaceAPI endpoint.

    The state is reported on demand through a chat command and, when the
    ``open`` flag changes between two polls, to the room configured under
    ``poll.room``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Resolved ID of the room that receives polled updates (lazy).
        self._room: Optional[RoomID] = None
        # Set whenever the config is reloaded so the room is resolved again.
        self._update_room = True
        # Background task running poll(); None while the plugin is stopped.
        self._polling_task = None

    def get_command_prefix(self) -> str:
        """Return the chat command name (``command_prefix``, default ``spaceapi``)."""
        return self.config.get('command_prefix', 'spaceapi')

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    def on_external_config_update(self) -> None:
        """Reload the configuration and mark the cached room as stale."""
        self.config.load_and_update()
        self._update_room = True

    async def start(self) -> None:
        """Load the config, resolve the target room and (re)start polling."""
        self.on_external_config_update()
        await self._get_room()
        if self._polling_task:
            self._polling_task.cancel()
        self._polling_task = asyncio.create_task(self.poll())

    async def stop(self) -> None:
        """Cancel the background polling task, if one is running."""
        if self._polling_task:
            self._polling_task.cancel()
            self._polling_task = None

    async def _get_room(self) -> RoomID:
        """Return the room ID for polled updates, resolving it when needed.

        ``poll.room`` may be a room alias (starting with ``#``), which is
        resolved through the client, or a plain room ID. The result is cached
        until the configuration is reloaded.
        """
        if self._room is not None and not self._update_room:
            return self._room

        room = self.config['poll']['room']
        if room.startswith('#'):
            # The client exposes either resolve_room_alias or get_room_alias;
            # use whichever one is available.
            if hasattr(self.client, 'resolve_room_alias'):
                resolve = self.client.resolve_room_alias
            else:
                resolve = self.client.get_room_alias
            self._room = (await resolve(RoomAlias(room))).room_id
        else:
            self._room = RoomID(room)
        self._update_room = False
        return self._room

    async def update_spaceapi_status(self, http: aiohttp.ClientSession) -> Tuple[object, bool]:
        """Fetch the SpaceAPI document and compare it with the cached copy.

        The freshly fetched document always replaces the contents of the
        cache file configured under ``poll.cache``.

        Returns:
            ``(state, changed)``. ``state`` is the decoded JSON document, or
            ``None`` when it could not be fetched or decoded (``changed`` is
            then ``False``). ``changed`` tells whether ``state.open`` differs
            from the cached document.

        Raises:
            asyncio.CancelledError: when the surrounding task is cancelled.
            KeyError: when either document lacks ``state.open``.
        """
        try:
            async with http.get(self.config['spaceapi']) as resp:
                if resp.status != 200:
                    raise RuntimeError()
                rdata = await resp.text()
                now = json.loads(rdata)
        except asyncio.CancelledError:
            # Cancellation must propagate, otherwise the polling task cannot
            # be stopped while a request is in flight.
            raise
        except Exception:
            self.log.debug('Fetching SpaceAPI status failed', exc_info=True)
            return None, False

        # 'a+' creates the cache file if it does not exist yet; the position
        # starts at the end, hence the explicit seek before reading.
        with open(self.config['poll']['cache'], 'a+') as cache:
            cache.seek(0)
            try:
                previous = json.loads(cache.read())
            except Exception as e:
                # Empty or corrupt cache: treat the current state as previous
                # so that no change is reported.
                self.log.exception(e)
                previous = now
            cache.seek(0)
            cache.truncate()
            cache.write(rdata)

        changed = (now['state']['open'] != previous['state']['open'])
        return now, changed

    @staticmethod
    def _people_now_present(state) -> dict:
        """Return the first ``people_now_present`` sensor entry, or ``{}``.

        ``state.sensors`` is checked first; the top-level ``sensors`` key is
        used as a fallback. A missing or empty list yields ``{}``.
        """
        sensors = state.get('state', {}).get('sensors') or state.get('sensors') or {}
        entries = sensors.get('people_now_present') or [{}]
        return entries[0]

    async def send_spaceapi_update(self, state: object, requester: Optional[MessageEvent] = None) -> None:
        """Format ``state`` with the configured templates and send it.

        Args:
            state: decoded SpaceAPI document (``None`` ends up in the error
                branch below).
            requester: event to reply to. When omitted, the message goes to
                the polling room instead.

        If the message cannot be built, a requester receives the configured
        ``error`` message, while an unsolicited update is dropped.
        """
        if requester:
            room = requester.room_id
        else:
            room = await self._get_room()

        try:
            present = self._people_now_present(state)
            fmt = {
                'lastchange': datetime.fromtimestamp(state['state']['lastchange']).strftime('%H:%M'),
                'known_users': present.get('names', ""),
                'user_cnt': present.get('value', 0),
            }
            template = 'open' if state['state']['open'] else 'closed'
            body = self.config['messages'][template].format(**fmt)
        except Exception as e:
            self.log.exception(e)
            if not requester:
                return
            body = self.config['messages']['error']

        msg = TextMessageEventContent(msgtype=MessageType.TEXT, body=body)
        if requester:
            await requester.reply(msg)
        else:
            await self.client.send_message(room, msg)

    @command.new(name=get_command_prefix)
    async def cmd(self, evt: MessageEvent):
        """Chat command: fetch the current state and reply to the sender."""
        # NOTE(review): read_timeout/conn_timeout are deprecated in newer
        # aiohttp releases in favour of timeout=aiohttp.ClientTimeout(...);
        # confirm against the installed version before switching.
        async with aiohttp.ClientSession(read_timeout=15, conn_timeout=5) as http:
            state, _ = await self.update_spaceapi_status(http)
            await self.send_spaceapi_update(state, evt)

    async def poll(self) -> None:
        """Poll the endpoint forever and announce changes of the open flag.

        The loop sleeps for ``poll.interval`` seconds (default 60) before each
        request. Errors of a single iteration are logged and do not end the
        loop; cancellation does.
        """
        # NOTE(review): see cmd() regarding the deprecated timeout arguments.
        async with aiohttp.ClientSession(read_timeout=15, conn_timeout=5) as http:
            while True:
                await asyncio.sleep(self.config['poll'].get('interval', 60))
                self.log.debug('Polling SpaceAPI endpoint')
                try:
                    state, changed = await self.update_spaceapi_status(http)
                    if changed:
                        await self.send_spaceapi_update(state)
                except asyncio.CancelledError:
                    # Let stop()/start() actually terminate this task.
                    raise
                except Exception as e:
                    self.log.exception(e)