This blog post shows how to build a Twitter bot in Python, using Google Spreadsheets as the data source.
The service presented here was originally created for a friend of mine who works at Megacorp Inc. They have a marketing intelligence department which fills in tracking information about potential customers. This information is stored in Google spreadsheets. Every day new spreadsheets arrive in a folder. My friend then goes through all the leads in the spreadsheets, checks who has a Twitter account, and pesters them on Twitter about Megacorp Inc.'s products.
To put my friend out of work, I decided to replace his tedious workflow with a Python script. Python is a programming language that makes simple things simple and removes the feeling of repeating yourself, in as few lines as possible. It is therefore an excellent weapon of choice for crushing middle-class labour force participation.
The bot sends two tweets to each Twitter user. The delay between the first and the second tweet is randomized, just to make sure nobody can easily tell that they are actually communicating with a bot.
The ingredients of this Twitter bot are:
- Python 3.4+ — everybody's favourite snake-themed programming language
- gspread — a Python client for Google Spreadsheets that takes the pain out of reading and manipulating the data
- tweepy — a Twitter client library for Python
- ZODB — an ACID-compliant, transactional, local object database for Python
The script is pretty much self-contained: about 200 lines of Python code and three hours of work.
1. Authenticating against third-party services
The bot authenticates itself against the Google services (Google Drive, Google Spreadsheet) and Twitter using the OAuth protocol. With OAuth, you visit the service provider's website in a normal web browser. If you are not signed in yet, the service asks you to sign in. You are then asked to authorize the application. Twitter authentication is done in a separate script, tweepyauth.py, which asks you to enter the PIN code shown on the Twitter website. The Google API client does things differently and spins up a local web server on a localhost port. When you authorize on the Google service, it redirects you back to this local web server and the script grabs the authentication token from there.
The script stores the authentication tokens in JSON files. You can first run the script on your local computer to generate the JSON files, then move them to a server where authenticating with a web browser might not be possible.
2. Keeping persistent state
The bot needs to keep state. It should process one new spreadsheet per day, but on some days the bot might not be run at all. It therefore needs to remember which spreadsheets have already been processed. Spreadsheets may also contain duplicate entries for the same Twitter handle, and we don't want to pester those Twitter users over and over again. Some data cleaning is applied to the column contents, as a cell may hold a raw Twitter handle or an HTTP or HTTPS URL to a Twitter user; those marketing intelligence folks are not very strict about what they leak into the spreadsheets.
The state is maintained with ZODB. ZODB is a very robust transactional database. It is mature, probably older than some readers of this blog post, and has multi-gigabyte deployments around the world. It runs in-process, like SQLite, and requires no other software on the machine. It doesn't need an ORM, because it stores native Python objects. To make your application state persistent, you simply stick your Python objects into the ZODB root: everything inside a transaction context manager is written to disk, or nothing is.
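As a minimal sketch of that pattern (the file name and keys below are illustrative, not the bot's actual data structures):

import ZODB
import ZODB.FileStorage
import BTrees.OOBTree
import transaction

# Open (or create) a single-file database, much like SQLite
storage = ZODB.FileStorage.FileStorage("example.data.fs")
db = ZODB.DB(storage)
root = db.open().root

# Everything inside the transaction manager is committed atomically
with transaction.manager:
    if not hasattr(root, "seen_sheets"):
        root.seen_sheets = BTrees.OOBTree.BTree()
    root.seen_sheets["some-spreadsheet-id"] = {"processed": True}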
As a side note, talking to Google Spreadsheets over their REST API is very slow. If you need to work with a lot of data, it is probably more efficient to download the data locally as a CSV export and work from there.
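For instance, if the same lead data were exported to a local CSV file, the handle column could be read with the standard library alone (the file name is hypothetical; column L is the 12th column, as in the bot below):

import csv

handles = []
with open("leads_export.csv", newline="") as f:
    for row in csv.reader(f):
        # Column L is the 12th column, i.e. index 11
        cell = row[11].strip() if len(row) > 11 else ""
        if cell:
            handles.append(cell)
print("Found {} potential Twitter handles".format(len(handles)))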
3. Usage instructions
This code is exemplary. You cannot use it as-is, because you don't have the right data or access to it. Use it to spark your imagination. But if you were to use it, it would go something like this:
- Install the dependencies as instructed in the official Python package installation guide.
- Register a Twitter application. Leave the callback URL empty.
- Register a Google service application.
- Run tweepyauth.py to obtain the Twitter tokens, stored in twitter_oauth.json.
- Run the bot once on your local computer to authenticate it against the Google services and write client_secrets.json.
- Watch the bot do its thing.
- Move it to a server.
- Leave it running in an endless loop at a Bash prompt, sleeping between runs for as long as you like, e.g.: $ while true; do python chirper.py; sleep 3600; done
4. Source code
chirper.py
"""
Installation:
pip install --upgrade oauth2client gspread google-api-python-client ZODB zodbpickle tweepy iso8601
"""
import time
import datetime
import json
import httplib2
import os
import sys
# Authorize server-to-server interactions from Google Compute Engine.
from apiclient import discovery
import oauth2client
from oauth2client import client
from oauth2client import tools
# ZODB
import ZODB
import ZODB.FileStorage
import BTrees.OOBTree
from persistent.mapping import PersistentMapping
import random
import transaction
# Date parsing
import iso8601
# https://github.com/burnash/gspread
import gspread
# Twitter client
import tweepy
try:
import argparse
flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args()
except ImportError:
flags = None
# We need permissions to list Drive files, read Drive files and manipulate spreadsheets
SCOPES = ['https://www.googleapis.com/auth/devstorage.read_write', 'https://www.googleapis.com/auth/drive.metadata.readonly', 'https://spreadsheets.google.com/feeds']
CLIENT_SECRET_FILE = 'client_secrets.json'
APPLICATION_NAME = 'MEGACORP SPREADSHEET SCRAPER BOT'
OAUTH_DATABASE = "oauth_authorization.json"
FIRST_TWEET_CHOICES = [
"WE AT MEGACORP THINK YOU MIGHT LIKE US - http://megacorp.example.com",
]
SECOND_TWEET_CHOICES = [
"AS WELL, WE ARE PROBABLY CHEAPER THAN COMPETITORCORP INC. http://megacorp.example.com/prices",
"AS WELL, OUR FEATURE SET IS LONGER THAN MISSISSIPPI http://megacorp.example.com/features",
"AS WELL, OUR CEO IS VERY HANDSOME http://megacorp.example.com/team",
]
# Make sure our canned tweets fit within the tweet length limit
for tweet in FIRST_TWEET_CHOICES + SECOND_TWEET_CHOICES:
assert len(tweet) < 140
# How many tweets can be sent in one run... a limit for testing / debugging
MAX_TWEET_COUNT = 10
# https://developers.google.com/drive/web/quickstart/python
def get_google_credentials():
"""Gets valid user credentials from storage.
If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.
Returns:
Credentials, the obtained credential.
"""
credential_path = os.path.join(os.getcwd(), OAUTH_DATABASE)
store = oauth2client.file.Storage(credential_path)
credentials = store.get()
if not credentials or credentials.invalid:
flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
flow.user_agent = APPLICATION_NAME
if flags:
credentials = tools.run_flow(flow, store, flags)
        else:  # Needed only for compatibility with Python 2.6
credentials = tools.run(flow, store)
print('Storing credentials to ' + credential_path)
return credentials
def get_tweepy():
"""Create a Tweepy client instance."""
creds = json.load(open("twitter_oauth.json", "rt"))
auth = tweepy.OAuthHandler(creds["consumer_key"], creds["consumer_secret"])
auth.set_access_token(creds["access_token"], creds["access_token_secret"])
api = tweepy.API(auth)
return api
def get_database():
"""Get or create a ZODB database where we store information about processed spreadsheets and sent tweets."""
storage = ZODB.FileStorage.FileStorage('chirper.data.fs')
db = ZODB.DB(storage)
connection = db.open()
root = connection.root
# Initialize root data structure if not present yet
with transaction.manager:
if not hasattr(root, "files"):
root.files = BTrees.OOBTree.BTree()
if not hasattr(root, "twitter_handles"):
# Format of {added: datetime, imported: datetime, sheet: str, first_tweet_at: datetime, second_tweet_at: datetime}
root.twitter_handles = BTrees.OOBTree.BTree()
return root
def extract_twitter_handles(spread, sheet_id, column_id="L"):
"""Process one spreadsheet and return Twitter handles in it."""
twitter_url_prefix = ["https://twitter.com/", "http://twitter.com/"]
worksheet = spread.open_by_key(sheet_id).sheet1
col_index = ord(column_id) - ord("A") + 1
# Painfully slow, 2600 records = 3+ min.
start = time.time()
print("Fetching data from sheet {}".format(sheet_id))
twitter_urls = worksheet.col_values(col_index)
print("Fetched everything in {} seconds".format(time.time() - start))
valid_handles = []
# Cell contents are URLs (possibly) pointing to a Twitter
# Extract the Twitter handle from these urls if they exist
for cell_content in twitter_urls:
if not cell_content:
continue
        # Twitter handle as-is, not a URL
if "://" not in cell_content:
valid_handles.append(cell_content.strip())
continue
# One cell can contain multiple URLs, comma separated
urls = [url.strip() for url in cell_content.split(",")]
for url in urls:
for prefix in twitter_url_prefix:
if url.startswith(prefix):
handle = url[len(prefix):]
# Clean old style fragment URLs e.g #!/foobar
if handle.startswith("#!/"):
handle = handle[len("#!/"):]
valid_handles.append(handle)
return valid_handles
def watch_files(http, title_match=None, folder_id=None) -> list:
"""Check all Google Drive files which match certain file pattern.
Drive API:
https://developers.google.com/drive/web/search-parameters
:return: Iterable GDrive file list
"""
service = discovery.build('drive', 'v2', http=http)
if folder_id:
results = service.files().list(q="'{}' in parents".format(folder_id)).execute()
elif title_match:
results = service.files().list(q="title contains '{}'".format(title_match)).execute()
else:
raise RuntimeError("Unknown criteria")
return results["items"]
def scan_for_new_spreadsheets(http, db):
"""Check Google Drive for new spreadsheets.
1. Use Google Drive API to list all files matching our spreadsheet criteria
2. If the file is not seen before add it to our list of files to process
"""
# First discover new spreadsheets
discovered = False
for file in watch_files(http, folder_id="0BytechWnbrJVTlNqbGpWZllaYW8"):
title = file["title"]
last_char = title[-1]
# It's .csv, photos, etc. misc files
if not last_char.isdigit():
continue
with transaction.manager:
file_id = file["id"]
if file_id not in db.files:
print("Discovered file {}: {}".format(file["title"], file_id))
db.files[file_id] = PersistentMapping(file)
discovered = True
if not discovered:
print("No new spreadsheets available")
def extract_twitter_handles_from_spreadsheets(spread, db):
"""Extract new Twitter handles from spreadsheets.
1. Go through all spreadsheets we know.
2. If the spreadsheet is not marked as processed extract Twitter handles out of it
3. If any of the Twitter handles is unseen before add it to the database with empty record
"""
# Then extract Twitter handles from the files we know about
for file_id, file_data in db.files.items():
spreadsheet_creation_date = iso8601.parse_date(file_data["createdDate"])
print("Processing {} created at {}".format(file_data["title"], spreadsheet_creation_date))
# Check the processing flag on the file
if not file_data.get("processed"):
handles = extract_twitter_handles(spread, file_id)
# Using this transaction lock we write all the handles to the database once or none of them
with transaction.manager:
for handle in handles:
# If we have not seen this
if handle not in db.twitter_handles:
print("Importing Twitter handle {}".format(handle))
db.twitter_handles[handle] = PersistentMapping({"added": spreadsheet_creation_date, "imported": datetime.datetime.utcnow(), "sheet": file_id})
file_data["processed"] = True
def send_tweet(twitter, msg):
"""Send a Tweet.
"""
try:
twitter.update_status(status=msg)
except tweepy.error.TweepError as e:
try:
# {"errors":[{"code":187,"message":"Status is a duplicate."}]}
resp = json.loads(e.response.text)
if resp.get("errors"):
if resp["errors"][0]["code"] == 187:
print("Was duplicate {}".format(msg))
time.sleep(10 + random.randint(0, 10))
return
except:
pass
raise RuntimeError("Twitter doesn't like us: {}".format(e.response.text or str(e))) from e
# Throttle down the bot
time.sleep(30 + random.randint(0, 90))
def tweet_everything(twitter, db):
"""Run through all users and check if we need to Tweet to them. """
tweet_count = 0
for handle_id, handle_data in db.twitter_handles.items():
with transaction.manager:
# Check if we had not sent the first Tweet yet and send it
if not handle_data.get("first_tweet_at"):
tweet = "@{} {}".format(handle_id, random.choice(FIRST_TWEET_CHOICES))
print("Tweeting {} at {}".format(tweet, datetime.datetime.utcnow()))
send_tweet(twitter, tweet)
handle_data["first_tweet_at"] = datetime.datetime.utcnow()
tweet_count += 1
            # Check if we had not sent the second Tweet yet and send it
elif not handle_data.get("second_tweet_at"):
tweet = "@{} {}".format(handle_id, random.choice(SECOND_TWEET_CHOICES))
print("Tweeting {} at {}".format(tweet, datetime.datetime.utcnow()))
send_tweet(twitter, tweet)
handle_data["second_tweet_at"] = datetime.datetime.utcnow()
tweet_count += 1
if tweet_count >= MAX_TWEET_COUNT:
# Testing limiter - don't spam too much if our test run is out of control
break
def main():
script_name = sys.argv[1] if sys.argv[0] == "python" else sys.argv[0]
print("Starting {} at {} UTC".format(script_name, datetime.datetime.utcnow()))
# open database
db = get_database()
# get OAuth permissions from Google for Drive client and Spreadsheet client
credentials = get_google_credentials()
http = credentials.authorize(httplib2.Http())
spread = gspread.authorize(credentials)
twitter = get_tweepy()
# Do action
scan_for_new_spreadsheets(http, db)
extract_twitter_handles_from_spreadsheets(spread, db)
tweet_everything(twitter, db)
main()
tweepyauth.py
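A minimal sketch of the PIN-based authorization step, assuming tweepy's OAuthHandler flow and the twitter_oauth.json key names that get_tweepy() in chirper.py expects (the consumer key and secret placeholders come from your registered Twitter application):

"""
Ask Twitter for an access token using the PIN-based OAuth flow
and store it in twitter_oauth.json for chirper.py.
"""
import json

import tweepy

# Placeholders: fill in the values from your registered Twitter application
CONSUMER_KEY = "..."
CONSUMER_SECRET = "..."

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)

# Open this URL in a browser and authorize the application
print("Please authorize: {}".format(auth.get_authorization_url()))
pin = input("Enter the PIN shown on the Twitter website: ").strip()

# Exchange the PIN for an access token
auth.get_access_token(pin)

# Store the tokens in the format get_tweepy() reads
with open("twitter_oauth.json", "wt") as f:
    json.dump({
        "consumer_key": CONSUMER_KEY,
        "consumer_secret": CONSUMER_SECRET,
        "access_token": auth.access_token,
        "access_token_secret": auth.access_token_secret,
    }, f)

print("Wrote twitter_oauth.json")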
"""
Installation:
pip install --upgrade oauth2client gspread google-api-python-client ZODB zodbpickle tweepy iso8601
"""
import time
import datetime
import json
import httplib2
import os
import sys
Authorize server-to-server interactions from Google Compute Engine.
from apiclient import discovery
import oauth2client
from oauth2client import client
from oauth2client import tools
ZODB
import ZODB
import ZODB.FileStorage
import BTrees.OOBTree
from persistent.mapping import PersistentMapping
import random
import transaction
Date parsing
import iso8601
https://github.com/burnash/gspread
import gspread
Twitter client
import tweepy
try:
import argparse
flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args()
except ImportError:
flags = None
We need permissions to drive list files, drive read files, spreadsheet manipulation
SCOPES = ['https://www.googleapis.com/auth/devstorage.read_write', 'https://www.googleapis.com/auth/drive.metadata.readonly', 'https://spreadsheets.google.com/feeds']
CLIENT_SECRET_FILE = 'client_secrets.json'
APPLICATION_NAME = 'MEGACORP SPREADSHEET SCRAPER BOT'
OAUTH_DATABASE = "oauth_authorization.json"
FIRST_TWEET_CHOICES = [
"WE AT MEGACORP THINK YOU MIGHT LIKE US - http://megacorp.example.com",
]
SECOND_TWEET_CHOICES = [
"AS WELL, WE ARE PROBABLY CHEAPER THAN COMPETITORCORP INC. http://megacorp.example.com/prices",
"AS WELL, OUR FEATURE SET IS LONGER THAN MISSISSIPPI http://megacorp.example.com/features",
"AS WELL, OUR CEO IS VERY HANDSOME http://megacorp.example.com/team",
]
Make sure our text is edited correctly
for tweet in FIRST_TWEET_CHOICES + SECOND_TWEET_CHOICES:
assert len(tweet) < 140
How many tweets can be send in one run... limit for testing / debugging
MAX_TWEET_COUNT = 10
https://developers.google.com/drive/web/quickstart/python
def get_google_credentials():
"""Gets valid user credentials from storage.
If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.
Returns:
Credentials, the obtained credential.
"""
credential_path = os.path.join(os.getcwd(), OAUTH_DATABASE)
store = oauth2client.file.Storage(credential_path)
credentials = store.get()
if not credentials or credentials.invalid:
flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
flow.user_agent = APPLICATION_NAME
if flags:
credentials = tools.run_flow(flow, store, flags)
else: # Needed only for compatability with Python 2.6
credentials = tools.run(flow, store)
print('Storing credentials to ' + credential_path)
return credentials
def get_tweepy():
"""Create a Tweepy client instance."""
creds = json.load(open("twitter_oauth.json", "rt"))
auth = tweepy.OAuthHandler(creds["consumer_key"], creds["consumer_secret"])
auth.set_access_token(creds["access_token"], creds["access_token_secret"])
api = tweepy.API(auth)
return api
def get_database():
"""Get or create a ZODB database where we store information about processed spreadsheets and sent tweets."""
storage = ZODB.FileStorage.FileStorage('chirper.data.fs')
db = ZODB.DB(storage)
connection = db.open()
root = connection.root
# Initialize root data structure if not present yet
with transaction.manager:
if not hasattr(root, "files"):
root.files = BTrees.OOBTree.BTree()
if not hasattr(root, "twitter_handles"):
# Format of {added: datetime, imported: datetime, sheet: str, first_tweet_at: datetime, second_tweet_at: datetime}
root.twitter_handles = BTrees.OOBTree.BTree()
return root
def extract_twitter_handles(spread, sheet_id, column_id="L"):
"""Process one spreadsheet and return Twitter handles in it."""
twitter_url_prefix = ["https://twitter.com/", "http://twitter.com/"]
worksheet = spread.open_by_key(sheet_id).sheet1
col_index = ord(column_id) - ord("A") + 1
# Painfully slow, 2600 records = 3+ min.
start = time.time()
print("Fetching data from sheet {}".format(sheet_id))
twitter_urls = worksheet.col_values(col_index)
print("Fetched everything in {} seconds".format(time.time() - start))
valid_handles = []
# Cell contents are URLs (possibly) pointing to a Twitter
# Extract the Twitter handle from these urls if they exist
for cell_content in twitter_urls:
if not cell_content:
continue
# Twitter handle as it
if "://" not in cell_content:
valid_handles.append(cell_content.strip())
continue
# One cell can contain multiple URLs, comma separated
urls = [url.strip() for url in cell_content.split(",")]
for url in urls:
for prefix in twitter_url_prefix:
if url.startswith(prefix):
handle = url[len(prefix):]
# Clean old style fragment URLs e.g #!/foobar
if handle.startswith("#!/"):
handle = handle[len("#!/"):]
valid_handles.append(handle)
return valid_handles
def watch_files(http, title_match=None, folder_id=None) -> list:
"""Check all Google Drive files which match certain file pattern.
Drive API:
https://developers.google.com/drive/web/search-parameters
:return: Iterable GDrive file list
"""
service = discovery.build('drive', 'v2', http=http)
if folder_id:
results = service.files().list(q="'{}' in parents".format(folder_id)).execute()
elif title_match:
results = service.files().list(q="title contains '{}'".format(title_match)).execute()
else:
raise RuntimeError("Unknown criteria")
return results["items"]
def scan_for_new_spreadsheets(http, db):
"""Check Google Drive for new spreadsheets.
1. Use Google Drive API to list all files matching our spreadsheet criteria
2. If the file is not seen before add it to our list of files to process
"""
# First discover new spreadsheets
discovered = False
for file in watch_files(http, folder_id="0BytechWnbrJVTlNqbGpWZllaYW8"):
title = file["title"]
last_char = title[-1]
# It's .csv, photos, etc. misc files
if not last_char.isdigit():
continue
with transaction.manager:
file_id = file["id"]
if file_id not in db.files:
print("Discovered file {}: {}".format(file["title"], file_id))
db.files[file_id] = PersistentMapping(file)
discovered = True
if not discovered:
print("No new spreadsheets available")
def extract_twitter_handles_from_spreadsheets(spread, db):
"""Extract new Twitter handles from spreadsheets.
1. Go through all spreadsheets we know.
2. If the spreadsheet is not marked as processed extract Twitter handles out of it
3. If any of the Twitter handles is unseen before add it to the database with empty record
"""
# Then extract Twitter handles from the files we know about
for file_id, file_data in db.files.items():
spreadsheet_creation_date = iso8601.parse_date(file_data["createdDate"])
print("Processing {} created at {}".format(file_data["title"], spreadsheet_creation_date))
# Check the processing flag on the file
if not file_data.get("processed"):
handles = extract_twitter_handles(spread, file_id)
# Using this transaction lock we write all the handles to the database once or none of them
with transaction.manager:
for handle in handles:
# If we have not seen this
if handle not in db.twitter_handles:
print("Importing Twitter handle {}".format(handle))
db.twitter_handles[handle] = PersistentMapping({"added": spreadsheet_creation_date, "imported": datetime.datetime.utcnow(), "sheet": file_id})
file_data["processed"] = True
def send_tweet(twitter, msg):
"""Send a Tweet.
"""
try:
twitter.update_status(status=msg)
except tweepy.error.TweepError as e:
try:
# {"errors":[{"code":187,"message":"Status is a duplicate."}]}
resp = json.loads(e.response.text)
if resp.get("errors"):
if resp["errors"][0]["code"] == 187:
print("Was duplicate {}".format(msg))
time.sleep(10 + random.randint(0, 10))
return
except:
pass
raise RuntimeError("Twitter doesn't like us: {}".format(e.response.text or str(e))) from e
# Throttle down the bot
time.sleep(30 + random.randint(0, 90))
def tweet_everything(twitter, db):
"""Run through all users and check if we need to Tweet to them. """
tweet_count = 0
for handle_id, handle_data in db.twitter_handles.items():
with transaction.manager:
# Check if we had not sent the first Tweet yet and send it
if not handle_data.get("first_tweet_at"):
tweet = "@{} {}".format(handle_id, random.choice(FIRST_TWEET_CHOICES))
print("Tweeting {} at {}".format(tweet, datetime.datetime.utcnow()))
send_tweet(twitter, tweet)
handle_data["first_tweet_at"] = datetime.datetime.utcnow()
tweet_count += 1
# Check if we had not sent the first Tweet yet and send it
elif not handle_data.get("second_tweet_at"):
tweet = "@{} {}".format(handle_id, random.choice(SECOND_TWEET_CHOICES))
print("Tweeting {} at {}".format(tweet, datetime.datetime.utcnow()))
send_tweet(twitter, tweet)
handle_data["second_tweet_at"] = datetime.datetime.utcnow()
tweet_count += 1
if tweet_count >= MAX_TWEET_COUNT:
# Testing limiter - don't spam too much if our test run is out of control
break
def main():
script_name = sys.argv[1] if sys.argv[0] == "python" else sys.argv[0]
print("Starting {} at {} UTC".format(script_name, datetime.datetime.utcnow()))
# open database
db = get_database()
# get OAuth permissions from Google for Drive client and Spreadsheet client
credentials = get_google_credentials()
http = credentials.authorize(httplib2.Http())
spread = gspread.authorize(credentials)
twitter = get_tweepy()
# Do action
scan_for_new_spreadsheets(http, db)
extract_twitter_handles_from_spreadsheets(spread, db)
tweet_everything(twitter, db)
main()