# -*- coding: utf-8 -*-
# Copyright (C) 2013-2015 Samuel Damashek, Peter Foley, James Forcier, Srijay Kasturi, Reed Koser, Christopher Reffett, and Fox Wilson
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import datetime
import re
import socket
from os.path import join
import geoip2
from pkg_resources import Requirement, resource_filename
from requests import get
from ..helpers import arguments
from ..helpers.command import Command
from ..helpers.geoip import get_zipcode
from ..helpers.orm import Weather_prefs
[docs]def get_default(nick, session, send, config, source):
location = session.query(Weather_prefs.location).filter(Weather_prefs.nick == nick).scalar()
if location is None:
try:
# attempt to get GeoIP location, can fail if the DB isn't available, hostmask doesn't have
# an IP, etc.
hostmask = source.split('@')[1]
hostip = re.search(r"\d{1,3}[.-]\d{1,3}[.-]\d{1,3}[.-]\d{1,3}", hostmask)
# If that failed, could be a v6 addr
if not hostip:
try:
socket.inet_pton(socket.AF_INET6, hostmask)
hostip = hostmask
except socket.error:
pass
else:
hostip = hostip.group()
if hostip:
hostip = re.sub('-', '.', hostip)
db_file = resource_filename(Requirement.parse('CslBot'), join('cslbot', config['db']['geoip']))
location = get_zipcode(db_file, hostip)
if location is not None:
send("No default location for %s, GeoIP guesses that your zip code is %s." % (nick, location))
return location
except (FileNotFoundError, geoip2.errors.AddressNotFoundError):
pass
# default to TJHSST
send("No default location for %s and unable to guess a location, defaulting to TJ (22312)." % nick)
return '22312'
else:
return location
[docs]def valid_location(location, apikey):
data = get('http://api.wunderground.com/api/%s/conditions/q/%s.json' % (apikey, location)).json()
return 'current_observation' in data
[docs]def set_default(nick, location, session, send, apikey):
"""Sets nick's default location to location."""
if valid_location(location, apikey):
send("Setting default location")
default = session.query(Weather_prefs).filter(Weather_prefs.nick == nick).first()
if default is None:
default = Weather_prefs(nick=nick, location=location)
session.add(default)
else:
default.location = location
else:
send("Invalid or Ambiguous Location")
[docs]def get_weather(cmdargs, send, apikey):
if cmdargs.string.startswith("-"):
data = get('http://api.wunderground.com/api/%s/conditions/q/%s.json' % (apikey, cmdargs.string[1:])).json()
if 'current_observation' in data:
data = {'display_location': {'full': cmdargs.string[1:]},
'weather': 'Sunny', 'temp_f': '94.8', 'feelslike_f': '92.6', 'relative_humidity': '60%', 'pressure_in': '29.98',
'wind_string': 'Calm'}
forecastdata = {'conditions': 'Thunderstorms... Extreme Thunderstorms... Plague of Insects... The Rapture... Anti-Christ',
'high': {'fahrenheit': '3841'}, 'low': {'fahrenheit': '-6666'}}
alertdata = {'alerts': [{'description': 'Apocalypse', 'expires': 'at the end of days'}]}
elif 'results' in data['response']:
send("%d results found, please be more specific" % len(data['response']['results']))
return False
else:
send("Invalid or Ambiguous Location")
return False
else:
data = get('http://api.wunderground.com/api/%s/conditions/q/%s.json' % (apikey, cmdargs.string)).json()
forecastdata = get('http://api.wunderground.com/api/%s/forecast/q/%s.json' % (apikey, cmdargs.string)).json()
alertdata = get('http://api.wunderground.com/api/%s/alerts/q/%s.json' % (apikey, cmdargs.string)).json()
if 'current_observation' in data:
data = data['current_observation']
elif 'results' in data['response']:
send("%d results found, please be more specific" % len(data['response']['results']))
return False
else:
send("Invalid or Ambiguous Location")
return False
forecastdata = forecastdata['forecast']['simpleforecast']['forecastday'][0]
send("Current weather for %s:" % data['display_location']['full'])
current = '%s, Temp: %s (Feels like %s), Humidity: %s, Pressure: %s", Wind: %s' % (
data['weather'],
data['temp_f'],
data['feelslike_f'],
data['relative_humidity'],
data['pressure_in'],
data['wind_string'])
forecast = '%s, High: %s, Low: %s' % (
forecastdata['conditions'],
forecastdata['high']['fahrenheit'],
forecastdata['low']['fahrenheit'])
send(current)
send("Forecast: %s" % forecast)
if alertdata['alerts']:
alertlist = []
for alert in alertdata['alerts']:
alertlist.append("%s, expires %s" % (
alert['description'],
alert['expires']))
send("Weather Alerts: %s" % ', '.join(alertlist))
return True
[docs]def get_forecast(cmdargs, send, apikey):
forecastdata = get('http://api.wunderground.com/api/%s/forecast10day/q/%s.json' % (apikey, cmdargs.string)).json()
if 'forecast' in forecastdata:
forecastdata = forecastdata['forecast']['simpleforecast']['forecastday']
elif 'results' in forecastdata['response']:
send("%d results found, please be more specific" % len(forecastdata['response']['results']))
return False
else:
send("Invalid or Ambiguous Location")
return False
for day in forecastdata:
if (day['date']['day'], day['date']['month'], day['date']['year']) == (cmdargs.date.day, cmdargs.date.month, cmdargs.date.year):
forecast = '%s, High: %s, Low: %s' % (
day['conditions'],
day['high']['fahrenheit'],
day['low']['fahrenheit'])
send("Forecast for %s on %s: %s" % (cmdargs.string, cmdargs.date.strftime("%x"), forecast))
return
send("Couldn't find data for %s in the 10-day forecast" % (cmdargs.date.strftime("%x")))
[docs]def get_hourly(cmdargs, send, apikey):
forecastdata = get('http://api.wunderground.com/api/%s/hourly10day/q/%s.json' % (apikey, cmdargs.string)).json()
if 'hourly_forecast' in forecastdata:
forecastdata = forecastdata['hourly_forecast']
elif 'results' in forecastdata['response']:
send("%d results found, please be more specific" % len(forecastdata['response']['results']))
return False
else:
send("Invalid or Ambiguous Location")
return False
if not cmdargs.date:
cmdargs.date = datetime.datetime.now()
for hour in forecastdata:
# wunderground's API returns strings rather than ints for the date for some reason, so casting is needed here
date = (int(hour['FCTIME'][x]) for x in ['hour', 'mday', 'mon', 'year'])
if date == (cmdargs.hour, cmdargs.date.day, cmdargs.date.month, cmdargs.date.year):
forecast = '%s, Temperature: %s' % (
hour['condition'],
hour['temp']['english'])
send("Forecast for %s on %s at %s00: %s" % (cmdargs.string, cmdargs.date.strftime("%x"), cmdargs.hour, forecast))
return
send("Couldn't find data for %s hour %s in the 10-day hourly forecast" % (cmdargs.date.strftime("%x"), cmdargs.hour))
@Command(['weather', 'bjones'], ['nick', 'config', 'db', 'name', 'source'])
[docs]def cmd(send, msg, args):
"""Gets the weather.
Syntax: {command} <[--date (date)] [--hour (hour)] (location)|--set (default)>
Powered by Weather Underground, www.wunderground.com
"""
apikey = args['config']['api']['weatherapikey']
parser = arguments.ArgParser(args['config'])
parser.add_argument('--date', action=arguments.DateParser)
parser.add_argument('--hour', type=int)
parser.add_argument('--set', action='store_true')
parser.add_argument('string', nargs='*')
try:
cmdargs = parser.parse_args(msg)
except arguments.ArgumentException as e:
send(str(e))
return
if isinstance(cmdargs.string, list):
cmdargs.string = " ".join(cmdargs.string)
if cmdargs.set:
set_default(args['nick'], cmdargs.string, args['db'], send, apikey)
return
if cmdargs.hour is not None and cmdargs.hour > 23:
send("Invalid Hour")
cmdargs.hour = None
nick = args['nick'] if args['name'] == 'weather' else '`bjones'
if not cmdargs.string:
cmdargs.string = get_default(nick, args['db'], send, args['config'], args['source'])
if cmdargs.hour is not None:
get_hourly(cmdargs, send, apikey)
elif cmdargs.date:
get_forecast(cmdargs, send, apikey)
else:
get_weather(cmdargs, send, apikey)