1
//! IPC handling thread/task. Handles communication between [`Mpv`](crate::Mpv) instances and mpv's unix socket
2

            
3
use futures::{SinkExt, StreamExt};
4
use serde_json::{json, Value};
5
use tokio::{
6
    net::UnixStream,
7
    sync::{broadcast, mpsc, oneshot},
8
};
9
use tokio_util::codec::{Framed, LinesCodec};
10

            
11
use crate::{Error, ErrorCode};
12

            
13
/// Container for all state that regards communication with the mpv IPC socket
14
/// and message passing with [`Mpv`](crate::Mpv) controllers.
15
pub(crate) struct MpvIpc {
16
    socket: Framed<UnixStream, LinesCodec>,
17
    command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
18
    event_channel: broadcast::Sender<MpvIpcEvent>,
19
}
20

            
21
/// Commands that can be sent to [`MpvIpc`]
22
#[derive(Debug, Clone, PartialEq, Eq)]
23
pub(crate) enum MpvIpcCommand {
24
    Command(Vec<String>),
25
    GetProperty(String),
26
    SetProperty(String, Value),
27
    ObserveProperty(isize, String),
28
    UnobserveProperty(isize),
29
    Exit,
30
}
31

            
32
/// [`MpvIpc`]'s response to a [`MpvIpcCommand`].
33
#[derive(Debug, Clone)]
34
pub(crate) struct MpvIpcResponse(pub(crate) Result<Option<Value>, Error>);
35

            
36
/// A deserialized and partially parsed event from mpv.
37
#[derive(Debug, Clone)]
38
pub(crate) struct MpvIpcEvent(pub(crate) Value);
39

            
40
impl MpvIpc {
41
72
    pub(crate) fn new(
42
72
        socket: UnixStream,
43
72
        command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
44
72
        event_channel: broadcast::Sender<MpvIpcEvent>,
45
72
    ) -> Self {
46
72
        MpvIpc {
47
72
            socket: Framed::new(socket, LinesCodec::new()),
48
72
            command_channel,
49
72
            event_channel,
50
72
        }
51
72
    }
52

            
53
28632
    pub(crate) async fn send_command(&mut self, command: &[Value]) -> Result<Option<Value>, Error> {
54
7158
        let ipc_command = json!({ "command": command });
55
7158
        let ipc_command_str = serde_json::to_string(&ipc_command)
56
7158
            .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?;
57

            
58
7158
        log::trace!("Sending command: {}", ipc_command_str);
59

            
60
7158
        self.socket
61
7158
            .send(ipc_command_str)
62
15
            .await
63
7158
            .map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))?;
64

            
65
7154
        let response = loop {
66
7156
            let response = self
67
7156
                .socket
68
7156
                .next()
69
7154
                .await
70
7154
                .ok_or(Error(ErrorCode::MissingValue))?
71
7154
                .map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))?;
72

            
73
7154
            let parsed_response = serde_json::from_str::<Value>(&response)
74
7154
                .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())));
75
7154

            
76
7154
            if parsed_response
77
7154
                .as_ref()
78
7154
                .ok()
79
7154
                .and_then(|v| v.as_object().map(|o| o.contains_key("event")))
80
7154
                .unwrap_or(false)
81
            {
82
                self.handle_event(parsed_response).await;
83
            } else {
84
7154
                break parsed_response;
85
7154
            }
86
7154
        };
87
7154

            
88
7154
        log::trace!("Received response: {:?}", response);
89

            
90
7154
        parse_mpv_response_data(response?)
91
7156
    }
92

            
93
13688
    pub(crate) async fn get_mpv_property(
94
13688
        &mut self,
95
13688
        property: &str,
96
13688
    ) -> Result<Option<Value>, Error> {
97
3422
        self.send_command(&[json!("get_property"), json!(property)])
98
3429
            .await
99
3421
    }
100

            
101
14924
    pub(crate) async fn set_mpv_property(
102
14924
        &mut self,
103
14924
        property: &str,
104
14924
        value: Value,
105
14924
    ) -> Result<Option<Value>, Error> {
106
3731
        self.send_command(&[json!("set_property"), json!(property), value])
107
3734
            .await
108
3730
    }
109

            
110
8
    pub(crate) async fn observe_property(
111
8
        &mut self,
112
8
        id: isize,
113
8
        property: &str,
114
8
    ) -> Result<Option<Value>, Error> {
115
2
        self.send_command(&[json!("observe_property"), json!(id), json!(property)])
116
3
            .await
117
2
    }
118

            
119
    pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result<Option<Value>, Error> {
120
        self.send_command(&[json!("unobserve_property"), json!(id)])
121
            .await
122
    }
123

            
124
8
    async fn handle_event(&mut self, event: Result<Value, Error>) {
125
2
        match &event {
126
2
            Ok(event) => {
127
2
                log::trace!("Parsed event: {:?}", event);
128
2
                if let Err(broadcast::error::SendError(_)) =
129
2
                    self.event_channel.send(MpvIpcEvent(event.to_owned()))
130
                {
131
                    log::trace!("Failed to send event to channel, ignoring");
132
2
                }
133
            }
134
            Err(e) => {
135
                log::trace!("Error parsing event, ignoring:\n  {:?}\n  {:?}", &event, e);
136
            }
137
        }
138
2
    }
139

            
140
72
    pub(crate) async fn run(mut self) -> Result<(), Error> {
141
7176
        loop {
142
7176
            tokio::select! {
143
              Some(event) = self.socket.next() => {
144
                log::trace!("Got event: {:?}", event);
145

            
146
                let parsed_event = event
147
                    .map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))
148
                    .and_then(|event|
149
2
                        serde_json::from_str::<Value>(&event)
150
2
                        .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string()))));
151

            
152
                self.handle_event(parsed_event).await;
153
              }
154
              Some((cmd, tx)) = self.command_channel.recv() => {
155
                  log::trace!("Handling command: {:?}", cmd);
156
                  match cmd {
157
                      MpvIpcCommand::Command(command) => {
158
3
                          let refs = command.iter().map(|s| json!(s)).collect::<Vec<Value>>();
159
                          let response = self.send_command(refs.as_slice()).await;
160
                          tx.send(MpvIpcResponse(response)).unwrap()
161
                      }
162
                      MpvIpcCommand::GetProperty(property) => {
163
                          let response = self.get_mpv_property(&property).await;
164
                          tx.send(MpvIpcResponse(response)).unwrap()
165
                      }
166
                      MpvIpcCommand::SetProperty(property, value) => {
167
                          let response = self.set_mpv_property(&property, value).await;
168
                          tx.send(MpvIpcResponse(response)).unwrap()
169
                      }
170
                      MpvIpcCommand::ObserveProperty(id, property) => {
171
                          let response = self.observe_property(id, &property).await;
172
                          tx.send(MpvIpcResponse(response)).unwrap()
173
                      }
174
                      MpvIpcCommand::UnobserveProperty(id) => {
175
                          let response = self.unobserve_property(id).await;
176
                          tx.send(MpvIpcResponse(response)).unwrap()
177
                      }
178
                      MpvIpcCommand::Exit => {
179
                        tx.send(MpvIpcResponse(Ok(None))).unwrap();
180
                        return Ok(());
181
                      }
182
                  }
183
              }
184
7176
            }
185
7176
        }
186
    }
187
}
188

            
189
/// This function does the most basic JSON parsing and error handling
190
/// for status codes and errors that all responses from mpv are
191
/// expected to contain.
192
28616
fn parse_mpv_response_data(value: Value) -> Result<Option<Value>, Error> {
193
28616
    log::trace!("Parsing mpv response data: {:?}", value);
194
28616
    let result = value
195
28616
        .as_object()
196
28616
        .map(|o| (o.get("error").and_then(|e| e.as_str()), o.get("data")))
197
28616
        .ok_or(Error(ErrorCode::UnexpectedValue))
198
28616
        .and_then(|(error, data)| match error {
199
28616
            Some("success") => Ok(data),
200
540
            Some(e) => Err(Error(ErrorCode::MpvError(e.to_string()))),
201
            None => Err(Error(ErrorCode::UnexpectedValue)),
202
28616
        });
203
28616
    match &result {
204
28076
        Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v),
205
540
        Err(e) => log::trace!("Error parsing mpv response data: {:?}", e),
206
    }
207
28616
    result.map(|opt| opt.cloned())
208
28616
}