# Good Luck
TP - Creer un outil de récupération de donnée
Objectifs de la session :
- Utiliser la programmation orientée objet (POO) pour structurer le code et encapsuler les méthodes
- Utiliser requests pour récupérer des données sur Internet
- Utiliser pandas pour réagréger ces données
- Ajouter une méthode asynchrone pour améliorer les performances
Sujets:
L’objectif est de créer une classe encapsulant la récupération de bars OHLCV depuis une API, en permettant une aggrégation à une fréquence différente de celle des données récupérées.
Rappel:
Pour rappel, les OHLCV signifie simplement Open, high, low, close, volume, il s’agit d’une des formes les plus standard d’aggrégation de données financiere de marché. En effet, pour n’importe quel marchés (dans le sens carnet d’ordre avec un asset et une currency, que ce soit du forex, des contrats, des crypto, des equity..), la donnée brute du marché est, d’une part couteuse auprès des provider, d’autres part difficile à utiliser et à comprendre facilement. En effet, la donnée la plus brute serait l’ensemble des ajouts, modfication et annulation d’ordre sur le marché, ce qui avec les marchés éléctroniques et le trading algorithmique représente une quantité colossalle par jour. Une version filtré de celle-ci est l’ensemble des transactions ayant eu lieue durant la journée, ce qui réduit très grandement la quantité de données mais reste très peu pratique. Les bars OHLCV sont une représentation aggrégé de cette donnée, car elle représente sur chaque période l’aggrégation par premier, dernier, max, min, somme…
API utilisée:
Pour le TP d’aujourd’hui nous allons utiliser les bars OHLCV de l’API de Binance, car celle-ci ne necessitent pas d’authentification, mais seulement une limite de requette par minute par IP. Les données peuvent être récupéré par requette get sur des urls construit comme: https://api.binance.com/api/v3/klines?symbol=BTCUSDT&interval=30m
la documentation se trouvant ici: https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
Consignes:
Objectif final: - Pouvoir récupérer facilement pour n’importe quel symbol/liste de symbols les bars à la minute sur une période donnée - Pouvoir étendre cette période ensuite - Pouvoir recuperer des données aggrégés à une autre fréquence (>1minute) en aggregant les données déjà acquise - Ajouter une option pour effectuer la récupération de facon asynchrone
etape 1: Penser la classe: Construire une classe permettant de représenter le problème ci-dessus: - Poser d’abord sur papier / en brouillon de code la classe de base, ses attributes et ses méthodes - Penser à comment vous aller pouvoir l’utiliser ensuite
etape 2: Implementer le minimum necessaire et tester avec requests: - Avant de se lancer dans l’aggrégation et dans l’async, implementer de facon iterative en testant votre code
etape 3: Ajouter une méthode pour acceder au donnée récupéré avec une autre fréquence
etape 4: Creer une variance de votre méthode de récupération qui est asynchrone - Idéallement, vous avez séparé le code de sorte que chaque requette soit effectué dans une fonction - Ainsi il est possible d’utiliser asyncio.gather/asyncio.wait pour attendre de multiples coroutines
NB: async et notebook
Les notebook et un .py lancé normalement auront quelque différence dans le cas d’une utilisation asynchrone. En effet, dans un contexte standard (fichier .py), il est necessaire de créer une boucle d’evenement dans lequels les coroutines s’executeront. La façon standard de faire cela est:
import asyncio
# async and normal functions ....
async def main():
# what you would have put in __name__ == '__main__' usually
pass
if __name__ == '__main__':
asyncio.run(main())
<frozen genericpath>:89: RuntimeWarning: coroutine 'main' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
RuntimeError: asyncio.run() cannot be called from a running event loop
Ce qui est ci-dessus marche et est là façon la plus usuelle de lancer un code asynchrone dans un .py, mais dans un notebook vous obtiendrez une erreur:
RuntimeError: asyncio.run() cannot be called from a running event loop
En effet, le code s’execute déjà de façon asynchrone dans un notebook, et il n’y a pas besoin d’utiliser asyncio.run pour pouvoir utiliser await !
import asyncio
async def test(x):
print('enter test - waiting', x)
await asyncio.sleep(x)
return x
async def main():
= await test(2)
a = await test(1)
b print(a + b)
= asyncio.get_event_loop()
loop loop
<_UnixSelectorEventLoop running=True closed=False debug=False>
Il est donc possible d’attendre une fonction asynchrone directement en utilisant await:
await main()
enter test - waiting 2
enter test - waiting 1
3
Ce qui revient à l’ajouter à la boucle actuel
await loop.create_task(main())
enter test - waiting 2
enter test - waiting 1
3
Une méthode possible pour lancer de façon similaire votre code entre un .py et un notebook est de creer un processus séparé pour executer le code, comme ci-dessous par exemple:
%%time
import multiprocessing
def run_in_process():
def run():
= asyncio.new_event_loop()
loop
asyncio.run(main())
= multiprocessing.Process(target=run) #ou multiprocessing.Process(target=run, args=tuple d'argument) si vous avez des arguments dans votre fonction run
process # démarre le processus
process.start() # l'attends --> rend le lancement de run_in_process bloquant, i.e. vous ne pouvez pas en lancer plusieurs à la fois
process.join()
run_in_process()
run_in_process() run_in_process()
enter test - waiting 2
enter test - waiting 1
3
enter test - waiting 2
enter test - waiting 1
3
enter test - waiting 2
enter test - waiting 1
3
CPU times: user 14.6 ms, sys: 18.3 ms, total: 32.9 ms
Wall time: 9.07 s
En enlevant le join des process on attend plus les process et l’on execute les 3 boucles d’evenement en même temps
%%time
def run_in_process():
def run():
= asyncio.new_event_loop()
loop
asyncio.run(main())
= multiprocessing.Process(target=run)
process
process.start() #process.join() # Ici l'on attend pas la fin du process
run_in_process()
run_in_process() run_in_process()
enter test - waiting 2CPU times: user 3.4 ms, sys: 12 ms, total: 15.4 ms
Wall time: 13.4 ms
enter test - waiting 2enter test - waiting 2
enter test - waiting enter test - waiting1
1
enter test - waiting 1
33
3
Mais l’on voit que %%time n’affiche presque rien, en effet on ne verifie pas que les processus soit terminé avant de passer à la suite, on peut faire cela comme suit:
%%time
def run_in_process():
def run():
= asyncio.new_event_loop()
loop
asyncio.run(main())
= multiprocessing.Process(target=run)
process
process.start() #process.join() # Ici l'on attend pas la fin du process
return process
= []
list_processes
list_processes.append(run_in_process())
list_processes.append(run_in_process())
list_processes.append(run_in_process())for p in list_processes:
p.join()
enter test - waiting 2enter test - waiting
2
enter test - waiting 2
enter test - waiting 1enter test - waiting
1
enter test - waiting 1
3
3
3
CPU times: user 5.22 ms, sys: 19.8 ms, total: 25.1 ms
Wall time: 3.02 s
C’est peu ou prou la meme chose avec l’async - il faut bien penser à l’endroit où l’on va synchroniser les différents evenement
Petite note : %%time empeche d’utiliser normalement await dans une cellule
%%time
import asyncio
async def test(x):
print('enter test - waiting', x)
await asyncio.sleep(x)
return x
async def main():
for k in range(5):
await test(k)
await main()
SyntaxError: 'await' outside function (<timed exec>, line 12)
## Petit cadeau, voici un petit décorateur permettant de lancer dans un process la fonction async selectionné, permettant ainsi d'utiliser %%time de facon simple
def run_in_process(async_func):
def wrapper(*args, **kwargs):
def run():
= asyncio.new_event_loop()
loop *args, **kwargs))
asyncio.run(async_func(= multiprocessing.Process(target=run)
process
process.start()
process.join() return wrapper
%%time
async def test(x):
print('enter test - waiting', x)
await asyncio.sleep(x)
return x
@run_in_process
async def main(k_max=3):
for k in range(k_max):
await test(k)
=4) main(k_max
enter test - waiting 0
enter test - waiting 1
enter test - waiting 2
enter test - waiting 3
CPU times: user 8.45 ms, sys: 8.05 ms, total: 16.5 ms
Wall time: 6.03 s
On voit cependant au dessus que l’on a attendu 1+2+3 second - pas TOP
Il faut en fait utiliser gather par exemple pour pouvoir executer plusieurs coroutine en meme temps et les attendre
%%time
async def test(x):
print('enter test - waiting', x)
await asyncio.sleep(x)
return x
@run_in_process
async def main(k_max=3):
= []
coros for k in range(k_max):
coros.append(test(k))await asyncio.gather(*coros)
=4) main(k_max
enter test - waiting 0
enter test - waiting 1
enter test - waiting 2
enter test - waiting 3
CPU times: user 0 ns, sys: 13.2 ms, total: 13.2 ms
Wall time: 3.03 s
Enfin, regardons pourquoi l’on utilisait asyncio.sleep à la place de time.sleep
%%time
import time
async def test(x):
print('enter test - waiting', x)
time.sleep(x)return x
@run_in_process
async def main(k_max=3):
= []
coros for k in range(k_max):
coros.append(test(k))await asyncio.gather(*coros)
=4) main(k_max
enter test - waiting 0
enter test - waiting 1
enter test - waiting 2
enter test - waiting 3
CPU times: user 0 ns, sys: 13.9 ms, total: 13.9 ms
Wall time: 6.02 s
time.sleep est une fonction bloquante, cela veut dire que l’on ne peux pas effectuer d’autre coroutine en attendant que time.sleep s’execute !
C’est le cas pour toute fonction bloquante !
Cependant il est possible de deleguer une fonction bloquante à un executor (qui est simplement un autre thread qui va se charger d’executer le code de la fonction)
Avec cette option n’importe quel code peut être rendu asynchrone Toutefois si le code est CPU bound - c’est à dire que contrairement à sleep il demande au thread d’effectuer des taches - alors cela aura peut d’interet !
%%time
def test(x):
print('enter test - waiting', x)
time.sleep(x)return x
async def atest(x):
= asyncio.get_event_loop()
loop = lambda : test(x)
f await loop.run_in_executor(None, f)
@run_in_process
async def main(k_max=3):
= []
coros = asyncio.get_event_loop()
loop for k in range(k_max):
coros.append(atest(k))await asyncio.gather(*coros)
=4) main(k_max
enter test - waitingenter test - waitingenter test - waitingenter test - waiting 203
1
CPU times: user 1.69 ms, sys: 11.7 ms, total: 13.4 ms
Wall time: 3.02 s