A Python Mastodon bot that's geared towards instance admins and their needs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

282 lines
10 KiB

#!/usr/bin/env python3
''' Various Mastodon bots and their implementations '''
import argparse
import sys
import os
import getpass
import codecs
import sqlite3
from mastodon import Mastodon
import feedparser
from ruamel.yaml import YAML
from Toot import toot
from BotStreamListeners import CurationBot, WelcomeBot
def init(config):
''' Initialize the Mastdon API app and cache the API key
Auto login if app creation succeeds '''
# Prompt user to find out if they want to continue if the client_cred_file exists already
# Shouldn't happen twice per docs
if os.path.exists(config['config']['client_cred_file']):
sys.exit((
'init should only ever be called once, try login instead, if that fails. delete {} '
' and re-run init'
).format(config['config']['client_cred_file']))
# Create app for API
Mastodon.create_app(
config['config']['app_name'],
api_base_url=config['config']['api_base_url'],
scopes=['read', 'write', 'follow'],
to_file=config['config']['client_cred_file']
)
# Login to seed login credentials
login(config)
def login(config):
''' Login to API and cache API access token(s) '''
# Prompt for user/password (NEVER store them on disk!)
email = input(
'what is the e-mail that was used to setup the bot account on {}? '.format(
config['config']['api_base_url']
)
)
password = getpass.getpass('what is the password for the account? ')
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
api_base_url=config['config']['api_base_url']
)
totp_code = None
if config['config']['totp']:
print('TOTP enabled, open the link (below) in a browser to authorize access')
print(mastodon.auth_request_url())
print()
totp_code = input('enter the code you received from the url: ')
# Login and cache credential
if totp_code is None:
mastodon.log_in(
email,
password,
scopes=['read', 'write', 'follow'],
to_file=config['config']['user_cred_file']
)
else:
mastodon.log_in(
code=totp_code,
scopes=['read', 'write', 'follow'],
to_file=config['config']['user_cred_file']
)
# Default implementation on whether or not to include an RSS article
# Returns true and has the same method signature as what end users can setup for custom logic
def default_include_article(article):
''' Default implementation of RSS processing logic
Extended via external file if necessary '''
if article is None:
return False
return True
def rss(config):
''' Parse RSS feed and toot any new articles '''
# Setup custom include logic before opening any database connections
include_article_fn = default_include_article
include_path = os.path.abspath(config['rss']['custom_logic_include_file'])
if (
'custom_logic_include_file' in config['rss']
and os.path.exists(include_path)
):
from importlib.machinery import SourceFileLoader
custom_logic = SourceFileLoader( # pylint: disable=deprecated-method
'custom.logic', include_path
).load_module()
include_article_fn = custom_logic.include_article
# Crash on reading the feed before doing any database operations or tooting
feed = feedparser.parse(config['rss']['feed'])
# Get access to cache
conn = sqlite3.connect(config['rss']['cache_file'])
cursor = conn.cursor()
# Ensure cache table has been created
cursor.execute('create table if not exists article_cache (id varchar(256) primary key);')
conn.commit()
# Run through all articles in feed
for entry in feed['entries']:
if not include_article_fn(entry):
continue
# Check if article is in cache already and skip if found
cursor.execute('select count(1) as found from article_cache where id = ?', (entry.id,))
if cursor.fetchone()[0] > 0:
continue
# Toot article
toot(config, entry)
# Cache article
cursor.execute('insert into article_cache values (?)', (entry.id,))
conn.commit()
# Cleanup connection to sqlite database for rss cache
conn.close()
def curate(config):
''' Curate (fav/boost) popular posts on local and/or federated timelines '''
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
access_token=config['config']['user_cred_file'],
api_base_url=config['config']['api_base_url']
)
mastodon.public_stream(CurationBot(config))
def welcome(config):
''' Welcome new users to the instance '''
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
access_token=config['config']['user_cred_file'],
api_base_url=config['config']['api_base_url']
)
mastodon.public_stream(WelcomeBot(config))
if __name__ == '__main__':
# Global CLI arguments/options
PARSER = argparse.ArgumentParser()
PARSER.add_argument('--config', help='path to config file', required=True)
SUBPARSERS = PARSER.add_subparsers(help='commands', dest='command')
# Actions / commands
INIT_PARSER = SUBPARSERS.add_parser('init', help='initialize credentials')
INIT_PARSER.add_argument(
'--totp',
help=(
'use totp login (requires user to open URL in browser and then enter token to bot '
'during init/login)'
),
action='store_true')
LOGIN_PARSER = SUBPARSERS.add_parser(
'login',
help='login to instance if credentials have expired')
LOGIN_PARSER.add_argument(
'--totp',
help=(
'use totp login (requires user to open URL in browser and then enter token to bot '
'during init/login)'
),
action='store_true')
TOOT_PARSER = SUBPARSERS.add_parser('toot', help='send configured toot')
RSS_PARSER = SUBPARSERS.add_parser('rss', help='cross post articles from an rss feed')
CURATE_PARSER = SUBPARSERS.add_parser('curation', help='Run the curation bot (service)')
WELCOME_PARSER = SUBPARSERS.add_parser('welcome', help='Run the welcome bot (service)')
# Parse CLI arguments
ARGS = PARSER.parse_args()
# Make sure a command was specified
if ARGS.command is None:
sys.exit('command must be specified')
# Make sure the config file specified exists
CONFIG_PATH = os.path.abspath(ARGS.config)
if not os.path.exists(CONFIG_PATH):
sys.exit('invalid path to config file')
# Read/parse config file
CONFIG = None
# Fix unicode special character error
with codecs.open(CONFIG_PATH, "r", "utf8") as stream:
CONFIG = YAML(typ='safe').load(stream)
# Add TOTP flag to CONFIG which is passed to each function
if 'totp' in ARGS:
CONFIG['config']['totp'] = ARGS.totp
# Ensure client_cred_file path is valid
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['config']['client_cred_file'])[0])):
sys.exit('client_cred_file directory does not exist')
# Warn user that the config file WILL be created
if not os.path.exists(os.path.abspath(CONFIG['config']['client_cred_file'])):
print('warning: client_cred_file will be created')
# Ensure user_cred_file path is valid
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['config']['user_cred_file'])[0])):
sys.exit('user_cred_file directory does not exist')
if not os.path.exists(os.path.abspath(CONFIG['config']['user_cred_file'])):
print('warning: user_cred file will be created')
# Deal with init command
if ARGS.command == 'init':
init(CONFIG)
# Deal with login command
if ARGS.command == 'login':
login(CONFIG)
# Deal with curation command
if ARGS.command == 'curation':
# Verify toot cache file folder exists
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['curation']['cache_file'])[0])):
# Warn if curation cache file doesn't exist
sys.exit('curation cache_file directory does not exist')
if not os.path.exists(os.path.abspath(CONFIG['curation']['cache_file'])):
print('warning: curation cache_file file will be created')
curate(CONFIG)
# Deal with welcome command
if ARGS.command == 'welcome':
# Verify toot cache file folder exists
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['welcome']['cache_file'])[0])):
# Warn if welcome cache file doesn't exist
sys.exit('welcome cache_file directory does not exist')
if not os.path.exists(os.path.abspath(CONFIG['welcome']['cache_file'])):
print('warning: welcome cache_file file will be created')
welcome(CONFIG)
# Deal with toot command
if ARGS.command == 'toot':
# Ensure main toot is <= 500 characters
TOOT_LENGTH = len(CONFIG['toot']['toot_text'])
if 'cw_text' in CONFIG['toot']:
TOOT_LENGTH = TOOT_LENGTH + len(CONFIG['toot']['cw_text'])
if TOOT_LENGTH > 500:
sys.exit('toot length must be <= 500 characters (including cw_text)')
# Ensure sub toots are <= 500 characters
if 'subtoots' in CONFIG['toot']:
for subtoot in CONFIG['toot']['subtoots']:
TOOT_LENGTH = len(subtoot['toot_text'])
if 'cw_text' in subtoot:
TOOT_LENGTH = TOOT_LENGTH + len(subtoot['cw_text'])
if TOOT_LENGTH > 500:
sys.exit('sub toot length must be <= 500 characters (including cw_text)')
toot(CONFIG)
# Deal with rss command
if ARGS.command == 'rss':
if 'custom_logic_include_file' in CONFIG['rss']:
if not os.path.exists(os.path.abspath(CONFIG['rss']['custom_logic_include_file'])):
# Verify RSS cache file folder exists
sys.exit('custom logic file does not exist!')
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['rss']['cache_file'])[0])):
# Warn if RSS cache file doesn't exist
sys.exit('rss cache_file directory does not exist')
if not os.path.exists(os.path.abspath(CONFIG['rss']['cache_file'])):
print('warning: rss_cache_file file will be created')
rss(CONFIG)