1use crate::{
7 domain::transaction::Transaction,
8 jobs::{JobProducer, JobProducerTrait, TransactionRequest},
9 models::{
10 produce_transaction_update_notification_payload, NetworkTransactionRequest,
11 RelayerRepoModel, TransactionError, TransactionRepoModel, TransactionStatus,
12 TransactionUpdateRequest,
13 },
14 repositories::{
15 InMemoryRelayerRepository, InMemoryTransactionCounter, InMemoryTransactionRepository,
16 RelayerRepositoryStorage, Repository, TransactionCounterTrait, TransactionRepository,
17 },
18 services::{Signer, StellarProvider, StellarProviderTrait, StellarSigner},
19};
20use async_trait::async_trait;
21use eyre::Result;
22use log::info;
23use std::sync::Arc;
24
25use super::lane_gate;
26
27#[allow(dead_code)]
28pub struct StellarRelayerTransaction<R, T, J, S, P, C>
29where
30 R: Repository<RelayerRepoModel, String>,
31 T: TransactionRepository,
32 J: JobProducerTrait,
33 S: Signer,
34 P: StellarProviderTrait,
35 C: TransactionCounterTrait,
36{
37 relayer: RelayerRepoModel,
38 relayer_repository: Arc<R>,
39 transaction_repository: Arc<T>,
40 job_producer: Arc<J>,
41 signer: Arc<S>,
42 provider: P,
43 transaction_counter_service: Arc<C>,
44}
45
46#[allow(dead_code)]
47impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
48where
49 R: Repository<RelayerRepoModel, String>,
50 T: TransactionRepository,
51 J: JobProducerTrait,
52 S: Signer,
53 P: StellarProviderTrait,
54 C: TransactionCounterTrait,
55{
56 #[allow(clippy::too_many_arguments)]
72 pub fn new(
73 relayer: RelayerRepoModel,
74 relayer_repository: Arc<R>,
75 transaction_repository: Arc<T>,
76 job_producer: Arc<J>,
77 signer: Arc<S>,
78 provider: P,
79 transaction_counter_service: Arc<C>,
80 ) -> Result<Self, TransactionError> {
81 Ok(Self {
82 relayer,
83 relayer_repository,
84 transaction_repository,
85 job_producer,
86 signer,
87 provider,
88 transaction_counter_service,
89 })
90 }
91
92 pub fn provider(&self) -> &P {
93 &self.provider
94 }
95
96 pub fn relayer(&self) -> &RelayerRepoModel {
97 &self.relayer
98 }
99
100 pub fn job_producer(&self) -> &J {
101 &self.job_producer
102 }
103
104 pub fn transaction_repository(&self) -> &T {
105 &self.transaction_repository
106 }
107
108 pub fn signer(&self) -> &S {
109 &self.signer
110 }
111
112 pub fn transaction_counter_service(&self) -> &C {
113 &self.transaction_counter_service
114 }
115
116 pub async fn send_transaction_request_job(
118 &self,
119 tx: &TransactionRepoModel,
120 delay_seconds: Option<i64>,
121 ) -> Result<(), TransactionError> {
122 let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
123 self.job_producer()
124 .produce_transaction_request_job(job, delay_seconds)
125 .await?;
126 Ok(())
127 }
128
129 pub(super) async fn send_transaction_update_notification(
131 &self,
132 tx: &TransactionRepoModel,
133 ) -> Result<(), TransactionError> {
134 if let Some(notification_id) = &self.relayer().notification_id {
135 self.job_producer()
136 .produce_send_notification_job(
137 produce_transaction_update_notification_payload(notification_id, tx),
138 None,
139 )
140 .await
141 .map_err(|e| {
142 TransactionError::UnexpectedError(format!("Failed to send notification: {}", e))
143 })?;
144 }
145 Ok(())
146 }
147
148 pub async fn finalize_transaction_state(
150 &self,
151 tx_id: String,
152 new_status: TransactionStatus,
153 status_reason: Option<String>,
154 confirmed_at: Option<String>,
155 ) -> Result<TransactionRepoModel, TransactionError> {
156 let update_req = TransactionUpdateRequest {
157 status: Some(new_status),
158 status_reason,
159 confirmed_at,
160 ..Default::default()
161 };
162
163 let updated_tx = self
164 .transaction_repository()
165 .partial_update(tx_id, update_req)
166 .await?;
167
168 self.send_transaction_update_notification(&updated_tx)
169 .await?;
170 Ok(updated_tx)
171 }
172
173 pub async fn enqueue_next_pending_transaction(
174 &self,
175 finished_tx_id: &str,
176 ) -> Result<(), TransactionError> {
177 if let Some(next) = self
178 .find_oldest_pending_for_relayer(&self.relayer().id)
179 .await?
180 {
181 info!("Handing over lane from {} to {}", finished_tx_id, next.id);
183 lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
184 self.send_transaction_request_job(&next, None).await?;
185 } else {
186 info!("Releasing relayer lane after {}", finished_tx_id);
187 lane_gate::free(&self.relayer().id, finished_tx_id);
188 }
189 Ok(())
190 }
191
192 async fn find_oldest_pending_for_relayer(
194 &self,
195 relayer_id: &str,
196 ) -> Result<Option<TransactionRepoModel>, TransactionError> {
197 let pending_txs = self
198 .transaction_repository()
199 .find_by_status(relayer_id, &[TransactionStatus::Pending])
200 .await
201 .map_err(TransactionError::from)?;
202
203 Ok(pending_txs.into_iter().next())
204 }
205}
206
207#[async_trait]
208impl<R, T, J, S, P, C> Transaction for StellarRelayerTransaction<R, T, J, S, P, C>
209where
210 R: Repository<RelayerRepoModel, String> + Send + Sync,
211 T: TransactionRepository + Send + Sync,
212 J: JobProducerTrait + Send + Sync,
213 S: Signer + Send + Sync,
214 P: StellarProviderTrait + Send + Sync,
215 C: TransactionCounterTrait + Send + Sync,
216{
217 async fn prepare_transaction(
218 &self,
219 tx: TransactionRepoModel,
220 ) -> Result<TransactionRepoModel, TransactionError> {
221 self.prepare_transaction_impl(tx).await
222 }
223
224 async fn submit_transaction(
225 &self,
226 tx: TransactionRepoModel,
227 ) -> Result<TransactionRepoModel, TransactionError> {
228 self.submit_transaction_impl(tx).await
229 }
230
231 async fn resubmit_transaction(
232 &self,
233 tx: TransactionRepoModel,
234 ) -> Result<TransactionRepoModel, TransactionError> {
235 Ok(tx)
236 }
237
238 async fn handle_transaction_status(
239 &self,
240 tx: TransactionRepoModel,
241 ) -> Result<TransactionRepoModel, TransactionError> {
242 self.handle_transaction_status_impl(tx).await
243 }
244
245 async fn cancel_transaction(
246 &self,
247 tx: TransactionRepoModel,
248 ) -> Result<TransactionRepoModel, TransactionError> {
249 Ok(tx)
250 }
251
252 async fn replace_transaction(
253 &self,
254 _old_tx: TransactionRepoModel,
255 _new_tx_request: NetworkTransactionRequest,
256 ) -> Result<TransactionRepoModel, TransactionError> {
257 Ok(_old_tx)
258 }
259
260 async fn sign_transaction(
261 &self,
262 tx: TransactionRepoModel,
263 ) -> Result<TransactionRepoModel, TransactionError> {
264 Ok(tx)
265 }
266
267 async fn validate_transaction(
268 &self,
269 _tx: TransactionRepoModel,
270 ) -> Result<bool, TransactionError> {
271 Ok(true)
272 }
273}
274
275pub type DefaultStellarTransaction = StellarRelayerTransaction<
276 RelayerRepositoryStorage<InMemoryRelayerRepository>,
277 InMemoryTransactionRepository,
278 JobProducer,
279 StellarSigner,
280 StellarProvider,
281 InMemoryTransactionCounter,
282>;
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287 use crate::models::RepositoryError;
288 use std::sync::Arc;
289
290 use crate::domain::transaction::stellar::test_helpers::*;
291
292 #[test]
293 fn new_returns_ok() {
294 let relayer = create_test_relayer();
295 let mocks = default_test_mocks();
296 let result = StellarRelayerTransaction::new(
297 relayer,
298 Arc::new(mocks.relayer_repo),
299 Arc::new(mocks.tx_repo),
300 Arc::new(mocks.job_producer),
301 Arc::new(mocks.signer),
302 mocks.provider,
303 Arc::new(mocks.counter),
304 );
305 assert!(result.is_ok());
306 }
307
308 #[test]
309 fn accessor_methods_return_correct_references() {
310 let relayer = create_test_relayer();
311 let mocks = default_test_mocks();
312 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
313
314 assert_eq!(handler.relayer().id, "relayer-1");
316 assert_eq!(handler.relayer().address, TEST_PK);
317
318 let _ = handler.provider();
320 let _ = handler.job_producer();
321 let _ = handler.transaction_repository();
322 let _ = handler.signer();
323 let _ = handler.transaction_counter_service();
324 }
325
326 #[tokio::test]
327 async fn send_transaction_request_job_success() {
328 let relayer = create_test_relayer();
329 let mut mocks = default_test_mocks();
330
331 mocks
332 .job_producer
333 .expect_produce_transaction_request_job()
334 .withf(|job, delay| {
335 job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
336 })
337 .times(1)
338 .returning(|_, _| Box::pin(async { Ok(()) }));
339
340 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
341 let tx = create_test_transaction(&relayer.id);
342
343 let result = handler.send_transaction_request_job(&tx, None).await;
344 assert!(result.is_ok());
345 }
346
347 #[tokio::test]
348 async fn send_transaction_request_job_with_delay() {
349 let relayer = create_test_relayer();
350 let mut mocks = default_test_mocks();
351
352 mocks
353 .job_producer
354 .expect_produce_transaction_request_job()
355 .withf(|job, delay| {
356 job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay == &Some(60)
357 })
358 .times(1)
359 .returning(|_, _| Box::pin(async { Ok(()) }));
360
361 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
362 let tx = create_test_transaction(&relayer.id);
363
364 let result = handler.send_transaction_request_job(&tx, Some(60)).await;
365 assert!(result.is_ok());
366 }
367
368 #[tokio::test]
369 async fn finalize_transaction_state_success() {
370 let relayer = create_test_relayer();
371 let mut mocks = default_test_mocks();
372
373 mocks
375 .tx_repo
376 .expect_partial_update()
377 .withf(|tx_id, update| {
378 tx_id == "tx-1"
379 && update.status == Some(TransactionStatus::Confirmed)
380 && update.status_reason == Some("Transaction confirmed".to_string())
381 })
382 .times(1)
383 .returning(|tx_id, update| {
384 let mut tx = create_test_transaction("relayer-1");
385 tx.id = tx_id;
386 tx.status = update.status.unwrap();
387 tx.status_reason = update.status_reason;
388 tx.confirmed_at = update.confirmed_at;
389 Ok::<_, RepositoryError>(tx)
390 });
391
392 mocks
394 .job_producer
395 .expect_produce_send_notification_job()
396 .times(1)
397 .returning(|_, _| Box::pin(async { Ok(()) }));
398
399 let handler = make_stellar_tx_handler(relayer, mocks);
400
401 let result = handler
402 .finalize_transaction_state(
403 "tx-1".to_string(),
404 TransactionStatus::Confirmed,
405 Some("Transaction confirmed".to_string()),
406 Some("2023-01-01T00:00:00Z".to_string()),
407 )
408 .await;
409
410 assert!(result.is_ok());
411 let updated_tx = result.unwrap();
412 assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
413 assert_eq!(
414 updated_tx.status_reason,
415 Some("Transaction confirmed".to_string())
416 );
417 }
418
419 #[tokio::test]
420 async fn enqueue_next_pending_transaction_with_pending_tx() {
421 let relayer = create_test_relayer();
422 let mut mocks = default_test_mocks();
423
424 let mut pending_tx = create_test_transaction(&relayer.id);
426 pending_tx.id = "pending-tx-1".to_string();
427
428 mocks
429 .tx_repo
430 .expect_find_by_status()
431 .withf(|relayer_id, statuses| {
432 relayer_id == "relayer-1" && statuses == [TransactionStatus::Pending]
433 })
434 .times(1)
435 .returning(move |_, _| {
436 let mut tx = create_test_transaction("relayer-1");
437 tx.id = "pending-tx-1".to_string();
438 Ok(vec![tx])
439 });
440
441 mocks
443 .job_producer
444 .expect_produce_transaction_request_job()
445 .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
446 .times(1)
447 .returning(|_, _| Box::pin(async { Ok(()) }));
448
449 let handler = make_stellar_tx_handler(relayer, mocks);
450
451 let result = handler
452 .enqueue_next_pending_transaction("finished-tx")
453 .await;
454 assert!(result.is_ok());
455 }
456
457 #[tokio::test]
458 async fn enqueue_next_pending_transaction_no_pending_tx() {
459 let relayer = create_test_relayer();
460 let mut mocks = default_test_mocks();
461
462 mocks
464 .tx_repo
465 .expect_find_by_status()
466 .times(1)
467 .returning(|_, _| Ok(vec![]));
468
469 let handler = make_stellar_tx_handler(relayer, mocks);
470
471 let result = handler
472 .enqueue_next_pending_transaction("finished-tx")
473 .await;
474 assert!(result.is_ok());
475 }
476}