1pub mod common;
7pub mod fee_bump;
8pub mod operations;
9pub mod unsigned_xdr;
10
11use eyre::Result;
12use log::{info, warn};
13
14use super::{lane_gate, StellarRelayerTransaction};
15use crate::models::RelayerRepoModel;
16use crate::{
17 jobs::JobProducerTrait,
18 models::{TransactionError, TransactionInput, TransactionRepoModel, TransactionStatus},
19 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
20 services::{Signer, StellarProviderTrait},
21};
22
23use common::{sign_and_finalize_transaction, update_and_notify_transaction};
24
25impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
26where
27 R: Repository<RelayerRepoModel, String> + Send + Sync,
28 T: TransactionRepository + Send + Sync,
29 J: JobProducerTrait + Send + Sync,
30 S: Signer + Send + Sync,
31 P: StellarProviderTrait + Send + Sync,
32 C: TransactionCounterTrait + Send + Sync,
33{
34 pub async fn prepare_transaction_impl(
36 &self,
37 tx: TransactionRepoModel,
38 ) -> Result<TransactionRepoModel, TransactionError> {
39 if !lane_gate::claim(&self.relayer().id, &tx.id) {
40 info!(
41 "Relayer {} already has a transaction in flight – {} must wait.",
42 self.relayer().id,
43 tx.id
44 );
45 return Ok(tx);
46 }
47
48 info!("Preparing transaction: {:?}", tx.id);
49
50 match self.prepare_core(tx.clone()).await {
52 Ok(prepared_tx) => Ok(prepared_tx),
53 Err(error) => {
54 self.handle_prepare_failure(tx, error).await
56 }
57 }
58 }
59
60 async fn prepare_core(
62 &self,
63 tx: TransactionRepoModel,
64 ) -> Result<TransactionRepoModel, TransactionError> {
65 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
66
67 match &stellar_data.transaction_input {
69 TransactionInput::Operations(_) => {
70 info!("Preparing operations-based transaction {}", tx.id);
71 let stellar_data_with_sim = operations::process_operations(
72 self.transaction_counter_service(),
73 &self.relayer().id,
74 &self.relayer().address,
75 &tx,
76 stellar_data,
77 self.provider(),
78 self.signer(),
79 )
80 .await?;
81 self.finalize_with_signature(tx, stellar_data_with_sim)
82 .await
83 }
84 TransactionInput::UnsignedXdr(_) => {
85 info!("Preparing unsigned XDR transaction {}", tx.id);
86 let stellar_data_with_sim = unsigned_xdr::process_unsigned_xdr(
87 self.transaction_counter_service(),
88 &self.relayer().id,
89 &self.relayer().address,
90 stellar_data,
91 self.provider(),
92 self.signer(),
93 )
94 .await?;
95 self.finalize_with_signature(tx, stellar_data_with_sim)
96 .await
97 }
98 TransactionInput::SignedXdr { .. } => {
99 info!("Preparing fee-bump transaction {}", tx.id);
100 let stellar_data_with_fee_bump = fee_bump::process_fee_bump(
101 &self.relayer().address,
102 stellar_data,
103 self.provider(),
104 self.signer(),
105 )
106 .await?;
107 update_and_notify_transaction(
108 self.transaction_repository(),
109 self.job_producer(),
110 tx.id,
111 stellar_data_with_fee_bump,
112 self.relayer().notification_id.as_deref(),
113 )
114 .await
115 }
116 }
117 }
118
119 async fn finalize_with_signature(
121 &self,
122 tx: TransactionRepoModel,
123 stellar_data: crate::models::StellarTransactionData,
124 ) -> Result<TransactionRepoModel, TransactionError> {
125 let (tx, final_stellar_data) =
126 sign_and_finalize_transaction(self.signer(), tx, stellar_data).await?;
127 update_and_notify_transaction(
128 self.transaction_repository(),
129 self.job_producer(),
130 tx.id,
131 final_stellar_data,
132 self.relayer().notification_id.as_deref(),
133 )
134 .await
135 }
136
137 async fn handle_prepare_failure(
140 &self,
141 tx: TransactionRepoModel,
142 error: TransactionError,
143 ) -> Result<TransactionRepoModel, TransactionError> {
144 let error_reason = format!("Preparation failed: {}", error);
145 let tx_id = tx.id.clone(); warn!("Transaction {} preparation failed: {}", tx_id, error_reason);
147
148 let _failed_tx = match self
150 .finalize_transaction_state(
151 tx_id.clone(),
152 TransactionStatus::Failed,
153 Some(error_reason.clone()),
154 None,
155 )
156 .await
157 {
158 Ok(updated_tx) => updated_tx,
159 Err(finalize_error) => {
160 warn!(
161 "Failed to mark transaction {} as failed: {}. Proceeding with lane cleanup.",
162 tx_id, finalize_error
163 );
164 tx
166 }
167 };
168
169 if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
171 warn!(
172 "Failed to enqueue next pending transaction after {} failure: {}. Releasing lane directly.",
173 tx_id, enqueue_error
174 );
175 lane_gate::free(&self.relayer().id, &tx_id);
177 }
178
179 info!(
181 "Transaction {} preparation failure handled. Lane cleaned up. Error: {}",
182 tx_id, error_reason
183 );
184
185 Err(error)
187 }
188}
189
190#[cfg(test)]
191mod prepare_transaction_tests {
192 use super::*;
193 use crate::{
194 domain::SignTransactionResponse,
195 models::{NetworkTransactionData, RepositoryError, TransactionStatus},
196 };
197 use soroban_rs::xdr::{Limits, ReadXdr, TransactionEnvelope};
198
199 use crate::domain::transaction::stellar::test_helpers::*;
200
201 #[tokio::test]
202 async fn prepare_transaction_happy_path() {
203 let relayer = create_test_relayer();
204 let mut mocks = default_test_mocks();
205
206 mocks
208 .counter
209 .expect_get_and_increment()
210 .returning(|_, _| Ok(1));
211
212 mocks.signer.expect_sign_transaction().returning(|_| {
214 Box::pin(async {
215 Ok(SignTransactionResponse::Stellar(
216 crate::domain::SignTransactionResponseStellar {
217 signature: dummy_signature(),
218 },
219 ))
220 })
221 });
222
223 mocks
224 .tx_repo
225 .expect_partial_update()
226 .withf(|_, upd| {
227 upd.status == Some(TransactionStatus::Sent) && upd.network_data.is_some()
228 })
229 .returning(|id, upd| {
230 let mut tx = create_test_transaction("relayer-1");
231 tx.id = id;
232 tx.status = upd.status.unwrap();
233 tx.network_data = upd.network_data.unwrap();
234 Ok::<_, RepositoryError>(tx)
235 });
236
237 mocks
239 .job_producer
240 .expect_produce_submit_transaction_job()
241 .times(1)
242 .returning(|_, _| Box::pin(async { Ok(()) }));
243
244 mocks
245 .job_producer
246 .expect_produce_send_notification_job()
247 .times(1)
248 .returning(|_, _| Box::pin(async { Ok(()) }));
249
250 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
251 let tx = create_test_transaction(&relayer.id);
252
253 assert!(handler.prepare_transaction_impl(tx).await.is_ok());
254 }
255
256 #[tokio::test]
257 async fn prepare_transaction_stores_signed_envelope_xdr() {
258 let relayer = create_test_relayer();
259 let mut mocks = default_test_mocks();
260
261 mocks
263 .counter
264 .expect_get_and_increment()
265 .returning(|_, _| Ok(1));
266
267 mocks.signer.expect_sign_transaction().returning(|_| {
269 Box::pin(async {
270 Ok(SignTransactionResponse::Stellar(
271 crate::domain::SignTransactionResponseStellar {
272 signature: dummy_signature(),
273 },
274 ))
275 })
276 });
277
278 mocks
279 .tx_repo
280 .expect_partial_update()
281 .withf(|_, upd| {
282 upd.status == Some(TransactionStatus::Sent) && upd.network_data.is_some()
283 })
284 .returning(move |id, upd| {
285 let mut tx = create_test_transaction("relayer-1");
286 tx.id = id;
287 tx.status = upd.status.unwrap();
288 tx.network_data = upd.network_data.clone().unwrap();
289 Ok::<_, RepositoryError>(tx)
290 });
291
292 mocks
294 .job_producer
295 .expect_produce_submit_transaction_job()
296 .times(1)
297 .returning(|_, _| Box::pin(async { Ok(()) }));
298
299 mocks
300 .job_producer
301 .expect_produce_send_notification_job()
302 .times(1)
303 .returning(|_, _| Box::pin(async { Ok(()) }));
304
305 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
306 let tx = create_test_transaction(&relayer.id);
307
308 let result = handler.prepare_transaction_impl(tx).await;
309 assert!(result.is_ok());
310
311 if let Ok(prepared_tx) = result {
313 if let NetworkTransactionData::Stellar(stellar_data) = &prepared_tx.network_data {
314 assert!(
315 stellar_data.signed_envelope_xdr.is_some(),
316 "signed_envelope_xdr should be populated"
317 );
318
319 let xdr = stellar_data.signed_envelope_xdr.as_ref().unwrap();
321 let envelope_result = TransactionEnvelope::from_xdr_base64(xdr, Limits::none());
322 assert!(
323 envelope_result.is_ok(),
324 "signed_envelope_xdr should be valid XDR"
325 );
326
327 if let Ok(envelope) = envelope_result {
329 match envelope {
330 TransactionEnvelope::Tx(ref e) => {
331 assert!(!e.signatures.is_empty(), "Envelope should have signatures");
332 }
333 _ => panic!("Expected Tx envelope type"),
334 }
335 }
336 } else {
337 panic!("Expected Stellar transaction data");
338 }
339 }
340 }
341
342 #[tokio::test]
343 async fn prepare_transaction_sequence_failure_cleans_up_lane() {
344 let relayer = create_test_relayer();
345 let mut mocks = default_test_mocks();
346
347 mocks.counter.expect_get_and_increment().returning(|_, _| {
349 Err(crate::repositories::TransactionCounterError::NotFound(
350 "Counter service failure".to_string(),
351 ))
352 });
353
354 mocks
356 .tx_repo
357 .expect_partial_update()
358 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
359 .returning(|id, upd| {
360 let mut tx = create_test_transaction("relayer-1");
361 tx.id = id;
362 tx.status = upd.status.unwrap();
363 Ok::<_, RepositoryError>(tx)
364 });
365
366 mocks
368 .job_producer
369 .expect_produce_send_notification_job()
370 .times(1)
371 .returning(|_, _| Box::pin(async { Ok(()) }));
372
373 mocks
375 .tx_repo
376 .expect_find_by_status()
377 .returning(|_, _| Ok(vec![])); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
380 let tx = create_test_transaction(&relayer.id);
381
382 assert!(lane_gate::claim(&relayer.id, &tx.id));
384
385 let result = handler.prepare_transaction_impl(tx.clone()).await;
386
387 assert!(result.is_err());
389
390 let another_tx_id = "another-tx";
392 assert!(lane_gate::claim(&relayer.id, another_tx_id));
393 lane_gate::free(&relayer.id, another_tx_id)
394 }
395
396 #[tokio::test]
397 async fn prepare_transaction_signer_failure_cleans_up_lane() {
398 let relayer = create_test_relayer();
399 let mut mocks = default_test_mocks();
400
401 mocks
403 .counter
404 .expect_get_and_increment()
405 .returning(|_, _| Ok(1));
406
407 mocks.signer.expect_sign_transaction().returning(|_| {
409 Box::pin(async {
410 Err(crate::models::SignerError::SigningError(
411 "Signer failure".to_string(),
412 ))
413 })
414 });
415
416 mocks
418 .tx_repo
419 .expect_partial_update()
420 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
421 .returning(|id, upd| {
422 let mut tx = create_test_transaction("relayer-1");
423 tx.id = id;
424 tx.status = upd.status.unwrap();
425 Ok::<_, RepositoryError>(tx)
426 });
427
428 mocks
430 .job_producer
431 .expect_produce_send_notification_job()
432 .times(1)
433 .returning(|_, _| Box::pin(async { Ok(()) }));
434
435 mocks
437 .tx_repo
438 .expect_find_by_status()
439 .returning(|_, _| Ok(vec![])); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
442 let tx = create_test_transaction(&relayer.id);
443
444 let result = handler.prepare_transaction_impl(tx.clone()).await;
445
446 assert!(result.is_err());
448
449 let another_tx_id = "another-tx";
451 assert!(lane_gate::claim(&relayer.id, another_tx_id));
452 lane_gate::free(&relayer.id, another_tx_id); }
454
455 #[tokio::test]
456 async fn prepare_transaction_already_claimed_lane_returns_original() {
457 let mut relayer = create_test_relayer();
458 relayer.id = "unique-relayer-for-lane-test".to_string(); let mocks = default_test_mocks();
460
461 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
462 let tx = create_test_transaction(&relayer.id);
463
464 assert!(lane_gate::claim(&relayer.id, "other-tx"));
466
467 let result = handler.prepare_transaction_impl(tx.clone()).await;
468
469 assert!(result.is_ok());
471 let returned_tx = result.unwrap();
472 assert_eq!(returned_tx.id, tx.id);
473 assert_eq!(returned_tx.status, tx.status);
474
475 lane_gate::free(&relayer.id, "other-tx");
477 }
478}
479
480#[cfg(test)]
481mod refactoring_tests {
482 use crate::domain::transaction::stellar::prepare::common::update_and_notify_transaction;
483 use crate::domain::transaction::stellar::test_helpers::*;
484 use crate::models::{
485 NetworkTransactionData, RepositoryError, StellarTransactionData, TransactionInput,
486 TransactionStatus,
487 };
488
489 #[tokio::test]
490 async fn test_update_and_notify_transaction_consistency() {
491 let relayer = create_test_relayer();
492 let mut mocks = default_test_mocks();
493
494 let expected_stellar_data = StellarTransactionData {
496 source_account: TEST_PK.to_string(),
497 network_passphrase: "Test SDF Network ; September 2015".to_string(),
498 fee: Some(100),
499 sequence_number: Some(1),
500 transaction_input: TransactionInput::Operations(vec![]),
501 memo: None,
502 valid_until: None,
503 signatures: vec![],
504 hash: None,
505 simulation_transaction_data: None,
506 signed_envelope_xdr: Some("test-xdr".to_string()),
507 };
508
509 let expected_xdr = expected_stellar_data.signed_envelope_xdr.clone();
510 mocks
511 .tx_repo
512 .expect_partial_update()
513 .withf(move |id, upd| {
514 id == "tx-1"
515 && upd.status == Some(TransactionStatus::Sent)
516 && if let Some(NetworkTransactionData::Stellar(ref data)) = upd.network_data {
517 data.signed_envelope_xdr == expected_xdr
518 } else {
519 false
520 }
521 })
522 .returning(|id, upd| {
523 let mut tx = create_test_transaction("relayer-1");
524 tx.id = id;
525 tx.status = upd.status.unwrap();
526 tx.network_data = upd.network_data.unwrap();
527 Ok::<_, RepositoryError>(tx)
528 });
529
530 mocks
532 .job_producer
533 .expect_produce_submit_transaction_job()
534 .times(1)
535 .returning(|_, _| Box::pin(async { Ok(()) }));
536
537 mocks
538 .job_producer
539 .expect_produce_send_notification_job()
540 .times(1)
541 .returning(|_, _| Box::pin(async { Ok(()) }));
542
543 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
544
545 let result = update_and_notify_transaction(
547 handler.transaction_repository(),
548 handler.job_producer(),
549 "tx-1".to_string(),
550 expected_stellar_data,
551 handler.relayer().notification_id.as_deref(),
552 )
553 .await;
554
555 assert!(result.is_ok());
556 let updated_tx = result.unwrap();
557 assert_eq!(updated_tx.status, TransactionStatus::Sent);
558
559 if let NetworkTransactionData::Stellar(data) = &updated_tx.network_data {
560 assert_eq!(data.signed_envelope_xdr, Some("test-xdr".to_string()));
561 } else {
562 panic!("Expected Stellar transaction data");
563 }
564 }
565}