openzeppelin_relayer/domain/transaction/stellar/
submit.rs1use chrono::Utc;
6use log::{info, warn};
7
8use super::StellarRelayerTransaction;
9use crate::{
10 jobs::{JobProducerTrait, TransactionStatusCheck},
11 models::{
12 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
13 TransactionStatus, TransactionUpdateRequest,
14 },
15 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
16 services::{Signer, StellarProviderTrait},
17};
18
19impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
20where
21 R: Repository<RelayerRepoModel, String> + Send + Sync,
22 T: TransactionRepository + Send + Sync,
23 J: JobProducerTrait + Send + Sync,
24 S: Signer + Send + Sync,
25 P: StellarProviderTrait + Send + Sync,
26 C: TransactionCounterTrait + Send + Sync,
27{
28 pub async fn submit_transaction_impl(
31 &self,
32 tx: TransactionRepoModel,
33 ) -> Result<TransactionRepoModel, TransactionError> {
34 info!("Submitting Stellar transaction: {:?}", tx.id);
35
36 match self.submit_core(tx.clone()).await {
38 Ok(submitted_tx) => Ok(submitted_tx),
39 Err(error) => {
40 self.handle_submit_failure(tx, error).await
42 }
43 }
44 }
45
46 async fn submit_core(
48 &self,
49 tx: TransactionRepoModel,
50 ) -> Result<TransactionRepoModel, TransactionError> {
51 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
52 let tx_envelope = stellar_data
53 .get_envelope_for_submission()
54 .map_err(TransactionError::from)?;
55
56 let hash = self
57 .provider()
58 .send_transaction(&tx_envelope)
59 .await
60 .map_err(TransactionError::from)?;
61
62 let tx_hash_hex = hex::encode(hash.as_slice());
63 let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
64
65 let mut hashes = tx.hashes.clone();
66 hashes.push(tx_hash_hex);
67
68 let update_req = TransactionUpdateRequest {
69 status: Some(TransactionStatus::Submitted),
70 sent_at: Some(Utc::now().to_rfc3339()),
71 network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
72 hashes: Some(hashes),
73 ..Default::default()
74 };
75
76 let updated_tx = self
77 .transaction_repository()
78 .partial_update(tx.id.clone(), update_req)
79 .await?;
80
81 self.job_producer()
83 .produce_check_transaction_status_job(
84 TransactionStatusCheck::new(updated_tx.id.clone(), updated_tx.relayer_id.clone()),
85 None,
86 )
87 .await?;
88
89 self.send_transaction_update_notification(&updated_tx)
91 .await?;
92
93 Ok(updated_tx)
94 }
95
96 async fn handle_submit_failure(
99 &self,
100 tx: TransactionRepoModel,
101 error: TransactionError,
102 ) -> Result<TransactionRepoModel, TransactionError> {
103 let error_reason = format!("Submission failed: {}", error);
104 let tx_id = tx.id.clone(); warn!("Transaction {} submission failed: {}", tx_id, error_reason);
106
107 let _failed_tx = match self
109 .finalize_transaction_state(
110 tx_id.clone(),
111 TransactionStatus::Failed,
112 Some(error_reason.clone()),
113 None,
114 )
115 .await
116 {
117 Ok(updated_tx) => updated_tx,
118 Err(finalize_error) => {
119 warn!(
120 "Failed to mark transaction {} as failed: {}. Continuing with lane cleanup.",
121 tx_id, finalize_error
122 );
123 tx
125 }
126 };
127
128 if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
130 warn!(
131 "Failed to enqueue next pending transaction after {} submission failure: {}. Lane may remain stuck.",
132 tx_id, enqueue_error
133 );
134 }
137
138 info!(
140 "Transaction {} submission failure handled. Next transaction enqueued. Error: {}",
141 tx_id, error_reason
142 );
143
144 Err(error)
145 }
146
147 pub async fn resubmit_transaction_impl(
149 &self,
150 tx: TransactionRepoModel,
151 ) -> Result<TransactionRepoModel, TransactionError> {
152 self.submit_transaction_impl(tx).await
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use soroban_rs::xdr::{Hash, WriteXdr};

    use crate::domain::transaction::stellar::test_helpers::*;

    mod submit_transaction_tests {
        use crate::models::RepositoryError;

        use super::*;

        /// Builds a test transaction for `relayer_id` with one dummy signature attached.
        fn signed_test_transaction(relayer_id: &str) -> TransactionRepoModel {
            let mut tx = create_test_transaction(relayer_id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                data.signatures.push(dummy_signature());
            }
            tx
        }

        /// `partial_update` mock behaviour shared by the tests: returns a fresh test
        /// transaction carrying the requested id and the status from the update request.
        fn echo_status_update(
            id: String,
            upd: TransactionUpdateRequest,
        ) -> Result<TransactionRepoModel, RepositoryError> {
            let mut tx = create_test_transaction("relayer-1");
            tx.id = id;
            tx.status = upd
                .status
                .expect("every update matched by these tests carries a status");
            Ok(tx)
        }

        /// A successful provider call results in a `Submitted` transaction, one status-check
        /// job and one notification job.
        #[tokio::test]
        async fn submit_transaction_happy_path() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(echo_status_update);

            mocks
                .job_producer
                .expect_produce_check_transaction_status_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_test_transaction(&relayer.id);

            let res = handler.submit_transaction_impl(tx).await.unwrap();
            assert_eq!(res.status, TransactionStatus::Submitted);
        }

        /// A provider error marks the transaction as `Failed` and is returned to the caller.
        #[tokio::test]
        async fn submit_transaction_provider_error_marks_failed() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(echo_status_update);

            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            mocks
                .tx_repo
                .expect_find_by_status()
                .returning(|_, _| Ok(vec![]));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_test_transaction(&relayer.id);

            let res = handler.submit_transaction_impl(tx).await;

            assert!(res.is_err());
            // `matches!` alone only yields a bool; it must be asserted to check anything.
            assert!(matches!(
                res.unwrap_err(),
                TransactionError::UnexpectedError(_)
            ));
        }

        /// A repository error while persisting `Submitted` still ends with the transaction
        /// being marked `Failed` and an error returned.
        #[tokio::test]
        async fn submit_transaction_repository_error_marks_failed() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(echo_status_update);

            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            mocks
                .tx_repo
                .expect_find_by_status()
                .returning(|_, _| Ok(vec![]));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_test_transaction(&relayer.id);

            let res = handler.submit_transaction_impl(tx).await;

            assert!(res.is_err());
        }

        /// Submission succeeds for a transaction whose `signed_envelope_xdr` is populated.
        #[tokio::test]
        async fn submit_transaction_uses_signed_envelope_xdr() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            let mut tx = signed_test_transaction(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                // Serialize the envelope built from the signed data and store it on the model.
                let envelope = data.get_envelope_for_submission().unwrap();
                let xdr = envelope
                    .to_xdr_base64(soroban_rs::xdr::Limits::none())
                    .unwrap();
                data.signed_envelope_xdr = Some(xdr);
            }

            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(echo_status_update);

            mocks
                .job_producer
                .expect_produce_check_transaction_status_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let res = handler.submit_transaction_impl(tx).await.unwrap();

            assert_eq!(res.status, TransactionStatus::Submitted);
        }

        /// `resubmit_transaction_impl` behaves like `submit_transaction_impl`.
        #[tokio::test]
        async fn resubmit_transaction_delegates_to_submit() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(echo_status_update);

            mocks
                .job_producer
                .expect_produce_check_transaction_status_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_test_transaction(&relayer.id);

            let res = handler.resubmit_transaction_impl(tx).await.unwrap();
            assert_eq!(res.status, TransactionStatus::Submitted);
        }

        /// After a failed submission the next pending transaction of the relayer is enqueued
        /// exactly once.
        #[tokio::test]
        async fn submit_transaction_failure_enqueues_next_transaction() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(echo_status_update);

            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // The pending transaction that the failure handler is expected to pick up.
            let mut pending_tx = create_test_transaction(&relayer.id);
            pending_tx.id = "next-pending-tx".to_string();
            pending_tx.status = TransactionStatus::Pending;
            let captured_pending_tx = pending_tx.clone();
            mocks
                .tx_repo
                .expect_find_by_status()
                .with(
                    mockall::predicate::eq(relayer.id.clone()),
                    mockall::predicate::eq(vec![TransactionStatus::Pending]),
                )
                .times(1)
                .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));

            mocks
                .job_producer
                .expect_produce_transaction_request_job()
                .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_test_transaction(&relayer.id);

            let res = handler.submit_transaction_impl(tx).await;

            assert!(res.is_err());
            // `matches!` alone only yields a bool; it must be asserted to check anything.
            assert!(matches!(
                res.unwrap_err(),
                TransactionError::UnexpectedError(_)
            ));
        }
    }
}