Python MongoDB(建议收藏)
💡一则或许对你有用的小广告
欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论
- 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于
Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...
,点击查看项目介绍 ;- 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;
截止目前, 星球 内专栏累计输出 82w+ 字,讲解图 3441+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2900+ 小伙伴加入学习 ,欢迎点击围观
前言
在现代软件开发中,数据存储与处理是核心环节之一。Python 作为一门简洁高效的编程语言,与 MongoDB 这一灵活的文档型数据库结合,能够为开发者提供强大的开发工具链。无论是构建快速迭代的 Web 应用,还是处理非结构化数据,二者的优势互补都能显著提升开发效率。本文将从基础到进阶,系统讲解如何用 Python 操作 MongoDB,并通过实际案例帮助读者掌握关键技能。
环境准备:安装与连接
安装 MongoDB
MongoDB 是一个基于文档的 NoSQL 数据库,适合存储 JSON 类似的键值对数据。安装步骤如下:
- 下载与安装:访问 MongoDB 官网 下载对应操作系统的安装包。
- 启动服务:安装完成后,通过命令行启动 MongoDB 服务(例如:
mongod
)。
安装 Python 驱动
Python 与 MongoDB 的交互依赖于 pymongo
库。通过以下命令安装:
pip install pymongo
连接数据库
使用 pymongo.MongoClient
建立连接:
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"] # 选择或创建数据库
collection = db["users"] # 选择或创建集合(类似关系型数据库的表)
比喻:
可以把 MongoDB 想象成一个“数字图书馆”,每个数据库(my_database
)是图书馆中的一个分馆,集合(users
)是分馆内的一个书架,文档则是书架上的书籍。
基础操作:CRUD
创建数据(Create)
向集合中插入文档:
user_data = {
"name": "Alice",
"age": 25,
"email": "alice@example.com",
"hobbies": ["reading", "coding"]
}
result = collection.insert_one(user_data)
print(f"插入成功,ObjectId: {result.inserted_id}")
关键点:
insert_one()
返回包含新文档_id
的结果对象。- 若插入多个文档,可使用
insert_many()
方法。
读取数据(Read)
查询单条或全部数据:
first_user = collection.find_one({"name": "Alice"})
print(first_user)
for user in collection.find():
print(user["name"], user["age"])
注意事项:
find_one()
若未指定条件,返回第一条文档。find()
返回游标对象,需遍历或转换为列表。
更新数据(Update)
更新特定文档的字段:
update_result = collection.update_one(
{"age": 25}, # 查询条件
{"$set": {"age": 30}} # 更新操作
)
print(f"匹配到{update_result.matched_count}条,修改{update_result.modified_count}条")
操作符:
$set
:设置字段值。$inc
:递增数值(如{"$inc": {"age": 1}}"
)。
删除数据(Delete)
删除符合条件的文档:
delete_result = collection.delete_many({"age": {"$gt": 30}})
print(f"删除了{delete_result.deleted_count}条记录")
高级查询:灵活筛选与排序
条件查询与逻辑组合
使用操作符(如 $lt
, $gte
, $in
)和逻辑运算符($and
, $or
)构建复杂条件:
query = {
"$and": [
{"age": {"$gte": 20, "$lte": 30}},
{"email": {"$regex": "example.com"}}
]
}
results = collection.find(query)
分页与排序
通过 skip()
和 limit()
实现分页,sort()
控制排序方式:
page = 2
per_page = 10
cursor = collection.find().sort("age", -1).skip((page-1)*per_page).limit(per_page)
聚合管道:数据处理流水线
基本概念
聚合管道(Aggregation Pipeline)是 MongoDB 的核心功能之一,允许对数据进行多阶段处理,例如分组、计算、过滤等。
实例:统计用户年龄分布
pipeline = [
{"$match": {"age": {"$exists": True}}}, # 过滤有年龄字段的文档
{"$group": {
"_id": "$age", # 按年龄分组
"count": {"$sum": 1} # 统计每组数量
}},
{"$sort": {"count": -1}} # 按数量降序排列
]
result = collection.aggregate(pipeline)
for item in result:
print(f"年龄 {item['_id']} 出现 {item['count']} 次")
错误处理与事务
异常捕获
MongoDB 操作可能因网络中断或权限问题引发异常,需用 try-except
块处理:
try:
# 执行数据库操作
collection.insert_one({"name": "Bob"})
except pymongo.errors.DuplicateKeyError:
print("唯一键冲突,文档已存在!")
except Exception as e:
print(f"其他错误:{str(e)}")
事务支持(MongoDB 4.0+)
在支持事务的部署环境中,可以使用 start_session()
实现原子操作:
with client.start_session() as session:
session.start_transaction()
try:
collection.update_one({"name": "Alice"}, {"$set": {"status": "active"}}, session=session)
other_collection.insert_one({"log": "更新完成"}, session=session)
session.commit_transaction()
except Exception:
session.abort_transaction()
raise
实战案例:用户管理系统
需求描述
构建一个简单的用户管理系统,支持以下功能:
- 注册用户(插入数据);
- 查询用户列表并分页显示;
- 更新用户状态;
- 删除过期用户。
完整代码示例
from pymongo import MongoClient
def main():
client = MongoClient("mongodb://localhost:27017/")
db = client["user_management"]
users = db["users"]
# 注册新用户
def register_user(name, email):
user = {"name": name, "email": email, "status": "active"}
users.insert_one(user)
print(f"用户 {name} 注册成功!")
# 分页查询
def list_users(page=1, per_page=10):
skip_num = (page - 1) * per_page
cursor = users.find().sort("name", 1).skip(skip_num).limit(per_page)
for user in cursor:
print(user["name"], user["status"])
# 更新状态
def deactivate_user(email):
result = users.update_one(
{"email": email},
{"$set": {"status": "inactive"}}
)
if result.modified_count > 0:
print(f"用户 {email} 已停用")
# 删除过期用户
def delete_expired(days_ago=30):
# 假设过期条件为"last_login"字段小于当前时间
from datetime import datetime, timedelta
cutoff = datetime.now() - timedelta(days=days_ago)
result = users.delete_many({"last_login": {"$lt": cutoff}})
print(f"删除了 {result.deleted_count} 个过期用户")
# 示例调用
register_user("Charlie", "charlie@example.com")
list_users(1, 5)
deactivate_user("alice@example.com")
delete_expired(60)
if __name__ == "__main__":
main()
性能优化与最佳实践
索引优化
为常用查询字段创建索引:
users.create_index([("name", 1)], unique=True) # 可选:设置唯一索引
连接池与资源管理
使用 MongoClient
的连接池功能,并在代码结束时关闭连接:
client.close() # 显式关闭连接
避免常见陷阱
- 字段类型一致性:避免频繁修改字段类型(如从字符串改为数字)。
- 批量操作:批量插入或更新时,使用
insert_many()
和bulk_write()
提升效率。
结论
通过本文,读者已掌握从基础操作到高级聚合的 Python MongoDB 开发技能,并通过实际案例理解了如何在项目中应用这些技术。无论是小型工具还是复杂系统,MongoDB 的灵活性与 Python 的简洁性结合,都能为开发者提供高效、可扩展的解决方案。建议读者结合官方文档深入学习聚合框架和分片技术,进一步提升数据处理能力。
关键词布局检查:
- 标题与小标题:自然包含“Python MongoDB”
- 正文:通过代码示例、操作步骤等场景多次提及关键词
- 结尾:总结性强化关键词关联性