openzeppelin_relayer/domain/transaction/stellar/prepare/
mod.rs

1//! This module contains the preparation-related functionality for Stellar transactions.
2//! It includes methods for preparing transactions with robust error handling,
3//! ensuring lanes are always properly cleaned up on failure.
4
5// Declare submodules from the prepare/ directory
6pub mod common;
7pub mod fee_bump;
8pub mod operations;
9pub mod unsigned_xdr;
10
11use eyre::Result;
12use log::{info, warn};
13
14use super::{lane_gate, StellarRelayerTransaction};
15use crate::models::RelayerRepoModel;
16use crate::{
17    jobs::JobProducerTrait,
18    models::{TransactionError, TransactionInput, TransactionRepoModel, TransactionStatus},
19    repositories::{Repository, TransactionCounterTrait, TransactionRepository},
20    services::{Signer, StellarProviderTrait},
21};
22
23use common::{sign_and_finalize_transaction, update_and_notify_transaction};
24
25impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
26where
27    R: Repository<RelayerRepoModel, String> + Send + Sync,
28    T: TransactionRepository + Send + Sync,
29    J: JobProducerTrait + Send + Sync,
30    S: Signer + Send + Sync,
31    P: StellarProviderTrait + Send + Sync,
32    C: TransactionCounterTrait + Send + Sync,
33{
34    /// Main preparation method with robust error handling and guaranteed lane cleanup.
35    pub async fn prepare_transaction_impl(
36        &self,
37        tx: TransactionRepoModel,
38    ) -> Result<TransactionRepoModel, TransactionError> {
39        if !lane_gate::claim(&self.relayer().id, &tx.id) {
40            info!(
41                "Relayer {} already has a transaction in flight – {} must wait.",
42                self.relayer().id,
43                tx.id
44            );
45            return Ok(tx);
46        }
47
48        info!("Preparing transaction: {:?}", tx.id);
49
50        // Call core preparation logic with error handling
51        match self.prepare_core(tx.clone()).await {
52            Ok(prepared_tx) => Ok(prepared_tx),
53            Err(error) => {
54                // Always cleanup on failure - this is the critical safety mechanism
55                self.handle_prepare_failure(tx, error).await
56            }
57        }
58    }
59
60    /// Core preparation logic
61    async fn prepare_core(
62        &self,
63        tx: TransactionRepoModel,
64    ) -> Result<TransactionRepoModel, TransactionError> {
65        let stellar_data = tx.network_data.get_stellar_transaction_data()?;
66
67        // Simple dispatch to appropriate processing function based on input type
68        match &stellar_data.transaction_input {
69            TransactionInput::Operations(_) => {
70                info!("Preparing operations-based transaction {}", tx.id);
71                let stellar_data_with_sim = operations::process_operations(
72                    self.transaction_counter_service(),
73                    &self.relayer().id,
74                    &self.relayer().address,
75                    &tx,
76                    stellar_data,
77                    self.provider(),
78                    self.signer(),
79                )
80                .await?;
81                self.finalize_with_signature(tx, stellar_data_with_sim)
82                    .await
83            }
84            TransactionInput::UnsignedXdr(_) => {
85                info!("Preparing unsigned XDR transaction {}", tx.id);
86                let stellar_data_with_sim = unsigned_xdr::process_unsigned_xdr(
87                    self.transaction_counter_service(),
88                    &self.relayer().id,
89                    &self.relayer().address,
90                    stellar_data,
91                    self.provider(),
92                    self.signer(),
93                )
94                .await?;
95                self.finalize_with_signature(tx, stellar_data_with_sim)
96                    .await
97            }
98            TransactionInput::SignedXdr { .. } => {
99                info!("Preparing fee-bump transaction {}", tx.id);
100                let stellar_data_with_fee_bump = fee_bump::process_fee_bump(
101                    &self.relayer().address,
102                    stellar_data,
103                    self.provider(),
104                    self.signer(),
105                )
106                .await?;
107                update_and_notify_transaction(
108                    self.transaction_repository(),
109                    self.job_producer(),
110                    tx.id,
111                    stellar_data_with_fee_bump,
112                    self.relayer().notification_id.as_deref(),
113                )
114                .await
115            }
116        }
117    }
118
119    /// Helper to sign and finalize transactions for Operations and UnsignedXdr inputs.
120    async fn finalize_with_signature(
121        &self,
122        tx: TransactionRepoModel,
123        stellar_data: crate::models::StellarTransactionData,
124    ) -> Result<TransactionRepoModel, TransactionError> {
125        let (tx, final_stellar_data) =
126            sign_and_finalize_transaction(self.signer(), tx, stellar_data).await?;
127        update_and_notify_transaction(
128            self.transaction_repository(),
129            self.job_producer(),
130            tx.id,
131            final_stellar_data,
132            self.relayer().notification_id.as_deref(),
133        )
134        .await
135    }
136
137    /// Handles preparation failures with comprehensive cleanup and error reporting.
138    /// This method ensures lanes are never left claimed after any failure.
139    async fn handle_prepare_failure(
140        &self,
141        tx: TransactionRepoModel,
142        error: TransactionError,
143    ) -> Result<TransactionRepoModel, TransactionError> {
144        let error_reason = format!("Preparation failed: {}", error);
145        let tx_id = tx.id.clone(); // Clone the ID before moving tx
146        warn!("Transaction {} preparation failed: {}", tx_id, error_reason);
147
148        // Step 1: Mark transaction as Failed with detailed reason
149        let _failed_tx = match self
150            .finalize_transaction_state(
151                tx_id.clone(),
152                TransactionStatus::Failed,
153                Some(error_reason.clone()),
154                None,
155            )
156            .await
157        {
158            Ok(updated_tx) => updated_tx,
159            Err(finalize_error) => {
160                warn!(
161                    "Failed to mark transaction {} as failed: {}. Proceeding with lane cleanup.",
162                    tx_id, finalize_error
163                );
164                // Continue with cleanup even if we can't update the transaction
165                tx
166            }
167        };
168
169        // Step 2: Attempt to enqueue next pending transaction or release lane
170        if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
171            warn!(
172                "Failed to enqueue next pending transaction after {} failure: {}. Releasing lane directly.",
173                tx_id, enqueue_error
174            );
175            // Fallback: release lane directly if we can't hand it over
176            lane_gate::free(&self.relayer().id, &tx_id);
177        }
178
179        // Step 3: Log failure for monitoring (prepare_fail_total metric would go here)
180        info!(
181            "Transaction {} preparation failure handled. Lane cleaned up. Error: {}",
182            tx_id, error_reason
183        );
184
185        // Step 4: Return original error to maintain API compatibility
186        Err(error)
187    }
188}
189
190#[cfg(test)]
191mod prepare_transaction_tests {
192    use super::*;
193    use crate::{
194        domain::SignTransactionResponse,
195        models::{NetworkTransactionData, RepositoryError, TransactionStatus},
196    };
197    use soroban_rs::xdr::{Limits, ReadXdr, TransactionEnvelope};
198
199    use crate::domain::transaction::stellar::test_helpers::*;
200
201    #[tokio::test]
202    async fn prepare_transaction_happy_path() {
203        let relayer = create_test_relayer();
204        let mut mocks = default_test_mocks();
205
206        // sequence counter
207        mocks
208            .counter
209            .expect_get_and_increment()
210            .returning(|_, _| Ok(1));
211
212        // signer
213        mocks.signer.expect_sign_transaction().returning(|_| {
214            Box::pin(async {
215                Ok(SignTransactionResponse::Stellar(
216                    crate::domain::SignTransactionResponseStellar {
217                        signature: dummy_signature(),
218                    },
219                ))
220            })
221        });
222
223        mocks
224            .tx_repo
225            .expect_partial_update()
226            .withf(|_, upd| {
227                upd.status == Some(TransactionStatus::Sent) && upd.network_data.is_some()
228            })
229            .returning(|id, upd| {
230                let mut tx = create_test_transaction("relayer-1");
231                tx.id = id;
232                tx.status = upd.status.unwrap();
233                tx.network_data = upd.network_data.unwrap();
234                Ok::<_, RepositoryError>(tx)
235            });
236
237        // submit-job + notification
238        mocks
239            .job_producer
240            .expect_produce_submit_transaction_job()
241            .times(1)
242            .returning(|_, _| Box::pin(async { Ok(()) }));
243
244        mocks
245            .job_producer
246            .expect_produce_send_notification_job()
247            .times(1)
248            .returning(|_, _| Box::pin(async { Ok(()) }));
249
250        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
251        let tx = create_test_transaction(&relayer.id);
252
253        assert!(handler.prepare_transaction_impl(tx).await.is_ok());
254    }
255
256    #[tokio::test]
257    async fn prepare_transaction_stores_signed_envelope_xdr() {
258        let relayer = create_test_relayer();
259        let mut mocks = default_test_mocks();
260
261        // sequence counter
262        mocks
263            .counter
264            .expect_get_and_increment()
265            .returning(|_, _| Ok(1));
266
267        // signer
268        mocks.signer.expect_sign_transaction().returning(|_| {
269            Box::pin(async {
270                Ok(SignTransactionResponse::Stellar(
271                    crate::domain::SignTransactionResponseStellar {
272                        signature: dummy_signature(),
273                    },
274                ))
275            })
276        });
277
278        mocks
279            .tx_repo
280            .expect_partial_update()
281            .withf(|_, upd| {
282                upd.status == Some(TransactionStatus::Sent) && upd.network_data.is_some()
283            })
284            .returning(move |id, upd| {
285                let mut tx = create_test_transaction("relayer-1");
286                tx.id = id;
287                tx.status = upd.status.unwrap();
288                tx.network_data = upd.network_data.clone().unwrap();
289                Ok::<_, RepositoryError>(tx)
290            });
291
292        // submit-job + notification
293        mocks
294            .job_producer
295            .expect_produce_submit_transaction_job()
296            .times(1)
297            .returning(|_, _| Box::pin(async { Ok(()) }));
298
299        mocks
300            .job_producer
301            .expect_produce_send_notification_job()
302            .times(1)
303            .returning(|_, _| Box::pin(async { Ok(()) }));
304
305        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
306        let tx = create_test_transaction(&relayer.id);
307
308        let result = handler.prepare_transaction_impl(tx).await;
309        assert!(result.is_ok());
310
311        // Verify the signed_envelope_xdr was populated
312        if let Ok(prepared_tx) = result {
313            if let NetworkTransactionData::Stellar(stellar_data) = &prepared_tx.network_data {
314                assert!(
315                    stellar_data.signed_envelope_xdr.is_some(),
316                    "signed_envelope_xdr should be populated"
317                );
318
319                // Verify it's valid XDR by attempting to parse it
320                let xdr = stellar_data.signed_envelope_xdr.as_ref().unwrap();
321                let envelope_result = TransactionEnvelope::from_xdr_base64(xdr, Limits::none());
322                assert!(
323                    envelope_result.is_ok(),
324                    "signed_envelope_xdr should be valid XDR"
325                );
326
327                // Verify the envelope has signatures
328                if let Ok(envelope) = envelope_result {
329                    match envelope {
330                        TransactionEnvelope::Tx(ref e) => {
331                            assert!(!e.signatures.is_empty(), "Envelope should have signatures");
332                        }
333                        _ => panic!("Expected Tx envelope type"),
334                    }
335                }
336            } else {
337                panic!("Expected Stellar transaction data");
338            }
339        }
340    }
341
342    #[tokio::test]
343    async fn prepare_transaction_sequence_failure_cleans_up_lane() {
344        let relayer = create_test_relayer();
345        let mut mocks = default_test_mocks();
346
347        // Mock sequence counter to fail
348        mocks.counter.expect_get_and_increment().returning(|_, _| {
349            Err(crate::repositories::TransactionCounterError::NotFound(
350                "Counter service failure".to_string(),
351            ))
352        });
353
354        // Mock finalize_transaction_state for failure handling
355        mocks
356            .tx_repo
357            .expect_partial_update()
358            .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
359            .returning(|id, upd| {
360                let mut tx = create_test_transaction("relayer-1");
361                tx.id = id;
362                tx.status = upd.status.unwrap();
363                Ok::<_, RepositoryError>(tx)
364            });
365
366        // Mock notification for failed transaction
367        mocks
368            .job_producer
369            .expect_produce_send_notification_job()
370            .times(1)
371            .returning(|_, _| Box::pin(async { Ok(()) }));
372
373        // Mock find_by_status for enqueue_next_pending_transaction
374        mocks
375            .tx_repo
376            .expect_find_by_status()
377            .returning(|_, _| Ok(vec![])); // No pending transactions
378
379        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
380        let tx = create_test_transaction(&relayer.id);
381
382        // Verify that lane is claimed initially
383        assert!(lane_gate::claim(&relayer.id, &tx.id));
384
385        let result = handler.prepare_transaction_impl(tx.clone()).await;
386
387        // Should return error but lane should be cleaned up
388        assert!(result.is_err());
389
390        // Verify lane is released - another transaction should be able to claim it
391        let another_tx_id = "another-tx";
392        assert!(lane_gate::claim(&relayer.id, another_tx_id));
393        lane_gate::free(&relayer.id, another_tx_id)
394    }
395
396    #[tokio::test]
397    async fn prepare_transaction_signer_failure_cleans_up_lane() {
398        let relayer = create_test_relayer();
399        let mut mocks = default_test_mocks();
400
401        // sequence counter succeeds
402        mocks
403            .counter
404            .expect_get_and_increment()
405            .returning(|_, _| Ok(1));
406
407        // signer fails
408        mocks.signer.expect_sign_transaction().returning(|_| {
409            Box::pin(async {
410                Err(crate::models::SignerError::SigningError(
411                    "Signer failure".to_string(),
412                ))
413            })
414        });
415
416        // Mock finalize_transaction_state for failure handling
417        mocks
418            .tx_repo
419            .expect_partial_update()
420            .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
421            .returning(|id, upd| {
422                let mut tx = create_test_transaction("relayer-1");
423                tx.id = id;
424                tx.status = upd.status.unwrap();
425                Ok::<_, RepositoryError>(tx)
426            });
427
428        // Mock notification for failed transaction
429        mocks
430            .job_producer
431            .expect_produce_send_notification_job()
432            .times(1)
433            .returning(|_, _| Box::pin(async { Ok(()) }));
434
435        // Mock find_by_status for enqueue_next_pending_transaction
436        mocks
437            .tx_repo
438            .expect_find_by_status()
439            .returning(|_, _| Ok(vec![])); // No pending transactions
440
441        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
442        let tx = create_test_transaction(&relayer.id);
443
444        let result = handler.prepare_transaction_impl(tx.clone()).await;
445
446        // Should return error but lane should be cleaned up
447        assert!(result.is_err());
448
449        // Verify lane is released
450        let another_tx_id = "another-tx";
451        assert!(lane_gate::claim(&relayer.id, another_tx_id));
452        lane_gate::free(&relayer.id, another_tx_id); // cleanup
453    }
454
455    #[tokio::test]
456    async fn prepare_transaction_already_claimed_lane_returns_original() {
457        let mut relayer = create_test_relayer();
458        relayer.id = "unique-relayer-for-lane-test".to_string(); // Use unique relayer ID
459        let mocks = default_test_mocks();
460
461        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
462        let tx = create_test_transaction(&relayer.id);
463
464        // Claim lane with different transaction
465        assert!(lane_gate::claim(&relayer.id, "other-tx"));
466
467        let result = handler.prepare_transaction_impl(tx.clone()).await;
468
469        // Should return Ok with original transaction (waiting)
470        assert!(result.is_ok());
471        let returned_tx = result.unwrap();
472        assert_eq!(returned_tx.id, tx.id);
473        assert_eq!(returned_tx.status, tx.status);
474
475        // Cleanup
476        lane_gate::free(&relayer.id, "other-tx");
477    }
478}
479
480#[cfg(test)]
481mod refactoring_tests {
482    use crate::domain::transaction::stellar::prepare::common::update_and_notify_transaction;
483    use crate::domain::transaction::stellar::test_helpers::*;
484    use crate::models::{
485        NetworkTransactionData, RepositoryError, StellarTransactionData, TransactionInput,
486        TransactionStatus,
487    };
488
489    #[tokio::test]
490    async fn test_update_and_notify_transaction_consistency() {
491        let relayer = create_test_relayer();
492        let mut mocks = default_test_mocks();
493
494        // Mock the repository update
495        let expected_stellar_data = StellarTransactionData {
496            source_account: TEST_PK.to_string(),
497            network_passphrase: "Test SDF Network ; September 2015".to_string(),
498            fee: Some(100),
499            sequence_number: Some(1),
500            transaction_input: TransactionInput::Operations(vec![]),
501            memo: None,
502            valid_until: None,
503            signatures: vec![],
504            hash: None,
505            simulation_transaction_data: None,
506            signed_envelope_xdr: Some("test-xdr".to_string()),
507        };
508
509        let expected_xdr = expected_stellar_data.signed_envelope_xdr.clone();
510        mocks
511            .tx_repo
512            .expect_partial_update()
513            .withf(move |id, upd| {
514                id == "tx-1"
515                    && upd.status == Some(TransactionStatus::Sent)
516                    && if let Some(NetworkTransactionData::Stellar(ref data)) = upd.network_data {
517                        data.signed_envelope_xdr == expected_xdr
518                    } else {
519                        false
520                    }
521            })
522            .returning(|id, upd| {
523                let mut tx = create_test_transaction("relayer-1");
524                tx.id = id;
525                tx.status = upd.status.unwrap();
526                tx.network_data = upd.network_data.unwrap();
527                Ok::<_, RepositoryError>(tx)
528            });
529
530        // Mock job production
531        mocks
532            .job_producer
533            .expect_produce_submit_transaction_job()
534            .times(1)
535            .returning(|_, _| Box::pin(async { Ok(()) }));
536
537        mocks
538            .job_producer
539            .expect_produce_send_notification_job()
540            .times(1)
541            .returning(|_, _| Box::pin(async { Ok(()) }));
542
543        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
544
545        // Test update_and_notify_transaction directly
546        let result = update_and_notify_transaction(
547            handler.transaction_repository(),
548            handler.job_producer(),
549            "tx-1".to_string(),
550            expected_stellar_data,
551            handler.relayer().notification_id.as_deref(),
552        )
553        .await;
554
555        assert!(result.is_ok());
556        let updated_tx = result.unwrap();
557        assert_eq!(updated_tx.status, TransactionStatus::Sent);
558
559        if let NetworkTransactionData::Stellar(data) = &updated_tx.network_data {
560            assert_eq!(data.signed_envelope_xdr, Some("test-xdr".to_string()));
561        } else {
562            panic!("Expected Stellar transaction data");
563        }
564    }
565}