1use std::collections::HashSet;
13use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use eyre::Result;
18use parking_lot::RwLock;
19use rand::distr::weighted::WeightedIndex;
20use rand::prelude::*;
21use serde::Serialize;
22use thiserror::Error;
23use tokio::time::Instant;
24
25use crate::models::RpcConfig;
26
27#[derive(Error, Debug, Serialize)]
28pub enum RpcSelectorError {
29 #[error("No providers available")]
30 NoProviders,
31 #[error("Client initialization failed: {0}")]
32 ClientInitializationError(String),
33 #[error("Weighted index error: {0}")]
34 WeightedIndexError(String),
35 #[error("All available providers have failed")]
36 AllProvidersFailed,
37}
38
/// Bookkeeping for providers that are temporarily considered failed.
///
/// A failed provider is stored together with the instant at which its failure
/// mark expires; once that instant has passed the provider counts as healthy
/// again.
#[derive(Debug)]
struct ProviderHealth {
    /// Provider index -> instant from which the provider is usable again.
    failed_provider_reset_times: std::collections::HashMap<usize, Instant>,
    /// How long a provider stays marked as failed after `mark_failed`.
    reset_duration: Duration,
}

impl ProviderHealth {
    /// Creates an empty tracker whose failure marks last `reset_duration`.
    fn new(reset_duration: Duration) -> Self {
        Self {
            failed_provider_reset_times: std::collections::HashMap::new(),
            reset_duration,
        }
    }

    /// Marks provider `index` as failed until `now + reset_duration`.
    ///
    /// Marking an already-failed provider replaces its previous deadline.
    fn mark_failed(&mut self, index: usize) {
        let reset_time = Instant::now() + self.reset_duration;
        self.failed_provider_reset_times.insert(index, reset_time);
    }

    /// Returns `true` while provider `index` carries an unexpired failure mark.
    ///
    /// An expired mark is removed as a side effect, which is why this takes
    /// `&mut self`.
    fn is_failed(&mut self, index: usize) -> bool {
        // Copy the deadline out so the map is no longer borrowed when the
        // expired entry has to be removed.
        match self.failed_provider_reset_times.get(&index).copied() {
            None => false,
            Some(reset_time) if Instant::now() < reset_time => true,
            Some(_) => {
                self.failed_provider_reset_times.remove(&index);
                false
            }
        }
    }

    /// Forgets every failure mark.
    fn reset(&mut self) {
        self.failed_provider_reset_times.clear();
    }

    /// Counts the providers whose failure mark has not expired yet.
    ///
    /// Expired entries that have not been pruned by `is_failed` are ignored.
    fn failed_count(&self) -> usize {
        // Read the clock once so every entry is judged against the same instant.
        let now = Instant::now();
        self.failed_provider_reset_times
            .values()
            .filter(|&&reset_time| now < reset_time)
            .count()
    }
}
89
90#[derive(Debug)]
92pub struct RpcSelector {
93 configs: Vec<RpcConfig>,
95 weights_dist: Option<Arc<WeightedIndex<u8>>>,
97 next_index: Arc<AtomicUsize>,
99 health: Arc<RwLock<ProviderHealth>>,
101 current_index: Arc<AtomicUsize>,
103 has_current: Arc<AtomicBool>,
105}
106
107const DEFAULT_PROVIDER_RESET_DURATION: Duration = Duration::from_secs(300);
109
110impl RpcSelector {
111 pub fn new(configs: Vec<RpcConfig>) -> Result<Self, RpcSelectorError> {
119 if configs.is_empty() {
120 return Err(RpcSelectorError::NoProviders);
121 }
122
123 let weights_dist = Self::create_weights_distribution(&configs, &HashSet::new());
125
126 let health = ProviderHealth::new(DEFAULT_PROVIDER_RESET_DURATION);
128
129 Ok(Self {
130 configs,
131 weights_dist,
132 next_index: Arc::new(AtomicUsize::new(0)),
133 health: Arc::new(RwLock::new(health)),
134 current_index: Arc::new(AtomicUsize::new(0)),
135 has_current: Arc::new(AtomicBool::new(false)), })
137 }
138
139 pub fn provider_count(&self) -> usize {
144 self.configs.len()
145 }
146
147 pub fn available_provider_count(&self) -> usize {
152 let health = self.health.read();
153 self.configs.len() - health.failed_count()
154 }
155
156 pub fn mark_current_as_failed(&self) {
161 if self.has_current.load(Ordering::Relaxed) {
163 let current = self.current_index.load(Ordering::Relaxed);
164
165 let mut health = self.health.write();
167 health.mark_failed(current);
168
169 self.has_current.store(false, Ordering::Relaxed);
171
172 if self.configs.len() > 1 {
174 self.next_index.fetch_add(1, Ordering::Relaxed);
175 }
176 }
177 }
178
179 pub fn reset_failed_providers(&self) {
181 let mut health = self.health.write();
182 health.reset();
183 }
184
185 fn create_weights_distribution(
194 configs: &[RpcConfig],
195 excluded_indices: &HashSet<usize>,
196 ) -> Option<Arc<WeightedIndex<u8>>> {
197 let weights: Vec<u8> = configs
199 .iter()
200 .enumerate()
201 .map(|(idx, config)| {
202 if excluded_indices.contains(&idx) {
203 0
204 } else {
205 config.get_weight()
206 }
207 })
208 .collect();
209
210 let available_count = weights.iter().filter(|&&w| w > 0).count();
212 if available_count == 0 {
213 return None;
214 }
215
216 let first_non_zero_weight = weights.iter().find(|&&w| w > 0).copied();
217 if let Some(first_weight) = first_non_zero_weight {
218 let all_equal = weights
220 .iter()
221 .filter(|&&w| w > 0)
222 .all(|&w| w == first_weight);
223
224 if all_equal {
225 return None;
226 }
227 }
228
229 match WeightedIndex::new(&weights) {
231 Ok(dist) => Some(Arc::new(dist)),
232 Err(_) => None,
233 }
234 }
235
236 fn select_url(&self) -> Result<&str, RpcSelectorError> {
238 if self.configs.is_empty() {
239 return Err(RpcSelectorError::NoProviders);
240 }
241
242 if self.configs.len() == 1 {
244 let mut health = self.health.write();
245 if health.is_failed(0) {
246 return Err(RpcSelectorError::AllProvidersFailed);
248 }
249
250 self.current_index.store(0, Ordering::Relaxed);
252 self.has_current.store(true, Ordering::Relaxed);
253 return Ok(&self.configs[0].url);
254 }
255
256 if let Some(dist) = &self.weights_dist {
258 let mut rng = rand::rng();
259 let mut health = self.health.write();
260
261 const MAX_ATTEMPTS: usize = 5;
263 for _ in 0..MAX_ATTEMPTS {
264 let index = dist.sample(&mut rng);
265 if !health.is_failed(index) {
266 self.current_index.store(index, Ordering::Relaxed);
267 self.has_current.store(true, Ordering::Relaxed);
268 return Ok(&self.configs[index].url);
269 }
270 }
271 }
273
274 let len = self.configs.len();
276 let start_index = self.next_index.load(Ordering::Relaxed) % len;
277
278 for i in 0..len {
280 let index = (start_index + i) % len;
281 if self.configs[index].get_weight() == 0 {
283 continue;
284 }
285
286 let mut health = self.health.write();
287 if !health.is_failed(index) {
288 self.next_index.store((index + 1) % len, Ordering::Relaxed);
290
291 self.current_index.store(index, Ordering::Relaxed);
293 self.has_current.store(true, Ordering::Relaxed);
294
295 return Ok(&self.configs[index].url);
296 }
297 }
298
299 Err(RpcSelectorError::AllProvidersFailed)
301 }
302
303 pub fn get_current_url(&self) -> Result<String, RpcSelectorError> {
308 self.select_url().map(|url| url.to_string())
309 }
310
311 pub fn get_client<T>(
319 &self,
320 initializer: impl Fn(&str) -> Result<T>,
321 ) -> Result<T, RpcSelectorError> {
322 let url = self.select_url()?;
323
324 initializer(url).map_err(|e| RpcSelectorError::ClientInitializationError(e.to_string()))
325 }
326}
327
328impl Clone for RpcSelector {
330 fn clone(&self) -> Self {
331 Self {
332 configs: self.configs.clone(),
333 weights_dist: self.weights_dist.clone(),
334 next_index: Arc::clone(&self.next_index),
335 health: Arc::clone(&self.health),
336 current_index: Arc::clone(&self.current_index),
337 has_current: Arc::clone(&self.has_current),
338 }
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    const SOLO_URL: &str = "https://example.com/rpc";
    const URL_1: &str = "https://example1.com/rpc";
    const URL_2: &str = "https://example2.com/rpc";
    const URL_3: &str = "https://example3.com/rpc";

    /// Builds a provider configuration with an explicit weight.
    fn rpc(url: &str, weight: u8) -> RpcConfig {
        RpcConfig {
            url: url.to_string(),
            weight,
        }
    }

    /// Builds a provider configuration through `RpcConfig::new`.
    fn default_rpc(url: &str) -> RpcConfig {
        RpcConfig::new(url.to_string())
    }

    /// Providers with weights 1, 2 and 3 on the three numbered URLs.
    fn ascending_weights() -> Vec<RpcConfig> {
        vec![rpc(URL_1, 1), rpc(URL_2, 2), rpc(URL_3, 3)]
    }

    // A single provider never needs a weighted sampler.
    #[test]
    fn test_create_weights_distribution_single_config() {
        let configs = vec![rpc(SOLO_URL, 1)];

        let dist = RpcSelector::create_weights_distribution(&configs, &HashSet::new());
        assert!(dist.is_none());
    }

    // Equal weights fall back to round-robin, so no sampler is built.
    #[test]
    fn test_create_weights_distribution_equal_weights() {
        let configs = vec![rpc(URL_1, 5), rpc(URL_2, 5), rpc(URL_3, 5)];

        let dist = RpcSelector::create_weights_distribution(&configs, &HashSet::new());
        assert!(dist.is_none());
    }

    // Differing weights produce a sampler.
    #[test]
    fn test_create_weights_distribution_different_weights() {
        let configs = ascending_weights();

        let dist = RpcSelector::create_weights_distribution(&configs, &HashSet::new());
        assert!(dist.is_some());
    }

    // Excluded providers count as weight zero.
    #[test]
    fn test_create_weights_distribution_with_excluded() {
        let configs = ascending_weights();
        let mut excluded = HashSet::new();

        // Two providers with different weights remain.
        excluded.insert(0);
        assert!(RpcSelector::create_weights_distribution(&configs, &excluded).is_some());

        // Only one provider remains.
        excluded.insert(1);
        assert!(RpcSelector::create_weights_distribution(&configs, &excluded).is_none());
    }

    #[test]
    fn test_rpc_selector_new_empty_configs() {
        let outcome = RpcSelector::new(Vec::new());
        assert!(matches!(outcome, Err(RpcSelectorError::NoProviders)));
    }

    #[test]
    fn test_rpc_selector_new_single_config() {
        let selector =
            RpcSelector::new(vec![rpc(SOLO_URL, 1)]).expect("single provider must be accepted");
        assert!(selector.weights_dist.is_none());
    }

    #[test]
    fn test_rpc_selector_new_multiple_equal_weights() {
        let selector = RpcSelector::new(vec![rpc(URL_1, 5), rpc(URL_2, 5)])
            .expect("equal-weight providers must be accepted");
        assert!(selector.weights_dist.is_none());
    }

    #[test]
    fn test_rpc_selector_new_multiple_different_weights() {
        let selector = RpcSelector::new(vec![rpc(URL_1, 1), rpc(URL_2, 3)])
            .expect("weighted providers must be accepted");
        assert!(selector.weights_dist.is_some());
    }

    #[test]
    fn test_rpc_selector_select_url_single_provider() {
        let selector = RpcSelector::new(vec![rpc(SOLO_URL, 1)]).unwrap();

        let picked = selector.select_url();
        assert!(picked.is_ok());
        assert_eq!(picked.unwrap(), "https://example.com/rpc");
        assert!(selector.has_current.load(Ordering::Relaxed));
    }

    // Two equal-weight providers alternate.
    #[test]
    fn test_rpc_selector_select_url_round_robin() {
        let selector = RpcSelector::new(vec![rpc(URL_1, 1), rpc(URL_2, 1)]).unwrap();

        let first = selector.select_url().unwrap();
        let second = selector.select_url().unwrap();
        let third = selector.select_url().unwrap();

        assert_ne!(first, second);
        assert_eq!(first, third);
    }

    #[test]
    fn test_rpc_selector_get_client_success() {
        let selector = RpcSelector::new(vec![rpc(SOLO_URL, 1)]).unwrap();

        // The "client" is simply the URL it was built from.
        let echo_url = |url: &str| -> Result<String> { Ok(url.to_string()) };

        let client = selector.get_client(echo_url);
        assert!(client.is_ok());
        assert_eq!(client.unwrap(), "https://example.com/rpc");
    }

    #[test]
    fn test_rpc_selector_get_client_failure() {
        let selector = RpcSelector::new(vec![rpc(SOLO_URL, 1)]).unwrap();

        let always_fails =
            |_url: &str| -> Result<String> { Err(eyre::eyre!("Initialization error")) };

        let client = selector.get_client(always_fails);
        assert!(matches!(
            client,
            Err(RpcSelectorError::ClientInitializationError(_))
        ));
    }

    #[test]
    fn test_rpc_selector_clone() {
        let selector = RpcSelector::new(vec![rpc(URL_1, 1), rpc(URL_2, 3)]).unwrap();
        let cloned = selector.clone();

        assert_eq!(selector.configs.len(), cloned.configs.len());
        for (original, copy) in selector.configs.iter().zip(&cloned.configs) {
            assert_eq!(original.url, copy.url);
        }

        assert_eq!(
            selector.weights_dist.is_some(),
            cloned.weights_dist.is_some()
        );
    }

    #[test]
    fn test_mark_current_as_failed_single_provider() {
        let selector = RpcSelector::new(vec![rpc(SOLO_URL, 1)]).unwrap();
        let initial_url = selector.select_url().unwrap();

        selector.mark_current_as_failed();

        // The only provider is failed, so nothing can be selected.
        assert!(matches!(
            selector.select_url(),
            Err(RpcSelectorError::AllProvidersFailed)
        ));

        selector.reset_failed_providers();

        // After the reset the provider is usable again.
        let after_reset = selector.select_url();
        assert!(after_reset.is_ok());
        assert_eq!(initial_url, after_reset.unwrap());
    }

    #[test]
    fn test_mark_current_as_failed_multiple_providers() {
        let selector =
            RpcSelector::new(vec![rpc(URL_1, 5), rpc(URL_2, 5), rpc(URL_3, 5)]).unwrap();

        let url1 = selector.select_url().unwrap().to_string();
        selector.mark_current_as_failed();

        let url2 = selector.select_url().unwrap().to_string();
        assert_ne!(url1, url2);
        selector.mark_current_as_failed();

        let url3 = selector.select_url().unwrap().to_string();
        assert_ne!(url1, url3);
        assert_ne!(url2, url3);
        selector.mark_current_as_failed();

        // Every provider has now been marked as failed.
        assert!(matches!(
            selector.select_url(),
            Err(RpcSelectorError::AllProvidersFailed)
        ));
    }

    #[test]
    fn test_mark_current_as_failed_weighted() {
        let selector = RpcSelector::new(vec![rpc(URL_1, 1), rpc(URL_2, 10)]).unwrap();
        assert!(selector.weights_dist.is_some());

        let url1 = selector.select_url().unwrap().to_string();
        selector.mark_current_as_failed();

        let url2 = selector.select_url().unwrap().to_string();
        assert_ne!(url1, url2);
        selector.mark_current_as_failed();

        assert!(selector.select_url().is_err());

        selector.reset_failed_providers();
        assert!(selector.select_url().is_ok());
    }

    #[test]
    fn test_auto_reset_mechanism() {
        let selector = RpcSelector::new(vec![rpc(URL_1, 1), rpc(URL_2, 1)]).unwrap();

        // Use a short reset window so the test does not wait for the default.
        *selector.health.write() = ProviderHealth::new(Duration::from_millis(100));

        for _ in 0..2 {
            selector.select_url().unwrap();
            selector.mark_current_as_failed();
        }

        assert!(selector.select_url().is_err());

        // Let the failure marks expire.
        thread::sleep(Duration::from_millis(150));

        // Query one provider directly; the guard is released at the end of
        // the statement, before the next selection.
        let _ = selector.health.write().is_failed(0);

        assert!(
            selector.select_url().is_ok(),
            "Providers should have been auto-reset after timeout"
        );
    }

    #[test]
    fn test_provider_count() {
        // An empty list is rejected outright.
        assert!(RpcSelector::new(Vec::new()).is_err());

        let single = RpcSelector::new(vec![rpc(SOLO_URL, 1)]).unwrap();
        assert_eq!(single.provider_count(), 1);

        let triple = RpcSelector::new(ascending_weights()).unwrap();
        assert_eq!(triple.provider_count(), 3);
    }

    #[test]
    fn test_available_provider_count() {
        let selector = RpcSelector::new(ascending_weights()).unwrap();
        assert_eq!(selector.provider_count(), 3);
        assert_eq!(selector.available_provider_count(), 3);

        let fail_one = || {
            selector.select_url().unwrap();
            selector.mark_current_as_failed();
        };

        fail_one();
        assert_eq!(selector.available_provider_count(), 2);

        fail_one();
        assert_eq!(selector.available_provider_count(), 1);

        selector.reset_failed_providers();
        assert_eq!(selector.available_provider_count(), 3);
    }

    #[test]
    fn test_get_current_url() {
        let selector = RpcSelector::new(vec![default_rpc(URL_1), default_rpc(URL_2)]).unwrap();

        let url = selector.get_current_url();
        assert!(url.is_ok());

        let url_str = url.unwrap();
        assert!(
            url_str == "https://example1.com/rpc" || url_str == "https://example2.com/rpc",
            "Unexpected URL: {}",
            url_str
        );
    }

    #[test]
    fn test_concurrent_usage() {
        let shared = Arc::new(
            RpcSelector::new(vec![
                default_rpc(URL_1),
                default_rpc(URL_2),
                default_rpc(URL_3),
            ])
            .unwrap(),
        );

        // `collect` is eager, so all ten threads are spawned before any join.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let selector = Arc::clone(&shared);
                thread::spawn(move || {
                    let url = selector.select_url().unwrap().to_string();
                    if url.contains("example1") {
                        selector.mark_current_as_failed();
                    }
                    url
                })
            })
            .collect();

        let unique_urls: HashSet<String> = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .collect();
        assert!(unique_urls.len() > 1, "Expected multiple unique URLs");

        // `count` runs all ten selections, as the original loop did.
        let non_example1_hits = (0..10)
            .filter(|_| !shared.select_url().unwrap().contains("example1"))
            .count();
        assert!(
            non_example1_hits > 0,
            "Should avoid selecting failed provider"
        );
    }

    #[test]
    fn test_provider_health_methods() {
        let mut health = ProviderHealth::new(Duration::from_secs(10));

        // Freshly created: nothing is failed.
        assert_eq!(health.failed_count(), 0);
        assert!(!health.is_failed(0));

        health.mark_failed(0);
        assert_eq!(health.failed_count(), 1);
        assert!(health.is_failed(0));

        health.reset();
        assert_eq!(health.failed_count(), 0);
        assert!(!health.is_failed(0));
    }

    #[test]
    fn test_consecutive_mark_as_failed() {
        let selector = RpcSelector::new(vec![default_rpc(URL_1), default_rpc(URL_2)]).unwrap();

        selector.select_url().unwrap();

        // The second call has no current provider and must be a no-op.
        selector.mark_current_as_failed();
        selector.mark_current_as_failed();

        assert!(selector.select_url().is_ok());
    }

    #[test]
    fn test_partial_auto_reset() {
        let selector = RpcSelector::new(vec![
            default_rpc(URL_1),
            default_rpc(URL_2),
            default_rpc(URL_3),
        ])
        .unwrap();

        // Short reset window for the test.
        *selector.health.write() = ProviderHealth::new(Duration::from_millis(50));

        for _ in 0..3 {
            selector.select_url().unwrap();
            selector.mark_current_as_failed();
        }

        assert!(selector.select_url().is_err());

        // Keep provider 0 failed for longer than the others.
        selector
            .health
            .write()
            .failed_provider_reset_times
            .insert(0, Instant::now() + Duration::from_millis(200));

        // Long enough for providers 1 and 2 to recover, but not provider 0.
        thread::sleep(Duration::from_millis(100));

        let url = selector.select_url();
        assert!(url.is_ok());
        assert!(!url.unwrap().contains("example1"));
    }

    #[test]
    fn test_weighted_to_round_robin_fallback() {
        let selector =
            RpcSelector::new(vec![rpc(URL_1, 10), rpc(URL_2, 1), rpc(URL_3, 1)]).unwrap();
        assert!(selector.weights_dist.is_some());

        // Stop at the first time the heavy provider is chosen and fail it.
        let selected_first = (0..10).any(|_| {
            let hit = selector.select_url().unwrap().contains("example1");
            if hit {
                selector.mark_current_as_failed();
            }
            hit
        });

        assert!(
            selected_first,
            "High-weight provider should have been selected"
        );

        let seen_urls: HashSet<String> = (0..10)
            .map(|_| selector.select_url().unwrap().to_string())
            .collect();

        assert!(seen_urls.len() >= 2);
        assert!(
            !seen_urls.iter().any(|url| url.contains("example1")),
            "Failed provider should not be selected"
        );
    }

    #[test]
    fn test_zero_weight_providers() {
        let selector = RpcSelector::new(vec![rpc(URL_1, 0), rpc(URL_2, 5)]).unwrap();

        let seen_urls: HashSet<String> = (0..10)
            .map(|_| selector.select_url().unwrap().to_string())
            .collect();

        assert_eq!(seen_urls.len(), 1);
        assert!(
            seen_urls.iter().next().unwrap().contains("example2"),
            "Only the non-zero weight provider should be selected"
        );
    }

    #[test]
    fn test_extreme_weight_differences() {
        let selector = RpcSelector::new(vec![rpc(URL_1, 100), rpc(URL_2, 1)]).unwrap();

        let count_high = (0..100)
            .filter(|_| {
                let picked_high = selector.select_url().unwrap().contains("example1");
                // Clear the "current" flag between draws.
                selector.has_current.store(false, Ordering::Relaxed);
                picked_high
            })
            .count();

        assert!(
            count_high > 90,
            "High-weight provider selected only {}/{} times",
            count_high,
            100
        );
    }

    #[test]
    fn test_mark_unselected_as_failed() {
        let selector = RpcSelector::new(vec![default_rpc(URL_1), default_rpc(URL_2)]).unwrap();

        // Nothing has been selected yet, so this must not fail any provider.
        selector.mark_current_as_failed();

        let seen_urls: HashSet<String> = (0..10)
            .map(|_| {
                let url = selector.select_url().unwrap().to_string();
                selector.has_current.store(false, Ordering::Relaxed);
                url
            })
            .collect();

        assert_eq!(
            seen_urls.len(),
            2,
            "Both providers should still be available"
        );
    }

    #[test]
    fn test_rpc_selector_error_serialization() {
        let to_json = |error: RpcSelectorError| serde_json::to_string(&error).unwrap();

        let json = to_json(RpcSelectorError::NoProviders);
        assert!(json.contains("NoProviders"));

        let json = to_json(RpcSelectorError::ClientInitializationError(
            "test error".to_string(),
        ));
        assert!(json.contains("ClientInitializationError"));
        assert!(json.contains("test error"));

        let json = to_json(RpcSelectorError::WeightedIndexError(
            "index error".to_string(),
        ));
        assert!(json.contains("WeightedIndexError"));
        assert!(json.contains("index error"));

        let json = to_json(RpcSelectorError::AllProvidersFailed);
        assert!(json.contains("AllProvidersFailed"));
    }
}