openzeppelin_relayer/domain/transaction/stellar/
stellar_transaction.rs

//! This module defines the `StellarRelayerTransaction` struct and its associated
//! functionality for handling Stellar transactions.
//! It includes methods for preparing, submitting, handling status, and
//! managing notifications for transactions. The module leverages various
//! services and repositories to perform these operations asynchronously.
6use crate::{
7    domain::transaction::Transaction,
8    jobs::{JobProducer, JobProducerTrait, TransactionRequest},
9    models::{
10        produce_transaction_update_notification_payload, NetworkTransactionRequest,
11        RelayerRepoModel, TransactionError, TransactionRepoModel, TransactionStatus,
12        TransactionUpdateRequest,
13    },
14    repositories::{
15        InMemoryRelayerRepository, InMemoryTransactionCounter, InMemoryTransactionRepository,
16        RelayerRepositoryStorage, Repository, TransactionCounterTrait, TransactionRepository,
17    },
18    services::{Signer, StellarProvider, StellarProviderTrait, StellarSigner},
19};
20use async_trait::async_trait;
21use eyre::Result;
22use log::info;
23use std::sync::Arc;
24
25use super::lane_gate;
26
27#[allow(dead_code)]
28pub struct StellarRelayerTransaction<R, T, J, S, P, C>
29where
30    R: Repository<RelayerRepoModel, String>,
31    T: TransactionRepository,
32    J: JobProducerTrait,
33    S: Signer,
34    P: StellarProviderTrait,
35    C: TransactionCounterTrait,
36{
37    relayer: RelayerRepoModel,
38    relayer_repository: Arc<R>,
39    transaction_repository: Arc<T>,
40    job_producer: Arc<J>,
41    signer: Arc<S>,
42    provider: P,
43    transaction_counter_service: Arc<C>,
44}
45
46#[allow(dead_code)]
47impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
48where
49    R: Repository<RelayerRepoModel, String>,
50    T: TransactionRepository,
51    J: JobProducerTrait,
52    S: Signer,
53    P: StellarProviderTrait,
54    C: TransactionCounterTrait,
55{
56    /// Creates a new `StellarRelayerTransaction`.
57    ///
58    /// # Arguments
59    ///
60    /// * `relayer` - The relayer model.
61    /// * `relayer_repository` - Storage for relayer repository.
62    /// * `transaction_repository` - Storage for transaction repository.
63    /// * `job_producer` - Producer for job queue.
64    /// * `signer` - The Stellar signer.
65    /// * `provider` - The Stellar provider.
66    /// * `transaction_counter_service` - Service for managing transaction counters.
67    ///
68    /// # Returns
69    ///
70    /// A result containing the new `StellarRelayerTransaction` or a `TransactionError`.
71    #[allow(clippy::too_many_arguments)]
72    pub fn new(
73        relayer: RelayerRepoModel,
74        relayer_repository: Arc<R>,
75        transaction_repository: Arc<T>,
76        job_producer: Arc<J>,
77        signer: Arc<S>,
78        provider: P,
79        transaction_counter_service: Arc<C>,
80    ) -> Result<Self, TransactionError> {
81        Ok(Self {
82            relayer,
83            relayer_repository,
84            transaction_repository,
85            job_producer,
86            signer,
87            provider,
88            transaction_counter_service,
89        })
90    }
91
92    pub fn provider(&self) -> &P {
93        &self.provider
94    }
95
96    pub fn relayer(&self) -> &RelayerRepoModel {
97        &self.relayer
98    }
99
100    pub fn job_producer(&self) -> &J {
101        &self.job_producer
102    }
103
104    pub fn transaction_repository(&self) -> &T {
105        &self.transaction_repository
106    }
107
108    pub fn signer(&self) -> &S {
109        &self.signer
110    }
111
112    pub fn transaction_counter_service(&self) -> &C {
113        &self.transaction_counter_service
114    }
115
116    /// Send a transaction-request job for the given transaction.
117    pub async fn send_transaction_request_job(
118        &self,
119        tx: &TransactionRepoModel,
120        delay_seconds: Option<i64>,
121    ) -> Result<(), TransactionError> {
122        let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
123        self.job_producer()
124            .produce_transaction_request_job(job, delay_seconds)
125            .await?;
126        Ok(())
127    }
128
129    /// Sends a transaction update notification if a notification ID is configured.
130    pub(super) async fn send_transaction_update_notification(
131        &self,
132        tx: &TransactionRepoModel,
133    ) -> Result<(), TransactionError> {
134        if let Some(notification_id) = &self.relayer().notification_id {
135            self.job_producer()
136                .produce_send_notification_job(
137                    produce_transaction_update_notification_payload(notification_id, tx),
138                    None,
139                )
140                .await
141                .map_err(|e| {
142                    TransactionError::UnexpectedError(format!("Failed to send notification: {}", e))
143                })?;
144        }
145        Ok(())
146    }
147
148    /// Helper function to update transaction status, save it, and send a notification.
149    pub async fn finalize_transaction_state(
150        &self,
151        tx_id: String,
152        new_status: TransactionStatus,
153        status_reason: Option<String>,
154        confirmed_at: Option<String>,
155    ) -> Result<TransactionRepoModel, TransactionError> {
156        let update_req = TransactionUpdateRequest {
157            status: Some(new_status),
158            status_reason,
159            confirmed_at,
160            ..Default::default()
161        };
162
163        let updated_tx = self
164            .transaction_repository()
165            .partial_update(tx_id, update_req)
166            .await?;
167
168        self.send_transaction_update_notification(&updated_tx)
169            .await?;
170        Ok(updated_tx)
171    }
172
173    pub async fn enqueue_next_pending_transaction(
174        &self,
175        finished_tx_id: &str,
176    ) -> Result<(), TransactionError> {
177        if let Some(next) = self
178            .find_oldest_pending_for_relayer(&self.relayer().id)
179            .await?
180        {
181            // Atomic hand-over while still owning the lane
182            info!("Handing over lane from {} to {}", finished_tx_id, next.id);
183            lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
184            self.send_transaction_request_job(&next, None).await?;
185        } else {
186            info!("Releasing relayer lane after {}", finished_tx_id);
187            lane_gate::free(&self.relayer().id, finished_tx_id);
188        }
189        Ok(())
190    }
191
192    /// Finds the oldest pending transaction for a relayer.
193    async fn find_oldest_pending_for_relayer(
194        &self,
195        relayer_id: &str,
196    ) -> Result<Option<TransactionRepoModel>, TransactionError> {
197        let pending_txs = self
198            .transaction_repository()
199            .find_by_status(relayer_id, &[TransactionStatus::Pending])
200            .await
201            .map_err(TransactionError::from)?;
202
203        Ok(pending_txs.into_iter().next())
204    }
205}
206
207#[async_trait]
208impl<R, T, J, S, P, C> Transaction for StellarRelayerTransaction<R, T, J, S, P, C>
209where
210    R: Repository<RelayerRepoModel, String> + Send + Sync,
211    T: TransactionRepository + Send + Sync,
212    J: JobProducerTrait + Send + Sync,
213    S: Signer + Send + Sync,
214    P: StellarProviderTrait + Send + Sync,
215    C: TransactionCounterTrait + Send + Sync,
216{
217    async fn prepare_transaction(
218        &self,
219        tx: TransactionRepoModel,
220    ) -> Result<TransactionRepoModel, TransactionError> {
221        self.prepare_transaction_impl(tx).await
222    }
223
224    async fn submit_transaction(
225        &self,
226        tx: TransactionRepoModel,
227    ) -> Result<TransactionRepoModel, TransactionError> {
228        self.submit_transaction_impl(tx).await
229    }
230
231    async fn resubmit_transaction(
232        &self,
233        tx: TransactionRepoModel,
234    ) -> Result<TransactionRepoModel, TransactionError> {
235        Ok(tx)
236    }
237
238    async fn handle_transaction_status(
239        &self,
240        tx: TransactionRepoModel,
241    ) -> Result<TransactionRepoModel, TransactionError> {
242        self.handle_transaction_status_impl(tx).await
243    }
244
245    async fn cancel_transaction(
246        &self,
247        tx: TransactionRepoModel,
248    ) -> Result<TransactionRepoModel, TransactionError> {
249        Ok(tx)
250    }
251
252    async fn replace_transaction(
253        &self,
254        _old_tx: TransactionRepoModel,
255        _new_tx_request: NetworkTransactionRequest,
256    ) -> Result<TransactionRepoModel, TransactionError> {
257        Ok(_old_tx)
258    }
259
260    async fn sign_transaction(
261        &self,
262        tx: TransactionRepoModel,
263    ) -> Result<TransactionRepoModel, TransactionError> {
264        Ok(tx)
265    }
266
267    async fn validate_transaction(
268        &self,
269        _tx: TransactionRepoModel,
270    ) -> Result<bool, TransactionError> {
271        Ok(true)
272    }
273}
274
275pub type DefaultStellarTransaction = StellarRelayerTransaction<
276    RelayerRepositoryStorage<InMemoryRelayerRepository>,
277    InMemoryTransactionRepository,
278    JobProducer,
279    StellarSigner,
280    StellarProvider,
281    InMemoryTransactionCounter,
282>;
283
#[cfg(test)]
mod tests {
    // Unit tests for the helpers defined on `StellarRelayerTransaction` in this
    // file. Collaborators are mocks supplied by the shared `test_helpers` module.
    use super::*;
    use crate::models::RepositoryError;
    use std::sync::Arc;

    use crate::domain::transaction::stellar::test_helpers::*;

    /// The constructor accepts mocked collaborators and returns `Ok`.
    #[test]
    fn new_returns_ok() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let result = StellarRelayerTransaction::new(
            relayer,
            Arc::new(mocks.relayer_repo),
            Arc::new(mocks.tx_repo),
            Arc::new(mocks.job_producer),
            Arc::new(mocks.signer),
            mocks.provider,
            Arc::new(mocks.counter),
        );
        assert!(result.is_ok());
    }

    /// Every accessor can be called and `relayer()` exposes the test relayer.
    #[test]
    fn accessor_methods_return_correct_references() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        // Test all accessor methods
        assert_eq!(handler.relayer().id, "relayer-1");
        assert_eq!(handler.relayer().address, TEST_PK);

        // These should not panic and return valid references
        let _ = handler.provider();
        let _ = handler.job_producer();
        let _ = handler.transaction_repository();
        let _ = handler.signer();
        let _ = handler.transaction_counter_service();
    }

    /// A job carrying the transaction and relayer ids is produced without delay.
    #[tokio::test]
    async fn send_transaction_request_job_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, None).await;
        assert!(result.is_ok());
    }

    /// The requested delay is forwarded to the job producer unchanged.
    #[tokio::test]
    async fn send_transaction_request_job_with_delay() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay == &Some(60)
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, Some(60)).await;
        assert!(result.is_ok());
    }

    /// The update is persisted, a notification is sent, and the updated
    /// transaction (status, reason and confirmation time) is returned.
    #[tokio::test]
    async fn finalize_transaction_state_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // Mock repository update
        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, update| {
                tx_id == "tx-1"
                    && update.status == Some(TransactionStatus::Confirmed)
                    && update.status_reason == Some("Transaction confirmed".to_string())
            })
            .times(1)
            .returning(|tx_id, update| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = tx_id;
                tx.status = update.status.unwrap();
                tx.status_reason = update.status_reason;
                tx.confirmed_at = update.confirmed_at;
                Ok::<_, RepositoryError>(tx)
            });

        // Mock notification
        mocks
            .job_producer
            .expect_produce_send_notification_job()
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .finalize_transaction_state(
                "tx-1".to_string(),
                TransactionStatus::Confirmed,
                Some("Transaction confirmed".to_string()),
                Some("2023-01-01T00:00:00Z".to_string()),
            )
            .await;

        assert!(result.is_ok());
        let updated_tx = result.unwrap();
        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated_tx.status_reason,
            Some("Transaction confirmed".to_string())
        );
        // The confirmation timestamp must travel through the update request too.
        assert_eq!(
            updated_tx.confirmed_at,
            Some("2023-01-01T00:00:00Z".to_string())
        );
    }

    /// With a pending transaction available, a job is enqueued for it.
    #[tokio::test]
    async fn enqueue_next_pending_transaction_with_pending_tx() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // Mock finding a pending transaction
        mocks
            .tx_repo
            .expect_find_by_status()
            .withf(|relayer_id, statuses| {
                relayer_id == "relayer-1" && statuses == [TransactionStatus::Pending]
            })
            .times(1)
            .returning(|_, _| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = "pending-tx-1".to_string();
                Ok(vec![tx])
            });

        // Mock job production for the next transaction
        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    /// With nothing pending, the call succeeds and no job is enqueued.
    #[tokio::test]
    async fn enqueue_next_pending_transaction_no_pending_tx() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // Mock finding no pending transactions
        mocks
            .tx_repo
            .expect_find_by_status()
            .times(1)
            .returning(|_, _| Ok(vec![]));

        // Make the "nothing to enqueue" expectation explicit.
        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .never();

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }
}
476}