#!/usr/bin/env python """ irc.py - A Utility IRC Bot Copyright 2008, Sean B. Palmer, inamidst.com Licensed under the Eiffel Forum License 2. http://inamidst.com/phenny/ """ import sys, re, time, traceback import socket, asyncore, asynchat import ssl class Origin(object): source = re.compile(r'([^!]*)!?([^@]*)@?(.*)') def __init__(self, bot, source, args): source = source.decode('utf-8') match = Origin.source.match(source or '') self.nick, self.user, self.host = match.groups() if len(args) > 1: target = args[1] else: target = None mappings = {bot.nick: self.nick, None: None} self.sender = mappings.get(target, target) class Bot(asynchat.async_chat): def __init__(self, nick, name, channels, password=None): asynchat.async_chat.__init__(self) self.set_terminator(b'\n') self.buffer = b'' self.nick = nick self.user = nick self.name = name self.password = password self.verbose = True self.channels = channels or [] self.stack = [] import threading self.sending = threading.RLock() # def push(self, *args, **kargs): # asynchat.async_chat.push(self, *args, **kargs) def __write(self, args, text=None): # print '%r %r %r' % (self, args, text) try: if text is not None: self.push((b' '.join(args) + b' :' + text)[:512] + b'\r\n') else: self.push(b' '.join(args)[:512] + b'\r\n') except IndexError: pass def write(self, args, text=None): print(args, text) # This is a safe version of __write def safe(input): input = input.replace('\n', '') input = input.replace('\r', '') return input.encode('utf-8') try: args = [safe(arg) for arg in args] if text is not None: text = safe(text) self.__write(args, text) except Exception as e: pass def run(self, host, port=6667, ssl=False, ipv6=False): self.initiate_connect(host, port, ssl, ipv6) def initiate_connect(self, host, port, use_ssl, ipv6): if self.verbose: message = 'Connecting to %s:%s...' % (host, port) print(message, end=' ', file=sys.stderr) if ipv6 and socket.has_ipv6: af = socket.AF_INET6 else: af = socket.AF_INET self.create_socket(af, socket.SOCK_STREAM, use_ssl) self.connect((host, port)) try: asyncore.loop() except KeyboardInterrupt: sys.exit() def create_socket(self, family, type, use_ssl=False): self.family_and_type = family, type sock = socket.socket(family, type) if use_ssl: sock = ssl.wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLSv1) # FIXME: ssl module does not appear to work properly with nonblocking sockets #sock.setblocking(0) self.set_socket(sock) def handle_connect(self): if self.verbose: print('connected!', file=sys.stderr) if self.password: self.write(('PASS', self.password)) self.write(('NICK', self.nick)) self.write(('USER', self.user, '+iw', self.nick), self.name) def handle_close(self): self.close() print('Closed!', file=sys.stderr) def collect_incoming_data(self, data): self.buffer += data def found_terminator(self): line = self.buffer if line.endswith(b'\r'): line = line[:-1] self.buffer = b'' # print line if line.startswith(b':'): source, line = line[1:].split(b' ', 1) else: source = None if b' :' in line: argstr, text = line.split(b' :', 1) else: argstr, text = line, b'' args = argstr.split() origin = Origin(self, source, args) self.dispatch(origin, tuple([text] + args)) if args[0] == b'PING': self.write(('PONG', text)) def dispatch(self, origin, args): pass def msg(self, recipient, text): self.sending.acquire() # Cf. http://swhack.com/logs/2006-03-01#T19-43-25 if isinstance(text, str): try: text = text.encode('utf-8') except UnicodeEncodeError as e: text = e.__class__ + ': ' + str(e) if isinstance(recipient, str): try: recipient = recipient.encode('utf-8') except UnicodeEncodeError as e: return # No messages within the last 3 seconds? Go ahead! # Otherwise, wait so it's been at least 0.8 seconds + penalty if self.stack: elapsed = time.time() - self.stack[-1][0] if elapsed < 3: penalty = float(max(0, len(text) - 50)) / 70 wait = 0.8 + penalty if elapsed < wait: time.sleep(wait - elapsed) # Loop detection messages = [m[1] for m in self.stack[-8:]] if messages.count(text) >= 5: text = '...' if messages.count('...') >= 3: self.sending.release() return def safe(input): input = input.replace(b'\n', b'') return input.replace(b'\r', b'') self.__write((b'PRIVMSG', safe(recipient)), safe(text)) self.stack.append((time.time(), text)) self.stack = self.stack[-10:] self.sending.release() def action(self, recipient, text): text = "ACTION %s" % text textu = chr(1) + text + chr(1) return self.msg(recipient, textu) def notice(self, dest, text): self.write((b'NOTICE', dest), text) def error(self, origin): try: import traceback trace = traceback.format_exc() print(trace) lines = list(reversed(trace.splitlines())) report = [lines[0].strip()] for line in lines: line = line.strip() if line.startswith('File "/'): report.append(line[0].lower() + line[1:]) break else: report.append('source unknown') self.msg(origin.sender, report[0] + ' (' + report[1] + ')') except: self.msg(origin.sender, "Got an error.") class TestBot(Bot): def f_ping(self, origin, match, args): delay = m.group(1) if delay is not None: import time time.sleep(int(delay)) self.msg(origin.sender, 'pong (%s)' % delay) else: self.msg(origin.sender, 'pong') f_ping.rule = r'^\.ping(?:[ \t]+(\d+))?$' def main(): bot = TestBot('testbot007', 'testbot007', ['#wadsworth']) bot.run('irc.freenode.net') print(__doc__) if __name__=="__main__": main()