A Python Mastodon bot that's geared towards instance admins and their needs
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

330 lines
11 KiB

#!/usr/bin/env python3
"""Various Mastodon bots and their implementations."""
import argparse
import codecs
import getpass
import importlib
import os
import pathlib
import sqlite3
import sys
import feedparser
from mastodon import Mastodon
from ruamel.yaml import YAML
from botstreamlisteners import WelcomeBot, AutoRespondBot, FollowBot, NewFollowerAutoRespondBot
from toot import toot
def init(config):
    """Register the bot as a Mastodon API application, then log in.

    The client credentials returned by the instance are written to
    ``config['config']['client_cred_file']``.  When that file already exists
    the process exits instead of registering the application a second time.
    On success ``login(config)`` is called to seed the user credentials.

    Args:
        config: Parsed configuration; reads ``client_cred_file``, ``app_name``
            and ``api_base_url`` from its ``config`` section.

    Raises:
        SystemExit: If the client credential file already exists.
    """
    settings = config['config']
    cred_path = settings['client_cred_file']

    # Guard: registration is meant to happen once, so refuse to overwrite
    # an existing client credential file.
    if os.path.exists(cred_path):
        message = (
            'init should only ever be called once, try login instead, if that fails. delete {} '
            'and re-run init'
        )
        sys.exit(message.format(cred_path))

    # Register the application and cache its client credentials on disk
    Mastodon.create_app(
        settings['app_name'],
        api_base_url=settings['api_base_url'],
        scopes=['read', 'write', 'follow'],
        to_file=cred_path,
    )

    # Seed the user credentials straight away
    login(config)
def login(config):
    """Log in to the Mastodon API and cache the user access token.

    Two flows are supported:

    * TOTP / OAuth code flow (``config['config']['totp']`` is truthy): the
      authorization URL is printed, the user opens it in a browser and
      pastes the resulting code back.  No e-mail or password is requested
      because this flow never sends them.
    * Password flow (default): the user is prompted for the account e-mail
      and password.  Neither is ever written to disk.

    In both cases the access token is written to
    ``config['config']['user_cred_file']``.

    Args:
        config: Parsed configuration; reads ``client_cred_file``,
            ``api_base_url``, ``user_cred_file`` and the optional ``totp``
            flag from its ``config`` section.
    """
    settings = config['config']
    # Same scopes the application is registered with in init()
    scopes = ['read', 'write', 'follow']

    # Setup Mastodon API
    mastodon = Mastodon(
        client_id=settings['client_cred_file'],
        api_base_url=settings['api_base_url']
    )

    # The flag is injected by the command line handler; treat a missing key
    # as "TOTP disabled" so login() also works with a bare config.
    if settings.get('totp'):
        print('TOTP enabled, open the link (below) in a browser to authorize access')
        # Request exactly the scopes that log_in() asks for below
        print(mastodon.auth_request_url(scopes=scopes))
        print()
        totp_code = input('enter the code you received from the url: ')
        mastodon.log_in(
            code=totp_code,
            scopes=scopes,
            to_file=settings['user_cred_file']
        )
        return

    # Prompt for user/password (NEVER store them on disk!)
    email = input(
        'what is the e-mail that was used to setup the bot account on {}? '.format(
            settings['api_base_url']
        )
    )
    password = getpass.getpass('what is the password for the account? ')

    # Login and cache credential
    mastodon.log_in(
        email,
        password,
        scopes=scopes,
        to_file=settings['user_cred_file']
    )
# Default implementation on whether or not to include an RSS article
# Returns true and has the same method signature as what end users can setup for custom logic
def default_include_article(article):
    """Decide whether an RSS article should be tooted (default policy).

    Every article is accepted; only a missing article (``None``) is rejected.
    A custom ``include_article`` function with the same signature can be
    supplied through an external file to replace this policy.

    Args:
        article: The feed entry under consideration, or ``None``.

    Returns:
        ``False`` when ``article`` is ``None``, ``True`` otherwise.
    """
    return article is not None
def rss(config):
    """Parse the configured RSS feed and toot every article not tooted before.

    Articles are filtered through ``include_article(entry)``: either the
    function of that name defined in the file named by
    ``config['rss']['custom_logic_include_file']`` or, when that key is
    absent, ``default_include_article``.  Ids of tooted articles are stored
    in the sqlite database ``config['rss']['cache_file']`` so that each
    article is tooted only once.

    Args:
        config: Parsed configuration; reads ``feed``, ``cache_file`` and the
            optional ``custom_logic_include_file`` from its ``rss`` section.

    Raises:
        SystemExit: If the custom logic file cannot be read/imported or does
            not define ``include_article``.
    """
    import contextlib
    import hashlib
    import importlib.machinery
    import importlib.util

    # Setup custom include logic before opening any database connections
    custom_logic_path = config['rss'].get('custom_logic_include_file')
    if custom_logic_path is None:
        include_article_fn = default_include_article
    else:
        module_name = 'custom.logic'
        try:
            loader = importlib.machinery.SourceFileLoader(module_name, custom_logic_path)
            spec = importlib.util.spec_from_loader(module_name, loader)
            module = importlib.util.module_from_spec(spec)
            # Registered before execution, as the deprecated load_module() did
            sys.modules[module_name] = module
            loader.exec_module(module)
        except (ImportError, OSError):
            # A missing/unreadable file surfaces as OSError, not ImportError
            sys.exit('custom logic file does not exist!')
        try:
            include_article_fn = module.include_article
        except AttributeError:
            sys.exit('custom logic file does not define include_article')

    # Read the feed before doing any database operations or tooting
    feed = feedparser.parse(config['rss']['feed'])

    # closing() guarantees the cache connection is released even if tooting
    # or a query raises part-way through the feed.
    with contextlib.closing(sqlite3.connect(config['rss']['cache_file'])) as conn:
        cursor = conn.cursor()

        # Ensure cache table has been created
        cursor.execute('CREATE TABLE IF NOT EXISTS article_cache (id VARCHAR(256) PRIMARY KEY);')
        conn.commit()

        # Run through all articles in feed
        for entry in feed['entries']:
            if not include_article_fn(entry):
                continue

            # If the article carries no id, derive a stable one from its
            # title (falling back to its link): hex SHA1 of the UTF-8 bytes.
            article_id = entry.get('id')
            if not article_id:
                basis = entry.get('title') or entry.get('link') or ''
                article_id = hashlib.sha1(basis.encode('utf-8')).hexdigest()
                entry['id'] = article_id

            # Check if article is in cache already and skip if found
            cursor.execute(
                'select count(1) as found from article_cache where id = ?',
                (article_id,)
            )
            if cursor.fetchone()[0] > 0:
                continue

            # Toot article
            toot(config, entry)

            # Cache article; commit per article so an interrupted run
            # does not re-toot what was already sent.
            cursor.execute('insert into article_cache values (?)', (article_id,))
            conn.commit()
def welcome(config):
    """Welcome new users to the instance.

    Builds an authenticated API client from the cached credentials and
    attaches a ``WelcomeBot`` listener to the public timeline stream.

    Args:
        config: Parsed configuration; reads ``client_cred_file``,
            ``user_cred_file`` and ``api_base_url`` from its ``config``
            section and is handed unchanged to ``WelcomeBot``.
    """
    # Setup Mastodon API
    mastodon = Mastodon(
        client_id=config['config']['client_cred_file'],
        access_token=config['config']['user_cred_file'],
        api_base_url=config['config']['api_base_url']
    )
    # Use the ``stream_*`` naming that the other bots in this file already
    # rely on (``stream_user``); ``public_stream`` is the older name.
    mastodon.stream_public(WelcomeBot(config))
def follow(config):
    """Run the follow bot.

    Builds an authenticated API client from the cached credentials and
    attaches a ``FollowBot`` listener to the account's user stream.

    NOTE(review): the previous summary spoke of users "flowing through public
    timeline", but the code subscribes to the user stream - confirm which
    stream is intended.
    """
    settings = config['config']
    client = Mastodon(
        client_id=settings['client_cred_file'],
        access_token=settings['user_cred_file'],
        api_base_url=settings['api_base_url'],
    )
    listener = FollowBot(config)
    client.stream_user(listener)
def followautorespond(config):
    """Run the bot that automatically responds to new followers.

    Builds an authenticated API client from the cached credentials and
    attaches a ``NewFollowerAutoRespondBot`` listener to the user stream.
    """
    settings = config['config']
    client = Mastodon(
        client_id=settings['client_cred_file'],
        access_token=settings['user_cred_file'],
        api_base_url=settings['api_base_url'],
    )
    listener = NewFollowerAutoRespondBot(config)
    client.stream_user(listener)
def autorespond(config):
    """Run the bot that automatically responds to @'s to the configured account.

    Builds an authenticated API client from the cached credentials and
    attaches an ``AutoRespondBot`` listener to the user stream.
    """
    settings = config['config']
    client = Mastodon(
        client_id=settings['client_cred_file'],
        access_token=settings['user_cred_file'],
        api_base_url=settings['api_base_url'],
    )
    listener = AutoRespondBot(config)
    client.stream_user(listener)
def _build_arg_parser():
    """Build the CLI parser: the global ``--config`` option plus one sub-command per bot."""
    totp_help = (
        'use totp login (requires user to open URL in browser and then enter token to bot '
        'during init/login)'
    )

    # Global CLI arguments/options
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', help='path to config file', required=True)
    subparsers = parser.add_subparsers(help='commands', dest='command')

    # Credential commands (the only ones accepting --totp)
    init_parser = subparsers.add_parser('init', help='initialize credentials')
    init_parser.add_argument('--totp', help=totp_help, action='store_true')
    login_parser = subparsers.add_parser(
        'login',
        help='login to instance if credentials have expired')
    login_parser.add_argument('--totp', help=totp_help, action='store_true')

    # Bot commands
    subparsers.add_parser('toot', help='send configured toot')
    subparsers.add_parser('rss', help='cross post articles from an rss feed')
    subparsers.add_parser('welcome', help='Run the welcome bot (service)')
    subparsers.add_parser('autorespond', help='Run the auto respond bot (service)')
    subparsers.add_parser(
        'followautorespond',
        help='Run the new follower auto respond bot (service)')
    subparsers.add_parser(
        'follow',
        help='Run the follow bot (service) - UNSUPPORTED DON\'T RUN !!!!!!!!!!!!')
    return parser


def _require_parent_dir(path, missing_dir_message, will_create_message):
    """Exit if the directory of *path* is missing; warn if *path* itself does not exist yet."""
    target = pathlib.Path(path)
    if not target.parent.exists():  # pylint: disable=no-member
        sys.exit(missing_dir_message)
    if not target.exists():
        print(will_create_message)


def _check_toot_lengths(toot_config, limit=500):
    """Exit unless the main toot and every sub toot fit in *limit* characters (cw_text included)."""
    def total_length(item):
        # A missing or empty cw_text contributes nothing to the length
        return len(item['toot_text']) + len(item.get('cw_text') or '')

    if total_length(toot_config) > limit:
        sys.exit('toot length must be <= 500 characters (including cw_text)')
    for subtoot in toot_config.get('subtoots') or []:
        if total_length(subtoot) > limit:
            sys.exit('sub toot length must be <= 500 characters (including cw_text)')


def handle_command_line():
    """Parse the command line, load and validate the config, and run the chosen command.

    The YAML config file given with ``--config`` is loaded, the ``--totp``
    flag (``init``/``login`` only) is copied into ``config['config']['totp']``,
    and the credential/cache file locations needed by the selected command are
    checked before the command function is invoked.

    Raises:
        SystemExit: If no command is given, the config file is missing or has
            no ``config`` section, a required directory does not exist, or a
            configured toot is too long.
    """
    args = _build_arg_parser().parse_args()

    # Make sure a command was specified
    if args.command is None:
        sys.exit('command must be specified')

    # Make sure the config file specified exists
    config_path = os.path.abspath(args.config)
    if not os.path.exists(config_path):
        sys.exit('invalid path to config file')

    # Read/parse config file; explicit UTF-8 avoids errors on special
    # characters when the platform default encoding differs.
    with open(config_path, encoding='utf-8') as stream:
        config = YAML(typ='safe').load(stream)
    if not isinstance(config, dict) or 'config' not in config:
        sys.exit('config file must contain a "config" section')

    # Add TOTP flag to CONFIG which is passed to each function
    # (the attribute only exists for the init/login sub-commands)
    if hasattr(args, 'totp'):
        config['config']['totp'] = args.totp

    # Ensure client_cred_file path is valid
    _require_parent_dir(
        config['config']['client_cred_file'],
        'client_cred_file directory does not exist',
        'warning: client_cred_file will be created')

    # Ensure user_cred_file path is valid (previously this re-checked
    # client_cred_file, so user_cred_file was never validated)
    _require_parent_dir(
        config['config']['user_cred_file'],
        'user_cred_file directory does not exist',
        'warning: user_cred file will be created')

    command = args.command
    if command == 'init':
        init(config)
    elif command == 'login':
        login(config)
    elif command in ('welcome', 'follow'):
        # Both streaming bots share the stream cache file
        _require_parent_dir(
            config['stream']['cache_file'],
            'cache_file directory does not exist',
            'warning: cache_file file will be created')
        if command == 'welcome':
            welcome(config)
        else:
            follow(config)
    elif command == 'followautorespond':
        followautorespond(config)
    elif command == 'autorespond':
        autorespond(config)
    elif command == 'toot':
        _check_toot_lengths(config['toot'])
        toot(config)
    elif command == 'rss':
        _require_parent_dir(
            config['rss']['cache_file'],
            'rss cache_file directory does not exist',
            'warning: rss_cache_file file will be created')
        rss(config)
# Script entry point: dispatch the command line only when this file is
# executed directly, not when it is imported as a module.
if __name__ == '__main__':
    handle_command_line()