Apply threading for iteratoration

This commit is contained in:
Sik Yoon 2023-07-22 19:28:44 +09:00
parent f0b3224e49
commit f846d0e978

View File

@ -6,9 +6,11 @@ use crate::value_estimation_team::datapoints::price_data::RealtimePriceData;
use csv::{DeserializeRecordsIter, StringRecord}; use csv::{DeserializeRecordsIter, StringRecord};
use serde::Deserialize; use serde::Deserialize;
use sqlx::FromRow; use sqlx::FromRow;
use tokio::{fs::*, io::AsyncWriteExt, time::*}; use tokio::{fs::*, io::AsyncWriteExt, time::*, sync::Mutex};
use futures::future::try_join_all;
use std::{sync::Arc};
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct SmaData { pub struct SmaData {
pub sma_value: f64, pub sma_value: f64,
pub close_time: i64, pub close_time: i64,
@ -27,43 +29,57 @@ impl SmaData {
pub async fn sma( pub async fn sma(
moving_number: usize, moving_number: usize,
input_rt_data: &Vec<(String, Vec<RealtimePriceData>)>, input_rt_data: &Vec<(String, Vec<RealtimePriceData>)>,
valid_usdt_trades: &Vec<String>, filtered_symbols: &Vec<(String, i64)>,
) -> Result<Vec<(String, Vec<SmaData>)>, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<Vec<(String, Vec<SmaData>)>, Box<dyn std::error::Error + Send + Sync>> {
let instant = Instant::now(); if filtered_symbols.is_empty() {
Err("Err")?;
}
let mut sma_data_wrapper: Vec<(String, Vec<SmaData>)> = Vec::new(); let mut sma_data_wrapper: Vec<(String, Vec<SmaData>)> = Vec::new();
let mut sma_data_vec: Vec<SmaData> = Vec::new(); let mut sma_data_wrapper_arc = Arc::new(Mutex::new(sma_data_wrapper));
let mut sma_data = SmaData::new(); // let mut sma_data_vec: Vec<SmaData> = Vec::new();
// let mut sma_data_vec_arc = Arc::new(sma_data_vec);
for symbol in valid_usdt_trades { let mut task_vec = Vec::new();
let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol); for symbol in filtered_symbols {
sma_data_vec.clear(); let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol.0);
// let sma_data_vec_arc_c = Arc::clone(&sma_data_vec_arc);
// sma_data_vec_arc_c.clear();
match symbol_search_result { match symbol_search_result {
Some(T) => { Some(T) => {
if input_rt_data[T].1.len() < moving_number { let sma_data_wrapper_arc_c = Arc::clone(&sma_data_wrapper_arc);
sma_data.sma_value = 0.0; let symbol_c = symbol.clone();
sma_data.close_time = 0; let mut sma_data = SmaData::new();
sma_data_vec.push(sma_data.clone()); let mut sma_data_vec: Vec<SmaData> = Vec::new();
} else { let input_rt_data_c = input_rt_data.clone();
let mut iter = input_rt_data[T].1.windows(moving_number); task_vec.push(tokio::spawn(async move {
for buffer in iter { if input_rt_data_c[T].1.len() < moving_number {
let mut avg = 0.0; sma_data.sma_value = 0.0;
for element in buffer { sma_data.close_time = 0;
avg += element.close_price;
}
avg /= (moving_number as f64);
sma_data.sma_value = avg;
sma_data.close_time = buffer.last().unwrap().close_time;
sma_data_vec.push(sma_data.clone()); sma_data_vec.push(sma_data.clone());
} else {
let mut iter = input_rt_data_c[T].1.windows(moving_number);
for buffer in iter {
let mut avg = 0.0;
for element in buffer {
avg += element.close_price;
}
avg /= (moving_number as f64);
sma_data.sma_value = avg;
sma_data.close_time = buffer.last().unwrap().close_time;
sma_data_vec.push(sma_data.clone());
}
} }
} let mut sma_data_wrapper_lock = sma_data_wrapper_arc_c.lock().await;
sma_data_wrapper.push((symbol.clone(), sma_data_vec.clone())); sma_data_wrapper_lock.push((symbol_c.0.clone(), sma_data_vec.clone()));
}));
} }
None => {} None => {}
} }
} }
// println!(" indicators/sma{} 완료 elapsed:{:.2}s", moving_number, instant.elapsed().as_secs_f32()); try_join_all(task_vec).await?;
Ok(sma_data_wrapper) let a = sma_data_wrapper_arc.lock().await.to_owned();
Ok(a)
} }