openzeppelin_relayer/jobs/handlers/
notification_handler.rs

//! Notification handling worker implementation.
//!
//! This module implements the notification handling worker that processes
//! notification jobs from the queue.
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use log::info;
10
11use crate::{
12    constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
13    jobs::{handle_result, Job, JobProducer, NotificationSend},
14    models::AppState,
15    repositories::Repository,
16    services::WebhookNotificationService,
17};
18
19/// Handles incoming notification jobs from the queue.
20///
21/// # Arguments
22/// * `job` - The notification job containing recipient and message details
23/// * `context` - Application state containing notification services
24///
25/// # Returns
26/// * `Result<(), Error>` - Success or failure of notification processing
27pub async fn notification_handler(
28    job: Job<NotificationSend>,
29    context: Data<ThinData<AppState<JobProducer>>>,
30    attempt: Attempt,
31) -> Result<(), Error> {
32    info!("handling notification: {:?}", job.data);
33
34    let result = handle_request(job.data, context).await;
35
36    handle_result(
37        result,
38        attempt,
39        "Notification",
40        WORKER_DEFAULT_MAXIMUM_RETRIES,
41    )
42}
43
44async fn handle_request(
45    request: NotificationSend,
46    context: Data<ThinData<AppState<JobProducer>>>,
47) -> Result<()> {
48    info!("sending notification: {:?}", request);
49    let notification = context
50        .notification_repository
51        .get_by_id(request.notification_id)
52        .await?;
53
54    let notification_service =
55        WebhookNotificationService::new(notification.url, notification.signing_key);
56
57    notification_service
58        .send_notification(request.notification)
59        .await?;
60
61    Ok(())
62}
63
#[cfg(test)]
mod tests {
    // Structural tests for notification jobs: they build payloads and jobs and
    // check the resulting fields. Nothing here is async or touches the network,
    // so plain `#[test]` functions are sufficient.
    use super::*;
    use crate::models::{
        EvmPolicyResponse, EvmTransactionResponse, NetworkPolicyResponse, NetworkType,
        RelayerDisabledPayload, RelayerResponse, TransactionResponse, TransactionStatus,
        WebhookNotification, WebhookPayload, U256,
    };

    /// Builds the confirmed EVM transaction payload shared by the tests.
    fn confirmed_evm_transaction_payload() -> WebhookPayload {
        let timestamp = "2025-01-27T15:31:10.777083+00:00";

        WebhookPayload::Transaction(TransactionResponse::Evm(EvmTransactionResponse {
            id: "tx123".to_string(),
            hash: Some("0x123".to_string()),
            status: TransactionStatus::Confirmed,
            status_reason: None,
            created_at: timestamp.to_string(),
            sent_at: Some(timestamp.to_string()),
            confirmed_at: Some(timestamp.to_string()),
            gas_price: Some(1000000000),
            gas_limit: 21000,
            nonce: Some(1),
            value: U256::from(1000000000000000000_u64),
            from: "0xabc".to_string(),
            to: Some("0xdef".to_string()),
            relayer_id: "relayer-1".to_string(),
        }))
    }

    /// Builds a relayer-disabled payload for an EVM relayer.
    fn relayer_disabled_payload() -> WebhookPayload {
        WebhookPayload::RelayerDisabled(RelayerDisabledPayload {
            relayer: RelayerResponse {
                id: "relayer-1".to_string(),
                name: "relayer-1".to_string(),
                network: "ethereum".to_string(),
                network_type: NetworkType::Evm,
                paused: false,
                policies: NetworkPolicyResponse::Evm(EvmPolicyResponse {
                    gas_price_cap: None,
                    whitelist_receivers: None,
                    eip1559_pricing: None,
                    private_transactions: false,
                    min_balance: 0,
                }),
                address: "0xabc".to_string(),
                system_disabled: false,
            },
            disable_reason: "test".to_string(),
        })
    }

    #[test]
    fn test_notification_job_creation() {
        // Wrap a transaction payload in a notification and then in a job.
        let notification = WebhookNotification::new(
            "test_event".to_string(),
            confirmed_evm_transaction_payload(),
        );
        let notification_job = NotificationSend::new("notification-1".to_string(), notification);
        let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);

        // The job must carry the notification id and event through unchanged.
        assert_eq!(job.data.notification_id, "notification-1");
        assert_eq!(job.data.notification.event, "test_event");
    }

    #[test]
    fn test_notification_job_with_different_payloads() {
        // Transaction payload variant.
        let transaction_notification = WebhookNotification::new(
            "transaction_payload".to_string(),
            confirmed_evm_transaction_payload(),
        );
        let job = NotificationSend::new(
            "notification-string".to_string(),
            transaction_notification,
        );
        assert_eq!(job.notification_id, "notification-string");
        assert_eq!(job.notification.event, "transaction_payload");

        // Relayer-disabled payload variant.
        let relayer_disabled_notification =
            WebhookNotification::new("object_event".to_string(), relayer_disabled_payload());
        let job = NotificationSend::new(
            "notification-object".to_string(),
            relayer_disabled_notification,
        );
        assert_eq!(job.notification_id, "notification-object");
        assert_eq!(job.notification.event, "object_event");
    }
}