From f846d0e978ab03de9cfe1a389685c2af01e19d3d Mon Sep 17 00:00:00 2001 From: Sik Yoon Date: Sat, 22 Jul 2023 19:28:44 +0900 Subject: [PATCH] Apply threading for iteratoration --- src/value_estimation_team/indicators/sma.rs | 74 +++++++++++++-------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/src/value_estimation_team/indicators/sma.rs b/src/value_estimation_team/indicators/sma.rs index 08fc6e2..de74946 100644 --- a/src/value_estimation_team/indicators/sma.rs +++ b/src/value_estimation_team/indicators/sma.rs @@ -6,9 +6,11 @@ use crate::value_estimation_team::datapoints::price_data::RealtimePriceData; use csv::{DeserializeRecordsIter, StringRecord}; use serde::Deserialize; use sqlx::FromRow; -use tokio::{fs::*, io::AsyncWriteExt, time::*}; +use tokio::{fs::*, io::AsyncWriteExt, time::*, sync::Mutex}; +use futures::future::try_join_all; +use std::{sync::Arc}; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SmaData { pub sma_value: f64, pub close_time: i64, @@ -27,43 +29,57 @@ impl SmaData { pub async fn sma( moving_number: usize, input_rt_data: &Vec<(String, Vec)>, - valid_usdt_trades: &Vec, + filtered_symbols: &Vec<(String, i64)>, ) -> Result)>, Box> { - let instant = Instant::now(); + if filtered_symbols.is_empty() { + Err("Err")?; + } let mut sma_data_wrapper: Vec<(String, Vec)> = Vec::new(); - let mut sma_data_vec: Vec = Vec::new(); - let mut sma_data = SmaData::new(); - - for symbol in valid_usdt_trades { - let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol); - sma_data_vec.clear(); + let mut sma_data_wrapper_arc = Arc::new(Mutex::new(sma_data_wrapper)); + // let mut sma_data_vec: Vec = Vec::new(); + // let mut sma_data_vec_arc = Arc::new(sma_data_vec); + let mut task_vec = Vec::new(); + for symbol in filtered_symbols { + let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol.0); + // let sma_data_vec_arc_c = Arc::clone(&sma_data_vec_arc); + // sma_data_vec_arc_c.clear(); + match symbol_search_result { Some(T) => { - if input_rt_data[T].1.len() < moving_number { - sma_data.sma_value = 0.0; - sma_data.close_time = 0; - sma_data_vec.push(sma_data.clone()); - } else { - let mut iter = input_rt_data[T].1.windows(moving_number); - for buffer in iter { - let mut avg = 0.0; - for element in buffer { - avg += element.close_price; - } - avg /= (moving_number as f64); - - sma_data.sma_value = avg; - sma_data.close_time = buffer.last().unwrap().close_time; + let sma_data_wrapper_arc_c = Arc::clone(&sma_data_wrapper_arc); + let symbol_c = symbol.clone(); + let mut sma_data = SmaData::new(); + let mut sma_data_vec: Vec = Vec::new(); + let input_rt_data_c = input_rt_data.clone(); + task_vec.push(tokio::spawn(async move { + if input_rt_data_c[T].1.len() < moving_number { + sma_data.sma_value = 0.0; + sma_data.close_time = 0; sma_data_vec.push(sma_data.clone()); + } else { + let mut iter = input_rt_data_c[T].1.windows(moving_number); + for buffer in iter { + let mut avg = 0.0; + for element in buffer { + avg += element.close_price; + } + avg /= (moving_number as f64); + + sma_data.sma_value = avg; + sma_data.close_time = buffer.last().unwrap().close_time; + sma_data_vec.push(sma_data.clone()); + } } - } - sma_data_wrapper.push((symbol.clone(), sma_data_vec.clone())); + let mut sma_data_wrapper_lock = sma_data_wrapper_arc_c.lock().await; + sma_data_wrapper_lock.push((symbol_c.0.clone(), sma_data_vec.clone())); + })); } None => {} } } - // println!(" indicators/sma{} 완료 elapsed:{:.2}s", moving_number, instant.elapsed().as_secs_f32()); - Ok(sma_data_wrapper) + try_join_all(task_vec).await?; + let a = sma_data_wrapper_arc.lock().await.to_owned(); + Ok(a) }