diff --git a/src/value_estimation_team/indicators/heatmap_volume.rs b/src/value_estimation_team/indicators/heatmap_volume.rs index 1f593e4..57690fe 100644 --- a/src/value_estimation_team/indicators/heatmap_volume.rs +++ b/src/value_estimation_team/indicators/heatmap_volume.rs @@ -1,4 +1,8 @@ use crate::value_estimation_team::datapoints::price_data::RealtimePriceData; +use super::FilteredData; +use std::sync::Arc; +use tokio::{fs::*, io::AsyncWriteExt, sync::Mutex, time::*}; +use futures::future::try_join_all; #[derive(Debug, Clone, PartialEq)] pub enum HeatMapLevel { @@ -18,33 +22,39 @@ pub struct HeatmapVolumeData { // Implementation from TradingView Script (HeatMap Volume by xdecow) pub async fn heatmap_volume( - symbol: &String, - input_rt_data: &Vec<(String, Vec)>, ma_len: usize, std_len: usize, extra_high_thold: f64, high_thold: f64, medium_thold: f64, normal_thold: f64, -) -> Option> { - let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol); + filtered_symbols: &Vec, + input_rt_data: &Vec<(String, Vec)>, +) -> Result)>, Box> { + let mut heatmap_data_wrapper: Vec<(String, Vec)> = Vec::new(); + let mut heatmap_data_wrapper_arc = Arc::new(Mutex::new(heatmap_data_wrapper)); - match symbol_search_result { - Some(T) => { - if input_rt_data[T].1.len() >= ma_len && input_rt_data[T].1.len() >= std_len { + let mut task_vec = Vec::new(); + for element in filtered_symbols { + let heatmap_data_wrapper_arc_c = Arc::clone(&heatmap_data_wrapper_arc); + let element_c = element.clone(); + let input_rt_data_c = input_rt_data.clone(); + task_vec.push(tokio::spawn(async move { + let search_result = input_rt_data_c.iter().position(|x| x.0 == element_c.symbol); + if search_result.is_some_and(|a| input_rt_data_c[a].1.len() >= ma_len && input_rt_data_c[a].1.len() >= std_len) { // calc mean #[derive(Debug, Clone)] struct MeanData { mean_value: f64, close_time: i64, } - + let mut mean_vec: Vec = Vec::new(); let mut mean_data = MeanData { mean_value: 0.0, close_time: 0, }; - let window = input_rt_data[T].1.windows(ma_len); + let window = input_rt_data_c[search_result.unwrap()].1.windows(ma_len); for buffer in window { // calculate SMA of volume mean_data.mean_value = 0.0; @@ -54,10 +64,10 @@ pub async fn heatmap_volume( } mean_data.mean_value /= ma_len as f64; mean_data.close_time = buffer.last().unwrap().close_time; - + mean_vec.push(mean_data.clone()); } - + // calc pstdev #[derive(Debug, Clone)] struct PstdevData { @@ -72,7 +82,7 @@ pub async fn heatmap_volume( let mut mean = 0.0; let mut summation = 0.0; let mut sample_minus_mean = 0.0; - let window = input_rt_data[T].1.windows(std_len); + let window = input_rt_data_c[search_result.unwrap()].1.windows(std_len); for buffer in window { pstdev_data.pstdev_value = 0.0; pstdev_data.close_time = 0; @@ -83,18 +93,18 @@ pub async fn heatmap_volume( mean += element.quote_asset_volume; } mean /= std_len as f64; - + for element in buffer { sample_minus_mean = element.quote_asset_volume - mean; summation += sample_minus_mean.powi(2); } - + pstdev_data.pstdev_value = (summation / std_len as f64).sqrt(); pstdev_data.close_time = buffer.last().unwrap().close_time; - + pstdev_vec.push(pstdev_data.clone()); } - + // calc stdbar and heatmap volume let mut heatmap_vol_vec: Vec = Vec::new(); let mut heatmap_vol_data = HeatmapVolumeData { @@ -102,11 +112,11 @@ pub async fn heatmap_volume( heatmap_level: HeatMapLevel::Normal, close_time: 0, }; - + if ma_len == std_len { let mut rt_data_trunc_vec = - input_rt_data[T].1.get(ma_len - 1..).unwrap().iter(); - + input_rt_data_c[search_result.unwrap()].1.get(ma_len - 1..).unwrap().iter(); + let zipped = mean_vec.iter().zip(pstdev_vec.iter()); for element in zipped { heatmap_vol_data.heatmap_value = @@ -129,7 +139,7 @@ pub async fn heatmap_volume( } } else if ma_len > std_len { let mut rt_data_trunc_vec = - input_rt_data[T].1.get(std_len - 1..).unwrap().iter(); + input_rt_data_c[search_result.unwrap()].1.get(std_len - 1..).unwrap().iter(); let mut mean_trunc_vec = mean_vec.get(mean_vec.len() - std_len..).unwrap().iter(); let zipped = mean_trunc_vec.zip(pstdev_vec.iter()); @@ -143,7 +153,7 @@ pub async fn heatmap_volume( } } else { let mut rt_data_trunc_vec = - input_rt_data[T].1.get(ma_len - 1..).unwrap().iter(); + input_rt_data_c[search_result.unwrap()].1.get(ma_len - 1..).unwrap().iter(); let mut pstdev_trunc_vec = pstdev_vec.get(pstdev_vec.len() - ma_len..).unwrap().iter(); let zipped = mean_vec.iter().zip(pstdev_trunc_vec); @@ -156,11 +166,12 @@ pub async fn heatmap_volume( heatmap_vol_vec.push(heatmap_vol_data.clone()); } // level 구현 } - Some(heatmap_vol_vec) - } else { - None + let mut heatmap_data_wrapper_lock = heatmap_data_wrapper_arc_c.lock().await; + heatmap_data_wrapper_lock.push((element_c.symbol.clone(), heatmap_vol_vec.clone())); } - } - None => None, + })); } + try_join_all(task_vec).await?; + let a = heatmap_data_wrapper_arc.lock().await.to_owned(); + Ok(a) }