openzeppelin_relayer/jobs/
queue.rs

1//! Queue management module for job processing.
2//!
3//! This module provides Redis-backed queue implementation for handling different types of jobs:
4//! - Transaction requests
5//! - Transaction submissions
6//! - Transaction status checks
7//! - Notifications
8//! - Solana swap requests
9use apalis_redis::{Config, RedisStorage};
10use color_eyre::{eyre, Result};
11use log::error;
12use serde::{Deserialize, Serialize};
13use tokio::time::{timeout, Duration};
14
15use crate::config::ServerConfig;
16
17use super::{
18    Job, NotificationSend, SolanaTokenSwapRequest, TransactionRequest, TransactionSend,
19    TransactionStatusCheck,
20};
21
22#[derive(Clone, Debug)]
23pub struct Queue {
24    pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
25    pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
26    pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
27    pub notification_queue: RedisStorage<Job<NotificationSend>>,
28    pub solana_token_swap_request_queue: RedisStorage<Job<SolanaTokenSwapRequest>>,
29}
30
31impl Queue {
32    async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
33        namespace: &str,
34    ) -> Result<RedisStorage<T>> {
35        let redis_url = ServerConfig::from_env().redis_url.clone();
36        let redis_connection_timeout_ms = ServerConfig::from_env().redis_connection_timeout_ms;
37        let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
38            Ok(result) => result.map_err(|e| {
39                error!("Failed to connect to Redis at {}: {}", redis_url, e);
40                eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
41            })?,
42            Err(_) => {
43                error!("Timeout connecting to Redis at {}", redis_url);
44                return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
45            }
46        };
47        let config = Config::default().set_namespace(namespace);
48
49        Ok(RedisStorage::new_with_config(conn, config))
50    }
51
52    pub async fn setup() -> Result<Self> {
53        Ok(Self {
54            transaction_request_queue: Self::storage("transaction_request_queue").await?,
55            transaction_submission_queue: Self::storage("transaction_submission_queue").await?,
56            transaction_status_queue: Self::storage("transaction_status_queue").await?,
57            notification_queue: Self::storage("notification_queue").await?,
58            solana_token_swap_request_queue: Self::storage("solana_token_swap_request_queue")
59                .await?,
60        })
61    }
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67
68    #[tokio::test]
69    async fn test_queue_storage_configuration() {
70        // Test the config creation logic without actual Redis connections
71        let namespace = "test_namespace";
72        let config = Config::default().set_namespace(namespace);
73
74        assert_eq!(config.get_namespace(), namespace);
75    }
76
77    // Mock version of Queue for testing
78    #[derive(Clone, Debug)]
79    struct MockQueue {
80        pub namespace_transaction_request: String,
81        pub namespace_transaction_submission: String,
82        pub namespace_transaction_status: String,
83        pub namespace_notification: String,
84        pub namespace_solana_token_swap_request_queue: String,
85    }
86
87    impl MockQueue {
88        fn new() -> Self {
89            Self {
90                namespace_transaction_request: "transaction_request_queue".to_string(),
91                namespace_transaction_submission: "transaction_submission_queue".to_string(),
92                namespace_transaction_status: "transaction_status_queue".to_string(),
93                namespace_notification: "notification_queue".to_string(),
94                namespace_solana_token_swap_request_queue: "solana_token_swap_request_queue"
95                    .to_string(),
96            }
97        }
98    }
99
100    #[test]
101    fn test_queue_namespaces() {
102        let mock_queue = MockQueue::new();
103
104        assert_eq!(
105            mock_queue.namespace_transaction_request,
106            "transaction_request_queue"
107        );
108        assert_eq!(
109            mock_queue.namespace_transaction_submission,
110            "transaction_submission_queue"
111        );
112        assert_eq!(
113            mock_queue.namespace_transaction_status,
114            "transaction_status_queue"
115        );
116        assert_eq!(mock_queue.namespace_notification, "notification_queue");
117        assert_eq!(
118            mock_queue.namespace_solana_token_swap_request_queue,
119            "solana_token_swap_request_queue"
120        );
121    }
122}