No edit summary |
No edit summary |
||
Line 2: | Line 2: | ||
(>•.•<)<br/> | (>•.•<)<br/> | ||
<nowiki> (") (")</nowiki> | <nowiki> (") (")</nowiki> | ||
== @_pil_bot_ == | |||
=== pilbot.py === | |||
<source lang="python"> | |||
# -*- coding: utf-8 -*- # | |||
from twitterbot import TwitterBot | |||
import keys | |||
from PIL import Image | |||
from sys import argv | |||
from random import random | |||
from pileval import PILEval | |||
class PilBot(TwitterBot): | |||
def __init__(self): | |||
TwitterBot.__init__(self) | |||
self.pillermann = PILEval() | |||
def bot_init(self): | |||
""" Initialize and configure the bot """ | |||
############################ | |||
# REQUIRED: LOGIN DETAILS! # | |||
############################ | |||
self.config['api_key'] = keys.consumer_key | |||
self.config['api_secret'] = keys.consumer_secret | |||
self.config['access_key'] = keys.access_token | |||
self.config['access_secret'] = keys.access_token_secret | |||
###################################### | |||
# SEMI-OPTIONAL: OTHER CONFIG STUFF! # | |||
###################################### | |||
# how often to tweet, in seconds | |||
self.config['tweet_interval'] = 1 * 5 # default: 1 minutes | |||
# use this to define a (min, max) random range of how often to tweet | |||
# e.g., self.config['tweet_interval_range'] = (5*60, 10*60) # tweets every 5-10 minutes | |||
self.config['tweet_interval_range'] = None | |||
# only reply to tweets that specifically mention the bot | |||
self.config['reply_direct_mention_only'] = True | |||
# only include bot followers (and original tweeter) in @-replies | |||
self.config['reply_followers_only'] = False | |||
# fav any tweets that mention this bot? | |||
self.config['autofav_mentions'] = False | |||
# fav any tweets containing these keywords? | |||
self.config['autofav_keywords'] = [] | |||
# follow back all followers? | |||
self.config['autofollow'] = False | |||
def on_scheduled_tweet(self): | |||
""" Make a public tweet to the bot's own timeline. """ | |||
# We might take senteces from somewhere and tweet them on a regular basis ... | |||
pass # don't do anything here ... | |||
def on_mention(self, tweet, prefix): | |||
""" Actions to take when a mention is received. """ | |||
text = tweet.text.split()[1] | |||
self.action(text) | |||
def on_timeline(self, tweet, prefix): | |||
""" Actions to take on a timeline tweet. """ | |||
pass | |||
def action(self, code): | |||
result = self.pillermann.eval(code) | |||
if result: # post the image | |||
self.post_tweet(code, media="img.png", file=self.pillermann.get_file()) | |||
else: # post the error | |||
self.post_tweet(self.pillermann.last_error) | |||
if __name__ == '__main__': | |||
bot = PilBot() | |||
if len(argv) == 1: | |||
bot.run() | |||
else: | |||
tweet_text = "text((300, 400), 'botsNplots', fill='#ff0')" | |||
bot.action(tweet_text) | |||
</source> | |||
=== pileval.py === | |||
<source lang="python"> | |||
from PIL import Image, ImageDraw | |||
from os import path | |||
from io import BytesIO | |||
class PILEval(): | |||
def __init__(self, filename='test.png'): | |||
self.filename = filename | |||
try: | |||
self.image = Image.open(filename) | |||
except FileNotFoundError as e: | |||
self.image = Image.new("RGBA", (640, 480), 'white') | |||
self.last_error = '' | |||
def __repr__(self): | |||
return "<PILEval: %s>" % self.filename | |||
def eval(self, code): | |||
draw = ImageDraw.Draw(self.image, "RGBA") | |||
code = "draw.%s" % code | |||
try: | |||
code_obj = compile(code, '<string>', 'exec') | |||
exec(code_obj) | |||
except Exception as e: | |||
self.last_error = str(e) | |||
return False | |||
self.image.save(self.filename) | |||
return True | |||
def get_file(self): | |||
img = Image.open(self.filename) | |||
file = BytesIO() | |||
img.save(file, format='PNG') | |||
return file | |||
if __name__ == '__main__': | |||
# some q'n'dirty tests | |||
print("main") | |||
pillermann = PILEval() | |||
# will work | |||
# result = pillermann.eval("rectangle([20, 10, 120, 200], fill='#f00')") | |||
# if result: | |||
# print('good') | |||
# will also work | |||
# result = pillermann.eval("ellipse([200, 50, 500, 500], fill='#def')") | |||
# result = pillermann.eval("point((123, 234), fill='#345')") | |||
# result = pillermann.eval("line([23, 34, 345, 456], fill='#eca', width=10)") | |||
# result = pillermann.eval("arc([500, 10, 600, 440], 10, 95, fill='#3f9')") | |||
# result = pillermann.eval("text((22, 202), '_pil_bot_!', fill='#a4f')") | |||
result = pillermann.eval("text((100, 100), text=code, fill='#000')") | |||
if result: | |||
print("ok") | |||
else: | |||
print(pillermann.last_error) | |||
# will yield a compiler error | |||
# result = pillermann.eval("rectangle([20, 10, 120, 200], fill='#f00'") | |||
# if not result: | |||
# print("error result: %s" % pillermann.last_error) | |||
# f = pillermann.get_file() | |||
# print(f) | |||
</source> | |||
=== twitterbot/bot.py === | |||
adapts thricedotted's (http://lczzz.me) twitterbot for python 3 | |||
<source lang="python"> | |||
import os | |||
import codecs | |||
import json | |||
import logging | |||
import tweepy | |||
import time | |||
import re | |||
import random | |||
import pickle | |||
def ignore(method): | |||
""" | |||
Use the @ignore decorator on TwitterBot methods you wish to leave | |||
unimplemented, such as on_timeline and on_mention. | |||
""" | |||
method.not_implemented = True | |||
return method | |||
class TwitterBot: | |||
def __init__(self): | |||
self.config = {} | |||
self.custom_handlers = [] | |||
self.config['reply_direct_mention_only'] = False | |||
self.config['reply_followers_only'] = True | |||
self.config['autofav_mentions'] = False | |||
self.config['autofav_keywords'] = [] | |||
self.config['autofollow'] = False | |||
self.config['tweet_interval'] = 30 * 60 | |||
self.config['tweet_interval_range'] = None | |||
self.config['reply_interval'] = 10 | |||
self.config['reply_interval_range'] = None | |||
self.config['ignore_timeline_mentions'] = True | |||
self.config['logging_level'] = logging.DEBUG | |||
self.config['storage'] = FileStorage() | |||
self.state = {} | |||
# call the custom initialization | |||
self.bot_init() | |||
auth = tweepy.OAuthHandler(self.config['api_key'], self.config['api_secret']) | |||
auth.set_access_token(self.config['access_key'], self.config['access_secret']) | |||
self.api = tweepy.API(auth) | |||
self.id = self.api.me().id | |||
self.screen_name = self.api.me().screen_name | |||
logging.basicConfig(format='%(asctime)s | %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', | |||
filename=self.screen_name + '.log', | |||
level=self.config['logging_level']) | |||
logging.info('Initializing bot...') | |||
try: | |||
with self.config['storage'].read(self.screen_name) as f: | |||
self.state = pickle.load(f) | |||
except IOError: | |||
self.state['last_timeline_id'] = 1 | |||
self.state['last_mention_id'] = 1 | |||
self.state['last_timeline_time'] = 0 | |||
self.state['last_mention_time'] = 0 | |||
self.state['last_tweet_id'] = 1 | |||
self.state['last_tweet_time'] = 1 | |||
self.state['last_reply_id'] = 0 | |||
self.state['last_reply_time'] = 0 | |||
self.state['recent_timeline'] = [] | |||
self.state['mention_queue'] = [] | |||
self.state['friends'] = self.api.friends_ids(self.id) | |||
self.state['followers'] = self.api.followers_ids(self.id) | |||
self.state['new_followers'] = [] | |||
self.state['last_follow_check'] = 0 | |||
logging.info('Bot initialized!') | |||
def bot_init(self): | |||
""" | |||
Initialize custom state values for your bot. | |||
""" | |||
raise NotImplementedError("You MUST have bot_init() implemented in your bot! What have you DONE!") | |||
def log(self, message, level=logging.INFO): | |||
if level == logging.ERROR: | |||
logging.error(message) | |||
else: | |||
logging.info(message) | |||
def _log_tweepy_error(self, message, e): | |||
try: | |||
e_message = e.message[0]['message'] | |||
code = e.message[0]['code'] | |||
self.log("{}: {} ({})".format(message, e_message, code), level=logging.ERROR) | |||
except: | |||
self.log(message, e) | |||
def _tweet_url(self, tweet): | |||
return "http://twitter.com/" + tweet.author.screen_name + "/status/" + str(tweet.id) | |||
def _save_state(self): | |||
with self.config['storage'].write(self.screen_name) as f: | |||
pickle.dump(self.state, f) | |||
self.log('Bot state saved') | |||
def on_scheduled_tweet(self): | |||
""" | |||
Post a general tweet to own timeline. | |||
""" | |||
#self.post_tweet(text) | |||
raise NotImplementedError("You need to implement this to tweet to timeline (or pass if you don't want to)!") | |||
def on_mention(self, tweet, prefix): | |||
""" | |||
Perform some action upon receiving a mention. | |||
""" | |||
#self.post_tweet(text) | |||
raise NotImplementedError("You need to implement this to reply to/fav mentions (or pass if you don't want to)!") | |||
def on_timeline(self, tweet, prefix): | |||
""" | |||
Perform some action on a tweet on the timeline. | |||
""" | |||
#self.post_tweet(text) | |||
raise NotImplementedError("You need to implement this to reply to/fav timeline tweets (or pass if you don't want to)!") | |||
def on_follow(self, f_id): | |||
""" | |||
Perform some action when followed. | |||
""" | |||
if self.config['autofollow']: | |||
try: | |||
self.api.create_friendship(f_id, follow=True) | |||
self.state['friends'].append(f_id) | |||
logging.info('Followed user id {}'.format(f_id)) | |||
except tweepy.TweepError as e: | |||
self._log_tweepy_error('Unable to follow user', e) | |||
time.sleep(3) | |||
self.state['followers'].append(f_id) | |||
def post_tweet(self, text, reply_to=None, media=None, file=None): | |||
""" | |||
Post a tweet containing the text. | |||
If you provide a filename the file will be added to the tweet. | |||
You can also provide the filedata, for example when using dynamically created images. | |||
""" | |||
kwargs = { "status": text } | |||
args = [] | |||
if media is not None: | |||
cmd = self.api.update_with_media | |||
args.insert(0, media) | |||
if file is not None: | |||
kwargs["file"] = file | |||
else: | |||
cmd = self.api.update_status | |||
try: | |||
self.log('Tweeting "{}"'.format(text)) | |||
if reply_to: | |||
self.log("-- Responding to status {}".format(self._tweet_url(reply_to))) | |||
kwargs['in_reply_to_status_id'] = reply_to.id | |||
else: | |||
self.log("-- Posting to own timeline") | |||
tweet = cmd(*args, **kwargs) | |||
self.log('Status posted at {}'.format(self._tweet_url(tweet))) | |||
return True | |||
except tweepy.TweepError as e: | |||
self._log_tweepy_error('Can\'t post status', e) | |||
return False | |||
def favorite_tweet(self, tweet): | |||
try: | |||
logging.info('Faving ' + self._tweet_url(tweet)) | |||
self.api.create_favorite(tweet.id) | |||
except tweepy.TweepError as e: | |||
self._log_tweepy_error('Can\'t fav status', e) | |||
def _ignore_method(self, method): | |||
return hasattr(method, 'not_implemented') and method.not_implemented | |||
def _handle_timeline(self): | |||
""" | |||
Reads the latest tweets in the bots timeline and perform some action. | |||
self.recent_timeline | |||
""" | |||
for tweet in self.state['recent_timeline']: | |||
prefix = self.get_mention_prefix(tweet) | |||
self.on_timeline(tweet, prefix) | |||
words = tweet.text.lower().split() | |||
if any(w in words for w in self.config['autofav_keywords']): | |||
self.favorite_tweet(tweet) | |||
#time.sleep(self.config['reply_interval']) | |||
def _handle_mentions(self): | |||
""" | |||
Performs some action on the mentions in self.mention_queue | |||
""" | |||
# TODO: only handle a certain number of mentions at a time? | |||
for mention in iter(self.state['mention_queue']): | |||
prefix = self.get_mention_prefix(mention) | |||
self.on_mention(mention, prefix) | |||
self.state['mention_queue'].remove(mention) | |||
if self.config['autofav_mentions']: | |||
self.favorite_tweet(mention) | |||
#time.sleep(self.config['reply_interval']) | |||
def get_mention_prefix(self, tweet): | |||
""" | |||
Returns a string of users to @-mention when responding to a tweet. | |||
""" | |||
mention_back = ['@' + tweet.author.screen_name] | |||
mention_back += [s for s in re.split('[^@\w]', tweet.text) if len(s) > 2 and s[0] == '@' and s[1:] != self.screen_name] | |||
if self.config['reply_followers_only']: | |||
mention_back = [s for s in mention_back if s[1:] in self.state['followers'] or s == '@' + tweet.author.screen_name] | |||
return ' '.join(mention_back) | |||
def _check_mentions(self): | |||
""" | |||
Checks mentions and loads most recent tweets into the mention queue | |||
""" | |||
if self._ignore_method(self.on_mention): | |||
logging.debug('Ignoring mentions') | |||
return | |||
try: | |||
current_mentions = self.api.mentions_timeline(since_id=self.state['last_mention_id'], count=100) | |||
# direct mentions only? | |||
if self.config['reply_direct_mention_only']: | |||
current_mentions = [t for t in current_mentions if re.split('[^@\w]', t.text)[0] == '@' + self.screen_name] | |||
if len(current_mentions) != 0: | |||
self.state['last_mention_id'] = current_mentions[0].id | |||
self.state['last_mention_time'] = time.time() | |||
self.state['mention_queue'] += reversed(current_mentions) | |||
logging.info('Mentions updated ({} retrieved, {} total in queue)'.format(len(current_mentions), len(self.state['mention_queue']))) | |||
except tweepy.TweepError as e: | |||
self._log_tweepy_error('Can\'t retrieve mentions', e) | |||
except IncompleteRead as e: | |||
self.log('Incomplete read error -- skipping mentions update') | |||
def _check_timeline(self): | |||
""" | |||
Checks timeline and loads most recent tweets into recent timeline | |||
""" | |||
if self._ignore_method(self.on_timeline): | |||
logging.debug('Ignoring timeline') | |||
return | |||
try: | |||
current_timeline = self.api.home_timeline(count=200, since_id=self.state['last_timeline_id']) | |||
# remove my tweets | |||
current_timeline = [t for t in current_timeline if t.author.screen_name.lower() != self.screen_name.lower()] | |||
# remove all tweets mentioning me | |||
current_timeline = [t for t in current_timeline if not re.search('@'+self.screen_name, t.text, flags=re.IGNORECASE)] | |||
if self.config['ignore_timeline_mentions']: | |||
# remove all tweets with mentions (heuristically) | |||
current_timeline = [t for t in current_timeline if '@' not in t.text] | |||
if len(current_timeline) != 0: | |||
self.state['last_timeline_id'] = current_timeline[0].id | |||
self.state['last_timeline_time'] = time.time() | |||
self.state['recent_timeline'] = list(reversed(current_timeline)) | |||
logging.info('Timeline updated ({} retrieved)'.format(len(current_timeline))) | |||
except tweepy.TweepError as e: | |||
self._log_tweepy_error('Can\'t retrieve timeline', e) | |||
except IncompleteRead as e: | |||
self.log('Incomplete read error -- skipping timeline update') | |||
def _check_followers(self): | |||
""" | |||
Checks followers. | |||
""" | |||
logging.info("Checking for new followers...") | |||
try: | |||
self.state['new_followers'] = [f_id for f_id in self.api.followers_ids(self.id) if f_id not in self.state['followers']] | |||
self.config['last_follow_check'] = time.time() | |||
except tweepy.TweepError as e: | |||
self._log_tweepy_error('Can\'t update followers', e) | |||
except IncompleteRead as e: | |||
self.log('Incomplete read error -- skipping followers update') | |||
def _handle_followers(self): | |||
""" | |||
Handles new followers. | |||
""" | |||
for f_id in self.state['new_followers']: | |||
self.on_follow(f_id) | |||
def register_custom_handler(self, action, interval): | |||
""" | |||
Register a custom action to run at some interval. | |||
""" | |||
handler = {} | |||
handler['action'] = action | |||
handler['interval'] = interval | |||
handler['last_run'] = 0 | |||
self.custom_handlers.append(handler) | |||
def run(self): | |||
""" | |||
Runs the bot! This probably shouldn't be in a "while True" lol. | |||
""" | |||
while True: | |||
# check followers every 15 minutes | |||
#if self.autofollow and (time.time() - self.last_follow_check) > (15 * 60): | |||
if self.state['last_follow_check'] > (15 * 60): | |||
self._check_followers() | |||
self._handle_followers() | |||
# check mentions every minute-ish | |||
#if self.reply_to_mentions and (time.time() - self.last_mention_time) > 60: | |||
if (time.time() - self.state['last_mention_time']) > 60: | |||
self._check_mentions() | |||
self._handle_mentions() | |||
# tweet to timeline | |||
#if self.reply_to_timeline and (time.time() - self.last_mention_time) > 60: | |||
if (time.time() - self.state['last_timeline_time']) > 60: | |||
self._check_timeline() | |||
self._handle_timeline() | |||
# tweet to timeline on the correct interval | |||
if (time.time() - self.state['last_tweet_time']) > self.config['tweet_interval']: | |||
self.on_scheduled_tweet() | |||
# TODO: maybe this should only run if the above is successful... | |||
if self.config['tweet_interval_range'] is not None: | |||
self.config['tweet_interval'] = random.randint(*self.config['tweet_interval_range']) | |||
self.log("Next tweet in {} seconds".format(self.config['tweet_interval'])) | |||
self.state['last_tweet_time'] = time.time() | |||
# run custom action | |||
for handler in self.custom_handlers: | |||
if (time.time() - handler['last_run']) > handler['interval']: | |||
handler['action']() | |||
handler['last_run'] = time.time() | |||
# save current state | |||
self._save_state() | |||
logging.info("Sleeping for a bit...") | |||
time.sleep(30) | |||
class FileStorage(object): | |||
""" | |||
Default storage adapter. | |||
Adapters must implement two methods: read(name) and write(name). | |||
""" | |||
def read(self, name): | |||
""" | |||
Return an IO-like object that will produce binary data when read from. | |||
If nothing is stored under the given name, raise IOError. | |||
""" | |||
filename = self._get_filename(name) | |||
if os.path.exists(filename): | |||
logging.debug("Reading from {}".format(filename)) | |||
else: | |||
logging.debug("{} doesn't exist".format(filename)) | |||
return open(filename, 'rb') | |||
def write(self, name): | |||
""" | |||
Return an IO-like object that will store binary data written to it. | |||
""" | |||
filename = self._get_filename(name) | |||
if os.path.exists(filename): | |||
logging.debug("Overwriting {}".format(filename)) | |||
else: | |||
logging.debug("Creating {}".format(filename)) | |||
return open(filename, 'wb') | |||
def _get_filename(self, name): | |||
return '{}_state.pkl'.format(name) | |||
</source> |
Revision as of 15:53, 17 June 2015
( ) ( )
(>•.•<)
(") (")
@_pil_bot_
pilbot.py
# -*- coding: utf-8 -*- #
from twitterbot import TwitterBot
import keys
from PIL import Image
from sys import argv
from random import random
from pileval import PILEval
class PilBot(TwitterBot):
def __init__(self):
TwitterBot.__init__(self)
self.pillermann = PILEval()
def bot_init(self):
""" Initialize and configure the bot """
############################
# REQUIRED: LOGIN DETAILS! #
############################
self.config['api_key'] = keys.consumer_key
self.config['api_secret'] = keys.consumer_secret
self.config['access_key'] = keys.access_token
self.config['access_secret'] = keys.access_token_secret
######################################
# SEMI-OPTIONAL: OTHER CONFIG STUFF! #
######################################
# how often to tweet, in seconds
self.config['tweet_interval'] = 1 * 5 # default: 1 minutes
# use this to define a (min, max) random range of how often to tweet
# e.g., self.config['tweet_interval_range'] = (5*60, 10*60) # tweets every 5-10 minutes
self.config['tweet_interval_range'] = None
# only reply to tweets that specifically mention the bot
self.config['reply_direct_mention_only'] = True
# only include bot followers (and original tweeter) in @-replies
self.config['reply_followers_only'] = False
# fav any tweets that mention this bot?
self.config['autofav_mentions'] = False
# fav any tweets containing these keywords?
self.config['autofav_keywords'] = []
# follow back all followers?
self.config['autofollow'] = False
def on_scheduled_tweet(self):
""" Make a public tweet to the bot's own timeline. """
# We might take senteces from somewhere and tweet them on a regular basis ...
pass # don't do anything here ...
def on_mention(self, tweet, prefix):
""" Actions to take when a mention is received. """
text = tweet.text.split()[1]
self.action(text)
def on_timeline(self, tweet, prefix):
""" Actions to take on a timeline tweet. """
pass
def action(self, code):
result = self.pillermann.eval(code)
if result: # post the image
self.post_tweet(code, media="img.png", file=self.pillermann.get_file())
else: # post the error
self.post_tweet(self.pillermann.last_error)
if __name__ == '__main__':
bot = PilBot()
if len(argv) == 1:
bot.run()
else:
tweet_text = "text((300, 400), 'botsNplots', fill='#ff0')"
bot.action(tweet_text)
pileval.py
from PIL import Image, ImageDraw
from os import path
from io import BytesIO
class PILEval():
def __init__(self, filename='test.png'):
self.filename = filename
try:
self.image = Image.open(filename)
except FileNotFoundError as e:
self.image = Image.new("RGBA", (640, 480), 'white')
self.last_error = ''
def __repr__(self):
return "<PILEval: %s>" % self.filename
def eval(self, code):
draw = ImageDraw.Draw(self.image, "RGBA")
code = "draw.%s" % code
try:
code_obj = compile(code, '<string>', 'exec')
exec(code_obj)
except Exception as e:
self.last_error = str(e)
return False
self.image.save(self.filename)
return True
def get_file(self):
img = Image.open(self.filename)
file = BytesIO()
img.save(file, format='PNG')
return file
if __name__ == '__main__':
# some q'n'dirty tests
print("main")
pillermann = PILEval()
# will work
# result = pillermann.eval("rectangle([20, 10, 120, 200], fill='#f00')")
# if result:
# print('good')
# will also work
# result = pillermann.eval("ellipse([200, 50, 500, 500], fill='#def')")
# result = pillermann.eval("point((123, 234), fill='#345')")
# result = pillermann.eval("line([23, 34, 345, 456], fill='#eca', width=10)")
# result = pillermann.eval("arc([500, 10, 600, 440], 10, 95, fill='#3f9')")
# result = pillermann.eval("text((22, 202), '_pil_bot_!', fill='#a4f')")
result = pillermann.eval("text((100, 100), text=code, fill='#000')")
if result:
print("ok")
else:
print(pillermann.last_error)
# will yield a compiler error
# result = pillermann.eval("rectangle([20, 10, 120, 200], fill='#f00'")
# if not result:
# print("error result: %s" % pillermann.last_error)
# f = pillermann.get_file()
# print(f)
twitterbot/bot.py
adapts thricedotted's (http://lczzz.me) twitterbot for python 3
import os
import codecs
import json
import logging
import tweepy
import time
import re
import random
import pickle
def ignore(method):
"""
Use the @ignore decorator on TwitterBot methods you wish to leave
unimplemented, such as on_timeline and on_mention.
"""
method.not_implemented = True
return method
class TwitterBot:
def __init__(self):
self.config = {}
self.custom_handlers = []
self.config['reply_direct_mention_only'] = False
self.config['reply_followers_only'] = True
self.config['autofav_mentions'] = False
self.config['autofav_keywords'] = []
self.config['autofollow'] = False
self.config['tweet_interval'] = 30 * 60
self.config['tweet_interval_range'] = None
self.config['reply_interval'] = 10
self.config['reply_interval_range'] = None
self.config['ignore_timeline_mentions'] = True
self.config['logging_level'] = logging.DEBUG
self.config['storage'] = FileStorage()
self.state = {}
# call the custom initialization
self.bot_init()
auth = tweepy.OAuthHandler(self.config['api_key'], self.config['api_secret'])
auth.set_access_token(self.config['access_key'], self.config['access_secret'])
self.api = tweepy.API(auth)
self.id = self.api.me().id
self.screen_name = self.api.me().screen_name
logging.basicConfig(format='%(asctime)s | %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p',
filename=self.screen_name + '.log',
level=self.config['logging_level'])
logging.info('Initializing bot...')
try:
with self.config['storage'].read(self.screen_name) as f:
self.state = pickle.load(f)
except IOError:
self.state['last_timeline_id'] = 1
self.state['last_mention_id'] = 1
self.state['last_timeline_time'] = 0
self.state['last_mention_time'] = 0
self.state['last_tweet_id'] = 1
self.state['last_tweet_time'] = 1
self.state['last_reply_id'] = 0
self.state['last_reply_time'] = 0
self.state['recent_timeline'] = []
self.state['mention_queue'] = []
self.state['friends'] = self.api.friends_ids(self.id)
self.state['followers'] = self.api.followers_ids(self.id)
self.state['new_followers'] = []
self.state['last_follow_check'] = 0
logging.info('Bot initialized!')
def bot_init(self):
"""
Initialize custom state values for your bot.
"""
raise NotImplementedError("You MUST have bot_init() implemented in your bot! What have you DONE!")
def log(self, message, level=logging.INFO):
if level == logging.ERROR:
logging.error(message)
else:
logging.info(message)
def _log_tweepy_error(self, message, e):
try:
e_message = e.message[0]['message']
code = e.message[0]['code']
self.log("{}: {} ({})".format(message, e_message, code), level=logging.ERROR)
except:
self.log(message, e)
def _tweet_url(self, tweet):
return "http://twitter.com/" + tweet.author.screen_name + "/status/" + str(tweet.id)
def _save_state(self):
with self.config['storage'].write(self.screen_name) as f:
pickle.dump(self.state, f)
self.log('Bot state saved')
def on_scheduled_tweet(self):
"""
Post a general tweet to own timeline.
"""
#self.post_tweet(text)
raise NotImplementedError("You need to implement this to tweet to timeline (or pass if you don't want to)!")
def on_mention(self, tweet, prefix):
"""
Perform some action upon receiving a mention.
"""
#self.post_tweet(text)
raise NotImplementedError("You need to implement this to reply to/fav mentions (or pass if you don't want to)!")
def on_timeline(self, tweet, prefix):
"""
Perform some action on a tweet on the timeline.
"""
#self.post_tweet(text)
raise NotImplementedError("You need to implement this to reply to/fav timeline tweets (or pass if you don't want to)!")
def on_follow(self, f_id):
"""
Perform some action when followed.
"""
if self.config['autofollow']:
try:
self.api.create_friendship(f_id, follow=True)
self.state['friends'].append(f_id)
logging.info('Followed user id {}'.format(f_id))
except tweepy.TweepError as e:
self._log_tweepy_error('Unable to follow user', e)
time.sleep(3)
self.state['followers'].append(f_id)
def post_tweet(self, text, reply_to=None, media=None, file=None):
"""
Post a tweet containing the text.
If you provide a filename the file will be added to the tweet.
You can also provide the filedata, for example when using dynamically created images.
"""
kwargs = { "status": text }
args = []
if media is not None:
cmd = self.api.update_with_media
args.insert(0, media)
if file is not None:
kwargs["file"] = file
else:
cmd = self.api.update_status
try:
self.log('Tweeting "{}"'.format(text))
if reply_to:
self.log("-- Responding to status {}".format(self._tweet_url(reply_to)))
kwargs['in_reply_to_status_id'] = reply_to.id
else:
self.log("-- Posting to own timeline")
tweet = cmd(*args, **kwargs)
self.log('Status posted at {}'.format(self._tweet_url(tweet)))
return True
except tweepy.TweepError as e:
self._log_tweepy_error('Can\'t post status', e)
return False
def favorite_tweet(self, tweet):
try:
logging.info('Faving ' + self._tweet_url(tweet))
self.api.create_favorite(tweet.id)
except tweepy.TweepError as e:
self._log_tweepy_error('Can\'t fav status', e)
def _ignore_method(self, method):
return hasattr(method, 'not_implemented') and method.not_implemented
def _handle_timeline(self):
"""
Reads the latest tweets in the bots timeline and perform some action.
self.recent_timeline
"""
for tweet in self.state['recent_timeline']:
prefix = self.get_mention_prefix(tweet)
self.on_timeline(tweet, prefix)
words = tweet.text.lower().split()
if any(w in words for w in self.config['autofav_keywords']):
self.favorite_tweet(tweet)
#time.sleep(self.config['reply_interval'])
def _handle_mentions(self):
"""
Performs some action on the mentions in self.mention_queue
"""
# TODO: only handle a certain number of mentions at a time?
for mention in iter(self.state['mention_queue']):
prefix = self.get_mention_prefix(mention)
self.on_mention(mention, prefix)
self.state['mention_queue'].remove(mention)
if self.config['autofav_mentions']:
self.favorite_tweet(mention)
#time.sleep(self.config['reply_interval'])
def get_mention_prefix(self, tweet):
"""
Returns a string of users to @-mention when responding to a tweet.
"""
mention_back = ['@' + tweet.author.screen_name]
mention_back += [s for s in re.split('[^@\w]', tweet.text) if len(s) > 2 and s[0] == '@' and s[1:] != self.screen_name]
if self.config['reply_followers_only']:
mention_back = [s for s in mention_back if s[1:] in self.state['followers'] or s == '@' + tweet.author.screen_name]
return ' '.join(mention_back)
def _check_mentions(self):
"""
Checks mentions and loads most recent tweets into the mention queue
"""
if self._ignore_method(self.on_mention):
logging.debug('Ignoring mentions')
return
try:
current_mentions = self.api.mentions_timeline(since_id=self.state['last_mention_id'], count=100)
# direct mentions only?
if self.config['reply_direct_mention_only']:
current_mentions = [t for t in current_mentions if re.split('[^@\w]', t.text)[0] == '@' + self.screen_name]
if len(current_mentions) != 0:
self.state['last_mention_id'] = current_mentions[0].id
self.state['last_mention_time'] = time.time()
self.state['mention_queue'] += reversed(current_mentions)
logging.info('Mentions updated ({} retrieved, {} total in queue)'.format(len(current_mentions), len(self.state['mention_queue'])))
except tweepy.TweepError as e:
self._log_tweepy_error('Can\'t retrieve mentions', e)
except IncompleteRead as e:
self.log('Incomplete read error -- skipping mentions update')
def _check_timeline(self):
"""
Checks timeline and loads most recent tweets into recent timeline
"""
if self._ignore_method(self.on_timeline):
logging.debug('Ignoring timeline')
return
try:
current_timeline = self.api.home_timeline(count=200, since_id=self.state['last_timeline_id'])
# remove my tweets
current_timeline = [t for t in current_timeline if t.author.screen_name.lower() != self.screen_name.lower()]
# remove all tweets mentioning me
current_timeline = [t for t in current_timeline if not re.search('@'+self.screen_name, t.text, flags=re.IGNORECASE)]
if self.config['ignore_timeline_mentions']:
# remove all tweets with mentions (heuristically)
current_timeline = [t for t in current_timeline if '@' not in t.text]
if len(current_timeline) != 0:
self.state['last_timeline_id'] = current_timeline[0].id
self.state['last_timeline_time'] = time.time()
self.state['recent_timeline'] = list(reversed(current_timeline))
logging.info('Timeline updated ({} retrieved)'.format(len(current_timeline)))
except tweepy.TweepError as e:
self._log_tweepy_error('Can\'t retrieve timeline', e)
except IncompleteRead as e:
self.log('Incomplete read error -- skipping timeline update')
def _check_followers(self):
"""
Checks followers.
"""
logging.info("Checking for new followers...")
try:
self.state['new_followers'] = [f_id for f_id in self.api.followers_ids(self.id) if f_id not in self.state['followers']]
self.config['last_follow_check'] = time.time()
except tweepy.TweepError as e:
self._log_tweepy_error('Can\'t update followers', e)
except IncompleteRead as e:
self.log('Incomplete read error -- skipping followers update')
def _handle_followers(self):
"""
Handles new followers.
"""
for f_id in self.state['new_followers']:
self.on_follow(f_id)
def register_custom_handler(self, action, interval):
"""
Register a custom action to run at some interval.
"""
handler = {}
handler['action'] = action
handler['interval'] = interval
handler['last_run'] = 0
self.custom_handlers.append(handler)
def run(self):
"""
Runs the bot! This probably shouldn't be in a "while True" lol.
"""
while True:
# check followers every 15 minutes
#if self.autofollow and (time.time() - self.last_follow_check) > (15 * 60):
if self.state['last_follow_check'] > (15 * 60):
self._check_followers()
self._handle_followers()
# check mentions every minute-ish
#if self.reply_to_mentions and (time.time() - self.last_mention_time) > 60:
if (time.time() - self.state['last_mention_time']) > 60:
self._check_mentions()
self._handle_mentions()
# tweet to timeline
#if self.reply_to_timeline and (time.time() - self.last_mention_time) > 60:
if (time.time() - self.state['last_timeline_time']) > 60:
self._check_timeline()
self._handle_timeline()
# tweet to timeline on the correct interval
if (time.time() - self.state['last_tweet_time']) > self.config['tweet_interval']:
self.on_scheduled_tweet()
# TODO: maybe this should only run if the above is successful...
if self.config['tweet_interval_range'] is not None:
self.config['tweet_interval'] = random.randint(*self.config['tweet_interval_range'])
self.log("Next tweet in {} seconds".format(self.config['tweet_interval']))
self.state['last_tweet_time'] = time.time()
# run custom action
for handler in self.custom_handlers:
if (time.time() - handler['last_run']) > handler['interval']:
handler['action']()
handler['last_run'] = time.time()
# save current state
self._save_state()
logging.info("Sleeping for a bit...")
time.sleep(30)
class FileStorage(object):
"""
Default storage adapter.
Adapters must implement two methods: read(name) and write(name).
"""
def read(self, name):
"""
Return an IO-like object that will produce binary data when read from.
If nothing is stored under the given name, raise IOError.
"""
filename = self._get_filename(name)
if os.path.exists(filename):
logging.debug("Reading from {}".format(filename))
else:
logging.debug("{} doesn't exist".format(filename))
return open(filename, 'rb')
def write(self, name):
"""
Return an IO-like object that will store binary data written to it.
"""
filename = self._get_filename(name)
if os.path.exists(filename):
logging.debug("Overwriting {}".format(filename))
else:
logging.debug("Creating {}".format(filename))
return open(filename, 'wb')
def _get_filename(self, name):
return '{}_state.pkl'.format(name)