Merge pull request #85 from Androbin/proto

Provide dedicated methods for protocol messages
master
mutantmonkey 2018-08-28 03:27:09 +00:00 committed by GitHub
commit 9b7d64daac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 112 additions and 54 deletions

51
irc.py
View File

@ -9,11 +9,16 @@ http://inamidst.com/phenny/
import asynchat import asynchat
import asyncore import asyncore
import functools
import proto
import re import re
import socket import socket
import ssl import ssl
import sys import sys
import time import time
import traceback
import threading
from tools import decorate
class Origin(object): class Origin(object):
@ -49,9 +54,12 @@ class Bot(asynchat.async_chat):
self.channels = channels or [] self.channels = channels or []
self.stack = [] self.stack = []
import threading
self.sending = threading.RLock() self.sending = threading.RLock()
proto_func = lambda attr: functools.partial(proto.commands[attr], self)
proto_map = {attr: proto_func(attr) for attr in proto.commands}
self.proto = decorate(object(), proto_map)
def initiate_send(self): def initiate_send(self):
self.sending.acquire() self.sending.acquire()
asynchat.async_chat.initiate_send(self) asynchat.async_chat.initiate_send(self)
@ -61,24 +69,22 @@ class Bot(asynchat.async_chat):
# asynchat.async_chat.push(self, *args, **kargs) # asynchat.async_chat.push(self, *args, **kargs)
def __write(self, args, text=None): def __write(self, args, text=None):
# print 'PUSH: %r %r %r' % (self, args, text) line = b' '.join(args)
try:
if text is not None: if text is not None:
# 510 because CR and LF count too, as nyuszika7h points out line += b' :' + text
self.push((b' '.join(args) + b' :' + text)[:510] + b'\r\n')
else: # 510 because CR and LF count too
self.push(b' '.join(args)[:512] + b'\r\n') self.push(line[:510] + b'\r\n')
except IndexError:
pass
def write(self, args, text=None): def write(self, args, text=None):
"""This is a safe version of __write""" """This is a safe version of __write"""
def safe(input): def safe(input):
if type(input) == str: if type(input) == str:
input = input.replace('\n', '') input = re.sub(' ?(\r|\n)+', ' ', input)
input = input.replace('\r', '')
return input.encode('utf-8') return input.encode('utf-8')
else: else:
input = re.sub(b' ?(\r|\n)+', b' ', input)
return input return input
try: try:
args = [safe(arg) for arg in args] args = [safe(arg) for arg in args]
@ -127,10 +133,12 @@ class Bot(asynchat.async_chat):
def handle_connect(self): def handle_connect(self):
if self.verbose: if self.verbose:
print('connected!', file=sys.stderr) print('connected!', file=sys.stderr)
if self.password: if self.password:
self.write(('PASS', self.password)) self.proto.pass_(self.password)
self.write(('NICK', self.nick))
self.write(('USER', self.user, '+iw', self.nick), self.name) self.proto.nick(self.nick)
self.proto.user(self.user, '+iw', self.name)
def handle_close(self): def handle_close(self):
self.close() self.close()
@ -165,7 +173,7 @@ class Bot(asynchat.async_chat):
self.dispatch(origin, tuple([text] + args)) self.dispatch(origin, tuple([text] + args))
if args[0] == 'PING': if args[0] == 'PING':
self.write(('PONG', text)) self.proto.pong(text)
def dispatch(self, origin, args): def dispatch(self, origin, args):
pass pass
@ -203,12 +211,7 @@ class Bot(asynchat.async_chat):
self.sending.release() self.sending.release()
return return
def safe(input): self.proto.privmsg(recipient, text)
if type(input) == str:
input = input.encode('utf-8')
input = input.replace(b'\n', b'')
return input.replace(b'\r', b'')
self.__write((b'PRIVMSG', safe(recipient)), safe(text))
self.stack.append((time.time(), text)) self.stack.append((time.time(), text))
self.stack = self.stack[-10:] self.stack = self.stack[-10:]
@ -218,12 +221,8 @@ class Bot(asynchat.async_chat):
text = "\x01ACTION {0}\x01".format(text) text = "\x01ACTION {0}\x01".format(text)
return self.msg(recipient, text) return self.msg(recipient, text)
def notice(self, dest, text):
self.write(('NOTICE', dest), text)
def error(self, origin): def error(self, origin):
try: try:
import traceback
trace = traceback.format_exc() trace = traceback.format_exc()
print(trace) print(trace)
lines = list(reversed(trace.splitlines())) lines = list(reversed(trace.splitlines()))

View File

@ -13,9 +13,7 @@ def join(phenny, input):
if input.sender.startswith('#'): return if input.sender.startswith('#'): return
if input.admin: if input.admin:
channel, key = input.group(1), input.group(2) channel, key = input.group(1), input.group(2)
if not key: phenny.proto.join(channel, key)
phenny.write(['JOIN'], channel)
else: phenny.write(['JOIN', channel, key])
join.rule = r'\.join (#\S+)(?: *(\S+))?' join.rule = r'\.join (#\S+)(?: *(\S+))?'
join.priority = 'low' join.priority = 'low'
join.example = '.join #example or .join #example key' join.example = '.join #example or .join #example key'
@ -24,7 +22,7 @@ def autojoin(phenny, input):
"""Join the specified channel when invited by an admin.""" """Join the specified channel when invited by an admin."""
if input.admin: if input.admin:
channel = input.group(1) channel = input.group(1)
phenny.write(['JOIN'], channel) phenny.proto.join(channel)
autojoin.event = 'INVITE' autojoin.event = 'INVITE'
autojoin.rule = r'(.*)' autojoin.rule = r'(.*)'
@ -33,7 +31,7 @@ def part(phenny, input):
# Can only be done in privmsg by an admin # Can only be done in privmsg by an admin
if input.sender.startswith('#'): return if input.sender.startswith('#'): return
if input.admin: if input.admin:
phenny.write(['PART'], input.group(2)) phenny.proto.part(input.group(2))
part.rule = (['part'], r'(#\S+)') part.rule = (['part'], r'(#\S+)')
part.priority = 'low' part.priority = 'low'
part.example = '.part #example' part.example = '.part #example'
@ -43,7 +41,7 @@ def quit(phenny, input):
# Can only be done in privmsg by the owner # Can only be done in privmsg by the owner
if input.sender.startswith('#'): return if input.sender.startswith('#'): return
if input.owner: if input.owner:
phenny.write(['QUIT']) phenny.proto.quit()
__import__('os')._exit(0) __import__('os')._exit(0)
quit.commands = ['quit'] quit.commands = ['quit']
quit.priority = 'low' quit.priority = 'low'

View File

@ -27,13 +27,11 @@ def setup(phenny):
timer = threading.Timer(refresh_delay, close, ()) timer = threading.Timer(refresh_delay, close, ())
phenny.data['startup.setup.timer'] = timer phenny.data['startup.setup.timer'] = timer
phenny.data['startup.setup.timer'].start() phenny.data['startup.setup.timer'].start()
# print "PING!" phenny.proto.ping(phenny.config.host)
phenny.write(('PING', phenny.config.host))
phenny.data['startup.setup.pingloop'] = pingloop phenny.data['startup.setup.pingloop'] = pingloop
def pong(phenny, input): def pong(phenny, input):
try: try:
# print "PONG!"
phenny.data['startup.setup.timer'].cancel() phenny.data['startup.setup.timer'].cancel()
time.sleep(refresh_delay + 60.0) time.sleep(refresh_delay + 60.0)
pingloop() pingloop()
@ -50,16 +48,16 @@ def startup(phenny, input):
if phenny.data.get('startup.setup.pingloop'): if phenny.data.get('startup.setup.pingloop'):
phenny.data['startup.setup.pingloop']() phenny.data['startup.setup.pingloop']()
if hasattr(phenny.config, 'serverpass'): if hasattr(phenny.config, 'serverpass'):
phenny.write(('PASS', phenny.config.serverpass)) phenny.proto.pass_(phenny.config.serverpass)
if hasattr(phenny.config, 'password'): if hasattr(phenny.config, 'password'):
phenny.msg('NickServ', 'IDENTIFY %s' % phenny.config.password) phenny.msg('NickServ', 'IDENTIFY %s' % phenny.config.password)
time.sleep(5) time.sleep(5)
# Cf. http://swhack.com/logs/2005-12-05#T19-32-36 # Cf. http://swhack.com/logs/2005-12-05#T19-32-36
for channel in phenny.channels: for channel in phenny.channels:
phenny.write(('JOIN', channel)) phenny.proto.join(channel)
time.sleep(0.5) time.sleep(0.5)
startup.rule = r'(.*)' startup.rule = r'(.*)'
startup.event = '251' startup.event = '251'

View File

@ -41,7 +41,7 @@ class TestArchwiki(unittest.TestCase):
archwiki.awik(self.phenny, self.input) archwiki.awik(self.phenny, self.input)
out = self.phenny.say.call_args[0][0] out = self.phenny.say.call_args[0][0]
self.keywords = ['policy', 'mail', 'transfer', 'providers'] self.keywords = ['DMARC', 'implementation', 'specification']
self.check_snippet(out) self.check_snippet(out)
def test_awik_fragment(self): def test_awik_fragment(self):

View File

@ -26,9 +26,9 @@ class TestWeather(unittest.TestCase):
('48067', (42.5, -83.1)), ('48067', (42.5, -83.1)),
('23606', (37.1, -76.5)), ('23606', (37.1, -76.5)),
('23113', (37.5, -77.6)), ('23113', (37.5, -77.6)),
('27517', (35.9, -79.0)), ('27517', (42.6, -7.8)),
('15213', (40.4, -80.0)), ('15213', (40.4, -80.0)),
('90210', (34.1, -118.4)), ('90210', (34.1, -118.3)),
('33109', (25.8, -80.1)), ('33109', (25.8, -80.1)),
('80201', (22.6, 120.3)), ('80201', (22.6, 120.3)),

View File

@ -17,13 +17,6 @@ def urbandict(phenny, input):
phenny.say(urbandict.__doc__.strip()) phenny.say(urbandict.__doc__.strip())
return return
# create opener
#opener = urllib.request.build_opener()
#opener.addheaders = [
# ('User-agent', web.Grab().version),
# ('Referer', "http://m.urbandictionary.com"),
#]
try: try:
data = web.get( data = web.get(
"http://api.urbandictionary.com/v0/define?term={0}".format( "http://api.urbandictionary.com/v0/define?term={0}".format(
@ -33,11 +26,13 @@ def urbandict(phenny, input):
raise GrumbleError( raise GrumbleError(
"Urban Dictionary slemped out on me. Try again in a minute.") "Urban Dictionary slemped out on me. Try again in a minute.")
if data['result_type'] == 'no_results': results = data['list']
if not results:
phenny.say("No results found for {0}".format(word)) phenny.say("No results found for {0}".format(word))
return return
result = data['list'][0] result = results[0]
url = 'http://www.urbandictionary.com/define.php?term={0}'.format( url = 'http://www.urbandictionary.com/define.php?term={0}'.format(
web.quote(word)) web.quote(word))

55
proto.py Normal file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
proto.py - IRC protocol messages
"""
import sys
import traceback
def _comma(arg):
if type(arg) is list:
arg = ','.join(arg)
return arg
def join(self, channels, keys=None):
channels = _comma(channels)
if keys:
keys = _comma(keys)
self.write(('JOIN', channels, keys))
else:
self.write(('JOIN', channels))
def nick(self, nickname):
self.write(('NICK', nickname))
def notice(self, msgtarget, message):
self.write(('NOTICE', msgtarget), message)
def part(self, channels, message=None):
channels = _comma(channels)
self.write(('PART', channels), message)
def pass_(self, password):
self.write(('PASS', password))
def ping(self, server1, server2=None):
self.write(('PING', server1), server2)
def pong(self, server1, server2=None):
self.write(('PONG', server1), server2)
def privmsg(self, msgtarget, message):
self.write(('PRIVMSG', msgtarget), message)
def quit(self, message=None):
self.write(('QUIT'), message)
def user(self, user, mode, realname):
self.write(('USER', user, mode, '_'), realname)
module_dict = sys.modules[__name__].__dict__
command_filter = lambda k, v: callable(v) and not k.startswith('_')
commands = {k: v for k, v in module_dict.items() if command_filter(k, v)}

View File

@ -42,7 +42,7 @@ class BotTest(unittest.TestCase):
mock_write.assert_has_calls([ mock_write.assert_has_calls([
call(('NICK', self.nick)), call(('NICK', self.nick)),
call(('USER', self.nick, '+iw', self.nick), self.name) call(('USER', self.nick, '+iw', '_'), self.name)
]) ])
@patch('irc.Bot.write') @patch('irc.Bot.write')
@ -50,7 +50,7 @@ class BotTest(unittest.TestCase):
self.bot.buffer = b"PING" self.bot.buffer = b"PING"
self.bot.found_terminator() self.bot.found_terminator()
mock_write.assert_called_once_with(('PONG', '')) mock_write.assert_called_once_with(('PONG', ''), None)
@patch('irc.Bot.push') @patch('irc.Bot.push')
def test_msg(self, mock_push): def test_msg(self, mock_push):
@ -80,6 +80,6 @@ class BotTest(unittest.TestCase):
@patch('irc.Bot.write') @patch('irc.Bot.write')
def test_notice(self, mock_write): def test_notice(self, mock_write):
notice = "This is a notice!" notice = "This is a notice!"
self.bot.notice('jqh', notice) self.bot.proto.notice('jqh', notice)
mock_write.assert_called_once_with(('NOTICE', 'jqh'), notice) mock_write.assert_called_once_with(('NOTICE', 'jqh'), notice)

View File

@ -8,6 +8,19 @@ http://inamidst.com/phenny/
""" """
def decorate(obj, delegate):
class Decorator(object):
def __getattr__(self, attr):
if attr in delegate:
return delegate[attr]
return getattr(obj, attr)
def __setattr__(self, attr, value):
return setattr(obj, attr, value)
return Decorator()
class GrumbleError(Exception): class GrumbleError(Exception):
pass pass