A Python Mastodon bot that's geared towards instance admins and their needs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

339 lines
14 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Adjust PyLint settings
# pylint: disable=C0301
''' Basic bot that toots '''
import argparse
import sys
import os
import getpass
import codecs
import sqlite3
from mastodon import Mastodon, StreamListener
import feedparser
from ruamel.yaml import YAML
class CurationBot(StreamListener):
''' Implementation of the Mastodon.py StreamListener class for curation bot purposes '''
def __init__(self, bot_config):
self.bot_config = bot_config
# Get access to cache
self.conn = sqlite3.connect(self.bot_config['curation']['cache_file'])
self.cursor = self.conn.cursor()
# Ensure cache table has been created
self.cursor.execute('''create table if not exists toot_cache (
toot_id integer primary key,
federated bool,
username varchar(2048),
is_reply bool,
is_boost bool,
toot_timestamp timestamp,
favorites integer,
boosts integer
def __del__(self):
# Cleanup connection to sqlite database for rss cache
def on_update(self, status):
'''A new status has appeared! 'status' is the parsed JSON dictionary
describing the status.'''
if status['visibility'] == 'public':
toot_id = status['id']
federated = '@' in status['account']['acct']
username = status['account']['acct']
is_reply = status['in_reply_to_id'] is not None
is_boost = status['reblog'] is not None
timestamp = status['created_at']
favorites = status['favourites_count']
boosts = status['reblogs_count']
#tags = status['tags'] # TODO: Add support for tags? - Will need many to 1 relationship and a cross table
# Ensure a toot isn't cached twice for some odd reason
self.cursor.execute('select count(1) as found from toot_cache where toot_id = ?', (toot_id,))
if self.cursor.fetchone()[0] > 0:
# Cache toot
self.cursor.execute('insert into toot_cache values (?, ?, ?, ?, ?, ?, ?, ?)', (toot_id, federated, username, is_reply, is_boost, timestamp, favorites, boosts))
def on_notification(self, notification):
'''A new notification. 'notification' is the parsed JSON dictionary
describing the notification.'''
# We don't care if notifications come through our bot / curation account
# Leave handling notifications/folow up to the admins and e-mail notifications
def on_delete(self, status_id):
'''A status has been deleted. status_id is the status' integer ID.'''
# Remove the status from the toot_cache if we see a delete
self.cursor.execute('delete from toot_cache where toot_id = ?', (status_id,))
def handle_heartbeat(self):
'''The server has sent us a keep-alive message. This callback may be
useful to carry out periodic housekeeping tasks, or just to confirm
that the connection is still open.'''
print(' we should probably update statuses and whatnot here or at least do some housekeeping for old toots')
def init(config):
''' Initialize the Mastdon API app and cache the API key
Auto login if app creation succeeds '''
# Prompt user to find out if they want to continue if the client_cred_file exists already
# Shouldn't happen twice per docs
if os.path.exists(config['config']['client_cred_file']):
sys.exit('init should only ever be called once, try login instead, if that fails. delete ' + config['config']['client_cred_file'] + ' and re-run init')
# Create app for API
scopes=['read', 'write', 'follow'],
# Login to seed login credentials
def login(config):
''' Login to API and cache API access token(s) '''
# Prompt for user/password (NEVER store them on disk!)
email = input('what is the e-mail that was used to setup the bot account on ' + config['config']['api_base_url'] + '? ')
password = getpass.getpass('what is the password for the account? ')
# Setup Mastodon API
mastodon = Mastodon(
totp_code = None
if config['config']['totp']:
print('TOTP enabled, open the link (below) in a browser to authorize access')
totp_code = input('enter the code you received from the url: ')
# Login and cache credential
if totp_code is None:
scopes=['read', 'write', 'follow'],
scopes=['read', 'write', 'follow'],
def toot(config, article=None):
''' Send a toot '''
# Setup Mastodon API
mastodon = Mastodon(
# Process main toot
cw_text = None
if 'cw_text' in config['toot']:
cw_text = config['toot']['cw_text']
if article is not None:
cw_text = cw_text.replace('[[ title ]]', article.title)
cw_text = cw_text.replace('[[ url ]]', article.link)
cw_text = cw_text.replace('[[ published ]]', article.published)
toot_text = config['toot']['toot_text']
if article is not None:
toot_text = toot_text.replace('[[ title ]]', article.title)
toot_text = toot_text.replace('[[ url ]]', article.link)
toot_text = toot_text.replace('[[ published ]]', article.published)
parent_toot = mastodon.status_post(toot_text, visibility=config['toot']['visibility'], spoiler_text=cw_text)
parent_toot_id = parent_toot['id']
# Handle any sub-toots
if 'subtoots' in config['toot']:
for subtoot in config['toot']['subtoots']:
cw_text = None
if 'cw_text' in subtoot:
cw_text = subtoot['cw_text']
mastodon.status_post(subtoot['toot_text'], in_reply_to_id=parent_toot_id, visibility=subtoot['visibility'], spoiler_text=cw_text)
# Return the parent toot dict just in case it's needed elsewhere
return parent_toot
# Default implementation on whether or not to include an RSS article
# Returns true and has the same method signature as what end users can setup for custom logic
def default_include_article(article):
''' Default implementation of RSS processing logic
Extended via external file if necessary '''
if article is None:
return False
return True
def rss(config):
''' Parse RSS feed and toot any new articles '''
# Setup custom include logic before opening any database connections
include_article_fn = default_include_article
if 'custom_logic_include_file' in config['rss'] and os.path.exists(os.path.abspath(config['rss']['custom_logic_include_file'])):
from importlib.machinery import SourceFileLoader
custom_logic = SourceFileLoader('custom.logic', os.path.abspath(config['rss']['custom_logic_include_file'])).load_module()
include_article_fn = custom_logic.include_article
# Crash on reading the feed before doing any database operations or tooting
feed = feedparser.parse(config['rss']['feed'])
# Get access to cache
conn = sqlite3.connect(config['rss']['cache_file'])
cursor = conn.cursor()
# Ensure cache table has been created
cursor.execute('create table if not exists article_cache (id varchar(256) primary key);')
# Run through all articles in feed
for entry in feed['entries']:
if not include_article_fn(entry):
# Check if article is in cache already and skip if found
cursor.execute('select count(1) as found from article_cache where id = ?', (entry.id,))
if cursor.fetchone()[0] > 0:
# Toot article
toot(config, entry)
# Cache article
cursor.execute('insert into article_cache values (?)', (entry.id,))
# Cleanup connection to sqlite database for rss cache
def curate(config):
''' Curate (fav/boost) popular posts on local and/or federated timelines '''
# Setup Mastodon API
mastodon = Mastodon(
if __name__ == '__main__':
# Global CLI arguments/options
PARSER = argparse.ArgumentParser()
PARSER.add_argument('--config', help='path to config file', required=True)
SUBPARSERS = PARSER.add_subparsers(help='commands', dest='command')
# Actions / commands
INIT_PARSER = SUBPARSERS.add_parser('init', help='initialize credentials')
help='use totp login (requires user to open URL in browser and then enter token to bot during init/login)',
LOGIN_PARSER = SUBPARSERS.add_parser('login', help='login to instance if credentials have expired')
help='use totp login (requires user to open URL in browser and then enter token to bot during init/login)',
TOOT_PARSER = SUBPARSERS.add_parser('toot', help='send configured toot')
RSS_PARSER = SUBPARSERS.add_parser('rss', help='cross post articles from an rss feed')
CURATE_PARSER = SUBPARSERS.add_parser('curation', help='Run the curation bot (service)')
# Parse CLI arguments
ARGS = PARSER.parse_args()
# Make sure a command was specified
if ARGS.command is None:
sys.exit('command must be specified')
# Make sure the config file specified exists
CONFIG_PATH = os.path.abspath(ARGS.config)
if not os.path.exists(CONFIG_PATH):
sys.exit('invalid path to config file')
# Read/parse config file
# Fix unicode special character error
with codecs.open(CONFIG_PATH, "r", "utf8") as stream:
CONFIG = YAML(typ='safe').load(stream)
# Add TOTP flag to CONFIG which is passed to each function
if 'totp' in ARGS:
CONFIG['config']['totp'] = ARGS.totp
# Ensure client_cred_file path is valid
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['config']['client_cred_file'])[0])):
sys.exit('client_cred_file directory does not exist')
# Warn user that the config file WILL be created
if not os.path.exists(os.path.abspath(CONFIG['config']['client_cred_file'])):
print('warning: client_cred_file will be created')
# Ensure user_cred_file path is valid
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['config']['user_cred_file'])[0])):
sys.exit('user_cred_file directory does not exist')
if not os.path.exists(os.path.abspath(CONFIG['config']['user_cred_file'])):
print('warning: user_cred file will be created')
# Deal with init command
if ARGS.command == 'init':
# Deal with login command
if ARGS.command == 'login':
# Deal with curation command
if ARGS.command == 'curation':
# Verify toot cache file folder exists
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['curation']['cache_file'])[0])):
sys.exit('curation cache_file directory does not exist')
# Warn if curation cache file doesn't exist
if not os.path.exists(os.path.abspath(CONFIG['curation']['cache_file'])):
print('warning: curation cache_file file will be created')
# Deal with toot command
if ARGS.command == 'toot':
# Ensure main toot is <= 500 characters
TOOT_LENGTH = len(CONFIG['toot']['toot_text'])
if 'cw_text' in CONFIG['toot']:
TOOT_LENGTH = TOOT_LENGTH + len(CONFIG['toot']['cw_text'])
if TOOT_LENGTH > 500:
sys.exit('toot length must be <= 500 characters (including cw_text)')
# Ensure sub toots are <= 500 characters
if 'subtoots' in CONFIG['toot']:
for subtoot in CONFIG['toot']['subtoots']:
TOOT_LENGTH = len(subtoot['toot_text'])
if 'cw_text' in subtoot:
TOOT_LENGTH = TOOT_LENGTH + len(subtoot['cw_text'])
if TOOT_LENGTH > 500:
sys.exit('sub toot length must be <= 500 characters (including cw_text)')
# Deal with rss command
if ARGS.command == 'rss':
if 'custom_logic_include_file' in CONFIG['rss']:
if not os.path.exists(os.path.abspath(CONFIG['rss']['custom_logic_include_file'])):
sys.exit('custom logic file does not exist!')
# Verify RSS cache file folder exists
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['rss']['cache_file'])[0])):
sys.exit('rss cache_file directory does not exist')
# Warn if RSS cache file doesn't exist
if not os.path.exists(os.path.abspath(CONFIG['rss']['cache_file'])):
print('warning: rss_cache_file file will be created')