diff --git a/README.md b/README.md index 7773317..3985630 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,13 @@ # AI-Tweets +### ReplyBot + +- 代码优化,每15分钟只能点赞一次 +- 重复回复问题未解决 + + + +### TweetsBot + +- 代码优化 +- 搜索指定用户最近的5条推文,用deepseek总结发送推文 diff --git a/ReplyBot.py b/ReplyBot.py new file mode 100644 index 0000000..910e4da --- /dev/null +++ b/ReplyBot.py @@ -0,0 +1,370 @@ +import requests +import time +import json +import os +from openai import OpenAI +import tweepy + +# API密钥配置 +TWITTER_API_KEY = "3nt1jN4VvqUaaXGHv9AN5VsTV" +TWITTER_API_SECRET = "M2io73S7TzitFiBw825QIq8atyZRljbIDQuTpH39uFZanQ4XFh" +TWITTER_ACCESS_TOKEN = "1944636908-prxfjL6OIb56BQjuFTdChrUPh81OjmBbV7pfnWw" +TWITTER_ACCESS_SECRET = "D5AdCVRvIhGEmTmXA8hL5ciAUxIqNMZ3K3B3YejpqqNKj" +TWITTER_BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAGOd0gEAAAAALtv%2BzLsfGLLa5ydUt60ci6J5ce0%3DMBXViJ1NLY4XYdeuMq1xZQ98kHbeGK5lAJoV2j7Ssmcafk8Skn" +TWITTER_API_IO_KEY = "e3dad005b0e54bdc88c6178a89adec13" +DEEPSEEK_API_KEY = "sk-8a121704a9bc4ec6a5ab0ae16e0bc0ba" + +# 创建DeepSeek客户端 +deepseek_client = OpenAI( + api_key=DEEPSEEK_API_KEY, + base_url="https://api.deepseek.com" +) + +# 已处理的回复文件 +PROCESSED_REPLIES_FILE = "../../../processed_replies.json" + + +# OAuth 1.0a 认证 +def get_oauth(): + return tweepy.OAuth1UserHandler( + TWITTER_API_KEY, + TWITTER_API_SECRET, + TWITTER_ACCESS_TOKEN, + TWITTER_ACCESS_SECRET + ) + + +# 使用高级搜索API获取推文 +def advanced_search_tweets(query_type="Latest", keywords=None, username=None, max_results=10): + url = "https://api.twitterapi.io/twitter/tweet/advanced_search" + + # 构建查询参数 + params = { + "queryType": query_type, + "maxResults": max_results + } + + # 添加可选参数 + if keywords: + params["query"] = keywords + + if username: + params["fromUsers"] = username + + headers = { + "X-API-Key": TWITTER_API_IO_KEY + } + + print(f"发送高级搜索请求到 TwitterAPI.io:") + print(f"URL: {url}") + print(f"参数: {params}") + print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}") + + response = requests.get(url, headers=headers, params=params) + print(f"响应状态码: {response.status_code}") + + if response.status_code == 200: + data = response.json() + print(f"响应数据结构: {list(data.keys() if isinstance(data, dict) else ['非字典'])}") + + # 检查并提取推文 + tweets = [] + if isinstance(data, dict): + if "tweets" in data: + tweets = data["tweets"] + elif "data" in data: + tweets = data["data"] + + print(f"找到 {len(tweets)} 条推文") + return data + else: + print(f"高级搜索请求失败: {response.status_code}") + print(response.text[:500]) # 只打印前500个字符,避免过长 + return None + + +# 获取用户最新推文 - 使用TwitterAPI.io的user/last_tweets端点 +def get_user_last_tweets(username, count=5): + url = "https://api.twitterapi.io/twitter/user/last_tweets" + params = { + "userName": username, + "count": count + } + headers = { + "X-API-Key": TWITTER_API_IO_KEY + } + print(f"发送请求到 TwitterAPI.io 获取用户推文:") + print(f"URL: {url}") + print(f"参数: {params}") + print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}") + + try: + response = requests.get(url, headers=headers, params=params) + print(f"响应状态码: {response.status_code}") + print(f"响应前100个字符: {response.text[:100]}") + + if response.status_code == 200: + data = response.json() + if "status" in data and data["status"] == "success": + # 检查是否有推文数据 + if "data" in data and "tweets" in data["data"] and len(data["data"]["tweets"]) > 0: + print(f"找到 {len(data['data']['tweets'])} 条推文") + # 把推文数据调整为与原代码兼容的格式 + return {"tweets": data["data"]["tweets"]} + else: + print("API返回成功但没有推文数据") + print(f"响应键: {list(data.keys())}") + if "data" in data: + print(f"data键: {list(data['data'].keys())}") + return None + else: + print(f"API返回非成功状态: {data.get('status', 'unknown')}") + return None + else: + print(f"获取推文失败: {response.status_code}") + print(response.text[:500]) # 只打印前500个字符 + return None + except Exception as e: + print(f"获取用户推文时出错: {str(e)}") + print("尝试使用高级搜索API作为备选方案...") + # 作为备选方案,使用高级搜索API + return advanced_search_tweets( + query_type="Latest", + username=username, + max_results=count + ) + + +# 获取推文回复 +def get_tweet_replies(tweet_id, count=10): + url = "https://api.twitterapi.io/twitter/tweet/replies" + params = { + "tweetId": tweet_id, + "count": count + } + headers = { + "X-API-Key": TWITTER_API_IO_KEY + } + print(f"获取推文 {tweet_id} 的回复...") + print(f"URL: {url}") + print(f"参数: {params}") + + response = requests.get(url, headers=headers, params=params) + print(f"响应状态码: {response.status_code}") + + if response.status_code == 200: + reply_data = response.json() + print(f"回复数据结构: {list(reply_data.keys() if isinstance(reply_data, dict) else ['非字典'])}") + if isinstance(reply_data, dict) and "tweets" in reply_data: + print(f"找到 {len(reply_data['tweets'])} 条回复") + return reply_data + else: + print(f"获取回复失败: {response.status_code}") + print(response.text[:500]) # 只打印前500个字符 + return None + + +# 使用DeepSeek生成回复内容 +def generate_reply(tweet_content, reply_content): + prompt = f""" + 我收到了以下推文和回复: + + 我的推文: "{tweet_content}" + 用户的回复: "{reply_content}" + + 请生成一个友好且相关的回复,不超过200个字符。回复应该表达感谢并继续对话。不要包含字符数或其他元信息。 + """ + try: + response = deepseek_client.chat.completions.create( + model="deepseek-chat", + messages=[ + {"role": "system", "content": "你是一个友好的社交媒体助手,擅长简短而有趣的回复。"}, + {"role": "user", "content": prompt}, + ], + stream=False + ) + reply = response.choices[0].message.content.strip() + print(f"生成的回复: {reply}") + return reply[:200] # 确保不超过200字符 + except Exception as e: + print(f"生成回复失败: {e}") + return "谢谢你的回复!" + + +# 获取自身user_id +def get_my_user_id(): + user = tweepy.Client( + consumer_key=TWITTER_API_KEY, + consumer_secret=TWITTER_API_SECRET, + access_token=TWITTER_ACCESS_TOKEN, + access_token_secret=TWITTER_ACCESS_SECRET + ).get_me() + return user.data.id + + +# 点赞推文 +def like_tweet(tweet_id): + try: + tweepy.Client( + consumer_key=TWITTER_API_KEY, + consumer_secret=TWITTER_API_SECRET, + access_token=TWITTER_ACCESS_TOKEN, + access_token_secret=TWITTER_ACCESS_SECRET + ).like(tweet_id) + print(f"成功点赞推文: {tweet_id}") + return True + except Exception as e: + print(f"点赞失败: {e}") + return False + + +# 回复推文 +def reply_to_tweet(tweet_id, reply_text): + try: + tweepy.Client( + consumer_key=TWITTER_API_KEY, + consumer_secret=TWITTER_API_SECRET, + access_token=TWITTER_ACCESS_TOKEN, + access_token_secret=TWITTER_ACCESS_SECRET + ).create_tweet(in_reply_to_tweet_id=tweet_id, text=reply_text) + print(f"成功回复: {tweet_id}") + return True + except Exception as e: + print(f"回复失败: {e}") + return False + + +# 加载已处理的回复 +def load_processed_replies(): + try: + if os.path.exists(PROCESSED_REPLIES_FILE): + with open(PROCESSED_REPLIES_FILE, "r") as f: + data = json.load(f) + return data + return {} + except Exception as e: + print(f"加载已处理回复时出错: {e}") + return {} + + +# 保存已处理的回复 +def save_processed_replies(processed_replies_dict): + try: + with open(PROCESSED_REPLIES_FILE, "w") as f: + json.dump(processed_replies_dict, f) + print(f"成功保存已处理的回复到 {PROCESSED_REPLIES_FILE}") + except Exception as e: + print(f"保存已处理回复时出错: {e}") + + +# 处理回复函数 +def process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id): + if tweet_id not in processed_replies_dict: + processed_replies_dict[tweet_id] = [] + + processed_replies = set(processed_replies_dict[tweet_id]) + + print("检查回复数据结构...") + print(f"回复类型: {type(replies)}") + + # 尝试提取回复列表 + reply_items = [] + if isinstance(replies, dict): + if "tweets" in replies and isinstance(replies["tweets"], list): + reply_items = replies["tweets"] + print(f"使用 tweets 字段,找到 {len(reply_items)} 条回复") + elif "data" in replies: + data = replies["data"] + if isinstance(data, list): + reply_items = data + print(f"使用 data 字段,找到 {len(reply_items)} 条回复") + + if not reply_items: + print("未能找到回复项") + return + + for reply in reply_items: + try: + # 提取回复信息 + reply_id = reply.get("id") or reply.get("id_str") + if not reply_id: + print("无法获取回复ID,跳过") + continue + + reply_content = reply.get("text", "") or reply.get("full_text", "") or reply.get("content", "") + + # 提取作者信息 + reply_author = None + if "user" in reply and isinstance(reply["user"], dict): + reply_author = reply["user"].get("username") or reply["user"].get("screen_name") + elif "username" in reply: + reply_author = reply["username"] + + if not reply_author: + print(f"无法获取回复作者,使用默认值") + reply_author = "unknown_user" + + print(f"\n处理回复: ID={reply_id}, 作者={reply_author}") + print(f"回复内容: {reply_content}") + + # 跳过自己的回复或已处理的回复 + if reply_author.lower() == your_username.lower(): + print("跳过自己的回复") + continue + + if reply_id in processed_replies: + print("跳过已处理的回复") + continue + + # 点赞回复 + like_result = like_tweet(reply_id) + # 生成回复内容 + generated_reply = generate_reply(tweet_content, reply_content) + + # 回复推文 + reply_result = reply_to_tweet(reply_id, generated_reply) + if like_result and reply_result: + processed_replies.add(reply_id) + processed_replies_dict[tweet_id] = list(processed_replies) + save_processed_replies(processed_replies_dict) + time.sleep(2) # 点赞/回复间隔 + except Exception as e: + print(f"处理回复时出错: {e}") + continue + + +# 主函数 +def main(): + print("Twitter留言点赞+回复机器人启动 [Tweepy版]") + processed_replies_dict = load_processed_replies() + your_username = "GWAaiagent" # 修改为你的Twitter用户名 + my_user_id = get_my_user_id() + if not my_user_id: + print("无法获取自身user_id,程序退出") + return + tweets_data = get_user_last_tweets(your_username, count=5) + if not tweets_data or "tweets" not in tweets_data or not tweets_data["tweets"]: + print("无法获取最近推文") + return + # 只处理最多5条推文,防止API超量返回 + if 'tweets' in tweets_data and len(tweets_data['tweets']) > 5: + tweets_data['tweets'] = tweets_data['tweets'][:5] + for tweet in tweets_data["tweets"]: + tweet_id = tweet["id"] + tweet_content = tweet.get("text", "") or tweet.get("full_text", "") + if not tweet_content: + continue + print(f"\n处理推文: {tweet_id}") + replies = get_tweet_replies(tweet_id) + if replies: + process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id) + else: + print(f"推文 {tweet_id} 没有留言") + time.sleep(3) # 推文处理间隔 + + +if __name__ == "__main__": + try: + main() + except Exception as e: + print(f"程序异常: {e}") \ No newline at end of file diff --git a/TweetsBot.py b/TweetsBot.py new file mode 100644 index 0000000..f2148c9 --- /dev/null +++ b/TweetsBot.py @@ -0,0 +1,195 @@ +import os +from openai import OpenAI +import tweepy +import logging +import requests + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.FileHandler("../../../../twitter_user_bot.log"), logging.StreamHandler()] +) +logger = logging.getLogger("TwitterUserBot") + +class TwitterUserBot: + def __init__(self): + """初始化Twitter和用户推文总结机器人""" + try: + # 初始化OpenAI客户端 + self.openai_client = OpenAI( + api_key="sk-8a121704a9bc4ec6a5ab0ae16e0bc0ba", # 替换为DeepSeek的API密钥 + base_url="https://api.deepseek.com" + ) + + # Twitter API凭证 + self.api_key = "3nt1jN4VvqUaaXGHv9AN5VsTV" + self.api_secret = "M2io73S7TzitFiBw825QIq8atyZRljbIDQuTpH39uFZanQ4XFh" + self.access_token = "1944636908-prxfjL6OIb56BQjuFTdChrUPh81OjmBbV7pfnWw" + self.access_secret = "D5AdCVRvIhGEmTmXA8hL5ciAUxIqNMZ3K3B3YejpqqNKj" + self.bearer_token = "AAAAAAAAAAAAAAAAAAAAAGOd0gEAAAAALtv%2BzLsfGLLa5ydUt60ci6J5ce0%3DMBXViJ1NLY4XYdeuMq1xZQ98kHbeGK5lAJoV2j7Ssmcafk8Skn" + + # 初始化Twitter客户端(用于发送) + self.twitter_client = tweepy.Client( + bearer_token=self.bearer_token, + consumer_key=self.api_key, + consumer_secret=self.api_secret, + access_token=self.access_token, + access_token_secret=self.access_secret + ) + + # 调试信息 + logger.info("TwitterUserBot 初始化成功") + logger.info(f"Twitter API密钥长度: {len(self.api_key)}") + logger.info(f"Twitter API秘钥长度: {len(self.api_secret)}") + logger.info(f"Twitter Access Token长度: {len(self.access_token)}") + logger.info(f"Twitter Access Secret长度: {len(self.access_secret)}") + except Exception as e: + logger.error(f"初始化失败: {str(e)}") + raise + + def get_user_tweets(self, username): + """获取指定用户的最近5条推文,使用 twitterapi.io 的 advanced_search 端点""" + username = username.lstrip('@') + + url = "https://api.twitterapi.io/twitter/tweet/advanced_search" + headers = {"X-API-Key": "e3dad005b0e54bdc88c6178a89adec13"} + params = { + "queryType": "Latest", + "query": f"from:{username}", + "count": 5 + } + try: + logger.info(f"请求URL: {url}") + logger.info(f"请求头: {headers}") + logger.info(f"请求参数: {params}") + + response = requests.get(url, headers=headers, params=params) + logger.info(f"响应状态码: {response.status_code}") + + if response.status_code == 200: + data = response.json() + # 优化日志输出,只打印关键信息 + tweets = data.get("tweets", []) + if tweets: + for tweet in tweets: + tweet_id = tweet.get("id") + created_at = tweet.get("createdAt") + text = tweet.get("text", "")[:100] + logger.info(f"推文ID: {tweet_id},时间: {created_at},内容: {text}...") + tweet_texts = [tweet["text"] for tweet in tweets] + logger.info(f"成功获取用户 '{username}' 的 {len(tweet_texts)} 条推文") + return "\n".join(tweet_texts) + else: + logger.warning(f"没有找到用户 '{username}' 的推文") + return None + else: + logger.error(f"API 请求失败,状态码: {response.status_code}") + try: + error_content = response.text + logger.error(f"错误内容: {error_content}") + except: + pass + return None + except Exception as e: + logger.error(f"获取用户推文失败: {str(e)}") + return None + + def generate_summary(self, tweet_content, username): + """根据推文内容生成总结,以序号分隔并控制长度,删除不完整的要点,使用简体字""" + try: + # 计算最大摘要长度 + prefix = f"{username} 的最近推文摘要:" + max_tweet_length = 280 + max_summary_length = max_tweet_length - len(prefix) - 1 + + # 要求 AI 以序号分隔格式生成摘要,使用简体字,并控制长度 + completion = self.openai_client.chat.completions.create( + model="deepseek-chat", + messages=[ + {"role": "system", "content": "You are a helpful assistant"}, + {"role": "user", + "content": f"请总结用户 @{username} 以下5条推文的主要内容,必须使用简体中文,总结必须以序号(1.、2.、...)分隔每条要点,每条要点简洁明了且以句号或感叹号结尾,总长度不得超过 {max_summary_length} 字符(包括标点、空格和换行符),不要添加字数统计:\n{tweet_content}"} + ], + stream=False + ) + + # 直接获取完整内容 + summary_content = completion.choices[0].message.content if hasattr(completion.choices[0], 'message') else '' + + # 确保返回的内容不超过限制 + summary = summary_content.strip() + if len(summary) > max_summary_length: + summary = summary[:max_summary_length] + + # 检查并删除不完整的最后一条要点 + lines = summary.split('\n') + if lines: + last_line = lines[-1] + # 检查最后一条是否以句号、感叹号或问号结尾 + if not last_line.endswith(('。', '!', '?', '.')): + # 如果不完整,删除最后一条 + lines.pop() + summary = '\n'.join(lines) + + return summary + except Exception as e: + logger.error(f"生成总结失败: {str(e)}") + return None + + def send_tweet(self, summary, username): + """发送推文""" + try: + prefix = f"{username} 的最近推文摘要:" + text = f"{prefix}\n{summary}" + + logger.info(f"准备发送推文: {text}") + logger.info(f"推文长度: {len(text)} 字符") + + response = self.twitter_client.create_tweet(text=text) + tweet_id = response.data['id'] if hasattr(response, 'data') and 'id' in response.data else "未知" + logger.info(f"推文发送成功,ID: {tweet_id}") + return True + except Exception as e: + logger.error(f"发送推文失败: {str(e)}") + return False + + def run(self): + """运行机器人:获取用户推文,生成总结并发送""" + try: + username = input("请输入用户名(不含@符号):") + logger.info(f"接收到用户名: {username}") + + tweet_content = self.get_user_tweets(username) + if not tweet_content: + logger.error("无法获取用户推文,退出") + print(f"未找到用户 @{username} 的推文,程序退出。") + return + + logger.info(f"获取到的用户推文内容: \n{tweet_content}") + print(f"成功获取 @{username} 的推文内容") + + print(f"正在使用AI对推文内容进行总结...") + summary = self.generate_summary(tweet_content, username) + if not summary: + logger.error("无法生成总结,退出") + print("生成总结失败,程序退出。") + return + + logger.info(f"生成总结: {summary}") + print(f"总结生成成功: {summary}") + + print("正在发送总结到Twitter...") + success = self.send_tweet(summary, username) + if success: + print(f"关于 @{username} 的推文摘要已成功发送!") + else: + print("发送推文失败,请检查日志。") + except Exception as e: + logger.error(f"运行时出错: {str(e)}") + print(f"发生错误: {str(e)}") + print("更多详细信息请查看日志。") + +if __name__ == "__main__": + bot = TwitterUserBot() + bot.run() \ No newline at end of file