Source code for ethereum_gasprice.providers.ethgasstation_provider

from typing import Dict, Optional, Tuple

from ethereum_gasprice.consts import GaspriceStrategy
from ethereum_gasprice.providers.base import BaseAsyncAPIGaspriceProvider, BaseSyncAPIGaspriceProvider

__all__ = ["EthGasStationProvider", "AsyncEthGasStationProvider"]


[docs]class EthGasStationProvider(BaseSyncAPIGaspriceProvider): """Provider for Eth Gas Station (https://ethgasstation.info/)""" title: str = "ethgasstation" api_url: str = "https://ethgasstation.info/api/ethgasAPI.json" secret_env_var_title: str = "ETHGASPRICE_ETHGASSTATION_SECRET"
[docs] def request(self) -> Tuple[bool, dict]: """Make request to API.""" try: response = self.client.get(url=self.api_url, params={"api-key": self.get_secret()}) response_data = response.json() if response.status_code == 200: return True, response_data except Exception: pass return False, {}
def _proceed_response_data(self, response_data: dict) -> Dict[GaspriceStrategy, Optional[int]]: """Unify data from response.""" data = self._data_template.copy() if not response_data: return data data.update( { GaspriceStrategy.SLOW: response_data.get("safeLow"), GaspriceStrategy.REGULAR: response_data.get("average"), GaspriceStrategy.FAST: response_data.get("fast"), GaspriceStrategy.FASTEST: response_data.get("fastest"), } ) for k, v in data.items(): data[k] = int(v) // 10 if v else None return data
[docs]class AsyncEthGasStationProvider(BaseAsyncAPIGaspriceProvider, EthGasStationProvider):
[docs] async def request(self) -> Tuple[bool, dict]: """Make request to API.""" try: response = await self.client.get(url=self.api_url, params={"api-key": self.get_secret()}) response_data = response.json() if response.status_code == 200: return True, response_data except Exception: pass return False, {}