openzeppelin_relayer/jobs/
job_producer.rs1use crate::{
10 jobs::{
11 Job, NotificationSend, Queue, TransactionRequest, TransactionSend, TransactionStatusCheck,
12 },
13 models::RelayerError,
14};
15use apalis::prelude::Storage;
16use apalis_redis::RedisError;
17use async_trait::async_trait;
18use log::{error, info};
19use serde::Serialize;
20use thiserror::Error;
21use tokio::sync::Mutex;
22
23use super::{JobType, SolanaTokenSwapRequest};
24
25#[cfg(test)]
26use mockall::automock;
27
28#[derive(Debug, Error, Serialize)]
29pub enum JobProducerError {
30 #[error("Queue error: {0}")]
31 QueueError(String),
32}
33
34impl From<RedisError> for JobProducerError {
35 fn from(_: RedisError) -> Self {
36 JobProducerError::QueueError("Queue error".to_string())
37 }
38}
39
40impl From<JobProducerError> for RelayerError {
41 fn from(_: JobProducerError) -> Self {
42 RelayerError::QueueError("Queue error".to_string())
43 }
44}
45
46#[derive(Debug)]
47pub struct JobProducer {
48 queue: Mutex<Queue>,
49}
50
51impl Clone for JobProducer {
52 fn clone(&self) -> Self {
53 let queue = self
56 .queue
57 .try_lock()
58 .expect("Failed to lock queue for cloning")
59 .clone();
60
61 Self {
62 queue: Mutex::new(queue),
63 }
64 }
65}
66
67#[async_trait]
68#[cfg_attr(test, automock)]
69pub trait JobProducerTrait: Send + Sync {
70 async fn produce_transaction_request_job(
71 &self,
72 transaction_process_job: TransactionRequest,
73 scheduled_on: Option<i64>,
74 ) -> Result<(), JobProducerError>;
75
76 async fn produce_submit_transaction_job(
77 &self,
78 transaction_submit_job: TransactionSend,
79 scheduled_on: Option<i64>,
80 ) -> Result<(), JobProducerError>;
81
82 async fn produce_check_transaction_status_job(
83 &self,
84 transaction_status_check_job: TransactionStatusCheck,
85 scheduled_on: Option<i64>,
86 ) -> Result<(), JobProducerError>;
87
88 async fn produce_send_notification_job(
89 &self,
90 notification_send_job: NotificationSend,
91 scheduled_on: Option<i64>,
92 ) -> Result<(), JobProducerError>;
93
94 async fn produce_solana_token_swap_request_job(
95 &self,
96 solana_swap_request_job: SolanaTokenSwapRequest,
97 scheduled_on: Option<i64>,
98 ) -> Result<(), JobProducerError>;
99}
100
101impl JobProducer {
102 pub fn new(queue: Queue) -> Self {
103 Self {
104 queue: Mutex::new(queue.clone()),
105 }
106 }
107
108 pub async fn get_queue(&self) -> Result<Queue, JobProducerError> {
109 let queue = self.queue.lock().await;
110
111 Ok(queue.clone())
112 }
113}
114
115#[async_trait]
116impl JobProducerTrait for JobProducer {
117 async fn produce_transaction_request_job(
118 &self,
119 transaction_process_job: TransactionRequest,
120 scheduled_on: Option<i64>,
121 ) -> Result<(), JobProducerError> {
122 info!(
123 "Producing transaction request job: {:?}",
124 transaction_process_job
125 );
126 let mut queue = self.queue.lock().await;
127 let job = Job::new(JobType::TransactionRequest, transaction_process_job);
128
129 match scheduled_on {
130 Some(scheduled_on) => {
131 queue
132 .transaction_request_queue
133 .schedule(job, scheduled_on)
134 .await?;
135 }
136 None => {
137 queue.transaction_request_queue.push(job).await?;
138 }
139 }
140 info!("Transaction job produced successfully");
141
142 Ok(())
143 }
144
145 async fn produce_submit_transaction_job(
146 &self,
147 transaction_submit_job: TransactionSend,
148 scheduled_on: Option<i64>,
149 ) -> Result<(), JobProducerError> {
150 let mut queue = self.queue.lock().await;
151 let job = Job::new(JobType::TransactionSend, transaction_submit_job);
152
153 match scheduled_on {
154 Some(on) => {
155 queue.transaction_submission_queue.schedule(job, on).await?;
156 }
157 None => {
158 queue.transaction_submission_queue.push(job).await?;
159 }
160 }
161 info!("Transaction Submit job produced successfully");
162
163 Ok(())
164 }
165
166 async fn produce_check_transaction_status_job(
167 &self,
168 transaction_status_check_job: TransactionStatusCheck,
169 scheduled_on: Option<i64>,
170 ) -> Result<(), JobProducerError> {
171 let mut queue = self.queue.lock().await;
172 let job = Job::new(
173 JobType::TransactionStatusCheck,
174 transaction_status_check_job,
175 );
176 match scheduled_on {
177 Some(on) => {
178 queue.transaction_status_queue.schedule(job, on).await?;
179 }
180 None => {
181 queue.transaction_status_queue.push(job).await?;
182 }
183 }
184 info!("Transaction Status Check job produced successfully");
185 Ok(())
186 }
187
188 async fn produce_send_notification_job(
189 &self,
190 notification_send_job: NotificationSend,
191 scheduled_on: Option<i64>,
192 ) -> Result<(), JobProducerError> {
193 let mut queue = self.queue.lock().await;
194 let job = Job::new(JobType::NotificationSend, notification_send_job);
195
196 match scheduled_on {
197 Some(on) => {
198 queue.notification_queue.schedule(job, on).await?;
199 }
200 None => {
201 queue.notification_queue.push(job).await?;
202 }
203 }
204
205 info!("Notification Send job produced successfully");
206 Ok(())
207 }
208
209 async fn produce_solana_token_swap_request_job(
210 &self,
211 solana_swap_request_job: SolanaTokenSwapRequest,
212 scheduled_on: Option<i64>,
213 ) -> Result<(), JobProducerError> {
214 let mut queue = self.queue.lock().await;
215 let job = Job::new(JobType::SolanaTokenSwapRequest, solana_swap_request_job);
216
217 match scheduled_on {
218 Some(on) => {
219 queue
220 .solana_token_swap_request_queue
221 .schedule(job, on)
222 .await?;
223 }
224 None => {
225 queue.solana_token_swap_request_queue.push(job).await?;
226 }
227 }
228
229 info!("Solana token swap job produced successfully");
230 Ok(())
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
        WebhookPayload, U256,
    };

    /// In-memory stand-in for a storage: it stores nothing and only records
    /// which of `push` / `schedule` has been invoked.
    #[derive(Clone, Debug)]
    struct TestRedisStorage<T> {
        pub push_called: bool,
        pub schedule_called: bool,
        _phantom: std::marker::PhantomData<T>,
    }

    impl<T> TestRedisStorage<T> {
        /// Fresh storage with both call flags cleared.
        fn new() -> Self {
            TestRedisStorage {
                push_called: false,
                schedule_called: false,
                _phantom: std::marker::PhantomData,
            }
        }

        /// Records that a push happened; the job itself is discarded.
        async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
            self.push_called = true;
            Ok(())
        }

        /// Records that a schedule happened; job and timestamp are discarded.
        async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
            self.schedule_called = true;
            Ok(())
        }
    }

    /// Test counterpart of `Queue`, with one recording storage per job kind.
    #[derive(Clone, Debug)]
    struct TestQueue {
        pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
        pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
        pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
        pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
        pub solana_token_swap_request_queue: TestRedisStorage<Job<SolanaTokenSwapRequest>>,
    }

    impl TestQueue {
        /// Builds a queue whose storages have not been touched yet.
        fn new() -> Self {
            TestQueue {
                transaction_request_queue: TestRedisStorage::new(),
                transaction_submission_queue: TestRedisStorage::new(),
                transaction_status_queue: TestRedisStorage::new(),
                notification_queue: TestRedisStorage::new(),
                solana_token_swap_request_queue: TestRedisStorage::new(),
            }
        }
    }

    /// Producer wired to the recording `TestQueue` instead of real storage.
    struct TestJobProducer {
        queue: Mutex<TestQueue>,
    }

    impl TestJobProducer {
        fn new() -> Self {
            TestJobProducer {
                queue: Mutex::new(TestQueue::new()),
            }
        }

        /// Returns a snapshot of the queue so tests can inspect the flags.
        async fn get_queue(&self) -> TestQueue {
            let guard = self.queue.lock().await;
            TestQueue::clone(&guard)
        }
    }

    #[async_trait]
    impl JobProducerTrait for TestJobProducer {
        async fn produce_transaction_request_job(
            &self,
            transaction_process_job: TransactionRequest,
            scheduled_on: Option<i64>,
        ) -> Result<(), JobProducerError> {
            let mut guard = self.queue.lock().await;
            let job = Job::new(JobType::TransactionRequest, transaction_process_job);
            let storage = &mut guard.transaction_request_queue;

            if let Some(run_at) = scheduled_on {
                storage.schedule(job, run_at).await?;
            } else {
                storage.push(job).await?;
            }

            Ok(())
        }

        async fn produce_submit_transaction_job(
            &self,
            transaction_submit_job: TransactionSend,
            scheduled_on: Option<i64>,
        ) -> Result<(), JobProducerError> {
            let mut guard = self.queue.lock().await;
            let job = Job::new(JobType::TransactionSend, transaction_submit_job);
            let storage = &mut guard.transaction_submission_queue;

            if let Some(run_at) = scheduled_on {
                storage.schedule(job, run_at).await?;
            } else {
                storage.push(job).await?;
            }

            Ok(())
        }

        async fn produce_check_transaction_status_job(
            &self,
            transaction_status_check_job: TransactionStatusCheck,
            scheduled_on: Option<i64>,
        ) -> Result<(), JobProducerError> {
            let mut guard = self.queue.lock().await;
            let job = Job::new(
                JobType::TransactionStatusCheck,
                transaction_status_check_job,
            );
            let storage = &mut guard.transaction_status_queue;

            if let Some(run_at) = scheduled_on {
                storage.schedule(job, run_at).await?;
            } else {
                storage.push(job).await?;
            }

            Ok(())
        }

        async fn produce_send_notification_job(
            &self,
            notification_send_job: NotificationSend,
            scheduled_on: Option<i64>,
        ) -> Result<(), JobProducerError> {
            let mut guard = self.queue.lock().await;
            let job = Job::new(JobType::NotificationSend, notification_send_job);
            let storage = &mut guard.notification_queue;

            if let Some(run_at) = scheduled_on {
                storage.schedule(job, run_at).await?;
            } else {
                storage.push(job).await?;
            }

            Ok(())
        }

        async fn produce_solana_token_swap_request_job(
            &self,
            solana_token_swap_request_job: SolanaTokenSwapRequest,
            scheduled_on: Option<i64>,
        ) -> Result<(), JobProducerError> {
            let mut guard = self.queue.lock().await;
            let job = Job::new(
                JobType::SolanaTokenSwapRequest,
                solana_token_swap_request_job,
            );
            let storage = &mut guard.solana_token_swap_request_queue;

            if let Some(run_at) = scheduled_on {
                storage.schedule(job, run_at).await?;
            } else {
                storage.push(job).await?;
            }

            Ok(())
        }
    }

    #[tokio::test]
    async fn test_job_producer_operations() {
        // Without a schedule time the job must go through `push`.
        let immediate_producer = TestJobProducer::new();
        let immediate_request = TransactionRequest::new("tx123", "relayer-1");
        let pushed = immediate_producer
            .produce_transaction_request_job(immediate_request, None)
            .await;
        assert!(pushed.is_ok());

        let after_push = immediate_producer.get_queue().await;
        assert!(after_push.transaction_request_queue.push_called);

        // With a schedule time the job must go through `schedule`.
        let delayed_producer = TestJobProducer::new();
        let delayed_request = TransactionRequest::new("tx123", "relayer-1");
        let scheduled = delayed_producer
            .produce_transaction_request_job(delayed_request, Some(1000))
            .await;
        assert!(scheduled.is_ok());

        let after_schedule = delayed_producer.get_queue().await;
        assert!(after_schedule.transaction_request_queue.schedule_called);
    }

    #[tokio::test]
    async fn test_submit_transaction_job() {
        let producer = TestJobProducer::new();
        let submit_job = TransactionSend::submit("tx123", "relayer-1");

        let outcome = producer
            .produce_submit_transaction_job(submit_job, None)
            .await;
        assert!(outcome.is_ok());

        let snapshot = producer.get_queue().await;
        assert!(snapshot.transaction_submission_queue.push_called);
    }

    #[tokio::test]
    async fn test_check_status_job() {
        let producer = TestJobProducer::new();
        let status_job = TransactionStatusCheck::new("tx123", "relayer-1");

        let outcome = producer
            .produce_check_transaction_status_job(status_job, None)
            .await;
        assert!(outcome.is_ok());

        let snapshot = producer.get_queue().await;
        assert!(snapshot.transaction_status_queue.push_called);
    }

    #[tokio::test]
    async fn test_notification_job() {
        let producer = TestJobProducer::new();

        // Payload describing a confirmed EVM transaction.
        let evm_response = EvmTransactionResponse {
            id: "tx123".to_string(),
            hash: Some("0x123".to_string()),
            status: TransactionStatus::Confirmed,
            status_reason: None,
            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            gas_price: Some(1000000000),
            gas_limit: 21000,
            nonce: Some(1),
            value: U256::from(1000000000000000000_u64),
            from: "0xabc".to_string(),
            to: Some("0xdef".to_string()),
            relayer_id: "relayer-1".to_string(),
        };
        let notification = WebhookNotification::new(
            "test_event".to_string(),
            WebhookPayload::Transaction(TransactionResponse::Evm(evm_response)),
        );
        let job = NotificationSend::new("notification-1".to_string(), notification);

        let outcome = producer.produce_send_notification_job(job, None).await;
        assert!(outcome.is_ok());

        let snapshot = producer.get_queue().await;
        assert!(snapshot.notification_queue.push_called);
    }

    #[test]
    fn test_job_producer_error_conversion() {
        let job_error = JobProducerError::QueueError("Test error".to_string());
        let relayer_error: RelayerError = job_error.into();

        // The conversion replaces the original message with a fixed one.
        if let RelayerError::QueueError(msg) = relayer_error {
            assert_eq!(msg, "Queue error");
        } else {
            panic!("Unexpected error type");
        }
    }
}