1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
//! [PUT /_matrix/app/v1/transactions/{txnId}](https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid)

use ruma_api::ruma_api;
use ruma_events::AnyRoomEvent;
use ruma_serde::Raw;

ruma_api! {
    metadata: {
        description: "This API is called by the homeserver when it wants to push an event (or batch of events) to the application service.",
        method: PUT,
        name: "push_events",
        path: "/_matrix/app/v1/transactions/:txn_id",
        rate_limited: false,
        authentication: QueryOnlyAccessToken,
    }

    request: {
        /// The transaction ID for this set of events.
        ///
        /// Homeservers generate these IDs and they are used to ensure idempotency of results.
        #[ruma_api(path)]
        pub txn_id: &'a str,

        /// A list of events.
        pub events: &'a [Raw<AnyRoomEvent>],
    }

    #[derive(Default)]
    response: {}
}

impl<'a> Request<'a> {
    /// Creates a new `Request` with the given transaction ID and list of events.
    pub fn new(txn_id: &'a str, events: &'a [Raw<AnyRoomEvent>]) -> Self {
        Self { txn_id, events }
    }
}

impl IncomingRequest {
    /// Creates an `IncomingRequest` with the given transaction ID and list of events.
    pub fn new(txn_id: String, events: Vec<Raw<AnyRoomEvent>>) -> IncomingRequest {
        IncomingRequest { txn_id, events }
    }

    /// Consumes the `IncomingRequest` and tries to convert it to a `sync_events::Response`
    ///
    /// This is a helper conversion in cases where it's easier to work with `sync_events::Response`
    /// instead of the original `push_events::IncomingRequest`. It puts all events with a `room_id`
    /// into the `JoinedRoom`'s `timeline`. The rationale behind that is that incoming Appservice
    /// transactions from the homeserver are not necessarily bound to a specific user but can cover
    /// a multitude of namespaces, and as such the Appservice basically only "observes joined
    /// rooms".
    ///
    /// Note: Currently homeservers only push PDUs to appservices, no EDUs. There's the open
    /// [MSC2409] regarding supporting EDUs in the future, though it seems to be planned to put
    /// EDUs into a different JSON key than `events` to stay backwards compatible.
    ///
    /// [MSC2409]: https://github.com/matrix-org/matrix-doc/pull/2409
    #[cfg(feature = "helper")]
    pub fn try_into_sync_response(
        self,
        next_batch: impl Into<String>,
    ) -> serde_json::Result<ruma_client_api::r0::sync::sync_events::Response> {
        use ruma_client_api::r0::sync::sync_events;
        use ruma_identifiers::RoomId;
        use serde::Deserialize;
        use tracing::warn;

        #[derive(Debug, Deserialize)]
        struct EventDeHelper {
            room_id: Option<Box<RoomId>>,
        }

        let mut response = sync_events::Response::new(next_batch.into());

        for raw_event in self.events {
            let helper = raw_event.deserialize_as::<EventDeHelper>()?;
            let event_json = Raw::into_json(raw_event);

            if let Some(room_id) = helper.room_id {
                let join = response.rooms.join.entry(room_id).or_default();
                join.timeline.events.push(Raw::from_json(event_json));
            } else {
                warn!("Event without room_id: {}", event_json);
            }
        }

        Ok(response)
    }
}

impl Response {
    /// Creates an empty `Response`.
    pub fn new() -> Self {
        Self {}
    }
}

#[cfg(feature = "helper")]
#[cfg(test)]
mod helper_tests {
    use super::{AnyRoomEvent, IncomingRequest, Raw};
    use ruma_client_api::r0::sync::sync_events;
    use ruma_identifiers::room_id;
    use serde_json::json;

    #[test]
    fn convert_incoming_request_to_sync_response() {
        let txn_id = "any_txn_id".to_owned();
        let state_event: AnyRoomEvent = serde_json::from_value(json!({
            "content": {},
            "event_id": "$h29iv0s8:example.com",
            "origin_server_ts": 1,
            "room_id": "!roomid:room.com",
            "sender": "@carl:example.com",
            "state_key": "",
            "type": "m.room.name"
        }))
        .unwrap();
        let message_event: AnyRoomEvent = serde_json::from_value(json!({
            "type": "m.room.message",
            "event_id": "$143273582443PhrSn:example.com",
            "origin_server_ts": 1,
            "room_id": "!roomid:room.com",
            "sender": "@user:example.com",
            "content": {
                "body": "test",
                "msgtype": "m.audio",
                "url": "mxc://example.com/AuDi0",
            }
        }))
        .unwrap();

        let events = vec![Raw::new(&state_event).unwrap(), Raw::new(&message_event).unwrap()];
        let incoming_request = IncomingRequest { txn_id: txn_id.clone(), events };

        let response: sync_events::Response =
            incoming_request.try_into_sync_response(txn_id).unwrap();

        let response_rooms_join =
            response.rooms.join.get(room_id!("!roomid:room.com")).expect("joined room response");

        assert_eq!(response_rooms_join.timeline.events.len(), 2);
    }
}

#[cfg(feature = "server")]
#[cfg(test)]
mod tests {
    use ruma_api::{exports::http, OutgoingRequest, SendAccessToken};
    use ruma_events::AnyRoomEvent;
    use ruma_serde::Raw;
    use serde_json::json;

    use super::Request;

    #[test]
    fn decode_request_contains_events_field() {
        let dummy_event: AnyRoomEvent = serde_json::from_value(json!({
            "type": "m.room.message",
            "event_id": "$143273582443PhrSn:example.com",
            "origin_server_ts": 1,
            "room_id": "!roomid:room.com",
            "sender": "@user:example.com",
            "content": {
                "body": "test",
                "msgtype": "m.text",
            },
        }))
        .unwrap();
        let dummy_event = Raw::new(&dummy_event).unwrap();
        let events = vec![dummy_event];

        let req: http::Request<Vec<u8>> = Request { events: &events, txn_id: "any_txn_id" }
            .try_into_http_request(
                "https://homeserver.tld",
                SendAccessToken::IfRequired("auth_tok"),
            )
            .unwrap();
        let json_body: serde_json::Value = serde_json::from_slice(req.body()).unwrap();

        assert_eq!(
            1,
            json_body.as_object().unwrap().get("events").unwrap().as_array().unwrap().len()
        );
    }
}