use crate::database_control::*; use hex::ToHex; use hmac_sha256::HMAC; use reqwest::{Client, ClientBuilder, Response}; use rust_decimal::{prelude::FromPrimitive, prelude::ToPrimitive, Decimal, RoundingStrategy}; use serde::Deserialize; use serde_json::Value; use sqlx::{Error, FromRow}; use std::borrow::{Borrow, BorrowMut}; use std::collections::HashSet; use std::sync::Arc; use std::collections::HashMap; use tokio::{join, sync::Mutex, time::*}; use log; #[derive(Debug, Clone)] pub struct TradeFee { pub makercommission: Decimal, pub takercommission: Decimal, } impl TradeFee { fn new() -> TradeFee { let a = TradeFee { makercommission: Decimal::new(0, 8), takercommission: Decimal::new(0, 8), }; a } } #[derive(Debug, FromRow, Clone)] pub struct ExchangeInfo { pub stepsize: Decimal, pub ticksize: Decimal, pub base_asset_precision: u32, pub base_commission_precision: u32, pub quote_asset_precision: u32, pub quote_commission_precision: u32, } impl ExchangeInfo { fn new() -> ExchangeInfo { let a = ExchangeInfo { stepsize: Decimal::new(0, 8), ticksize: Decimal::new(0, 8), base_asset_precision: 0, base_commission_precision: 0, quote_asset_precision: 0, quote_commission_precision: 0, }; a } } // request all coin price (/api, Weight(IP) 2) // request_all_coin_price -> de_all_coin_price_json -> store_coin_price_db // pub async fn request_all_coin_price(client: &Client, price_vec: &Arc>>) -> Result<(), Box> { pub async fn request_all_coin_price( client: &Client, price_vec: &mut HashMap, ) -> Result<(), Box> { let url = "https://api.binance.com/api/v3/ticker/price"; let mut response = client.get(url).send().await?; let mut body = response.text_with_charset("utf-8").await?; de_all_coin_price_json(&body, price_vec).await?; Ok(()) } async fn de_all_coin_price_json( body: &String, price_vec: &mut HashMap, ) -> Result<(), Box> { let v: Value = serde_json::from_str(body.as_str())?; let into_vec = v.as_array(); if into_vec == None { return Err("Err")?; } let mut object_map = &serde_json::map::Map::new(); let mut symbol = String::new(); let mut price = 0.0; for element in into_vec.unwrap() { object_map = element.as_object().unwrap(); let mut object_map_iter = object_map.iter(); for element in object_map_iter { match element.0.as_str() { "symbol" => symbol = element.1.as_str().unwrap().to_string(), "price" => price = element.1.as_str().unwrap().parse::().unwrap(), _ => { log::error!("Elements in body msg are changed. Please update both your coinprices table and vectors."); } } price_vec.insert(symbol.clone(), price); } } Ok(()) } // request trade fee (/sapi, Weight(IP) 1) // request_trade_fee -> deserialization_trade_fee_json -> save_db_trade_fee pub async fn request_trade_fee( api_key: &str, secret_key: &str, local_epoch: u128, difference_epoch: i64, client: &Client, tradefee_vec: &mut HashMap, ) -> Result<(), Box> { let mut base_url = String::from("https://api.binance.com/sapi/v1/asset/tradeFee?"); // local 시간이 server 시간보다 너무 앞서거나 뒤쳐지면 USER_DATA를 요청할 수 없으므로 local과 server 의 시간 차이만큼을 local에서 빼어 보정한 뒤 // 이를 timestamp로 사용한다. let mut timestamp; if difference_epoch >= 0 { timestamp = (local_epoch - difference_epoch as u128).to_string(); } else if difference_epoch < 0 { timestamp = (local_epoch - (difference_epoch * -1) as u128).to_string(); } else { timestamp = local_epoch.to_string(); } let recv_window_size = "20000".to_string(); // default: 5,000ms, Max: 60,000ms let mut query_string = String::from("×tamp="); query_string.push_str(×tamp); query_string.push_str("&recvWindow="); query_string.push_str(&recv_window_size); let signature = HMAC::mac(&query_string.as_bytes(), secret_key.as_bytes()); query_string.push_str("&signature="); base_url.push_str(&query_string); base_url.push_str(signature.encode_hex::().as_str()); let mut response = client .get(&base_url) .header("X-MBX-APIKEY", api_key) .send() .await?; while !response.status().is_success() { sleep(Duration::from_secs(5)).await; response = client .get(&base_url) .header("X-MBX-APIKEY", api_key) .send() .await?; } let mut body = response.text_with_charset("utf-8").await; match body { Ok(T) => { de_trade_fee_json(&T, tradefee_vec).await?; // println!("tradefee 완료"); } Err(E) => { log::warn!("request_trade_fee body failed!: {:?}", E); } } Ok(()) } async fn de_trade_fee_json( body: &String, tradefee_map: &mut HashMap, ) -> Result<(), Box> { let v: Value = serde_json::from_str(body.as_str())?; let into_vec = v.as_array(); if into_vec == None { return Err("Err")?; } let mut object_map = &serde_json::map::Map::new(); let mut tradefee_map_build: HashMap = HashMap::new(); let mut symbol = String::new(); let mut tradefee_data = TradeFee::new(); for element in into_vec.unwrap() { object_map = element.as_object().unwrap(); let mut object_map_iter = object_map.iter(); for element in object_map_iter { match element.0.as_str() { "symbol" => symbol = element.1.as_str().unwrap().to_string(), "makerCommission" => { tradefee_data.makercommission = rust_decimal::prelude::FromStr::from_str(element.1.as_str().unwrap()) .unwrap() } "takerCommission" => { tradefee_data.takercommission = rust_decimal::prelude::FromStr::from_str(element.1.as_str().unwrap()) .unwrap() } _ => { log::error!("Elements in body msg are changed. Please update both your tradefees table and vectors."); } } } tradefee_map_build.insert(symbol.clone(), tradefee_data.clone()); } *tradefee_map = tradefee_map_build; Ok(()) } // request 24hr Ticker Price Change Statistics (Weight(IP) 1 for a single symbol, 40 when the symbol parameter is omitted) // request_24hr_ticker_price_change_statistics -> deserialization_24hr_ticker_price_change_statistics_json -> pub async fn request_24hr_ticker_price_change_statistics( client: &Client, ) -> Result<(), Box> { let mut response = client .get("https://api.binance.com/api/v3/ticker/24hr") .send() .await; match response { Ok(T) => { let mut body = T.text_with_charset("utf-8").await; match body { Ok(T) => { de_24h_change_json(&T).await?; // println!("24h change 완료"); } Err(E) => { log::warn!( "request_24hr_ticker_price_change_statistics body failed!: {:?}", E ); } } } Err(E) => { log::warn!( "request_24hr_ticker_price_change_statistics response failed!: {:?}", E ); } } Ok(()) } async fn de_24h_change_json(body: &String) -> Result<(), Box> { let v: Value = serde_json::from_str(body.as_str())?; let mut into_vec = v.as_array(); if into_vec == None { return Err("Err")?; } let mut object_map = &serde_json::map::Map::new(); let columns = vec![ "symbol", "priceChange", "priceChangePercent", "weightedAvgPrice", "prevClosePrice", "lastPrice", "lastQty", "bidPrice", "bidQty", "askPrice", "askQty", "openPrice", "highPrice", "lowPrice", "volume", "quoteVolume", "openTime", "closeTime", "firstId", "lastId", "count", ]; let mut value_wrapper: Vec> = Vec::new(); for element in into_vec.unwrap() { object_map = element.as_object().unwrap(); let mut value_vec = vec![String::new(); 21]; let mut object_map_iter = object_map.iter(); for element in object_map_iter { match element.0.as_str() { "symbol" => value_vec[0] = element.1.as_str().unwrap().to_string(), "priceChange" => value_vec[1] = element.1.as_str().unwrap().to_string(), "priceChangePercent" => value_vec[2] = element.1.as_str().unwrap().to_string(), "weightedAvgPrice" => value_vec[3] = element.1.as_str().unwrap().to_string(), "prevClosePrice" => value_vec[4] = element.1.as_str().unwrap().to_string(), "lastPrice" => value_vec[5] = element.1.as_str().unwrap().to_string(), "lastQty" => value_vec[6] = element.1.as_str().unwrap().to_string(), "bidPrice" => value_vec[7] = element.1.as_str().unwrap().to_string(), "bidQty" => value_vec[8] = element.1.as_str().unwrap().to_string(), "askPrice" => value_vec[9] = element.1.as_str().unwrap().to_string(), "askQty" => value_vec[10] = element.1.as_str().unwrap().to_string(), "openPrice" => value_vec[11] = element.1.as_str().unwrap().to_string(), "highPrice" => value_vec[12] = element.1.as_str().unwrap().to_string(), "lowPrice" => value_vec[13] = element.1.as_str().unwrap().to_string(), "volume" => value_vec[14] = element.1.as_str().unwrap().to_string(), "quoteVolume" => value_vec[15] = element.1.as_str().unwrap().to_string(), "openTime" => value_vec[16] = element.1.as_i64().unwrap().to_string(), "closeTime" => value_vec[17] = element.1.as_i64().unwrap().to_string(), "firstId" => value_vec[18] = element.1.as_i64().unwrap().to_string(), "lastId" => value_vec[19] = element.1.as_i64().unwrap().to_string(), "count" => value_vec[20] = element.1.as_i64().unwrap().to_string(), _ => { log::error!("Elements in body msg are changed. Please update both your 24h_change table and vectors."); } } } value_wrapper.push(value_vec); } store_24h_change_db(columns, value_wrapper).await?; Ok(()) } async fn store_24h_change_db( columns: Vec<&str>, value_wrapper: Vec>, ) -> Result<(), Box> { let table_name = String::from("all_24h_change"); delete_all_rows(&table_name).await?; insert_records(&table_name, &columns, &value_wrapper).await?; Ok(()) } // request exchange information. (/api, Weight(IP) 10) pub async fn request_exchange_infomation( client: &Client, exchange_info_map: &mut HashMap, ) -> Result<(), Box> { // building URL and API-keys let mut url = String::new(); url.push_str("https://api.binance.com"); let endpoint_url = "/api/v3/exchangeInfo"; url.push_str(endpoint_url); let mut res = client.get(&url).send().await?; while !res.status().is_success() { sleep(Duration::from_secs(5)).await; res = client.get(&url).send().await?; } let body = res.text_with_charset("utf-8").await?; // deserialize JSON let v: Value = serde_json::from_str(body.as_str())?; let mut into_vec = v.as_object(); if into_vec == None { return Err("Err")?; } let mut symbol = String::new(); let mut exchange_info = ExchangeInfo::new(); let mut data_map_temp: HashMap = HashMap::new(); for element in into_vec.unwrap() { if element.0.contains("symbols") { for element in element.1.as_array().unwrap() { if element.is_object() { if element .get("symbol") .unwrap() .as_str() .unwrap() .ends_with("USDT") { symbol = (element.get("symbol").unwrap().as_str().unwrap().to_string()); exchange_info.base_asset_precision = (element.get("baseAssetPrecision").unwrap().as_u64().unwrap()) as u32; exchange_info.base_commission_precision = (element .get("baseCommissionPrecision") .unwrap() .as_u64() .unwrap()) as u32; exchange_info.quote_asset_precision = (element .get("quoteAssetPrecision") .unwrap() .as_u64() .unwrap()) as u32; exchange_info.quote_commission_precision = (element .get("quoteCommissionPrecision") .unwrap() .as_u64() .unwrap()) as u32; for element in element.get("filters").unwrap().as_array().unwrap() { if element .as_object() .unwrap() .get("filterType") .unwrap() .as_str() .unwrap() .starts_with("LOT_SIZE") { exchange_info.stepsize = rust_decimal::prelude::FromStr::from_str( element.get("stepSize").unwrap().as_str().unwrap(), ) .unwrap(); } else if element .as_object() .unwrap() .get("filterType") .unwrap() .as_str() .unwrap() .starts_with("PRICE_FILTER") { exchange_info.ticksize = rust_decimal::prelude::FromStr::from_str( element.get("tickSize").unwrap().as_str().unwrap(), ) .unwrap(); } } data_map_temp.insert(symbol.clone(), exchange_info.clone()); } } } } } *exchange_info_map = data_map_temp; Ok(()) } pub async fn request_delist_symbols( api_key: &str, secret_key: &str, local_epoch: u128, difference_epoch: i64, client: &Client ) -> Result<(), Box> { let mut base_url = String::from("https://api.binance.com/sapi/v1/spot/delist-schedule?"); // local 시간이 server 시간보다 너무 앞서거나 뒤쳐지면 USER_DATA를 요청할 수 없으므로 local과 server 의 시간 차이만큼을 local에서 빼어 보정한 뒤 // 이를 timestamp로 사용한다. let mut timestamp; if difference_epoch >= 0 { timestamp = (local_epoch - difference_epoch as u128).to_string(); } else if difference_epoch < 0 { timestamp = (local_epoch - (difference_epoch * -1) as u128).to_string(); } else { timestamp = local_epoch.to_string(); } let recv_window_size = "20000".to_string(); // default: 5,000ms, Max: 60,000ms let mut query_string = String::from("×tamp="); query_string.push_str(×tamp); query_string.push_str("&recvWindow="); query_string.push_str(&recv_window_size); let signature = HMAC::mac(&query_string.as_bytes(), secret_key.as_bytes()); query_string.push_str("&signature="); base_url.push_str(&query_string); base_url.push_str(signature.encode_hex::().as_str()); let mut response = client .get(&base_url) .header("X-MBX-APIKEY", api_key) .send() .await?; while !response.status().is_success() { sleep(Duration::from_secs(5)).await; response = client .get(&base_url) .header("X-MBX-APIKEY", api_key) .send() .await?; } let mut body = response.text_with_charset("utf-8").await; match body { Ok(T) => { de_deilst_symbol_json(&T).await?; } Err(E) => { log::warn!("request_delist_symbols body failed!: {:?}", E); } } Ok(()) } async fn de_deilst_symbol_json( body: &String, ) -> Result<(), Box> { let v: Value = serde_json::from_str(body.as_str())?; let into_vec = v.as_array(); if into_vec == None { return Err("Err")?; } let mut object_map = &serde_json::map::Map::new(); let mut delist_hashset: HashSet = HashSet::new(); let mut symbol = String::new(); for element in into_vec.unwrap() { object_map = element.as_object().unwrap(); let mut object_map_iter = object_map.iter(); for element in object_map_iter { match element.0.as_str() { "delistTime" => { }, "symbols" => { if let Some(array) = element.1.as_array() { for delist_symbol in array { if let Some(symbol) = delist_symbol.as_str() { if symbol.ends_with("USDT") && !delist_hashset.contains(symbol) { delist_hashset.insert(symbol.to_string()); } } } } }, _ => { log::error!("Elements in body msg are changed. Please update both your delist table and vectors."); } } } } if delist_hashset.len() != 0 { #[derive(Debug, FromRow)] struct Symbols { symbol: String, } let table_name = String::from("banned_usdt_trades"); let column_name = String::from("symbol"); let condition = None; let mut symbols = Symbols { symbol: String::new(), }; let mut select_result = try_select_record(&table_name, &column_name, &condition, &symbols) .await .unwrap(); let mut banned_usdt_trades_set: HashSet = HashSet::new(); for element in select_result { banned_usdt_trades_set.insert(element.symbol.clone()); } let insert_column = vec!["symbol"]; for element in &delist_hashset { if !banned_usdt_trades_set.contains(element) { let insert_values = vec![element.clone()]; insert_one_record(&table_name, &insert_column, &insert_values) .await .unwrap(); } } } Ok(()) }