# Copyright (c) 2018 Tulir Asokan
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at
from typing import Optional, Union, Pattern
from html import escape
import warnings
import re

from attr import dataclass
import attr

from .....api import JSON
from ..util import SerializableEnum, SerializableAttrs, Serializable, Obj, deserializer, serializer
from ..primitive import ContentURI, EventID
from .base import BaseRoomEvent, BaseUnsigned

# region Message types

class Format(SerializableEnum):
    """A message format. Currently only ``org.matrix.custom.html`` is available.
    This will probably be deprecated when extensible events are implemented."""
    HTML = "org.matrix.custom.html"

TEXT_MESSAGE_TYPES = ("m.text", "m.emote", "m.notice")
MEDIA_MESSAGE_TYPES = ("m.image", "m.sticker", "", "", "m.file")

class MessageType(SerializableEnum):
    """A message type."""
    TEXT = "m.text"
    EMOTE = "m.emote"
    NOTICE = "m.notice"
    IMAGE = "m.image"
    STICKER = "m.sticker"
    VIDEO = ""
    AUDIO = ""
    FILE = "m.file"
    LOCATION = "m.location"

    def is_text(self) -> bool:
        return self.value in TEXT_MESSAGE_TYPES

    def is_media(self) -> bool:
        return self.value in MEDIA_MESSAGE_TYPES

# endregion
# region Relations

[docs]class InReplyTo: def __init__(self, event_id: Optional[EventID] = None, proxy_target: Optional['RelatesTo'] = None) -> None: self._event_id = event_id self._proxy_target = proxy_target @property def event_id(self) -> EventID: if self._proxy_target: return self._proxy_target.event_id return self._event_id @event_id.setter def event_id(self, event_id: EventID) -> None: if self._proxy_target: self._proxy_target.rel_type = RelationType.REFERENCE self._proxy_target.event_id = event_id else: self._event_id = event_id
class RelationType(SerializableEnum): ANNOTATION = "m.annotation" REFERENCE = "m.reference" REPLACE = "m.replace"
[docs]@dataclass class RelatesTo(Serializable): """Message relations. Used for reactions, edits and replies.""" rel_type: RelationType = None event_id: Optional[EventID] = None key: Optional[str] = None
[docs] @classmethod def deserialize(cls, data: JSON) -> Optional['RelatesTo']: if not data: return None try: return cls(rel_type=RelationType.deserialize(data["rel_type"]), event_id=data.get("event_id", None), key=data.get("key", None)) except KeyError: pass try: return cls(rel_type=RelationType.REFERENCE, event_id=data["m.in_reply_to"]["event_id"]) except KeyError: pass return None
[docs] def serialize(self) -> JSON: data = { "rel_type": self.rel_type.serialize(), } if self.rel_type == RelationType.REFERENCE: data["m.in_reply_to"] = { "event_id": self.event_id } if self.event_id: data["event_id"] = self.event_id if self.key: data["key"] = self.key return data
# endregion # region Base event content
[docs]class BaseMessageEventContentFuncs: """Base class for the contents of all message-type events (currently and m.sticker). Contains relation helpers.""" body: str _relates_to: Optional[RelatesTo]
[docs] def set_reply(self, in_reply_to: 'MessageEvent') -> None: self.relates_to.rel_type = RelationType.REFERENCE self.relates_to.event_id = in_reply_to.event_id
[docs] def set_edit(self, edits: 'MessageEvent') -> None: self.relates_to.rel_type = RelationType.REPLACE self.relates_to.event_id = edits.event_id
@property def relates_to(self) -> RelatesTo: if self._relates_to is None: self._relates_to = RelatesTo() return self._relates_to @relates_to.setter def relates_to(self, relates_to: RelatesTo) -> None: self._relates_to = relates_to
[docs] def get_reply_to(self) -> Optional[EventID]: if self._relates_to and self._relates_to.rel_type == RelationType.REFERENCE: return self._relates_to.event_id return None
[docs] def get_edit(self) -> Optional[EventID]: if self._relates_to and self._relates_to.rel_type == RelationType.REPLACE: return self._relates_to.event_id return None
[docs] def trim_reply_fallback(self) -> None: pass
[docs]@dataclass class BaseMessageEventContent(BaseMessageEventContentFuncs): """Base event content for all events.""" msgtype: MessageType = None body: str = ""
# endregion # region Media info
[docs]@dataclass class BaseFileInfo(SerializableAttrs['BaseFileInfo']): mimetype: str = None size: int = None
[docs]@dataclass class ThumbnailInfo(BaseFileInfo, SerializableAttrs['ThumbnailInfo']): """Information about the thumbnail for a document, video, image or location.""" height: int = attr.ib(default=None, metadata={"json": "h"}) width: int = attr.ib(default=None, metadata={"json": "w"})
[docs]@dataclass class FileInfo(BaseFileInfo, SerializableAttrs['FileInfo']): """Information about a document message.""" thumbnail_info: ThumbnailInfo = None thumbnail_url: ContentURI = None
[docs]@dataclass class ImageInfo(FileInfo, SerializableAttrs['ImageInfo']): """Information about an image message.""" height: int = attr.ib(default=None, metadata={"json": "h"}) width: int = attr.ib(default=None, metadata={"json": "w"})
[docs]@dataclass class VideoInfo(ImageInfo, SerializableAttrs['VideoInfo']): """Information about a video message.""" duration: int = None
[docs]@dataclass class AudioInfo(BaseFileInfo, SerializableAttrs['AudioInfo']): """Information about an audio message.""" duration: int = None
MediaInfo = Union[ImageInfo, VideoInfo, AudioInfo, FileInfo, Obj]
[docs]@dataclass class LocationInfo(SerializableAttrs['LocationInfo']): """Information about a location message.""" thumbnail_url: ContentURI = None thumbnail_info: ThumbnailInfo = None
# endregion # region Event content
[docs]@dataclass class MediaMessageEventContent(BaseMessageEventContent, SerializableAttrs['MediaMessageEventContent']): """The content of a media message event (m.image,,, m.file)""" url: ContentURI = None info: Optional[MediaInfo] = None _relates_to: Optional[RelatesTo] = attr.ib(default=None, metadata={"json": "m.relates_to"})
[docs] @staticmethod @deserializer(MediaInfo) def deserialize_info(data: JSON) -> MediaInfo: if not isinstance(data, dict): return Obj() msgtype = data.pop("__mautrix_msgtype", None) if msgtype == "m.image" or msgtype == "m.sticker": return ImageInfo.deserialize(data) elif msgtype == "": return VideoInfo.deserialize(data) elif msgtype == "": return AudioInfo.deserialize(data) elif msgtype == "m.file": return FileInfo.deserialize(data) else: return Obj(**data)
[docs]@dataclass class LocationMessageEventContent(BaseMessageEventContent, SerializableAttrs['LocationMessageEventContent']): geo_uri: str = None info: LocationInfo = None _relates_to: Optional[RelatesTo] = attr.ib(default=None, metadata={"json": "m.relates_to"})
html_reply_fallback_regex: Pattern = re.compile("^<mx-reply>" r"[\s\S]+?" "</mx-reply>")
[docs]@dataclass class TextMessageEventContent(BaseMessageEventContent, SerializableAttrs['TextMessageEventContent']): """The content of a text message event (m.text, m.notice, m.emote)""" format: Format = None formatted_body: str = None _relates_to: Optional[RelatesTo] = attr.ib(default=None, metadata={"json": "m.relates_to"})
[docs] def set_reply(self, in_reply_to: 'MessageEvent') -> None: super().set_reply(in_reply_to) if not self.formatted_body or len(self.formatted_body) == 0 or self.format != Format.HTML: self.format = Format.HTML self.formatted_body = escape(self.body) self.formatted_body = in_reply_to.make_reply_fallback_html() + self.formatted_body self.body = in_reply_to.make_reply_fallback_text() + self.body
[docs] def trim_reply_fallback(self) -> None: if self.get_reply_to(): self._trim_reply_fallback_text() self._trim_reply_fallback_html()
def _trim_reply_fallback_text(self) -> None: if not self.body.startswith("> ") or "\n" not in self.body: return lines = self.body.split("\n") while len(lines) > 0 and lines[0].startswith("> "): lines.pop(0) # Pop extra newline at end of fallback lines.pop(0) self.body = "\n".join(lines) def _trim_reply_fallback_html(self) -> None: if self.formatted_body and self.format == Format.HTML: self.formatted_body = html_reply_fallback_regex.sub("", self.formatted_body)
MessageEventContent = Union[TextMessageEventContent, MediaMessageEventContent, LocationMessageEventContent, Obj] # endregion
[docs]@dataclass class MessageUnsigned(BaseUnsigned, SerializableAttrs['MessageUnsigned']): """Unsigned information sent with message events.""" transaction_id: str = None
html_reply_fallback_format = ("<mx-reply><blockquote>" "<a href='{room_id}/{event_id}'>In reply to</a> " "<a href='{sender}'>{displayname}</a><br/>" "{content}" "</blockquote></mx-reply>") media_reply_fallback_body_map = { MessageType.IMAGE: "an image", MessageType.STICKER: "a sticker", MessageType.AUDIO: "audio", MessageType.VIDEO: "a video", MessageType.FILE: "a file", MessageType.LOCATION: "a location", }
[docs]@dataclass class MessageEvent(BaseRoomEvent, SerializableAttrs['MessageEvent']): """An event""" content: MessageEventContent unsigned: Optional[MessageUnsigned] = None
[docs] @staticmethod @deserializer(MessageEventContent) def deserialize_content(data: JSON) -> MessageEventContent: if not isinstance(data, dict): return Obj() rel = (data.get("m.relates_to", None) or {}) if rel.get("rel_type", None) == RelationType.REPLACE.value: data = data.get("m.new_content", data) data["m.relates_to"] = rel msgtype = data.get("msgtype", None) if msgtype in TEXT_MESSAGE_TYPES: return TextMessageEventContent.deserialize(data) elif msgtype in MEDIA_MESSAGE_TYPES: data.get("info", {})["__mautrix_msgtype"] = msgtype return MediaMessageEventContent.deserialize(data) elif msgtype == "m.location": return LocationMessageEventContent.deserialize(data) else: return Obj(**data)
[docs] @staticmethod @serializer(MessageEventContent) def serialize_content(content: MessageEventContent) -> JSON: data = content.serialize() evt = content.get_edit() if evt: new_content = {**data} del new_content["m.relates_to"] data["m.new_content"] = new_content if "body" in data: data["body"] = f"* {data['body']}" if "formatted_body" in data: data["formatted_body"] = f"* {data['formatted_body']}" return data
[docs] def make_reply_fallback_html(self) -> str: """Generate the HTML fallback for messages replying to this event.""" if self.content.msgtype.is_text: body = self.content.formatted_body or escape(self.content.body) else: sent_type = media_reply_fallback_body_map[self.content.msgtype] or "a message" body = f"sent {sent_type}" displayname = self.sender return html_reply_fallback_format.format(room_id=self.room_id, event_id=self.event_id, sender=self.sender, displayname=displayname, content=body)
[docs] def make_reply_fallback_text(self) -> str: """Generate the plaintext fallback for messages replying to this event.""" if self.content.msgtype.is_text: body = self.content.body else: body = media_reply_fallback_body_map[self.content.msgtype] lines = body.strip().split("\n") first_line, lines = lines[0], lines[1:] displayname = self.sender fallback_text = f"> <{displayname}> {first_line}" for line in lines: fallback_text += f"\n> {line}" fallback_text += "\n\n" return fallback_text