openzeppelin_relayer/services/provider/
rpc_selector.rs

1//! # RPC Provider Selector
2//!
3//! This module provides functionality for dynamically selecting RPC endpoints based on configured priorities,
4//! health status, and selection strategies.
5//!
6//! ## Features
7//!
8//! - **Weighted selection**: Providers can be assigned different weights to control selection probability
9//! - **Round-robin fallback**: If weighted selection fails or weights are equal, round-robin is used
10//! - **Health tracking**: Failed providers are temporarily excluded from selection
11//! - **Automatic recovery**: Failed providers are automatically recovered after a configurable period
12use std::collections::HashSet;
13use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use eyre::Result;
18use parking_lot::RwLock;
19use rand::distr::weighted::WeightedIndex;
20use rand::prelude::*;
21use serde::Serialize;
22use thiserror::Error;
23use tokio::time::Instant;
24
25use crate::models::RpcConfig;
26
27#[derive(Error, Debug, Serialize)]
28pub enum RpcSelectorError {
29    #[error("No providers available")]
30    NoProviders,
31    #[error("Client initialization failed: {0}")]
32    ClientInitializationError(String),
33    #[error("Weighted index error: {0}")]
34    WeightedIndexError(String),
35    #[error("All available providers have failed")]
36    AllProvidersFailed,
37}
38
39// Provider health tracking struct
40#[derive(Debug)]
41struct ProviderHealth {
42    // Maps the index of each failed provider to the timestamp (Instant) when it will become available for use again.
43    failed_provider_reset_times: std::collections::HashMap<usize, Instant>,
44    // The amount of time a provider remains unavailable after being marked as failed.
45    reset_duration: Duration,
46}
47
48impl ProviderHealth {
49    // Create a new ProviderHealth tracker with a given reset duration
50    fn new(reset_duration: Duration) -> Self {
51        Self {
52            failed_provider_reset_times: std::collections::HashMap::new(),
53            reset_duration,
54        }
55    }
56
57    // Mark a provider as failed
58    fn mark_failed(&mut self, index: usize) {
59        let reset_time = Instant::now() + self.reset_duration;
60        self.failed_provider_reset_times.insert(index, reset_time);
61    }
62
63    // Check if a provider is marked as failed and handle auto-reset if needed
64    fn is_failed(&mut self, index: usize) -> bool {
65        if let Some(reset_time) = self.failed_provider_reset_times.get(&index) {
66            if Instant::now() >= *reset_time {
67                // Time has passed, remove from failed set (auto-reset for this provider)
68                self.failed_provider_reset_times.remove(&index);
69                return false;
70            }
71            return true;
72        }
73        false
74    }
75
76    // Reset all failed providers
77    fn reset(&mut self) {
78        self.failed_provider_reset_times.clear();
79    }
80
81    // Get the number of failed providers whose reset time has not yet passed.
82    fn failed_count(&self) -> usize {
83        self.failed_provider_reset_times
84            .values()
85            .filter(|&&reset_time| Instant::now() < reset_time)
86            .count()
87    }
88}
89
90/// Manages selection of RPC endpoints based on configuration.
91#[derive(Debug)]
92pub struct RpcSelector {
93    /// RPC configurations
94    configs: Vec<RpcConfig>,
95    /// Pre-computed weighted distribution for faster provider selection
96    weights_dist: Option<Arc<WeightedIndex<u8>>>,
97    /// Counter for round-robin selection as a fallback or for equal weights
98    next_index: Arc<AtomicUsize>,
99    /// Health tracking for providers
100    health: Arc<RwLock<ProviderHealth>>,
101    /// Currently selected provider index
102    current_index: Arc<AtomicUsize>,
103    /// Flag indicating whether a current provider is valid
104    has_current: Arc<AtomicBool>,
105}
106
/// How long a failed provider stays excluded before it is retried
/// automatically (5 minutes).
const DEFAULT_PROVIDER_RESET_DURATION: Duration = Duration::from_secs(5 * 60);
109
110impl RpcSelector {
111    /// Creates a new RpcSelector instance.
112    ///
113    /// # Arguments
114    /// * `configs` - A vector of RPC configurations (URL and weight)
115    ///
116    /// # Returns
117    /// * `Result<Self>` - A new selector instance or an error
118    pub fn new(configs: Vec<RpcConfig>) -> Result<Self, RpcSelectorError> {
119        if configs.is_empty() {
120            return Err(RpcSelectorError::NoProviders);
121        }
122
123        // Create the weights distribution based on provided weights
124        let weights_dist = Self::create_weights_distribution(&configs, &HashSet::new());
125
126        // Initialize health tracker with default reset duration
127        let health = ProviderHealth::new(DEFAULT_PROVIDER_RESET_DURATION);
128
129        Ok(Self {
130            configs,
131            weights_dist,
132            next_index: Arc::new(AtomicUsize::new(0)),
133            health: Arc::new(RwLock::new(health)),
134            current_index: Arc::new(AtomicUsize::new(0)),
135            has_current: Arc::new(AtomicBool::new(false)), // Initially no current provider
136        })
137    }
138
139    /// Gets the number of available providers
140    ///
141    /// # Returns
142    /// * `usize` - The number of providers in the selector
143    pub fn provider_count(&self) -> usize {
144        self.configs.len()
145    }
146
147    /// Gets the number of available (non-failed) providers
148    ///
149    /// # Returns
150    /// * `usize` - The number of non-failed providers
151    pub fn available_provider_count(&self) -> usize {
152        let health = self.health.read();
153        self.configs.len() - health.failed_count()
154    }
155
156    /// Marks the current endpoint as failed and forces selection of a different endpoint.
157    ///
158    /// This method is used when a provider consistently fails, and we want to try a different one.
159    /// It adds the current provider to the failed providers set and will avoid selecting it again.
160    pub fn mark_current_as_failed(&self) {
161        // Only proceed if we have a current provider
162        if self.has_current.load(Ordering::Relaxed) {
163            let current = self.current_index.load(Ordering::Relaxed);
164
165            // Mark this provider as failed
166            let mut health = self.health.write();
167            health.mark_failed(current);
168
169            // Clear the current provider
170            self.has_current.store(false, Ordering::Relaxed);
171
172            // Move round-robin index forward to avoid selecting the same provider again
173            if self.configs.len() > 1 {
174                self.next_index.fetch_add(1, Ordering::Relaxed);
175            }
176        }
177    }
178
179    /// Resets the failed providers set, making all providers available again.
180    pub fn reset_failed_providers(&self) {
181        let mut health = self.health.write();
182        health.reset();
183    }
184
185    /// Creates a weighted distribution for selecting RPC endpoints based on their weights.
186    ///
187    /// # Arguments
188    /// * `configs` - A slice of RPC configurations with weights
189    /// * `excluded_indices` - A set of indices to exclude from the distribution
190    ///
191    /// # Returns
192    /// * `Option<Arc<WeightedIndex<u8>>>` - A weighted distribution if configs have different weights, None otherwise
193    fn create_weights_distribution(
194        configs: &[RpcConfig],
195        excluded_indices: &HashSet<usize>,
196    ) -> Option<Arc<WeightedIndex<u8>>> {
197        // Collect weights, using 0 for excluded providers
198        let weights: Vec<u8> = configs
199            .iter()
200            .enumerate()
201            .map(|(idx, config)| {
202                if excluded_indices.contains(&idx) {
203                    0
204                } else {
205                    config.get_weight()
206                }
207            })
208            .collect();
209
210        // Count available providers with non-zero weight
211        let available_count = weights.iter().filter(|&&w| w > 0).count();
212        if available_count == 0 {
213            return None;
214        }
215
216        let first_non_zero_weight = weights.iter().find(|&&w| w > 0).copied();
217        if let Some(first_weight) = first_non_zero_weight {
218            // First check for the original equal weights case
219            let all_equal = weights
220                .iter()
221                .filter(|&&w| w > 0)
222                .all(|&w| w == first_weight);
223
224            if all_equal {
225                return None;
226            }
227        }
228
229        // Create weighted distribution
230        match WeightedIndex::new(&weights) {
231            Ok(dist) => Some(Arc::new(dist)),
232            Err(_) => None,
233        }
234    }
235
236    /// Gets the URL of the next RPC endpoint based on the selection strategy.
237    fn select_url(&self) -> Result<&str, RpcSelectorError> {
238        if self.configs.is_empty() {
239            return Err(RpcSelectorError::NoProviders);
240        }
241
242        // For a single provider, handle special case
243        if self.configs.len() == 1 {
244            let mut health = self.health.write();
245            if health.is_failed(0) {
246                // is_failed will attempt auto-reset for provider 0
247                return Err(RpcSelectorError::AllProvidersFailed);
248            }
249
250            // Set as current
251            self.current_index.store(0, Ordering::Relaxed);
252            self.has_current.store(true, Ordering::Relaxed);
253            return Ok(&self.configs[0].url);
254        }
255
256        // Try weighted selection first if available
257        if let Some(dist) = &self.weights_dist {
258            let mut rng = rand::rng();
259            let mut health = self.health.write();
260
261            // Try a limited number of times to find a non-failed provider with weighted selection
262            const MAX_ATTEMPTS: usize = 5;
263            for _ in 0..MAX_ATTEMPTS {
264                let index = dist.sample(&mut rng);
265                if !health.is_failed(index) {
266                    self.current_index.store(index, Ordering::Relaxed);
267                    self.has_current.store(true, Ordering::Relaxed);
268                    return Ok(&self.configs[index].url);
269                }
270            }
271            // If we couldn't find a provider after multiple attempts, fall back to round-robin
272        }
273
274        // Fall back to round-robin selection
275        let len = self.configs.len();
276        let start_index = self.next_index.load(Ordering::Relaxed) % len;
277
278        // Find the next available (non-failed) provider
279        for i in 0..len {
280            let index = (start_index + i) % len;
281            // Skip providers with zero weight
282            if self.configs[index].get_weight() == 0 {
283                continue;
284            }
285
286            let mut health = self.health.write();
287            if !health.is_failed(index) {
288                // Update the next_index atomically to point after this provider
289                self.next_index.store((index + 1) % len, Ordering::Relaxed);
290
291                // Set as current provider
292                self.current_index.store(index, Ordering::Relaxed);
293                self.has_current.store(true, Ordering::Relaxed);
294
295                return Ok(&self.configs[index].url);
296            }
297        }
298
299        // If we get here, all providers must have failed
300        Err(RpcSelectorError::AllProvidersFailed)
301    }
302
303    /// Gets the URL of the currently selected RPC endpoint.
304    ///
305    /// # Returns
306    /// * `Result<String, RpcSelectorError>` - The URL of the current provider, or an error
307    pub fn get_current_url(&self) -> Result<String, RpcSelectorError> {
308        self.select_url().map(|url| url.to_string())
309    }
310
311    /// Gets a client for the selected RPC endpoint.
312    ///
313    /// # Arguments
314    /// * `initializer` - A function that takes a URL string and returns a Result<T>
315    ///
316    /// # Returns
317    /// * `Result<T>` - The client instance or an error
318    pub fn get_client<T>(
319        &self,
320        initializer: impl Fn(&str) -> Result<T>,
321    ) -> Result<T, RpcSelectorError> {
322        let url = self.select_url()?;
323
324        initializer(url).map_err(|e| RpcSelectorError::ClientInitializationError(e.to_string()))
325    }
326}
327
328// Implement Clone for RpcSelector manually since the generic T doesn't require Clone
329impl Clone for RpcSelector {
330    fn clone(&self) -> Self {
331        Self {
332            configs: self.configs.clone(),
333            weights_dist: self.weights_dist.clone(),
334            next_index: Arc::clone(&self.next_index),
335            health: Arc::clone(&self.health),
336            current_index: Arc::clone(&self.current_index),
337            has_current: Arc::clone(&self.has_current),
338        }
339    }
340}
341
342#[cfg(test)]
343mod tests {
344    use super::*;
345    use std::thread;
346
347    #[test]
348    fn test_create_weights_distribution_single_config() {
349        let configs = vec![RpcConfig {
350            url: "https://example.com/rpc".to_string(),
351            weight: 1,
352        }];
353
354        let excluded = HashSet::new();
355        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
356        assert!(result.is_none());
357    }
358
359    #[test]
360    fn test_create_weights_distribution_equal_weights() {
361        let configs = vec![
362            RpcConfig {
363                url: "https://example1.com/rpc".to_string(),
364                weight: 5,
365            },
366            RpcConfig {
367                url: "https://example2.com/rpc".to_string(),
368                weight: 5,
369            },
370            RpcConfig {
371                url: "https://example3.com/rpc".to_string(),
372                weight: 5,
373            },
374        ];
375
376        let excluded = HashSet::new();
377        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
378        assert!(result.is_none());
379    }
380
381    #[test]
382    fn test_create_weights_distribution_different_weights() {
383        let configs = vec![
384            RpcConfig {
385                url: "https://example1.com/rpc".to_string(),
386                weight: 1,
387            },
388            RpcConfig {
389                url: "https://example2.com/rpc".to_string(),
390                weight: 2,
391            },
392            RpcConfig {
393                url: "https://example3.com/rpc".to_string(),
394                weight: 3,
395            },
396        ];
397
398        let excluded = HashSet::new();
399        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
400        assert!(result.is_some());
401    }
402
403    #[test]
404    fn test_create_weights_distribution_with_excluded() {
405        let configs = vec![
406            RpcConfig {
407                url: "https://example1.com/rpc".to_string(),
408                weight: 1,
409            },
410            RpcConfig {
411                url: "https://example2.com/rpc".to_string(),
412                weight: 2,
413            },
414            RpcConfig {
415                url: "https://example3.com/rpc".to_string(),
416                weight: 3,
417            },
418        ];
419
420        // Exclude the first provider
421        let mut excluded = HashSet::new();
422        excluded.insert(0);
423
424        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
425        assert!(result.is_some());
426
427        // Exclude two providers (with only one remaining, should return None)
428        excluded.insert(1);
429        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
430        assert!(result.is_none());
431    }
432
433    #[test]
434    fn test_rpc_selector_new_empty_configs() {
435        let configs: Vec<RpcConfig> = vec![];
436        let result = RpcSelector::new(configs);
437        assert!(result.is_err());
438        assert!(matches!(result.unwrap_err(), RpcSelectorError::NoProviders));
439    }
440
441    #[test]
442    fn test_rpc_selector_new_single_config() {
443        let configs = vec![RpcConfig {
444            url: "https://example.com/rpc".to_string(),
445            weight: 1,
446        }];
447
448        let result = RpcSelector::new(configs);
449        assert!(result.is_ok());
450        let selector = result.unwrap();
451        assert!(selector.weights_dist.is_none());
452    }
453
454    #[test]
455    fn test_rpc_selector_new_multiple_equal_weights() {
456        let configs = vec![
457            RpcConfig {
458                url: "https://example1.com/rpc".to_string(),
459                weight: 5,
460            },
461            RpcConfig {
462                url: "https://example2.com/rpc".to_string(),
463                weight: 5,
464            },
465        ];
466
467        let result = RpcSelector::new(configs);
468        assert!(result.is_ok());
469        let selector = result.unwrap();
470        assert!(selector.weights_dist.is_none());
471    }
472
473    #[test]
474    fn test_rpc_selector_new_multiple_different_weights() {
475        let configs = vec![
476            RpcConfig {
477                url: "https://example1.com/rpc".to_string(),
478                weight: 1,
479            },
480            RpcConfig {
481                url: "https://example2.com/rpc".to_string(),
482                weight: 3,
483            },
484        ];
485
486        let result = RpcSelector::new(configs);
487        assert!(result.is_ok());
488        let selector = result.unwrap();
489        assert!(selector.weights_dist.is_some());
490    }
491
492    #[test]
493    fn test_rpc_selector_select_url_single_provider() {
494        let configs = vec![RpcConfig {
495            url: "https://example.com/rpc".to_string(),
496            weight: 1,
497        }];
498
499        let selector = RpcSelector::new(configs).unwrap();
500        let result = selector.select_url();
501        assert!(result.is_ok());
502        assert_eq!(result.unwrap(), "https://example.com/rpc");
503        assert!(selector.has_current.load(Ordering::Relaxed));
504    }
505
506    #[test]
507    fn test_rpc_selector_select_url_round_robin() {
508        let configs = vec![
509            RpcConfig {
510                url: "https://example1.com/rpc".to_string(),
511                weight: 1,
512            },
513            RpcConfig {
514                url: "https://example2.com/rpc".to_string(),
515                weight: 1,
516            },
517        ];
518
519        let selector = RpcSelector::new(configs).unwrap();
520
521        // First call should return the first URL
522        let first_url = selector.select_url().unwrap();
523        // Second call should return the second URL due to round-robin
524        let second_url = selector.select_url().unwrap();
525        // Third call should return the first URL again
526        let third_url = selector.select_url().unwrap();
527
528        // We don't know which URL comes first, but the sequence should alternate
529        assert_ne!(first_url, second_url);
530        assert_eq!(first_url, third_url);
531    }
532
533    #[test]
534    fn test_rpc_selector_get_client_success() {
535        let configs = vec![RpcConfig {
536            url: "https://example.com/rpc".to_string(),
537            weight: 1,
538        }];
539
540        let selector = RpcSelector::new(configs).unwrap();
541
542        // Create a simple initializer function that returns the URL as a string
543        let initializer = |url: &str| -> Result<String> { Ok(url.to_string()) };
544
545        let result = selector.get_client(initializer);
546        assert!(result.is_ok());
547        assert_eq!(result.unwrap(), "https://example.com/rpc");
548    }
549
550    #[test]
551    fn test_rpc_selector_get_client_failure() {
552        let configs = vec![RpcConfig {
553            url: "https://example.com/rpc".to_string(),
554            weight: 1,
555        }];
556
557        let selector = RpcSelector::new(configs).unwrap();
558
559        // Create a failing initializer function
560        let initializer =
561            |_url: &str| -> Result<String> { Err(eyre::eyre!("Initialization error")) };
562
563        let result = selector.get_client(initializer);
564        assert!(result.is_err());
565        assert!(matches!(
566            result.unwrap_err(),
567            RpcSelectorError::ClientInitializationError(_)
568        ));
569    }
570
571    #[test]
572    fn test_rpc_selector_clone() {
573        let configs = vec![
574            RpcConfig {
575                url: "https://example1.com/rpc".to_string(),
576                weight: 1,
577            },
578            RpcConfig {
579                url: "https://example2.com/rpc".to_string(),
580                weight: 3,
581            },
582        ];
583
584        let selector = RpcSelector::new(configs).unwrap();
585        let cloned = selector.clone();
586
587        // Check that the cloned selector has the same configuration
588        assert_eq!(selector.configs.len(), cloned.configs.len());
589        assert_eq!(selector.configs[0].url, cloned.configs[0].url);
590        assert_eq!(selector.configs[1].url, cloned.configs[1].url);
591
592        // Check that weights distribution is also cloned
593        assert_eq!(
594            selector.weights_dist.is_some(),
595            cloned.weights_dist.is_some()
596        );
597    }
598
599    #[test]
600    fn test_mark_current_as_failed_single_provider() {
601        // With a single provider, marking as failed should cause an error when trying to select it again
602        let configs = vec![RpcConfig {
603            url: "https://example.com/rpc".to_string(),
604            weight: 1,
605        }];
606
607        let selector = RpcSelector::new(configs).unwrap();
608        let initial_url = selector.select_url().unwrap();
609
610        // Mark as failed
611        selector.mark_current_as_failed();
612
613        // Next call should return an error
614        let next_url = selector.select_url();
615        assert!(next_url.is_err());
616        assert!(matches!(
617            next_url.unwrap_err(),
618            RpcSelectorError::AllProvidersFailed
619        ));
620
621        // Reset failed providers
622        selector.reset_failed_providers();
623
624        // Now we should be able to select the provider again
625        let after_reset = selector.select_url();
626        assert!(after_reset.is_ok());
627        assert_eq!(initial_url, after_reset.unwrap());
628    }
629
630    #[test]
631    fn test_mark_current_as_failed_multiple_providers() {
632        // With multiple providers, marking as failed should prevent that provider from being selected again
633        let configs = vec![
634            RpcConfig {
635                url: "https://example1.com/rpc".to_string(),
636                weight: 5,
637            },
638            RpcConfig {
639                url: "https://example2.com/rpc".to_string(),
640                weight: 5,
641            },
642            RpcConfig {
643                url: "https://example3.com/rpc".to_string(),
644                weight: 5,
645            },
646        ];
647
648        let selector = RpcSelector::new(configs).unwrap();
649
650        // Get the first URL
651        let url1 = selector.select_url().unwrap().to_string();
652
653        // Mark as failed to move to a different one
654        selector.mark_current_as_failed();
655        let url2 = selector.select_url().unwrap().to_string();
656
657        // The URLs should be different
658        assert_ne!(url1, url2);
659
660        // Mark the second URL as failed too
661        selector.mark_current_as_failed();
662        let url3 = selector.select_url().unwrap().to_string();
663
664        // Should get a third different URL
665        assert_ne!(url1, url3);
666        assert_ne!(url2, url3);
667
668        // Mark the third URL as failed too
669        selector.mark_current_as_failed();
670
671        // Now all URLs should be marked as failed, so next call should return error
672        let url4 = selector.select_url();
673        assert!(url4.is_err());
674        assert!(matches!(
675            url4.unwrap_err(),
676            RpcSelectorError::AllProvidersFailed
677        ));
678    }
679
680    #[test]
681    fn test_mark_current_as_failed_weighted() {
682        // Test with weighted selection
683        let configs = vec![
684            RpcConfig {
685                url: "https://example1.com/rpc".to_string(),
686                weight: 1, // Low weight
687            },
688            RpcConfig {
689                url: "https://example2.com/rpc".to_string(),
690                weight: 10, // High weight
691            },
692        ];
693
694        let selector = RpcSelector::new(configs).unwrap();
695        assert!(selector.weights_dist.is_some()); // Confirm we're using weighted selection
696
697        // Get a URL
698        let url1 = selector.select_url().unwrap().to_string();
699
700        // Mark it as failed
701        selector.mark_current_as_failed();
702
703        // Get another URL, it should be different
704        let url2 = selector.select_url().unwrap().to_string();
705        assert_ne!(url1, url2);
706
707        // Mark this one as failed too
708        selector.mark_current_as_failed();
709
710        // With no more providers, next call should fail
711        let url3 = selector.select_url();
712        assert!(url3.is_err());
713
714        // Reset and try again
715        selector.reset_failed_providers();
716        let url4 = selector.select_url();
717        assert!(url4.is_ok());
718    }
719
720    #[test]
721    fn test_auto_reset_mechanism() {
722        // Create a selector with a very short reset duration
723        let configs = vec![
724            RpcConfig {
725                url: "https://example1.com/rpc".to_string(),
726                weight: 1,
727            },
728            RpcConfig {
729                url: "https://example2.com/rpc".to_string(),
730                weight: 1,
731            },
732        ];
733
734        // Change the auto-reset duration for this test
735        let selector = RpcSelector::new(configs).unwrap();
736        {
737            let mut health = selector.health.write();
738            *health = ProviderHealth::new(Duration::from_millis(100)); // Very short duration for testing
739        }
740
741        // Select and mark both as failed
742        selector.select_url().unwrap();
743        selector.mark_current_as_failed();
744        selector.select_url().unwrap();
745        selector.mark_current_as_failed();
746
747        // Immediately after, all providers should be failed
748        let result = selector.select_url();
749        assert!(result.is_err());
750
751        // Sleep for longer than the reset duration
752        thread::sleep(Duration::from_millis(150));
753
754        // Force a check for auto-reset by directly calling is_failed()
755        {
756            let mut health = selector.health.write();
757            // This should trigger auto-reset
758            let _ = health.is_failed(0);
759        }
760
761        // After sleeping and checking, providers should be auto-reset
762        let result = selector.select_url();
763        assert!(
764            result.is_ok(),
765            "Providers should have been auto-reset after timeout"
766        );
767    }
768
769    #[test]
770    fn test_provider_count() {
771        // Test with no providers
772        let configs: Vec<RpcConfig> = vec![];
773        let result = RpcSelector::new(configs);
774        assert!(result.is_err());
775
776        // Test with a single provider
777        let configs = vec![RpcConfig {
778            url: "https://example.com/rpc".to_string(),
779            weight: 1,
780        }];
781        let selector = RpcSelector::new(configs).unwrap();
782        assert_eq!(selector.provider_count(), 1);
783
784        // Test with multiple providers
785        let configs = vec![
786            RpcConfig {
787                url: "https://example1.com/rpc".to_string(),
788                weight: 1,
789            },
790            RpcConfig {
791                url: "https://example2.com/rpc".to_string(),
792                weight: 2,
793            },
794            RpcConfig {
795                url: "https://example3.com/rpc".to_string(),
796                weight: 3,
797            },
798        ];
799        let selector = RpcSelector::new(configs).unwrap();
800        assert_eq!(selector.provider_count(), 3);
801    }
802
803    #[test]
804    fn test_available_provider_count() {
805        let configs = vec![
806            RpcConfig {
807                url: "https://example1.com/rpc".to_string(),
808                weight: 1,
809            },
810            RpcConfig {
811                url: "https://example2.com/rpc".to_string(),
812                weight: 2,
813            },
814            RpcConfig {
815                url: "https://example3.com/rpc".to_string(),
816                weight: 3,
817            },
818        ];
819
820        let selector = RpcSelector::new(configs).unwrap();
821        assert_eq!(selector.provider_count(), 3);
822        assert_eq!(selector.available_provider_count(), 3);
823
824        // Mark one provider as failed
825        selector.select_url().unwrap(); // Select a provider first
826        selector.mark_current_as_failed();
827        assert_eq!(selector.available_provider_count(), 2);
828
829        // Mark another provider as failed
830        selector.select_url().unwrap(); // Select another provider
831        selector.mark_current_as_failed();
832        assert_eq!(selector.available_provider_count(), 1);
833
834        // Reset failed providers
835        selector.reset_failed_providers();
836        assert_eq!(selector.available_provider_count(), 3);
837    }
838
839    #[test]
840    fn test_get_current_url() {
841        let configs = vec![
842            RpcConfig::new("https://example1.com/rpc".to_string()),
843            RpcConfig::new("https://example2.com/rpc".to_string()),
844        ];
845
846        let selector = RpcSelector::new(configs).unwrap();
847
848        // Should return a valid URL
849        let url = selector.get_current_url();
850        assert!(url.is_ok());
851        let url_str = url.unwrap();
852        assert!(
853            url_str == "https://example1.com/rpc" || url_str == "https://example2.com/rpc",
854            "Unexpected URL: {}",
855            url_str
856        );
857    }
858
#[test]
fn test_concurrent_usage() {
    // Exercise one shared RpcSelector from several threads at once.
    let configs: Vec<RpcConfig> = [
        "https://example1.com/rpc",
        "https://example2.com/rpc",
        "https://example3.com/rpc",
    ]
    .iter()
    .map(|endpoint| RpcConfig::new(endpoint.to_string()))
    .collect();

    let shared = Arc::new(RpcSelector::new(configs).unwrap());

    // Start all 10 workers before joining any of them. Each worker selects a
    // URL and reports a failure only when that URL was example1.
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let selector = Arc::clone(&shared);
            thread::spawn(move || {
                let url = selector.select_url().unwrap().to_string();
                if url.contains("example1") {
                    selector.mark_current_as_failed();
                }
                url
            })
        })
        .collect();

    // Join in spawn order and keep only the distinct URLs that were handed out.
    let unique_urls: std::collections::HashSet<String> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();
    assert!(unique_urls.len() > 1, "Expected multiple unique URLs");

    // Probe the selector 10 more times (every probe runs, no short-circuit)
    // and require that at least one of them landed on a provider other than
    // example1.
    let non_example1_hits = (0..10)
        .filter(|_| !shared.select_url().unwrap().to_string().contains("example1"))
        .count();

    assert!(
        non_example1_hits > 0,
        "Should avoid selecting failed provider"
    );
}
908
#[test]
fn test_provider_health_methods() {
    // Walk a ProviderHealth tracker through: clean -> one failure -> reset.
    let mut tracker = ProviderHealth::new(Duration::from_secs(10));

    // Nothing has failed yet.
    assert_eq!(tracker.failed_count(), 0);
    assert!(!tracker.is_failed(0));

    // Failing provider 0 shows up in both the count and the per-index query.
    tracker.mark_failed(0);
    assert_eq!(tracker.failed_count(), 1);
    assert!(tracker.is_failed(0));

    // A reset brings the tracker back to its initial, empty state.
    tracker.reset();
    assert_eq!(tracker.failed_count(), 0);
    assert!(!tracker.is_failed(0));
}
928
#[test]
fn test_consecutive_mark_as_failed() {
    // Reporting a failure twice in a row, with no selection in between, must
    // not exhaust a two-provider selector.
    let configs: Vec<RpcConfig> = ["https://example1.com/rpc", "https://example2.com/rpc"]
        .iter()
        .map(|endpoint| RpcConfig::new(endpoint.to_string()))
        .collect();

    let selector = RpcSelector::new(configs).unwrap();

    // Establish a current provider.
    selector.select_url().unwrap();

    // Two back-to-back failure reports. The second one is expected to find
    // no current selection left to mark, so it should not take out a second
    // provider.
    for _ in 0..2 {
        selector.mark_current_as_failed();
    }

    // At most one provider was marked failed, so selection still succeeds.
    assert!(selector.select_url().is_ok());
}
949
#[test]
fn test_partial_auto_reset() {
    // Providers whose reset time has passed come back on their own, while a
    // provider with a later reset time stays excluded.
    let configs: Vec<RpcConfig> = [
        "https://example1.com/rpc",
        "https://example2.com/rpc",
        "https://example3.com/rpc",
    ]
    .iter()
    .map(|endpoint| RpcConfig::new(endpoint.to_string()))
    .collect();

    let selector = RpcSelector::new(configs).unwrap();

    // Swap in a health tracker with a very short (50ms) reset duration. The
    // write guard is a temporary and is released at the end of the statement.
    *selector.health.write() = ProviderHealth::new(Duration::from_millis(50));

    // Select and fail each of the three providers in turn.
    for _ in 0..3 {
        selector.select_url().unwrap();
        selector.mark_current_as_failed();
    }

    // With every provider failed, selection has to report an error.
    assert!(selector.select_url().is_err());

    // Push provider 0's reset time out to 200ms from now, overriding the
    // 50ms entry recorded when it was marked failed.
    selector
        .health
        .write()
        .failed_provider_reset_times
        .insert(0, Instant::now() + Duration::from_millis(200));

    // 100ms is past the 50ms window of providers 1 and 2 but still inside
    // provider 0's 200ms window.
    thread::sleep(Duration::from_millis(100));

    // Selection works again ...
    let recovered = selector.select_url();
    assert!(recovered.is_ok());

    // ... and it must not hand out provider 0 (example1).
    assert!(!recovered.unwrap().contains("example1"));
}
993
#[test]
fn test_weighted_to_round_robin_fallback() {
    // One heavily weighted provider (example1) plus two light ones. After
    // example1 is marked failed, only the remaining two may be handed out.
    let configs: Vec<RpcConfig> = [
        ("https://example1.com/rpc", 10), // High weight
        ("https://example2.com/rpc", 1),  // Low weight
        ("https://example3.com/rpc", 1),  // Low weight
    ]
    .iter()
    .map(|&(url, weight)| RpcConfig {
        url: url.to_string(),
        weight,
    })
    .collect();

    let selector = RpcSelector::new(configs).unwrap();
    // Unequal weights: the selector must have built a weighted distribution.
    assert!(selector.weights_dist.is_some());

    // Draw up to 10 times; on the first draw that yields example1, mark it
    // failed and stop. `any` short-circuits, so no draw happens after that.
    let selected_first = (0..10).any(|_| {
        let hit_high_weight = selector.select_url().unwrap().contains("example1");
        if hit_high_weight {
            selector.mark_current_as_failed();
        }
        hit_high_weight
    });

    assert!(
        selected_first,
        "High-weight provider should have been selected"
    );

    // With example1 out of rotation, collect what 10 further draws return.
    let seen_urls: HashSet<String> = (0..10)
        .map(|_| selector.select_url().unwrap().to_string())
        .collect();

    // Both remaining providers should have shown up, and never example1.
    assert!(seen_urls.len() >= 2);
    assert!(
        seen_urls.iter().all(|url| !url.contains("example1")),
        "Failed provider should not be selected"
    );
}
1048
#[test]
fn test_zero_weight_providers() {
    // A provider configured with weight 0 must never be chosen while a
    // positively weighted provider is available.
    let configs: Vec<RpcConfig> = [
        ("https://example1.com/rpc", 0), // Zero weight
        ("https://example2.com/rpc", 5), // Normal weight
    ]
    .iter()
    .map(|&(url, weight)| RpcConfig {
        url: url.to_string(),
        weight,
    })
    .collect();

    let selector = RpcSelector::new(configs).unwrap();

    // Record the distinct URLs produced by 10 selections.
    let seen_urls: HashSet<String> = (0..10)
        .map(|_| selector.select_url().unwrap().to_string())
        .collect();

    // Exactly one distinct URL, and it is the non-zero-weight provider.
    assert_eq!(seen_urls.len(), 1);
    assert!(
        seen_urls.iter().all(|url| url.contains("example2")),
        "Only the non-zero weight provider should be selected"
    );
}
1077
#[test]
fn test_extreme_weight_differences() {
    // With weights 100 vs 1, the heavy provider should dominate selection.
    let configs: Vec<RpcConfig> = [
        ("https://example1.com/rpc", 100), // Very high weight
        ("https://example2.com/rpc", 1),   // Very low weight
    ]
    .iter()
    .map(|&(url, weight)| RpcConfig {
        url: url.to_string(),
        weight,
    })
    .collect();

    let selector = RpcSelector::new(configs).unwrap();

    // Run 100 draws and count how many landed on the heavy provider. The
    // current-selection flag is cleared after every draw.
    let count_high = (0..100)
        .filter(|_| {
            let picked_high = selector
                .select_url()
                .unwrap()
                .to_string()
                .contains("example1");
            selector.has_current.store(false, Ordering::Relaxed);
            picked_high
        })
        .count();

    // The heavy provider must win more than 90 of the 100 draws.
    assert!(
        count_high > 90,
        "High-weight provider selected only {}/{} times",
        count_high,
        100
    );
}
1114
#[test]
fn test_mark_unselected_as_failed() {
    // Reporting a failure before anything was ever selected must not remove
    // any provider from rotation.
    let configs: Vec<RpcConfig> = ["https://example1.com/rpc", "https://example2.com/rpc"]
        .iter()
        .map(|endpoint| RpcConfig::new(endpoint.to_string()))
        .collect();

    let selector = RpcSelector::new(configs).unwrap();

    // No selection has happened yet, so this is expected to do nothing.
    selector.mark_current_as_failed();

    // Draw 10 times, clearing the current-selection flag after each draw,
    // and gather the distinct URLs returned.
    let seen_urls: HashSet<String> = (0..10)
        .map(|_| {
            let url = selector.select_url().unwrap().to_string();
            selector.has_current.store(false, Ordering::Relaxed);
            url
        })
        .collect();

    assert_eq!(
        seen_urls.len(),
        2,
        "Both providers should still be available"
    );
}
1143
#[test]
fn test_rpc_selector_error_serialization() {
    // Each error variant is paired with the fragments its JSON form must
    // contain: the variant name, plus the payload text when there is one.
    let cases: Vec<(RpcSelectorError, Vec<&str>)> = vec![
        (RpcSelectorError::NoProviders, vec!["NoProviders"]),
        (
            RpcSelectorError::ClientInitializationError("test error".to_string()),
            vec!["ClientInitializationError", "test error"],
        ),
        (
            RpcSelectorError::WeightedIndexError("index error".to_string()),
            vec!["WeightedIndexError", "index error"],
        ),
        (
            RpcSelectorError::AllProvidersFailed,
            vec!["AllProvidersFailed"],
        ),
    ];

    for (error, fragments) in cases {
        let json = serde_json::to_string(&error).unwrap();
        for fragment in fragments {
            assert!(json.contains(fragment));
        }
    }
}
1164}