#![allow(unused)] #![allow(warnings)] use crate::database_control::*; use crate::value_estimation_team::datapoints::price_data::RealtimePriceData; use csv::{DeserializeRecordsIter, StringRecord}; use futures::future::try_join_all; use serde::Deserialize; use sqlx::FromRow; use std::sync::Arc; use tokio::{fs::*, io::AsyncWriteExt, sync::Mutex, time::*}; #[derive(Clone, Debug)] pub struct SmaData { pub sma_value: f64, pub close_time: i64, } impl SmaData { fn new() -> SmaData { let a = SmaData { sma_value: 0.0, close_time: 0, }; a } } // Binance MA (closeprice) pub async fn sma( moving_number: usize, input_rt_data: &Vec<(String, Vec)>, filtered_symbols: &Vec<(String, i64)>, ) -> Result)>, Box> { if filtered_symbols.is_empty() { Err("Err")?; } let mut sma_data_wrapper: Vec<(String, Vec)> = Vec::new(); let mut sma_data_wrapper_arc = Arc::new(Mutex::new(sma_data_wrapper)); let mut task_vec = Vec::new(); for symbol in filtered_symbols { let symbol_search_result = input_rt_data.iter().position(|x| x.0 == *symbol.0); match symbol_search_result { Some(T) => { let sma_data_wrapper_arc_c = Arc::clone(&sma_data_wrapper_arc); let symbol_c = symbol.clone(); let mut sma_data = SmaData::new(); let mut sma_data_vec: Vec = Vec::new(); let input_rt_data_c = input_rt_data.clone(); task_vec.push(tokio::spawn(async move { if input_rt_data_c[T].1.len() < moving_number { sma_data.sma_value = 0.0; sma_data.close_time = 0; sma_data_vec.push(sma_data.clone()); } else { let mut iter = input_rt_data_c[T].1.windows(moving_number); for buffer in iter { let mut avg = 0.0; for element in buffer { avg += element.close_price; } avg /= (moving_number as f64); sma_data.sma_value = avg; sma_data.close_time = buffer.last().unwrap().close_time; sma_data_vec.push(sma_data.clone()); } } let mut sma_data_wrapper_lock = sma_data_wrapper_arc_c.lock().await; sma_data_wrapper_lock.push((symbol_c.0.clone(), sma_data_vec.clone())); })); } None => {} } } try_join_all(task_vec).await?; let a = sma_data_wrapper_arc.lock().await.to_owned(); Ok(a) }