A Python Mastodon bot that's geared towards instance admins and their needs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

171 lines
6.0 KiB

#!/usr/bin/env python3
"""Different stream listener implementations used by bots."""
import copy
import sqlite3
from toot import toot
from mastodon import Mastodon, StreamListener
class WelcomeBot(StreamListener):
"""Implementation of the Mastodon.py StreamListener class for welcome bot purposes."""
def __init__(self, bot_config):
StreamListener.__init__(self)
self.bot_config = bot_config
# Get access to cache
self.conn = sqlite3.connect(self.bot_config['welcome']['cache_file'])
self.cursor = self.conn.cursor()
# Ensure cache table has been created
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS welcome_cache (
username VARCHAR(2048) PRIMARY KEY,
seen_timestamp TIMESTAMP
);
""")
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS toot_cache (
toot_id INTEGER PRIMARY KEY
);
""")
self.conn.commit()
# Replay any toots that were missed while offline and welcome new users
self.replay_toots(True)
def __del__(self):
# Cleanup connection to sqlite database for welcome cache
self.conn.commit()
self.conn.close()
def fetch_remaining(self, mastodon, first_page):
"""Work around for odd behavior in Mastodon.py's official fetch_remaining code."""
# FIXME: Remove this method when below GitHub issue is closed
# and a new release available FIXME: Don't forget to update
# minimum version of Mastodon.py to match when the fix is
# released https://github.com/halcy/Mastodon.py/issues/59
first_page = copy.deepcopy(first_page)
all_pages = []
current_page = first_page
while current_page is not None and current_page:
all_pages.extend(current_page)
current_page = mastodon.fetch_next(current_page)
return all_pages
def replay_toots(self, recurse=False):
"""Replay toots that were posted while the bot was offline."""
# Setup Mastodon API
mastodon = Mastodon(
client_id=self.bot_config['config']['client_cred_file'],
access_token=self.bot_config['config']['user_cred_file'],
api_base_url=self.bot_config['config']['api_base_url'])
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
if last_seen_toot_id is None:
last_seen_toot_id = 0
first_page = mastodon.timeline_local(since_id=last_seen_toot_id)
all_pages = self.fetch_remaining(mastodon, first_page)
# Catch up ALL welcome messages that may have been missed
for status in all_pages:
self.welcome_user(status)
# Update max seen toot id (any calls to welcome_user will move the max)
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
# Update last seen toot id
new_last_seen_toot_id = -1
if all_pages:
new_last_seen_toot_id = all_pages[0]['id']
if new_last_seen_toot_id > last_seen_toot_id:
self.cursor.execute('insert into toot_cache values (?)',
(new_last_seen_toot_id, ))
self.conn.commit()
# Recurse in case the catch up took long enough for more toots to enter the public timeline
# Do this only once to be safe
if recurse:
# Recurse ONCE to catch up on any missing toots posted while doing initial catch up
self.replay_toots()
def welcome_user(self, status):
"""Method that sets up toot and welcomes new users.
Method due to use in multiple places."""
toot_id = status['id']
federated = '@' in status['account']['acct']
username = status['account']['acct']
timestamp = status['created_at']
visibility = status['visibility']
# Cache toot
self.cursor.execute('insert into toot_cache values (?)', (toot_id, ))
self.conn.commit()
# Welcome any user who's posted publicly
if visibility == 'public' and not federated:
# Check if username has been seen for welcome
self.cursor.execute(
'select count(1) as found from welcome_cache where username = ?',
(username, ))
if self.cursor.fetchone()[0] > 0:
return
# Send welcome toot
toot(self.bot_config, username=username)
# Cache user to avoid duping welcome messages
self.cursor.execute('insert into welcome_cache values (?, ?)',
(username, timestamp))
self.conn.commit()
def on_update(self, status):
"""A new status has appeared!
'status' is the parsed JSON dictionary describing the
status.
"""
self.welcome_user(status)
def on_notification(self, notification):
"""A new notification.
'notification' is the parsed JSON dictionary describing the
notification.
"""
# We don't care if notifications come through our bot / curation account
# Leave handling notifications/folow up to the admins and e-mail notifications
pass
def on_delete(self, status_id):
"""A status has been deleted.
status_id is the status' integer ID.
"""
# Remove the status from the toot_cache if we see a delete
self.cursor.execute('delete from toot_cache where toot_id = ?',
(status_id, ))
self.conn.commit()
def handle_heartbeat(self):
"""The server has sent us a keep-alive message.
This callback may be useful to carry out periodic housekeeping
tasks, or just to confirm that the connection is still
open.
"""
# Consistently/constantly trim the toot cache to the most recent seen toot
self.cursor.execute("""\
DELETE FROM toot_cache WHERE toot_id <= ((SELECT MAX(toot_id) FROM toot_cache) - 1);
""")
self.conn.commit()