diff --git a/src/value_estimation_team/indicators/stoch_rsi.rs b/src/value_estimation_team/indicators/stoch_rsi.rs index 67ee269..7a51cb6 100644 --- a/src/value_estimation_team/indicators/stoch_rsi.rs +++ b/src/value_estimation_team/indicators/stoch_rsi.rs @@ -7,8 +7,27 @@ use csv::{DeserializeRecordsIter, StringRecord}; use serde::Deserialize; use sqlx::FromRow; use std::f64::NAN; +use futures::{future::try_join_all, lock::Mutex}; +use std::sync::Arc; use tokio::{fs::*, io::AsyncWriteExt, time::*}; +#[derive(Clone, Debug)] +pub struct StochRsiData { + pub k: f64, + pub d: f64, + pub close_time: i64, +} +impl StochRsiData { + fn new() -> StochRsiData { + let a = StochRsiData { + k: 0.0, + d: 0.0, + close_time: 0, + }; + a + } +} + #[derive(Clone, Debug)] pub struct StochRsiKData { pub stoch_rsi_k_value: f64, @@ -24,108 +43,91 @@ impl StochRsiKData { } } -#[derive(Clone, Debug)] -pub struct StochRsiDData { - pub stoch_rsi_d_value: f64, - pub close_time: i64, -} -impl StochRsiDData { - fn new() -> StochRsiDData { - let a = StochRsiDData { - stoch_rsi_d_value: 0.0, - close_time: 0, - }; - a - } -} - -// Binance Stoch RSI (RSI10, length 10, K: 3, D: 3) +// Binance Stoch RSI pub async fn stoch_rsi( input_rsi_data: &Vec<(String, Vec)>, stoch_rsi_length: usize, - output_stoch_rsi_k_data: &mut Vec<(String, Vec)>, - output_stoch_rsi_d_data: &mut Vec<(String, Vec)>, - valid_usdt_trades: &Vec, -) -> Result<(), Box> { - let mut read_data_vec: Vec = Vec::new(); - let mut stoch_rsi_k_data_wrapper: Vec<(String, Vec)> = Vec::new(); - let mut stoch_rsi_k_data_vec: Vec = Vec::new(); - let mut stoch_rsi_k_data = StochRsiKData::new(); - let mut stoch_rsi_d_data_wrapper: Vec<(String, Vec)> = Vec::new(); - let mut stoch_rsi_d_data_vec: Vec = Vec::new(); - let mut stoch_rsi_d_data = StochRsiDData::new(); + smooth_k: usize, + smooth_d: usize, +) -> Result)>, Box> { + let mut stoch_rsi_data_wrapper: Vec<(String, Vec)> = Vec::new(); + let mut stoch_rsi_data_wrapper_arc = Arc::new(Mutex::new(stoch_rsi_data_wrapper)); + let mut stoch_rsi_data_vec: Vec = Vec::new(); - let mut stoch_rsi_vec: Vec = Vec::new(); - let mut stoch_rsi = RsiData::new(); - let k_length = 2; - let d_length = 2; - - for symbol in valid_usdt_trades { - stoch_rsi_k_data_vec.clear(); - stoch_rsi_d_data_vec.clear(); - stoch_rsi_vec.clear(); - let symbol_search_result = input_rsi_data.iter().position(|x| x.0 == *symbol); - match symbol_search_result { - Some(T) => { - if input_rsi_data[T].1.len() >= stoch_rsi_length { - read_data_vec = input_rsi_data[T].1.clone(); - let window_iter = read_data_vec.windows(stoch_rsi_length); - - for buffer_window in window_iter { - let max_value = buffer_window - .iter() - .max_by(|x, y| x.rsi_value.partial_cmp(&y.rsi_value).unwrap()) - .unwrap() - .rsi_value; - let min_value = buffer_window - .iter() - .min_by(|x, y| x.rsi_value.partial_cmp(&y.rsi_value).unwrap()) - .unwrap() - .rsi_value; - - let stoch_rsi_value = if max_value == min_value { - max_value - } else { - (buffer_window.last().unwrap().rsi_value - min_value) - / (max_value - min_value) - }; - - stoch_rsi.rsi_value = stoch_rsi_value; - stoch_rsi.close_time = buffer_window.last().unwrap().close_time; - - stoch_rsi_vec.push(stoch_rsi.clone()); - } - - // making Stoch RSI K data - let window_iter = stoch_rsi_vec.windows(k_length); - for buffer_window in window_iter { - stoch_rsi_k_data.stoch_rsi_k_value = - (buffer_window.iter().fold(0.0, |acc, x| acc + x.rsi_value) - / (k_length as f64)) - * 100.0; - stoch_rsi_k_data.close_time = buffer_window.last().unwrap().close_time; - stoch_rsi_k_data_vec.push(stoch_rsi_k_data.clone()); - } - stoch_rsi_k_data_wrapper.push((symbol.clone(), stoch_rsi_k_data_vec.clone())); - - // making Stoch RSI D data - let window_iter = stoch_rsi_k_data_vec.windows(d_length); - for buffer_window in window_iter { - stoch_rsi_d_data.stoch_rsi_d_value = (buffer_window - .iter() - .fold(0.0, |acc, x| acc + x.stoch_rsi_k_value) - / (k_length as f64)); - stoch_rsi_d_data.close_time = buffer_window.last().unwrap().close_time; - stoch_rsi_d_data_vec.push(stoch_rsi_d_data.clone()); - } - stoch_rsi_d_data_wrapper.push((symbol.clone(), stoch_rsi_d_data_vec.clone())); - } - } - None => {} - } + if stoch_rsi_length == 0 || smooth_k == 0 || smooth_d == 0 { + panic!("stoch_rsi_length or smooth_k or smooth_d can't be 0! stoch_rsi_length: {}, k:{}, d:{}", stoch_rsi_length, smooth_k, smooth_d); } - *output_stoch_rsi_k_data = stoch_rsi_k_data_wrapper; - *output_stoch_rsi_d_data = stoch_rsi_d_data_wrapper; + let mut task_vec = Vec::new(); + for element in input_rsi_data { + let mut stoch_rsi_data = StochRsiData::new(); + let mut stoch_rsi_k_data = StochRsiKData::new(); + let stoch_rsi_data_wrapper_arc_c = Arc::clone(&stoch_rsi_data_wrapper_arc); + + let element_c = element.clone(); + task_vec.push(tokio::spawn(async move { + let mut stoch_rsi_data_vec: Vec = Vec::new(); + let mut stoch_rsi_k_data_vec: Vec = Vec::new(); + let mut stoch_rsi_vec: Vec = Vec::new(); + let mut stoch_rsi = RsiData::new(); - Ok(()) + if element_c.1.len() >= stoch_rsi_length && + element_c.1.len() >= smooth_k && + element_c.1.len() >= smooth_d { + let mut read_data_vec = element_c.1; + let window_iter = read_data_vec.windows(stoch_rsi_length); + + for buffer_window in window_iter { + let max_value = buffer_window + .iter() + .max_by(|x, y| x.rsi_value.partial_cmp(&y.rsi_value).unwrap()) + .unwrap() + .rsi_value; + let min_value = buffer_window + .iter() + .min_by(|x, y| x.rsi_value.partial_cmp(&y.rsi_value).unwrap()) + .unwrap() + .rsi_value; + + let stoch_rsi_value = if max_value == min_value { + max_value + } else { + (buffer_window.last().unwrap().rsi_value - min_value) + / (max_value - min_value) + }; + + stoch_rsi.rsi_value = stoch_rsi_value; + stoch_rsi.close_time = buffer_window.last().unwrap().close_time; + stoch_rsi_vec.push(stoch_rsi.clone()); + } + + // making Stoch RSI K data + let window_iter = stoch_rsi_vec.windows(smooth_k); + for buffer_window in window_iter { + stoch_rsi_k_data.stoch_rsi_k_value = + (buffer_window.iter().fold(0.0, |acc, x| acc + x.rsi_value) + / (smooth_k as f64)) + * 100.0; + stoch_rsi_k_data.close_time = buffer_window.last().unwrap().close_time; + stoch_rsi_k_data_vec.push(stoch_rsi_k_data.clone()); + } + + // making Stoch RSI D data and Stoch RSI data + let window_iter = stoch_rsi_k_data_vec.windows(smooth_d); + for buffer_window in window_iter { + stoch_rsi_data.close_time = buffer_window.last().unwrap().close_time; + stoch_rsi_data.k = buffer_window.last().unwrap().stoch_rsi_k_value; + stoch_rsi_data.d = (buffer_window + .iter() + .fold(0.0, |acc, x| acc + x.stoch_rsi_k_value) + / (smooth_k as f64)); + stoch_rsi_data_vec.push(stoch_rsi_data.clone()); + } + let mut stoch_rsi_data_wrapper_lock = stoch_rsi_data_wrapper_arc_c.lock().await; + stoch_rsi_data_wrapper_lock.push((element_c.0.clone(), stoch_rsi_data_vec.clone())); + } + })); + } + try_join_all(task_vec).await?; + let a = stoch_rsi_data_wrapper_arc.lock().await.to_owned(); + Ok(a) }