You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
364 lines
11 KiB
364 lines
11 KiB
import random |
|
from sys import stderr |
|
from textwrap import indent |
|
from time import sleep, time |
|
import tweepy as t |
|
from tweepy.errors import Forbidden, TooManyRequests |
|
import typing |
|
|
|
from .command import Command, mk_commands |
|
from .exceptions import * |
|
from .pastas import PASTAS |
|
from .util import ( |
|
get_timestamp_s, |
|
jdump, |
|
get_tweet_text, |
|
log_t_by_sname, |
|
surl_prefix, |
|
) |
|
|
|
|
|
class PseudBot: |
|
last_stat = None |
|
|
|
def __init__( |
|
self, |
|
tcfg: dict, |
|
custom_welcome: str = None, |
|
last_id: int = None, |
|
target_screen_name: str = None, |
|
proxy_url: str = None, |
|
quiet: bool = False, |
|
debug: bool = False, |
|
): |
|
self.debug = debug |
|
tauth = t.OAuthHandler(tcfg["consumer"], tcfg["consumer_secret"]) |
|
tauth.set_access_token(tcfg["tok"], tcfg["tok_secret"]) |
|
|
|
if proxy_url is not None: |
|
self.tapi = t.API(tauth, proxy=proxy_url) |
|
else: |
|
self.tapi = t.API(tauth) |
|
|
|
if quiet is False: |
|
if custom_welcome is not None: |
|
welcome_tweet = custom_welcome |
|
else: |
|
welcome_tweet = "Powered on at " + str(int(time())) |
|
|
|
self.target_screen_name = target_screen_name |
|
|
|
self.wstatus = self.tapi.update_status(welcome_tweet) |
|
self.screen_name = self.wstatus.user.screen_name |
|
self.url_prefix = ( |
|
"https://twitter.com/" + self.screen_name + "/status/" |
|
) |
|
self._log_tweet(welcome_tweet, self.wstatus) |
|
|
|
if last_id is None: |
|
idr = open("last_id", mode="r") |
|
self.last_id = int(idr.read()) |
|
idr.close() |
|
sleep(0.5) |
|
else: |
|
self.last_id = last_id |
|
|
|
def list_actions(self): |
|
""" |
|
List actions that Pseudbot can run. |
|
""" |
|
actions = [ |
|
meth_name |
|
for meth_name in dir(self) |
|
if callable(getattr(self, meth_name)) |
|
and not meth_name.startswith("_") |
|
] |
|
for action_name in actions: |
|
print(action_name + ":", end="") |
|
docstr = getattr(self, action_name).__doc__ |
|
print(docstr) |
|
if docstr is None: |
|
print() |
|
|
|
def _log_tweet(self, msg, tstat) -> None: |
|
print('[TWEET]: "{}" ({})'.format(msg, self.url_prefix + str(tstat.id))) |
|
|
|
def _check_sname_exists(self): |
|
if self.target_screen_name is None: |
|
raise PseudBotNoTargetScreenName |
|
|
|
def user_timeline(self): |
|
""" |
|
Get all tweets from a user's timeline. |
|
Requires ``target_screen_name`` to be set |
|
(set by ``-s`` if using the CLI). |
|
""" |
|
self._check_sname_exists() |
|
tweets_j = [] |
|
|
|
rq_n = 0 |
|
for tweet in t.Cursor( |
|
self.tapi.user_timeline, |
|
screen_name=self.target_screen_name, |
|
since_id=1, |
|
tweet_mode="extended", |
|
).items(): |
|
log_t_by_sname(tweet) |
|
tweets_j.append(tweet._json) |
|
sleep(0.2) |
|
rq_n += 1 |
|
if rq_n % 899 == 0: |
|
print( |
|
"[WARN]: At standard v1.1 API limit! " |
|
+ "Sleeping for 15 minutes...", |
|
file=stderr, |
|
) |
|
sleep(901) |
|
|
|
jdump(tweets_j, extra_tag=self.target_screen_name) |
|
|
|
def timeline(self): |
|
""" |
|
Get and dump recent tweets from your Pseudbot account's home timeline. |
|
""" |
|
home_tl = self.tapi.home_timeline(tweet_mode="extended") |
|
jsons = [] |
|
for tweet in home_tl: |
|
jsons.append(tweet._json) |
|
|
|
jdump(jsons, extra_tag=self.screen_name) |
|
|
|
def _tweet_media( |
|
self, id_reply_to: int, parent_screen_name: str, media: [str] = [] |
|
): |
|
_stat = self.last_stat |
|
try: |
|
self.last_stat = self.tapi.update_status_with_media( |
|
"@" + parent_screen_name, |
|
in_reply_to_status_id=id_reply_to, |
|
filename=media.pop(0), |
|
) |
|
except Forbidden: |
|
return _stat |
|
|
|
if len(media) > 0: |
|
sleep(2) |
|
return self._tweet_media( |
|
self.last_stat.id, self.last_stat.user.screen_name, media |
|
) |
|
else: |
|
return self.last_stat |
|
|
|
def _tweet_pasta(self, id_reply_to: int, pasta: [str], media: [str] = []): |
|
""" |
|
Recursively tweet an entire pasta, noodle by noodle:: |
|
In this house we stan recursion. |
|
""" |
|
_stat = self.last_stat |
|
try: |
|
noodle = pasta.pop(0) |
|
if len(media) > 0: |
|
self.last_stat = self.tapi.update_status_with_media( |
|
noodle, |
|
in_reply_to_status_id=id_reply_to, |
|
filename=media.pop(0), |
|
) |
|
else: |
|
self.last_stat = self.tapi.update_status( |
|
noodle, in_reply_to_status_id=id_reply_to |
|
) |
|
self._log_tweet(noodle, self.last_stat) |
|
except Forbidden: |
|
return _stat |
|
if len(pasta) > 0: |
|
pasta[0] = "@" + self.last_stat.user.screen_name + " " + pasta[0] |
|
sleep(2) |
|
return self._tweet_pasta(self.last_stat.id, pasta, media) |
|
else: |
|
return self.last_stat |
|
|
|
def hello(self): |
|
""" |
|
Tweet "Hello pseudbot" with a timestamp. |
|
""" |
|
hello_msg = get_timestamp_s() + ": Hello pseudbot" |
|
hello_stat = self.tapi.update_status(hello_msg) |
|
jdump(hello_stat._json) |
|
self._log_tweet(hello_msg, hello_stat) |
|
|
|
def _write_last_id(self): |
|
""" |
|
Write the tweet ID of the last mention responded to. |
|
""" |
|
idw = open("last_id", mode="w") |
|
idw.write(str(self.last_id)) |
|
idw.close() |
|
|
|
def dump_all_mentions(self): |
|
""" |
|
Dump all times your bot has been mentioned. |
|
""" |
|
self.dump_mentions(start_id=1) |
|
|
|
def dump_mentions(self, start_id: int = None): |
|
""" |
|
Dump all mentions since ``last_id``. |
|
Override with ``-i`` on the command line. |
|
""" |
|
if start_id is None: |
|
start_id = self.last_id |
|
|
|
tweets_j = [] |
|
for tweet in t.Cursor( |
|
self.tapi.mentions_timeline, |
|
since_id=start_id, |
|
tweet_mode="extended", |
|
).items(): |
|
if tweet.user.screen_name == self.screen_name: |
|
continue |
|
|
|
print( |
|
"Mentioned by @{} in: {}".format( |
|
tweet.user.screen_name, self.url_prefix + str(tweet.id) |
|
) |
|
) |
|
tweets_j.append(tweet._json) |
|
|
|
self.last_id = max(tweet.id, self.last_id) |
|
sleep(2) |
|
|
|
jdump(tweets_j) |
|
|
|
def dump_tweet(self): |
|
""" |
|
Dump the JSON data dictionary of a specific tweet. |
|
If called from the CLI, requires ``-i`` to be set. |
|
""" |
|
tweets = self.tapi.lookup_statuses( |
|
[self.last_id], tweet_mode="extended" |
|
) |
|
jtweets = [] |
|
for tweet in tweets: |
|
log_t_by_sname(tweet) |
|
jtweets.append(tweet._json) |
|
|
|
jdump(jtweets, extra_tag=self.last_id) |
|
|
|
def pasta_tweet(self): |
|
""" |
|
Insert a copy pasta in a Tweet chain manually starting from a specific |
|
tweet ID. Requires ``-i`` to be set if calling from the CLI. |
|
""" |
|
pasta = [] |
|
while len(pasta) < 1: |
|
pasta = random.choice(PASTAS) |
|
|
|
print("[INFO]: Replying to {}...".format(self.last_id)) |
|
tweets = self.tapi.lookup_statuses( |
|
[self.last_id], tweet_mode="extended" |
|
) |
|
for tweet in tweets: |
|
self._parse_mention(tweet) |
|
|
|
def _get_reply_parent(self, tweet) -> (int, str): |
|
if tweet.in_reply_to_screen_name is not None: |
|
if tweet.in_reply_to_screen_name != self.screen_name: |
|
parent_name = tweet.in_reply_to_screen_name |
|
else: |
|
parent_name = None |
|
print( |
|
"[INFO]: Replying to {}'s mention ".format( |
|
tweet.user.screen_name |
|
) |
|
+ "instead of replying to myself..." |
|
) |
|
else: |
|
parent_name = None |
|
|
|
if tweet.in_reply_to_status_id is not None and parent_name is not None: |
|
reply_to_screen_name = tweet.in_reply_to_screen_name |
|
parent_id = tweet.in_reply_to_status_id |
|
else: |
|
reply_to_screen_name = tweet.user.screen_name |
|
parent_id = tweet.id |
|
|
|
return (parent_id, reply_to_screen_name) |
|
|
|
def _parse_mention(self, tweet): |
|
""" |
|
Parse commands in tweet and do something |
|
""" |
|
(parent_id, parent_screen_name) = self._get_reply_parent(tweet) |
|
|
|
for command in mk_commands(get_tweet_text(tweet)): |
|
if command.pasta is True: |
|
pasta = self._make_pasta_chain(parent_screen_name) |
|
self._tweet_pasta(parent_id, pasta, command.media) |
|
elif len(command.media) > 0: |
|
self._tweet_media(parent_id, parent_screen_name, command.media) |
|
else: |
|
print( |
|
'[WARN]: Unable to parse tweet segment: "{}"'.format( |
|
command.text |
|
), |
|
file=stderr, |
|
) |
|
|
|
def _make_pasta_chain(self, parent_screen_name: str) -> [str]: |
|
""" |
|
Send a copypasta chain. |
|
""" |
|
pasta = [] |
|
while len(pasta) < 1: |
|
pasta = random.choice(PASTAS) |
|
|
|
pasta[0] = "@" + parent_screen_name + " " + pasta[0] |
|
|
|
return pasta |
|
|
|
def _reply_mentions(self): |
|
""" |
|
Check mentions since ``last_id`` and reply in each thread with a |
|
copypasta chain. |
|
""" |
|
for tweet in t.Cursor( |
|
self.tapi.mentions_timeline, |
|
since_id=self.last_id, |
|
tweet_mode="extended", |
|
).items(): |
|
if tweet.user.screen_name == self.screen_name: |
|
continue |
|
|
|
self.last_id = max(tweet.id, self.last_id) |
|
|
|
self._parse_mention(tweet) |
|
|
|
if self.last_stat is not None: |
|
print("Finished chain with {}".format(self.last_stat.id)) |
|
sleep(10) |
|
|
|
self._write_last_id() |
|
|
|
def run_bot(self): |
|
""" |
|
Start Pseudbot in its main mode: mention listener mode. |
|
""" |
|
try: |
|
while True: |
|
try: |
|
self._reply_mentions() |
|
sleep(120) |
|
except TooManyRequests: |
|
cooldown = 1000 |
|
print( |
|
"[WARN]: Rate limited, cooling down for {} seconds...".format( |
|
cooldown |
|
) |
|
) |
|
sleep(cooldown) |
|
except KeyboardInterrupt: |
|
print() |
|
shutdown_msg = ( |
|
"Shut down for maintenance at " + str(int(time())) + " 👋" |
|
) |
|
self._log_tweet(shutdown_msg, self.tapi.update_status(shutdown_msg))
|
|
|