Browse Source

Merge branch 'feature/pythonicize' into 'master'

Feature/pythonicize

See merge request !1
merge-requests/3/head 20170712.1
Mike C 5 years ago
parent
commit
edf4fa1425
  1. 5
      README.md
  2. 487
      botstreamlisteners.py
  3. 12
      custom_logic/example_custom_rss_include.py
  4. 17
      requirements.txt
  5. 2
      setup.cfg
  6. 15
      setup.py
  7. 0
      tests/__init__.py
  8. 9
      tests/test_toot_bot.py
  9. 119
      toot.py
  10. 225
      toot_bot.py

5
README.md

@ -21,8 +21,13 @@ This was designed with the admin in mind and functionality will grow over time.
# Installation
1. Install the dependencies: ```pip3 install -r requirements.txt```
1. Install the bot: ```python setup.py install```
1. Run the bot: ```python .\toot_bot.py --config .\configs\config.yaml [init|login|toot|rss]```
# Testing
1. For development, install the bot with: ```python setup.py develop```
1. Use [pytest](https://docs.pytest.org/en/latest/) to run tests: ```python setup.py develop```
# Tooting
## Reminder Toots
To send reminder toots take a look at 'configs/example_reminder.yaml' and do the following

487
BotStreamListeners.py → botstreamlisteners.py

@ -1,220 +1,267 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Adjust PyLint settings
# pylint: disable=C0301
''' Different stream listener implementations used by bots '''
import sys, pprint
import sqlite3
import copy
from mastodon import Mastodon, StreamListener
from Toot import toot
class WelcomeBot(StreamListener):
''' Implementation of the Mastodon.py StreamListener class for welcome bot purposes '''
def __init__(self, bot_config):
StreamListener.__init__(self)
self.bot_config = bot_config
# Get access to cache
self.conn = sqlite3.connect(self.bot_config['welcome']['cache_file'])
self.cursor = self.conn.cursor()
# Ensure cache table has been created
self.cursor.execute('''create table if not exists welcome_cache (
username varchar(2048) primary key,
seen_timestamp timestamp
);'''
)
self.cursor.execute('''create table if not exists toot_cache (
toot_id integer primary key
);'''
)
self.conn.commit()
# Replay any toots that were missed while offline and welcome new users
self.replay_toots(True)
def __del__(self):
# Cleanup connection to sqlite database for welcome cache
self.conn.commit()
self.conn.close()
def fetch_remaining(self, mastodon, first_page):
''' Work around for odd behavior in Mastodon.py's official fetch_remaining code '''
# FIXME: Remove this method when below GitHub issue is closed and a new release available
# FIXME: Don't forget to update minimum version of Mastodon.py to match when the fix is released
# https://github.com/halcy/Mastodon.py/issues/59
first_page = copy.deepcopy(first_page)
all_pages = []
current_page = first_page
while current_page != None and current_page:
all_pages.extend(current_page)
current_page = mastodon.fetch_next(current_page)
return all_pages
def replay_toots(self, recurse=False):
''' Replay toots that were posted while the bot was offline '''
# Setup Mastodon API
mastodon = Mastodon(
client_id=self.bot_config['config']['client_cred_file'],
access_token=self.bot_config['config']['user_cred_file'],
api_base_url=self.bot_config['config']['api_base_url']
)
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
if last_seen_toot_id is None:
last_seen_toot_id = 0
first_page = mastodon.timeline_local(since_id=last_seen_toot_id)
all_pages = self.fetch_remaining(mastodon, first_page)
# Catch up ALL welcome messages that may have been missed
for status in all_pages:
self.welcome_user(status)
# Update max seen toot id (any calls to welcome_user will move the max)
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
# Update last seen toot id
new_last_seen_toot_id = -1
if all_pages:
new_last_seen_toot_id = all_pages[0]['id']
if new_last_seen_toot_id > last_seen_toot_id:
self.cursor.execute('insert into toot_cache values (?)', (new_last_seen_toot_id,))
self.conn.commit()
# Recurse in case the catch up took long enough for more toots to enter the public timeline
# Do this only once to be safe
if recurse:
self.replay_toots() # Recurse ONCE to catch up on any missing toots posted while doing initial catch up
def welcome_user(self, status):
''' Method that sets up toot and welcomes new users (method due to use in multiple places) '''
toot_id = status['id']
federated = '@' in status['account']['acct']
username = status['account']['acct']
timestamp = status['created_at']
visibility = status['visibility']
# Cache toot
self.cursor.execute('insert into toot_cache values (?)', (toot_id,))
self.conn.commit()
# Welcome any user who's posted publicly
if visibility == 'public' and not federated:
# Check if username has been seen for welcome
self.cursor.execute('select count(1) as found from welcome_cache where username = ?', (username,))
if self.cursor.fetchone()[0] > 0:
return
# Send welcome toot
toot(self.bot_config, username=username)
# Cache user to avoid duping welcome messages
self.cursor.execute('insert into welcome_cache values (?, ?)', (username, timestamp))
self.conn.commit()
def on_update(self, status):
'''A new status has appeared! 'status' is the parsed JSON dictionary
describing the status.'''
self.welcome_user(status)
def on_notification(self, notification):
'''A new notification. 'notification' is the parsed JSON dictionary
describing the notification.'''
# We don't care if notifications come through our bot / curation account
# Leave handling notifications/folow up to the admins and e-mail notifications
pass
def on_delete(self, status_id):
'''A status has been deleted. status_id is the status' integer ID.'''
# Remove the status from the toot_cache if we see a delete
self.cursor.execute('delete from toot_cache where toot_id = ?', (status_id,))
self.conn.commit()
def handle_heartbeat(self):
'''The server has sent us a keep-alive message. This callback may be
useful to carry out periodic housekeeping tasks, or just to confirm
that the connection is still open.'''
# Consistently/constantly trim the toot cache to the most recent seen toot
self.cursor.execute('delete from toot_cache where toot_id <= ((select max(toot_id) from toot_cache) - 1);')
self.conn.commit()
class CurationBot(StreamListener):
''' Implementation of the Mastodon.py StreamListener class for curation bot purposes '''
def __init__(self, bot_config):
StreamListener.__init__(self)
self.bot_config = bot_config
# Get access to cache
self.conn = sqlite3.connect(self.bot_config['curation']['cache_file'])
self.cursor = self.conn.cursor()
# Ensure cache table has been created
self.cursor.execute('''create table if not exists toot_cache (
toot_id integer primary key,
federated bool,
username varchar(2048),
is_reply bool,
is_boost bool,
toot_timestamp timestamp,
favorites integer,
boosts integer
);'''
)
self.conn.commit()
def __del__(self):
# Cleanup connection to sqlite database for rss cache
self.conn.close()
def on_update(self, status):
'''A new status has appeared! 'status' is the parsed JSON dictionary
describing the status.'''
if status['visibility'] == 'public':
toot_id = status['id']
federated = '@' in status['account']['acct']
username = status['account']['acct']
is_reply = status['in_reply_to_id'] is not None
is_boost = status['reblog'] is not None
timestamp = status['created_at']
favorites = status['favourites_count']
boosts = status['reblogs_count']
#tags = status['tags'] # TODO: Add support for tags? - Will need many to 1 relationship and a cross table
# Ensure a toot isn't cached twice for some odd reason
self.cursor.execute('select count(1) as found from toot_cache where toot_id = ?', (toot_id,))
if self.cursor.fetchone()[0] > 0:
return
# Cache toot
self.cursor.execute('insert into toot_cache values (?, ?, ?, ?, ?, ?, ?, ?)', (toot_id, federated, username, is_reply, is_boost, timestamp, favorites, boosts))
self.conn.commit()
def on_notification(self, notification):
'''A new notification. 'notification' is the parsed JSON dictionary
describing the notification.'''
# We don't care if notifications come through our bot / curation account
# Leave handling notifications/folow up to the admins and e-mail notifications
pass
def on_delete(self, status_id):
'''A status has been deleted. status_id is the status' integer ID.'''
# Remove the status from the toot_cache if we see a delete
self.cursor.execute('delete from toot_cache where toot_id = ?', (status_id,))
self.conn.commit()
def handle_heartbeat(self):
'''The server has sent us a keep-alive message. This callback may be
useful to carry out periodic housekeeping tasks, or just to confirm
that the connection is still open.'''
print('!!!!!!!!heartbeat!!!!!!!!')
print(' we should probably update statuses and whatnot here or at least do some housekeeping for old toots')
#!/usr/bin/env python3
"""Different stream listener implementations used by bots."""
import copy
import sqlite3
from toot import toot
from mastodon import Mastodon, StreamListener
class WelcomeBot(StreamListener):
"""Implementation of the Mastodon.py StreamListener class for welcome bot purposes."""
def __init__(self, bot_config):
StreamListener.__init__(self)
self.bot_config = bot_config
# Get access to cache
self.conn = sqlite3.connect(self.bot_config['welcome']['cache_file'])
self.cursor = self.conn.cursor()
# Ensure cache table has been created
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS welcome_cache (
username VARCHAR(2048) PRIMARY KEY,
seen_timestamp TIMESTAMP
);
""")
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS toot_cache (
toot_id INTEGER PRIMARY KEY
);
""")
self.conn.commit()
# Replay any toots that were missed while offline and welcome new users
self.replay_toots(True)
def __del__(self):
# Cleanup connection to sqlite database for welcome cache
self.conn.commit()
self.conn.close()
def fetch_remaining(self, mastodon, first_page):
"""Work around for odd behavior in Mastodon.py's official fetch_remaining code."""
# FIXME: Remove this method when below GitHub issue is closed
# and a new release available FIXME: Don't forget to update
# minimum version of Mastodon.py to match when the fix is
# released https://github.com/halcy/Mastodon.py/issues/59
first_page = copy.deepcopy(first_page)
all_pages = []
current_page = first_page
while current_page is not None and current_page:
all_pages.extend(current_page)
current_page = mastodon.fetch_next(current_page)
return all_pages
def replay_toots(self, recurse=False):
"""Replay toots that were posted while the bot was offline."""
# Setup Mastodon API
mastodon = Mastodon(
client_id=self.bot_config['config']['client_cred_file'],
access_token=self.bot_config['config']['user_cred_file'],
api_base_url=self.bot_config['config']['api_base_url'])
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
if last_seen_toot_id is None:
last_seen_toot_id = 0
first_page = mastodon.timeline_local(since_id=last_seen_toot_id)
all_pages = self.fetch_remaining(mastodon, first_page)
# Catch up ALL welcome messages that may have been missed
for status in all_pages:
self.welcome_user(status)
# Update max seen toot id (any calls to welcome_user will move the max)
self.cursor.execute('select max(toot_id) from toot_cache;')
last_seen_toot_id = self.cursor.fetchone()[0]
# Update last seen toot id
new_last_seen_toot_id = -1
if all_pages:
new_last_seen_toot_id = all_pages[0]['id']
if new_last_seen_toot_id > last_seen_toot_id:
self.cursor.execute('insert into toot_cache values (?)',
(new_last_seen_toot_id, ))
self.conn.commit()
# Recurse in case the catch up took long enough for more toots to enter the public timeline
# Do this only once to be safe
if recurse:
# Recurse ONCE to catch up on any missing toots posted while doing initial catch up
self.replay_toots()
def welcome_user(self, status):
"""Method that sets up toot and welcomes new users.
Method due to use in multiple places."""
toot_id = status['id']
federated = '@' in status['account']['acct']
username = status['account']['acct']
timestamp = status['created_at']
visibility = status['visibility']
# Cache toot
self.cursor.execute('insert into toot_cache values (?)', (toot_id, ))
self.conn.commit()
# Welcome any user who's posted publicly
if visibility == 'public' and not federated:
# Check if username has been seen for welcome
self.cursor.execute(
'select count(1) as found from welcome_cache where username = ?',
(username, ))
if self.cursor.fetchone()[0] > 0:
return
# Send welcome toot
toot(self.bot_config, username=username)
# Cache user to avoid duping welcome messages
self.cursor.execute('insert into welcome_cache values (?, ?)',
(username, timestamp))
self.conn.commit()
def on_update(self, status):
"""A new status has appeared!
'status' is the parsed JSON dictionary describing the
status.
"""
self.welcome_user(status)
def on_notification(self, notification):
"""A new notification.
'notification' is the parsed JSON dictionary describing the
notification.
"""
# We don't care if notifications come through our bot / curation account
# Leave handling notifications/folow up to the admins and e-mail notifications
pass
def on_delete(self, status_id):
"""A status has been deleted.
status_id is the status' integer ID.
"""
# Remove the status from the toot_cache if we see a delete
self.cursor.execute('delete from toot_cache where toot_id = ?',
(status_id, ))
self.conn.commit()
def handle_heartbeat(self):
"""The server has sent us a keep-alive message.
This callback may be useful to carry out periodic housekeeping
tasks, or just to confirm that the connection is still
open.
"""
# Consistently/constantly trim the toot cache to the most recent seen toot
self.cursor.execute("""\
DELETE FROM toot_cache WHERE toot_id <= ((SELECT MAX(toot_id) FROM toot_cache) - 1);
""")
self.conn.commit()
class CurationBot(StreamListener):
"""Implementation of the Mastodon.py StreamListener class for curation bot purposes."""
def __init__(self, bot_config):
StreamListener.__init__(self)
self.bot_config = bot_config
# Get access to cache
self.conn = sqlite3.connect(self.bot_config['curation']['cache_file'])
self.cursor = self.conn.cursor()
# Ensure cache table has been created
self.cursor.execute("""\
CREATE TABLE IF NOT EXISTS toot_cache (
toot_id INTEGER PRIMARY KEY,
federated BOOL,
username VARCHAR(2048),
is_reply BOOL,
is_boost BOOL,
toot_timestamp TIMESTAMP,
favorites INTEGER,
boosts INTEGER
);
""")
self.conn.commit()
def __del__(self):
# Cleanup connection to sqlite database for rss cache
self.conn.close()
def on_update(self, status):
"""A new status has appeared!
'status' is the parsed JSON dictionary describing the
status.
"""
if status['visibility'] == 'public':
toot_id = status['id']
federated = '@' in status['account']['acct']
username = status['account']['acct']
is_reply = status['in_reply_to_id'] is not None
is_boost = status['reblog'] is not None
timestamp = status['created_at']
favorites = status['favourites_count']
boosts = status['reblogs_count']
# TODO: Add support for tags? - Will need many to 1 relationship and a cross table
# tags = status['tags']
# Ensure a toot isn't cached twice for some odd reason
self.cursor.execute(
'select count(1) as found from toot_cache where toot_id = ?',
(toot_id, ))
if self.cursor.fetchone()[0] > 0:
return
# Cache toot
self.cursor.execute(
'insert into toot_cache values (?, ?, ?, ?, ?, ?, ?, ?)',
(toot_id, federated, username, is_reply, is_boost, timestamp,
favorites, boosts))
self.conn.commit()
def on_notification(self, notification):
"""A new notification.
'notification' is the parsed JSON dictionary describing the
notification.
"""
# We don't care if notifications come through our bot / curation account
# Leave handling notifications/folow up to the admins and e-mail notifications
pass
def on_delete(self, status_id):
"""A status has been deleted.
status_id is the status' integer ID.
"""
# Remove the status from the toot_cache if we see a delete
self.cursor.execute('delete from toot_cache where toot_id = ?',
(status_id, ))
self.conn.commit()
def handle_heartbeat(self):
"""The server has sent us a keep-alive message.
This callback may be useful to carry out periodic housekeeping
tasks, or just to confirm that the connection is still
open.
"""
print('!!!!!!!!heartbeat!!!!!!!!')
print(
' we should probably update statuses and whatnot here or at least do some '
'housekeeping for old toots')

12
custom_logic/example_custom_rss_include.py

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
# Custom include logic for dPS photog challenges posted on main RSS feed
def include_article(article):
if 'Weekly' in article.title and 'Challenge' in article.title:
return True
"""Example of how to build an article filter."""
return False
def include_article(article):
"""Custom include logic for dPS photog challenges posted on main RSS feed."""
if 'Weekly' in article.title and 'Challenge' in article.title:
return True
return False

17
requirements.txt

@ -1,3 +1,14 @@
Mastodon.py
feedparser
ruamel.yaml
certifi==2017.4.17
chardet==3.0.4
dateutils==0.6.6
feedparser==5.2.1
idna==2.5
Mastodon.py==1.0.8
py==1.4.34
pytest==3.1.3
python-dateutil==2.6.1
pytz==2017.2
requests==2.18.1
ruamel.yaml==0.15.18
six==1.10.0
urllib3==1.21.1

2
setup.cfg

@ -0,0 +1,2 @@
[aliases]
test=pytest

15
setup.py

@ -0,0 +1,15 @@
"""mastodon_bot installer."""
from setuptools import setup
setup(
name='mastodon_bot',
version='0.1.0',
description='An announcement bot for Mastodon instances',
url='https://gitlab.com/photog.social/mastodon_bot',
author='KemoNine',
author_email='mcrosson_mastobot@kemonine.info',
license='GPLv3',
setup_requires=['pytest-runner'],
tests_require=['pytest'],
)

0
tests/__init__.py

9
tests/test_toot_bot.py

@ -0,0 +1,9 @@
"""Tests for the toot_bot module."""
# pylint: disable=missing-docstring
import toot_bot
def test_default_include_article():
assert toot_bot.default_include_article('some article')
assert not toot_bot.default_include_article(None)

119
Toot.py → toot.py

@ -1,58 +1,61 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Adjust PyLint settings
# pylint: disable=C0301
''' Various Toot help method for bots '''
from mastodon import Mastodon
def toot(config, article=None, username=None):
''' Send a toot '''
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
access_token=config['config']['user_cred_file'],
api_base_url=config['config']['api_base_url']
)
# Process main toot
cw_text = None
if 'cw_text' in config['toot']:
cw_text = config['toot']['cw_text']
# Replace RSS variables if they are present
if article is not None:
cw_text = cw_text.replace('[[ title ]]', article.title)
cw_text = cw_text.replace('[[ url ]]', article.link)
cw_text = cw_text.replace('[[ published ]]', article.published)
# Replace username variable if it's present
if username is not None:
cw_text = cw_text.replace('[[ username ]]', username)
toot_text = config['toot']['toot_text']
# Replace RSS varaibles if they are present
if article is not None:
toot_text = toot_text.replace('[[ title ]]', article.title)
toot_text = toot_text.replace('[[ url ]]', article.link)
toot_text = toot_text.replace('[[ published ]]', article.published)
# Replace username variable if it's present
if username is not None:
toot_text = toot_text.replace('[[ username ]]', username)
# Send main toot
parent_toot = mastodon.status_post(toot_text, visibility=config['toot']['visibility'], spoiler_text=cw_text)
parent_toot_id = parent_toot['id']
# Handle any sub-toots
if 'subtoots' in config['toot']:
for subtoot in config['toot']['subtoots']:
cw_text = None
if 'cw_text' in subtoot:
cw_text = subtoot['cw_text']
mastodon.status_post(subtoot['toot_text'], in_reply_to_id=parent_toot_id, visibility=subtoot['visibility'], spoiler_text=cw_text)
# Return the parent toot dict just in case it's needed elsewhere
return parent_toot
#!/usr/bin/env python3
"""Various Toot help method for bots."""
from mastodon import Mastodon
def toot(config, article=None, username=None):
"""Send a toot."""
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
access_token=config['config']['user_cred_file'],
api_base_url=config['config']['api_base_url']
)
# Process main toot
cw_text = None
if 'cw_text' in config['toot']:
cw_text = config['toot']['cw_text']
# Replace RSS variables if they are present
if article is not None:
cw_text = cw_text.replace('[[ title ]]', article.title)
cw_text = cw_text.replace('[[ url ]]', article.link)
cw_text = cw_text.replace('[[ published ]]', article.published)
# Replace username variable if it's present
if username is not None:
cw_text = cw_text.replace('[[ username ]]', username)
toot_text = config['toot']['toot_text']
# Replace RSS varaibles if they are present
if article is not None:
toot_text = toot_text.replace('[[ title ]]', article.title)
toot_text = toot_text.replace('[[ url ]]', article.link)
toot_text = toot_text.replace('[[ published ]]', article.published)
# Replace username variable if it's present
if username is not None:
toot_text = toot_text.replace('[[ username ]]', username)
# Send main toot
parent_toot = mastodon.status_post(toot_text, visibility=config['toot']['visibility'],
spoiler_text=cw_text)
parent_toot_id = parent_toot['id']
# Handle any sub-toots
if 'subtoots' in config['toot']:
for subtoot in config['toot']['subtoots']:
cw_text = None
if 'cw_text' in subtoot:
cw_text = subtoot['cw_text']
mastodon.status_post(
subtoot['toot_text'],
in_reply_to_id=parent_toot_id,
visibility=subtoot['visibility'],
spoiler_text=cw_text
)
# Return the parent toot dict just in case it's needed elsewhere
return parent_toot

225
toot_bot.py

@ -1,30 +1,36 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Adjust PyLint settings
# pylint: disable=C0301
''' Various Mastodon bots and their implementations '''
"""Various Mastodon bots and their implementations."""
import argparse
import sys
import os
import getpass
import codecs
import getpass
import importlib
import os
import pathlib
import sqlite3
from mastodon import Mastodon
import sys
import feedparser
from mastodon import Mastodon
from ruamel.yaml import YAML
from Toot import toot
from BotStreamListeners import CurationBot, WelcomeBot
from botstreamlisteners import CurationBot, WelcomeBot
from toot import toot
def init(config):
''' Initialize the Mastdon API app and cache the API key
Auto login if app creation succeeds '''
"""Initialize the Mastdon API app and cache the API key.
Auto login if app creation succeeds.
"""
# Prompt user to find out if they want to continue if the client_cred_file exists already
# Shouldn't happen twice per docs
# Shouldn't happen twice per docs
if os.path.exists(config['config']['client_cred_file']):
sys.exit('init should only ever be called once, try login instead, if that fails. delete ' + config['config']['client_cred_file'] + ' and re-run init')
sys.exit((
'init should only ever be called once, try login instead, if that fails. delete {} '
'and re-run init'
).format(config['config']['client_cred_file']))
# Create app for API
Mastodon.create_app(
@ -37,10 +43,15 @@ def init(config):
# Login to seed login credentials
login(config)
def login(config):
''' Login to API and cache API access token(s) '''
"""Login to API and cache API access token(s)."""
# Prompt for user/password (NEVER store them on disk!)
email = input('what is the e-mail that was used to setup the bot account on ' + config['config']['api_base_url'] + '? ')
email = input(
'what is the e-mail that was used to setup the bot account on {}? '.format(
config['config']['api_base_url']
)
)
password = getpass.getpass('what is the password for the account? ')
# Setup Mastodon API
@ -71,24 +82,35 @@ def login(config):
to_file=config['config']['user_cred_file']
)
# Default implementation on whether or not to include an RSS article
# Returns true and has the same method signature as what end users can setup for custom logic
def default_include_article(article):
''' Default implementation of RSS processing logic
Extended via external file if necessary '''
"""Default implementation of RSS processing logic.
Extended via external file if necessary.
"""
if article is None:
return False
return True
def rss(config):
''' Parse RSS feed and toot any new articles '''
"""Parse RSS feed and toot any new articles."""
# Setup custom include logic before opening any database connections
include_article_fn = default_include_article
if 'custom_logic_include_file' in config['rss'] and os.path.exists(os.path.abspath(config['rss']['custom_logic_include_file'])):
from importlib.machinery import SourceFileLoader
custom_logic = SourceFileLoader('custom.logic', os.path.abspath(config['rss']['custom_logic_include_file'])).load_module()
include_article_fn = custom_logic.include_article
try:
custom_logic_module = config['rss']['custom_logic_include_file']
except KeyError:
pass
else:
try:
module = importlib.import_module(custom_logic_module)
except ImportError:
sys.exit('custom logic file does not exist!')
include_article_fn = module.include_article
# Crash on reading the feed before doing any database operations or tooting
feed = feedparser.parse(config['rss']['feed'])
@ -121,8 +143,9 @@ def rss(config):
# Cleanup connection to sqlite database for rss cache
conn.close()
def curate(config):
''' Curate (fav/boost) popular posts on local and/or federated timelines '''
"""Curate (fav/boost) popular posts on local and/or federated timelines."""
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
@ -132,8 +155,9 @@ def curate(config):
mastodon.public_stream(CurationBot(config))
def welcome(config):
''' Welcome new users to the instance '''
"""Welcome new users to the instance."""
# Setup Mastodon API
mastodon = Mastodon(
client_id=config['config']['client_cred_file'],
@ -143,113 +167,136 @@ def welcome(config):
mastodon.public_stream(WelcomeBot(config))
if __name__ == '__main__':
def handle_command_line():
"""Parse and act on the command line."""
# Global CLI arguments/options
PARSER = argparse.ArgumentParser()
PARSER.add_argument('--config', help='path to config file', required=True)
SUBPARSERS = PARSER.add_subparsers(help='commands', dest='command')
parser = argparse.ArgumentParser()
parser.add_argument('--config', help='path to config file', required=True)
subparsers = parser.add_subparsers(help='commands', dest='command')
# Actions / commands
INIT_PARSER = SUBPARSERS.add_parser('init', help='initialize credentials')
INIT_PARSER.add_argument('--totp',
help='use totp login (requires user to open URL in browser and then enter token to bot during init/login)',
action='store_true')
LOGIN_PARSER = SUBPARSERS.add_parser('login', help='login to instance if credentials have expired')
LOGIN_PARSER.add_argument('--totp',
help='use totp login (requires user to open URL in browser and then enter token to bot during init/login)',
action='store_true')
TOOT_PARSER = SUBPARSERS.add_parser('toot', help='send configured toot')
RSS_PARSER = SUBPARSERS.add_parser('rss', help='cross post articles from an rss feed')
CURATE_PARSER = SUBPARSERS.add_parser('curation', help='Run the curation bot (service)')
WELCOME_PARSER = SUBPARSERS.add_parser('welcome', help='Run the welcome bot (service)')
init_parser = subparsers.add_parser('init', help='initialize credentials')
init_parser.add_argument(
'--totp',
help=(
'use totp login (requires user to open URL in browser and then enter token to bot '
'during init/login)'
),
action='store_true')
login_parser = subparsers.add_parser(
'login',
help='login to instance if credentials have expired')
login_parser.add_argument(
'--totp',
help=(
'use totp login (requires user to open URL in browser and then enter token to bot '
'during init/login)'
),
action='store_true')
subparsers.add_parser('toot', help='send configured toot')
subparsers.add_parser('rss', help='cross post articles from an rss feed')
subparsers.add_parser('curation', help='Run the curation bot (service)')
subparsers.add_parser('welcome', help='Run the welcome bot (service)')
# Parse CLI arguments
ARGS = PARSER.parse_args()
args = parser.parse_args()
# Make sure a command was specified
if ARGS.command is None:
if args.command is None:
sys.exit('command must be specified')
# Make sure the config file specified exists
CONFIG_PATH = os.path.abspath(ARGS.config)
if not os.path.exists(CONFIG_PATH):
config_path = os.path.abspath(args.config)
if not os.path.exists(config_path):
sys.exit('invalid path to config file')
# Read/parse config file
CONFIG = None
config = None
# Fix unicode special character error
with codecs.open(CONFIG_PATH, "r", "utf8") as stream:
CONFIG = YAML(typ='safe').load(stream)
with codecs.open(config_path, "r", "utf8") as stream:
config = YAML(typ='safe').load(stream)
# Add TOTP flag to CONFIG which is passed to each function
if 'totp' in ARGS:
CONFIG['config']['totp'] = ARGS.totp
if 'totp' in args:
config['config']['totp'] = args.totp
# Ensure client_cred_file path is valid
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['config']['client_cred_file'])[0])):
client_cred_file = pathlib.Path(config['config']['client_cred_file'])
if not client_cred_file.parent.exists(): # pylint: disable=no-member
sys.exit('client_cred_file directory does not exist')
# Warn user that the config file WILL be created
if not os.path.exists(os.path.abspath(CONFIG['config']['client_cred_file'])):
if not client_cred_file.exists():
print('warning: client_cred_file will be created')
# Ensure user_cred_file path is valid
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['config']['user_cred_file'])[0])):
user_cred_file = pathlib.Path(config['config']['client_cred_file'])
if not user_cred_file.parent.exists(): # pylint: disable=no-member
sys.exit('user_cred_file directory does not exist')
if not os.path.exists(os.path.abspath(CONFIG['config']['user_cred_file'])):
if not user_cred_file.exists():
print('warning: user_cred file will be created')
# Deal with init command
if ARGS.command == 'init':
init(CONFIG)
if args.command == 'init':
init(config)
# Deal with login command
if ARGS.command == 'login':
login(CONFIG)
if args.command == 'login':
login(config)
# Deal with curation command
if ARGS.command == 'curation':
if args.command == 'curation':
# Verify toot cache file folder exists
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['curation']['cache_file'])[0])):
cache_file = pathlib.Path(config['curation']['cache_file'])
if not cache_file.parent.exists(): # pylint: disable=no-member
# Warn if curation cache file doesn't exist
sys.exit('curation cache_file directory does not exist')
# Warn if curation cache file doesn't exist
if not os.path.exists(os.path.abspath(CONFIG['curation']['cache_file'])):
if not cache_file.exists():
print('warning: curation cache_file file will be created')
curate(CONFIG)
curate(config)
# Deal with welcome command
if ARGS.command == 'welcome':
if args.command == 'welcome':
# Verify toot cache file folder exists
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['welcome']['cache_file'])[0])):
cache_file = pathlib.Path(config['welcome']['cache_file'])
if not cache_file.parent.exists(): # pylint: disable=no-member
# Warn if welcome cache file doesn't exist
sys.exit('welcome cache_file directory does not exist')
# Warn if welcome cache file doesn't exist
if not os.path.exists(os.path.abspath(CONFIG['welcome']['cache_file'])):
if not cache_file.exists():
print('warning: welcome cache_file file will be created')
welcome(CONFIG)
welcome(config)
# Deal with toot command
if ARGS.command == 'toot':
if args.command == 'toot':
# Ensure main toot is <= 500 characters
TOOT_LENGTH = len(CONFIG['toot']['toot_text'])
if 'cw_text' in CONFIG['toot']:
TOOT_LENGTH = TOOT_LENGTH + len(CONFIG['toot']['cw_text'])
if TOOT_LENGTH > 500:
toot_length = len(config['toot']['toot_text'])
if 'cw_text' in config['toot']:
toot_length = toot_length + len(config['toot']['cw_text'])
if toot_length > 500:
sys.exit('toot length must be <= 500 characters (including cw_text)')
# Ensure sub toots are <= 500 characters
if 'subtoots' in CONFIG['toot']:
for subtoot in CONFIG['toot']['subtoots']:
TOOT_LENGTH = len(subtoot['toot_text'])
if 'subtoots' in config['toot']:
for subtoot in config['toot']['subtoots']:
toot_length = len(subtoot['toot_text'])
if 'cw_text' in subtoot:
TOOT_LENGTH = TOOT_LENGTH + len(subtoot['cw_text'])
if TOOT_LENGTH > 500:
toot_length = toot_length + len(subtoot['cw_text'])
if toot_length > 500:
sys.exit('sub toot length must be <= 500 characters (including cw_text)')
toot(CONFIG)
toot(config)
# Deal with rss command
if ARGS.command == 'rss':
if 'custom_logic_include_file' in CONFIG['rss']:
if not os.path.exists(os.path.abspath(CONFIG['rss']['custom_logic_include_file'])):
sys.exit('custom logic file does not exist!')
# Verify RSS cache file folder exists
if not os.path.exists(os.path.abspath(os.path.split(CONFIG['rss']['cache_file'])[0])):
if args.command == 'rss':
cache_file = pathlib.Path(config['rss']['cache_file'])
if not cache_file.parent.exists(): # pylint: disable=no-member
# Warn if RSS cache file doesn't exist
sys.exit('rss cache_file directory does not exist')
# Warn if RSS cache file doesn't exist
if not os.path.exists(os.path.abspath(CONFIG['rss']['cache_file'])):
if not cache_file.exists():
print('warning: rss_cache_file file will be created')
rss(CONFIG)
rss(config)
if __name__ == '__main__':
handle_command_line()

Loading…
Cancel
Save