Merge remote-tracking branch 'upstream/master'

master
Paul Walko 2018-12-23 17:36:49 -05:00
commit 545b6690ed
23 changed files with 625 additions and 294 deletions

1
.coverage Normal file

File diff suppressed because one or more lines are too long

View File

@ -2,9 +2,10 @@ language: python
sudo: false
cache: pip
python:
- 3.4
- 3.5
- 3.6
- 3.7
install:
- pip install -r requirements.txt
script: nosetests
script:
- LANG=en_US.UTF-8 nosetests

16
Dockerfile Normal file
View File

@ -0,0 +1,16 @@
FROM python:3
WORKDIR /usr/src/app
COPY requirements.txt /usr/src/app/
RUN pip install --no-cache-dir -r requirements.txt
COPY . /usr/src/app/
RUN groupadd -r -g 200 bot \
&& useradd -mr -g bot -u 200 bot
USER bot
VOLUME ["/home/bot/.phenny"]
CMD ["/usr/src/app/phenny"]

64
irc.py
View File

@ -9,11 +9,16 @@ http://inamidst.com/phenny/
import asynchat
import asyncore
import functools
import proto
import re
import socket
import ssl
import sys
import time
import traceback
import threading
from tools import decorate
class Origin(object):
@ -49,9 +54,12 @@ class Bot(asynchat.async_chat):
self.channels = channels or []
self.stack = []
import threading
self.sending = threading.RLock()
proto_func = lambda attr: functools.partial(proto.commands[attr], self)
proto_map = {attr: proto_func(attr) for attr in proto.commands}
self.proto = decorate(object(), proto_map)
def initiate_send(self):
self.sending.acquire()
asynchat.async_chat.initiate_send(self)
@ -61,24 +69,22 @@ class Bot(asynchat.async_chat):
# asynchat.async_chat.push(self, *args, **kargs)
def __write(self, args, text=None):
# print 'PUSH: %r %r %r' % (self, args, text)
try:
if text is not None:
# 510 because CR and LF count too, as nyuszika7h points out
self.push((b' '.join(args) + b' :' + text)[:510] + b'\r\n')
else:
self.push(b' '.join(args)[:512] + b'\r\n')
except IndexError:
pass
line = b' '.join(args)
if text is not None:
line += b' :' + text
# 510 because CR and LF count too
self.push(line[:510] + b'\r\n')
def write(self, args, text=None):
"""This is a safe version of __write"""
def safe(input):
if type(input) == str:
input = input.replace('\n', '')
input = input.replace('\r', '')
input = re.sub(' ?(\r|\n)+', ' ', input)
return input.encode('utf-8')
else:
input = re.sub(b' ?(\r|\n)+', b' ', input)
return input
try:
args = [safe(arg) for arg in args]
@ -127,10 +133,12 @@ class Bot(asynchat.async_chat):
def handle_connect(self):
if self.verbose:
print('connected!', file=sys.stderr)
if self.password:
self.write(('PASS', self.password))
self.write(('NICK', self.nick))
self.write(('USER', self.user, '+iw', self.nick), self.name)
self.proto.pass_(self.password)
self.proto.nick(self.nick)
self.proto.user(self.user, '+iw', self.name)
def handle_close(self):
self.close()
@ -165,7 +173,7 @@ class Bot(asynchat.async_chat):
self.dispatch(origin, tuple([text] + args))
if args[0] == 'PING':
self.write(('PONG', text))
self.proto.pong(text)
def dispatch(self, origin, args):
pass
@ -185,19 +193,6 @@ class Bot(asynchat.async_chat):
except UnicodeEncodeError as e:
return
# Split long messages
maxlength = 430
if len(text) > maxlength:
first_message = text[0:maxlength].decode('utf-8','ignore')
line_break = len(first_message)
for i in range(len(first_message)-1,-1,-1):
if first_message[i] == " ":
line_break = i
break
self.msg(recipient, text.decode('utf-8','ignore')[0:line_break])
self.msg(recipient, text.decode('utf-8','ignore')[line_break+1:])
return
# No messages within the last 3 seconds? Go ahead!
# Otherwise, wait so it's been at least 0.8 seconds + penalty
if self.stack:
@ -216,12 +211,7 @@ class Bot(asynchat.async_chat):
self.sending.release()
return
def safe(input):
if type(input) == str:
input = input.encode('utf-8')
input = input.replace(b'\n', b'')
return input.replace(b'\r', b'')
self.__write((b'PRIVMSG', safe(recipient)), safe(text))
self.proto.privmsg(recipient, text)
self.stack.append((time.time(), text))
self.stack = self.stack[-10:]
@ -231,12 +221,8 @@ class Bot(asynchat.async_chat):
text = "\x01ACTION {0}\x01".format(text)
return self.msg(recipient, text)
def notice(self, dest, text):
self.write(('NOTICE', dest), text)
def error(self, origin):
try:
import traceback
trace = traceback.format_exc()
print(trace)
lines = list(reversed(trace.splitlines()))

View File

@ -13,9 +13,7 @@ def join(phenny, input):
if input.sender.startswith('#'): return
if input.admin:
channel, key = input.group(1), input.group(2)
if not key:
phenny.write(['JOIN'], channel)
else: phenny.write(['JOIN', channel, key])
phenny.proto.join(channel, key)
join.rule = r'\.join (#\S+)(?: *(\S+))?'
join.priority = 'low'
join.example = '.join #example or .join #example key'
@ -24,7 +22,7 @@ def autojoin(phenny, input):
"""Join the specified channel when invited by an admin."""
if input.admin:
channel = input.group(1)
phenny.write(['JOIN'], channel)
phenny.proto.join(channel)
autojoin.event = 'INVITE'
autojoin.rule = r'(.*)'
@ -33,7 +31,7 @@ def part(phenny, input):
# Can only be done in privmsg by an admin
if input.sender.startswith('#'): return
if input.admin:
phenny.write(['PART'], input.group(2))
phenny.proto.part(input.group(2))
part.rule = (['part'], r'(#\S+)')
part.priority = 'low'
part.example = '.part #example'
@ -43,7 +41,7 @@ def quit(phenny, input):
# Can only be done in privmsg by the owner
if input.sender.startswith('#'): return
if input.owner:
phenny.write(['QUIT'])
phenny.proto.quit()
__import__('os')._exit(0)
quit.commands = ['quit']
quit.priority = 'low'

View File

@ -10,36 +10,33 @@ modified from Wikipedia module
author: mutantmonkey <mutantmonkey@mutantmonkey.in>
"""
import re
import web
import wiki
wikiapi = 'https://wiki.archlinux.org/api.php?action=query&list=search&srsearch={0}&limit=1&prop=snippet&format=json'
wikiuri = 'https://wiki.archlinux.org/index.php/{0}'
wikisearch = 'https://wiki.archlinux.org/index.php/Special:Search?' \
+ 'search={0}&fulltext=Search'
endpoints = {
'api': 'https://wiki.archlinux.org/api.php?action=query&list=search&srsearch={0}&limit=1&format=json',
'url': 'https://wiki.archlinux.org/index.php/{0}',
'search': 'https://wiki.archlinux.org/index.php/Special:Search?search={0}&fulltext=Search',
}
def awik(phenny, input):
origterm = input.groups()[1]
if not origterm:
""".awik <term> - Look up something on the ArchWiki."""
origterm = input.group(1)
if not origterm:
return phenny.say('Perhaps you meant ".awik dwm"?')
term = web.unquote(origterm)
term = term[0].upper() + term[1:]
term = term.replace(' ', '_')
term, section = wiki.parse_term(origterm)
w = wiki.Wiki(wikiapi, wikiuri, wikisearch)
w = wiki.Wiki(endpoints)
match = w.search(term)
try:
result = w.search(term)
except web.ConnectionError:
error = "Can't connect to wiki.archlinux.org ({0})".format(wikiuri.format(term))
return phenny.say(error)
if not match:
phenny.say('Can\'t find anything in the ArchWiki for "{0}".'.format(term))
return
if result is not None:
phenny.say(result)
else:
phenny.say('Can\'t find anything in the ArchWiki for "{0}".'.format(origterm))
snippet, url = wiki.extract_snippet(match, section)
phenny.say('"{0}" - {1}'.format(snippet, url))
awik.commands = ['awik']
awik.priority = 'high'

View File

@ -27,13 +27,11 @@ def setup(phenny):
timer = threading.Timer(refresh_delay, close, ())
phenny.data['startup.setup.timer'] = timer
phenny.data['startup.setup.timer'].start()
# print "PING!"
phenny.write(('PING', phenny.config.host))
phenny.proto.ping(phenny.config.host)
phenny.data['startup.setup.pingloop'] = pingloop
def pong(phenny, input):
try:
# print "PONG!"
phenny.data['startup.setup.timer'].cancel()
time.sleep(refresh_delay + 60.0)
pingloop()
@ -50,16 +48,16 @@ def startup(phenny, input):
if phenny.data.get('startup.setup.pingloop'):
phenny.data['startup.setup.pingloop']()
if hasattr(phenny.config, 'serverpass'):
phenny.write(('PASS', phenny.config.serverpass))
if hasattr(phenny.config, 'serverpass'):
phenny.proto.pass_(phenny.config.serverpass)
if hasattr(phenny.config, 'password'):
phenny.msg('NickServ', 'IDENTIFY %s' % phenny.config.password)
time.sleep(5)
# Cf. http://swhack.com/logs/2005-12-05#T19-32-36
for channel in phenny.channels:
phenny.write(('JOIN', channel))
for channel in phenny.channels:
phenny.proto.join(channel)
time.sleep(0.5)
startup.rule = r'(.*)'
startup.event = '251'

View File

@ -2,38 +2,76 @@
test_archwiki.py - tests for the arch wiki module
author: mutantmonkey <mutantmonkey@mutantmonkey.in>
"""
import re
import unittest
from mock import MagicMock, Mock
from mock import MagicMock
from modules import archwiki
import wiki
class TestArchwiki(unittest.TestCase):
def setUp(self):
self.phenny = MagicMock()
self.input = MagicMock()
self.term = None
self.section = None
def prepare(self):
if self.section:
self.text = self.term + '#' + self.section
url_text = wiki.format_term(self.term) +\
'#' + wiki.format_section(self.section)
else:
self.text = self.term
url_text = wiki.format_term(self.term)
self.input.group = lambda x: [None, self.text][x]
self.url = 'https://wiki.archlinux.org/index.php/{0}'.format(url_text)
def check_snippet(self, output):
self.assertIn(self.url, output)
for keyword in self.keywords:
self.assertIn(keyword, output)
def test_awik(self):
input = Mock(groups=lambda: ['', "KVM"])
archwiki.awik(self.phenny, input)
self.term = "OpenDMARC"
self.prepare()
archwiki.awik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
m = re.match('^.* - https:\/\/wiki\.archlinux\.org\/index\.php\/KVM$',
out, flags=re.UNICODE)
self.assertTrue(m)
self.keywords = ['DMARC', 'implementation', 'specification']
self.check_snippet(out)
def test_awik_fragment(self):
self.term = "KVM"
self.section = "Kernel support"
self.prepare()
archwiki.awik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
self.keywords = ['kernel', 'modules', 'KVM', 'VIRTIO']
self.check_snippet(out)
def test_awik_invalid(self):
term = "KVM#Enabling_KSM"
input = Mock(groups=lambda: ['', term])
archwiki.awik(self.phenny, input)
self.term = "KVM"
self.section = "Enabling KSM"
self.prepare()
self.phenny.say.assert_called_once_with( "Can't find anything in "\
"the ArchWiki for \"{0}\".".format(term))
archwiki.awik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
message = "No '{0}' section found.".format(self.section)
self.assertEqual(out, '"{0}" - {1}'.format(message, self.url))
def test_awik_none(self):
term = "Ajgoajh"
input = Mock(groups=lambda: ['', term])
archwiki.awik(self.phenny, input)
self.term = "Ajgoajh"
self.prepare()
self.phenny.say.assert_called_once_with( "Can't find anything in "\
"the ArchWiki for \"{0}\".".format(term))
archwiki.awik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
expected = "Can't find anything in the ArchWiki for \"{0}\"."
self.assertEqual(out, expected.format(self.text))

View File

@ -20,7 +20,7 @@ class TestTfw(unittest.TestCase):
tfw.tfw(self.phenny, input)
self.phenny.say.assert_called_once_with(
"WHERE THE FUCK IS THAT? Try another location.")
"WHERE THE FUCK IS THAT? I guess you might think it's a place, but no one else does. Try again.")
def test_celsius(self):
input = Mock(group=lambda x: '24060')

View File

@ -2,38 +2,77 @@
test_vtluugwiki.py - tests for the VTLUUG wiki module
author: mutantmonkey <mutantmonkey@mutantmonkey.in>
"""
import re
import unittest
from mock import MagicMock, Mock
from mock import MagicMock
from modules import vtluugwiki
import wiki
class TestVtluugwiki(unittest.TestCase):
def setUp(self):
self.phenny = MagicMock()
self.input = MagicMock()
self.term = None
self.section = None
def prepare(self):
if self.section:
self.text = self.term + '#' + self.section
url_text = wiki.format_term(self.term) +\
'#' + wiki.format_section(self.section)
else:
self.text = self.term
url_text = wiki.format_term(self.term)
self.input.groups.return_value = [None, self.text]
self.url = 'https://vtluug.org/wiki/{0}'.format(url_text)
def check_snippet(self, output):
self.assertIn(self.url, output)
for keyword in self.keywords:
self.assertIn(keyword, output)
def test_vtluug(self):
input = Mock(groups=lambda: ['', "VT-Wireless"])
vtluugwiki.vtluug(self.phenny, input)
self.term = "VT-Wireless"
self.prepare()
vtluugwiki.vtluug(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
m = re.match('^.* - https:\/\/vtluug\.org\/wiki\/VT-Wireless$',
out, flags=re.UNICODE)
self.assertTrue(m)
self.keywords = ['campus', 'wireless', 'networks']
self.check_snippet(out)
def test_vtluug_fragment(self):
self.term = "EAP-TLS"
self.section = "netctl"
self.prepare()
vtluugwiki.vtluug(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
self.keywords = ['Arch', 'Linux', 'netctl']
self.check_snippet(out)
def test_vtluug_invalid(self):
term = "EAP-TLS#netcfg"
input = Mock(groups=lambda: ['', term])
vtluugwiki.vtluug(self.phenny, input)
self.term = "EAP-TLS"
self.section = "netcfg"
self.prepare()
self.phenny.say.assert_called_once_with( "Can't find anything in "\
"the VTLUUG Wiki for \"{0}\".".format(term))
vtluugwiki.vtluug(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
message = "No '{0}' section found.".format(self.section)
self.assertEqual(out, '"{0}" - {1}'.format(message, self.url))
def test_vtluug_none(self):
term = "Ajgoajh"
input = Mock(groups=lambda: ['', term])
vtluugwiki.vtluug(self.phenny, input)
self.term = "Ajgoajh"
self.prepare()
vtluugwiki.vtluug(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
expected = "Can't find anything in the VTLUUG Wiki for \"{0}\"."
self.assertEqual(out, expected.format(self.text))
self.phenny.say.assert_called_once_with( "Can't find anything in "\
"the VTLUUG Wiki for \"{0}\".".format(term))

View File

@ -6,7 +6,7 @@ author: mutantmonkey <mutantmonkey@mutantmonkey.in>
import re
import unittest
from mock import MagicMock, Mock, patch
from modules.weather import location, local, code, f_weather
from modules import weather
class TestWeather(unittest.TestCase):
@ -14,58 +14,55 @@ class TestWeather(unittest.TestCase):
self.phenny = MagicMock()
def test_locations(self):
def check_places(*args):
def validate(actual_name, actual_lat, actual_lon):
names = [n.strip() for n in actual_name.split(',')]
for arg in args:
self.assertIn(arg, names)
return validate
def check_location(result, expected):
self.assertAlmostEqual(result[0], expected[0], places=1)
self.assertAlmostEqual(result[1], expected[1], places=1)
locations = [
('92121', check_places("San Diego", "California")),
('94110', check_places("SF", "California")),
('94041', check_places("Mountain View", "California")),
('27959', check_places("Dare County", "North Carolina")),
('48067', check_places("Royal Oak", "Michigan")),
('23606', check_places("Newport News", "Virginia")),
('23113', check_places("Chesterfield County", "Virginia")),
('27517', check_places("Chapel Hill", "North Carolina")),
('15213', check_places("Allegheny County", "Pennsylvania")),
('90210', check_places("Los Angeles County", "California")),
('33109', check_places("Miami-Dade County", "Florida")),
('80201', check_places("Denver", "Colorado")),
('92121', (32.9, -117.2)),
('94110', (37.8, -122.4)),
('94041', (37.4, -122.1)),
('27959', (36.0, -75.6)),
('48067', (42.5, -83.1)),
('23606', (37.1, -76.5)),
('23113', (37.5, -77.6)),
('27517', (42.6, -7.8)),
('15213', (40.4, -80.0)),
('90210', (34.1, -118.3)),
('33109', (25.8, -80.1)),
('80201', (22.6, 120.3)),
("Berlin", check_places("Berlin", "Deutschland")),
("Paris", check_places("Paris", "France")),
("Vilnius", check_places("Vilnius", "Lietuva")),
("Berlin", (52.5, 13.4)),
("Paris", (48.9, 2.4)),
("Vilnius", (54.7, 25.3)),
('Blacksburg, VA', check_places("Blacksburg", "Virginia")),
('Granger, IN', check_places("Granger", "Indiana")),
('Blacksburg, VA', (37.2, -80.4)),
('Granger, IN', (41.8, -86.1)),
]
for loc, validator in locations:
names, lat, lon = location(loc)
validator(names, lat, lon)
for query, expected in locations:
result = weather.location(query)
check_location(result, expected)
def test_code_94110(self):
icao = code(self.phenny, '94110')
icao = weather.code(self.phenny, '94110')
self.assertEqual(icao, 'KSFO')
def test_airport(self):
input = Mock(group=lambda x: 'KIAD')
f_weather(self.phenny, input)
weather.f_weather(self.phenny, input)
assert self.phenny.say.called is True
def test_place(self):
input = Mock(group=lambda x: 'Blacksburg')
f_weather(self.phenny, input)
weather.f_weather(self.phenny, input)
assert self.phenny.say.called is True
def test_notfound(self):
input = Mock(group=lambda x: 'Hell')
f_weather(self.phenny, input)
weather.f_weather(self.phenny, input)
self.phenny.say.called_once_with('#phenny',
"No NOAA data available for that location.")

View File

@ -2,38 +2,76 @@
test_wikipedia.py - tests for the wikipedia module
author: mutantmonkey <mutantmonkey@mutantmonkey.in>
"""
import re
import unittest
from mock import MagicMock, Mock
from mock import MagicMock
from modules import wikipedia
import wiki
class TestWikipedia(unittest.TestCase):
def setUp(self):
self.phenny = MagicMock()
self.input = MagicMock()
self.term = None
self.section = None
def prepare(self):
if self.section:
self.text = self.term + '#' + self.section
url_text = wiki.format_term(self.term) +\
'#' + wiki.format_section(self.section)
else:
self.text = self.term
url_text = wiki.format_term(self.term)
self.input.groups.return_value = [None, self.text]
self.url = 'https://en.wikipedia.org/wiki/{0}'.format(url_text)
def check_snippet(self, output):
self.assertIn(self.url, output)
for keyword in self.keywords:
self.assertIn(keyword, output)
def test_wik(self):
input = Mock(groups=lambda: ['', "Human back"])
wikipedia.wik(self.phenny, input)
self.term = "Human back"
self.prepare()
wikipedia.wik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
m = re.match('^.* - https:\/\/en\.wikipedia\.org\/wiki\/Human_back$',
out, flags=re.UNICODE)
self.assertTrue(m)
self.keywords = ['human', 'back', 'body', 'buttocks', 'neck']
self.check_snippet(out)
def test_wik_fragment(self):
self.term = "New York City"
self.section = "Climate"
self.prepare()
wikipedia.wik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
self.keywords = ['New York', 'climate', 'humid', 'subtropical']
self.check_snippet(out)
def test_wik_invalid(self):
term = "New York City#Climate"
input = Mock(groups=lambda: ['', term])
wikipedia.wik(self.phenny, input)
self.term = "New York City"
self.section = "Physics"
self.prepare()
self.phenny.say.assert_called_once_with( "Can't find anything in "\
"Wikipedia for \"{0}\".".format(term))
wikipedia.wik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
message = "No '{0}' section found.".format(self.section)
self.assertEqual(out, '"{0}" - {1}'.format(message, self.url))
def test_wik_none(self):
term = "Ajgoajh"
input = Mock(groups=lambda: ['', term])
wikipedia.wik(self.phenny, input)
self.term = "Ajgoajh"
self.prepare()
self.phenny.say.assert_called_once_with( "Can't find anything in "\
"Wikipedia for \"{0}\".".format(term))
wikipedia.wik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0]
expected = "Can't find anything in Wikipedia for \"{0}\"."
self.assertEqual(out, expected.format(self.text))

View File

@ -26,7 +26,7 @@ def tfw(phenny, input, fahrenheit=False, celsius=False, mev=False):
icao_code = weather.code(phenny, where)
if not icao_code:
phenny.say("WHERE THE FUCK IS THAT? Try another location.")
phenny.say("WHERE THE FUCK IS THAT? I guess you might think it's a place, but no one else does. Try again.")
return
uri = 'http://tgftp.nws.noaa.gov/data/observations/metar/stations/%s.TXT'
@ -35,11 +35,11 @@ def tfw(phenny, input, fahrenheit=False, celsius=False, mev=False):
except AttributeError:
raise GrumbleError("THE INTERNET IS FUCKING BROKEN. Please try again later.")
except web.HTTPError:
phenny.say("WHERE THE FUCK IS THAT? Try another location.")
phenny.say("WHERE THE FUCK IS THAT? I guess you might think it's a place, but no one else does. Try again.")
return
if 'Not Found' in bytes:
phenny.say("WHERE THE FUCK IS THAT? Try another location.")
phenny.say("WHERE THE FUCK IS THAT? I guess you might think it's a place, but no one else does. Try again.")
return
w = metar.parse(bytes)
@ -64,33 +64,44 @@ def tfw(phenny, input, fahrenheit=False, celsius=False, mev=False):
"Nothing a few shots couldn't fix",
"Should have gone south",
"You think this is cold? Have you been to upstate New York?",
"Why do I live here?", "wang icicles.",
"Freezing my balls off out here", "Fuck this place.",
"GREAT! If you're a penguin.", "Fresh off the tap.",
"Why do I live here?",
"wang icicles.",
"Freezing my balls off out here",
"Fuck this place.",
"GREAT! If you're a penguin.",
"Fresh off the tap.",
"Fantastic do-nothing weather.",
"Put on some fucking socks.", "Blue balls x 2",
"Put on some fucking socks.",
"Blue balls x 2",
"Good news, food won't spoil nearly as fast outside. Bad news, who cares?",
"Really?", "Wear a fucking jacket.",
"Really?",
"Wear a fucking jacket.",
"I hear Siberia is the same this time of year.",
"NOT FUCKING JOGGING WEATHER", "Shrinkage's best friend.",
"Warmer than Hoth.", "Good baby making weather.",
"NOT FUCKING JOGGING WEATHER",
"Shrinkage's best friend.",
"Warmer than Hoth.",
"Good baby making weather.",
"Where's a Tauntaun when you need one?",
"My nipples could cut glass", "Global Warming? Bullshit.",
"My nipples could cut glass",
"Global Warming? Bullshit.",
"Call your local travel agency and ask them if they're serious.",
"Freezing my balls off IN here",
"I'm not sure how you can stand it", "I'm sorry.",
"I'm not sure how you can stand it",
"I'm sorry.",
"Even penguins are wearing jackets.",
"Keep track of your local old people.",
"WHAT THE FUCK DO YOU MEAN IT'S NICER IN ALASKA?",
"Sock warmers are go. Everywhere.",
"Why does my car feel like a pair of ice skates?",
"Actually, a sharp-stick in the eye might not all be that bad right now.",
"THO Season.", "It's a tit-bit nipplie.",
"THO Season.",
"It's a tit-bit nipplie.",
"Anything wooden will make a good fireplace. Thank us later.",
"MOVE THE FUCK ON GOLDILOCKS",
"I'm defrosting inside of my freezer.",
"It's time for a vacation.",
"It's bone chilling cold out. Sorry ladies."]
"It's bone chilling cold out. Sorry ladies."
]
elif w.temperature < 20:
remark = "IT'S FUCKING...ALRIGHT"
flavors = [
@ -98,7 +109,8 @@ def tfw(phenny, input, fahrenheit=False, celsius=False, mev=False):
"Better than a sharp stick in the eye.",
"Everything's nice butter weather!",
"At least you aren't living in a small town in Alaska",
"It could be worse.", "FUCKING NOTHING TO SEE HERE",
"It could be worse.",
"FUCKING NOTHING TO SEE HERE",
"Listen, weather. We need to have a talk.",
"OH NO. THE WEATHER MACHINE IS BROKEN.",
"An Eskimo would beat your ass to be here",
@ -120,31 +132,52 @@ def tfw(phenny, input, fahrenheit=False, celsius=False, mev=False):
"Well, at least we're not in prison.",
"Slap me around and call me Sally. It'd be an improvement.",
"Today is the perfect size, really honey.",
"Maybe Jersey Shore is on tonight."]
"It's that kind of day where you want zip off pants, until you realize how much of a jackass you look like in them.",
"Maybe Jersey Shore is on tonight.",
"Praise \"Bob\"!",
"Or kill me.",
"This statement is false.",
"Lies and slander, sire!"
]
elif w.temperature < 27:
remark = "IT'S FUCKING NICE"
flavors = [
"I made today breakfast in bed.", "FUCKING SWEET",
"Quit your bitching", "Enjoy.", "IT'S ABOUT FUCKING TIME",
"READ A FUCKIN' BOOK", "LETS HAVE A FUCKING PICNIC",
"It is safe to take your ball-mittens off.", "More please.",
"uh, can we trade?", "WOO, Spring Break!",
"I can't believe it's not porn!", "I approve of this message!",
"Operation beach volleyball is go.", "Plucky ducky kinda day.",
"I made today breakfast in bed.",
"FUCKING SWEET",
"Quit your bitching",
"Enjoy.",
"IT'S ABOUT FUCKING TIME",
"READ A FUCKIN' BOOK",
"LETS HAVE A FUCKING PICNIC",
"It is safe to take your ball-mittens off.",
"More please.",
"uh, can we trade?",
"I approve of this message!",
"WE WERE BEGINNING TO THINK YOU LOST YOUR MIND",
"WOO, Spring Break!",
"I can't believe it's not porn!",
"I approve of this message!",
"Operation beach volleyball is go.",
"Plucky ducky kinda day.",
"Today called just to say \"Hi.\"",
"STOP AND SMELL THE FUCKING ROSES",
"FUCKING NOTHING WRONG WITH TODAY", "LETS HAVE A FUCKING SOIREE",
"FUCKING NOTHING WRONG WITH TODAY",
"LETS HAVE A FUCKING SOIREE",
"What would you do for a holyshititsniceout bar?",
"There are no rules today, blow shit up!",
"Celebrate Today's Day and buy your Today a present so it knows you care.",
"I feel bad about playing on my computer all day.",
"Party in the woods.", "It is now safe to leave your home.",
"Party in the woods.",
"It is now safe to leave your home.",
"PUT A FUCKING CAPE ON TODAY, BECAUSE IT'S SUPER",
"Today is like \"ice\" if it started with an \"n\". Fuck you, we don't mean nce.",
"Water park! Water drive! Just get wet!",
"The geese are on their way back! Unless you live where they migrate to for the winter.",
"FUCKING AFFABLE AS SHIT", "Give the sun a raise!",
"Today is better than an original holographic Charizard. Loser!"]
"FUCKING AFFABLE AS SHIT",
"Give the sun a raise!",
"Go outside and go cycling or some shit, you fitness nerd!",
"Today is better than an original holographic Charizard. Loser!"
]
else:
remark = "IT'S FUCKING HOT"
flavors = [
@ -161,24 +194,60 @@ def tfw(phenny, input, fahrenheit=False, celsius=False, mev=False):
"Isn't the desert nice this time of year?",
"Why, oh why did we decide to live in an oven?",
"It's hotter outside than my fever.",
"I recommend staying away from fat people.",
"TAKE IT OFF!",
"TAKE FUCKING EVERYTHING OFF!",
"EVEN THAT NEEDS TO COME OFF!",
"Even your frigid girlfriend can't save you from today.",
"I need gloves to touch the steering wheel.",
"I can hear that power bill running up right now!",
"Lock up yo' ice cream trucks, lock up yo' wife.",
"FUCKING SUNBURNED, AND I WAS INSIDE ALL DAY.",
"Fuck this shit, I'm moving back to Alaska."]
"Fuck this shit, I'm moving back to Alaska."
]
if w.descriptor == "thunderstorm":
remark += " AND THUNDERING"
flavors += [
"Are you sure you want to go out in that? I'm not",
"Fuck my ears!",
"Don't go flying a kite. Unless you're Ben Franklin",
"Did you think Eris would smile upon your failings?"
]
elif w.precipitation in ("snow", "snow grains"):
remark += " AND SNOWING"
flavors += [
"What's this white stuff that's sticking to everything?",
"At least that stuff doesn't glow in the dark!",
"How the fuck am I supposed to get around now?",
"And you thought four-wheel-drive would help you!",
"Go fight those cadets with snowballs",
"Where does the white go when the snow melts?",
"Just sNOw"
]
elif w.precipitation in ("drizzle", "rain", "unknown precipitation"):
remark += " AND WET"
flavors += [
"Just like your mom!",
"I guess it can't get much worse",
"Hope you have a rain coat",
"Shower outside?",
"If only more buildings had gargoyles..."
]
elif w.precipitation in ("ice crystals", "ice pellets"):
remark += " AND ICY"
flavors += [
"Nice, but without the N!",
"Where's some NaCl when you need it?",
"I hope your skates are nearby.",
"Studded tyres? What're those?"
]
elif w.precipitation in ("hail", "small hail"):
remark += " AND HAILING"
flavors += [
"Windshield damage!",
"Car alarms!",
"Lie face-down outside: free massage!"
]
if int(tempf) == 69:
remark = "IT'S FUCKING SEXY TIME"
@ -187,7 +256,8 @@ def tfw(phenny, input, fahrenheit=False, celsius=False, mev=False):
"What comes after 69? Mouthwash.",
"If you are given two contradictory orders, obey them both.",
"a good fuckin' time! ;)",
"What's the square root of 69? Eight something."]
"What's the square root of 69? Eight something."
]
flavor = random.choice(flavors)
@ -210,7 +280,7 @@ def tfwc(phenny, input):
tfwc.rule = (['tfwc'], r'(.*)')
def tfwev(phenny, input):
""".tfwc <city/zip> - The fucking weather, in fucking degrees celsius."""
""".tfwev <city/zip> - The fucking weather, in fucking electron volts."""
return tfw(phenny, input, mev=True)
tfwev.rule = (['tfwev'], r'(.*)')

View File

@ -17,13 +17,6 @@ def urbandict(phenny, input):
phenny.say(urbandict.__doc__.strip())
return
# create opener
#opener = urllib.request.build_opener()
#opener.addheaders = [
# ('User-agent', web.Grab().version),
# ('Referer', "http://m.urbandictionary.com"),
#]
try:
data = web.get(
"http://api.urbandictionary.com/v0/define?term={0}".format(
@ -33,11 +26,13 @@ def urbandict(phenny, input):
raise GrumbleError(
"Urban Dictionary slemped out on me. Try again in a minute.")
if data['result_type'] == 'no_results':
results = data['list']
if not results:
phenny.say("No results found for {0}".format(word))
return
result = data['list'][0]
result = results[0]
url = 'http://www.urbandictionary.com/define.php?term={0}'.format(
web.quote(word))

View File

@ -10,14 +10,13 @@ modified from Wikipedia module
author: mutantmonkey <mutantmonkey@mutantmonkey.in>
"""
import re
import web
import wiki
wikiapi = 'https://vtluug.org/w/api.php?action=query&list=search&srsearch={0}&limit=1&prop=snippet&format=json'
wikiuri = 'https://vtluug.org/wiki/{0}'
wikisearch = 'https://vtluug.org/wiki/Special:Search?' \
+ 'search={0}&fulltext=Search'
endpoints = {
'api': 'https://vtluug.org/w/api.php?action=query&list=search&srsearch={0}&limit=1&prop=snippet&format=json',
'url': 'https://vtluug.org/wiki/{0}',
'search': 'https://vtluug.org/wiki/Special:Search?search={0}&fulltext=Search',
}
def vtluug(phenny, input):
""".vtluug <term> - Look up something on the VTLUUG wiki."""
@ -26,22 +25,19 @@ def vtluug(phenny, input):
if not origterm:
return phenny.say('Perhaps you meant ".vtluug VT-Wireless"?')
term = web.unquote(origterm)
term = term[0].upper() + term[1:]
term = term.replace(' ', '_')
term, section = wiki.parse_term(origterm)
w = wiki.Wiki(wikiapi, wikiuri, wikisearch)
w = wiki.Wiki(endpoints)
match = w.search(term)
try:
result = w.search(term)
except web.ConnectionError:
error = "Can't connect to vtluug.org ({0})".format(wikiuri.format(term))
return phenny.say(error)
if not match:
phenny.say('Can\'t find anything in the VTLUUG Wiki for "{0}".'.format(term))
return
snippet, url = wiki.extract_snippet(match, section)
phenny.say('"{0}" - {1}'.format(snippet, url))
if result is not None:
phenny.say(result)
else:
phenny.say('Can\'t find anything in the VTLUUG Wiki for "{0}".'.format(origterm))
vtluug.commands = ['vtluug']
vtluug.priority = 'high'

View File

@ -25,13 +25,13 @@ def location(q):
results = web.get(uri)
data = json.loads(results)
if len(data) < 1:
return '?', None, None
if not data:
return None, None
display_name = data[0]['display_name']
lat = float(data[0]['lat'])
lon = float(data[0]['lon'])
return display_name, lat, lon
latitude = float(data[0]['lat'])
longitude = float(data[0]['lon'])
return latitude, longitude
def local(icao, hour, minute):
@ -58,7 +58,7 @@ def code(phenny, search):
if search.upper() in [loc[0] for loc in data]:
return search.upper()
else:
display_name, latitude, longitude = location(search)
latitude, longitude = location(search)
if not latitude or not longitude:
return False
sumOfSquares = (99999999999999999999999999999, 'ICAO')

View File

@ -7,14 +7,13 @@ Licensed under the Eiffel Forum License 2.
http://inamidst.com/phenny/
"""
import re
import web
import wiki
wikiapi = 'https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={0}&limit=1&prop=snippet&format=json'
wikiuri = 'https://en.wikipedia.org/wiki/{0}'
wikisearch = 'https://en.wikipedia.org/wiki/Special:Search?' \
+ 'search={0}&fulltext=Search'
endpoints = {
'api': 'https://en.wikipedia.org/w/api.php?format=json&action=query&list=search&srsearch={0}&prop=snippet&limit=1',
'url': 'https://en.wikipedia.org/wiki/{0}',
'search': 'https://en.wikipedia.org/wiki/Special:Search?search={0}&fulltext=Search',
}
def wik(phenny, input):
""".wik <term> - Look up something on Wikipedia."""
@ -23,22 +22,19 @@ def wik(phenny, input):
if not origterm:
return phenny.say('Perhaps you meant ".wik Zen"?')
term = web.unquote(origterm)
term = term[0].upper() + term[1:]
term = term.replace(' ', '_')
origterm = origterm.strip()
term, section = wiki.parse_term(origterm)
w = wiki.Wiki(wikiapi, wikiuri, wikisearch)
w = wiki.Wiki(endpoints)
match = w.search(term)
try:
result = w.search(term)
except web.ConnectionError:
error = "Can't connect to en.wikipedia.org ({0})".format(wikiuri.format(term))
return phenny.say(error)
if result is not None:
phenny.say(result)
else:
if not match:
phenny.say('Can\'t find anything in Wikipedia for "{0}".'.format(origterm))
return
snippet, url = wiki.extract_snippet(match, section)
phenny.say('"{0}" - {1}'.format(snippet, url))
wik.commands = ['wik']
wik.priority = 'high'

View File

@ -17,7 +17,7 @@ def wuvt(phenny, input):
except:
raise GrumbleError("Failed to fetch current track from WUVT")
if 'listeners' in trackinfo:
if 'listeners' in trackinfo and trackinfo['listeners'] is not None:
phenny.say(
"{dj} is currently playing \"{title}\" by {artist} with "
"{listeners:d} online listeners".format(

43
phenny
View File

@ -12,9 +12,9 @@ Then run ./phenny again
"""
import argparse
import imp
import os
import sys
from importlib.machinery import SourceFileLoader
from textwrap import dedent as trim
dotdir = os.path.expanduser('~/.phenny')
@ -152,35 +152,24 @@ def main(argv=None):
config_modules = []
for config_name in config_names(args.config):
name = os.path.basename(config_name).split('.')[0] + '_config'
module = imp.load_source(name, config_name)
module = SourceFileLoader(name, config_name).load_module()
module.filename = config_name
if not hasattr(module, 'prefix'):
module.prefix = r'\.'
defaults = {
'prefix': r'\.',
'name': 'Phenny Palmersbot, http://inamidst.com/phenny/',
'port': 6667,
'ssl': False,
'ca_certs': None,
'ssl_cert': None,
'ssl_key': None,
'ipv6': False,
'password': None,
}
if not hasattr(module, 'name'):
module.name = 'Phenny Palmersbot, http://inamidst.com/phenny/'
if not hasattr(module, 'port'):
module.port = 6667
if not hasattr(module, 'ssl'):
module.ssl = False
if not hasattr(module, 'ca_certs'):
module.ca_certs = None
if not hasattr(module, 'ssl_cert'):
module.ssl_cert = None
if not hasattr(module, 'ssl_key'):
module.ssl_key = None
if not hasattr(module, 'ipv6'):
module.ipv6 = False
if not hasattr(module, 'password'):
module.password = None
for key, value in defaults.items():
if not hasattr(module, key):
setattr(module, key, value)
if module.host == 'irc.example.net':
error = ('Error: you must edit the config file first!\n' +

55
proto.py Normal file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
proto.py - IRC protocol messages
"""
import sys
import traceback
def _comma(arg):
if type(arg) is list:
arg = ','.join(arg)
return arg
def join(self, channels, keys=None):
channels = _comma(channels)
if keys:
keys = _comma(keys)
self.write(('JOIN', channels, keys))
else:
self.write(('JOIN', channels))
def nick(self, nickname):
self.write(('NICK', nickname))
def notice(self, msgtarget, message):
self.write(('NOTICE', msgtarget), message)
def part(self, channels, message=None):
channels = _comma(channels)
self.write(('PART', channels), message)
def pass_(self, password):
self.write(('PASS', password))
def ping(self, server1, server2=None):
self.write(('PING', server1), server2)
def pong(self, server1, server2=None):
self.write(('PONG', server1), server2)
def privmsg(self, msgtarget, message):
self.write(('PRIVMSG', msgtarget), message)
def quit(self, message=None):
self.write(('QUIT'), message)
def user(self, user, mode, realname):
self.write(('USER', user, mode, '_'), realname)
module_dict = sys.modules[__name__].__dict__
command_filter = lambda k, v: callable(v) and not k.startswith('_')
commands = {k: v for k, v in module_dict.items() if command_filter(k, v)}

View File

@ -42,7 +42,7 @@ class BotTest(unittest.TestCase):
mock_write.assert_has_calls([
call(('NICK', self.nick)),
call(('USER', self.nick, '+iw', self.nick), self.name)
call(('USER', self.nick, '+iw', '_'), self.name)
])
@patch('irc.Bot.write')
@ -50,7 +50,7 @@ class BotTest(unittest.TestCase):
self.bot.buffer = b"PING"
self.bot.found_terminator()
mock_write.assert_called_once_with(('PONG', ''))
mock_write.assert_called_once_with(('PONG', ''), None)
@patch('irc.Bot.push')
def test_msg(self, mock_push):
@ -80,6 +80,6 @@ class BotTest(unittest.TestCase):
@patch('irc.Bot.write')
def test_notice(self, mock_write):
notice = "This is a notice!"
self.bot.notice('jqh', notice)
self.bot.proto.notice('jqh', notice)
mock_write.assert_called_once_with(('NOTICE', 'jqh'), notice)

View File

@ -8,6 +8,19 @@ http://inamidst.com/phenny/
"""
def decorate(obj, delegate):
class Decorator(object):
def __getattr__(self, attr):
if attr in delegate:
return delegate[attr]
return getattr(obj, attr)
def __setattr__(self, attr, value):
return setattr(obj, attr, value)
return Decorator()
class GrumbleError(Exception):
pass

142
wiki.py
View File

@ -1,5 +1,8 @@
import json
import lxml.html
import re
from requests.exceptions import HTTPError
from urllib.parse import quote, unquote
import web
@ -16,15 +19,104 @@ abbrs = ['etc', 'ca', 'cf', 'Co', 'Ltd', 'Inc', 'Mt', 'Mr', 'Mrs',
'syn', 'transl', 'sess', 'fl', 'Op', 'Dec', 'Brig', 'Gen'] \
+ list('ABCDEFGHIJKLMNOPQRSTUVWXYZ') \
+ list('abcdefghijklmnopqrstuvwxyz')
t_sentence = r'^.{5,}?(?<!\b%s)(?:\.(?=[\[ ][A-Z0-9]|\Z)|\Z)'
r_sentence = re.compile(t_sentence % r')(?<!\b'.join(abbrs))
no_abbr = ''.join('(?<! ' + abbr + ')' for abbr in abbrs)
breaks = re.compile('({})+'.format('|'.join([
no_abbr + '[.!?](?:[ \n]|\[[0-9]+\]|$)',
'', '', '', '', '',
])))
def format_term(term):
term = term.replace(' ', '_')
term = term[0].upper() + term[1:]
return term
def deformat_term(term):
term = term.replace('_', ' ')
return term
def format_section(section):
section = section.replace(' ', '_')
section = quote(section)
section = section.replace('%', '.')
section = section.replace(".3A", ":")
return section
def parse_term(origterm):
if "#" in origterm:
term, section = origterm.split("#")[:2]
term, section = term.strip(), section.strip()
else:
term = origterm.strip()
section = None
return (term, section)
def good_content(text, content):
if text.tag not in ['p', 'ul', 'ol']:
return False
if not content.strip():
return False
if not breaks.search(content):
return False
if text.find(".//span[@id='coordinates']") is not None:
return False
return True
def search_content(text):
if text is None:
return None
content = text.text_content()
while not good_content(text, content):
text = text.getnext()
if text is None:
return None
content = text.text_content()
return content
def extract_snippet(match, origsection=None):
html, url = match
page = lxml.html.fromstring(html)
article = page.get_element_by_id('mw-content-text')
if origsection:
section = format_section(origsection)
text = article.find(".//span[@id='{0}']".format(section))
url += "#" + unquote(section)
if text is None:
return ("No '{0}' section found.".format(origsection), url)
text = text.getparent().getnext()
content = search_content(text)
if text is None:
return ("No section text found.", url)
else:
text = article.find('./p')
if text is None:
text = article.find('./div/p')
content = search_content(text)
if text is None:
return ("No introduction text found.", url)
sentences = [x.strip() for x in breaks.split(content)]
return (sentences[0], url)
class Wiki(object):
def __init__(self, api, url, searchurl=""):
self.api = api
self.url = url
self.searchurl = searchurl
def __init__(self, endpoints):
self.endpoints = endpoints
@staticmethod
def unescape(s):
@ -41,18 +133,34 @@ class Wiki(object):
html = r_whitespace.sub(' ', html)
return Wiki.unescape(html).strip()
def search(self, term, last=False):
url = self.api.format(term)
bytes = web.get(url)
def search(self, term):
try:
result = json.loads(bytes)
result = result['query']['search']
if len(result) <= 0:
return None
exactterm = format_term(term)
exactterm = quote(exactterm)
exacturl = self.endpoints['url'].format(exactterm)
html = web.get(exacturl)
return (html, exacturl)
except HTTPError:
pass
term = deformat_term(term)
term = quote(term)
apiurl = self.endpoints['api'].format(term)
try:
result = json.loads(web.get(apiurl))
except ValueError:
return None
term = result[0]['title']
term = term.replace(' ', '_')
snippet = self.text(result[0]['snippet'])
return "{0} - {1}".format(snippet, self.url.format(term))
result = result['query']['search']
if not result:
return None
term = result[0]['title']
term = format_term(term)
term = quote(term)
url = self.endpoints['url'].format(term)
html = web.get(url)
return (html, url)