Apply threading for iteration work

This commit is contained in:
Sik Yoon 2023-07-22 20:17:19 +09:00
parent f846d0e978
commit 500026212f

View File

@ -7,7 +7,9 @@ use crate::value_estimation_team::indicators::sma::SmaData;
use csv::{DeserializeRecordsIter, StringRecord}; use csv::{DeserializeRecordsIter, StringRecord};
use serde::Deserialize; use serde::Deserialize;
use sqlx::FromRow; use sqlx::FromRow;
use tokio::{fs::*, io::AsyncWriteExt, time::*}; use tokio::{fs::*, io::AsyncWriteExt, time::*, sync::Mutex};
use futures::future::try_join_all;
use std::{sync::Arc};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct BollingerBandData { pub struct BollingerBandData {
@ -28,103 +30,107 @@ impl BollingerBandData {
a a
} }
} }
// Binance Bollingerband (SMA) // Binance Bollingerband (SMA)
pub async fn bollingerband( pub async fn bollingerband(
ma_number: usize, period: usize, // this value should be same as moving size of sma
sd_factor: f64, sd_factor: f64,
input_sma_data: &Vec<(String, Vec<SmaData>)>, input_sma_data: &Vec<(String, Vec<SmaData>)>,
rt_input_data: &Vec<(String, Vec<RealtimePriceData>)>, input_rt_data: &Vec<(String, Vec<RealtimePriceData>)>,
valid_usdt_trades: &Vec<String>, filtered_symbols: &Vec<(String, i64)>,
) -> Result<Vec<(String, Vec<BollingerBandData>)>, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<Vec<(String, Vec<BollingerBandData>)>, Box<dyn std::error::Error + Send + Sync>> {
let instant = Instant::now(); if filtered_symbols.is_empty() {
Err(("Err"))?;
}
let mut read_rt_data_vec: Vec<(String, Vec<RealtimePriceData>)> = rt_input_data.clone(); let mut read_rt_data_vec: Vec<(String, Vec<RealtimePriceData>)> = input_rt_data.clone();
let mut read_sma_data_vec: Vec<(String, Vec<SmaData>)> = input_sma_data.clone(); let mut read_sma_data_vec: Vec<(String, Vec<SmaData>)> = input_sma_data.clone();
let mut standard_deviation: f64 = 0.0;
let mut sd_mean: f64 = 0.0;
let mut read_data_buffer: Option<&SmaData>;
let mut bb_data_wrapper: Vec<(String, Vec<BollingerBandData>)> = Vec::new(); let mut bb_data_wrapper: Vec<(String, Vec<BollingerBandData>)> = Vec::new();
let mut bb_data_vec: Vec<BollingerBandData> = Vec::new(); let mut bb_data_wrapper_arc = Arc::new(Mutex::new(bb_data_wrapper));
let mut task_vec = Vec::new();
for symbol in valid_usdt_trades { for symbol in filtered_symbols {
bb_data_vec.clear(); let symbol_search_result1 = read_rt_data_vec.iter().position(|x| x.0 == *symbol.0);
let mut bb_data = BollingerBandData::new(); let symbol_search_result2 = read_sma_data_vec.iter().position(|x| x.0 == *symbol.0);
let symbol_search_result1 = read_rt_data_vec.iter().position(|x| x.0 == *symbol);
let symbol_search_result2 = read_sma_data_vec.iter().position(|x| x.0 == *symbol);
match symbol_search_result1 { match symbol_search_result1 {
Some(rt_index) => { Some(rt_index) => {
match symbol_search_result2 { match symbol_search_result2 {
Some(sma_index) => { Some(sma_index) => {
// if the data has shorter than buffer let bb_data_wrapper_arc_c = Arc::clone(&bb_data_wrapper_arc);
if read_sma_data_vec[sma_index].1.len() < ma_number { let mut bb_data = BollingerBandData::new();
bb_data.sma = 0.0; let mut bb_data_vec: Vec<BollingerBandData> = Vec::new();
bb_data.upperband = 0.0; let read_rt_data_vec_c = read_rt_data_vec.clone();
bb_data.lowerband = 0.0; let read_sma_data_vec_c = read_sma_data_vec.clone();
bb_data.close_time = 0; let symbol_c = symbol.clone();
bb_data_vec.push(bb_data.clone()); task_vec.push(tokio::spawn(async move {
} else { // if the data has shorter than buffer
let result = read_rt_data_vec[rt_index].1.binary_search_by_key( if read_sma_data_vec_c[sma_index].1.len() < period {
&read_sma_data_vec[sma_index].1.first().unwrap().close_time, bb_data.sma = 0.0;
|RealtimePriceData { bb_data.upperband = 0.0;
opclo_price, bb_data.lowerband = 0.0;
open_price, bb_data.close_time = 0;
close_price, bb_data_vec.push(bb_data.clone());
high_price, } else {
low_price, let result = read_rt_data_vec_c[rt_index].1.binary_search_by_key(
close_time, &read_sma_data_vec_c[sma_index].1.first().unwrap().close_time,
quote_asset_volume, |RealtimePriceData {
candle_type, opclo_price,
}| *close_time, open_price,
); close_price,
match result { high_price,
Ok(T) => { low_price,
if T <= ma_number - 1 { close_time,
let mut read_data_iter = quote_asset_volume,
read_sma_data_vec[sma_index].1.iter(); candle_type,
for _ in T..ma_number - 1 { }| *close_time,
read_data_iter.next(); );
}
let window_iter =
read_rt_data_vec[rt_index].1.windows(ma_number);
for buffer in window_iter {
sd_mean = 0.0;
standard_deviation = 0.0;
for element in buffer {
sd_mean += element.close_price;
}
sd_mean /= (ma_number as f64);
for element in buffer {
standard_deviation +=
(element.close_price - sd_mean).powi(2);
}
standard_deviation = sd_factor
* ((standard_deviation / ma_number as f64).sqrt());
read_data_buffer = read_data_iter.next(); match result {
Ok(T) => {
match read_data_buffer { if T <= period - 1 {
Some(T) => { let mut read_data_iter =
bb_data.sma = T.sma_value; read_sma_data_vec_c[sma_index].1.iter();
bb_data.upperband = for _ in T..period - 1 {
T.sma_value + standard_deviation; read_data_iter.next();
bb_data.lowerband = }
T.sma_value - standard_deviation; let window_iter =
bb_data.close_time = T.close_time; read_rt_data_vec_c[rt_index].1.windows(period);
bb_data_vec.push(bb_data.clone()); for buffer in window_iter {
let mut sd_mean = 0.0;
let mut standard_deviation = 0.0;
for element in buffer {
sd_mean += element.close_price;
}
sd_mean /= (period as f64);
for element in buffer {
standard_deviation +=
(element.close_price - sd_mean).powi(2);
}
standard_deviation = sd_factor
* ((standard_deviation / period as f64).sqrt());
match read_data_iter.next() {
Some(T) => {
bb_data.sma = T.sma_value;
bb_data.upperband =
T.sma_value + standard_deviation;
bb_data.lowerband =
T.sma_value - standard_deviation;
bb_data.close_time = T.close_time;
bb_data_vec.push(bb_data.clone());
}
None => {}
} }
None => {}
} }
} }
} }
Err(E) => {}
} }
Err(E) => {} let mut bb_data_wrapper_lock = bb_data_wrapper_arc_c.lock().await;
bb_data_wrapper_lock.push((symbol_c.0.clone(), bb_data_vec.clone()));
} }
bb_data_wrapper.push((symbol.clone(), bb_data_vec.clone())); }));
}
} }
None => {} None => {}
} }
@ -132,6 +138,7 @@ pub async fn bollingerband(
None => {} None => {}
} }
} }
// println!(" indicators/bb{} 완료 elapsed:{:.2}s", ma_number, instant.elapsed().as_secs_f32()); try_join_all(task_vec).await?;
Ok(bb_data_wrapper) let a = bb_data_wrapper_arc.lock().await.to_owned();
Ok(a)
} }