1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
pub mod response;

pub use crate::client::response::*;

use futures::stream::StreamExt;
use http::header::{AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, RETRY_AFTER};
use hyper::{
    client::{Client as HttpClient, HttpConnector},
    Body, Request, StatusCode,
};
use hyper_tls::{self, HttpsConnector};
use crate::message::Message;
use serde_json;
use std::future::Future;

/// An async client for sending the notification payload.
pub struct Client {
    http_client: HttpClient<HttpsConnector<HttpConnector>>,
}

impl Client {
    /// Get a new instance of Client.
    pub fn new() -> Client {
        let mut http_client = HttpClient::builder();
        http_client.keep_alive(true);

        Client {
            http_client: http_client.build(HttpsConnector::new()),
        }
    }

    /// Try sending a `Message` to FCM.
    pub fn send(&self, message: Message<'_>) -> impl Future<Output = Result<FcmResponse, FcmError>> + 'static {
        let payload = serde_json::to_vec(&message.body).unwrap();

        let builder = Request::builder()
            .method("POST")
            .header(CONTENT_TYPE, "application/json")
            .header(
                CONTENT_LENGTH,
                format!("{}", payload.len() as u64).as_bytes(),
            )
            .header(AUTHORIZATION, format!("key={}", message.api_key).as_bytes())
            .uri("https://fcm.googleapis.com/fcm/send");

        let request = builder.body(Body::from(payload)).unwrap();
        let requesting = self.http_client.request(request);

        async move {
            let response = requesting.await?;
            let response_status = response.status();

            let retry_after = response
                .headers()
                .get(RETRY_AFTER)
                .and_then(|ra| ra.to_str().ok())
                .and_then(|ra| RetryAfter::from_str(ra));

            let content_length: usize = response
                .headers()
                .get(CONTENT_LENGTH)
                .and_then(|s| s.to_str().ok())
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);

            let mut body: Vec<u8> = Vec::with_capacity(content_length);
            let mut chunks = response.into_body();

            while let Some(chunk) = chunks.next().await {
                body.extend_from_slice(&chunk?);
            }

            match response_status {
                StatusCode::OK => {
                    let fcm_response: FcmResponse = serde_json::from_slice(&body).unwrap();

                    match fcm_response.error {
                        Some(ErrorReason::Unavailable) => {
                            Err(response::FcmError::ServerError(retry_after))
                        }
                        Some(ErrorReason::InternalServerError) => {
                            Err(response::FcmError::ServerError(retry_after))
                        }
                        _ => Ok(fcm_response),
                    }
                }
                StatusCode::UNAUTHORIZED => Err(response::FcmError::Unauthorized),
                StatusCode::BAD_REQUEST => Err(response::FcmError::InvalidMessage(
                    "Bad Request".to_string(),
                )),
                status if status.is_server_error() => {
                    Err(response::FcmError::ServerError(retry_after))
                }
                _ => Err(response::FcmError::InvalidMessage(
                    "Unknown Error".to_string(),
                )),
            }
        }
    }
}