weathercom.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. import json
  2. import re
  3. from datetime import datetime
  4. from html.parser import HTMLParser
  5. from urllib.request import Request, urlopen
  6. from i3pystatus.core.util import internet, require
  7. from i3pystatus.weather import WeatherBackend
# Value of the User-Agent request header sent when fetching the forecast page
# (a desktop Firefox identification string).
USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0'
  9. class WeathercomHTMLParser(HTMLParser):
  10. '''
  11. Obtain data points required by the Weather.com API which are obtained
  12. through some other source at runtime and added as <script> elements to the
  13. page source.
  14. '''
  15. def __init__(self, logger):
  16. self.logger = logger
  17. super(WeathercomHTMLParser, self).__init__()
  18. def get_weather_data(self, url):
  19. self.logger.debug('Making request to %s to retrieve weather data', url)
  20. self.weather_data = None
  21. req = Request(url, headers={'User-Agent': USER_AGENT})
  22. with urlopen(req) as content:
  23. try:
  24. content_type = dict(content.getheaders())['Content-Type']
  25. charset = re.search(r'charset=(.*)', content_type).group(1)
  26. except AttributeError:
  27. charset = 'utf-8'
  28. html = content.read().decode(charset)
  29. try:
  30. self.feed(html)
  31. except Exception:
  32. self.logger.exception(
  33. 'Exception raised while parsing forecast page',
  34. exc_info=True
  35. )
  36. def load_json(self, json_input):
  37. self.logger.debug('Loading the following data as JSON: %s', json_input)
  38. try:
  39. return json.loads(json_input)
  40. except json.decoder.JSONDecodeError as exc:
  41. self.logger.debug('Error loading JSON: %s', exc)
  42. self.logger.debug('String that failed to load: %s', json_input)
  43. return None
  44. def handle_data(self, content):
  45. '''
  46. Sometimes the weather data is set under an attribute of the "window"
  47. DOM object. Sometimes it appears as part of a javascript function.
  48. Catch either possibility.
  49. '''
  50. if self.weather_data is not None:
  51. # We've already found weather data, no need to continue parsing
  52. return
  53. content = content.strip().rstrip(';')
  54. try:
  55. tag_text = self.get_starttag_text().lower()
  56. except AttributeError:
  57. tag_text = ''
  58. if tag_text.startswith('<script'):
  59. # Look for feed information embedded as a javascript variable
  60. begin = content.find('window.__data')
  61. if begin != -1:
  62. self.logger.debug('Located window.__data')
  63. # Look for end of JSON dict and end of javascript statement
  64. end = content.find('};', begin)
  65. if end == -1:
  66. self.logger.debug('Failed to locate end of javascript statement')
  67. else:
  68. # Strip the "window.__data=" from the beginning
  69. json_data = self.load_json(
  70. content[begin:end + 1].split('=', 1)[1].lstrip()
  71. )
  72. if json_data is not None:
  73. def _find_weather_data(data):
  74. '''
  75. Helper designed to minimize impact of potential
  76. structural changes to this data.
  77. '''
  78. if isinstance(data, dict):
  79. if 'Observation' in data and 'DailyForecast' in data:
  80. return data
  81. else:
  82. for key in data:
  83. ret = _find_weather_data(data[key])
  84. if ret is not None:
  85. return ret
  86. return None
  87. weather_data = _find_weather_data(json_data)
  88. if weather_data is None:
  89. self.logger.debug(
  90. 'Failed to locate weather data in the '
  91. 'following data structure: %s', json_data
  92. )
  93. else:
  94. self.weather_data = weather_data
  95. return
  96. for line in content.splitlines():
  97. line = line.strip().rstrip(';')
  98. if line.startswith('var adaptorParams'):
  99. # Strip off the "var adaptorParams = " from the beginning,
  100. # and the javascript semicolon from the end. This will give
  101. # us JSON that we can load.
  102. weather_data = self.load_json(line.split('=', 1)[1].lstrip())
  103. if weather_data is not None:
  104. self.weather_data = weather_data
  105. return
  106. class Weathercom(WeatherBackend):
  107. '''
  108. This module gets the weather from weather.com. The ``location_code``
  109. parameter should be set to the location code from weather.com. To obtain
  110. this code, search for your location on weather.com, and when you go to the
  111. forecast page, the code you need will be everything after the last slash in
  112. the URL (e.g. ``94107:4:US``).
  113. .. _weather-usage-weathercom:
  114. .. rubric:: Usage example
  115. .. code-block:: python
  116. from i3pystatus import Status
  117. from i3pystatus.weather import weathercom
  118. status = Status(logfile='/home/username/var/i3pystatus.log')
  119. status.register(
  120. 'weather',
  121. format='{condition} {current_temp}{temp_unit}[ {icon}][ Hi: {high_temp}][ Lo: {low_temp}][ {update_error}]',
  122. interval=900,
  123. colorize=True,
  124. hints={'markup': 'pango'},
  125. backend=weathercom.Weathercom(
  126. location_code='94107:4:US',
  127. units='imperial',
  128. update_error='<span color="#ff0000">!</span>',
  129. ),
  130. )
  131. status.run()
  132. See :ref:`here <weather-formatters>` for a list of formatters which can be
  133. used.
  134. '''
  135. settings = (
  136. ('location_code', 'Location code from www.weather.com'),
  137. ('units', '\'metric\' or \'imperial\''),
  138. ('update_error', 'Value for the ``{update_error}`` formatter when an '
  139. 'error is encountered while checking weather data'),
  140. )
  141. required = ('location_code',)
  142. location_code = None
  143. units = 'metric'
  144. update_error = '!'
  145. url_template = 'https://weather.com/{locale}/weather/today/l/{location_code}'
  146. # This will be set in the init based on the passed location code
  147. forecast_url = None
  148. def init(self):
  149. if self.location_code is not None:
  150. # Ensure that the location code is a string, in the event that a
  151. # ZIP code (or other all-numeric code) is passed as a non-string.
  152. self.location_code = str(self.location_code)
  153. # Setting the locale to en-AU returns units in metric. Leaving it blank
  154. # causes weather.com to return the default, which is imperial.
  155. self.locale = 'en-AU' if self.units == 'metric' else ''
  156. self.forecast_url = self.url_template.format(**vars(self))
  157. self.parser = WeathercomHTMLParser(self.logger)
  158. def check_response(self, response):
  159. # Errors for weather.com API manifest in HTTP error codes, not in the
  160. # JSON response.
  161. return False
  162. @require(internet)
  163. def check_weather(self):
  164. '''
  165. Fetches the current weather from wxdata.weather.com service.
  166. '''
  167. if self.units not in ('imperial', 'metric'):
  168. raise Exception("units must be one of (imperial, metric)!")
  169. if self.location_code is None:
  170. self.logger.error(
  171. 'A location_code is required to check Weather.com. See the '
  172. 'documentation for more information.'
  173. )
  174. self.data['update_error'] = self.update_error
  175. return
  176. self.data['update_error'] = ''
  177. try:
  178. self.parser.get_weather_data(self.forecast_url)
  179. if self.parser.weather_data is None:
  180. self.logger.error(
  181. 'Failed to read weather data from page. Run module with '
  182. 'debug logging to get more information.'
  183. )
  184. self.data['update_error'] = self.update_error
  185. return
  186. try:
  187. observed = self.parser.weather_data['Observation']
  188. # Observation data stored under a sub-key containing the
  189. # lat/long coordinates. For example:
  190. #
  191. # geocode:41.77,-88.35:language:en-US:units:e
  192. #
  193. # Since this is the only key under "Observation", we can just
  194. # use next(iter(observed)) to get it.
  195. observed = observed[next(iter(observed))]['data']['vt1observation']
  196. except KeyError:
  197. self.logger.error(
  198. 'Failed to retrieve current conditions from API response. '
  199. 'Run module with debug logging to get more information.'
  200. )
  201. self.data['update_error'] = self.update_error
  202. return
  203. try:
  204. forecast = self.parser.weather_data['DailyForecast']
  205. # Same as above, use next(iter(forecast)) to drill down to the
  206. # correct nested dict level.
  207. forecast = forecast[next(iter(forecast))]
  208. forecast = forecast['data']['vt1dailyForecast'][0]
  209. except (IndexError, KeyError):
  210. self.logger.error(
  211. 'Failed to retrieve forecast data from API response. '
  212. 'Run module with debug logging to get more information.'
  213. )
  214. self.data['update_error'] = self.update_error
  215. return
  216. try:
  217. self.city_name = self.parser.weather_data['Location']
  218. # Again, same technique as above used to get down to the
  219. # correct nested dict level.
  220. self.city_name = self.city_name[next(iter(self.city_name))]
  221. self.city_name = self.city_name['data']['location']['displayName']
  222. except KeyError:
  223. self.logger.warning(
  224. 'Failed to get city name from API response, falling back '
  225. 'to location code \'%s\'', self.location_code
  226. )
  227. self.city_name = self.location_code
  228. # Cut off the timezone from the end of the string (it's after the last
  229. # space, hence the use of rpartition). International timezones (or ones
  230. # outside the system locale) don't seem to be handled well by
  231. # datetime.datetime.strptime().
  232. try:
  233. observation_time_str = str(observed.get('observationTime', ''))
  234. observation_time = datetime.strptime(observation_time_str,
  235. '%Y-%d-%yT%H:%M:%S%z')
  236. except (ValueError, AttributeError):
  237. observation_time = datetime.fromtimestamp(0)
  238. try:
  239. pressure_trend_str = observed.get('barometerTrend', '').lower()
  240. except AttributeError:
  241. pressure_trend_str = ''
  242. if pressure_trend_str == 'rising':
  243. pressure_trend = '+'
  244. elif pressure_trend_str == 'falling':
  245. pressure_trend = '-'
  246. else:
  247. pressure_trend = ''
  248. try:
  249. high_temp = forecast.get('day', {}).get('temperature', '')
  250. except (AttributeError, IndexError):
  251. high_temp = ''
  252. else:
  253. if high_temp is None:
  254. # In the mid-afternoon, the high temp disappears from the
  255. # forecast, so just set high_temp to an empty string.
  256. high_temp = ''
  257. try:
  258. low_temp = forecast.get('night', {}).get('temperature', '')
  259. except (AttributeError, IndexError):
  260. low_temp = ''
  261. if self.units == 'imperial':
  262. temp_unit = '°F'
  263. wind_unit = 'mph'
  264. pressure_unit = 'in'
  265. visibility_unit = 'mi'
  266. else:
  267. temp_unit = '°C'
  268. wind_unit = 'kph'
  269. pressure_unit = 'mb'
  270. visibility_unit = 'km'
  271. self.data['city'] = self.city_name
  272. self.data['condition'] = str(observed.get('phrase', ''))
  273. self.data['observation_time'] = observation_time
  274. self.data['current_temp'] = str(observed.get('temperature', ''))
  275. self.data['low_temp'] = str(low_temp)
  276. self.data['high_temp'] = str(high_temp)
  277. self.data['temp_unit'] = temp_unit
  278. self.data['feelslike'] = str(observed.get('feelsLike', ''))
  279. self.data['dewpoint'] = str(observed.get('dewPoint', ''))
  280. self.data['wind_speed'] = str(observed.get('windSpeed', ''))
  281. self.data['wind_unit'] = wind_unit
  282. self.data['wind_direction'] = str(observed.get('windDirCompass', ''))
  283. # Gust can be None, using "or" to ensure empty string in this case
  284. self.data['wind_gust'] = str(observed.get('gust', '') or '')
  285. self.data['pressure'] = str(observed.get('altimeter', ''))
  286. self.data['pressure_unit'] = pressure_unit
  287. self.data['pressure_trend'] = pressure_trend
  288. self.data['visibility'] = str(observed.get('visibility', ''))
  289. self.data['visibility_unit'] = visibility_unit
  290. self.data['humidity'] = str(observed.get('humidity', ''))
  291. self.data['uv_index'] = str(observed.get('uvIndex', ''))
  292. except Exception:
  293. # Don't let an uncaught exception kill the update thread
  294. self.logger.error(
  295. 'Uncaught error occurred while checking weather. '
  296. 'Exception follows:', exc_info=True
  297. )
  298. self.data['update_error'] = self.update_error