A Python Mastodon bot that's geared towards instance admins and their needs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

420 lines
15 KiB

#!/usr/bin/env python3
"""Different stream listener implementations used by bots."""
import copy
import pprint
import sqlite3
from toot import toot
from mastodon import Mastodon, StreamListener
class PublicTimelineBot(StreamListener):
"""Implementation of the Mastodon.py StreamListener class for public timeline bots."""
def process_toot(self, status):
"""Implement this for public timeline bots"""
pass
def process_notification(self, notification):
"""Implement this for processing notifications from user_stream"""
pass
def setup_cache(self):
"""Setup any custom caching/tables/etc here"""
pass
def __init__(self, bot_config):
StreamListener.__init__(self)
self.bot_config = bot_config
mastodon = Mastodon(
client_id=self.bot_config['config']['client_cred_file'],
access_token=self.bot_config['config']['user_cred_file'],
api_base_url=self.bot_config['config']['api_base_url']
)
user = mastodon.account_verify_credentials()
self.account_id = user['id']
# Get access to cache
self.conn = sqlite3.connect(self.bot_config['stream']['cache_file'])
self.cursor = self.conn.cursor()
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS toot_cache (
toot_id INTEGER PRIMARY KEY
);
""")
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS notification_cache (
notification_id INTEGER PRIMARY KEY
);
""")
self.conn.commit()
self.setup_cache()
# Replay any toots that were missed while offline and welcome new users
print('Replaying missed toots')
#self.replay_toots(True)
print(' Replay complete')
print('Replaying missed notifications')
#self.replay_notifications()
print(' Replay complete')
def __del__(self):
# Cleanup connection to sqlite database for welcome cache
self.conn.commit()
self.conn.close()
def fetch_remaining(self, mastodon, first_page):
"""Work around for odd behavior in Mastodon.py's official fetch_remaining code."""
# FIXME: Remove this method when below GitHub issue is closed
# and a new release available FIXME: Don't forget to update
# minimum version of Mastodon.py to match when the fix is
# released https://github.com/halcy/Mastodon.py/issues/59
first_page = copy.deepcopy(first_page)
all_pages = []
current_page = first_page
while current_page is not None and current_page:
all_pages.extend(current_page)
current_page = mastodon.fetch_next(current_page)
return all_pages
def replay_toots(self, recurse=False):
"""Replay toots that were posted while the bot was offline."""
# Setup Mastodon API
mastodon = Mastodon(
client_id=self.bot_config['config']['client_cred_file'],
access_token=self.bot_config['config']['user_cred_file'],
api_base_url=self.bot_config['config']['api_base_url'])
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
if not last_seen_toot_id:
last_seen_toot_id = 0
first_page = mastodon.timeline_public(since_id=last_seen_toot_id)
all_pages = self.fetch_remaining(mastodon, first_page)
# Catch up ALL welcome messages that may have been missed
for status in all_pages:
self.process_toot(status)
# Update max seen toot id (any calls to welcome_user will move the max)
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
if not last_seen_toot_id:
last_seen_toot_id = 0
# Update last seen toot id
new_last_seen_toot_id = -1
if all_pages:
new_last_seen_toot_id = all_pages[0]['id']
if int(new_last_seen_toot_id) > last_seen_toot_id:
self.cursor.execute('insert into toot_cache values (?)',
(new_last_seen_toot_id, ))
self.conn.commit()
# Recurse in case the catch up took long enough for more toots to enter the public timeline
# Do this only once to be safe
if recurse:
# Recurse ONCE to catch up on any missing toots posted while doing initial catch up
self.replay_toots()
def replay_notifications(self):
"""Replay notifications that were posted while the bot was offline."""
# Setup Mastodon API
mastodon = Mastodon(
client_id=self.bot_config['config']['client_cred_file'],
access_token=self.bot_config['config']['user_cred_file'],
api_base_url=self.bot_config['config']['api_base_url'])
self.cursor.execute('select max(notification_id) from notification_cache;')
last_seen_notification_id = self.cursor.fetchone()[0]
if not last_seen_notification_id:
last_seen_notification_id = 0
try:
first_page = mastodon.notifications(since_id=last_seen_notification_id)
except KeyError:
return
# Catch up ALL notifications that may have been missed
for notification in first_page:
self.process_notification(notification)
# Update max seen notification id (any calls to process_notification will move the max)
self.cursor.execute('select max(notification_id) from notification_cache;')
last_seen_notification_id = self.cursor.fetchone()[0]
if not last_seen_notification_id:
last_seen_notification_id = 0
# Update last seen toot id
new_last_seen_notification_id = -1
if first_page:
new_last_seen_notification_id = first_page[0]['id']
if int(new_last_seen_notification_id) > last_seen_notification_id:
self.cursor.execute('insert into notification_cache values (?)',
(new_last_seen_notification_id, ))
self.conn.commit()
self.replay_notifications()
def on_update(self, status):
"""A new status has appeared!
'status' is the parsed JSON dictionary describing the
status.
"""
toot_id = status['id']
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
if last_seen_toot_id == toot_id:
return
# Cache toot
try:
self.cursor.execute('insert into toot_cache values (?)', (toot_id, ))
self.conn.commit()
except:
pass
self.process_toot(status)
def on_notification(self, notification):
"""A new notification.
'notification' is the parsed JSON dictionary describing the
notification.
"""
notification_id = notification['id']
self.cursor.execute('select max(notification_id) from notification_cache;')
last_seen_notification_id = self.cursor.fetchone()[0]
if last_seen_notification_id == notification_id:
return
# Cache notification
self.cursor.execute('insert into notification_cache values (?)', (notification_id, ))
self.conn.commit()
self.process_notification(notification)
def on_delete(self, status_id):
"""A status has been deleted.
status_id is the status' integer ID.
"""
# Remove the status from the toot_cache if we see a delete
self.cursor.execute('delete from toot_cache where toot_id = ?',
(status_id, ))
self.conn.commit()
def handle_heartbeat(self):
"""The server has sent us a keep-alive message.
This callback may be useful to carry out periodic housekeeping
tasks, or just to confirm that the connection is still
open.
"""
# Consistently/constantly trim the toot cache to the most recent seen toot
self.cursor.execute("""\
DELETE FROM toot_cache WHERE toot_id <= ((SELECT MAX(toot_id) FROM toot_cache) - 1);
""")
# Consistently/constantly trim the notification cache to the most recent seen notification
self.cursor.execute("""\
DELETE FROM notification_cache WHERE notification_id <= ((SELECT MAX(notification_id) FROM notification_cache) - 1);
""")
self.conn.commit()
class WelcomeBot(PublicTimelineBot):
"""Public timeline bot that welcomes users to an instance"""
def setup_cache(self):
"""Setup the local cache"""
# Ensure cache table has been created
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS welcome_cache (
username VARCHAR(2048) PRIMARY KEY,
seen_timestamp TIMESTAMP
);
""")
self.conn.commit()
def process_toot(self, status):
"""Method that sets up toot and welcomes new users.
Method due to use in multiple places."""
federated = '@' in status['account']['acct']
username = status['account']['acct']
timestamp = status['created_at']
visibility = status['visibility']
# Welcome any user who's posted publicly
if visibility == 'public' and not federated:
# Check if username has been seen for welcome
self.cursor.execute(
'select count(1) as found from welcome_cache where username = ?',
(username, ))
if self.cursor.fetchone()[0] > 0:
return
# Send welcome toot
toot(self.bot_config, username=username)
# Cache user to avoid duping welcome messages
self.cursor.execute('insert into welcome_cache values (?, ?)',
(username, timestamp))
self.conn.commit()
class AutoRespondBot(PublicTimelineBot):
"""User timeline bot that auto responds to @'s"""
def setup_cache(self):
"""No extra logic necessary"""
pass
def process_notification(self, notification):
"""Method that handles notifications."""
if not notification['type'] == 'mention':
return
status = notification['status']
username = status['account']['username']
visibility = status['visibility']
in_reply_to = status['id']
mentions = status['mentions']
mentions.append(status['account'])
# Respond to @'s
if visibility == 'public' and not status['account']['id'] == self.account_id:
# Send autoresponse toot
autoresponse = toot(self.bot_config, username=username, in_reply_to=in_reply_to,
mentions=mentions)
autoresponse_id = autoresponse['id']
# Cache newly sent toot to prevent any looping
self.cursor.execute('insert into toot_cache values (?)', (autoresponse_id, ))
self.conn.commit()
class NewFollowerAutoRespondBot(PublicTimelineBot):
"""User timeline bot that auto responds to new followers"""
def setup_cache(self):
"""No extra logic necessary"""
pass
def process_notification(self, notification):
"""Method that handles notifications."""
if not notification['type'] == 'follow':
return
# If auto-responses to followers should be limited to local accounts
if 'local_only' in self.bot_config['config'] and '@' in notification['account']['acct']:
return
if not notification['account']['id'] == self.account_id:
# Send autoresponse toot
autoresponse = toot(self.bot_config, username=notification['account']['username'])
autoresponse_id = autoresponse['id']
# Cache newly sent toot to prevent any looping
try:
self.cursor.execute('insert into toot_cache values (?)', (autoresponse_id, ))
self.conn.commit()
except:
pass
class FollowBot(PublicTimelineBot):
"""Public timeline bot that follows newly seen users"""
def setup_cache(self):
"""Setup the local cache"""
# Ensure cache table has been created
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS follow_cache (
username VARCHAR(2048) PRIMARY KEY,
seen_timestamp TIMESTAMP
)
""")
self.conn.commit()
def process_toot(self, status):
"""Method that follows unseen users."""
print('========================')
pprint.pprint(status)
if 'reblog' in status and status['reblog'] is not None:
print('has reblog!')
status = status['reblog']
else:
print('no reblog')
return # No need for additional stuff below if it's not a boost
if status is None:
print('status is none?')
return # Avoid a potential null error when not a reblog
username = status['account']['acct']
federated = '@' in username
instance = None
print(username)
if federated:
print(' IS federated')
instance = status['account']['acct'].split('@')[1]
timestamp = status['created_at']
visibility = status['visibility']
# Follow any user who's posted publicly
if visibility == 'public' and federated \
and instance.lower() not in (instance.lower() for instance in self.bot_config['follow']['blacklist']['instances']) \
and username.lower() not in (user.lower() for user in self.bot_config['follow']['blacklist']['users']) \
and 'nobot' not in status['account']['note'].lower():
print(' go ahead with follow')
# ABOVE CONDITIONAL WARNING: depending on how the formatted bio looks, #nobot may not match!!!!!
# Deal with tag processing
tags_ok = True # Default to tag's ok (assumes tags are not defined in config)
if 'tags' in self.bot_config['follow']:
tags_ok = False
for toot_tag in status['tags']:
if toot_tag['name'].lower() in (tag.lower() for tag in self.bot_config['follow']['tags']):
tags_ok = True
break
# Bail out of toot wasn't tagged properly
if not tags_ok:
print(' no tags matched')
return
# Check if username has been seen for welcome
self.cursor.execute(
'select count(1) as found from follow_cache where username = ?',
(username, ))
if self.cursor.fetchone()[0] > 0:
print(' already followed (cached), returning')
return
print(' following')
mastodon = Mastodon(
client_id=self.bot_config['config']['client_cred_file'],
access_token=self.bot_config['config']['user_cred_file'],
api_base_url=self.bot_config['config']['api_base_url']
)
mastodon.account_follow(status['account']['id'])
# Cache user to avoid duping welcome messages
self.cursor.execute('insert into follow_cache values (?, ?)',
(username, timestamp))
self.conn.commit()
print(' processing complete')