Building a Twitter bot with Google Spreadsheets in Python


This blog post shows how to build a Twitter bot in Python, using Google Spreadsheets as the data source.

The service presented here was originally created for a friend of mine who works at Megacorp Inc. They have a marketing intelligence department which fills in lead-tracking information about potential customers. This information is stored in Google spreadsheets. Every day, new spreadsheets arrive in a folder. My friend would then go through all the leads in the spreadsheets, check who has a Twitter account, and pester them on Twitter about Megacorp Inc.'s products.

To put my friend out of work, I decided to replace his tedious workflow with a Python script. Python is a programming language that makes easy tasks easy and removes the feeling of repeating yourself, in as few lines as possible. It is therefore an excellent weapon of choice for crushing middle-class labor participation.

The bot sends two tweets to each Twitter user. The delay between the first and the second tweet is randomized, just to make sure nobody can too easily figure out they are actually talking to a bot.

The ingredients of this Twitter bot are:

  • Python 3.4+ - everybody's favorite snake-themed programming language
  • gspread - a Python client for Google Spreadsheets that takes the pain out of reading and manipulating the data
  • tweepy - a Twitter client library for Python
  • ZODB - an ACID-compliant, local, transactional object database for native Python objects

The script is pretty much self-contained: about 200 lines of Python and roughly three hours of work.

1. Authenticating with third-party services

The bot authenticates itself against Google services (Google Drive, Google Spreadsheets) and Twitter using the OAuth protocol. With OAuth, you visit the service provider's website in a normal web browser. If you are not logged in yet, the service asks you to log in. Then you are asked to authorize the application. Twitter authentication is done in a separate script, tweepyauth.py, which asks you to enter the PIN code displayed on the Twitter website. The Google API client does things differently and starts a local web server on a localhost port. When you authorize the application on the Google site, you are redirected back to this local web server, and the script picks up the authentication token from there.

The script stores the authentication tokens in JSON files. You can run the script on your local computer first to generate the JSON files, then move them over to a server where authenticating with a web browser might not be possible.
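For reference, the Twitter token file is plain JSON along these lines. The field names come straight from get_tweepy() in the source below; the values here are obviously placeholders:

{
    "consumer_key": "YOUR-CONSUMER-KEY",
    "consumer_secret": "YOUR-CONSUMER-SECRET",
    "access_token": "YOUR-ACCESS-TOKEN",
    "access_token_secret": "YOUR-ACCESS-TOKEN-SECRET"
}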

2. Keeping persistent state

The bot needs to keep state. It is supposed to process one new spreadsheet per day, but on some days the bot might not run at all, so it needs to remember which spreadsheets have already been processed. A spreadsheet may also contain duplicate entries for the same Twitter handle, and we don't want to pester those Twitter users over and over again. Some data clean-up is applied to the column contents, because a cell may hold a raw Twitter handle, or an HTTP or HTTPS Twitter URL - those marketing intelligence people are not very strict about what they spill into the spreadsheets.
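Condensed, the clean-up amounts to something like this (a sketch; the full version, which also handles multiple comma-separated URLs per cell, lives in extract_twitter_handles() in the source below):

def normalize_handle(cell_content):
    """Reduce a spreadsheet cell to a bare Twitter handle."""
    content = cell_content.strip()
    # Strip URL prefixes if the cell holds a link instead of a raw handle
    for prefix in ("https://twitter.com/", "http://twitter.com/"):
        if content.startswith(prefix):
            content = content[len(prefix):]
    # Clean old-style fragment URLs, e.g. #!/foobar
    if content.startswith("#!/"):
        content = content[len("#!/"):]
    return content

assert normalize_handle("https://twitter.com/#!/foobar") == "foobar"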

State is maintained with ZODB. ZODB is a very robust transactional database. It is mature, probably older than some readers of this blog post, and has deployments of many gigabytes around the world. It runs in-process, like SQLite, and requires no extra software on the machine. It doesn't need any ORM, because it works with native Python objects: to make your application state persistent, you simply stick your Python objects onto the ZODB root. Everything inside a transaction context manager is written to disk, or nothing is.
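A minimal sketch of that pattern (the file name and keys are invented for the example): open a file storage, hang ordinary Python objects off the root, and let the transaction manager commit them atomically:

import ZODB
import ZODB.FileStorage
import BTrees.OOBTree
import transaction

# In-process database, no server needed - just a file, like SQLite
storage = ZODB.FileStorage.FileStorage("example.data.fs")
connection = ZODB.DB(storage).open()
root = connection.root

with transaction.manager:
    if not hasattr(root, "handles"):
        root.handles = BTrees.OOBTree.BTree()
    # Both writes commit together at the end of the block, or not at all
    root.handles["alice"] = {"first_tweet_at": None}
    root.handles["bob"] = {"first_tweet_at": None}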

As a side note, operating on Google Spreadsheets through the REST API is very slow. If you need to work on a lot of data, it is probably more efficient to download it locally as a CSV export and process it from there.
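If that becomes a bottleneck, here is a sketch of the workaround: fetch the whole sheet as one CSV download using the already-authorized httplib2 client and iterate locally. The export URL pattern is my assumption about Google's endpoints, not something this bot does:

import csv
import io

def download_sheet_as_csv(http, sheet_id):
    """Fetch a whole sheet in one request instead of per-cell API calls."""
    url = "https://docs.google.com/spreadsheets/d/{}/export?format=csv".format(sheet_id)
    response, content = http.request(url)
    if response.status != 200:
        raise RuntimeError("CSV export failed with HTTP {}".format(response.status))
    return list(csv.reader(io.StringIO(content.decode("utf-8"))))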

3. Usage instructions

This code is given as an example. You cannot use it as-is, because you don't have the right data, or access to it. Use it to spark your imagination. If you were to run it, though, the scripts below show what would happen.

4. Source code

chirper.py


 """

Installation:

pip install --upgrade oauth2client gspread google-api-python-client ZODB zodbpickle tweepy iso8601

"""

import time import datetime import json import httplib2 import os import sys

Authorize server-to-server interactions from Google Compute Engine.

from apiclient import discovery import oauth2client from oauth2client import client from oauth2client import tools

ZODB

import ZODB import ZODB.FileStorage import BTrees.OOBTree from persistent.mapping import PersistentMapping import random import transaction

Date parsing

import iso8601

https://github.com/burnash/gspread

import gspread

Twitter client

import tweepy

try: import argparse flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args() except ImportError: flags = None

We need permissions to drive list files, drive read files, spreadsheet manipulation

SCOPES = ['https://www.googleapis.com/auth/devstorage.read_write', 'https://www.googleapis.com/auth/drive.metadata.readonly', 'https://spreadsheets.google.com/feeds'] CLIENT_SECRET_FILE = 'client_secrets.json' APPLICATION_NAME = 'MEGACORP SPREADSHEET SCRAPER BOT' OAUTH_DATABASE = "oauth_authorization.json"

FIRST_TWEET_CHOICES = [ "WE AT MEGACORP THINK YOU MIGHT LIKE US - http://megacorp.example.com", ]

SECOND_TWEET_CHOICES = [ "AS WELL, WE ARE PROBABLY CHEAPER THAN COMPETITORCORP INC. http://megacorp.example.com/prices", "AS WELL, OUR FEATURE SET IS LONGER THAN MISSISSIPPI http://megacorp.example.com/features", "AS WELL, OUR CEO IS VERY HANDSOME http://megacorp.example.com/team",

]

Make sure our text is edited correctly

for tweet in FIRST_TWEET_CHOICES + SECOND_TWEET_CHOICES: assert len(tweet) < 140

How many tweets can be send in one run... limit for testing / debugging

MAX_TWEET_COUNT = 10

https://developers.google.com/drive/web/quickstart/python

def get_google_credentials(): """Gets valid user credentials from storage.

If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.

Returns:
    Credentials, the obtained credential.
"""

credential_path = os.path.join(os.getcwd(), OAUTH_DATABASE)

store = oauth2client.file.Storage(credential_path)
credentials = store.get()
if not credentials or credentials.invalid:
    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        credentials = tools.run_flow(flow, store, flags)
    else: # Needed only for compatability with Python 2.6
        credentials = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
return credentials

def get_tweepy(): """Create a Tweepy client instance.""" creds = json.load(open("twitter_oauth.json", "rt"))

auth = tweepy.OAuthHandler(creds["consumer_key"], creds["consumer_secret"])
auth.set_access_token(creds["access_token"], creds["access_token_secret"])
api = tweepy.API(auth)
return api

def get_database(): """Get or create a ZODB database where we store information about processed spreadsheets and sent tweets."""

storage = ZODB.FileStorage.FileStorage('chirper.data.fs')
db = ZODB.DB(storage)
connection = db.open()
root = connection.root

# Initialize root data structure if not present yet
with transaction.manager:
    if not hasattr(root, "files"):
        root.files = BTrees.OOBTree.BTree()
    if not hasattr(root, "twitter_handles"):
        # Format of {added: datetime, imported: datetime, sheet: str, first_tweet_at: datetime, second_tweet_at: datetime}
        root.twitter_handles = BTrees.OOBTree.BTree()


return root

def extract_twitter_handles(spread, sheet_id, column_id="L"): """Process one spreadsheet and return Twitter handles in it."""

twitter_url_prefix = ["https://twitter.com/", "http://twitter.com/"]

worksheet = spread.open_by_key(sheet_id).sheet1

col_index = ord(column_id) - ord("A") + 1

# Painfully slow, 2600 records = 3+ min.
start = time.time()
print("Fetching data from sheet {}".format(sheet_id))
twitter_urls =  worksheet.col_values(col_index)
print("Fetched everything in {} seconds".format(time.time() - start))

valid_handles = []

# Cell contents are URLs (possibly) pointing to a Twitter
# Extract the Twitter handle from these urls if they exist
for cell_content in twitter_urls:

    if not cell_content:
        continue

    # Twitter handle as it
    if "://" not in cell_content:
        valid_handles.append(cell_content.strip())
        continue

    # One cell can contain multiple URLs, comma separated
    urls = [url.strip() for url in cell_content.split(",")]

    for url in urls:
        for prefix in twitter_url_prefix:
            if url.startswith(prefix):
                handle = url[len(prefix):]

                # Clean old style fragment URLs e.g #!/foobar
                if handle.startswith("#!/"):
                    handle = handle[len("#!/"):]

                valid_handles.append(handle)

return valid_handles

def watch_files(http, title_match=None, folder_id=None) -> list: """Check all Google Drive files which match certain file pattern.

Drive API:

https://developers.google.com/drive/web/search-parameters

:return: Iterable GDrive file list
"""

service = discovery.build('drive', 'v2', http=http)

if folder_id:
    results = service.files().list(q="'{}' in parents".format(folder_id)).execute()
elif title_match:
    results = service.files().list(q="title contains '{}'".format(title_match)).execute()
else:
    raise RuntimeError("Unknown criteria")

return results["items"]

def scan_for_new_spreadsheets(http, db): """Check Google Drive for new spreadsheets.

    1. Use Google Drive API to list all files matching our spreadsheet criteria
    2. If the file is not seen before add it to our list of files to process
"""
# First discover new spreadsheets

discovered = False

for file in watch_files(http, folder_id="0BytechWnbrJVTlNqbGpWZllaYW8"):
    title = file["title"]
    last_char = title[-1]

    # It's .csv, photos, etc. misc files
    if not last_char.isdigit():
        continue

    with transaction.manager:
        file_id = file["id"]
        if file_id not in db.files:
            print("Discovered file {}: {}".format(file["title"], file_id))
            db.files[file_id] = PersistentMapping(file)
            discovered = True

if not discovered:
    print("No new spreadsheets available")

def extract_twitter_handles_from_spreadsheets(spread, db): """Extract new Twitter handles from spreadsheets.

    1. Go through all spreadsheets we know.
    2. If the spreadsheet is not marked as processed extract Twitter handles out of it
    3. If any of the Twitter handles is unseen before add it to the database with empty record

"""

# Then extract Twitter handles from the files we know about
for file_id, file_data in db.files.items():

    spreadsheet_creation_date = iso8601.parse_date(file_data["createdDate"])

    print("Processing {} created at {}".format(file_data["title"], spreadsheet_creation_date))

    # Check the processing flag on the file
    if not file_data.get("processed"):
        handles = extract_twitter_handles(spread, file_id)

        # Using this transaction lock we write all the handles to the database once or none of them
        with transaction.manager:
            for handle in handles:
                # If we have not seen this
                if handle not in db.twitter_handles:
                    print("Importing Twitter handle {}".format(handle))
                    db.twitter_handles[handle] = PersistentMapping({"added": spreadsheet_creation_date, "imported": datetime.datetime.utcnow(), "sheet": file_id})

            file_data["processed"] = True

def send_tweet(twitter, msg): """Send a Tweet. """

try:
    twitter.update_status(status=msg)
except tweepy.error.TweepError as e:
    try:
        # {"errors":[{"code":187,"message":"Status is a duplicate."}]}
        resp = json.loads(e.response.text)
        if resp.get("errors"):
            if resp["errors"][0]["code"] == 187:
                print("Was duplicate {}".format(msg))
                time.sleep(10 + random.randint(0, 10))
                return
    except:
        pass

    raise RuntimeError("Twitter doesn't like us: {}".format(e.response.text or str(e))) from e

# Throttle down the bot
time.sleep(30 + random.randint(0, 90))

def tweet_everything(twitter, db): """Run through all users and check if we need to Tweet to them. """

tweet_count = 0

for handle_id, handle_data in db.twitter_handles.items():

    with transaction.manager:

        # Check if we had not sent the first Tweet yet and send it
        if not handle_data.get("first_tweet_at"):

            tweet = "@{} {}".format(handle_id, random.choice(FIRST_TWEET_CHOICES))

            print("Tweeting {} at {}".format(tweet, datetime.datetime.utcnow()))
            send_tweet(twitter, tweet)
            handle_data["first_tweet_at"] = datetime.datetime.utcnow()
            tweet_count += 1

        # Check if we had not sent the first Tweet yet and send it
        elif not handle_data.get("second_tweet_at"):

            tweet = "@{} {}".format(handle_id, random.choice(SECOND_TWEET_CHOICES))

            print("Tweeting {} at {}".format(tweet, datetime.datetime.utcnow()))
            send_tweet(twitter, tweet)
            handle_data["second_tweet_at"] = datetime.datetime.utcnow()
            tweet_count += 1

    if tweet_count &gt;= MAX_TWEET_COUNT:
        # Testing limiter - don't spam too much if our test run is out of control
        break

def main():

script_name = sys.argv[1] if sys.argv[0] == "python" else sys.argv[0]
print("Starting {} at {} UTC".format(script_name, datetime.datetime.utcnow()))

# open database
db = get_database()

# get OAuth permissions from Google for Drive client and Spreadsheet client
credentials = get_google_credentials()
http = credentials.authorize(httplib2.Http())
spread = gspread.authorize(credentials)
twitter = get_tweepy()

# Do action
scan_for_new_spreadsheets(http, db)
extract_twitter_handles_from_spreadsheets(spread, db)
tweet_everything(twitter, db)

main()

tweepyauth.py
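What follows is a minimal sketch of the PIN-based flow, reconstructed around the twitter_oauth.json format that get_tweepy() in chirper.py expects; the prompts and placeholder keys are assumptions, not the original script.

"""
Authorize the bot against Twitter using the PIN-based OAuth flow
and store the resulting tokens in twitter_oauth.json.

A reconstruction sketch - not the original tweepyauth.py.
"""

import json
import webbrowser

import tweepy

# Values from your application registration at https://apps.twitter.com/
CONSUMER_KEY = "YOUR-CONSUMER-KEY"
CONSUMER_SECRET = "YOUR-CONSUMER-SECRET"

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)

# Open the authorization page; after you approve, Twitter displays a PIN
url = auth.get_authorization_url()
print("Opening {}".format(url))
webbrowser.open(url)

pin = input("Enter the PIN shown on the Twitter website: ").strip()

# Exchange the PIN (OAuth verifier) for permanent access tokens
auth.get_access_token(verifier=pin)

# Write the tokens in the format chirper.py reads in get_tweepy()
with open("twitter_oauth.json", "wt") as f:
    json.dump({
        "consumer_key": CONSUMER_KEY,
        "consumer_secret": CONSUMER_SECRET,
        "access_token": auth.access_token,
        "access_token_secret": auth.access_token_secret,
    }, f)

print("Stored tokens to twitter_oauth.json")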


 """

Installation:

pip install --upgrade oauth2client gspread google-api-python-client ZODB zodbpickle tweepy iso8601

"""

import time import datetime import json import httplib2 import os import sys

Authorize server-to-server interactions from Google Compute Engine.

from apiclient import discovery import oauth2client from oauth2client import client from oauth2client import tools

ZODB

import ZODB import ZODB.FileStorage import BTrees.OOBTree from persistent.mapping import PersistentMapping import random import transaction

Date parsing

import iso8601

https://github.com/burnash/gspread

import gspread

Twitter client

import tweepy

try: import argparse flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args() except ImportError: flags = None

We need permissions to drive list files, drive read files, spreadsheet manipulation

SCOPES = ['https://www.googleapis.com/auth/devstorage.read_write', 'https://www.googleapis.com/auth/drive.metadata.readonly', 'https://spreadsheets.google.com/feeds'] CLIENT_SECRET_FILE = 'client_secrets.json' APPLICATION_NAME = 'MEGACORP SPREADSHEET SCRAPER BOT' OAUTH_DATABASE = "oauth_authorization.json"

FIRST_TWEET_CHOICES = [ "WE AT MEGACORP THINK YOU MIGHT LIKE US - http://megacorp.example.com", ]

SECOND_TWEET_CHOICES = [ "AS WELL, WE ARE PROBABLY CHEAPER THAN COMPETITORCORP INC. http://megacorp.example.com/prices", "AS WELL, OUR FEATURE SET IS LONGER THAN MISSISSIPPI http://megacorp.example.com/features", "AS WELL, OUR CEO IS VERY HANDSOME http://megacorp.example.com/team",

]

Make sure our text is edited correctly

for tweet in FIRST_TWEET_CHOICES + SECOND_TWEET_CHOICES: assert len(tweet) < 140

How many tweets can be send in one run... limit for testing / debugging

MAX_TWEET_COUNT = 10

https://developers.google.com/drive/web/quickstart/python

def get_google_credentials(): """Gets valid user credentials from storage.

If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.

Returns:
    Credentials, the obtained credential.
"""

credential_path = os.path.join(os.getcwd(), OAUTH_DATABASE)

store = oauth2client.file.Storage(credential_path)
credentials = store.get()
if not credentials or credentials.invalid:
    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        credentials = tools.run_flow(flow, store, flags)
    else: # Needed only for compatability with Python 2.6
        credentials = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
return credentials

def get_tweepy(): """Create a Tweepy client instance.""" creds = json.load(open("twitter_oauth.json", "rt"))

auth = tweepy.OAuthHandler(creds["consumer_key"], creds["consumer_secret"])
auth.set_access_token(creds["access_token"], creds["access_token_secret"])
api = tweepy.API(auth)
return api

def get_database(): """Get or create a ZODB database where we store information about processed spreadsheets and sent tweets."""

storage = ZODB.FileStorage.FileStorage('chirper.data.fs')
db = ZODB.DB(storage)
connection = db.open()
root = connection.root

# Initialize root data structure if not present yet
with transaction.manager:
    if not hasattr(root, "files"):
        root.files = BTrees.OOBTree.BTree()
    if not hasattr(root, "twitter_handles"):
        # Format of {added: datetime, imported: datetime, sheet: str, first_tweet_at: datetime, second_tweet_at: datetime}
        root.twitter_handles = BTrees.OOBTree.BTree()


return root

def extract_twitter_handles(spread, sheet_id, column_id="L"): """Process one spreadsheet and return Twitter handles in it."""

twitter_url_prefix = ["https://twitter.com/", "http://twitter.com/"]

worksheet = spread.open_by_key(sheet_id).sheet1

col_index = ord(column_id) - ord("A") + 1

# Painfully slow, 2600 records = 3+ min.
start = time.time()
print("Fetching data from sheet {}".format(sheet_id))
twitter_urls =  worksheet.col_values(col_index)
print("Fetched everything in {} seconds".format(time.time() - start))

valid_handles = []

# Cell contents are URLs (possibly) pointing to a Twitter
# Extract the Twitter handle from these urls if they exist
for cell_content in twitter_urls:

    if not cell_content:
        continue

    # Twitter handle as it
    if "://" not in cell_content:
        valid_handles.append(cell_content.strip())
        continue

    # One cell can contain multiple URLs, comma separated
    urls = [url.strip() for url in cell_content.split(",")]

    for url in urls:
        for prefix in twitter_url_prefix:
            if url.startswith(prefix):
                handle = url[len(prefix):]

                # Clean old style fragment URLs e.g #!/foobar
                if handle.startswith("#!/"):
                    handle = handle[len("#!/"):]

                valid_handles.append(handle)

return valid_handles

def watch_files(http, title_match=None, folder_id=None) -> list: """Check all Google Drive files which match certain file pattern.

Drive API:

https://developers.google.com/drive/web/search-parameters

:return: Iterable GDrive file list
"""

service = discovery.build('drive', 'v2', http=http)

if folder_id:
    results = service.files().list(q="'{}' in parents".format(folder_id)).execute()
elif title_match:
    results = service.files().list(q="title contains '{}'".format(title_match)).execute()
else:
    raise RuntimeError("Unknown criteria")

return results["items"]

def scan_for_new_spreadsheets(http, db): """Check Google Drive for new spreadsheets.

    1. Use Google Drive API to list all files matching our spreadsheet criteria
    2. If the file is not seen before add it to our list of files to process
"""
# First discover new spreadsheets

discovered = False

for file in watch_files(http, folder_id="0BytechWnbrJVTlNqbGpWZllaYW8"):
    title = file["title"]
    last_char = title[-1]

    # It's .csv, photos, etc. misc files
    if not last_char.isdigit():
        continue

    with transaction.manager:
        file_id = file["id"]
        if file_id not in db.files:
            print("Discovered file {}: {}".format(file["title"], file_id))
            db.files[file_id] = PersistentMapping(file)
            discovered = True

if not discovered:
    print("No new spreadsheets available")

def extract_twitter_handles_from_spreadsheets(spread, db): """Extract new Twitter handles from spreadsheets.

    1. Go through all spreadsheets we know.
    2. If the spreadsheet is not marked as processed extract Twitter handles out of it
    3. If any of the Twitter handles is unseen before add it to the database with empty record

"""

# Then extract Twitter handles from the files we know about
for file_id, file_data in db.files.items():

    spreadsheet_creation_date = iso8601.parse_date(file_data["createdDate"])

    print("Processing {} created at {}".format(file_data["title"], spreadsheet_creation_date))

    # Check the processing flag on the file
    if not file_data.get("processed"):
        handles = extract_twitter_handles(spread, file_id)

        # Using this transaction lock we write all the handles to the database once or none of them
        with transaction.manager:
            for handle in handles:
                # If we have not seen this
                if handle not in db.twitter_handles:
                    print("Importing Twitter handle {}".format(handle))
                    db.twitter_handles[handle] = PersistentMapping({"added": spreadsheet_creation_date, "imported": datetime.datetime.utcnow(), "sheet": file_id})

            file_data["processed"] = True

def send_tweet(twitter, msg): """Send a Tweet. """

try:
    twitter.update_status(status=msg)
except tweepy.error.TweepError as e:
    try:
        # {"errors":[{"code":187,"message":"Status is a duplicate."}]}
        resp = json.loads(e.response.text)
        if resp.get("errors"):
            if resp["errors"][0]["code"] == 187:
                print("Was duplicate {}".format(msg))
                time.sleep(10 + random.randint(0, 10))
                return
    except:
        pass

    raise RuntimeError("Twitter doesn't like us: {}".format(e.response.text or str(e))) from e

# Throttle down the bot
time.sleep(30 + random.randint(0, 90))

def tweet_everything(twitter, db): """Run through all users and check if we need to Tweet to them. """

tweet_count = 0

for handle_id, handle_data in db.twitter_handles.items():

    with transaction.manager:

        # Check if we had not sent the first Tweet yet and send it
        if not handle_data.get("first_tweet_at"):

            tweet = "@{} {}".format(handle_id, random.choice(FIRST_TWEET_CHOICES))

            print("Tweeting {} at {}".format(tweet, datetime.datetime.utcnow()))
            send_tweet(twitter, tweet)
            handle_data["first_tweet_at"] = datetime.datetime.utcnow()
            tweet_count += 1

        # Check if we had not sent the first Tweet yet and send it
        elif not handle_data.get("second_tweet_at"):

            tweet = "@{} {}".format(handle_id, random.choice(SECOND_TWEET_CHOICES))

            print("Tweeting {} at {}".format(tweet, datetime.datetime.utcnow()))
            send_tweet(twitter, tweet)
            handle_data["second_tweet_at"] = datetime.datetime.utcnow()
            tweet_count += 1

    if tweet_count &gt;= MAX_TWEET_COUNT:
        # Testing limiter - don't spam too much if our test run is out of control
        break

def main():

script_name = sys.argv[1] if sys.argv[0] == "python" else sys.argv[0]
print("Starting {} at {} UTC".format(script_name, datetime.datetime.utcnow()))

# open database
db = get_database()

# get OAuth permissions from Google for Drive client and Spreadsheet client
credentials = get_google_credentials()
http = credentials.authorize(httplib2.Http())
spread = gspread.authorize(credentials)
twitter = get_tweepy()

# Do action
scan_for_new_spreadsheets(http, db)
extract_twitter_handles_from_spreadsheets(spread, db)
tweet_everything(twitter, db)

main()

