use crate::database_control::*; use futures::future::try_join_all; use futures::{stream, StreamExt}; use reqwest::{Client, ClientBuilder, Response}; use serde::Deserialize; use serde_json::Value; use sqlx::{Error, FromRow}; use std::borrow::{Borrow, BorrowMut}; use std::collections::HashMap; use std::sync::Arc; use tokio::{join, sync::Mutex, time::*}; #[derive(Debug, Clone)] pub struct CandleData { pub open_time: i64, pub open_price: f64, pub high_price: f64, pub low_price: f64, pub close_price: f64, pub volume: f64, pub close_time: i64, pub quote_asset_volume: f64, pub number_of_trades: i64, pub taker_buy_base_asset_volume: f64, pub taker_buy_quote_asset_volume: f64, pub ignore_this: f64, } impl CandleData { fn new() -> CandleData { let candle_data = CandleData { open_time: 0, open_price: 0.0, high_price: 0.0, low_price: 0.0, close_price: 0.0, volume: 0.0, close_time: 0, quote_asset_volume: 0.0, number_of_trades: 0, taker_buy_base_asset_volume: 0.0, taker_buy_quote_asset_volume: 0.0, ignore_this: 0.0, }; candle_data } } // fetch the list of valid usdt trades #[derive(Debug, FromRow, Clone)] struct Symbols { symbol: String, } // request candlestick data from endpoint for each symbol (Weight(IP) 1) // use the following intervals: 1m, 30m, 1d, 1w, 1mon (1M) async fn request_candlestick_data( symbol: String, interval: &String, client: &Client, candle_set: &mut HashMap>, ) -> Result<(), Box> { let mut query = String::from("https://api.binance.com/api/v3/klines?"); query.push_str("&symbol="); query.push_str(symbol.as_str()); query.push_str("&limit="); query.push_str("1000"); query.push_str("&interval="); if interval == "1mon" { query.push_str("1M"); } else { query.push_str(interval.as_str()); } let response = client.get(query).send().await?; let mut body = String::new(); body = response.text_with_charset("utf-8").await?; if interval == "1m" || interval == "30m" { de_candle_json(symbol, &interval, &body, candle_set).await?; } else if interval == "1d" || interval == "1w" || interval == "1mon" { de_candle_json2(symbol, &interval, &body, candle_set).await?; } Ok(()) } async fn de_candle_json( symbol: String, interval: &String, body: &String, candle_map: &mut HashMap>, ) -> Result<(), Box> { let v: Value = serde_json::from_str(body.as_str())?; let mut into_vec = v.as_array(); if into_vec == None { return Err("Err")?; } let mut candle_vec: Vec = Vec::new(); let mut candle_data = CandleData::new(); for element in into_vec.unwrap() { let inner_into_vec = element.as_array().unwrap(); candle_data.open_time = element[0].as_i64().unwrap(); candle_data.open_price = element[1].as_str().unwrap().parse::().unwrap(); candle_data.high_price = element[2].as_str().unwrap().parse::().unwrap(); candle_data.low_price = element[3].as_str().unwrap().parse::().unwrap(); candle_data.close_price = element[4].as_str().unwrap().parse::().unwrap(); candle_data.volume = element[5].as_str().unwrap().parse::().unwrap(); candle_data.close_time = element[6].as_i64().unwrap(); candle_data.quote_asset_volume = element[7].as_str().unwrap().parse::().unwrap(); candle_data.number_of_trades = element[8].as_i64().unwrap(); candle_data.taker_buy_base_asset_volume = element[9].as_str().unwrap().parse::().unwrap(); candle_data.taker_buy_quote_asset_volume = element[10].as_str().unwrap().parse::().unwrap(); candle_data.ignore_this = element[11].as_str().unwrap().parse::().unwrap(); candle_vec.push(candle_data.clone()); } candle_map.insert(symbol, candle_vec); // let search_result = candle_set.iter().position(|x| x.0 == symbol); // match search_result { // Some(T) => { // candle_set[T].1 = candle_vec; // }, // None => { // candle_set.push((symbol, candle_vec)); // } // } Ok(()) } async fn de_candle_json2( symbol: String, interval: &String, body: &String, candle_map: &mut HashMap>, ) -> Result<(), Box> { let v: Value = serde_json::from_str(body.as_str())?; let mut into_vec = v.as_array(); if into_vec == None { return Err("Err")?; } let mut candle_vec: Vec = Vec::new(); let mut candle_data = CandleData::new(); for element in into_vec.unwrap() { let inner_into_vec = element.as_array().unwrap(); candle_data.open_time = element[0].as_i64().unwrap(); candle_data.open_price = element[1].as_str().unwrap().parse::().unwrap(); candle_data.high_price = element[2].as_str().unwrap().parse::().unwrap(); candle_data.low_price = element[3].as_str().unwrap().parse::().unwrap(); candle_data.close_price = element[4].as_str().unwrap().parse::().unwrap(); candle_data.volume = element[5].as_str().unwrap().parse::().unwrap(); candle_data.close_time = element[6].as_i64().unwrap(); candle_data.quote_asset_volume = element[7].as_str().unwrap().parse::().unwrap(); candle_data.number_of_trades = element[8].as_i64().unwrap(); candle_data.taker_buy_base_asset_volume = element[9].as_str().unwrap().parse::().unwrap(); candle_data.taker_buy_quote_asset_volume = element[10].as_str().unwrap().parse::().unwrap(); candle_data.ignore_this = element[11].as_str().unwrap().parse::().unwrap(); candle_vec.push(candle_data.clone()); } if let Some(value) = candle_map.get_mut(&symbol) { *value = candle_vec; } else { candle_map.insert(symbol, candle_vec); } Ok(()) } // // request candlestick data from endpoint for each symbol (Weight(IP) 1) // // use the following intervals: 1m, 30m, 1d, 1w, 1mon(1M) // async fn request_candlestick_data(symbol: &String, interval: &String, client: &Client) -> Result<(), Box> { // let mut query = String::from("https://api.binance.com/api/v3/klines?"); // query.push_str("&symbol="); // query.push_str(symbol.as_str()); // query.push_str("&interval="); // if interval == "1mon" { // query.push_str("1M"); // } else { // query.push_str(interval.as_str()); // } // let response = client.get(query).send().await?; // let mut body = String::new(); // body = response.text_with_charset("utf-8").await?; // de_candle_json(&symbol, &interval, &body).await?; // // println!(" candle {} {} 완료", symbol, interval); // Ok(()) // } // // for initialization // pub async fn request_candlestick_initial (symbol_vec: &Vec, interval: &String) -> Result<(), Box> { // const CONCURRENT_REQUESTS: usize = 50; // #[derive(Clone, Debug)] // struct QuerySet { // symbol: String, // query_url: String, // interval: String, // body: String, // } // let client = ClientBuilder::new().connect_timeout(tokio::time::Duration::from_millis(10000)).build().unwrap(); // let mut query_set = QuerySet{ symbol: String::new(), query_url: String::new(), interval: String::new(), body: String::new() }; // let mut query_set_vec: Vec = Vec::new(); // for symbol in symbol_vec { // let mut query_url = String::from("https://api.binance.com/api/v3/klines?"); // query_url.push_str("&interval="); // if interval == "1mon" { // query_url.push_str("1M"); // } else { // query_url.push_str(interval.as_str()); // } // query_url.push_str("&symbol="); // query_url.push_str(symbol.as_str()); // query_set.query_url = query_url; // query_set.symbol = symbol.clone(); // query_set.interval = interval.clone(); // query_set_vec.push(query_set.clone()); // } // let bodies = stream::iter(query_set_vec) // .map(|mut query_set| { // let client = &client; // async move { // let mut response = client.get(query_set.query_url.clone()).send().await.unwrap(); // let mut body = response.text_with_charset("utf-8").await; // while let Err(e) = body { // response = client.get(query_set.query_url.clone()).send().await.unwrap(); // body = response.text_with_charset("utf-8").await; // sleep(Duration::from_secs(1)).await; // } // query_set.body = body.unwrap(); // query_set // } // }) // .buffer_unordered(CONCURRENT_REQUESTS); // bodies.for_each(|query_set| async move {de_candle_json(&query_set.symbol.to_string(), &query_set.interval.to_string(), &query_set.body.to_string()).await;}).await; // Ok(()) // } // for initialization pub async fn request_candlestick_initial( symbol: String, interval: &String, ) -> Result<(), Box> { let mut candle_set: HashMap> = HashMap::new(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(20000)) .build() .unwrap(); let mut query = String::from("https://api.binance.com/api/v3/klines?"); // query.push_str("&limit="); // query.push_str("200"); query.push_str("&symbol="); query.push_str(symbol.as_str()); query.push_str("&interval="); if interval == "1mon" { query.push_str("1M"); } else { query.push_str(interval.as_str()); } let mut response = client.get(&query).send().await; while let Err(e) = response { response = client.get(&query).send().await; sleep(Duration::from_secs(1)).await; } let mut body = response.unwrap().text_with_charset("utf-8").await; while let Err(e) = body { response = client.get(&query).send().await; while let Err(e) = response { response = client.get(&query).send().await; sleep(Duration::from_secs(1)).await; } body = response.unwrap().text_with_charset("utf-8").await; } de_candle_json(symbol, &interval, &body.unwrap(), &mut candle_set).await?; Ok(()) } // pub async fn request_candlestick_initial (symbol: &String, interval: &String) -> Result<(), Box> { // let client = ClientBuilder::new().timeout(tokio::time::Duration::from_millis(10000)).build().unwrap(); // let mut query = String::from("https://api.binance.com/api/v3/klines?"); // query.push_str("&symbol="); // query.push_str(symbol.as_str()); // query.push_str("&interval="); // if interval == "1mon" { // query.push_str("1M"); // } else { // query.push_str(interval.as_str()); // } // // let mut response = client.get(&query).send().await; // let temp = reqwest::get("http://site.with.redirect.loop").await; // if let Err(e) = temp{ // e. // } // if let Err(e) = response { // if e.is_timeout() { // loop{ // println!(">>> [timeout] retry fetching candle data: {} {}", symbol, interval); // tokio::time::sleep(Duration::from_secs(1)).await; // response = client.get(&query).send().await?; // if response.is_ok() { break; } // } // } // } else { // let mut body = String::new(); // body = response.unwrap().text_with_charset("utf-8").await.unwrap(); // de_candle_json(&symbol, &interval, &body).await.unwrap(); // } // // Ok(()) // } // async fn de_candle_json(symbol: &String, interval: &String, body: &String) -> Result<(), Box> { // let v: Value = serde_json::from_str(body.as_str())?; // let mut into_vec = v.as_array(); // if into_vec == None { // return Err("Err")? // } // // let mut one_candle_data: &Vec = Vec::new(); // let columns = vec!["openTime", "openPrice", "highPrice","lowPrice", "closePrice", // "volume", "closeTime", "quoteAssetVolume", "numberOfTrades", "takerBuyBaseAssetVolume", // "takerBuyQuoteAssetVolume", "ignoreThis"]; // let mut value_wrapper: Vec> = Vec::new(); // for element in into_vec.unwrap() { // let inner_into_vec = element.as_array().unwrap(); // let mut value_vec = Vec::new(); // for element in inner_into_vec { // if element.is_number() { // value_vec.push(element.as_i64().unwrap().to_string()); // } else if element.is_string() { // value_vec.push(element.as_str().unwrap().to_string()); // } else { // println!("Elements in body msg are changed. Please update parsing in de_candle_json."); // } // } // value_wrapper.push(value_vec); // } // store_candle_db(&symbol, &interval, &columns, value_wrapper).await; // Ok(()) // } async fn store_candle_db( symbol: &String, interval: &String, columns: &Vec<&str>, value_wrapper: Vec>, ) -> Result<(), Box> { let mut table_name = String::from("candle_"); table_name.push_str(symbol.as_str()); table_name.push('_'); table_name.push_str(interval.as_str()); delete_all_rows(&table_name).await.unwrap(); insert_records(&table_name, columns, &value_wrapper) .await .unwrap(); Ok(()) } pub async fn create_candle_table( symbol: &String, interval: &String, ) -> Result<(), Box> { let mut table_name = String::from("candle_"); table_name.push_str(symbol.as_str()); table_name.push('_'); table_name.push_str(interval.as_str()); let initial_table = vec![ ("id", "integer", Some("PK, AI")), ("openTime", "bigint", None), ("openPrice", "double", None), ("highPrice", "double", None), ("lowPrice", "double", None), ("closePrice", "double", None), ("volume", "double", None), ("closeTime", "bigint", None), ("quoteAssetVolume", "double", None), ("numberOfTrades", "int", None), ("takerBuyBaseAssetVolume", "double", None), ("takerBuyQuoteAssetVolume", "double", None), ("ignoreThis", "double", None), ]; let initial_columns = vec![ "openTime", "openPrice", "highPrice", "lowPrice", "closePrice", "volume", "closeTime", "quoteAssetVolume", "numberOfTrades", "takerBuyBaseAssetVolume", "takerBuyQuoteAssetVolume", "ignoreThis", ]; let initial_values = vec![ String::from("0"), String::from("0.0"), String::from("0.0"), String::from("0.0"), String::from("0.0"), String::from("0.0"), String::from("0"), String::from("0.0"), String::from("0"), String::from("0.0"), String::from("0.0"), String::from("0.0"), ]; let table_condition = Some("ENGINE = MEMORY"); new_table(&table_name, &initial_table, &table_condition) .await .unwrap(); insert_one_record(&table_name, &initial_columns, &initial_values) .await .unwrap(); Ok(()) } // for fetching 1m and 30m candle pub async fn fetch_candle_parallel( interval: &String, candle_map: &mut HashMap>, ) -> Result<(), Box> { let instant = Instant::now(); let fetch_table_name = String::from("valid_usdt_trades"); let column_name = String::from("symbol"); let condition = None; let mut symbols = Symbols { symbol: String::new(), }; let mut select_result = select_record(&fetch_table_name, &column_name, &condition, &symbols) .await .unwrap(); // 심볼들을 20개씩 청스로 나누어 테스크를 생성하고 각 태스크 별로 병렬로 처리한다. let chunks = select_result.chunks(20); let nbr_chunks = chunks.len(); let mut candle_vec_arc_wrapper: Vec>>>> = Vec::new(); for _ in 0..nbr_chunks { let mut candle_map_temp: HashMap> = HashMap::new(); let mut candle_vec_arc = Arc::new(Mutex::new(candle_map_temp)); candle_vec_arc_wrapper.push(candle_vec_arc); } let mut task_vec = Vec::new(); let mut index = 0; for chunk in chunks { let candle_arc = Arc::clone(&candle_vec_arc_wrapper[index]); let chunk_vec = chunk.to_vec(); let interval_clone = interval.clone(); task_vec.push(tokio::spawn(async move { repeat_task(interval_clone, chunk_vec, candle_arc).await; })); index += 1; } let result = try_join_all(task_vec).await; match result { Ok(T) => { let mut candle_buffer: HashMap> = HashMap::new(); for element in candle_vec_arc_wrapper { let a = element.lock().await.clone(); for (symbol, candle_data) in a { candle_buffer.insert(symbol, candle_data); } } *candle_map = candle_buffer; // println!(" candle {} 완료 elapsed:{:.2}s", interval.as_str(), instant.elapsed().as_secs_f32()); } Err(E) => { panic!("Failed to fetch candle data!") } } Ok(()) } async fn repeat_task( interval: String, symbol_vec: Vec, my_count: Arc>>>, ) -> Result<(), Box> { let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(5000)) .build() .unwrap(); for element in symbol_vec { let mut candle_set_lock = my_count.lock().await; request_candlestick_data(element.symbol, &interval, &client, &mut candle_set_lock).await; sleep(Duration::from_millis(200)).await; } Ok(()) } // for fetching 1d, 1w, and 1mon candle pub async fn fetch_candle_delay( interval: &String, candle_vec: &mut HashMap>, ) -> Result<(), Box> { let instant_func = Instant::now(); let server_epoch = server_epoch().await; let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(5000)) .build() .unwrap(); // to decide keeping retry or not as the current server status from database #[derive(Debug, FromRow, Clone)] struct ServerStatus { server_on: bool, } let serverhealth_table_name = String::from("serverhealth"); let serverhealth_column_name = String::from("server_on"); let serverhealth_condition = None; let mut serverhealth = ServerStatus { server_on: true }; // fetch the list of valid usdt trades #[derive(Debug, FromRow)] struct Symbols { symbol: String, } let fetch_table_name = String::from("valid_usdt_trades"); let column_name = String::from("symbol"); let condition = None; let mut symbols = Symbols { symbol: String::new(), }; let mut select_result = select_record(&fetch_table_name, &column_name, &condition, &symbols) .await .unwrap(); let mut time_wait = 0; if interval == "30m" { time_wait = 1_000; } else if interval == "1d" { time_wait = 5_000; } else if interval == "1w" { time_wait = 10_000; } else { time_wait = 15_000; } let mut server_on_result = select_record( &serverhealth_table_name, &serverhealth_column_name, &serverhealth_condition, &serverhealth, ) .await .unwrap(); serverhealth.server_on = server_on_result[0].server_on; for element in select_result { let instant = Instant::now(); server_on_result = select_record( &serverhealth_table_name, &serverhealth_column_name, &serverhealth_condition, &serverhealth, ) .await .unwrap(); serverhealth.server_on = server_on_result[0].server_on; { request_candlestick_data(element.symbol, &interval, &client, candle_vec).await; } // sleep for 10secs for 1d, 20secs for 1w, 30secs for 1mon if time_wait > instant.elapsed().as_millis() { sleep(Duration::from_millis( (time_wait - instant.elapsed().as_millis()) as u64, )) .await; } } // println!(" candle {} 완료 elapsed:{:.2}s", interval, instant_func.elapsed().as_secs()); Ok(()) } // pub async fn fetch_candle_1w() -> Result<(), Box> { // let server_epoch = server_epoch().await; // let client = ClientBuilder::new().timeout(tokio::time::Duration::from_millis(1200)).build().unwrap(); // // to decide keeping retry or not as the current server status from database // #[derive(Debug, FromRow, Clone)] // struct ServerStatus { // server_on: bool, // } // let serverhealth_table_name = String::from("serverhealth"); // let serverhealth_column_name = String::from("server_on"); // let serverhealth_condition = None; // let mut serverhealth = ServerStatus { server_on: true }; // // fetch the list of valid usdt trades // #[derive(Debug, FromRow)] // struct Symbols { // symbol: String, // } // let fetch_table_name = String::from("valid_usdt_trades"); // let column_name = String::from("symbol"); // let condition = None; // let mut symbols = Symbols { symbol: String::new() }; // let mut select_result = select_record(&fetch_table_name, &column_name, &condition, &symbols).await.unwrap(); // let interval = String::from("1w"); // let column_name = String::from("closeTime"); // let condition = Some(String::from("ORDER BY id DESC LIMIT 1")); // #[derive(Debug, FromRow)] // struct CloseTime { // closetime: i64, // } // let mut server_on_result = select_record(&serverhealth_table_name, &serverhealth_column_name, &serverhealth_condition, &serverhealth).await.unwrap(); // serverhealth.server_on = server_on_result[0].server_on; // let instant_1w = Instant::now(); // for element in select_result { // let instant = Instant::now(); // server_on_result = select_record(&serverhealth_table_name, &serverhealth_column_name, &serverhealth_condition, &serverhealth).await.unwrap(); // serverhealth.server_on = server_on_result[0].server_on; // if serverhealth.server_on == true { // let mut closetime = CloseTime { closetime: 0 }; // let mut table_name = String::from("candle_"); // table_name.push_str(element.symbol.as_str()); // table_name.push_str("_1w"); // let mut exists_result = exists_table(&table_name).await; // if exists_result == false { create_candle_table(&element.symbol, &interval).await.unwrap(); } // let mut select_result = select_record(&table_name, &column_name, &condition, &closetime).await.unwrap(); // if (select_result[0].closetime as u64) < server_epoch { // while let Err(e) = request_candlestick_data(&element.symbol, &interval, &client).await // { // server_on_result = select_record(&serverhealth_table_name, &serverhealth_column_name, &serverhealth_condition, &serverhealth).await.unwrap(); // serverhealth.server_on = server_on_result[0].server_on; // if serverhealth.server_on == false { break; } // println!(">>> retry to fetch candlestick {} {} data from endpoint.", &element.symbol, &interval); // sleep(Duration::from_millis(500)).await; // } // } // } // // sleep as much as the loop recurs per 20 seconds if all operation finished within 20 seconds. // if 20_000_000_000 > instant.elapsed().as_nanos() { // sleep(Duration::from_nanos((20_000_000_000 - instant.elapsed().as_nanos()) as u64)).await; // } // } // println!(" candle 1w 완료 elapsed:{:.2}s", instant_1w.elapsed().as_secs()); // Ok(()) // } // pub async fn fetch_candle_1mon() -> Result<(), Box> { // let server_epoch = server_epoch().await; // let client = ClientBuilder::new().timeout(tokio::time::Duration::from_millis(1200)).build().unwrap(); // // to decide keeping retry or not as the current server status from database // #[derive(Debug, FromRow, Clone)] // struct ServerStatus { // server_on: bool, // } // let serverhealth_table_name = String::from("serverhealth"); // let serverhealth_column_name = String::from("server_on"); // let serverhealth_condition = None; // let mut serverhealth = ServerStatus { server_on: true }; // // fetch the list of valid usdt trades // #[derive(Debug, FromRow)] // struct Symbols { // symbol: String, // } // let fetch_table_name = String::from("valid_usdt_trades"); // let column_name = String::from("symbol"); // let condition = None; // let mut symbols = Symbols { symbol: String::new() }; // let mut select_result = select_record(&fetch_table_name, &column_name, &condition, &symbols).await.unwrap(); // let interval = String::from("1mon"); // let column_name = String::from("closeTime"); // let condition = Some(String::from("ORDER BY id DESC LIMIT 1")); // #[derive(Debug, FromRow)] // struct CloseTime { // closetime: i64, // } // let mut server_on_result = select_record(&serverhealth_table_name, &serverhealth_column_name, &serverhealth_condition, &serverhealth).await.unwrap(); // serverhealth.server_on = server_on_result[0].server_on; // let instant_1mon = Instant::now(); // for element in select_result { // let instant = Instant::now(); // server_on_result = select_record(&serverhealth_table_name, &serverhealth_column_name, &serverhealth_condition, &serverhealth).await.unwrap(); // serverhealth.server_on = server_on_result[0].server_on; // if serverhealth.server_on == true { // let mut closetime = CloseTime { closetime: 0 }; // let mut table_name = String::from("candle_"); // table_name.push_str(element.symbol.as_str()); // table_name.push_str("_1mon"); // let mut exists_result = exists_table(&table_name).await; // if exists_result == false { create_candle_table(&element.symbol, &interval).await.unwrap(); } // let mut select_result = select_record(&table_name, &column_name, &condition, &closetime).await.unwrap(); // if (select_result[0].closetime as u64) < server_epoch { // while let Err(e) = request_candlestick_data(&element.symbol, &interval, &client).await // { // server_on_result = select_record(&serverhealth_table_name, &serverhealth_column_name, &serverhealth_condition, &serverhealth).await.unwrap(); // serverhealth.server_on = server_on_result[0].server_on; // if serverhealth.server_on == false { break; } // println!(">>> retry to fetch candlestick {} {} data from endpoint.", &element.symbol, &interval); // sleep(Duration::from_millis(500)).await; // } // } // } // // sleep as much as the loop recurs per 30 seconds if all operation finished within 30 seconds. // if 30_000_000_000 > instant.elapsed().as_nanos() { // sleep(Duration::from_nanos((30_000_000_000 - instant.elapsed().as_nanos()) as u64)).await; // } // } // println!(" candle 1mon 완료 elapsed:{:.2}s", instant_1mon.elapsed().as_secs()); // Ok(()) // } async fn server_epoch() -> u64 { #[derive(Debug, FromRow)] struct ServerEpoch { server_epoch: u64, } let table_name = String::from("time"); let columns = String::from("*"); let condition = None; let mut time_info = ServerEpoch { server_epoch: 0 }; let select_result = select_record(&table_name, &columns, &condition, &time_info) .await .unwrap(); select_result.first().unwrap().server_epoch }