A Python Mastodon bot that's geared towards instance admins and their needs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

331 lines
11 KiB

5 years ago
#!/usr/bin/env python3
4 years ago
"""Various Mastodon bots and their implementations."""
import argparse
import codecs
4 years ago
import getpass
import importlib
4 years ago
import os
4 years ago
import pathlib
import sqlite3
4 years ago
import sys
import feedparser
4 years ago
from mastodon import Mastodon
from ruamel.yaml import YAML
4 years ago
from botstreamlisteners import WelcomeBot, AutoRespondBot, FollowBot, NewFollowerAutoRespondBot
4 years ago
from toot import toot
def init(config):
4 years ago
"""Initialize the Mastdon API app and cache the API key.
Auto login if app creation succeeds.
"""
# Prompt user to find out if they want to continue if the client_cred_file exists already
# Shouldn't happen twice per docs
if os.path.exists(config['config']['client_cred_file']):
sys.exit((
'init should only ever be called once, try login instead, if that fails. delete {} '
'and re-run init'
).format(config['config']['client_cred_file']))
# Create app for API
Mastodon.create_app(
config['config']['app_name'],
api_base_url=config['config']['api_base_url'],
scopes=['read', 'write', 'follow'],
to_file=config['config']['client_cred_file']
)
# Login to seed login credentials
login(config)
def login(config):
4 years ago
"""Login to API and cache API access token(s)."""
# Prompt for user/password (NEVER store them on disk!)
email = input(
'what is the e-mail that was used to setup the bot account on {}? '.format(
config['config']['api_base_url']
)
)
password = getpass.getpass('what is the password for the account? ')
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
api_base_url=config['config']['api_base_url']
)
totp_code = None
if config['config']['totp']:
print('TOTP enabled, open the link (below) in a browser to authorize access')
print(mastodon.auth_request_url())
print()
totp_code = input('enter the code you received from the url: ')
# Login and cache credential
if totp_code is None:
mastodon.log_in(
email,
password,
scopes=['read', 'write', 'follow'],
to_file=config['config']['user_cred_file']
)
else:
mastodon.log_in(
code=totp_code,
scopes=['read', 'write', 'follow'],
to_file=config['config']['user_cred_file']
)
# Default implementation on whether or not to include an RSS article
# Returns true and has the same method signature as what end users can setup for custom logic
def default_include_article(article):
4 years ago
"""Default implementation of RSS processing logic.
Extended via external file if necessary.
"""
if article is None:
return False
return True
def rss(config):
4 years ago
"""Parse RSS feed and toot any new articles."""
# Setup custom include logic before opening any database connections
include_article_fn = default_include_article
try:
custom_logic_module = config['rss']['custom_logic_include_file']
except KeyError:
pass
else:
4 years ago
try:
#module = importlib.import_module(custom_logic_module)
import importlib.machinery
loader = importlib.machinery.SourceFileLoader('custom.logic', custom_logic_module)
module = loader.load_module()
except ImportError:
4 years ago
sys.exit('custom logic file does not exist!')
include_article_fn = module.include_article
# Crash on reading the feed before doing any database operations or tooting
feed = feedparser.parse(config['rss']['feed'])
# Get access to cache
conn = sqlite3.connect(config['rss']['cache_file'])
cursor = conn.cursor()
# Ensure cache table has been created
cursor.execute('CREATE TABLE IF NOT EXISTS article_cache (id VARCHAR(256) PRIMARY KEY);')
conn.commit()
# Run through all articles in feed
for entry in feed['entries']:
if not include_article_fn(entry):
continue
#if no ID field is found for the article, use a SHA1 of its title instead
if entry.id is None:
import hashlib
entry.id = hashlib.sha1(entry.title)
# Check if article is in cache already and skip if found
cursor.execute('select count(1) as found from article_cache where id = ?', (entry.id,))
if cursor.fetchone()[0] > 0:
continue
# Toot article
toot(config, entry)
# Cache article
cursor.execute('insert into article_cache values (?)', (entry.id,))
conn.commit()
# Cleanup connection to sqlite database for rss cache
conn.close()
5 years ago
def welcome(config):
4 years ago
"""Welcome new users to the instance."""
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
access_token=config['config']['user_cred_file'],
api_base_url=config['config']['api_base_url']
)
mastodon.public_stream(WelcomeBot(config))
def follow(config):
"""Follow new users flowing through public timeline that haven't been seen yet"""
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
access_token=config['config']['user_cred_file'],
api_base_url=config['config']['api_base_url']
)
mastodon.stream_user(FollowBot(config))
def followautorespond(config):
"""Auto respond to new followers"""
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
access_token=config['config']['user_cred_file'],
api_base_url=config['config']['api_base_url']
)
mastodon.stream_user(NewFollowerAutoRespondBot(config))
def autorespond(config):
"""Auto respond to @'s to the configured account"""
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
access_token=config['config']['user_cred_file'],
api_base_url=config['config']['api_base_url']
)
mastodon.stream_user(AutoRespondBot(config))
def handle_command_line():
4 years ago
"""Parse and act on the command line."""
# Global CLI arguments/options
parser = argparse.ArgumentParser()
parser.add_argument('--config', help='path to config file', required=True)
subparsers = parser.add_subparsers(help='commands', dest='command')
# Actions / commands
init_parser = subparsers.add_parser('init', help='initialize credentials')
init_parser.add_argument(
4 years ago
'--totp',
help=(
'use totp login (requires user to open URL in browser and then enter token to bot '
'during init/login)'
),
action='store_true')
login_parser = subparsers.add_parser(
4 years ago
'login',
help='login to instance if credentials have expired')
login_parser.add_argument(
4 years ago
'--totp',
help=(
'use totp login (requires user to open URL in browser and then enter token to bot '
'during init/login)'
),
action='store_true')
subparsers.add_parser('toot', help='send configured toot')
subparsers.add_parser('rss', help='cross post articles from an rss feed')
subparsers.add_parser('welcome', help='Run the welcome bot (service)')
subparsers.add_parser('autorespond', help='Run the auto respond bot (service)')
subparsers.add_parser('followautorespond', help='Run the new follower auto respond bot (service)')
subparsers.add_parser('follow', help='Run the follow bot (service) - UNSUPPORTED DON\'T RUN !!!!!!!!!!!!')
# Parse CLI arguments
args = parser.parse_args()
# Make sure a command was specified
if args.command is None:
sys.exit('command must be specified')
# Make sure the config file specified exists
config_path = os.path.abspath(args.config)
if not os.path.exists(config_path):
sys.exit('invalid path to config file')
# Read/parse config file
config = None
# Fix unicode special character error
with codecs.open(config_path, "r", "utf8") as stream:
config = YAML(typ='safe').load(stream)
# Add TOTP flag to CONFIG which is passed to each function
if 'totp' in args:
config['config']['totp'] = args.totp
# Ensure client_cred_file path is valid
client_cred_file = pathlib.Path(config['config']['client_cred_file'])
4 years ago
if not client_cred_file.parent.exists(): # pylint: disable=no-member
sys.exit('client_cred_file directory does not exist')
# Warn user that the config file WILL be created
4 years ago
if not client_cred_file.exists():
print('warning: client_cred_file will be created')
# Ensure user_cred_file path is valid
user_cred_file = pathlib.Path(config['config']['client_cred_file'])
4 years ago
if not user_cred_file.parent.exists(): # pylint: disable=no-member
sys.exit('user_cred_file directory does not exist')
4 years ago
if not user_cred_file.exists():
print('warning: user_cred file will be created')
# Deal with init command
if args.command == 'init':
init(config)
4 years ago
# Deal with login command
if args.command == 'login':
login(config)
4 years ago
# Deal with streaming command caches
if args.command == 'welcome' or args.command == 'follow':
# Verify toot cache file folder exists
cache_file = pathlib.Path(config['stream']['cache_file'])
4 years ago
if not cache_file.parent.exists(): # pylint: disable=no-member
# Warn if welcome cache file doesn't exist
sys.exit('cache_file directory does not exist')
4 years ago
if not cache_file.exists():
print('warning: cache_file file will be created')
# Deal with welcome command
if args.command == 'welcome':
welcome(config)
4 years ago
# Deal with new follower command
if args.command == 'followautorespond':
followautorespond(config)
# Deal with autorespond command
if args.command == 'autorespond':
autorespond(config)
# Deal with follow command
if args.command == 'follow':
follow(config)
# Deal with toot command
if args.command == 'toot':
# Ensure main toot is <= 500 characters
toot_length = len(config['toot']['toot_text'])
if 'cw_text' in config['toot']:
toot_length = toot_length + len(config['toot']['cw_text'])
if toot_length > 500:
sys.exit('toot length must be <= 500 characters (including cw_text)')
# Ensure sub toots are <= 500 characters
if 'subtoots' in config['toot']:
for subtoot in config['toot']['subtoots']:
toot_length = len(subtoot['toot_text'])
if 'cw_text' in subtoot:
toot_length = toot_length + len(subtoot['cw_text'])
if toot_length > 500:
sys.exit('sub toot length must be <= 500 characters (including cw_text)')
toot(config)
4 years ago
# Deal with rss command
if args.command == 'rss':
cache_file = pathlib.Path(config['rss']['cache_file'])
4 years ago
if not cache_file.parent.exists(): # pylint: disable=no-member
# Warn if RSS cache file doesn't exist
sys.exit('rss cache_file directory does not exist')
4 years ago
if not cache_file.exists():
print('warning: rss_cache_file file will be created')
rss(config)
if __name__ == '__main__':
handle_command_line()