Implement thread

This commit is contained in:
Sik Yoon 2024-01-01 13:36:06 +09:00
parent f4435da178
commit e3b6f9e293

View File

@ -1,4 +1,8 @@
use crate::value_estimation_team::datapoints::price_data::RealtimePriceData;
use super::FilteredData;
use std::sync::Arc;
use tokio::{fs::*, io::AsyncWriteExt, sync::Mutex, time::*};
use futures::future::try_join_all;
#[derive(Debug, Clone, PartialEq)]
pub enum HeatMapLevel {
@ -18,33 +22,39 @@ pub struct HeatmapVolumeData {
// Implementation from TradingView Script (HeatMap Volume by xdecow)
pub async fn heatmap_volume(
symbol: &String,
input_rt_data: &Vec<(String, Vec<RealtimePriceData>)>,
ma_len: usize,
std_len: usize,
extra_high_thold: f64,
high_thold: f64,
medium_thold: f64,
normal_thold: f64,
) -> Option<Vec<HeatmapVolumeData>> {
let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol);
filtered_symbols: &Vec<FilteredData>,
input_rt_data: &Vec<(String, Vec<RealtimePriceData>)>,
) -> Result<Vec<(String, Vec<HeatmapVolumeData>)>, Box<dyn std::error::Error + Send + Sync>> {
let mut heatmap_data_wrapper: Vec<(String, Vec<HeatmapVolumeData>)> = Vec::new();
let mut heatmap_data_wrapper_arc = Arc::new(Mutex::new(heatmap_data_wrapper));
match symbol_search_result {
Some(T) => {
if input_rt_data[T].1.len() >= ma_len && input_rt_data[T].1.len() >= std_len {
let mut task_vec = Vec::new();
for element in filtered_symbols {
let heatmap_data_wrapper_arc_c = Arc::clone(&heatmap_data_wrapper_arc);
let element_c = element.clone();
let input_rt_data_c = input_rt_data.clone();
task_vec.push(tokio::spawn(async move {
let search_result = input_rt_data_c.iter().position(|x| x.0 == element_c.symbol);
if search_result.is_some_and(|a| input_rt_data_c[a].1.len() >= ma_len && input_rt_data_c[a].1.len() >= std_len) {
// calc mean
#[derive(Debug, Clone)]
struct MeanData {
mean_value: f64,
close_time: i64,
}
let mut mean_vec: Vec<MeanData> = Vec::new();
let mut mean_data = MeanData {
mean_value: 0.0,
close_time: 0,
};
let window = input_rt_data[T].1.windows(ma_len);
let window = input_rt_data_c[search_result.unwrap()].1.windows(ma_len);
for buffer in window {
// calculate SMA of volume
mean_data.mean_value = 0.0;
@ -54,10 +64,10 @@ pub async fn heatmap_volume(
}
mean_data.mean_value /= ma_len as f64;
mean_data.close_time = buffer.last().unwrap().close_time;
mean_vec.push(mean_data.clone());
}
// calc pstdev
#[derive(Debug, Clone)]
struct PstdevData {
@ -72,7 +82,7 @@ pub async fn heatmap_volume(
let mut mean = 0.0;
let mut summation = 0.0;
let mut sample_minus_mean = 0.0;
let window = input_rt_data[T].1.windows(std_len);
let window = input_rt_data_c[search_result.unwrap()].1.windows(std_len);
for buffer in window {
pstdev_data.pstdev_value = 0.0;
pstdev_data.close_time = 0;
@ -83,18 +93,18 @@ pub async fn heatmap_volume(
mean += element.quote_asset_volume;
}
mean /= std_len as f64;
for element in buffer {
sample_minus_mean = element.quote_asset_volume - mean;
summation += sample_minus_mean.powi(2);
}
pstdev_data.pstdev_value = (summation / std_len as f64).sqrt();
pstdev_data.close_time = buffer.last().unwrap().close_time;
pstdev_vec.push(pstdev_data.clone());
}
// calc stdbar and heatmap volume
let mut heatmap_vol_vec: Vec<HeatmapVolumeData> = Vec::new();
let mut heatmap_vol_data = HeatmapVolumeData {
@ -102,11 +112,11 @@ pub async fn heatmap_volume(
heatmap_level: HeatMapLevel::Normal,
close_time: 0,
};
if ma_len == std_len {
let mut rt_data_trunc_vec =
input_rt_data[T].1.get(ma_len - 1..).unwrap().iter();
input_rt_data_c[search_result.unwrap()].1.get(ma_len - 1..).unwrap().iter();
let zipped = mean_vec.iter().zip(pstdev_vec.iter());
for element in zipped {
heatmap_vol_data.heatmap_value =
@ -129,7 +139,7 @@ pub async fn heatmap_volume(
}
} else if ma_len > std_len {
let mut rt_data_trunc_vec =
input_rt_data[T].1.get(std_len - 1..).unwrap().iter();
input_rt_data_c[search_result.unwrap()].1.get(std_len - 1..).unwrap().iter();
let mut mean_trunc_vec =
mean_vec.get(mean_vec.len() - std_len..).unwrap().iter();
let zipped = mean_trunc_vec.zip(pstdev_vec.iter());
@ -143,7 +153,7 @@ pub async fn heatmap_volume(
}
} else {
let mut rt_data_trunc_vec =
input_rt_data[T].1.get(ma_len - 1..).unwrap().iter();
input_rt_data_c[search_result.unwrap()].1.get(ma_len - 1..).unwrap().iter();
let mut pstdev_trunc_vec =
pstdev_vec.get(pstdev_vec.len() - ma_len..).unwrap().iter();
let zipped = mean_vec.iter().zip(pstdev_trunc_vec);
@ -156,11 +166,12 @@ pub async fn heatmap_volume(
heatmap_vol_vec.push(heatmap_vol_data.clone());
} // level 구현
}
Some(heatmap_vol_vec)
} else {
None
let mut heatmap_data_wrapper_lock = heatmap_data_wrapper_arc_c.lock().await;
heatmap_data_wrapper_lock.push((element_c.symbol.clone(), heatmap_vol_vec.clone()));
}
}
None => None,
}));
}
try_join_all(task_vec).await?;
let a = heatmap_data_wrapper_arc.lock().await.to_owned();
Ok(a)
}