Change code to apply thread

This commit is contained in:
Sik Yoon 2023-09-09 01:38:59 +09:00
parent afe32de899
commit f5ca681f9b

View File

@ -3,9 +3,11 @@
use crate::database_control::*; use crate::database_control::*;
use crate::value_estimation_team::datapoints::price_data::RealtimePriceData; use crate::value_estimation_team::datapoints::price_data::RealtimePriceData;
use csv::{DeserializeRecordsIter, StringRecord}; use csv::{DeserializeRecordsIter, StringRecord};
use futures::future::try_join_all;
use serde::Deserialize; use serde::Deserialize;
use sqlx::FromRow; use sqlx::FromRow;
use tokio::{fs::*, io::AsyncWriteExt, time::*}; use std::sync::Arc;
use tokio::{fs::*, io::AsyncWriteExt, sync::Mutex, time::*};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct EmaData { pub struct EmaData {
@ -24,61 +26,70 @@ impl EmaData {
// Binance EMA (closeprice) // Binance EMA (closeprice)
pub async fn ema( pub async fn ema(
ema_number: usize, moving_number: usize,
input_rt_data: &Vec<(String, Vec<RealtimePriceData>)>, input_rt_data: &Vec<(String, Vec<RealtimePriceData>)>,
output_ema_data: &mut Vec<(String, Vec<EmaData>)>, filtered_symbols: &Vec<(String, i64)>,
valid_usdt_trades: &Vec<String>, ) -> Result<Vec<(String, Vec<EmaData>)>, Box<dyn std::error::Error + Send + Sync>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { if filtered_symbols.is_empty() {
let instant = Instant::now(); Err("Err")?;
let alpha: f64 = 2.0 / (ema_number as f64 + 1.0); }
let alpha: f64 = 2.0 / (moving_number as f64 + 1.0);
let mut ema_t: f64 = 0.0; let mut ema_t: f64 = 0.0;
let mut ema_prev: f64 = 0.0; let mut ema_prev: f64 = 0.0;
let mut ema_data = EmaData::new();
let mut ema_data_wrapper: Vec<(String, Vec<EmaData>)> = Vec::new(); let mut ema_data_wrapper: Vec<(String, Vec<EmaData>)> = Vec::new();
let mut ema_data_vec: Vec<EmaData> = Vec::new(); let mut ema_data_wrapper_arc = Arc::new(Mutex::new(ema_data_wrapper));
for symbol in valid_usdt_trades { let mut task_vec = Vec::new();
let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol); for symbol in filtered_symbols {
ema_data_vec.clear(); let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol.0);
match symbol_search_result { match symbol_search_result {
Some(T) => { Some(T) => {
if input_rt_data[T].1.len() < ema_number { let ema_data_wrapper_arc_c = Arc::clone(&ema_data_wrapper_arc);
ema_data.ema_value = 0.0; let symbol_c = symbol.clone();
ema_data.close_time = 0; let mut ema_data = EmaData::new();
ema_data_vec.push(ema_data.clone()); let mut ema_data_vec: Vec<EmaData> = Vec::new();
} else { let input_rt_data_c = input_rt_data.clone();
let partial_vec1 = input_rt_data[T].1.get(..ema_number).unwrap(); task_vec.push(tokio::spawn(async move {
let partial_vec2 = input_rt_data[T].1.get(ema_number..).unwrap(); if input_rt_data_c[T].1.len() < moving_number {
ema_data.ema_value = 0.0;
ema_data.close_time = 0;
ema_data_vec.push(ema_data.clone());
} else {
let partial_vec1 = input_rt_data_c[T].1.get(..moving_number).unwrap();
let partial_vec2 = input_rt_data_c[T].1.get(moving_number..).unwrap();
let mut sma_for_initial_value = 0.0; let mut sma_for_initial_value = 0.0;
for element in partial_vec1 { for element in partial_vec1 {
sma_for_initial_value += element.close_price; sma_for_initial_value += element.close_price;
} }
sma_for_initial_value /= ema_number as f64; sma_for_initial_value /= moving_number as f64;
ema_data.ema_value = sma_for_initial_value; ema_data.ema_value = sma_for_initial_value;
ema_data.close_time = partial_vec1.last().unwrap().close_time; ema_data.close_time = partial_vec1.last().unwrap().close_time;
ema_data_vec.push(ema_data.clone());
ema_prev = sma_for_initial_value;
for element in partial_vec2 {
ema_t = (1.0 - alpha) * ema_prev + alpha * element.close_price;
ema_data.ema_value = ema_t;
ema_data.close_time = element.close_time;
ema_data_vec.push(ema_data.clone()); ema_data_vec.push(ema_data.clone());
ema_prev = ema_t; ema_prev = sma_for_initial_value;
for element in partial_vec2 {
ema_t = (1.0 - alpha) * ema_prev + alpha * element.close_price;
ema_data.ema_value = ema_t;
ema_data.close_time = element.close_time;
ema_data_vec.push(ema_data.clone());
ema_prev = ema_t;
}
} }
} let mut ema_data_wrapper_lock = ema_data_wrapper_arc_c.lock().await;
ema_data_wrapper.push((symbol.clone(), ema_data_vec.clone())); ema_data_wrapper_lock.push((symbol_c.0.clone(), ema_data_vec.clone()));
}));
} }
None => {} None => {}
} }
} }
*output_ema_data = ema_data_wrapper; try_join_all(task_vec).await?;
// println!(" indicators/ema{} 완료 elapsed:{:.2}s", ema_number, instant.elapsed().as_secs_f32()); let a = ema_data_wrapper_arc.lock().await.to_owned();
Ok(()) Ok(a)
} }