|
|
|
import random
|
|
|
|
from sys import stderr
|
|
|
|
from textwrap import wrap
|
|
|
|
from time import sleep, time
|
|
|
|
import tweepy as t
|
|
|
|
from tweepy.errors import Forbidden, TooManyRequests
|
|
|
|
import typing
|
|
|
|
|
|
|
|
from .command import Command, mk_commands
|
|
|
|
from .exceptions import *
|
|
|
|
from .pastas import PASTAS
|
|
|
|
from .util import (
|
|
|
|
get_timestamp_s,
|
|
|
|
jdump,
|
|
|
|
get_tweet_text,
|
|
|
|
log_t_by_sname,
|
|
|
|
surl_prefix,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class PseudBot:
    """
    Twitter bot that answers mentions with copypasta reply chains.

    Every public method is an "action" that can be listed with
    :meth:`list_actions`; methods starting with ``_`` are internal helpers.
    """

    # Status object of the most recent tweet posted by a reply chain, or
    # ``None`` when nothing has been posted yet.
    last_stat = None

    def __init__(
        self,
        tcfg: dict,
        custom_welcome: str = None,
        last_id: int = None,
        reply_all: bool = True,
        target_screen_name: str = None,
        proxy_url: str = None,
        quiet: bool = False,
        debug: bool = False,
    ):
        """
        Authenticate against Twitter and prepare the bot state.

        :param tcfg: credentials; must contain the keys ``consumer``,
            ``consumer_secret``, ``tok`` and ``tok_secret``.
        :param custom_welcome: text of the start-up tweet; defaults to a
            "Powered on at <unix time>" message.
        :param last_id: ID of the last handled tweet. When ``None`` it is
            read from the ``last_id`` file in the working directory.
        :param reply_all: stored on the instance as ``reply_all``.
        :param target_screen_name: account used by :meth:`user_timeline`.
        :param proxy_url: optional proxy handed to the tweepy API object.
        :param quiet: when true, no start-up tweet is posted.
        :param debug: stored on the instance as ``debug``.
        """
        self.debug = debug
        self.reply_all = reply_all
        self.target_screen_name = target_screen_name

        tauth = t.OAuthHandler(tcfg["consumer"], tcfg["consumer_secret"])
        tauth.set_access_token(tcfg["tok"], tcfg["tok_secret"])

        if proxy_url is not None:
            self.tapi = t.API(tauth, proxy=proxy_url)
        else:
            self.tapi = t.API(tauth)

        if quiet is False:
            if custom_welcome is not None:
                welcome_tweet = custom_welcome
            else:
                welcome_tweet = "Powered on at " + str(int(time()))

            self.wstatus = self.tapi.update_status(welcome_tweet)
            self.screen_name = self.wstatus.user.screen_name
        else:
            # No welcome status to read the account name from, so ask the
            # API who we are instead of tweeting.
            self.wstatus = None
            self.screen_name = self.tapi.verify_credentials().screen_name

        self.url_prefix = (
            "https://twitter.com/" + self.screen_name + "/status/"
        )
        if self.wstatus is not None:
            self._log_tweet(welcome_tweet, self.wstatus)

        if last_id is None:
            with open("last_id", mode="r") as idr:
                self.last_id = int(idr.read())
            sleep(0.5)
        else:
            self.last_id = last_id

    def list_actions(self):
        """
        List actions that Pseudbot can run.
        """
        actions = [
            meth_name
            for meth_name in dir(self)
            if callable(getattr(self, meth_name))
            and not meth_name.startswith("_")
        ]
        for action_name in actions:
            print(action_name + ":", end="")
            docstr = getattr(self, action_name).__doc__
            print(docstr)
            if docstr is None:
                print()

    def _log_tweet(self, msg, tstat) -> None:
        """
        Print the text of a tweet the bot posted together with its URL.
        """
        print('[TWEET]: "{}" ({})'.format(msg, self.url_prefix + str(tstat.id)))

    def _check_sname_exists(self):
        """
        Raise ``PseudBotNoTargetScreenName`` if no target account is set.
        """
        if self.target_screen_name is None:
            raise PseudBotNoTargetScreenName

    def user_timeline(self):
        """
        Get all tweets from a user's timeline.
        Requires ``target_screen_name`` to be set
        (set by ``-s`` if using the CLI).
        """
        self._check_sname_exists()
        tweets_j = []

        rq_n = 0
        for tweet in t.Cursor(
            self.tapi.user_timeline,
            screen_name=self.target_screen_name,
            since_id=1,
            tweet_mode="extended",
        ).items():
            log_t_by_sname(tweet)
            tweets_j.append(tweet._json)
            sleep(0.2)
            rq_n += 1
            # Pause before continuing every 899 tweets fetched.
            if rq_n % 899 == 0:
                print(
                    "[WARN]: At standard v1.1 API limit! "
                    + "Sleeping for 15 minutes...",
                    file=stderr,
                )
                sleep(901)

        jdump(tweets_j, extra_tag=self.target_screen_name)

    def timeline(self):
        """
        Get and dump recent tweets from your Pseudbot account's home timeline.
        """
        home_tl = self.tapi.home_timeline(tweet_mode="extended")
        jdump([tweet._json for tweet in home_tl], extra_tag=self.screen_name)

    def _tweet_media(
        self,
        id_reply_to: int,
        parent_screen_name: str,
        media: typing.List[str] = None,
    ):
        """
        Post every file in ``media`` as a chain of replies.

        The first reply answers ``id_reply_to`` and mentions
        ``parent_screen_name``; each further reply answers the previous one.
        The chain stops early when Twitter answers with ``Forbidden``.

        :param media: file names to upload, one per tweet. The caller's list
            is left untouched.
        :returns: the last status that was posted successfully
            (``self.last_stat``).
        """
        reply_id = id_reply_to
        reply_name = parent_screen_name
        for index, filename in enumerate(list(media or [])):
            if index > 0:
                sleep(2)
            try:
                self.last_stat = self.tapi.update_status_with_media(
                    "@" + reply_name,
                    in_reply_to_status_id=reply_id,
                    filename=filename,
                )
            except Forbidden:
                return self.last_stat

            reply_id = self.last_stat.id
            reply_name = self.last_stat.user.screen_name

        return self.last_stat

    def _tweet_pasta(
        self,
        id_reply_to: int,
        pasta: typing.List[str],
        media: typing.List[str] = None,
    ):
        """
        Tweet an entire pasta, noodle by noodle, as one reply chain.

        The first noodle answers ``id_reply_to``; each further noodle answers
        the previous one. While media files remain, one is attached to each
        noodle. The chain stops early when Twitter answers with ``Forbidden``.

        :param pasta: tweet texts in posting order (not modified).
        :param media: optional file names to attach (not modified).
        :returns: the last status that was posted successfully
            (``self.last_stat``).
        """
        media_queue = list(media or [])
        reply_id = id_reply_to
        for index, noodle in enumerate(list(pasta)):
            if index > 0:
                sleep(2)
            try:
                if media_queue:
                    self.last_stat = self.tapi.update_status_with_media(
                        noodle,
                        in_reply_to_status_id=reply_id,
                        filename=media_queue.pop(0),
                    )
                else:
                    self.last_stat = self.tapi.update_status(
                        noodle, in_reply_to_status_id=reply_id
                    )
            except Forbidden:
                return self.last_stat

            self._log_tweet(noodle, self.last_stat)
            reply_id = self.last_stat.id

        return self.last_stat

    def hello(self):
        """
        Tweet "Hello pseudbot" with a timestamp.
        """
        hello_msg = get_timestamp_s() + ": Hello pseudbot"
        hello_stat = self.tapi.update_status(hello_msg)
        jdump(hello_stat._json)
        self._log_tweet(hello_msg, hello_stat)

    def _write_last_id(self):
        """
        Write the tweet ID of the last mention responded to.
        """
        with open("last_id", mode="w") as idw:
            idw.write(str(self.last_id))

    def dump_all_mentions(self):
        """
        Dump all times your bot has been mentioned.
        """
        self.dump_mentions(start_id=1)

    def dump_mentions(self, start_id: int = None):
        """
        Dump all mentions since ``last_id``.
        Override with ``-i`` on the command line.
        """
        if start_id is None:
            start_id = self.last_id

        tweets_j = []
        for tweet in t.Cursor(
            self.tapi.mentions_timeline,
            since_id=start_id,
            tweet_mode="extended",
        ).items():
            # Skip the bot's own tweets.
            if tweet.user.screen_name == self.screen_name:
                continue

            print(
                "Mentioned by @{} in: {}".format(
                    tweet.user.screen_name, self.url_prefix + str(tweet.id)
                )
            )
            tweets_j.append(tweet._json)

            self.last_id = max(tweet.id, self.last_id)
            sleep(2)

        jdump(tweets_j)

    def dump_tweet(self):
        """
        Dump the JSON data dictionary of a specific tweet.
        If called from the CLI, requires ``-i`` to be set.
        """
        tweets = self.tapi.lookup_statuses(
            [self.last_id], tweet_mode="extended"
        )
        jtweets = []
        for tweet in tweets:
            log_t_by_sname(tweet)
            jtweets.append(tweet._json)

        jdump(jtweets, extra_tag=self.last_id)

    def pasta_tweet(self):
        """
        Parse the commands in a specific tweet and reply to it, starting the
        chain manually from that tweet ID.
        Requires ``-i`` to be set if calling from the CLI.
        """
        print("[INFO]: Replying to {}...".format(self.last_id))
        tweets = self.tapi.lookup_statuses(
            [self.last_id], tweet_mode="extended"
        )
        for tweet in tweets:
            self._parse_mention(tweet)

    def get_tweet_parents(self):
        """
        Print the parent tuple list (parent_id, reply_to_screen_name).

        Works best when invoked with -i on a tweet status ID.
        """
        print(
            self._get_reply_parents(
                self.tapi.lookup_statuses(
                    [self.last_id], tweet_mode="extended"
                )[0]
            )
        )

    def _get_reply_parents(
        self, tweet
    ) -> typing.List[typing.Tuple[int, str]]:
        """
        Builds and returns the parent tuple list from a starting tweet:
        (parent_id, reply_to_screen_name)

        The first tuple is the tweet to reply to. When ``tweet`` is not a
        reply, or is a reply to the bot itself, that is ``tweet`` itself;
        otherwise it is the tweet ``tweet`` replies to, followed by the
        tuples obtained by walking further up the thread.
        """
        parent_name = tweet.in_reply_to_screen_name
        if parent_name is not None and parent_name == self.screen_name:
            # A reply to one of our own tweets: answer the mention itself.
            parent_name = None
            print(
                "[INFO]: Replying to {}'s mention directly...".format(
                    tweet.user.screen_name
                )
            )

        if tweet.in_reply_to_status_id is None or parent_name is None:
            return [(tweet.id, tweet.user.screen_name)]

        found = self.tapi.lookup_statuses(
            [tweet.in_reply_to_status_id], tweet_mode="extended"
        )
        # An empty lookup result means the parent cannot be fetched; stop
        # walking up the thread there instead of failing on ``[0]``.
        ancestors = self._get_reply_parents(tweet=found[0]) if found else []

        return [
            (tweet.in_reply_to_status_id, tweet.in_reply_to_screen_name)
        ] + ancestors

    def test_parser(self):
        """
        Run the command parser in a dry run (e.g. without sending any tweets) on
        a specific tweet.

        Works best when invoked with -i on a tweet status ID.
        """
        self._parse_mention(
            self.tapi.lookup_statuses([self.last_id], tweet_mode="extended")[0],
            wet_run=False,
        )

    def _parse_mention(self, tweet, wet_run: bool = True):
        """
        Parse commands in tweet and do something

        :param tweet: the status whose text is parsed for commands.
        :param wet_run: when ``True`` the replies are really tweeted;
            otherwise they are only printed.
        """
        # Each parent in the list is a (parent_id, parent_screen_name) tuple.
        parents = self._get_reply_parents(tweet)
        reply_id, reply_screen_name = parents[0]

        drybug = wet_run is not True

        for command in mk_commands(get_tweet_text(tweet), debug=drybug):
            if len(command.pasta) > 0:
                pasta = self._make_pasta_chain(parents, command)
                if wet_run is True:
                    self._tweet_pasta(reply_id, pasta, command.media)
                else:
                    print("[DRY]: pasta: {}".format(pasta))
                    print("[DRY]: Such wow, very tweet!")
            elif len(command.media) > 0:
                if wet_run is True:
                    self._tweet_media(
                        reply_id, reply_screen_name, command.media
                    )
                else:
                    # TODO: show media in dry mode?
                    print("[DRY]: Such wow, very media!")
            else:
                print(
                    '[WARN]: Unable to parse tweet segment: "{}"'.format(
                        command.text
                    ),
                    file=stderr,
                )

    def _make_pasta_chain(
        self, parents: typing.List[typing.Tuple[int, str]], cmd: "Command"
    ) -> typing.List[str]:
        """
        Make a text reply chain.

        The first tweet is prefixed with a mention of every parent; every
        following tweet is additionally prefixed with the bot's own screen
        name. Each tweet is at most 280 characters and no word of
        ``cmd.pasta`` is dropped.

        :param parents: (parent_id, parent_screen_name) tuples.
        :param cmd: command whose ``pasta`` text is split into tweets.
        :returns: the tweet texts in posting order.
        """
        width = 280
        parent_screen_names = " ".join("@" + parent[1] for parent in parents)

        lines = wrap(parent_screen_names + " " + cmd.pasta, width=width)
        if not lines:
            return []

        pasta = [lines[0]]
        remaining = " ".join(lines[1:])
        prefix = "@" + self.screen_name + " " + parent_screen_names + " "
        while remaining:
            pieces = wrap(prefix + remaining, width=width)
            leftover = " ".join(pieces[1:])
            if len(leftover) >= len(remaining):
                # The prefix alone fills the tweet, so no pasta text was
                # consumed; emit the rest without it rather than loop forever.
                pasta.extend(wrap(remaining, width=width))
                break
            pasta.append(pieces[0])
            remaining = leftover

        return pasta

    def _reply_mentions(self):
        """
        Check mentions since ``last_id`` and reply in each thread with a
        copypasta chain.

        ``last_id`` is written back to disk even when the loop is interrupted,
        so mentions that were already handled are not answered again.
        """
        try:
            for tweet in t.Cursor(
                self.tapi.mentions_timeline,
                since_id=self.last_id,
                tweet_mode="extended",
            ).items():
                # Skip the bot's own tweets.
                if tweet.user.screen_name == self.screen_name:
                    continue

                self.last_id = max(tweet.id, self.last_id)

                self._parse_mention(tweet)

                if self.last_stat is not None:
                    print("Finished chain with {}".format(self.last_stat.id))
                sleep(10)
        finally:
            self._write_last_id()

    def run_bot(self):
        """
        Start Pseudbot in its main mode: mention listener mode.

        Polls for mentions every two minutes until interrupted with Ctrl-C,
        then tweets a shutdown notice.
        """
        try:
            while True:
                try:
                    self._reply_mentions()
                    sleep(120)
                except TooManyRequests:
                    cooldown = 1000
                    print(
                        "[WARN]: Rate limited, cooling down for {} seconds...".format(
                            cooldown
                        ),
                        file=stderr,
                    )
                    sleep(cooldown)
        except KeyboardInterrupt:
            print()
            shutdown_msg = (
                "Shut down for maintenance at " + str(int(time())) + " 👋"
            )
            self._log_tweet(shutdown_msg, self.tapi.update_status(shutdown_msg))