// openzeppelin_relayer/bootstrap/initialize_workers.rs

use actix_web::web::ThinData;
use apalis::{layers::ErrorHandlingLayer, prelude::*};
use apalis_cron::CronStream;
use eyre::Result;
use log::{error, info};
use std::{str::FromStr, time::Duration};
use tokio::signal::unix::SignalKind;

use crate::{
    jobs::{
        notification_handler, solana_token_swap_cron_handler, solana_token_swap_request_handler,
        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
        BackoffRetryPolicy,
    },
    models::DefaultAppState,
    repositories::RelayerRepository,
};

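// Shared tuning for the queue workers below: each worker processes a bounded
// number of jobs concurrently and is throttled by a per-second rate limit.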
const DEFAULT_CONCURRENCY: usize = 2;
const DEFAULT_RATE_LIMIT: u64 = 20;
const DEFAULT_RATE_LIMIT_DURATION: Duration = Duration::from_secs(1);

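// Worker names, one per background job queue.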
const TRANSACTION_REQUEST: &str = "transaction_request";
const TRANSACTION_SENDER: &str = "transaction_sender";
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
const NOTIFICATION_SENDER: &str = "notification_sender";
const SOLANA_TOKEN_SWAP_REQUEST: &str = "solana_token_swap_request";

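/// Builds one apalis worker per background job queue (transaction requests,
/// submissions, status checks, notifications, and Solana token swap requests),
/// registers them all with a single `Monitor`, and runs that monitor on a
/// background task that shuts down gracefully on SIGINT or SIGTERM.
///
/// A minimal usage sketch, assuming a hypothetical `build_app_state` helper
/// that assembles the shared `DefaultAppState`:
///
/// ```rust,ignore
/// // `build_app_state` is illustrative only; substitute however the
/// // application actually constructs its DefaultAppState.
/// let app_state = ThinData(build_app_state().await?);
/// initialize_workers(app_state.clone()).await?;
/// initialize_solana_swap_workers(app_state).await?;
/// ```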
pub async fn initialize_workers(app_state: ThinData<DefaultAppState>) -> Result<()> {
    let queue = app_state.job_producer.get_queue().await?;

    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
        .retry(BackoffRetryPolicy::default())
        .concurrency(DEFAULT_CONCURRENCY)
        .data(app_state.clone())
        .backend(queue.transaction_request_queue.clone())
        .build_fn(transaction_request_handler);

    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
        .retry(BackoffRetryPolicy::default())
        .concurrency(DEFAULT_CONCURRENCY)
        .data(app_state.clone())
        .backend(queue.transaction_submission_queue.clone())
        .build_fn(transaction_submission_handler);

    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
        .layer(ErrorHandlingLayer::new())
        .catch_panic()
        .enable_tracing()
        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
        .retry(BackoffRetryPolicy::default())
        .concurrency(DEFAULT_CONCURRENCY)
        .data(app_state.clone())
        .backend(queue.transaction_status_queue.clone())
        .build_fn(transaction_status_handler);

    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
        .retry(BackoffRetryPolicy::default())
        .concurrency(DEFAULT_CONCURRENCY)
        .data(app_state.clone())
        .backend(queue.notification_queue.clone())
        .build_fn(notification_handler);

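    // The swap request worker runs with a higher concurrency (10) than the
    // DEFAULT_CONCURRENCY used by the other queues.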
    let solana_token_swap_request_queue_worker = WorkerBuilder::new(SOLANA_TOKEN_SWAP_REQUEST)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
        .retry(BackoffRetryPolicy::default())
        .concurrency(10)
        .data(app_state.clone())
        .backend(queue.solana_token_swap_request_queue.clone())
        .build_fn(solana_token_swap_request_handler);

    let monitor = Monitor::new()
        .register(transaction_request_queue_worker)
        .register(transaction_submission_queue_worker)
        .register(transaction_status_queue_worker)
        .register(notification_queue_worker)
        .register(solana_token_swap_request_queue_worker)
        .on_event(monitor_handle_event)
        .shutdown_timeout(Duration::from_millis(5000));

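    // `run_with_signal` keeps the monitor alive until the provided future
    // resolves, i.e. until SIGINT or SIGTERM is received.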
    let monitor_future = monitor.run_with_signal(async {
        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
            .expect("Failed to create SIGINT signal");
        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
            .expect("Failed to create SIGTERM signal");

        info!("Monitor started");

        tokio::select! {
            _ = sigint.recv() => info!("Received SIGINT."),
            _ = sigterm.recv() => info!("Received SIGTERM."),
        };

        info!("Monitor shutting down");

        Ok(())
    });
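    // Run the monitor on a background task so the rest of bootstrap can continue.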
    tokio::spawn(async move {
        if let Err(e) = monitor_future.await {
            error!("Monitor error: {}", e);
        } else {
            info!("Monitor shutdown complete");
        }
    });
    Ok(())
}

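/// Creates a cron-driven swap worker for every active relayer whose Solana
/// policy defines a swap configuration with a cron schedule, then runs those
/// workers under their own `Monitor` on a background task.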
pub async fn initialize_solana_swap_workers(app_state: ThinData<DefaultAppState>) -> Result<()> {
    let solana_relayers_with_swap_enabled = app_state
        .relayer_repository
        .list_active()
        .await?
        .into_iter()
        .filter(|relayer| {
            let policy = relayer.policies.get_solana_policy();
            let swap_config = match policy.get_swap_config() {
                Some(config) => config,
                None => {
                    info!("No swap configuration specified; skipping relayer.");
                    return false;
                }
            };

            // Only relayers that define a cron schedule get a swap worker.
            swap_config.cron_schedule.is_some()
        })
        .collect::<Vec<_>>();

    if solana_relayers_with_swap_enabled.is_empty() {
        info!("No solana relayers with swap enabled");
        return Ok(());
    }
    info!(
        "Found {} solana relayers with swap enabled",
        solana_relayers_with_swap_enabled.len()
    );

    let mut workers = Vec::new();

    for relayer in solana_relayers_with_swap_enabled {
        info!("Found solana relayer with swap enabled: {:?}", relayer);

        let policy = relayer.policies.get_solana_policy();
        let swap_config = match policy.get_swap_config() {
            Some(config) => config,
            None => {
                info!("No swap configuration specified; skipping relayer.");
                continue;
            }
        };

        let calendar_schedule = match swap_config.cron_schedule {
            Some(schedule) => match apalis_cron::Schedule::from_str(&schedule) {
                Ok(parsed) => parsed,
                Err(e) => {
                    error!(
                        "Invalid swap cron schedule {:?} for relayer {}: {}; skipping.",
                        schedule, relayer.id, e
                    );
                    continue;
                }
            },
            None => {
                info!("No swap cron schedule found for relayer: {:?}", relayer);
                continue;
            }
        };

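        // One cron-driven worker per relayer; the relayer id is attached as
        // worker data alongside the shared app state.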
        let worker = WorkerBuilder::new(format!("solana-swap-schedule-{}", relayer.id))
            .layer(ErrorHandlingLayer::new())
            .enable_tracing()
            .catch_panic()
            .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
            .retry(BackoffRetryPolicy::default())
            .concurrency(1)
            .data(relayer.id.clone())
            .data(app_state.clone())
            .backend(CronStream::new(calendar_schedule))
            .build_fn(solana_token_swap_cron_handler);

        workers.push(worker);
        info!(
            "Created worker for solana relayer with swap enabled: {:?}",
            relayer
        );
    }

    let mut monitor = Monitor::new()
        .on_event(monitor_handle_event)
        .shutdown_timeout(Duration::from_millis(5000));

    for worker in workers {
        monitor = monitor.register(worker);
    }

    let monitor_future = monitor.run_with_signal(async {
        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
            .expect("Failed to create SIGINT signal");
        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
            .expect("Failed to create SIGTERM signal");

        info!("Solana Swap Monitor started");

        tokio::select! {
            _ = sigint.recv() => info!("Received SIGINT."),
            _ = sigterm.recv() => info!("Received SIGTERM."),
        };

        info!("Solana Swap Monitor shutting down");

        Ok(())
    });
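    // As above, run this monitor in the background so bootstrap is not blocked.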
    tokio::spawn(async move {
        if let Err(e) = monitor_future.await {
            error!("Solana Swap Monitor error: {}", e);
        }
    });
    Ok(())
}

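/// Logs apalis worker lifecycle events emitted by the monitors above.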
fn monitor_handle_event(e: Worker<Event>) {
    let worker_id = e.id();
    match e.inner() {
        Event::Engage(task_id) => {
            info!("Worker [{worker_id}] got a job with id: {task_id}");
        }
        Event::Error(e) => {
            error!("Worker [{worker_id}] encountered an error: {e}");
        }
        Event::Exit => {
            info!("Worker [{worker_id}] exited");
        }
        Event::Idle => {
            info!("Worker [{worker_id}] is idle");
        }
        Event::Start => {
            info!("Worker [{worker_id}] started");
        }
        Event::Stop => {
            info!("Worker [{worker_id}] stopped");
        }
        _ => {}
    }
}