From f5ca681f9b7eaab3f6fb7799ab825108111892cc Mon Sep 17 00:00:00 2001 From: Sik Yoon Date: Sat, 9 Sep 2023 01:38:59 +0900 Subject: [PATCH] Change code to apply thread --- src/value_estimation_team/indicators/ema.rs | 93 ++++++++++++--------- 1 file changed, 52 insertions(+), 41 deletions(-) diff --git a/src/value_estimation_team/indicators/ema.rs b/src/value_estimation_team/indicators/ema.rs index be2d9a7..dc2bb56 100644 --- a/src/value_estimation_team/indicators/ema.rs +++ b/src/value_estimation_team/indicators/ema.rs @@ -3,9 +3,11 @@ use crate::database_control::*; use crate::value_estimation_team::datapoints::price_data::RealtimePriceData; use csv::{DeserializeRecordsIter, StringRecord}; +use futures::future::try_join_all; use serde::Deserialize; use sqlx::FromRow; -use tokio::{fs::*, io::AsyncWriteExt, time::*}; +use std::sync::Arc; +use tokio::{fs::*, io::AsyncWriteExt, sync::Mutex, time::*}; #[derive(Clone, Debug)] pub struct EmaData { @@ -24,61 +26,70 @@ impl EmaData { // Binance EMA (closeprice) pub async fn ema( - ema_number: usize, + moving_number: usize, input_rt_data: &Vec<(String, Vec)>, - output_ema_data: &mut Vec<(String, Vec)>, - valid_usdt_trades: &Vec, -) -> Result<(), Box> { - let instant = Instant::now(); - let alpha: f64 = 2.0 / (ema_number as f64 + 1.0); + filtered_symbols: &Vec<(String, i64)>, +) -> Result)>, Box> { + if filtered_symbols.is_empty() { + Err("Err")?; + } + + let alpha: f64 = 2.0 / (moving_number as f64 + 1.0); let mut ema_t: f64 = 0.0; let mut ema_prev: f64 = 0.0; - let mut ema_data = EmaData::new(); let mut ema_data_wrapper: Vec<(String, Vec)> = Vec::new(); - let mut ema_data_vec: Vec = Vec::new(); - for symbol in valid_usdt_trades { - let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol); - ema_data_vec.clear(); + let mut ema_data_wrapper_arc = Arc::new(Mutex::new(ema_data_wrapper)); + let mut task_vec = Vec::new(); + for symbol in filtered_symbols { + let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol.0); match symbol_search_result { Some(T) => { - if input_rt_data[T].1.len() < ema_number { - ema_data.ema_value = 0.0; - ema_data.close_time = 0; - ema_data_vec.push(ema_data.clone()); - } else { - let partial_vec1 = input_rt_data[T].1.get(..ema_number).unwrap(); - let partial_vec2 = input_rt_data[T].1.get(ema_number..).unwrap(); + let ema_data_wrapper_arc_c = Arc::clone(&ema_data_wrapper_arc); + let symbol_c = symbol.clone(); + let mut ema_data = EmaData::new(); + let mut ema_data_vec: Vec = Vec::new(); + let input_rt_data_c = input_rt_data.clone(); + task_vec.push(tokio::spawn(async move { + if input_rt_data_c[T].1.len() < moving_number { + ema_data.ema_value = 0.0; + ema_data.close_time = 0; + ema_data_vec.push(ema_data.clone()); + } else { + let partial_vec1 = input_rt_data_c[T].1.get(..moving_number).unwrap(); + let partial_vec2 = input_rt_data_c[T].1.get(moving_number..).unwrap(); - let mut sma_for_initial_value = 0.0; - for element in partial_vec1 { - sma_for_initial_value += element.close_price; - } - sma_for_initial_value /= ema_number as f64; + let mut sma_for_initial_value = 0.0; + for element in partial_vec1 { + sma_for_initial_value += element.close_price; + } + sma_for_initial_value /= moving_number as f64; - ema_data.ema_value = sma_for_initial_value; - ema_data.close_time = partial_vec1.last().unwrap().close_time; - ema_data_vec.push(ema_data.clone()); - - ema_prev = sma_for_initial_value; - - for element in partial_vec2 { - ema_t = (1.0 - alpha) * ema_prev + alpha * element.close_price; - - ema_data.ema_value = ema_t; - ema_data.close_time = element.close_time; + ema_data.ema_value = sma_for_initial_value; + ema_data.close_time = partial_vec1.last().unwrap().close_time; ema_data_vec.push(ema_data.clone()); - ema_prev = ema_t; + ema_prev = sma_for_initial_value; + + for element in partial_vec2 { + ema_t = (1.0 - alpha) * ema_prev + alpha * element.close_price; + + ema_data.ema_value = ema_t; + ema_data.close_time = element.close_time; + ema_data_vec.push(ema_data.clone()); + + ema_prev = ema_t; + } } - } - ema_data_wrapper.push((symbol.clone(), ema_data_vec.clone())); + let mut ema_data_wrapper_lock = ema_data_wrapper_arc_c.lock().await; + ema_data_wrapper_lock.push((symbol_c.0.clone(), ema_data_vec.clone())); + })); } None => {} } } - *output_ema_data = ema_data_wrapper; - // println!(" indicators/ema{} 완료 elapsed:{:.2}s", ema_number, instant.elapsed().as_secs_f32()); - Ok(()) + try_join_all(task_vec).await?; + let a = ema_data_wrapper_arc.lock().await.to_owned(); + Ok(a) }