diff --git a/src/value_estimation_team/indicators/rsi.rs b/src/value_estimation_team/indicators/rsi.rs index 6551c83..e84d85e 100644 --- a/src/value_estimation_team/indicators/rsi.rs +++ b/src/value_estimation_team/indicators/rsi.rs @@ -7,7 +7,9 @@ use csv::{DeserializeRecordsIter, StringRecord}; use serde::Deserialize; use sqlx::FromRow; use std::f64::NAN; -use tokio::{fs::*, io::AsyncWriteExt, time::*}; +use futures::future::try_join_all; +use std::sync::Arc; +use tokio::{fs::*, io::AsyncWriteExt, sync::Mutex, time::*}; #[derive(Clone, Debug)] pub struct RsiData { @@ -28,124 +30,129 @@ impl RsiData { pub async fn rsi( rsi_number: usize, input_rt_data: &Vec<(String, Vec)>, - output_rsi_data: &mut Vec<(String, Vec)>, filtered_symbols: &Vec<(String, i64)>, -) -> Result<(), Box> { +) -> Result)>, Box> { let instant = Instant::now(); - let mut read_data_vec: Vec = Vec::new(); - let mut read_price_buffer: Vec = Vec::new(); - let mut prev_price: f64 = 0.0; - let mut current_price: f64 = 0.0; - let mut sum_increase: f64 = 0.0; - let mut sum_decrease: f64 = 0.0; - let mut rsi: f64 = 0.0; - let mut last_close_time = 0; - let mut rsi_data_wrapper: Vec<(String, Vec)> = Vec::new(); - let mut rsi_data_vec: Vec = Vec::new(); - let mut rsi_data = RsiData::new(); - for symbol in filtered_symbols { - read_data_vec.clear(); - read_price_buffer.clear(); - rsi_data_vec.clear(); - let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol.0); + let mut rsi_data_wrapper_arc = Arc::new(Mutex::new(rsi_data_wrapper)); + + let mut task_vec = Vec::new(); + for element in filtered_symbols { + + let element_c = element.clone(); + let rsi_data_wrapper_arc_c = Arc::clone(&rsi_data_wrapper_arc); + let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *element_c.0); match symbol_search_result { Some(T) => { - if input_rt_data[T].1.len() < rsi_number + 1 { - rsi_data.rsi_value = 0.0; - rsi_data.close_time = 0; - rsi_data_vec.push(rsi_data.clone()); - } else { - read_data_vec = input_rt_data[T].1.clone(); - if read_data_vec.len() >= (150 + rsi_number) as usize { - read_data_vec.reverse(); - read_data_vec.truncate((150 + rsi_number) as usize); - read_data_vec.reverse(); - } - let window_iter = read_data_vec.windows(rsi_number + 1); + let input_rt_data_c = input_rt_data.clone(); + task_vec.push(tokio::spawn(async move { + let mut rsi_data_vec: Vec = Vec::new(); + let mut rsi_data = RsiData::new(); + let mut read_data_vec: Vec = Vec::new(); + let mut read_price_buffer: Vec = Vec::new(); + let mut prev_price: f64 = 0.0; + let mut current_price: f64 = 0.0; + let mut sum_increase: f64 = 0.0; + let mut sum_decrease: f64 = 0.0; + let mut rsi: f64 = 0.0; + let mut last_close_time = 0; - let mut prev_avg_ups: Option = None; - let mut prev_avg_downs: Option = None; - let mut current_avg_ups: Option = None; - let mut current_avg_downs: Option = None; - - for buffer in window_iter { - let mut up_vec: Vec = Vec::new(); - let mut down_vec: Vec = Vec::new(); - - let buffer_window = buffer.windows(2); - for element in buffer_window { - if element.last().unwrap().close_price - - element.first().unwrap().close_price - > 0.0 - { - up_vec.push( - element.last().unwrap().close_price - - element.first().unwrap().close_price, - ); - down_vec.push(0.0); - } else if element.last().unwrap().close_price - - element.first().unwrap().close_price - < 0.0 - { - up_vec.push(0.0); - down_vec.push( - (element.last().unwrap().close_price - - element.first().unwrap().close_price) - .abs(), - ); - } else { - up_vec.push(0.0); - down_vec.push(0.0); - } - } - - if current_avg_ups == None - && current_avg_downs == None - && prev_avg_ups == None - && prev_avg_downs == None - { - // initial averages based on SMA - let mut avg_ups: f64 = up_vec.iter().sum(); - avg_ups /= rsi_number as f64; - let mut avg_downs: f64 = down_vec.iter().sum(); - avg_downs /= rsi_number as f64; - - current_avg_ups = Some(avg_ups); - current_avg_downs = Some(avg_downs); - } else { - // [EMA] - let alpha = 1.0 / (rsi_number as f64); // Wilder's weight - current_avg_ups = Some( - alpha * up_vec.last().unwrap() - + ((1.0 - alpha) * prev_avg_ups.unwrap()), - ); - current_avg_downs = Some( - alpha * down_vec.last().unwrap() - + ((1.0 - alpha) * prev_avg_downs.unwrap()), - ); - } - prev_avg_ups = current_avg_ups; - prev_avg_downs = current_avg_downs; - - let rs = - current_avg_ups.unwrap() / (current_avg_downs.unwrap() + 0.00000001); // 0.00000001 is used to avoid division by 0 - - let rsi = 100.0 - (100.0 / (1.0 + rs)); - - rsi_data.rsi_value = rsi; - rsi_data.close_time = buffer.last().unwrap().close_time; + if input_rt_data_c[T].1.len() < rsi_number + 1 { + rsi_data.rsi_value = 0.0; + rsi_data.close_time = 0; rsi_data_vec.push(rsi_data.clone()); + } else { + read_data_vec = input_rt_data_c[T].1.clone(); + if read_data_vec.len() >= (150 + rsi_number) as usize { + read_data_vec.reverse(); + read_data_vec.truncate((150 + rsi_number) as usize); + read_data_vec.reverse(); + } + let window_iter = read_data_vec.windows(rsi_number + 1); + + let mut prev_avg_ups: Option = None; + let mut prev_avg_downs: Option = None; + let mut current_avg_ups: Option = None; + let mut current_avg_downs: Option = None; + + for buffer in window_iter { + let mut up_vec: Vec = Vec::new(); + let mut down_vec: Vec = Vec::new(); + + let buffer_window = buffer.windows(2); + for element in buffer_window { + if element.last().unwrap().close_price + - element.first().unwrap().close_price + > 0.0 + { + up_vec.push( + element.last().unwrap().close_price + - element.first().unwrap().close_price, + ); + down_vec.push(0.0); + } else if element.last().unwrap().close_price + - element.first().unwrap().close_price + < 0.0 + { + up_vec.push(0.0); + down_vec.push( + (element.last().unwrap().close_price + - element.first().unwrap().close_price) + .abs(), + ); + } else { + up_vec.push(0.0); + down_vec.push(0.0); + } + } + + if current_avg_ups == None + && current_avg_downs == None + && prev_avg_ups == None + && prev_avg_downs == None + { + // initial averages based on SMA + let mut avg_ups: f64 = up_vec.iter().sum(); + avg_ups /= rsi_number as f64; + let mut avg_downs: f64 = down_vec.iter().sum(); + avg_downs /= rsi_number as f64; + + current_avg_ups = Some(avg_ups); + current_avg_downs = Some(avg_downs); + } else { + // [EMA] + let alpha = 1.0 / (rsi_number as f64); // Wilder's weight + current_avg_ups = Some( + alpha * up_vec.last().unwrap() + + ((1.0 - alpha) * prev_avg_ups.unwrap()), + ); + current_avg_downs = Some( + alpha * down_vec.last().unwrap() + + ((1.0 - alpha) * prev_avg_downs.unwrap()), + ); + } + prev_avg_ups = current_avg_ups; + prev_avg_downs = current_avg_downs; + + let rs = + current_avg_ups.unwrap() / (current_avg_downs.unwrap() + 0.00000001); // 0.00000001 is used to avoid division by 0 + + let rsi = 100.0 - (100.0 / (1.0 + rs)); + + rsi_data.rsi_value = rsi; + rsi_data.close_time = buffer.last().unwrap().close_time; + rsi_data_vec.push(rsi_data.clone()); + } + let mut rsi_data_wrapper_lock = rsi_data_wrapper_arc_c.lock().await; + rsi_data_wrapper_lock.push((element_c.0.clone(), rsi_data_vec.clone())); } - rsi_data_wrapper.push((symbol.0.clone(), rsi_data_vec.clone())); - } + })); } None => {} } } - *output_rsi_data = rsi_data_wrapper; - // println!(" indicators/rsi{} 완료 elapsed:{:.2}s", rsi_number, instant.elapsed().as_secs_f32()); - - Ok(()) + try_join_all(task_vec).await?; + let a = rsi_data_wrapper_arc.lock().await.to_owned(); + Ok(a) }