1use rand::Rng;
24use std::future::Future;
25use std::time::Duration;
26
27use super::rpc_selector::RpcSelector;
28use crate::config::ServerConfig;
29use crate::constants::RETRY_JITTER_PERCENT;
30
31pub fn calculate_retry_delay(attempt: u8, base_delay_ms: u64, max_delay_ms: u64) -> Duration {
41 if base_delay_ms == 0 || max_delay_ms == 0 {
42 return Duration::from_millis(0);
43 }
44
45 let exp_backoff = if attempt > 63 {
47 max_delay_ms
48 } else {
49 let multiplier = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
51 base_delay_ms.saturating_mul(multiplier)
52 };
53
54 let delay_ms = exp_backoff.min(max_delay_ms);
55
56 apply_jitter(delay_ms)
57}
58
59fn apply_jitter(delay_ms: u64) -> Duration {
71 if delay_ms == 0 {
72 return Duration::from_millis(0);
73 }
74
75 let jitter_range = (delay_ms as f64 * RETRY_JITTER_PERCENT).floor() as u64;
77
78 if jitter_range == 0 {
79 return Duration::from_millis(delay_ms);
80 }
81
82 let mut rng = rand::rng();
83 let jitter_value = rng.random_range(0..=jitter_range);
84
85 let final_delay = if rng.random_bool(0.5) {
86 delay_ms.saturating_add(jitter_value)
87 } else {
88 delay_ms.saturating_sub(jitter_value)
89 };
90
91 Duration::from_millis(final_delay)
92}
93
/// Internal classification of a failed provider interaction, used by
/// `retry_rpc_call` to decide between failing fast and failing over.
#[derive(Debug)]
enum InternalRetryError<E> {
    // The error is not worth retrying; propagated to the caller as-is.
    NonRetriable(E),
    // Every configured retry against this provider failed; the caller may
    // fail over to another provider.
    RetriesExhausted(E),
}
100
/// Tuning knobs for `retry_rpc_call`: per-provider retry count,
/// cross-provider failover count, and backoff delay bounds.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    // Maximum attempts against a single provider (values <= 1 mean one try).
    pub max_retries: u8,
    // Maximum number of switches to an alternative provider.
    pub max_failovers: u8,
    // Initial backoff delay in milliseconds (0 disables delays).
    pub base_delay_ms: u64,
    // Upper bound for the backoff delay in milliseconds (0 disables delays).
    pub max_delay_ms: u64,
}
113
114impl RetryConfig {
115 pub fn new(max_retries: u8, max_failovers: u8, base_delay_ms: u64, max_delay_ms: u64) -> Self {
127 if (base_delay_ms == 0) != (max_delay_ms == 0) {
129 panic!(
130 "Delay values must be consistent: both zero (no delays) or both non-zero. Got base_delay_ms={}, max_delay_ms={}",
131 base_delay_ms, max_delay_ms
132 );
133 }
134
135 if base_delay_ms > 0 && max_delay_ms > 0 && max_delay_ms < base_delay_ms {
137 panic!(
138 "max_delay_ms ({}) must be >= base_delay_ms ({}) when both are non-zero",
139 max_delay_ms, base_delay_ms
140 );
141 }
142
143 Self {
144 max_retries,
145 max_failovers,
146 base_delay_ms,
147 max_delay_ms,
148 }
149 }
150
151 pub fn from_env() -> Self {
153 let config = ServerConfig::from_env();
154 Self::new(
155 config.provider_max_retries,
156 config.provider_max_failovers,
157 config.provider_retry_base_delay_ms,
158 config.provider_retry_max_delay_ms,
159 )
160 }
161}
162
163pub async fn retry_rpc_call<P, T, E, F, Fut, I>(
193 selector: &RpcSelector,
194 operation_name: &str,
195 is_retriable_error: impl Fn(&E) -> bool,
196 should_mark_provider_failed: impl Fn(&E) -> bool,
197 provider_initializer: I,
198 operation: F,
199 config: Option<RetryConfig>,
200) -> Result<T, E>
201where
202 P: Clone,
203 E: std::fmt::Display + From<String>,
204 F: Fn(P) -> Fut,
205 Fut: Future<Output = Result<T, E>>,
206 I: Fn(&str) -> Result<P, E>,
207{
208 let config = config.unwrap_or_else(RetryConfig::from_env);
209 let total_providers = selector.provider_count();
210 let max_failovers = std::cmp::min(config.max_failovers as usize, total_providers - 1);
211 let mut failover_count = 0;
212 let mut total_attempts = 0;
213 let mut last_error = None;
214
215 log::debug!(
216 "Starting RPC call '{}' with max_retries={}, max_failovers={}, available_providers={}",
217 operation_name,
218 config.max_retries,
219 max_failovers,
220 total_providers
221 );
222
223 while failover_count <= max_failovers && selector.available_provider_count() > 0 {
224 let (provider, provider_url) =
226 match get_provider(selector, operation_name, &provider_initializer) {
227 Ok((provider, url)) => (provider, url),
228 Err(e) => {
229 last_error = Some(e);
230 failover_count += 1;
231
232 if failover_count > max_failovers || selector.available_provider_count() == 0 {
234 break;
235 }
236
237 selector.mark_current_as_failed();
239 continue;
240 }
241 };
242
243 log::debug!(
244 "Selected provider: {} for operation '{}'",
245 provider_url,
246 operation_name
247 );
248
249 match try_with_retries(
251 &provider,
252 &provider_url,
253 operation_name,
254 &operation,
255 &is_retriable_error,
256 &config,
257 &mut total_attempts,
258 )
259 .await
260 {
261 Ok(result) => {
262 log::debug!(
263 "RPC call '{}' succeeded with provider '{}' (total attempts: {})",
264 operation_name,
265 provider_url,
266 total_attempts
267 );
268 return Ok(result);
269 }
270 Err(internal_err) => {
271 match internal_err {
272 InternalRetryError::NonRetriable(original_err) => {
273 if should_mark_provider_failed(&original_err)
275 && selector.available_provider_count() > 1
276 {
277 log::warn!(
278 "Non-retriable error '{}' for provider '{}' on operation '{}' should mark provider as failed. Marking as failed and switching to next provider...",
279 original_err,
280 provider_url,
281 operation_name
282 );
283 selector.mark_current_as_failed();
284 }
285 return Err(original_err);
286 }
287 InternalRetryError::RetriesExhausted(original_err) => {
288 last_error = Some(original_err);
289
290 if selector.available_provider_count() > 1 {
293 log::warn!(
294 "All {} retry attempts failed for provider '{}' on operation '{}'. Error: {}. Marking as failed and switching to next provider (failover {}/{})...",
295 config.max_retries,
296 provider_url,
297 operation_name,
298 last_error.as_ref().unwrap(),
299 failover_count + 1,
300 max_failovers
301 );
302 selector.mark_current_as_failed();
303 failover_count += 1;
304 } else {
305 log::warn!(
306 "All {} retry attempts failed for provider '{}' on operation '{}'. Error: {}. This is the last available provider, not marking as failed.",
307 config.max_retries,
308 provider_url,
309 operation_name,
310 last_error.as_ref().unwrap()
311 );
312 break;
313 }
314 }
315 }
316 }
317 }
318 }
319
320 let error_message = match &last_error {
321 Some(e) => format!(
322 "RPC call '{}' failed after {} total attempts across {} providers: {}",
323 operation_name,
324 total_attempts,
325 failover_count,
326 e
327 ),
328 None => format!(
329 "RPC call '{}' failed after {} total attempts across {} providers with no error details",
330 operation_name,
331 total_attempts,
332 failover_count
333 )
334 };
335
336 log::error!("{}", error_message);
337
338 Err(last_error.unwrap_or_else(|| E::from(error_message)))
340}
341
342fn get_provider<P, E, I>(
344 selector: &RpcSelector,
345 operation_name: &str,
346 provider_initializer: &I,
347) -> Result<(P, String), E>
348where
349 E: std::fmt::Display + From<String>,
350 I: Fn(&str) -> Result<P, E>,
351{
352 let provider_url = selector
354 .get_client(|url| Ok::<_, eyre::Report>(url.to_string()))
355 .map_err(|e| {
356 let err_msg = format!("Failed to get provider URL for {}: {}", operation_name, e);
357 log::warn!("{}", err_msg);
358 E::from(err_msg)
359 })?;
360
361 let provider = provider_initializer(&provider_url).map_err(|e| {
363 log::warn!(
364 "Failed to initialize provider '{}' for operation '{}': {}",
365 provider_url,
366 operation_name,
367 e
368 );
369 e
370 })?;
371
372 Ok((provider, provider_url))
373}
374
/// Runs `operation` against a single provider up to `config.max_retries`
/// times, sleeping an exponential-backoff delay between attempts.
///
/// Increments `total_attempts` once per invocation of `operation`. Returns
/// `NonRetriable` when `is_retriable_error` rejects the error — or when
/// `max_retries <= 1`, where only a single attempt is made and any failure
/// is classified as non-retriable — and `RetriesExhausted` when every
/// attempt failed with a retriable error.
async fn try_with_retries<P, T, E, F, Fut>(
    provider: &P,
    provider_url: &str,
    operation_name: &str,
    operation: &F,
    is_retriable_error: &impl Fn(&E) -> bool,
    config: &RetryConfig,
    total_attempts: &mut usize,
) -> Result<T, InternalRetryError<E>>
where
    P: Clone,
    E: std::fmt::Display + From<String>,
    F: Fn(P) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    // With 0 or 1 retries there is nothing to loop over: run once and map
    // any failure to NonRetriable (no retries were configured).
    if config.max_retries <= 1 {
        *total_attempts += 1;
        return operation(provider.clone())
            .await
            .map_err(InternalRetryError::NonRetriable);
    }

    for current_attempt_idx in 0..config.max_retries {
        *total_attempts += 1;

        match operation(provider.clone()).await {
            Ok(result) => {
                log::debug!(
                    "RPC call '{}' succeeded with provider '{}' (attempt {}/{}, total attempts: {})",
                    operation_name,
                    provider_url,
                    current_attempt_idx + 1,
                    config.max_retries,
                    *total_attempts
                );
                return Ok(result);
            }
            Err(e) => {
                let is_retriable = is_retriable_error(&e);
                let is_last_attempt = current_attempt_idx + 1 >= config.max_retries;

                log::warn!(
                    "RPC call '{}' failed with provider '{}' (attempt {}/{}): {} [{}]",
                    operation_name,
                    provider_url,
                    current_attempt_idx + 1,
                    config.max_retries,
                    e,
                    if is_retriable {
                        "retriable"
                    } else {
                        "non-retriable"
                    }
                );

                // Non-retriable errors abort immediately.
                if !is_retriable {
                    return Err(InternalRetryError::NonRetriable(e));
                }

                // Retriable, but no attempts remain: report exhaustion.
                if is_last_attempt {
                    log::warn!(
                        "All {} retries exhausted for RPC call '{}' with provider '{}'. Last error: {}",
                        config.max_retries, operation_name, provider_url, e
                    );
                    return Err(InternalRetryError::RetriesExhausted(e));
                }

                // Back off before the next attempt; attempt index + 1 keeps
                // the first delay at base * 2 rather than base * 1.
                let delay = calculate_retry_delay(
                    current_attempt_idx + 1,
                    config.base_delay_ms,
                    config.max_delay_ms,
                );

                log::debug!(
                    "Retrying RPC call '{}' with provider '{}' after {:?} delay (attempt {}/{})",
                    operation_name,
                    provider_url,
                    delay,
                    current_attempt_idx + 2,
                    config.max_retries
                );
                tokio::time::sleep(delay).await;
            }
        }
    }

    // Every loop iteration either returns Ok, returns Err, or sleeps and
    // continues; the final iteration always returns.
    unreachable!(
        "Loop should have returned if max_retries > 1; max_retries=0 or 1 case is handled above."
    );
}
468
469#[cfg(test)]
470mod tests {
471 use super::*;
472 use crate::models::RpcConfig;
473 use lazy_static::lazy_static;
474 use std::cmp::Ordering;
475 use std::env;
476 use std::sync::atomic::{AtomicU8, Ordering as AtomicOrdering};
477 use std::sync::Arc;
478 use std::sync::Mutex;
479
    lazy_static! {
        // Serializes tests that mutate process environment variables.
        static ref RETRY_TEST_ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
484
    /// Minimal error type for the tests; satisfies the `Display + From<String>`
    /// bounds required by the retry helpers.
    #[derive(Debug, Clone)]
    struct TestError(String);
488
    // Human-readable rendering used by the retry helpers' log messages.
    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "TestError: {}", self.0)
        }
    }
494
    // Lets retry_rpc_call synthesize a TestError from its summary message.
    impl From<String> for TestError {
        fn from(msg: String) -> Self {
            TestError(msg)
        }
    }
500
    /// RAII guard that records the previous values of env vars it sets and
    /// restores them on drop.
    struct EnvGuard {
        // Keys set by this guard, parallel to `old_values`.
        keys: Vec<String>,
        // Previous value for each key (`None` = was unset).
        old_values: Vec<Option<String>>,
    }
506
    impl EnvGuard {
        // Creates a guard tracking no variables yet.
        fn new() -> Self {
            Self {
                keys: Vec::new(),
                old_values: Vec::new(),
            }
        }

        // Sets `key` to `value`, remembering the prior value for restoration.
        fn set(&mut self, key: &str, value: &str) {
            let old_value = env::var(key).ok();
            self.keys.push(key.to_string());
            self.old_values.push(old_value);
            env::set_var(key, value);
        }
    }
522
    // Restores every recorded variable to its pre-guard state.
    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for i in 0..self.keys.len() {
                match &self.old_values[i] {
                    Some(value) => env::set_var(&self.keys[i], value),
                    None => env::remove_var(&self.keys[i]),
                }
            }
        }
    }
533
    /// Installs the env vars `ServerConfig::from_env` expects, returning a
    /// guard that restores the previous values when dropped.
    fn setup_test_env() -> EnvGuard {
        let mut guard = EnvGuard::new();
        guard.set("API_KEY", "fake-api-key-for-tests-01234567890123456789");
        guard.set("PROVIDER_MAX_RETRIES", "2");
        guard.set("PROVIDER_MAX_FAILOVERS", "1");
        guard.set("PROVIDER_RETRY_BASE_DELAY_MS", "1");
        guard.set("PROVIDER_RETRY_MAX_DELAY_MS", "5");
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );
        guard
    }
549
    // Verifies exponential growth, capping, zero-delay short-circuit, and
    // overflow safety of calculate_retry_delay (within jitter bounds).
    #[test]
    fn test_calculate_retry_delay() {
        let base_delay_ms = 10;
        let max_delay_ms = 10000;

        // Expected un-jittered backoff for attempts 0..=5: base * 2^attempt.
        let expected_backoffs = [
            10, 20, 40, 80, 160, 320,
        ];

        for (i, expected) in expected_backoffs.iter().enumerate() {
            let attempt = i as u8;
            let delay = calculate_retry_delay(attempt, base_delay_ms, max_delay_ms);

            let min_expected = (*expected as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
            let max_expected = (*expected as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;

            assert!(
                (min_expected..=max_expected).contains(&delay.as_millis()),
                "Delay {} outside expected range {}..={}",
                delay.as_millis(),
                min_expected,
                max_expected
            );
        }

        // base * 2^4 = 1600 exceeds the 1000ms cap, so the cap applies.
        let base_delay_ms = 100;
        let max_delay_ms = 1000;
        let delay = calculate_retry_delay(4, base_delay_ms, max_delay_ms);
        let min_expected = (max_delay_ms as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Any zero delay parameter disables delays entirely.
        assert_eq!(calculate_retry_delay(5, 0, 1000).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 100, 0).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 0, 0).as_millis(), 0);

        // Maximum attempt value must not overflow; result stays under cap+jitter.
        let max_delay_ms = 10_000;
        let delay = calculate_retry_delay(u8::MAX, 1, max_delay_ms);
        assert!(
            delay.as_millis()
                <= (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128
        );
    }
608
    // Verifies jitter stays within ±RETRY_JITTER_PERCENT, zero passes through,
    // tiny delays are unchanged, and both jitter directions actually occur.
    #[test]
    fn test_apply_jitter() {
        let base_delay = 1000;
        let jittered = apply_jitter(base_delay);

        let min_expected = (base_delay as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u64;
        let max_expected = (base_delay as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u64;

        assert!(
            (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
            "Jittered value {} outside expected range {}..={}",
            jittered.as_millis(),
            min_expected,
            max_expected
        );

        // Zero input short-circuits to zero.
        assert_eq!(apply_jitter(0).as_millis(), 0);

        // Delays small enough that the jitter range floors to 0 are returned
        // unchanged; otherwise they stay within the range.
        for delay in 1..5 {
            let jittered = apply_jitter(delay);
            let jitter_range = (delay as f64 * RETRY_JITTER_PERCENT).floor() as u64;

            if jitter_range == 0 {
                assert_eq!(jittered.as_millis(), delay as u128);
            } else {
                let min_expected = delay.saturating_sub(jitter_range);
                let max_expected = delay.saturating_add(jitter_range);
                assert!(
                    (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
                    "Jittered value {} outside expected range {}..={}",
                    jittered.as_millis(),
                    min_expected,
                    max_expected
                );
            }
        }

        // Statistical check: over many samples both upward and downward
        // jitter should appear (probabilistic, but 200 samples is ample).
        let base_delay = 10000;
        let iterations = 200;
        let mut additions = 0;
        let mut subtractions = 0;

        for _ in 0..iterations {
            let jittered = apply_jitter(base_delay);
            let j_millis = jittered.as_millis();
            let b_delay = base_delay as u128;

            match j_millis.cmp(&b_delay) {
                Ordering::Greater => {
                    additions += 1;
                }
                Ordering::Less => {
                    subtractions += 1;
                }
                Ordering::Equal => {}
            }
        }

        assert!(additions > 0, "No additions were observed");
        assert!(subtractions > 0, "No subtractions were observed");
    }
672
    // RetryConfig::new stores each field verbatim.
    #[test]
    fn test_retry_config() {
        let config = RetryConfig::new(5, 2, 100, 5000);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.max_failovers, 2);
        assert_eq!(config.base_delay_ms, 100);
        assert_eq!(config.max_delay_ms, 5000);
    }
681
    // RetryConfig::from_env picks up the PROVIDER_* env vars set by
    // setup_test_env; the mutex serializes env mutation across tests.
    #[test]
    fn test_retry_config_from_env() {
        let _lock = RETRY_TEST_ENV_MUTEX
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let mut guard = setup_test_env();
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );

        let config = RetryConfig::from_env();
        assert_eq!(config.max_retries, 2);
        assert_eq!(config.max_failovers, 1);
        assert_eq!(config.base_delay_ms, 1);
        assert_eq!(config.max_delay_ms, 5);
    }
701
    // Edge cases: attempt 0, cap equal to base, huge cap, and a 1ms delay
    // whose jitter range floors to zero.
    #[test]
    fn test_calculate_retry_delay_edge_cases() {
        // Attempt 0 yields the base delay (within jitter).
        let delay = calculate_retry_delay(0, 100, 1000);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Cap equal to base: capped to 100 regardless of attempt.
        let delay = calculate_retry_delay(5, 100, 100);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Large attempt with a huge cap must still produce a non-zero delay.
        let delay = calculate_retry_delay(60, 1000, u64::MAX);
        assert!(delay.as_millis() > 0);

        // 1ms delay: jitter range floors to 0, so the delay is exact.
        let delay = calculate_retry_delay(1, 1, 1);
        assert_eq!(delay.as_millis(), 1);
    }
736
737 #[test]
738 fn test_retry_config_validation() {
739 let _config = RetryConfig::new(3, 1, 100, 1000);
741 let _config = RetryConfig::new(3, 1, 0, 0); let _config = RetryConfig::new(3, 1, 100, 100); let _config = RetryConfig::new(0, 0, 1, 1); let _config = RetryConfig::new(255, 255, 1, 1000); }
746
    // max_delay_ms below base_delay_ms must panic with the ordering message.
    #[test]
    #[should_panic(
        expected = "max_delay_ms (50) must be >= base_delay_ms (100) when both are non-zero"
    )]
    fn test_retry_config_validation_panic_delay_ordering() {
        let _config = RetryConfig::new(3, 1, 100, 50);
    }
755
    // Zero base with non-zero max must panic with the consistency message.
    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_base_zero() {
        let _config = RetryConfig::new(3, 1, 0, 1000);
    }
764
    // Non-zero base with zero max must panic with the consistency message.
    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_max_zero() {
        let _config = RetryConfig::new(3, 1, 100, 0);
    }
773
    // get_provider returns the first provider's URL plus the initializer's
    // output, and propagates initializer errors unchanged.
    #[test]
    fn test_get_provider() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        // Successful initialization path.
        let initializer =
            |url: &str| -> Result<String, TestError> { Ok(format!("provider-{}", url)) };

        let result = get_provider(&selector, "test_operation", &initializer);
        assert!(result.is_ok());
        let (provider, url) = result.unwrap();
        assert_eq!(url, "http://localhost:8545");
        assert_eq!(provider, "provider-http://localhost:8545");

        // Failing initializer: the error is returned as-is.
        let initializer = |_: &str| -> Result<String, TestError> {
            Err(TestError("Failed to initialize".to_string()))
        };

        let result = get_provider(&selector, "test_operation", &initializer);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(format!("{}", err).contains("Failed to initialize"));
    }
802
    // Covers try_with_retries: immediate success, success after retriable
    // failures, non-retriable short-circuit, and retries exhausted.
    #[tokio::test]
    async fn test_try_with_retries() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(3, 1, 5, 10);

        // Immediate success: exactly one attempt.
        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 1);

        // Fails twice, then succeeds on the third attempt.
        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();
        let operation = move |_: String| {
            let attempts = attempts_clone.clone();
            async move {
                let current = attempts.fetch_add(1, AtomicOrdering::SeqCst);
                if current < 2 {
                    Err(TestError("Retriable error".to_string()))
                } else {
                    Ok(42)
                }
            }
        };

        let mut total_attempts = 0;
        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 3);

        // Non-retriable error: single attempt, classified NonRetriable.
        let operation = |_: String| async { Err(TestError("Non-retriable error".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 1);
        let err = result.unwrap_err();
        assert!(matches!(err, InternalRetryError::NonRetriable(_)));

        // Always fails retriably: all 3 attempts used, RetriesExhausted.
        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 3);
        let error = result.unwrap_err();
        assert!(matches!(error, InternalRetryError::RetriesExhausted(_)));
    }
900
    // max_retries == 0 takes the single-attempt path: one call, and any
    // failure is classified NonRetriable even if the predicate says retriable.
    #[tokio::test]
    async fn test_try_with_retries_max_retries_zero() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(0, 1, 5, 10);

        let operation = |_p: String| async move { Ok::<_, TestError>(42) };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        // NonRetriable despite the retriable predicate: no retries configured.
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }
944
    // max_retries == 1 behaves the same as 0: single attempt, failures
    // classified NonRetriable.
    #[tokio::test]
    async fn test_try_with_retries_max_retries_one() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(1, 1, 5, 10);

        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }
991
    // A non-retriable error whose should_mark_provider_failed policy returns
    // false must leave the provider pool untouched.
    #[tokio::test]
    async fn test_non_retriable_error_does_not_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Non-retriable error".to_string())) };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // nothing is retriable
            |_| false, // never mark provider failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, final_available_count,
            "Provider count should remain the same for non-retriable errors"
        );
    }
1033
    // Exhausting retries on a retriable error must demote the provider when
    // another one is still available.
    #[tokio::test]
    async fn test_retriable_error_marks_provider_failed_after_retries_exhausted() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Retriable error".to_string())) };

        // Zero delays keep the test fast.
        let config = RetryConfig::new(2, 1, 0, 0);
        let initial_available_count = selector.available_provider_count();

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true, // everything is retriable
            |_| true, // mark failing providers
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert!(final_available_count < initial_available_count,
            "At least one provider should be marked as failed after retriable errors exhaust retries");
    }
1072
    // A first-try success returns immediately with exactly one attempt.
    #[tokio::test]
    async fn test_retry_rpc_call_success() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = move |_provider: String| {
            let attempts = attempts_clone.clone();
            async move {
                attempts.fetch_add(1, AtomicOrdering::SeqCst);
                Ok::<_, TestError>(42)
            }
        };

        let config = RetryConfig::new(1, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {:?}", result);
        assert_eq!(result.unwrap(), 42);
        // Exactly one call to the operation.
        assert_eq!(attempts.load(AtomicOrdering::SeqCst), 1);
    }
1114
    // When the first provider always fails, the call fails over and succeeds
    // on the second provider.
    #[tokio::test]
    async fn test_retry_rpc_call_with_provider_failover() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        // Track which URL was most recently initialized.
        let current_provider = Arc::new(Mutex::new(String::new()));
        let current_provider_clone = current_provider.clone();

        let provider_initializer = move |url: &str| -> Result<String, TestError> {
            let mut provider = current_provider_clone.lock().unwrap();
            *provider = url.to_string();
            Ok(url.to_string())
        };

        // Port 8545 fails; 8546 succeeds.
        let operation = move |provider: String| async move {
            if provider.contains("8545") {
                Err(TestError("First provider error".to_string()))
            } else {
                Ok(42)
            }
        };

        let config = RetryConfig::new(2, 1, 0, 0);
        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| true,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {:?}", result);
        assert_eq!(result.unwrap(), 42);

        // The call must have ended up on the second provider.
        let final_provider = current_provider.lock().unwrap().clone();
        assert!(
            final_provider.contains("8546"),
            "Wrong provider selected: {}",
            final_provider
        );
    }
1166
    // When every provider fails on every attempt, the call returns an error.
    #[tokio::test]
    async fn test_retry_rpc_call_all_providers_fail() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer =
            |_: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0);
        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err(), "Expected an error but got: {:?}", result);
    }
1197
    // Passing `None` for the config loads RetryConfig::from_env; the env
    // mutation happens under the mutex, then the lock is released before the
    // (env-reading) call runs with the guard still alive.
    #[tokio::test]
    async fn test_retry_rpc_call_with_default_config() {
        let (_guard, selector) = {
            let _lock = RETRY_TEST_ENV_MUTEX
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            let guard = setup_test_env();

            let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
            let selector = RpcSelector::new(configs).expect("Failed to create selector");
            (guard, selector)
        };

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_provider: String| async move { Ok::<_, TestError>(42) };

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            None, // exercise the RetryConfig::from_env fallback
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }
1231
    // An initializer failure on the first provider consumes a failover and
    // the call still succeeds on the next provider.
    #[tokio::test]
    async fn test_retry_rpc_call_provider_initialization_failures() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let attempt_count = Arc::new(AtomicU8::new(0));
        let attempt_count_clone = attempt_count.clone();

        // Fail initialization only on the very first call for port 8545.
        let provider_initializer = move |url: &str| -> Result<String, TestError> {
            let count = attempt_count_clone.fetch_add(1, AtomicOrdering::SeqCst);
            if count == 0 && url.contains("8545") {
                Err(TestError("First provider init failed".to_string()))
            } else {
                Ok(url.to_string())
            }
        };

        let operation = |_provider: String| async move { Ok::<_, TestError>(42) };

        let config = RetryConfig::new(2, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        // At least two initializer calls: the failed one plus the retry.
        assert!(attempt_count.load(AtomicOrdering::SeqCst) >= 2);
    }
1273
    // Marking the sole provider as failed makes get_provider error out when
    // asked for a client.
    #[test]
    fn test_get_provider_selector_errors() {
        let _guard = setup_test_env();

        let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        // Sanity-check a URL exists, then exhaust the only provider.
        let _ = selector.get_current_url().unwrap();
        selector.mark_current_as_failed();

        let provider_initializer =
            |url: &str| -> Result<String, TestError> { Ok(format!("provider-{}", url)) };

        let result = get_provider(&selector, "test_operation", &provider_initializer);
        assert!(result.is_err());
    }
1293
    // With a single provider, exhausted retries must NOT demote it — the pool
    // would otherwise become empty.
    #[tokio::test]
    async fn test_last_provider_never_marked_as_failed() {
        let _guard = setup_test_env();

        let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };

        // Zero delays keep the test fast.
        let config = RetryConfig::new(2, 1, 0, 0);
        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 1);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true, // everything is retriable
            |_| true, // policy says mark failed — must be overridden
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, initial_available_count,
            "Last provider should never be marked as failed"
        );
        assert_eq!(
            final_available_count, 1,
            "Should still have 1 provider available"
        );
    }
1337
1338 #[tokio::test]
1339 async fn test_last_provider_behavior_with_multiple_providers() {
1340 let _guard = setup_test_env();
1341
1342 let configs = vec![
1344 RpcConfig::new("http://localhost:8545".to_string()),
1345 RpcConfig::new("http://localhost:8546".to_string()),
1346 RpcConfig::new("http://localhost:8547".to_string()),
1347 ];
1348 let selector = RpcSelector::new(configs).expect("Failed to create selector");
1349
1350 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1351
1352 let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };
1354
1355 let config = RetryConfig::new(2, 2, 0, 0); let initial_available_count = selector.available_provider_count();
1359 assert_eq!(initial_available_count, 3);
1360
1361 let result: Result<i32, TestError> = retry_rpc_call(
1362 &selector,
1363 "test_operation",
1364 |_| true, |_| true, provider_initializer,
1367 operation,
1368 Some(config),
1369 )
1370 .await;
1371
1372 assert!(result.is_err());
1373
1374 let final_available_count = selector.available_provider_count();
1376 assert_eq!(
1377 final_available_count, 1,
1378 "Should have exactly 1 provider left (the last one should not be marked as failed)"
1379 );
1380 }
1381
1382 #[tokio::test]
1383 async fn test_non_retriable_error_should_mark_provider_failed() {
1384 let _guard = setup_test_env();
1385
1386 let configs = vec![
1387 RpcConfig::new("http://localhost:8545".to_string()),
1388 RpcConfig::new("http://localhost:8546".to_string()),
1389 ];
1390 let selector = RpcSelector::new(configs).expect("Failed to create selector");
1391
1392 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1393
1394 let operation = |_provider: String| async move {
1396 Err(TestError("Critical non-retriable error".to_string()))
1397 };
1398
1399 let config = RetryConfig::new(3, 1, 0, 0);
1400
1401 let initial_available_count = selector.available_provider_count();
1403 assert_eq!(initial_available_count, 2);
1404
1405 let result: Result<i32, TestError> = retry_rpc_call(
1406 &selector,
1407 "test_operation",
1408 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1411 operation,
1412 Some(config),
1413 )
1414 .await;
1415
1416 assert!(result.is_err());
1417
1418 let final_available_count = selector.available_provider_count();
1420 assert_eq!(final_available_count, 1,
1421 "Provider should be marked as failed when should_mark_provider_failed returns true for non-retriable error");
1422 }
1423
1424 #[tokio::test]
1425 async fn test_non_retriable_error_should_not_mark_provider_failed() {
1426 let _guard = setup_test_env();
1427
1428 let configs = vec![
1429 RpcConfig::new("http://localhost:8545".to_string()),
1430 RpcConfig::new("http://localhost:8546".to_string()),
1431 ];
1432 let selector = RpcSelector::new(configs).expect("Failed to create selector");
1433
1434 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1435
1436 let operation = |_provider: String| async move {
1438 Err(TestError("Minor non-retriable error".to_string()))
1439 };
1440
1441 let config = RetryConfig::new(3, 1, 0, 0);
1442
1443 let initial_available_count = selector.available_provider_count();
1445 assert_eq!(initial_available_count, 2);
1446
1447 let result: Result<i32, TestError> = retry_rpc_call(
1448 &selector,
1449 "test_operation",
1450 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1453 operation,
1454 Some(config),
1455 )
1456 .await;
1457
1458 assert!(result.is_err());
1459
1460 let final_available_count = selector.available_provider_count();
1462 assert_eq!(final_available_count, initial_available_count,
1463 "Provider should NOT be marked as failed when should_mark_provider_failed returns false for non-retriable error");
1464 }
1465
1466 #[tokio::test]
1467 async fn test_retriable_error_ignores_should_mark_provider_failed() {
1468 let _guard = setup_test_env();
1469
1470 let configs = vec![
1471 RpcConfig::new("http://localhost:8545".to_string()),
1472 RpcConfig::new("http://localhost:8546".to_string()),
1473 ];
1474 let selector = RpcSelector::new(configs).expect("Failed to create selector");
1475
1476 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1477
1478 let operation =
1480 |_provider: String| async { Err(TestError("Retriable network error".to_string())) };
1481
1482 let config = RetryConfig::new(2, 1, 0, 0); let initial_available_count = selector.available_provider_count();
1486 assert_eq!(initial_available_count, 2);
1487
1488 let result: Result<i32, TestError> = retry_rpc_call(
1489 &selector,
1490 "test_operation",
1491 |_| true, |_| false, provider_initializer,
1494 operation,
1495 Some(config),
1496 )
1497 .await;
1498
1499 assert!(result.is_err());
1500
1501 let final_available_count = selector.available_provider_count();
1504 assert!(final_available_count < initial_available_count,
1505 "Provider should be marked as failed when retriable errors exhaust retries, regardless of should_mark_provider_failed");
1506 }
1507
1508 #[tokio::test]
1509 async fn test_mixed_error_scenarios_with_different_marking_behavior() {
1510 let _guard = setup_test_env();
1511
1512 let configs = vec![
1514 RpcConfig::new("http://localhost:8545".to_string()),
1515 RpcConfig::new("http://localhost:8546".to_string()),
1516 ];
1517 let selector = RpcSelector::new(configs).expect("Failed to create selector");
1518
1519 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1520
1521 let operation =
1522 |_provider: String| async move { Err(TestError("Critical network error".to_string())) };
1523
1524 let config = RetryConfig::new(1, 1, 0, 0);
1525 let initial_count = selector.available_provider_count();
1526
1527 let result: Result<i32, TestError> = retry_rpc_call(
1528 &selector,
1529 "test_operation",
1530 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1533 operation,
1534 Some(config.clone()),
1535 )
1536 .await;
1537
1538 assert!(result.is_err());
1539 let after_critical_count = selector.available_provider_count();
1540 assert_eq!(
1541 after_critical_count,
1542 initial_count - 1,
1543 "Critical error should mark provider as failed"
1544 );
1545
1546 let operation =
1548 |_provider: String| async move { Err(TestError("Minor validation error".to_string())) };
1549
1550 let result: Result<i32, TestError> = retry_rpc_call(
1551 &selector,
1552 "test_operation",
1553 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1556 operation,
1557 Some(config),
1558 )
1559 .await;
1560
1561 assert!(result.is_err());
1562 let final_count = selector.available_provider_count();
1563 assert_eq!(
1564 final_count, after_critical_count,
1565 "Minor error should NOT mark provider as failed"
1566 );
1567 }
1568
1569 #[tokio::test]
1570 async fn test_should_mark_provider_failed_respects_last_provider_protection() {
1571 let _guard = setup_test_env();
1572
1573 let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
1575 let selector = RpcSelector::new(configs).expect("Failed to create selector");
1576
1577 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1578
1579 let operation =
1581 |_provider: String| async move { Err(TestError("Critical network error".to_string())) };
1582
1583 let config = RetryConfig::new(1, 1, 0, 0);
1584
1585 let initial_available_count = selector.available_provider_count();
1587 assert_eq!(initial_available_count, 1);
1588
1589 let result: Result<i32, TestError> = retry_rpc_call(
1590 &selector,
1591 "test_operation",
1592 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1595 operation,
1596 Some(config),
1597 )
1598 .await;
1599
1600 assert!(result.is_err());
1601
1602 let final_available_count = selector.available_provider_count();
1604 assert_eq!(final_available_count, initial_available_count,
1605 "Last provider should never be marked as failed, regardless of should_mark_provider_failed");
1606 assert_eq!(
1607 final_available_count, 1,
1608 "Should still have 1 provider available"
1609 );
1610 }
1611
1612 #[tokio::test]
1613 async fn test_should_mark_provider_failed_with_multiple_providers_last_protection() {
1614 let _guard = setup_test_env();
1615
1616 let configs = vec![
1618 RpcConfig::new("http://localhost:8545".to_string()),
1619 RpcConfig::new("http://localhost:8546".to_string()),
1620 ];
1621 let selector = RpcSelector::new(configs).expect("Failed to create selector");
1622
1623 let attempt_count = Arc::new(AtomicU8::new(0));
1624 let attempt_count_clone = attempt_count.clone();
1625
1626 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1627
1628 let operation = move |_provider: String| {
1630 let attempt_count = attempt_count_clone.clone();
1631 async move {
1632 let count = attempt_count.fetch_add(1, AtomicOrdering::SeqCst);
1633 Err(TestError(format!("Critical error #{}", count)))
1634 }
1635 };
1636
1637 let config = RetryConfig::new(1, 1, 0, 0); let initial_available_count = selector.available_provider_count();
1641 assert_eq!(initial_available_count, 2);
1642
1643 let result: Result<i32, TestError> = retry_rpc_call(
1644 &selector,
1645 "test_operation",
1646 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1649 operation,
1650 Some(config),
1651 )
1652 .await;
1653
1654 assert!(result.is_err());
1655
1656 let final_available_count = selector.available_provider_count();
1658 assert_eq!(
1659 final_available_count, 1,
1660 "First provider should be marked as failed, but last provider should be protected"
1661 );
1662 }
1663}