diff --git a/src/value_estimation_team/indicators/bollingerband.rs b/src/value_estimation_team/indicators/bollingerband.rs index 20d0852..e3d3a77 100644 --- a/src/value_estimation_team/indicators/bollingerband.rs +++ b/src/value_estimation_team/indicators/bollingerband.rs @@ -7,7 +7,9 @@ use crate::value_estimation_team::indicators::sma::SmaData; use csv::{DeserializeRecordsIter, StringRecord}; use serde::Deserialize; use sqlx::FromRow; -use tokio::{fs::*, io::AsyncWriteExt, time::*}; +use tokio::{fs::*, io::AsyncWriteExt, time::*, sync::Mutex}; +use futures::future::try_join_all; +use std::{sync::Arc}; #[derive(Clone, Debug)] pub struct BollingerBandData { @@ -28,103 +30,107 @@ impl BollingerBandData { a } } + // Binance Bollingerband (SMA) pub async fn bollingerband( - ma_number: usize, + period: usize, // this value should be same as moving size of sma sd_factor: f64, input_sma_data: &Vec<(String, Vec)>, - rt_input_data: &Vec<(String, Vec)>, - valid_usdt_trades: &Vec, + input_rt_data: &Vec<(String, Vec)>, + filtered_symbols: &Vec<(String, i64)>, ) -> Result)>, Box> { - let instant = Instant::now(); + if filtered_symbols.is_empty() { + Err(("Err"))?; + } - let mut read_rt_data_vec: Vec<(String, Vec)> = rt_input_data.clone(); + let mut read_rt_data_vec: Vec<(String, Vec)> = input_rt_data.clone(); let mut read_sma_data_vec: Vec<(String, Vec)> = input_sma_data.clone(); - let mut standard_deviation: f64 = 0.0; - let mut sd_mean: f64 = 0.0; - let mut read_data_buffer: Option<&SmaData>; - let mut bb_data_wrapper: Vec<(String, Vec)> = Vec::new(); - let mut bb_data_vec: Vec = Vec::new(); - - for symbol in valid_usdt_trades { - bb_data_vec.clear(); - let mut bb_data = BollingerBandData::new(); - - let symbol_search_result1 = read_rt_data_vec.iter().position(|x| x.0 == *symbol); - let symbol_search_result2 = read_sma_data_vec.iter().position(|x| x.0 == *symbol); + let mut bb_data_wrapper_arc = Arc::new(Mutex::new(bb_data_wrapper)); + let mut task_vec = Vec::new(); + for symbol in filtered_symbols { + let symbol_search_result1 = read_rt_data_vec.iter().position(|x| x.0 == *symbol.0); + let symbol_search_result2 = read_sma_data_vec.iter().position(|x| x.0 == *symbol.0); match symbol_search_result1 { Some(rt_index) => { match symbol_search_result2 { Some(sma_index) => { - // if the data has shorter than buffer - if read_sma_data_vec[sma_index].1.len() < ma_number { - bb_data.sma = 0.0; - bb_data.upperband = 0.0; - bb_data.lowerband = 0.0; - bb_data.close_time = 0; - bb_data_vec.push(bb_data.clone()); - } else { - let result = read_rt_data_vec[rt_index].1.binary_search_by_key( - &read_sma_data_vec[sma_index].1.first().unwrap().close_time, - |RealtimePriceData { - opclo_price, - open_price, - close_price, - high_price, - low_price, - close_time, - quote_asset_volume, - candle_type, - }| *close_time, - ); - match result { - Ok(T) => { - if T <= ma_number - 1 { - let mut read_data_iter = - read_sma_data_vec[sma_index].1.iter(); - for _ in T..ma_number - 1 { - read_data_iter.next(); - } - let window_iter = - read_rt_data_vec[rt_index].1.windows(ma_number); - for buffer in window_iter { - sd_mean = 0.0; - standard_deviation = 0.0; - for element in buffer { - sd_mean += element.close_price; + let bb_data_wrapper_arc_c = Arc::clone(&bb_data_wrapper_arc); + let mut bb_data = BollingerBandData::new(); + let mut bb_data_vec: Vec = Vec::new(); + let read_rt_data_vec_c = read_rt_data_vec.clone(); + let read_sma_data_vec_c = read_sma_data_vec.clone(); + let symbol_c = symbol.clone(); + task_vec.push(tokio::spawn(async move { + // if the data has shorter than buffer + if read_sma_data_vec_c[sma_index].1.len() < period { + bb_data.sma = 0.0; + bb_data.upperband = 0.0; + bb_data.lowerband = 0.0; + bb_data.close_time = 0; + bb_data_vec.push(bb_data.clone()); + } else { + let result = read_rt_data_vec_c[rt_index].1.binary_search_by_key( + &read_sma_data_vec_c[sma_index].1.first().unwrap().close_time, + |RealtimePriceData { + opclo_price, + open_price, + close_price, + high_price, + low_price, + close_time, + quote_asset_volume, + candle_type, + }| *close_time, + ); + + match result { + Ok(T) => { + if T <= period - 1 { + let mut read_data_iter = + read_sma_data_vec_c[sma_index].1.iter(); + for _ in T..period - 1 { + read_data_iter.next(); } - sd_mean /= (ma_number as f64); - for element in buffer { - standard_deviation += - (element.close_price - sd_mean).powi(2); - } - standard_deviation = sd_factor - * ((standard_deviation / ma_number as f64).sqrt()); - - read_data_buffer = read_data_iter.next(); - - match read_data_buffer { - Some(T) => { - bb_data.sma = T.sma_value; - bb_data.upperband = - T.sma_value + standard_deviation; - bb_data.lowerband = - T.sma_value - standard_deviation; - bb_data.close_time = T.close_time; - bb_data_vec.push(bb_data.clone()); + let window_iter = + read_rt_data_vec_c[rt_index].1.windows(period); + for buffer in window_iter { + let mut sd_mean = 0.0; + let mut standard_deviation = 0.0; + for element in buffer { + sd_mean += element.close_price; + } + sd_mean /= (period as f64); + for element in buffer { + standard_deviation += + (element.close_price - sd_mean).powi(2); + } + standard_deviation = sd_factor + * ((standard_deviation / period as f64).sqrt()); + + match read_data_iter.next() { + Some(T) => { + bb_data.sma = T.sma_value; + bb_data.upperband = + T.sma_value + standard_deviation; + bb_data.lowerband = + T.sma_value - standard_deviation; + bb_data.close_time = T.close_time; + bb_data_vec.push(bb_data.clone()); + } + None => {} } - None => {} } } } + Err(E) => {} } - Err(E) => {} + let mut bb_data_wrapper_lock = bb_data_wrapper_arc_c.lock().await; + bb_data_wrapper_lock.push((symbol_c.0.clone(), bb_data_vec.clone())); } - bb_data_wrapper.push((symbol.clone(), bb_data_vec.clone())); - } + })); } None => {} } @@ -132,6 +138,7 @@ pub async fn bollingerband( None => {} } } - // println!(" indicators/bb{} 완료 elapsed:{:.2}s", ma_number, instant.elapsed().as_secs_f32()); - Ok(bb_data_wrapper) + try_join_all(task_vec).await?; + let a = bb_data_wrapper_arc.lock().await.to_owned(); + Ok(a) }