Make calculation in parallel

This commit is contained in:
Sik Yoon 2023-08-12 23:23:01 +09:00
parent 1a242da6b3
commit ddbaaa0508

View File

@ -7,8 +7,27 @@ use csv::{DeserializeRecordsIter, StringRecord};
use serde::Deserialize;
use sqlx::FromRow;
use std::f64::NAN;
use futures::{future::try_join_all, lock::Mutex};
use std::sync::Arc;
use tokio::{fs::*, io::AsyncWriteExt, time::*};
#[derive(Clone, Debug)]
pub struct StochRsiData {
pub k: f64,
pub d: f64,
pub close_time: i64,
}
impl StochRsiData {
fn new() -> StochRsiData {
let a = StochRsiData {
k: 0.0,
d: 0.0,
close_time: 0,
};
a
}
}
#[derive(Clone, Debug)]
pub struct StochRsiKData {
pub stoch_rsi_k_value: f64,
@ -24,108 +43,91 @@ impl StochRsiKData {
}
}
#[derive(Clone, Debug)]
pub struct StochRsiDData {
pub stoch_rsi_d_value: f64,
pub close_time: i64,
}
impl StochRsiDData {
fn new() -> StochRsiDData {
let a = StochRsiDData {
stoch_rsi_d_value: 0.0,
close_time: 0,
};
a
}
}
// Binance Stoch RSI (RSI10, length 10, K: 3, D: 3)
// Binance Stoch RSI
pub async fn stoch_rsi(
input_rsi_data: &Vec<(String, Vec<RsiData>)>,
stoch_rsi_length: usize,
output_stoch_rsi_k_data: &mut Vec<(String, Vec<StochRsiKData>)>,
output_stoch_rsi_d_data: &mut Vec<(String, Vec<StochRsiDData>)>,
valid_usdt_trades: &Vec<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut read_data_vec: Vec<RsiData> = Vec::new();
let mut stoch_rsi_k_data_wrapper: Vec<(String, Vec<StochRsiKData>)> = Vec::new();
let mut stoch_rsi_k_data_vec: Vec<StochRsiKData> = Vec::new();
let mut stoch_rsi_k_data = StochRsiKData::new();
let mut stoch_rsi_d_data_wrapper: Vec<(String, Vec<StochRsiDData>)> = Vec::new();
let mut stoch_rsi_d_data_vec: Vec<StochRsiDData> = Vec::new();
let mut stoch_rsi_d_data = StochRsiDData::new();
smooth_k: usize,
smooth_d: usize,
) -> Result<Vec<(String, Vec<StochRsiData>)>, Box<dyn std::error::Error + Send + Sync>> {
let mut stoch_rsi_data_wrapper: Vec<(String, Vec<StochRsiData>)> = Vec::new();
let mut stoch_rsi_data_wrapper_arc = Arc::new(Mutex::new(stoch_rsi_data_wrapper));
let mut stoch_rsi_data_vec: Vec<StochRsiData> = Vec::new();
let mut stoch_rsi_vec: Vec<RsiData> = Vec::new();
let mut stoch_rsi = RsiData::new();
let k_length = 2;
let d_length = 2;
for symbol in valid_usdt_trades {
stoch_rsi_k_data_vec.clear();
stoch_rsi_d_data_vec.clear();
stoch_rsi_vec.clear();
let symbol_search_result = input_rsi_data.iter().position(|x| x.0 == *symbol);
match symbol_search_result {
Some(T) => {
if input_rsi_data[T].1.len() >= stoch_rsi_length {
read_data_vec = input_rsi_data[T].1.clone();
let window_iter = read_data_vec.windows(stoch_rsi_length);
for buffer_window in window_iter {
let max_value = buffer_window
.iter()
.max_by(|x, y| x.rsi_value.partial_cmp(&y.rsi_value).unwrap())
.unwrap()
.rsi_value;
let min_value = buffer_window
.iter()
.min_by(|x, y| x.rsi_value.partial_cmp(&y.rsi_value).unwrap())
.unwrap()
.rsi_value;
let stoch_rsi_value = if max_value == min_value {
max_value
} else {
(buffer_window.last().unwrap().rsi_value - min_value)
/ (max_value - min_value)
};
stoch_rsi.rsi_value = stoch_rsi_value;
stoch_rsi.close_time = buffer_window.last().unwrap().close_time;
stoch_rsi_vec.push(stoch_rsi.clone());
}
// making Stoch RSI K data
let window_iter = stoch_rsi_vec.windows(k_length);
for buffer_window in window_iter {
stoch_rsi_k_data.stoch_rsi_k_value =
(buffer_window.iter().fold(0.0, |acc, x| acc + x.rsi_value)
/ (k_length as f64))
* 100.0;
stoch_rsi_k_data.close_time = buffer_window.last().unwrap().close_time;
stoch_rsi_k_data_vec.push(stoch_rsi_k_data.clone());
}
stoch_rsi_k_data_wrapper.push((symbol.clone(), stoch_rsi_k_data_vec.clone()));
// making Stoch RSI D data
let window_iter = stoch_rsi_k_data_vec.windows(d_length);
for buffer_window in window_iter {
stoch_rsi_d_data.stoch_rsi_d_value = (buffer_window
.iter()
.fold(0.0, |acc, x| acc + x.stoch_rsi_k_value)
/ (k_length as f64));
stoch_rsi_d_data.close_time = buffer_window.last().unwrap().close_time;
stoch_rsi_d_data_vec.push(stoch_rsi_d_data.clone());
}
stoch_rsi_d_data_wrapper.push((symbol.clone(), stoch_rsi_d_data_vec.clone()));
}
}
None => {}
}
if stoch_rsi_length == 0 || smooth_k == 0 || smooth_d == 0 {
panic!("stoch_rsi_length or smooth_k or smooth_d can't be 0! stoch_rsi_length: {}, k:{}, d:{}", stoch_rsi_length, smooth_k, smooth_d);
}
*output_stoch_rsi_k_data = stoch_rsi_k_data_wrapper;
*output_stoch_rsi_d_data = stoch_rsi_d_data_wrapper;
let mut task_vec = Vec::new();
for element in input_rsi_data {
let mut stoch_rsi_data = StochRsiData::new();
let mut stoch_rsi_k_data = StochRsiKData::new();
let stoch_rsi_data_wrapper_arc_c = Arc::clone(&stoch_rsi_data_wrapper_arc);
Ok(())
let element_c = element.clone();
task_vec.push(tokio::spawn(async move {
let mut stoch_rsi_data_vec: Vec<StochRsiData> = Vec::new();
let mut stoch_rsi_k_data_vec: Vec<StochRsiKData> = Vec::new();
let mut stoch_rsi_vec: Vec<RsiData> = Vec::new();
let mut stoch_rsi = RsiData::new();
if element_c.1.len() >= stoch_rsi_length &&
element_c.1.len() >= smooth_k &&
element_c.1.len() >= smooth_d {
let mut read_data_vec = element_c.1;
let window_iter = read_data_vec.windows(stoch_rsi_length);
for buffer_window in window_iter {
let max_value = buffer_window
.iter()
.max_by(|x, y| x.rsi_value.partial_cmp(&y.rsi_value).unwrap())
.unwrap()
.rsi_value;
let min_value = buffer_window
.iter()
.min_by(|x, y| x.rsi_value.partial_cmp(&y.rsi_value).unwrap())
.unwrap()
.rsi_value;
let stoch_rsi_value = if max_value == min_value {
max_value
} else {
(buffer_window.last().unwrap().rsi_value - min_value)
/ (max_value - min_value)
};
stoch_rsi.rsi_value = stoch_rsi_value;
stoch_rsi.close_time = buffer_window.last().unwrap().close_time;
stoch_rsi_vec.push(stoch_rsi.clone());
}
// making Stoch RSI K data
let window_iter = stoch_rsi_vec.windows(smooth_k);
for buffer_window in window_iter {
stoch_rsi_k_data.stoch_rsi_k_value =
(buffer_window.iter().fold(0.0, |acc, x| acc + x.rsi_value)
/ (smooth_k as f64))
* 100.0;
stoch_rsi_k_data.close_time = buffer_window.last().unwrap().close_time;
stoch_rsi_k_data_vec.push(stoch_rsi_k_data.clone());
}
// making Stoch RSI D data and Stoch RSI data
let window_iter = stoch_rsi_k_data_vec.windows(smooth_d);
for buffer_window in window_iter {
stoch_rsi_data.close_time = buffer_window.last().unwrap().close_time;
stoch_rsi_data.k = buffer_window.last().unwrap().stoch_rsi_k_value;
stoch_rsi_data.d = (buffer_window
.iter()
.fold(0.0, |acc, x| acc + x.stoch_rsi_k_value)
/ (smooth_k as f64));
stoch_rsi_data_vec.push(stoch_rsi_data.clone());
}
let mut stoch_rsi_data_wrapper_lock = stoch_rsi_data_wrapper_arc_c.lock().await;
stoch_rsi_data_wrapper_lock.push((element_c.0.clone(), stoch_rsi_data_vec.clone()));
}
}));
}
try_join_all(task_vec).await?;
let a = stoch_rsi_data_wrapper_arc.lock().await.to_owned();
Ok(a)
}