openzeppelin_relayer/bootstrap/
initialize_workers.rs

1//! # Workers
//! Initialises and starts the workers for the application.
3
4use actix_web::web::ThinData;
5use apalis::{layers::ErrorHandlingLayer, prelude::*};
6use apalis_cron::CronStream;
7use eyre::Result;
8use log::{error, info};
9use std::{str::FromStr, time::Duration};
10use tokio::signal::unix::SignalKind;
11
12use crate::{
13    jobs::{
14        notification_handler, solana_token_swap_cron_handler, solana_token_swap_request_handler,
15        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
16        BackoffRetryPolicy,
17    },
18    models::DefaultAppState,
19    repositories::RelayerRepository,
20};
21
/// Name given to the worker that consumes the transaction request queue.
const TRANSACTION_REQUEST: &str = "transaction_request";
/// Name given to the worker that consumes the transaction submission queue.
const TRANSACTION_SENDER: &str = "transaction_sender";
/// Name given to the worker that consumes the transaction status queue.
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
/// Name given to the worker that consumes the notification queue.
const NOTIFICATION_SENDER: &str = "notification_sender";
/// Name given to the worker that consumes the Solana token swap request queue.
const SOLANA_TOKEN_SWAP_REQUEST: &str = "solana_token_swap_request";

// TODO: review and fine-tune the worker configuration values below.

/// Concurrency passed to `WorkerBuilder::concurrency` for workers using the default.
const DEFAULT_CONCURRENCY: usize = 2;
/// First argument passed to `WorkerBuilder::rate_limit`; paired with
/// `DEFAULT_RATE_LIMIT_DURATION`.
const DEFAULT_RATE_LIMIT: u64 = 20;
/// Second argument passed to `WorkerBuilder::rate_limit`: the window the limit applies to.
const DEFAULT_RATE_LIMIT_DURATION: Duration = Duration::from_secs(1);
32
33pub async fn initialize_workers(app_state: ThinData<DefaultAppState>) -> Result<()> {
34    let queue = app_state.job_producer.get_queue().await?;
35
36    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
37        .layer(ErrorHandlingLayer::new())
38        .enable_tracing()
39        .catch_panic()
40        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
41        .retry(BackoffRetryPolicy::default())
42        .concurrency(DEFAULT_CONCURRENCY)
43        .data(app_state.clone())
44        .backend(queue.transaction_request_queue.clone())
45        .build_fn(transaction_request_handler);
46
47    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
48        .layer(ErrorHandlingLayer::new())
49        .enable_tracing()
50        .catch_panic()
51        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
52        .retry(BackoffRetryPolicy::default())
53        .concurrency(DEFAULT_CONCURRENCY)
54        .data(app_state.clone())
55        .backend(queue.transaction_submission_queue.clone())
56        .build_fn(transaction_submission_handler);
57
58    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
59        .layer(ErrorHandlingLayer::new())
60        .catch_panic()
61        .enable_tracing()
62        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
63        .retry(BackoffRetryPolicy::default())
64        .concurrency(DEFAULT_CONCURRENCY)
65        .data(app_state.clone())
66        .backend(queue.transaction_status_queue.clone())
67        .build_fn(transaction_status_handler);
68
69    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
70        .layer(ErrorHandlingLayer::new())
71        .enable_tracing()
72        .catch_panic()
73        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
74        .retry(BackoffRetryPolicy::default())
75        .concurrency(DEFAULT_CONCURRENCY)
76        .data(app_state.clone())
77        .backend(queue.notification_queue.clone())
78        .build_fn(notification_handler);
79
80    let solana_token_swap_request_queue_worker = WorkerBuilder::new(SOLANA_TOKEN_SWAP_REQUEST)
81        .layer(ErrorHandlingLayer::new())
82        .enable_tracing()
83        .catch_panic()
84        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
85        .retry(BackoffRetryPolicy::default())
86        .concurrency(10)
87        .data(app_state.clone())
88        .backend(queue.solana_token_swap_request_queue.clone())
89        .build_fn(solana_token_swap_request_handler);
90
91    let monitor = Monitor::new()
92        .register(transaction_request_queue_worker)
93        .register(transaction_submission_queue_worker)
94        .register(transaction_status_queue_worker)
95        .register(notification_queue_worker)
96        .register(solana_token_swap_request_queue_worker)
97        .on_event(monitor_handle_event)
98        .shutdown_timeout(Duration::from_millis(5000));
99
100    let monitor_future = monitor.run_with_signal(async {
101        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
102            .expect("Failed to create SIGINT signal");
103        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
104            .expect("Failed to create SIGTERM signal");
105
106        info!("Monitor started");
107
108        tokio::select! {
109            _ = sigint.recv() => info!("Received SIGINT."),
110            _ = sigterm.recv() => info!("Received SIGTERM."),
111        };
112
113        info!("Monitor shutting down");
114
115        Ok(())
116    });
117    tokio::spawn(async move {
118        if let Err(e) = monitor_future.await {
119            error!("Monitor error: {}", e);
120        }
121    });
122    info!("Monitor shutdown complete");
123    Ok(())
124}
125
126/// Initializes the Solana swap workers
127/// This function creates and registers workers for Solana relayers that have swap enabled and cron schedule set.
128pub async fn initialize_solana_swap_workers(app_state: ThinData<DefaultAppState>) -> Result<()> {
129    let solena_relayers_with_swap_enabled = app_state
130        .relayer_repository
131        .list_active()
132        .await?
133        .into_iter()
134        .filter(|relayer| {
135            let policy = relayer.policies.get_solana_policy();
136            let swap_config = match policy.get_swap_config() {
137                Some(config) => config,
138                None => {
139                    info!("No swap configuration specified; skipping validation.");
140                    return false;
141                }
142            };
143
144            if swap_config.cron_schedule.is_none() {
145                return false;
146            }
147            true
148        })
149        .collect::<Vec<_>>();
150
151    if solena_relayers_with_swap_enabled.is_empty() {
152        info!("No solana relayers with swap enabled");
153        return Ok(());
154    }
155    info!(
156        "Found {} solana relayers with swap enabled",
157        solena_relayers_with_swap_enabled.len()
158    );
159
160    let mut workers = Vec::new();
161
162    for relayer in solena_relayers_with_swap_enabled {
163        info!("Found solana relayer with swap enabled: {:?}", relayer);
164
165        let policy = relayer.policies.get_solana_policy();
166        let swap_config = match policy.get_swap_config() {
167            Some(config) => config,
168            None => {
169                info!("No swap configuration specified; skipping validation.");
170                continue;
171            }
172        };
173
174        let calendar_schedule = match swap_config.cron_schedule {
175            Some(schedule) => apalis_cron::Schedule::from_str(&schedule).unwrap(),
176            None => {
177                info!("No swap cron schedule found for relayer: {:?}", relayer);
178                continue;
179            }
180        };
181
182        // Create worker and add to the workers vector
183        let worker = WorkerBuilder::new(format!("solana-swap-schedule-{}", relayer.id.clone()))
184            .layer(ErrorHandlingLayer::new())
185            .enable_tracing()
186            .catch_panic()
187            .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
188            .retry(BackoffRetryPolicy::default())
189            .concurrency(1)
190            .data(relayer.id.clone())
191            .data(app_state.clone())
192            .backend(CronStream::new(calendar_schedule))
193            .build_fn(solana_token_swap_cron_handler);
194
195        workers.push(worker);
196        info!(
197            "Created worker for solana relayer with swap enabled: {:?}",
198            relayer
199        );
200    }
201
202    let mut monitor = Monitor::new()
203        .on_event(monitor_handle_event)
204        .shutdown_timeout(Duration::from_millis(5000));
205
206    // Register all workers with the monitor
207    for worker in workers {
208        monitor = monitor.register(worker);
209    }
210
211    let monitor_future = monitor.run_with_signal(async {
212        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
213            .expect("Failed to create SIGINT signal");
214        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
215            .expect("Failed to create SIGTERM signal");
216
217        info!("Solana Swap Monitor started");
218
219        tokio::select! {
220            _ = sigint.recv() => info!("Received SIGINT."),
221            _ = sigterm.recv() => info!("Received SIGTERM."),
222        };
223
224        info!("Solana Swap Monitor shutting down");
225
226        Ok(())
227    });
228    tokio::spawn(async move {
229        if let Err(e) = monitor_future.await {
230            error!("Monitor error: {}", e);
231        }
232    });
233    Ok(())
234}
235
236fn monitor_handle_event(e: Worker<Event>) {
237    let worker_id = e.id();
238    match e.inner() {
239        Event::Engage(task_id) => {
240            info!("Worker [{worker_id}] got a job with id: {task_id}");
241        }
242        Event::Error(e) => {
243            error!("Worker [{worker_id}] encountered an error: {e}");
244        }
245        Event::Exit => {
246            info!("Worker [{worker_id}] exited");
247        }
248        Event::Idle => {
249            info!("Worker [{worker_id}] is idle");
250        }
251        Event::Start => {
252            info!("Worker [{worker_id}] started");
253        }
254        Event::Stop => {
255            info!("Worker [{worker_id}] stopped");
256        }
257        _ => {}
258    }
259}