openzeppelin_relayer/services/plugins/
runner.rs

1//! This module is the orchestrator of the plugin execution.
2//!
3//! 1. Initiates a socket connection to the relayer server - socket.rs
4//! 2. Executes the plugin script - script_executor.rs
5//! 3. Sends the shutdown signal to the relayer server - socket.rs
6//! 4. Waits for the relayer server to finish the execution - socket.rs
7//! 5. Returns the output of the script - script_executor.rs
8//!
9use std::sync::Arc;
10
11use crate::services::plugins::{RelayerApi, ScriptExecutor, ScriptResult, SocketService};
12use crate::{jobs::JobProducerTrait, models::AppState};
13
14use super::PluginError;
15use actix_web::web;
16use async_trait::async_trait;
17use tokio::sync::oneshot;
18
19#[cfg(test)]
20use mockall::automock;
21
22#[cfg_attr(test, automock)]
23#[async_trait]
24pub trait PluginRunnerTrait {
25    async fn run<J: JobProducerTrait + 'static>(
26        &self,
27        socket_path: &str,
28        script_path: String,
29        script_params: String,
30        state: Arc<web::ThinData<AppState<J>>>,
31    ) -> Result<ScriptResult, PluginError>;
32}
33
/// Default, stateless plugin runner.
///
/// The type carries no data, so it is free to copy and can be created either
/// as the unit value `PluginRunner` or through `Default`.
#[derive(Debug, Default, Clone, Copy)]
pub struct PluginRunner;
36
37impl PluginRunner {
38    async fn run<J: JobProducerTrait + 'static>(
39        &self,
40        socket_path: &str,
41        script_path: String,
42        script_params: String,
43        state: Arc<web::ThinData<AppState<J>>>,
44    ) -> Result<ScriptResult, PluginError> {
45        let socket_service = SocketService::new(socket_path)?;
46        let socket_path_clone = socket_service.socket_path().to_string();
47
48        let (shutdown_tx, shutdown_rx) = oneshot::channel();
49
50        let server_handle = tokio::spawn(async move {
51            let relayer_api = Arc::new(RelayerApi);
52            socket_service.listen(shutdown_rx, state, relayer_api).await
53        });
54
55        let mut script_result =
56            ScriptExecutor::execute_typescript(script_path, socket_path_clone, script_params)
57                .await?;
58
59        let _ = shutdown_tx.send(());
60
61        let server_handle = server_handle
62            .await
63            .map_err(|e| PluginError::SocketError(e.to_string()))?;
64
65        match server_handle {
66            Ok(traces) => {
67                script_result.trace = traces;
68            }
69            Err(e) => {
70                return Err(PluginError::SocketError(e.to_string()));
71            }
72        }
73
74        Ok(script_result)
75    }
76}
77
78#[async_trait]
79impl PluginRunnerTrait for PluginRunner {
80    async fn run<J: JobProducerTrait + 'static>(
81        &self,
82        socket_path: &str,
83        script_path: String,
84        script_params: String,
85        state: Arc<web::ThinData<AppState<J>>>,
86    ) -> Result<ScriptResult, PluginError> {
87        self.run(socket_path, script_path, script_params, state)
88            .await
89    }
90}
91
#[cfg(test)]
mod tests {
    use std::fs;

    use crate::{
        jobs::MockJobProducerTrait, services::plugins::LogLevel,
        utils::mocks::mockutils::create_mock_app_state,
    };
    use tempfile::tempdir;

    use super::*;

    /// Minimal `tsconfig.json` written next to the test script.
    static TS_CONFIG: &str = r#"
        {
            "compilerOptions": {
              "target": "es2016",
              "module": "commonjs",
              "esModuleInterop": true,
              "forceConsistentCasingInFileNames": true,
              "strict": true,
              "skipLibCheck": true
            }
          }
    "#;

    /// End-to-end run: a script emitting a log line, an error line and a
    /// result line must produce two log entries and the return value.
    #[tokio::test]
    async fn test_run() {
        let temp_dir = tempdir().unwrap();
        let ts_config = temp_dir.path().join("tsconfig.json");
        let script_path = temp_dir.path().join("test_run.ts");
        let socket_path = temp_dir.path().join("test_run.sock");

        let content = r#"
            console.log(JSON.stringify({ level: 'log', message: 'test' }));
            console.log(JSON.stringify({ level: 'error', message: 'test-error' }));
            console.log(JSON.stringify({ level: 'result', message: 'test-result' }));
        "#;
        // `fs::write` accepts borrowed paths and `&str` contents directly.
        fs::write(&script_path, content).unwrap();
        fs::write(&ts_config, TS_CONFIG).unwrap();

        let state = create_mock_app_state(None, None, None, None).await;

        let plugin_runner = PluginRunner;
        let result = plugin_runner
            .run::<MockJobProducerTrait>(
                &socket_path.display().to_string(),
                script_path.display().to_string(),
                "{ \"test\": \"test\" }".to_string(),
                Arc::new(web::ThinData(state)),
            )
            .await
            // `expect` prints the underlying error on failure, unlike
            // `assert!(result.is_ok())`.
            .expect("plugin run should succeed");

        // Fail with a readable message instead of an index-out-of-bounds panic.
        assert!(
            result.logs.len() >= 2,
            "expected at least 2 log entries, got {}",
            result.logs.len()
        );
        assert_eq!(result.logs[0].level, LogLevel::Log);
        assert_eq!(result.logs[0].message, "test");
        assert_eq!(result.logs[1].level, LogLevel::Error);
        assert_eq!(result.logs[1].message, "test-error");
        assert_eq!(result.return_value, "test-result");
    }
}