You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
420 lines
15 KiB
420 lines
15 KiB
#!/usr/bin/env python3 |
|
|
|
"""Different stream listener implementations used by bots.""" |
|
|
|
import copy |
|
import pprint |
|
import sqlite3 |
|
|
|
from toot import toot |
|
from mastodon import Mastodon, StreamListener |
|
|
|
class PublicTimelineBot(StreamListener):
    """Base Mastodon.py stream listener shared by all bots in this module.

    The listener keeps a small sqlite cache with two tables,
    ``toot_cache`` and ``notification_cache``, holding the ids of
    statuses/notifications that were already seen.  Subclasses customise
    behaviour by overriding ``process_toot``, ``process_notification``
    and ``setup_cache``.
    """

    # Queries used to find the most recent id stored in each cache table.
    _MAX_TOOT_SQL = 'select max(toot_id) from toot_cache;'
    _MAX_NOTIFICATION_SQL = 'select max(notification_id) from notification_cache;'

    def process_toot(self, status):
        """Hook called for every status; override in public timeline bots."""
        pass

    def process_notification(self, notification):
        """Hook called for every notification; override to handle them."""
        pass

    def setup_cache(self):
        """Hook for creating bot-specific cache tables; no-op by default."""
        pass

    def __init__(self, bot_config):
        """Verify credentials, open the sqlite cache and create its tables.

        ``bot_config`` is a nested mapping; this class reads
        ``['config']['client_cred_file']``, ``['config']['user_cred_file']``,
        ``['config']['api_base_url']`` and ``['stream']['cache_file']``.
        """
        StreamListener.__init__(self)
        self.bot_config = bot_config

        # Remember our own account id so subclasses can skip their own toots.
        user = self._make_client().account_verify_credentials()
        self.account_id = user['id']

        # Get access to cache
        self.conn = sqlite3.connect(self.bot_config['stream']['cache_file'])
        self.cursor = self.conn.cursor()

        self.cursor.execute("""\
            CREATE TABLE IF NOT EXISTS toot_cache (
                toot_id INTEGER PRIMARY KEY
            );
        """)
        self.cursor.execute("""\
            CREATE TABLE IF NOT EXISTS notification_cache (
                notification_id INTEGER PRIMARY KEY
            );
        """)
        self.conn.commit()

        self.setup_cache()

        # Replay any toots that were missed while offline and welcome new users
        # NOTE(review): both replay calls are commented out, yet the progress
        # messages are still printed -- confirm whether replay should be
        # re-enabled or the messages dropped.
        print('Replaying missed toots')
        #self.replay_toots(True)
        print(' Replay complete')
        print('Replaying missed notifications')
        #self.replay_notifications()
        print(' Replay complete')

    def __del__(self):
        """Flush and close the sqlite connection, if one was ever opened."""
        # __init__ may have raised before self.conn was assigned (for example
        # while verifying credentials); without this guard __del__ would then
        # fail with AttributeError.
        conn = getattr(self, 'conn', None)
        if conn is None:
            return
        conn.commit()
        conn.close()

    def _make_client(self):
        """Build a Mastodon API client from the credentials in bot_config."""
        config = self.bot_config['config']
        return Mastodon(
            client_id=config['client_cred_file'],
            access_token=config['user_cred_file'],
            api_base_url=config['api_base_url'])

    def _max_cached_id(self, query):
        """Run a ``select max(...)`` query; return its value (None if empty)."""
        self.cursor.execute(query)
        return self.cursor.fetchone()[0]

    def fetch_remaining(self, mastodon, first_page):
        """Work around for odd behavior in Mastodon.py's official fetch_remaining code.

        Starting from ``first_page``, keep calling ``mastodon.fetch_next``
        until it yields an empty/None page, and return every item seen as
        one flat list.
        """
        # FIXME: Remove this method when below GitHub issue is closed
        # and a new release available FIXME: Don't forget to update
        # minimum version of Mastodon.py to match when the fix is
        # released https://github.com/halcy/Mastodon.py/issues/59

        # Page through a copy so the caller's first page is left untouched.
        current_page = copy.deepcopy(first_page)

        all_pages = []
        while current_page:
            all_pages.extend(current_page)
            current_page = mastodon.fetch_next(current_page)

        return all_pages

    def replay_toots(self, recurse=False):
        """Replay toots that were posted while the bot was offline.

        Every public-timeline status newer than the highest cached toot id
        is passed to ``process_toot``.  When ``recurse`` is true the replay
        runs one additional time afterwards to pick up toots that arrived
        during the first pass.
        """
        mastodon = self._make_client()

        last_seen_toot_id = self._max_cached_id(self._MAX_TOOT_SQL) or 0
        first_page = mastodon.timeline_public(since_id=last_seen_toot_id)
        all_pages = self.fetch_remaining(mastodon, first_page)

        # Catch up ALL welcome messages that may have been missed
        for status in all_pages:
            self.process_toot(status)

        # Re-read the max: process_toot implementations may have cached ids.
        last_seen_toot_id = self._max_cached_id(self._MAX_TOOT_SQL) or 0

        # Record the first returned status as the new high-water mark when it
        # is newer than anything already cached.
        if all_pages:
            new_last_seen_toot_id = all_pages[0]['id']
            if int(new_last_seen_toot_id) > last_seen_toot_id:
                self.cursor.execute('insert into toot_cache values (?)',
                                    (new_last_seen_toot_id, ))
                self.conn.commit()

        # Recurse in case the catch up took long enough for more toots to
        # enter the public timeline.  Do this only once to be safe.
        if recurse:
            self.replay_toots()

    def replay_notifications(self):
        """Replay notifications that were posted while the bot was offline.

        Repeatedly fetches notifications newer than the highest cached id
        and passes them to ``process_notification`` until a fetch yields
        nothing newer.  Returns silently if the API call raises KeyError.
        """
        mastodon = self._make_client()

        # Iterative form of the original self-recursion: one pass per page of
        # backlog, without growing the call stack.
        while True:
            last_seen_notification_id = (
                self._max_cached_id(self._MAX_NOTIFICATION_SQL) or 0)
            try:
                first_page = mastodon.notifications(
                    since_id=last_seen_notification_id)
            except KeyError:
                return

            # Catch up ALL notifications that may have been missed
            for notification in first_page:
                self.process_notification(notification)

            # Re-read the max: process_notification may have cached ids.
            last_seen_notification_id = (
                self._max_cached_id(self._MAX_NOTIFICATION_SQL) or 0)

            if not first_page:
                return
            new_last_seen_notification_id = first_page[0]['id']
            if int(new_last_seen_notification_id) <= last_seen_notification_id:
                return

            self.cursor.execute('insert into notification_cache values (?)',
                                (new_last_seen_notification_id, ))
            self.conn.commit()

    def on_update(self, status):
        """A new status has appeared!

        'status' is the parsed JSON dictionary describing the
        status.
        """
        toot_id = status['id']

        # Skip a status identical to the most recently cached one.
        if self._max_cached_id(self._MAX_TOOT_SQL) == toot_id:
            return

        # Cache toot.  Caching is best-effort: a database error (such as the
        # id already being present) must not stop the toot being processed.
        try:
            self.cursor.execute('insert into toot_cache values (?)', (toot_id, ))
            self.conn.commit()
        except sqlite3.Error:
            pass
        self.process_toot(status)

    def on_notification(self, notification):
        """A new notification.

        'notification' is the parsed JSON dictionary describing the
        notification.
        """
        notification_id = notification['id']

        # Skip a notification identical to the most recently cached one.
        if self._max_cached_id(self._MAX_NOTIFICATION_SQL) == notification_id:
            return

        # Cache notification
        self.cursor.execute('insert into notification_cache values (?)',
                            (notification_id, ))
        self.conn.commit()
        self.process_notification(notification)

    def on_delete(self, status_id):
        """A status has been deleted.

        status_id is the status' integer ID.
        """
        # Remove the status from the toot_cache if we see a delete
        self.cursor.execute('delete from toot_cache where toot_id = ?',
                            (status_id, ))
        self.conn.commit()

    def handle_heartbeat(self):
        """The server has sent us a keep-alive message.

        Used here for periodic housekeeping: both cache tables are trimmed
        so that only the row with the highest id remains.
        """
        # Consistently/constantly trim the toot cache to the most recent seen toot
        self.cursor.execute("""\
            DELETE FROM toot_cache WHERE toot_id <= ((SELECT MAX(toot_id) FROM toot_cache) - 1);
        """)
        # Consistently/constantly trim the notification cache to the most recent seen notification
        self.cursor.execute("""\
            DELETE FROM notification_cache WHERE notification_id <= ((SELECT MAX(notification_id) FROM notification_cache) - 1);
        """)
        self.conn.commit()
|
|
|
class WelcomeBot(PublicTimelineBot):
    """Public timeline bot that welcomes users to an instance"""

    def setup_cache(self):
        """Create the ``welcome_cache`` table if it does not exist yet."""
        self.cursor.execute("""\
            CREATE TABLE IF NOT EXISTS welcome_cache (
                username VARCHAR(2048) PRIMARY KEY,
                seen_timestamp TIMESTAMP
            );
        """)
        self.conn.commit()

    def process_toot(self, status):
        """Welcome the author of a public, local status exactly once.

        Accounts whose ``acct`` contains '@' are treated as federated and
        skipped, as are accounts already present in ``welcome_cache``.
        """
        acct = status['account']['acct']
        posted_at = status['created_at']
        is_public = status['visibility'] == 'public'
        is_local = '@' not in acct

        # Welcome any user who's posted publicly
        if not (is_public and is_local):
            return

        # Check if username has been seen for welcome
        self.cursor.execute(
            'select count(1) as found from welcome_cache where username = ?',
            (acct, ))
        already_welcomed = self.cursor.fetchone()[0] > 0
        if already_welcomed:
            return

        # Send welcome toot
        toot(self.bot_config, username=acct)

        # Cache user to avoid duping welcome messages
        self.cursor.execute('insert into welcome_cache values (?, ?)',
                            (acct, posted_at))
        self.conn.commit()
|
|
|
class AutoRespondBot(PublicTimelineBot):
    """User timeline bot that auto responds to @'s"""

    def setup_cache(self):
        """No extra logic necessary"""
        pass

    def process_notification(self, notification):
        """Reply to public mentions that were not written by the bot itself.

        Notifications whose ``type`` is not ``'mention'`` are ignored.  The
        reply is sent through ``toot`` and its id is stored in
        ``toot_cache``.
        """
        if notification['type'] != 'mention':
            return

        status = notification['status']
        username = status['account']['username']
        visibility = status['visibility']
        in_reply_to = status['id']
        # Build a fresh list instead of appending to status['mentions'], so
        # the caller's notification payload is not modified.
        mentions = list(status['mentions'])
        mentions.append(status['account'])

        # Respond to @'s, but never to ourselves.
        if visibility != 'public' or status['account']['id'] == self.account_id:
            return

        # Send autoresponse toot
        autoresponse = toot(self.bot_config, username=username,
                            in_reply_to=in_reply_to, mentions=mentions)
        autoresponse_id = autoresponse['id']

        # Cache newly sent toot to prevent any looping.  The id may already be
        # cached; that is harmless, so a duplicate key is tolerated here.
        try:
            self.cursor.execute('insert into toot_cache values (?)',
                                (autoresponse_id, ))
            self.conn.commit()
        except sqlite3.IntegrityError:
            pass
|
|
|
class NewFollowerAutoRespondBot(PublicTimelineBot):
    """User timeline bot that auto responds to new followers"""

    def setup_cache(self):
        """No extra logic necessary"""
        pass

    def process_notification(self, notification):
        """Send a toot to each new follower other than the bot itself.

        Notifications whose ``type`` is not ``'follow'`` are ignored.  When
        the ``local_only`` key is present in ``bot_config['config']``,
        followers whose ``acct`` contains '@' are ignored as well.
        """
        if notification['type'] != 'follow':
            return

        follower = notification['account']

        # If auto-responses to followers should be limited to local accounts
        # NOTE(review): only the presence of the key is tested, not its value,
        # so ``local_only: false`` would still enable the restriction --
        # confirm this is intended.
        if 'local_only' in self.bot_config['config'] and '@' in follower['acct']:
            return

        if follower['id'] == self.account_id:
            return

        # Send autoresponse toot
        autoresponse = toot(self.bot_config, username=follower['username'])
        autoresponse_id = autoresponse['id']

        # Cache newly sent toot to prevent any looping.  Caching is
        # best-effort: database errors are ignored, everything else propagates.
        try:
            self.cursor.execute('insert into toot_cache values (?)',
                                (autoresponse_id, ))
            self.conn.commit()
        except sqlite3.Error:
            pass
|
|
|
class FollowBot(PublicTimelineBot):
    """Public timeline bot that follows the authors of boosted toots.

    Only statuses that carry a ``reblog`` are considered; the author of
    the boosted status is followed when it is a public toot from a
    federated account that passes the configured blacklists and tag
    filter and has not been followed before.
    """

    def setup_cache(self):
        """Create the ``follow_cache`` table if it does not exist yet."""
        self.cursor.execute("""\
            CREATE TABLE IF NOT EXISTS follow_cache (
                username VARCHAR(2048) PRIMARY KEY,
                seen_timestamp TIMESTAMP
            )
        """)
        self.conn.commit()

    def process_toot(self, status):
        """Follow the author of a boosted status if all filters pass.

        Reads ``bot_config['follow']['blacklist']['instances']``,
        ``bot_config['follow']['blacklist']['users']`` and, when present,
        ``bot_config['follow']['tags']``.  Progress is printed to stdout.
        """
        print('========================')
        pprint.pprint(status)

        # Only boosts are of interest; plain statuses are skipped.
        if 'reblog' not in status or status['reblog'] is None:
            print('no reblog')
            return
        print('has reblog!')
        status = status['reblog']

        account = status['account']
        username = account['acct']
        federated = '@' in username
        instance = None

        print(username)

        if federated:
            print(' IS federated')
            instance = username.split('@')[1]

        timestamp = status['created_at']
        visibility = status['visibility']

        # Follow any user who's posted publicly from another instance.
        if visibility != 'public' or not federated:
            return

        blacklist = self.bot_config['follow']['blacklist']
        blocked_instances = {name.lower() for name in blacklist['instances']}
        if instance.lower() in blocked_instances:
            return
        blocked_users = {name.lower() for name in blacklist['users']}
        if username.lower() in blocked_users:
            return

        # WARNING: depending on how the formatted bio looks, #nobot may not
        # match!  A missing or None bio is treated as empty.
        note = account.get('note') or ''
        if 'nobot' in note.lower():
            return

        print(' go ahead with follow')

        # Tag filter: when tags are configured, at least one of the toot's
        # tags must match (case-insensitively); otherwise every toot is ok.
        if 'tags' in self.bot_config['follow']:
            wanted_tags = {tag.lower() for tag in self.bot_config['follow']['tags']}
            tags_ok = any(toot_tag['name'].lower() in wanted_tags
                          for toot_tag in status['tags'])
            # Bail out if the toot wasn't tagged properly
            if not tags_ok:
                print(' no tags matched')
                return

        # Check if username has already been followed
        self.cursor.execute(
            'select count(1) as found from follow_cache where username = ?',
            (username, ))
        if self.cursor.fetchone()[0] > 0:
            print(' already followed (cached), returning')
            return

        print(' following')

        mastodon = Mastodon(
            client_id=self.bot_config['config']['client_cred_file'],
            access_token=self.bot_config['config']['user_cred_file'],
            api_base_url=self.bot_config['config']['api_base_url']
        )
        mastodon.account_follow(account['id'])

        # Cache user to avoid following the same account twice
        self.cursor.execute('insert into follow_cache values (?, ?)',
                            (username, timestamp))
        self.conn.commit()
        # NOTE(review): the original nesting of this message could not be
        # confirmed; it is emitted only after a successful follow.
        print(' processing complete')
|
|
|
|