import asyncio
import multiprocessing
import time
from functools import wraps, reduce

import aiohttp
import pandas as pd
import requests
from IPython.display import display
from tqdm import tqdm

def in_notebook():
    try:
        from IPython import get_ipython
        if 'IPKernelApp' not in get_ipython().config:  # pragma: no cover
            return False
    except ImportError:
        return False
    except AttributeError:
        return False
    return True

def run_in_process(async_func):
    # if in_notebook():
    #     return async_func
    @wraps(async_func)
    def wrapper(*args, **kwargs):
        def run():
            # asyncio.run creates (and closes) a fresh event loop, so this
            # works even when the parent process already owns a running loop.
            asyncio.run(async_func(*args, **kwargs))
        process = multiprocessing.Process(target=run)
        process.start()
        process.join()
    return wrapper

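# Usage sketch (illustrative, not part of the module's API): any coroutine
# can be fired off in its own process, which is handy from a notebook whose
# main event loop is already busy.
#
#     @run_in_process
#     async def fetch():
#         await asyncio.sleep(1)
#
#     fetch()  # blocks until the child process finishes
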
def sync_wrap(func):
    @wraps(func)
    async def wrapped_func(*args, **kwargs):
        # Run the blocking func in the default thread executor so the event
        # loop can keep scheduling other coroutines.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
    return wrapped_func

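# Usage sketch (illustrative): turn a blocking call into an awaitable so
# several can run concurrently via asyncio.gather. `url_1`/`url_2` are
# placeholder names, not defined in this module.
#
#     aget = sync_wrap(requests.get)
#     responses = await asyncio.gather(aget(url_1), aget(url_2))
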
class DataGatherer:
    interval = '1m'

    def __init__(self, symbol: str, start_time: str = '20230101', end_time: str = '20230131', cache_folder=''):
        self.symbol = symbol
        self.cache_folder = cache_folder
        self.start_time = pd.Timestamp(start_time, tz='UTC')
        self.end_time = pd.Timestamp(end_time, tz='UTC')
        # The exchange returns at most 1000 bars per request, so split the
        # range into 1000-minute chunks, one request start per chunk.
        delta = self.end_time - self.start_time
        n_period = int(delta.total_seconds() / 60 / 1000) + 1
        self.start_times = [self.start_time + pd.Timedelta(minutes=1000) * i for i in range(n_period)]
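    # Worked example with the defaults: '20230101' to '20230131' spans
    # 30 days = 43,200 minutes, so n_period = 43200 // 1000 + 1 = 44 chunks
    # of up to 1000 one-minute bars each.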
    def _load_underlying_data(self):
        # Load all the bars; implemented by subclasses.
        pass

    def _make_url(self, start_time: pd.Timestamp):
        pass

    def _get_data(self):
        pass
    @staticmethod
    def group_df(df: pd.DataFrame, col: str, group_size: str = '1H') -> pd.DataFrame:
        """
        Groups the DataFrame based on the specified time interval and aggregates the data.

        The function supports grouping by hours ('H'), minutes ('m'), or days ('d'). It aggregates
        the data to get the first value of 'open', the maximum of 'high', the minimum of 'low',
        the last value of 'close', and the sums of 'volume' and 'quote asset volume' for each group.

        Args:
            df (pd.DataFrame): The DataFrame to be grouped.
            col (str): The column name in the DataFrame that contains the datetime information.
            group_size (str): The size of the grouping interval, e.g., '1H', '30m', '1d'.

        Returns:
            pd.DataFrame: A DataFrame grouped and aggregated based on the specified interval.

        Raises:
            ValueError: If the group size is not defined properly.
        """
        n = int(group_size[:-1])
        if group_size[-1] == 'H':
            df['group'] = df[col].apply(lambda x: x.replace(hour=x.hour - x.hour % n, minute=0, second=0))
        elif group_size[-1] == 'm':
            df['group'] = df[col].apply(lambda x: x.replace(minute=x.minute - x.minute % n, second=0))
        elif group_size[-1] == 'd':
            # Days are 1-based, so shift to a 0-based index before the modulo
            # (otherwise day 1 would map to the invalid day 0 for n >= 2).
            df['group'] = df[col].apply(lambda x: x.replace(day=(x.day - 1) - (x.day - 1) % n + 1, hour=0, minute=0, second=0))
        else:
            raise ValueError("Undefined group size")
        return df.groupby('group').agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum', 'quote asset volume': 'sum'})
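    # Usage sketch (illustrative): collapse 1-minute bars into hourly bars.
    #
    #     hourly = DataGatherer.group_df(minute_df, 'open time', '1H')
    #
    # `minute_df` is a placeholder for a frame shaped like data_to_df's
    # output below: 'open', 'high', 'low', 'close', 'volume' and
    # 'quote asset volume' columns plus a datetime column named 'open time'.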
    def get_data(self, start_time: str = None, end_time: str = None, interval: str = None):
        # Normalize optional string arguments to UTC Timestamps; comparing a
        # raw string against the tz-aware 'open time' column would fail.
        start_time = pd.Timestamp(start_time, tz='UTC') if start_time else self.start_time
        end_time = pd.Timestamp(end_time, tz='UTC') if end_time else self.end_time
        interval = interval or self.interval
        mask = (self.dataframe['open time'] > start_time) & (self.dataframe['open time'] < end_time)
        # Copy so group_df's added 'group' column never mutates the cached frame.
        view = self.dataframe[mask].copy()
        return DataGatherer.group_df(view, 'open time', interval)

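# Usage sketch for the concrete gatherers defined below (illustrative):
#
#     gatherer = BinanceSpotDataGatherer('BTCUSDT', '20230101', '20230131')
#     gatherer._load_underlying_data()           # fetch and cache the 1m bars
#     hourly = gatherer.get_data(interval='1H')  # re-aggregate on demand
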
class BinanceSpotDataGatherer(DataGatherer):
    url = "https://api.binance.com/api/v3/klines"
    def _load_underlying_data(self):
        all_data = []
        for start in tqdm(self.start_times):
            data = self._get_data(self._make_url(start))
            all_data += data
        self.dataframe = BinanceSpotDataGatherer.data_to_df(all_data)
    async def _aload_underlying_data(self):
        # Build one coroutine per chunk, run them concurrently, then flatten.
        coros = [self._aget_data(self._make_url(start)) for start in self.start_times]
        datas = await asyncio.gather(*coros)
        all_data = []
        for data in datas:
            all_data += data
        self.dataframe = BinanceSpotDataGatherer.data_to_df(all_data)
    def _make_url(self, start_time: pd.Timestamp):
        # Never request past end_time: cap the limit at the minutes remaining
        # from this chunk's start (not from self.start_time).
        limit = min(1000, int((self.end_time - start_time).total_seconds() / 60) + 1)
        return f'{self.url}?symbol={self.symbol}&startTime={int(start_time.timestamp() * 1000)}&interval={self.interval}&limit={limit}'
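    # For BTCUSDT starting 2023-01-01 00:00 UTC this yields, for example:
    # https://api.binance.com/api/v3/klines?symbol=BTCUSDT&startTime=1672531200000&interval=1m&limit=1000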
    def _get_data(self, url, max_retry=4, current_retry=0):
        if current_retry >= max_retry:
            raise ValueError('Too many retries')
        response = requests.get(url)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 429:
            # Rate limited: back off, then retry.
            time.sleep(10)
            return self._get_data(url, max_retry=max_retry, current_retry=current_retry + 1)
        else:
            raise ValueError(f'Unknown status code {response.status_code}')
    async def _aget_data(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
    @staticmethod
    def data_to_df(data):
        df = pd.DataFrame(
            data,
            columns=[
                "Open time",
                "Open",
                "High",
                "Low",
                "Close",
                "Volume",
                "Close time",
                "Quote asset volume",
                "Number of trades",
                "Taker buy base asset volume",
                "Taker buy quote asset volume",
                "Ignore",
            ],
        ).astype(float)
        df["Open time"] = (
            pd.to_datetime(df["Open time"], unit="ms")
            .dt.tz_localize("UTC")
            .dt.floor("us")
        )
        df["Close time"] = (
            pd.to_datetime(df["Close time"], unit="ms")
            .dt.tz_localize("UTC")
            .dt.floor("us")
        )
        df.columns = [c.lower() for c in df.columns]
        df = df.drop_duplicates()
        return df

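    # Each raw kline from /api/v3/klines is a 12-element list matching the
    # columns above, e.g. (values illustrative, not real market data):
    #   [1672531200000, "16541.77", "16545.70", "16508.39", "16529.67",
    #    "4364.83", 1672531259999, "72146465.20", 149854, "2179.94",
    #    "36037030.40", "0"]
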
class BinanceSpotDataGathererSmart(BinanceSpotDataGatherer):
    async def _load_underlying_data(self):
        # Unlike the parent, this override is async: every chunk request is
        # dispatched concurrently, then the per-chunk lists are flattened.
        chunks = await asyncio.gather(*[
            self._get_data(self._make_url(start)) for start in self.start_times
        ])
        self.dataframe = BinanceSpotDataGatherer.data_to_df(
            reduce(lambda x, y: list(x) + list(y), chunks)
        )
    @sync_wrap
    def _get_data(self, url, max_retry=4):
        # Retry with a loop rather than recursion: after sync_wrap, calling
        # self._get_data again would return an un-awaited coroutine.
        for _ in range(max_retry):
            response = requests.get(url)
            # Binance reports the request weight consumed in the current minute.
            used_weight = int(response.headers.get('x-mbx-used-weight-1m', 0))
            if used_weight > 800:
                print(f'warning: {used_weight} used_weight!')
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 429:
                print(f'warning, received a 429 error - used_weight={used_weight} - retrying in 60 seconds')
                time.sleep(60)
            else:
                raise ValueError(f'Unknown status code {response.status_code}')
        raise ValueError('Too many retries')

@run_in_process
async def test():
    t1 = time.time()
    btc_loader = BinanceSpotDataGathererSmart('BTCUSDT')
    await btc_loader._load_underlying_data()
    print(time.time() - t1)
    display(btc_loader.get_data(interval='1H'))

    t1 = time.time()
    eth_loader = BinanceSpotDataGathererSmart('ETHUSDT')
    await eth_loader._aload_underlying_data()
    print(time.time() - t1)
    display(eth_loader.get_data(interval='1H'))


# The guard is required because run_in_process spawns a child process that
# re-imports this module; without it, the child would call test() again.
if __name__ == '__main__':
    test()