openzeppelin_relayer/jobs/
queue.rs1use apalis_redis::{Config, RedisStorage};
10use color_eyre::{eyre, Result};
11use log::error;
12use serde::{Deserialize, Serialize};
13use tokio::time::{timeout, Duration};
14
15use crate::config::ServerConfig;
16
17use super::{
18 Job, NotificationSend, SolanaTokenSwapRequest, TransactionRequest, TransactionSend,
19 TransactionStatusCheck,
20};
21
22#[derive(Clone, Debug)]
23pub struct Queue {
24 pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
25 pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
26 pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
27 pub notification_queue: RedisStorage<Job<NotificationSend>>,
28 pub solana_token_swap_request_queue: RedisStorage<Job<SolanaTokenSwapRequest>>,
29}
30
31impl Queue {
32 async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
33 namespace: &str,
34 ) -> Result<RedisStorage<T>> {
35 let redis_url = ServerConfig::from_env().redis_url.clone();
36 let redis_connection_timeout_ms = ServerConfig::from_env().redis_connection_timeout_ms;
37 let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
38 Ok(result) => result.map_err(|e| {
39 error!("Failed to connect to Redis at {}: {}", redis_url, e);
40 eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
41 })?,
42 Err(_) => {
43 error!("Timeout connecting to Redis at {}", redis_url);
44 return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
45 }
46 };
47 let config = Config::default().set_namespace(namespace);
48
49 Ok(RedisStorage::new_with_config(conn, config))
50 }
51
52 pub async fn setup() -> Result<Self> {
53 Ok(Self {
54 transaction_request_queue: Self::storage("transaction_request_queue").await?,
55 transaction_submission_queue: Self::storage("transaction_submission_queue").await?,
56 transaction_status_queue: Self::storage("transaction_status_queue").await?,
57 notification_queue: Self::storage("notification_queue").await?,
58 solana_token_swap_request_queue: Self::storage("solana_token_swap_request_queue")
59 .await?,
60 })
61 }
62}
63
#[cfg(test)]
mod tests {
    use super::*;

    /// A namespace set on a default `Config` is read back unchanged.
    #[tokio::test]
    async fn test_queue_storage_configuration() {
        let expected_namespace = "test_namespace";
        let configured = Config::default().set_namespace(expected_namespace);

        assert_eq!(configured.get_namespace(), expected_namespace);
    }

    /// Stand-in that only records namespace strings; it opens no Redis
    /// connection.
    #[derive(Clone, Debug)]
    struct MockQueue {
        pub namespace_transaction_request: String,
        pub namespace_transaction_submission: String,
        pub namespace_transaction_status: String,
        pub namespace_notification: String,
        pub namespace_solana_token_swap_request_queue: String,
    }

    impl MockQueue {
        /// Builds the mock with one fixed namespace string per queue.
        fn new() -> Self {
            Self {
                namespace_transaction_request: String::from("transaction_request_queue"),
                namespace_transaction_submission: String::from("transaction_submission_queue"),
                namespace_transaction_status: String::from("transaction_status_queue"),
                namespace_notification: String::from("notification_queue"),
                namespace_solana_token_swap_request_queue: String::from(
                    "solana_token_swap_request_queue",
                ),
            }
        }
    }

    /// Each mock namespace field holds its expected literal.
    #[test]
    fn test_queue_namespaces() {
        let mock = MockQueue::new();

        // (actual, expected) pairs, checked in declaration order.
        let checks = [
            (
                mock.namespace_transaction_request.as_str(),
                "transaction_request_queue",
            ),
            (
                mock.namespace_transaction_submission.as_str(),
                "transaction_submission_queue",
            ),
            (
                mock.namespace_transaction_status.as_str(),
                "transaction_status_queue",
            ),
            (mock.namespace_notification.as_str(), "notification_queue"),
            (
                mock.namespace_solana_token_swap_request_queue.as_str(),
                "solana_token_swap_request_queue",
            ),
        ];

        for (actual, expected) in checks {
            assert_eq!(actual, expected);
        }
    }
}