openzeppelin_relayer/repositories/
notification.rs

1//! This module defines an in-memory notification repository for managing
2//! notifications. It provides functionality to create, retrieve, and list
3//! notifications, while update and delete operations are not supported.
4//! The repository is implemented using a `Mutex`-protected `HashMap` to
5//! ensure thread safety in asynchronous contexts. Additionally, it includes
6//! conversion implementations for `NotificationFileConfig` to `NotificationRepoModel`.
7use crate::{
8    config::{NotificationFileConfig, NotificationFileConfigType},
9    models::{NotificationRepoModel, NotificationType as ModelNotificationType, RepositoryError},
10    repositories::*,
11};
12use async_trait::async_trait;
13use std::collections::HashMap;
14use tokio::sync::{Mutex, MutexGuard};
15
16#[derive(Debug)]
17pub struct InMemoryNotificationRepository {
18    store: Mutex<HashMap<String, NotificationRepoModel>>,
19}
20
21#[allow(dead_code)]
22impl InMemoryNotificationRepository {
23    pub fn new() -> Self {
24        Self {
25            store: Mutex::new(HashMap::new()),
26        }
27    }
28
29    async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
30        Ok(lock.lock().await)
31    }
32}
33
34impl Default for InMemoryNotificationRepository {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40#[async_trait]
41impl Repository<NotificationRepoModel, String> for InMemoryNotificationRepository {
42    async fn create(
43        &self,
44        notification: NotificationRepoModel,
45    ) -> Result<NotificationRepoModel, RepositoryError> {
46        let mut store = Self::acquire_lock(&self.store).await?;
47        if store.contains_key(&notification.id) {
48            return Err(RepositoryError::ConstraintViolation(format!(
49                "Notification with ID {} already exists",
50                notification.id
51            )));
52        }
53        store.insert(notification.id.clone(), notification.clone());
54        Ok(notification)
55    }
56
57    async fn get_by_id(&self, id: String) -> Result<NotificationRepoModel, RepositoryError> {
58        let store = Self::acquire_lock(&self.store).await?;
59        match store.get(&id) {
60            Some(entity) => Ok(entity.clone()),
61            None => Err(RepositoryError::NotFound(format!(
62                "Notification with ID {} not found",
63                id
64            ))),
65        }
66    }
67
68    #[allow(clippy::map_entry)]
69    async fn update(
70        &self,
71        _id: String,
72        _relayer: NotificationRepoModel,
73    ) -> Result<NotificationRepoModel, RepositoryError> {
74        Err(RepositoryError::NotSupported("Not supported".to_string()))
75    }
76
77    async fn delete_by_id(&self, _id: String) -> Result<(), RepositoryError> {
78        Err(RepositoryError::NotSupported("Not supported".to_string()))
79    }
80
81    async fn list_all(&self) -> Result<Vec<NotificationRepoModel>, RepositoryError> {
82        let store = Self::acquire_lock(&self.store).await?;
83        let notifications: Vec<NotificationRepoModel> = store.values().cloned().collect();
84        Ok(notifications)
85    }
86
87    async fn list_paginated(
88        &self,
89        query: PaginationQuery,
90    ) -> Result<PaginatedResult<NotificationRepoModel>, RepositoryError> {
91        let total = self.count().await?;
92        let start = ((query.page - 1) * query.per_page) as usize;
93        let items: Vec<NotificationRepoModel> = self
94            .store
95            .lock()
96            .await
97            .values()
98            .skip(start)
99            .take(query.per_page as usize)
100            .cloned()
101            .collect();
102
103        Ok(PaginatedResult {
104            items,
105            total: total as u64,
106            page: query.page,
107            per_page: query.per_page,
108        })
109    }
110
111    async fn count(&self) -> Result<usize, RepositoryError> {
112        let store = Self::acquire_lock(&self.store).await?;
113        let length = store.len();
114        Ok(length)
115    }
116}
117
118impl TryFrom<NotificationFileConfig> for NotificationRepoModel {
119    type Error = ConversionError;
120
121    fn try_from(config: NotificationFileConfig) -> Result<Self, Self::Error> {
122        Ok(NotificationRepoModel {
123            id: config.id.clone(),
124            url: config.url.clone(),
125            notification_type: ModelNotificationType::try_from(&config.r#type)?,
126            signing_key: config.get_signing_key(),
127        })
128    }
129}
130
131impl TryFrom<&NotificationFileConfigType> for ModelNotificationType {
132    type Error = ConversionError;
133
134    fn try_from(config: &NotificationFileConfigType) -> Result<Self, Self::Error> {
135        match config {
136            NotificationFileConfigType::Webhook => Ok(ModelNotificationType::Webhook),
137        }
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use super::*;
144
145    fn create_test_notification(id: String) -> NotificationRepoModel {
146        NotificationRepoModel {
147            id: id.clone(),
148            url: "http://localhost".to_string(),
149            notification_type: ModelNotificationType::Webhook,
150            signing_key: None,
151        }
152    }
153
154    #[actix_web::test]
155    async fn test_new_repository_is_empty() {
156        let repo = InMemoryNotificationRepository::new();
157        assert_eq!(repo.count().await.unwrap(), 0);
158    }
159
160    #[actix_web::test]
161    async fn test_add_notification() {
162        let repo = InMemoryNotificationRepository::new();
163        let notification = create_test_notification("test".to_string());
164
165        repo.create(notification.clone()).await.unwrap();
166        assert_eq!(repo.count().await.unwrap(), 1);
167
168        let stored = repo.get_by_id("test".to_string()).await.unwrap();
169        assert_eq!(stored.id, notification.id);
170    }
171
172    #[actix_web::test]
173    async fn test_update_notification() {
174        let repo = InMemoryNotificationRepository::new();
175        let notification = create_test_notification("test".to_string());
176
177        let result = repo.update("test".to_string(), notification).await;
178        assert!(matches!(result, Err(RepositoryError::NotSupported(_))));
179    }
180
181    #[actix_web::test]
182    async fn test_list_notifications() {
183        let repo = InMemoryNotificationRepository::new();
184        let notification1 = create_test_notification("test".to_string());
185        let notification2 = create_test_notification("test2".to_string());
186
187        repo.create(notification1.clone()).await.unwrap();
188        repo.create(notification2).await.unwrap();
189
190        let notifications = repo.list_all().await.unwrap();
191        assert_eq!(notifications.len(), 2);
192    }
193
194    #[actix_web::test]
195    async fn test_update_nonexistent_notification() {
196        let repo = InMemoryNotificationRepository::new();
197        let notification = create_test_notification("test".to_string());
198
199        let result = repo.update("test2".to_string(), notification).await;
200        assert!(matches!(result, Err(RepositoryError::NotSupported(_))));
201    }
202
203    #[actix_web::test]
204    async fn test_get_nonexistent_notification() {
205        let repo = InMemoryNotificationRepository::new();
206
207        let result = repo.get_by_id("test".to_string()).await;
208        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
209    }
210}