openzeppelin_relayer/jobs/
job_producer.rs

1//! Job producer module for enqueueing jobs to Redis queues.
2//!
3//! Provides functionality for producing various types of jobs:
4//! - Transaction processing jobs
5//! - Transaction submission jobs
6//! - Status monitoring jobs
7//! - Notification jobs
8
9use crate::{
10    jobs::{
11        Job, NotificationSend, Queue, TransactionRequest, TransactionSend, TransactionStatusCheck,
12    },
13    models::RelayerError,
14};
15use apalis::prelude::Storage;
16use apalis_redis::RedisError;
17use async_trait::async_trait;
18use log::{error, info};
19use serde::Serialize;
20use thiserror::Error;
21use tokio::sync::Mutex;
22
23use super::{JobType, SolanaTokenSwapRequest};
24
25#[cfg(test)]
26use mockall::automock;
27
28#[derive(Debug, Error, Serialize)]
29pub enum JobProducerError {
30    #[error("Queue error: {0}")]
31    QueueError(String),
32}
33
34impl From<RedisError> for JobProducerError {
35    fn from(_: RedisError) -> Self {
36        JobProducerError::QueueError("Queue error".to_string())
37    }
38}
39
40impl From<JobProducerError> for RelayerError {
41    fn from(_: JobProducerError) -> Self {
42        RelayerError::QueueError("Queue error".to_string())
43    }
44}
45
46#[derive(Debug)]
47pub struct JobProducer {
48    queue: Mutex<Queue>,
49}
50
51impl Clone for JobProducer {
52    fn clone(&self) -> Self {
53        // We can't clone the Mutex directly, but we can create a new one with a cloned Queue
54        // This requires getting the lock first
55        let queue = self
56            .queue
57            .try_lock()
58            .expect("Failed to lock queue for cloning")
59            .clone();
60
61        Self {
62            queue: Mutex::new(queue),
63        }
64    }
65}
66
67#[async_trait]
68#[cfg_attr(test, automock)]
69pub trait JobProducerTrait: Send + Sync {
70    async fn produce_transaction_request_job(
71        &self,
72        transaction_process_job: TransactionRequest,
73        scheduled_on: Option<i64>,
74    ) -> Result<(), JobProducerError>;
75
76    async fn produce_submit_transaction_job(
77        &self,
78        transaction_submit_job: TransactionSend,
79        scheduled_on: Option<i64>,
80    ) -> Result<(), JobProducerError>;
81
82    async fn produce_check_transaction_status_job(
83        &self,
84        transaction_status_check_job: TransactionStatusCheck,
85        scheduled_on: Option<i64>,
86    ) -> Result<(), JobProducerError>;
87
88    async fn produce_send_notification_job(
89        &self,
90        notification_send_job: NotificationSend,
91        scheduled_on: Option<i64>,
92    ) -> Result<(), JobProducerError>;
93
94    async fn produce_solana_token_swap_request_job(
95        &self,
96        solana_swap_request_job: SolanaTokenSwapRequest,
97        scheduled_on: Option<i64>,
98    ) -> Result<(), JobProducerError>;
99}
100
101impl JobProducer {
102    pub fn new(queue: Queue) -> Self {
103        Self {
104            queue: Mutex::new(queue.clone()),
105        }
106    }
107
108    pub async fn get_queue(&self) -> Result<Queue, JobProducerError> {
109        let queue = self.queue.lock().await;
110
111        Ok(queue.clone())
112    }
113}
114
115#[async_trait]
116impl JobProducerTrait for JobProducer {
117    async fn produce_transaction_request_job(
118        &self,
119        transaction_process_job: TransactionRequest,
120        scheduled_on: Option<i64>,
121    ) -> Result<(), JobProducerError> {
122        info!(
123            "Producing transaction request job: {:?}",
124            transaction_process_job
125        );
126        let mut queue = self.queue.lock().await;
127        let job = Job::new(JobType::TransactionRequest, transaction_process_job);
128
129        match scheduled_on {
130            Some(scheduled_on) => {
131                queue
132                    .transaction_request_queue
133                    .schedule(job, scheduled_on)
134                    .await?;
135            }
136            None => {
137                queue.transaction_request_queue.push(job).await?;
138            }
139        }
140        info!("Transaction job produced successfully");
141
142        Ok(())
143    }
144
145    async fn produce_submit_transaction_job(
146        &self,
147        transaction_submit_job: TransactionSend,
148        scheduled_on: Option<i64>,
149    ) -> Result<(), JobProducerError> {
150        let mut queue = self.queue.lock().await;
151        let job = Job::new(JobType::TransactionSend, transaction_submit_job);
152
153        match scheduled_on {
154            Some(on) => {
155                queue.transaction_submission_queue.schedule(job, on).await?;
156            }
157            None => {
158                queue.transaction_submission_queue.push(job).await?;
159            }
160        }
161        info!("Transaction Submit job produced successfully");
162
163        Ok(())
164    }
165
166    async fn produce_check_transaction_status_job(
167        &self,
168        transaction_status_check_job: TransactionStatusCheck,
169        scheduled_on: Option<i64>,
170    ) -> Result<(), JobProducerError> {
171        let mut queue = self.queue.lock().await;
172        let job = Job::new(
173            JobType::TransactionStatusCheck,
174            transaction_status_check_job,
175        );
176        match scheduled_on {
177            Some(on) => {
178                queue.transaction_status_queue.schedule(job, on).await?;
179            }
180            None => {
181                queue.transaction_status_queue.push(job).await?;
182            }
183        }
184        info!("Transaction Status Check job produced successfully");
185        Ok(())
186    }
187
188    async fn produce_send_notification_job(
189        &self,
190        notification_send_job: NotificationSend,
191        scheduled_on: Option<i64>,
192    ) -> Result<(), JobProducerError> {
193        let mut queue = self.queue.lock().await;
194        let job = Job::new(JobType::NotificationSend, notification_send_job);
195
196        match scheduled_on {
197            Some(on) => {
198                queue.notification_queue.schedule(job, on).await?;
199            }
200            None => {
201                queue.notification_queue.push(job).await?;
202            }
203        }
204
205        info!("Notification Send job produced successfully");
206        Ok(())
207    }
208
209    async fn produce_solana_token_swap_request_job(
210        &self,
211        solana_swap_request_job: SolanaTokenSwapRequest,
212        scheduled_on: Option<i64>,
213    ) -> Result<(), JobProducerError> {
214        let mut queue = self.queue.lock().await;
215        let job = Job::new(JobType::SolanaTokenSwapRequest, solana_swap_request_job);
216
217        match scheduled_on {
218            Some(on) => {
219                queue
220                    .solana_token_swap_request_queue
221                    .schedule(job, on)
222                    .await?;
223            }
224            None => {
225                queue.solana_token_swap_request_queue.push(job).await?;
226            }
227        }
228
229        info!("Solana token swap job produced successfully");
230        Ok(())
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237    use crate::models::{
238        EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
239        WebhookPayload, U256,
240    }; // Define a simplified queue for testing without using complex mocks
241    #[derive(Clone, Debug)]
242    struct TestRedisStorage<T> {
243        pub push_called: bool,
244        pub schedule_called: bool,
245        _phantom: std::marker::PhantomData<T>,
246    }
247
248    impl<T> TestRedisStorage<T> {
249        fn new() -> Self {
250            Self {
251                push_called: false,
252                schedule_called: false,
253                _phantom: std::marker::PhantomData,
254            }
255        }
256
257        async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
258            self.push_called = true;
259            Ok(())
260        }
261
262        async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
263            self.schedule_called = true;
264            Ok(())
265        }
266    }
267
268    // A test version of the Queue
269    #[derive(Clone, Debug)]
270    struct TestQueue {
271        pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
272        pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
273        pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
274        pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
275        pub solana_token_swap_request_queue: TestRedisStorage<Job<SolanaTokenSwapRequest>>,
276    }
277
278    impl TestQueue {
279        fn new() -> Self {
280            Self {
281                transaction_request_queue: TestRedisStorage::new(),
282                transaction_submission_queue: TestRedisStorage::new(),
283                transaction_status_queue: TestRedisStorage::new(),
284                notification_queue: TestRedisStorage::new(),
285                solana_token_swap_request_queue: TestRedisStorage::new(),
286            }
287        }
288    }
289
290    // A test version of JobProducer
291    struct TestJobProducer {
292        queue: Mutex<TestQueue>,
293    }
294
295    impl TestJobProducer {
296        fn new() -> Self {
297            Self {
298                queue: Mutex::new(TestQueue::new()),
299            }
300        }
301
302        async fn get_queue(&self) -> TestQueue {
303            self.queue.lock().await.clone()
304        }
305    }
306
307    #[async_trait]
308    impl JobProducerTrait for TestJobProducer {
309        async fn produce_transaction_request_job(
310            &self,
311            transaction_process_job: TransactionRequest,
312            scheduled_on: Option<i64>,
313        ) -> Result<(), JobProducerError> {
314            let mut queue = self.queue.lock().await;
315            let job = Job::new(JobType::TransactionRequest, transaction_process_job);
316
317            match scheduled_on {
318                Some(scheduled_on) => {
319                    queue
320                        .transaction_request_queue
321                        .schedule(job, scheduled_on)
322                        .await?;
323                }
324                None => {
325                    queue.transaction_request_queue.push(job).await?;
326                }
327            }
328
329            Ok(())
330        }
331
332        async fn produce_submit_transaction_job(
333            &self,
334            transaction_submit_job: TransactionSend,
335            scheduled_on: Option<i64>,
336        ) -> Result<(), JobProducerError> {
337            let mut queue = self.queue.lock().await;
338            let job = Job::new(JobType::TransactionSend, transaction_submit_job);
339
340            match scheduled_on {
341                Some(on) => {
342                    queue.transaction_submission_queue.schedule(job, on).await?;
343                }
344                None => {
345                    queue.transaction_submission_queue.push(job).await?;
346                }
347            }
348
349            Ok(())
350        }
351
352        async fn produce_check_transaction_status_job(
353            &self,
354            transaction_status_check_job: TransactionStatusCheck,
355            scheduled_on: Option<i64>,
356        ) -> Result<(), JobProducerError> {
357            let mut queue = self.queue.lock().await;
358            let job = Job::new(
359                JobType::TransactionStatusCheck,
360                transaction_status_check_job,
361            );
362
363            match scheduled_on {
364                Some(on) => {
365                    queue.transaction_status_queue.schedule(job, on).await?;
366                }
367                None => {
368                    queue.transaction_status_queue.push(job).await?;
369                }
370            }
371
372            Ok(())
373        }
374
375        async fn produce_send_notification_job(
376            &self,
377            notification_send_job: NotificationSend,
378            scheduled_on: Option<i64>,
379        ) -> Result<(), JobProducerError> {
380            let mut queue = self.queue.lock().await;
381            let job = Job::new(JobType::NotificationSend, notification_send_job);
382
383            match scheduled_on {
384                Some(on) => {
385                    queue.notification_queue.schedule(job, on).await?;
386                }
387                None => {
388                    queue.notification_queue.push(job).await?;
389                }
390            }
391
392            Ok(())
393        }
394
395        async fn produce_solana_token_swap_request_job(
396            &self,
397            solana_token_swap_request_job: SolanaTokenSwapRequest,
398            scheduled_on: Option<i64>,
399        ) -> Result<(), JobProducerError> {
400            let mut queue = self.queue.lock().await;
401            let job = Job::new(
402                JobType::SolanaTokenSwapRequest,
403                solana_token_swap_request_job,
404            );
405
406            match scheduled_on {
407                Some(on) => {
408                    queue
409                        .solana_token_swap_request_queue
410                        .schedule(job, on)
411                        .await?;
412                }
413                None => {
414                    queue.solana_token_swap_request_queue.push(job).await?;
415                }
416            }
417
418            Ok(())
419        }
420    }
421
422    #[tokio::test]
423    async fn test_job_producer_operations() {
424        let producer = TestJobProducer::new();
425
426        // Test transaction request job
427        let request = TransactionRequest::new("tx123", "relayer-1");
428        let result = producer
429            .produce_transaction_request_job(request, None)
430            .await;
431        assert!(result.is_ok());
432
433        let queue = producer.get_queue().await;
434        assert!(queue.transaction_request_queue.push_called);
435
436        // Test scheduled job
437        let producer = TestJobProducer::new();
438        let request = TransactionRequest::new("tx123", "relayer-1");
439        let result = producer
440            .produce_transaction_request_job(request, Some(1000))
441            .await;
442        assert!(result.is_ok());
443
444        let queue = producer.get_queue().await;
445        assert!(queue.transaction_request_queue.schedule_called);
446    }
447
448    #[tokio::test]
449    async fn test_submit_transaction_job() {
450        let producer = TestJobProducer::new();
451
452        // Test submit transaction job
453        let submit_job = TransactionSend::submit("tx123", "relayer-1");
454        let result = producer
455            .produce_submit_transaction_job(submit_job, None)
456            .await;
457        assert!(result.is_ok());
458
459        let queue = producer.get_queue().await;
460        assert!(queue.transaction_submission_queue.push_called);
461    }
462
463    #[tokio::test]
464    async fn test_check_status_job() {
465        let producer = TestJobProducer::new();
466
467        // Test status check job
468        let status_job = TransactionStatusCheck::new("tx123", "relayer-1");
469        let result = producer
470            .produce_check_transaction_status_job(status_job, None)
471            .await;
472        assert!(result.is_ok());
473
474        let queue = producer.get_queue().await;
475        assert!(queue.transaction_status_queue.push_called);
476    }
477
478    #[tokio::test]
479    async fn test_notification_job() {
480        let producer = TestJobProducer::new();
481
482        // Create a simple notification for testing
483        let notification = WebhookNotification::new(
484            "test_event".to_string(),
485            WebhookPayload::Transaction(TransactionResponse::Evm(EvmTransactionResponse {
486                id: "tx123".to_string(),
487                hash: Some("0x123".to_string()),
488                status: TransactionStatus::Confirmed,
489                status_reason: None,
490                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
491                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
492                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
493                gas_price: Some(1000000000),
494                gas_limit: 21000,
495                nonce: Some(1),
496                value: U256::from(1000000000000000000_u64),
497                from: "0xabc".to_string(),
498                to: Some("0xdef".to_string()),
499                relayer_id: "relayer-1".to_string(),
500            })),
501        );
502        let job = NotificationSend::new("notification-1".to_string(), notification);
503
504        let result = producer.produce_send_notification_job(job, None).await;
505        assert!(result.is_ok());
506
507        let queue = producer.get_queue().await;
508        assert!(queue.notification_queue.push_called);
509    }
510
511    #[test]
512    fn test_job_producer_error_conversion() {
513        // Test error conversion without using specific Redis error types
514        let job_error = JobProducerError::QueueError("Test error".to_string());
515        let relayer_error: RelayerError = job_error.into();
516
517        match relayer_error {
518            RelayerError::QueueError(msg) => {
519                assert_eq!(msg, "Queue error");
520            }
521            _ => panic!("Unexpected error type"),
522        }
523    }
524}