Make calculation in parallel

This commit is contained in:
Sik Yoon 2023-08-13 00:02:42 +09:00
parent ddbaaa0508
commit 45f640e3bc

View File

@ -7,7 +7,9 @@ use csv::{DeserializeRecordsIter, StringRecord};
use serde::Deserialize;
use sqlx::FromRow;
use std::f64::NAN;
use tokio::{fs::*, io::AsyncWriteExt, time::*};
use futures::future::try_join_all;
use std::sync::Arc;
use tokio::{fs::*, io::AsyncWriteExt, sync::Mutex, time::*};
#[derive(Clone, Debug)]
pub struct RsiData {
@ -28,124 +30,129 @@ impl RsiData {
pub async fn rsi(
rsi_number: usize,
input_rt_data: &Vec<(String, Vec<RealtimePriceData>)>,
output_rsi_data: &mut Vec<(String, Vec<RsiData>)>,
filtered_symbols: &Vec<(String, i64)>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
) -> Result<Vec<(String, Vec<RsiData>)>, Box<dyn std::error::Error + Send + Sync>> {
let instant = Instant::now();
let mut read_data_vec: Vec<RealtimePriceData> = Vec::new();
let mut read_price_buffer: Vec<RealtimePriceData> = Vec::new();
let mut prev_price: f64 = 0.0;
let mut current_price: f64 = 0.0;
let mut sum_increase: f64 = 0.0;
let mut sum_decrease: f64 = 0.0;
let mut rsi: f64 = 0.0;
let mut last_close_time = 0;
let mut rsi_data_wrapper: Vec<(String, Vec<RsiData>)> = Vec::new();
let mut rsi_data_vec: Vec<RsiData> = Vec::new();
let mut rsi_data = RsiData::new();
for symbol in filtered_symbols {
read_data_vec.clear();
read_price_buffer.clear();
rsi_data_vec.clear();
let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol.0);
let mut rsi_data_wrapper_arc = Arc::new(Mutex::new(rsi_data_wrapper));
let mut task_vec = Vec::new();
for element in filtered_symbols {
let element_c = element.clone();
let rsi_data_wrapper_arc_c = Arc::clone(&rsi_data_wrapper_arc);
let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *element_c.0);
match symbol_search_result {
Some(T) => {
if input_rt_data[T].1.len() < rsi_number + 1 {
rsi_data.rsi_value = 0.0;
rsi_data.close_time = 0;
rsi_data_vec.push(rsi_data.clone());
} else {
read_data_vec = input_rt_data[T].1.clone();
if read_data_vec.len() >= (150 + rsi_number) as usize {
read_data_vec.reverse();
read_data_vec.truncate((150 + rsi_number) as usize);
read_data_vec.reverse();
}
let window_iter = read_data_vec.windows(rsi_number + 1);
let input_rt_data_c = input_rt_data.clone();
task_vec.push(tokio::spawn(async move {
let mut rsi_data_vec: Vec<RsiData> = Vec::new();
let mut rsi_data = RsiData::new();
let mut read_data_vec: Vec<RealtimePriceData> = Vec::new();
let mut read_price_buffer: Vec<RealtimePriceData> = Vec::new();
let mut prev_price: f64 = 0.0;
let mut current_price: f64 = 0.0;
let mut sum_increase: f64 = 0.0;
let mut sum_decrease: f64 = 0.0;
let mut rsi: f64 = 0.0;
let mut last_close_time = 0;
let mut prev_avg_ups: Option<f64> = None;
let mut prev_avg_downs: Option<f64> = None;
let mut current_avg_ups: Option<f64> = None;
let mut current_avg_downs: Option<f64> = None;
for buffer in window_iter {
let mut up_vec: Vec<f64> = Vec::new();
let mut down_vec: Vec<f64> = Vec::new();
let buffer_window = buffer.windows(2);
for element in buffer_window {
if element.last().unwrap().close_price
- element.first().unwrap().close_price
> 0.0
{
up_vec.push(
element.last().unwrap().close_price
- element.first().unwrap().close_price,
);
down_vec.push(0.0);
} else if element.last().unwrap().close_price
- element.first().unwrap().close_price
< 0.0
{
up_vec.push(0.0);
down_vec.push(
(element.last().unwrap().close_price
- element.first().unwrap().close_price)
.abs(),
);
} else {
up_vec.push(0.0);
down_vec.push(0.0);
}
}
if current_avg_ups == None
&& current_avg_downs == None
&& prev_avg_ups == None
&& prev_avg_downs == None
{
// initial averages based on SMA
let mut avg_ups: f64 = up_vec.iter().sum();
avg_ups /= rsi_number as f64;
let mut avg_downs: f64 = down_vec.iter().sum();
avg_downs /= rsi_number as f64;
current_avg_ups = Some(avg_ups);
current_avg_downs = Some(avg_downs);
} else {
// [EMA]
let alpha = 1.0 / (rsi_number as f64); // Wilder's weight
current_avg_ups = Some(
alpha * up_vec.last().unwrap()
+ ((1.0 - alpha) * prev_avg_ups.unwrap()),
);
current_avg_downs = Some(
alpha * down_vec.last().unwrap()
+ ((1.0 - alpha) * prev_avg_downs.unwrap()),
);
}
prev_avg_ups = current_avg_ups;
prev_avg_downs = current_avg_downs;
let rs =
current_avg_ups.unwrap() / (current_avg_downs.unwrap() + 0.00000001); // 0.00000001 is used to avoid division by 0
let rsi = 100.0 - (100.0 / (1.0 + rs));
rsi_data.rsi_value = rsi;
rsi_data.close_time = buffer.last().unwrap().close_time;
if input_rt_data_c[T].1.len() < rsi_number + 1 {
rsi_data.rsi_value = 0.0;
rsi_data.close_time = 0;
rsi_data_vec.push(rsi_data.clone());
} else {
read_data_vec = input_rt_data_c[T].1.clone();
if read_data_vec.len() >= (150 + rsi_number) as usize {
read_data_vec.reverse();
read_data_vec.truncate((150 + rsi_number) as usize);
read_data_vec.reverse();
}
let window_iter = read_data_vec.windows(rsi_number + 1);
let mut prev_avg_ups: Option<f64> = None;
let mut prev_avg_downs: Option<f64> = None;
let mut current_avg_ups: Option<f64> = None;
let mut current_avg_downs: Option<f64> = None;
for buffer in window_iter {
let mut up_vec: Vec<f64> = Vec::new();
let mut down_vec: Vec<f64> = Vec::new();
let buffer_window = buffer.windows(2);
for element in buffer_window {
if element.last().unwrap().close_price
- element.first().unwrap().close_price
> 0.0
{
up_vec.push(
element.last().unwrap().close_price
- element.first().unwrap().close_price,
);
down_vec.push(0.0);
} else if element.last().unwrap().close_price
- element.first().unwrap().close_price
< 0.0
{
up_vec.push(0.0);
down_vec.push(
(element.last().unwrap().close_price
- element.first().unwrap().close_price)
.abs(),
);
} else {
up_vec.push(0.0);
down_vec.push(0.0);
}
}
if current_avg_ups == None
&& current_avg_downs == None
&& prev_avg_ups == None
&& prev_avg_downs == None
{
// initial averages based on SMA
let mut avg_ups: f64 = up_vec.iter().sum();
avg_ups /= rsi_number as f64;
let mut avg_downs: f64 = down_vec.iter().sum();
avg_downs /= rsi_number as f64;
current_avg_ups = Some(avg_ups);
current_avg_downs = Some(avg_downs);
} else {
// [EMA]
let alpha = 1.0 / (rsi_number as f64); // Wilder's weight
current_avg_ups = Some(
alpha * up_vec.last().unwrap()
+ ((1.0 - alpha) * prev_avg_ups.unwrap()),
);
current_avg_downs = Some(
alpha * down_vec.last().unwrap()
+ ((1.0 - alpha) * prev_avg_downs.unwrap()),
);
}
prev_avg_ups = current_avg_ups;
prev_avg_downs = current_avg_downs;
let rs =
current_avg_ups.unwrap() / (current_avg_downs.unwrap() + 0.00000001); // 0.00000001 is used to avoid division by 0
let rsi = 100.0 - (100.0 / (1.0 + rs));
rsi_data.rsi_value = rsi;
rsi_data.close_time = buffer.last().unwrap().close_time;
rsi_data_vec.push(rsi_data.clone());
}
let mut rsi_data_wrapper_lock = rsi_data_wrapper_arc_c.lock().await;
rsi_data_wrapper_lock.push((element_c.0.clone(), rsi_data_vec.clone()));
}
rsi_data_wrapper.push((symbol.0.clone(), rsi_data_vec.clone()));
}
}));
}
None => {}
}
}
*output_rsi_data = rsi_data_wrapper;
// println!(" indicators/rsi{} 완료 elapsed:{:.2}s", rsi_number, instant.elapsed().as_secs_f32());
Ok(())
try_join_all(task_vec).await?;
let a = rsi_data_wrapper_arc.lock().await.to_owned();
Ok(a)
}