Remove mastodon.py dependency

This commit is contained in:
Wardyn 2023-01-26 19:40:26 -08:00
parent e256ad251c
commit 69b73260e7
9 changed files with 234 additions and 145 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
config.json

View File

@ -1,13 +1,21 @@
#!/usr/bin/python3.8 #!/usr/bin/python3.8
from mastodon import Mastodon
import sys import sys
import os import os
import re import re
import sys
import argparse import argparse
import time import time
import json
import requests
parser = argparse.ArgumentParser(description='Remove follow requests that match the following filters:\n1: Has less than n (defaults to 1) posts\n2: Has no profile picture\n3: Has no bio\n\nAdding extra filters is somewhat easy if you are familiar with python as well as the mastodon.py wrapper.', formatter_class=argparse.RawTextHelpFormatter) parent = os.path.dirname(os.path.realpath(__file__))
if not os.path.exists(os.path.join(parent, 'config.json')):
generate_config("Wardyn's feditools", "read write follow push")
with open(os.path.join(parent, 'config.json'), 'r') as config_file:
config = json.load(config_file)
session = requests.Session()
session.headers.update({"Authorization" : "Bearer " + config['user_token']})
parser = argparse.ArgumentParser(description='Remove follow requests that match the following filters:\n1: Has less than n (defaults to 1) posts\n2: Has no profile picture\n3: Has no bio\n', formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('-t', '--threshold', parser.add_argument('-t', '--threshold',
action='store', action='store',
type=int, type=int,
@ -22,6 +30,13 @@ parser.add_argument('-i', '--instances',
default=None, default=None,
dest='instances' dest='instances'
) )
parser.add_argument('-b', '--blank',
action='store',
type=str,
default=None,
help='If your instance has a custom image for blank pfps set, please specify the url to the image with this flag',
dest='custompfp'
)
parser.add_argument('-p', '--posts', parser.add_argument('-p', '--posts',
action='store', action='store',
type=int, type=int,
@ -65,81 +80,56 @@ simulate = args.simulate
accept= args.accept accept= args.accept
loop = args.loop loop = args.loop
instances = args.instances instances = args.instances
custompfp = args.custompfp
if instances is not None: if instances is not None:
instances = instances.split(", ") instances = instances.split(", ")
if type(loop) == int: if type(loop) == int:
loop = loop * 60 * 60 loop = loop * 60 * 60
# Ensure Fediverse credentials blanks = [config['instance'] + '/avatars/original/missing.png', config['instance'] + '/images/avi.png']
parent = os.path.dirname(os.path.realpath(__file__)) if custompfp:
if os.path.exists(os.path.join(parent, '.creds')) == False: blanks.append(custompfp)
os.mkdir(os.path.join(parent, '.creds'))
if os.path.exists(os.path.join(parent, '.creds', 'client.secret')) == False:
instance = input('Please enter your instance: ')
if not instance[:4] == 'http':
instance = 'https://' + instance
Mastodon.create_app('Wardyns fedi tools', api_base_url = instance, to_file = os.path.join(parent, '.creds', 'client.secret'))
mastodon = Mastodon( def filterposts(min, follow_request):
client_id = os.path.join(parent, '.creds', 'client.secret'), posts = follow_request['statuses_count']
)
if os.path.exists(os.path.join(parent, '.creds', 'user.secret')) == False:
username = input('Enter your username: ')
password = input('Enter your password: ')
mastodon.log_in(username=username, password=password, scopes=['read', 'write'], to_file=os.path.join(parent, '.creds', 'user.secret'))
mastodon = Mastodon(
access_token = os.path.join(parent, '.creds', 'user.secret')
)
def filterposts(min, request):
posts = request['statuses_count']
if posts >= min: if posts >= min:
return(True) return(True)
else: else:
return(False) return(False)
def filterbio(request): def filterbio(follow_request):
if request['note'] == '<p></p>': if follow_request['note'] == '<p></p>':
return(False) return(False)
else: else:
return(True) return(True)
def filterpfp(request): def filterpfp(follow_request):
if request['avatar'] == mastodon.api_base_url + '/avatars/original/missing.png' or request['avatar'] == mastodon.api_base_url + '/images/avi.png': if follow_request['avatar'] in blanks:
return(False) return(False)
else: else:
return(True) return(True)
while True: while True:
denied=[] denied=[]
accepted=[] accepted=[]
while True:
try: follow_requests = session.get(config['instance'] + '/api/v1/follow_requests', params={'limit':80}).json()
requests = mastodon.follow_requests()
break if len(follow_requests) > 0:
except: for follow_request in follow_requests:
continue postcheck = filterposts(minposts, follow_request)
if len(requests) > 0: pfpcheck = filterpfp(follow_request)
for request in requests: biocheck = filterbio(follow_request)
#print(request['fqn'])
#print(request['id']) following = session.get(config['instance'] + '/api/v1/accounts/relationships', params={'id':follow_request['id']}).json()[0]['following']
postcheck = filterposts(minposts, request)
pfpcheck = filterpfp(request)
biocheck = filterbio(request)
#print('Has more than one post?: ' + str(postcheck))
#print('Has a pfp?: ' + str(pfpcheck))
#print('Has a bio?: ' + str(biocheck))
following = mastodon.account_relationships(request)[0]['following']
blockedinstance = False blockedinstance = False
if instances is not None: if instances is not None:
blockedinstance = request['fqn'].split('@')[-1] in instances blockedinstance = follow_request['fqn'].split('@')[-1] in instances
if postcheck + biocheck + pfpcheck < threshold and following == False or blockedinstance == True: if postcheck + biocheck + pfpcheck < threshold and following == False or blockedinstance == True:
denied.append(request) denied.append(follow_request)
elif accept == True: elif accept == True:
accepted.append(request) accepted.append(follow_request)
print('DENIED: ') print('DENIED: ')
for request in denied: for follow_request in denied:
print(request['fqn']) print(follow_request['fqn'])
confirm = None confirm = None
if auto == False: if auto == False:
while True: while True:
@ -151,16 +141,16 @@ while True:
continue continue
break break
if auto == True or confirm == 'y': if auto == True or confirm == 'y':
for request in denied: for follow_request in denied:
if simulate: if simulate:
print('This is where ' + request['fqn'] + ' would be rejected') print('This is where ' + follow_request['fqn'] + ' would be rejected')
else: else:
mastodon.follow_request_reject(request['id']) session.post(config['instance'] + '/api/v1/follow_requests/' + follow_request['id'] + '/reject')
for request in accepted: for follow_request in accepted:
if simulate: if simulate:
print('This is where ' + request['fqn'] + ' would be accepted') print('This is where ' + follow_request['fqn'] + ' would be accepted')
else: else:
mastodon.follow_request_authorize(request['id']) session.post(config['instance'] + '/api/v1/follow_requests/' + follow_request['id'] + '/authorize')
if loop == None: if loop == None:
break break
else: else:

6
config_template.json Normal file
View File

@ -0,0 +1,6 @@
{
"instance":"",
"client_id":"",
"client_secret":"",
"user_token":""
}

View File

@ -1,9 +1,19 @@
# Import modules # Import modules
from mastodon import Mastodon
import os import os
import html2text import html2text
from argparse import ArgumentParser from argparse import ArgumentParser
import os import json
import requests
parent = os.path.dirname(os.path.realpath(__file__))
if not os.path.exists(os.path.join(parent, 'config.json')):
generate_config("Wardyn's feditools", "read write follow push")
with open(os.path.join(parent, 'config.json'), 'r') as config_file:
config = json.load(config_file)
session = requests.Session()
session.headers.update({"Authorization" : "Bearer " + config['user_token']})
# Initialize arguments # Initialize arguments
parser = ArgumentParser(description='Search a fedi users posts for a specific word or phrase') parser = ArgumentParser(description='Search a fedi users posts for a specific word or phrase')
@ -36,30 +46,6 @@ dms = args.dms
account = args.account account = args.account
pattern = args.pattern pattern = args.pattern
# Ensure Fediverse credentials
parent = os.path.dirname(os.path.realpath(__file__))
if os.path.exists(os.path.join(parent, '.creds')) == False:
os.mkdir(os.path.join(parent, '.creds'))
if os.path.exists(os.path.join(parent, '.creds', 'client.secret')) == False:
instance = input('Please enter your instance: ')
if not instance[:4] == 'http':
instance = 'https://' + instance
Mastodon.create_app('Wardyns fedi tools', api_base_url = instance, to_file = os.path.join(parent, '.creds', 'client.secret'))
mastodon = Mastodon(
client_id = os.path.join(parent, '.creds', 'client.secret'),
)
if os.path.exists(os.path.join(parent, '.creds', 'user.secret')) == False:
username = input('Enter your username: ')
password = input('Enter your password: ')
mastodon.log_in(username=username, password=password, scopes=['read', 'write'], to_file=os.path.join(parent, '.creds', 'user.secret'))
mastodon = Mastodon(
access_token = os.path.join(parent, '.creds', 'user.secret')
)
# Main block # Main block
if case == False: if case == False:
pattern = pattern.lower() pattern = pattern.lower()
@ -70,8 +56,8 @@ htmlconvert.body_width = 0
if account[0] == '@': if account[0] == '@':
account = account[1:] account = account[1:]
if len(account.split('@')) == 1: if len(account.split('@')) == 1:
account = account + '@' + mastodon.api_base_url.split('/')[2] account = account + '@' + config['instance'].split('/')[2]
accountlist = mastodon.account_search(account) accountlist = session.get(config['instance'] + '/api/v2/search', params={'q':account}).json()['accounts']
for curaccount in accountlist: for curaccount in accountlist:
print(curaccount['fqn']) print(curaccount['fqn'])
@ -85,20 +71,11 @@ accid = account['id']
print('Searching for posts including "' + pattern + '" from user ' + account['fqn']) print('Searching for posts including "' + pattern + '" from user ' + account['fqn'])
print('\n---\n') print('\n---\n')
while True: while True:
while True: statuses = session.get(config['instance'] + '/api/v1/accounts/' + accid + '/statuses', params={'max_id':oldest_status_id, 'limit':40}).json()
try:
statuses = mastodon.account_statuses(accid, max_id=oldest_status_id, limit=1000)
break
except IndexError:
break
except:
pass
if len(statuses) == 0:
break
oldest_status_id = statuses[-1]['id'] oldest_status_id = statuses[-1]['id']
for status in statuses: for status in statuses:
if status['reblog'] == None: if status['reblog'] == None:
if status['visibility'] == 'direct' and dms == False: if status['visibility'] == 'direct' and not dms:
continue continue
content = str(htmlconvert.handle(status['content'])) content = str(htmlconvert.handle(status['content']))
if case == False: if case == False:
@ -107,4 +84,6 @@ while True:
print(content) print(content)
print('\nlink: ' + status['url']) print('\nlink: ' + status['url'])
print('\n---\n') print('\n---\n')
if len(statuses) < 40:
break
print('Finished searching') print('Finished searching')

34
generate_config.py Normal file
View File

@ -0,0 +1,34 @@
import os
import requests
import json
from urllib.parse import urlencode
def generate_config(app_name, scopes):
#Ensure Credentials
parent = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(parent, 'config_template.json'), 'r') as template:
config = json.load(template)
#Create app
instance = input('Please enter your instance: ')
if not instance[:4] == 'http':
instance = 'https://' + instance
response = requests.post(instance + '/api/v1/apps', data={'client_name':app_name,'scopes':scopes, 'redirect_uris':'urn:ietf:wg:oauth:2.0:oob'}).json()
client_id = config['client_id'] = response['client_id']
client_secret = config['client_secret'] = response['client_secret']
config['instance'] = instance
#Log in to user account
print(instance + '/oauth/authorize?', urlencode({'response_type':'code', 'client_id':client_id, 'redirect_uri':'urn:ietf:wg:oauth:2.0:oob', 'scope':scopes}))
code = input("To generate a token to access your account, " + app_name + " needs an authorization code. Please authorize using the link above and enter the code it provides you \nCode: ")
response = requests.post(instance + '/oauth/token', data={'grant_type':'authorization_code', 'code':code, 'client_id':client_id, 'client_secret':client_secret, 'redirect_uri':'urn:ietf:wg:oauth:2.0:oob', 'scope':scopes})
config['user_token'] = response.json()['access_token']
with open(os.path.join(parent, 'config.json'), 'w') as config_file:
config_file.write(json.dumps(config))
if __name__ == "__main__":
generate_config("test_app", "read")

View File

@ -1,52 +1,56 @@
#!/usr/bin/python3.8 #!/usr/bin/python3.8
from mastodon import Mastodon
import os import os
import sys import sys
import html2text import html2text
import json
import requests
parent = os.path.dirname(os.path.realpath(__file__))
if os.path.exists('./.creds/') == False: if not os.path.exists(os.path.join(parent, 'config.json')):
os.mkdir('./.creds') generate_config("Wardyn's feditools", "read write follow push")
if os.path.exists('./.creds/client.secret') == False: with open(os.path.join(parent, 'config.json'), 'r') as config_file:
instance = input('Please enter your instance: ') config = json.load(config_file)
if not instance[:4] == 'http': session = requests.Session()
instance = 'https://' + instance session.headers.update({"Authorization" : "Bearer " + config['user_token']})
Mastodon.create_app('Wardyns fedi tools', api_base_url = instance, to_file = './.creds/client.secret')
mastodon = Mastodon( if len(sys.argv) != 2:
client_id = './.creds/client.secret',
)
if os.path.exists('./.creds/user.secret') == False:
username = input('Enter your username: ')
password = input('Enter your password: ')
mastodon.log_in(username=username, password=password, scopes=['read', 'write'], to_file='./.creds/user.secret')
mastodon = Mastodon(
access_token = './.creds/user.secret'
)
if len(sys.argv) < 2:
print('Expected 1 argument (account)') print('Expected 1 argument (account)')
quit() quit()
else:
highestpost = None highestpost = None
oldest_status_id = None oldest_status_id = None
htmlconvert = html2text.HTML2Text() htmlconvert = html2text.HTML2Text()
htmlconvert.ignore_links = True htmlconvert.ignore_links = True
htmlconvert.body_width = 0 htmlconvert.body_width = 0
account = sys.argv[1] account = sys.argv[1]
accid = mastodon.account_search(account)[0]['id']
while True: if account[0] == '@':
statuses = mastodon.account_statuses(accid, max_id=oldest_status_id, limit=1000) account = account[1:]
if len(account.split('@')) == 1:
account = account + '@' + config['instance'].split('/')[-1]
searchlist = session.get(config['instance'] + '/api/v2/search?', params={'q':account, 'type':'accounts'}).json()['accounts']
acc_id = None
for user in searchlist:
if user['fqn'].lower() == account.lower():
acc_id = user['id']
acc_name = user['fqn']
break
if not acc_id:
print("Could not find user: " + account)
quit()
#accid = session.get(config['instance'] + '/api/v2/search?', params={'q':account, 'type':'accounts'}).json()['accounts'][0]['id']
while True:
statuses = session.get(config['instance'] + '/api/v1/accounts/' + acc_id + '/statuses', params={"max_id":oldest_status_id, "limit":40, "exclude_reblogs":True}).json()
try: try:
oldest_status_id = statuses[-1]['id'] oldest_status_id = statuses[-1]['id']
except IndexError: except IndexError:
print('Reached end of posts') print('Reached end of posts')
quit() quit()
for status in statuses: for status in statuses:
statusscore = status['reblogs_count'] + status['favourites_count'] status_score = status['reblogs_count'] + status['favourites_count']
if status['reblogged'] == False: if highestpost == None or status_score > highestpost['score']:
if highestpost == None or statusscore > highestpost['score']:
highestpost = {'score' : status['reblogs_count'] + status['favourites_count'], 'post' : status} highestpost = {'score' : status['reblogs_count'] + status['favourites_count'], 'post' : status}
content = str(htmlconvert.handle(status['content'])) content = str(htmlconvert.handle(status['content']))
print(content) print(content)

45
import_following.py Normal file
View File

@ -0,0 +1,45 @@
import requests
import json
import os
from argparse import ArgumentParser, FileType
from generate_config import generate_config
parent = os.path.dirname(os.path.realpath(__file__))
# Initialize arguments
parser = ArgumentParser(description='Import follows from a csv file')
parser.add_argument('file',
type=open,
help='csv file containing users to follow',
)
args = parser.parse_args()
csv = args.file.read()
if not os.path.exists(os.path.join(parent, 'config.json')):
generate_config("Wardyn's feditools", "read write follow push")
with open(os.path.join(parent, 'config.json'), 'r') as config_file:
config = json.load(config_file)
session = requests.Session()
session.headers.update({"Authorization" : "Bearer " + config['user_token']})
acclist = csv.split('\n')
for account in acclist:
searchlist = session.get(config['instance'] + '/api/v2/search?', params={'q':account, 'type':'accounts'}).json()
searchlist = searchlist['accounts']
found = False
for user in searchlist:
if user['fqn'] == account:
account = user
found = True
break
if found == False:
print("Could not find user: " + account)
continue
relationship = account['pleroma']['relationship']
if relationship['following'] == True:
print('Already following: ' + account['fqn'])
continue
print("Following: " + account['fqn'])
session.post(config['instance'] + '/api/v1/accounts/' + account['id'] + '/follow')

BIN
notification.mp3 Normal file

Binary file not shown.

30
notification_sound.py Normal file
View File

@ -0,0 +1,30 @@
import websocket
import os
from playsound import playsound
from generate_config import generate_config
import json
parent = os.path.dirname(os.path.realpath(__file__))
if not os.path.exists(os.path.join(parent, 'config.json')):
generate_config("Wardyn's feditools", "read write follow push")
with open(os.path.join(parent, 'config.json'), 'r') as config_file:
config = json.load(config_file)
sound = os.path.join(parent, 'notification.mp3')
def on_message(ws, message):
print('Received notification')
playsound(sound)
def on_open(ws):
print('Websocket opened')
def on_close(ws):
print('Websocket closed, attempting to reconnect')
connect_websocket()
def connect_websocket():
ws = websocket.WebSocketApp("wss://" + config['instance'].split('/')[-1] + "/api/v1/streaming?access_token=" + config['user_token'] + "&stream=user:notification",
on_message = on_message, on_open = on_open, on_close = on_close, on_error=on_close)
ws.run_forever()
connect_websocket()