openzeppelin_relayer/domain/transaction/stellar/
submit.rs

1//! This module contains the submission-related functionality for Stellar transactions.
2//! It includes methods for submitting transactions with robust error handling,
3//! ensuring proper transaction state management on failure.
4
5use chrono::Utc;
6use log::{info, warn};
7
8use super::StellarRelayerTransaction;
9use crate::{
10    jobs::{JobProducerTrait, TransactionStatusCheck},
11    models::{
12        NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
13        TransactionStatus, TransactionUpdateRequest,
14    },
15    repositories::{Repository, TransactionCounterTrait, TransactionRepository},
16    services::{Signer, StellarProviderTrait},
17};
18
19impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
20where
21    R: Repository<RelayerRepoModel, String> + Send + Sync,
22    T: TransactionRepository + Send + Sync,
23    J: JobProducerTrait + Send + Sync,
24    S: Signer + Send + Sync,
25    P: StellarProviderTrait + Send + Sync,
26    C: TransactionCounterTrait + Send + Sync,
27{
28    /// Main submission method with robust error handling.
29    /// Unlike prepare, submit doesn't claim lanes but still needs proper error handling.
30    pub async fn submit_transaction_impl(
31        &self,
32        tx: TransactionRepoModel,
33    ) -> Result<TransactionRepoModel, TransactionError> {
34        info!("Submitting Stellar transaction: {:?}", tx.id);
35
36        // Call core submission logic with error handling
37        match self.submit_core(tx.clone()).await {
38            Ok(submitted_tx) => Ok(submitted_tx),
39            Err(error) => {
40                // Handle submission failure - mark as failed and send notification
41                self.handle_submit_failure(tx, error).await
42            }
43        }
44    }
45
46    /// Core submission logic - pure business logic without error handling concerns.
47    async fn submit_core(
48        &self,
49        tx: TransactionRepoModel,
50    ) -> Result<TransactionRepoModel, TransactionError> {
51        let stellar_data = tx.network_data.get_stellar_transaction_data()?;
52        let tx_envelope = stellar_data
53            .get_envelope_for_submission()
54            .map_err(TransactionError::from)?;
55
56        let hash = self
57            .provider()
58            .send_transaction(&tx_envelope)
59            .await
60            .map_err(TransactionError::from)?;
61
62        let tx_hash_hex = hex::encode(hash.as_slice());
63        let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
64
65        let mut hashes = tx.hashes.clone();
66        hashes.push(tx_hash_hex);
67
68        let update_req = TransactionUpdateRequest {
69            status: Some(TransactionStatus::Submitted),
70            sent_at: Some(Utc::now().to_rfc3339()),
71            network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
72            hashes: Some(hashes),
73            ..Default::default()
74        };
75
76        let updated_tx = self
77            .transaction_repository()
78            .partial_update(tx.id.clone(), update_req)
79            .await?;
80
81        // Enqueue status check job
82        self.job_producer()
83            .produce_check_transaction_status_job(
84                TransactionStatusCheck::new(updated_tx.id.clone(), updated_tx.relayer_id.clone()),
85                None,
86            )
87            .await?;
88
89        // Send notification
90        self.send_transaction_update_notification(&updated_tx)
91            .await?;
92
93        Ok(updated_tx)
94    }
95
96    /// Handles submission failures with comprehensive cleanup and error reporting.
97    /// This method ensures failed transactions are properly marked and notifications sent.
98    async fn handle_submit_failure(
99        &self,
100        tx: TransactionRepoModel,
101        error: TransactionError,
102    ) -> Result<TransactionRepoModel, TransactionError> {
103        let error_reason = format!("Submission failed: {}", error);
104        let tx_id = tx.id.clone(); // Clone the ID before moving tx
105        warn!("Transaction {} submission failed: {}", tx_id, error_reason);
106
107        // Step 1: Mark transaction as Failed with detailed reason
108        let _failed_tx = match self
109            .finalize_transaction_state(
110                tx_id.clone(),
111                TransactionStatus::Failed,
112                Some(error_reason.clone()),
113                None,
114            )
115            .await
116        {
117            Ok(updated_tx) => updated_tx,
118            Err(finalize_error) => {
119                warn!(
120                    "Failed to mark transaction {} as failed: {}. Continuing with lane cleanup.",
121                    tx_id, finalize_error
122                );
123                // Continue with cleanup even if we can't update the transaction
124                tx
125            }
126        };
127
128        // Step 2: Attempt to enqueue next pending transaction or release lane
129        if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
130            warn!(
131                "Failed to enqueue next pending transaction after {} submission failure: {}. Lane may remain stuck.",
132                tx_id, enqueue_error
133            );
134            // Note: We don't have direct lane ownership here since submit doesn't claim lanes,
135            // but the transaction still owns the lane from prepare phase
136        }
137
138        // Step 3: Log failure for monitoring (submit_fail_total metric would go here)
139        info!(
140            "Transaction {} submission failure handled. Next transaction enqueued. Error: {}",
141            tx_id, error_reason
142        );
143
144        Err(error)
145    }
146
147    /// Resubmit transaction - delegates to submit_transaction_impl
148    pub async fn resubmit_transaction_impl(
149        &self,
150        tx: TransactionRepoModel,
151    ) -> Result<TransactionRepoModel, TransactionError> {
152        self.submit_transaction_impl(tx).await
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use soroban_rs::xdr::{Hash, WriteXdr};
160
161    use crate::domain::transaction::stellar::test_helpers::*;
162
163    mod submit_transaction_tests {
164        use crate::models::RepositoryError;
165
166        use super::*;
167
168        #[tokio::test]
169        async fn submit_transaction_happy_path() {
170            let relayer = create_test_relayer();
171            let mut mocks = default_test_mocks();
172
173            // provider gives a hash
174            mocks
175                .provider
176                .expect_send_transaction()
177                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
178
179            // expect partial update to Submitted
180            mocks
181                .tx_repo
182                .expect_partial_update()
183                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
184                .returning(|id, upd| {
185                    let mut tx = create_test_transaction("relayer-1");
186                    tx.id = id;
187                    tx.status = upd.status.unwrap();
188                    Ok::<_, RepositoryError>(tx)
189                });
190
191            // enqueue status-check & notification
192            mocks
193                .job_producer
194                .expect_produce_check_transaction_status_job()
195                .times(1)
196                .returning(|_, _| Box::pin(async { Ok(()) }));
197            mocks
198                .job_producer
199                .expect_produce_send_notification_job()
200                .times(1)
201                .returning(|_, _| Box::pin(async { Ok(()) }));
202
203            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
204
205            let mut tx = create_test_transaction(&relayer.id);
206            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
207                d.signatures.push(dummy_signature());
208            }
209
210            let res = handler.submit_transaction_impl(tx).await.unwrap();
211            assert_eq!(res.status, TransactionStatus::Submitted);
212        }
213
214        #[tokio::test]
215        async fn submit_transaction_provider_error_marks_failed() {
216            let relayer = create_test_relayer();
217            let mut mocks = default_test_mocks();
218
219            // Provider fails
220            mocks
221                .provider
222                .expect_send_transaction()
223                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));
224
225            // Mock finalize_transaction_state for failure handling
226            mocks
227                .tx_repo
228                .expect_partial_update()
229                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
230                .returning(|id, upd| {
231                    let mut tx = create_test_transaction("relayer-1");
232                    tx.id = id;
233                    tx.status = upd.status.unwrap();
234                    Ok::<_, RepositoryError>(tx)
235                });
236
237            // Mock notification for failed transaction
238            mocks
239                .job_producer
240                .expect_produce_send_notification_job()
241                .times(1)
242                .returning(|_, _| Box::pin(async { Ok(()) }));
243
244            // Mock find_by_status for enqueue_next_pending_transaction
245            mocks
246                .tx_repo
247                .expect_find_by_status()
248                .returning(|_, _| Ok(vec![])); // No pending transactions
249
250            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
251            let mut tx = create_test_transaction(&relayer.id);
252            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
253                data.signatures.push(dummy_signature());
254            }
255
256            let res = handler.submit_transaction_impl(tx).await;
257
258            // Should return error but transaction should be marked as failed
259            assert!(res.is_err());
260            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
261        }
262
263        #[tokio::test]
264        async fn submit_transaction_repository_error_marks_failed() {
265            let relayer = create_test_relayer();
266            let mut mocks = default_test_mocks();
267
268            // Provider succeeds
269            mocks
270                .provider
271                .expect_send_transaction()
272                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
273
274            // Repository fails on first update (submission)
275            mocks
276                .tx_repo
277                .expect_partial_update()
278                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
279                .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
280
281            // Mock finalize_transaction_state for failure handling
282            mocks
283                .tx_repo
284                .expect_partial_update()
285                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
286                .returning(|id, upd| {
287                    let mut tx = create_test_transaction("relayer-1");
288                    tx.id = id;
289                    tx.status = upd.status.unwrap();
290                    Ok::<_, RepositoryError>(tx)
291                });
292
293            // Mock notification for failed transaction
294            mocks
295                .job_producer
296                .expect_produce_send_notification_job()
297                .times(1)
298                .returning(|_, _| Box::pin(async { Ok(()) }));
299
300            // Mock find_by_status for enqueue_next_pending_transaction
301            mocks
302                .tx_repo
303                .expect_find_by_status()
304                .returning(|_, _| Ok(vec![])); // No pending transactions
305
306            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
307            let mut tx = create_test_transaction(&relayer.id);
308            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
309                data.signatures.push(dummy_signature());
310            }
311
312            let res = handler.submit_transaction_impl(tx).await;
313
314            // Should return error but transaction should be marked as failed
315            assert!(res.is_err());
316        }
317
318        #[tokio::test]
319        async fn submit_transaction_uses_signed_envelope_xdr() {
320            let relayer = create_test_relayer();
321            let mut mocks = default_test_mocks();
322
323            // Create a transaction with signed_envelope_xdr set
324            let mut tx = create_test_transaction(&relayer.id);
325            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
326                data.signatures.push(dummy_signature());
327                // Build and store the signed envelope XDR
328                let envelope = data.get_envelope_for_submission().unwrap();
329                let xdr = envelope
330                    .to_xdr_base64(soroban_rs::xdr::Limits::none())
331                    .unwrap();
332                data.signed_envelope_xdr = Some(xdr);
333            }
334
335            // Provider should receive the envelope decoded from signed_envelope_xdr
336            mocks
337                .provider
338                .expect_send_transaction()
339                .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));
340
341            // Update to Submitted
342            mocks
343                .tx_repo
344                .expect_partial_update()
345                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
346                .returning(|id, upd| {
347                    let mut tx = create_test_transaction("relayer-1");
348                    tx.id = id;
349                    tx.status = upd.status.unwrap();
350                    Ok::<_, RepositoryError>(tx)
351                });
352
353            // Job and notification expectations
354            mocks
355                .job_producer
356                .expect_produce_check_transaction_status_job()
357                .times(1)
358                .returning(|_, _| Box::pin(async { Ok(()) }));
359            mocks
360                .job_producer
361                .expect_produce_send_notification_job()
362                .times(1)
363                .returning(|_, _| Box::pin(async { Ok(()) }));
364
365            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
366            let res = handler.submit_transaction_impl(tx).await.unwrap();
367
368            assert_eq!(res.status, TransactionStatus::Submitted);
369        }
370
371        #[tokio::test]
372        async fn resubmit_transaction_delegates_to_submit() {
373            let relayer = create_test_relayer();
374            let mut mocks = default_test_mocks();
375
376            // provider gives a hash
377            mocks
378                .provider
379                .expect_send_transaction()
380                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
381
382            // expect partial update to Submitted
383            mocks
384                .tx_repo
385                .expect_partial_update()
386                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
387                .returning(|id, upd| {
388                    let mut tx = create_test_transaction("relayer-1");
389                    tx.id = id;
390                    tx.status = upd.status.unwrap();
391                    Ok::<_, RepositoryError>(tx)
392                });
393
394            // enqueue status-check & notification
395            mocks
396                .job_producer
397                .expect_produce_check_transaction_status_job()
398                .times(1)
399                .returning(|_, _| Box::pin(async { Ok(()) }));
400            mocks
401                .job_producer
402                .expect_produce_send_notification_job()
403                .times(1)
404                .returning(|_, _| Box::pin(async { Ok(()) }));
405
406            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
407
408            let mut tx = create_test_transaction(&relayer.id);
409            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
410                d.signatures.push(dummy_signature());
411            }
412
413            let res = handler.resubmit_transaction_impl(tx).await.unwrap();
414            assert_eq!(res.status, TransactionStatus::Submitted);
415        }
416
417        #[tokio::test]
418        async fn submit_transaction_failure_enqueues_next_transaction() {
419            let relayer = create_test_relayer();
420            let mut mocks = default_test_mocks();
421
422            // Provider fails
423            mocks
424                .provider
425                .expect_send_transaction()
426                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));
427
428            // Mock finalize_transaction_state for failure handling
429            mocks
430                .tx_repo
431                .expect_partial_update()
432                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
433                .returning(|id, upd| {
434                    let mut tx = create_test_transaction("relayer-1");
435                    tx.id = id;
436                    tx.status = upd.status.unwrap();
437                    Ok::<_, RepositoryError>(tx)
438                });
439
440            // Mock notification for failed transaction
441            mocks
442                .job_producer
443                .expect_produce_send_notification_job()
444                .times(1)
445                .returning(|_, _| Box::pin(async { Ok(()) }));
446
447            // Mock find_by_status to return a pending transaction
448            let mut pending_tx = create_test_transaction(&relayer.id);
449            pending_tx.id = "next-pending-tx".to_string();
450            pending_tx.status = TransactionStatus::Pending;
451            let captured_pending_tx = pending_tx.clone();
452            mocks
453                .tx_repo
454                .expect_find_by_status()
455                .with(
456                    mockall::predicate::eq(relayer.id.clone()),
457                    mockall::predicate::eq(vec![TransactionStatus::Pending]),
458                )
459                .times(1)
460                .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));
461
462            // Mock produce_transaction_request_job for the next pending transaction
463            mocks
464                .job_producer
465                .expect_produce_transaction_request_job()
466                .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
467                .times(1)
468                .returning(|_, _| Box::pin(async { Ok(()) }));
469
470            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
471            let mut tx = create_test_transaction(&relayer.id);
472            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
473                data.signatures.push(dummy_signature());
474            }
475
476            let res = handler.submit_transaction_impl(tx).await;
477
478            // Should return error but next transaction should be enqueued
479            assert!(res.is_err());
480            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
481        }
482    }
483}