首页 > Python资料 博客日记
Python 爬虫爬取京东商品信息
2024-09-16 02:00:08Python资料围观65次
Python 爬虫爬取京东商品信息 下面我将逐一解释每一部分的代码
导入库
from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.edge.options import Options
import time
import random
import csv
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pyquery import PyQuery as pq
import os
import requests
from datetime import datetime
import re
-
作用:导入所有必要的库和模块。
- 技术栈:
-
selenium
: 用于自动化浏览器操作。 -
time
,random
: 用于添加延迟和生成随机数。 -
csv
: 用于读写CSV文件。 -
os
: 用于文件系统操作。 -
requests
: 用于发送HTTP请求(例如下载图片)。 -
datetime
: 用于处理日期和时间。 -
re
: 用于正则表达式匹配。
-
配置变量
KEYWORD = '美妆'
CSV_FILE = 'DepartmentStore.csv'
id = 3538
dratenums = 5
drate = 5
num = 5
-
作用:初始化全局变量。
-
技术栈:无特殊技术栈。
设置WebDriver
# 创建EdgeOptions对象
options = Options()
# 设置Edge WebDriver的路径
webdriver_path = 'G:\\大三下暑期实训基于电商商品推荐平台\\爬虫\\a\\msedgedriver.exe'
service = Service(webdriver_path)
# 初始化Edge WebDriver
driver = webdriver.Edge(service=service, options=options)
# 窗口最大化
driver.maximize_window()
wait = WebDriverWait(driver, 15)
-
作用:初始化Microsoft Edge WebDriver。
- 技术栈:
-
selenium
: 用于自动化浏览器操作。
-
函数定义
search_goods
函数
def search_goods(start_page, total_pages):
# ...
-
作用:搜索特定商品并处理翻页逻辑。
- 技术栈:
-
selenium
: 用于自动化浏览器操作。 -
pyquery
: 用于解析HTML文档。 -
time
: 用于添加延迟。 -
random
: 用于生成随机数。
-
page_turning
函数
def page_turning(page_number):
# ...
-
作用:实现翻页操作。
-
技术栈:
selenium
: 用于自动化浏览器操作。
random_sleep
函数
def random_sleep(timeS, timeE):
# ...
-
作用:随机等待一段时间,防止被网站检测到爬虫行为。
-
技术栈:
time
,random
: 用于添加延迟和生成随机数。
scroll_smoothly
函数
def scroll_smoothly(duration):
# ...
-
作用:平滑滚动页面以模拟真实用户行为。
-
技术栈:
selenium
: 用于自动化浏览器操作。
get_goods
函数
def get_goods():
# ...
-
作用:获取商品列表中的具体信息。
- 技术栈:
-
selenium
: 用于自动化浏览器操作。 -
pyquery
: 用于解析HTML文档。 -
os
: 用于文件系统操作。 -
requests
: 用于发送HTTP请求(例如下载图片)。 -
datetime
: 用于处理日期和时间。 -
re
: 用于正则表达式匹配。
-
save_to_csv
函数
def save_to_csv(result):
# ...
-
作用:将获取到的商品信息保存到CSV文件中。
-
技术栈:
csv
: 用于读写CSV文件。
main
函数
def main():
# ...
-
作用:主函数,用于启动爬虫并处理异常。
-
技术栈:无特殊技术栈。
主程序入口
if __name__ == '__main__':
main()
-
作用:程序的入口点。
-
技术栈:无特殊技术栈。
接下来,我将针对每个函数进行详细的代码解释。
search_goods
函数
def search_goods(start_page, total_pages):
print('正在搜索: ')
try:
driver.get('https://www.taobao.com')
time.sleep(10)
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument",
{"source": """Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"""})
input = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#q")))
submit = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '#J_TSearchForm > div.search-button > button')))
input.send_keys(KEYWORD)
submit.click()
time.sleep(10)
if start_page != 1:
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
random_sleep(1, 3)
pageInput = wait.until(EC.presence_of_element_located(
(By.XPATH, '//*[@id="root"]/div/div[3]/div[1]/div[1]/div[2]/div[4]/div/div/span[3]/input')))
pageInput.send_keys(start_page)
admit = wait.until(EC.element_to_be_clickable(
(By.XPATH, '//*[@id="root"]/div/div[3]/div[1]/div[1]/div[2]/div[4]/div/div/button[3]')))
admit.click()
get_goods()
for i in range(start_page + 1, start_page + total_pages):
page_turning(i)
except TimeoutException:
print("search_goods: error")
return search_goods(start_page, total_pages)
-
作用:搜索特定商品并处理翻页逻辑。
- 细节:
-
打开淘宝网站并等待页面加载。
-
使用CDP命令禁用WebDriver标志,以防被网站检测。
-
在搜索框中输入关键词并提交。
-
如果不是第一页,则跳转到指定的起始页。
-
调用
get_goods
函数获取商品信息。 -
循环翻页并获取商品信息。
-
page_turning
函数
def page_turning(page_number):
print('正在翻页: ', page_number)
try:
submit = wait.until(
EC.element_to_be_clickable((By.XPATH, '//*[@id="sortBarWrap"]/div[1]/div[2]/div[2]/div[8]/div/button[2]')))
submit.click()
wait.until(EC.text_to_be_present_in_element(
(By.XPATH, '//*[@id="sortBarWrap"]/div[1]/div[2]/div[2]/div[8]/div/span/em'), str(page_number)))
get_goods()
except TimeoutException:
page_turning(page_number)
-
作用:实现翻页操作。
- 细节:
-
点击下一页按钮。
-
等待新的页面加载完成。
-
调用
get_goods
函数获取商品信息。
-
random_sleep
函数
def random_sleep(timeS, timeE):
random_sleep_time = random.uniform(timeS, timeE)
time.sleep(random_sleep_time)
-
作用:随机等待一段时间,防止被网站检测到爬虫行为。
- 细节:
-
生成一个在
timeS
到timeE
之间的随机数作为睡眠时间。 -
使用
time.sleep
暂停指定的时间。
-
scroll_smoothly
函数
def scroll_smoothly(duration):
total_height = driver.execute_script("return document.body.scrollHeight")
viewport_height = driver.execute_script("return window.innerHeight")
steps = 30 # 分成30步
step_duration = duration / steps # 每步的时间
step_height = (total_height - viewport_height) / steps # 每步的高度
for i in range(steps):
driver.execute_script(f"window.scrollBy(0, {step_height});")
time.sleep(step_duration)
-
作用:平滑滚动页面以模拟真实用户行为。
- 细节:
-
计算页面的总高度和视窗的高度。
-
计算每次滚动的距离和时间。
-
使用JavaScript执行平滑滚动效果。
-
get_goods
函数
def get_goods():
global id, drate, dratenums, num # 声明使用全局变量 id
random_sleep(2, 4)
# 滑动浏览器滚轮,向下滑动
scroll_smoothly(4)
random_sleep(4, 4) # 等待页面加载
html = driver.page_source
doc = pq(html)
items = doc(
'div.PageContent--contentWrap--mep7AEm > div.LeftLay--leftWrap--xBQipVc > div.LeftLay--leftContent--AMmPNfB > div.Content--content--sgSCZ12 > div > div').items()
for item in items:
title = item.find('.Title--title--jCOPvpf span').text()
price_int = item.find('.Price--priceInt--ZlsSi_M').text()
price_float = item.find('.Price--priceFloat--h2RR0RK').text()
if price_int and price_float:
price = float(f"{price_int}{price_float}")
else:
price = 0.0
deal = item.find('.Price--realSales--FhTZc7U').text()
location = item.find('.Price--procity--_7Vt3mX').text()
shop = item.find('.ShopInfo--TextAndPic--yH0AZfx a').text()
postText = item.find('.SalesPoint--subIconWrapper--s6vanNY span').text()
result = 1 if "包邮" in postText else 0
image = item.find('.MainPic--mainPicWrapper--iv9Yv90 img').attr('src')
# 图片的URL
image_url = image
pattern = r"https:\/\/.*?\.jpg"
match = re.search(pattern, image_url)
if match:
print("Extracted URL:", match.group(0))
else:
print("No match found")
image_url = match.group(0)
# 保存图片的文件夹
save_folder = "shopping_cover"
# 确保保存路径存在
if not os.path.exists(save_folder):
os.makedirs(save_folder)
# 将图片文件名固定为 '1.jpg'
image_name = str(id) + ".jpg"
# 完整的保存路径
save_path = os.path.join(save_folder, image_name)
# 获取当前日期
current_date = datetime.now()
# 格式化日期为 'yyyy-mm-dd'
formatted_date = current_date.strftime('%Y-%m-%d')
imdblink = item.find('.Card--doubleCardWrapper--L2XFE73 a').attr('href')
# 下载图片并保存
try:
with requests.get(image_url, stream=True) as response:
response.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
print(f"图片已保存到: {save_path}")
product = {
'id': id,
'name': title,
'director': shop,
'country': location,
'years': formatted_date,
'leader': price,
'd_rate_nums': dratenums,
'd_rate': drate,
'intro': deal + str(result),
'num': num,
'orign_image_link': image,
'image_link': save_folder + '/' + image_name,
'imdb_link': imdblink
}
save_to_csv(product)
id += 1
except requests.RequestException as e:
print(f"下载图片失败:{e}")
# 直接滑动浏览器滚轮到最顶部
driver.execute_script("window.scrollTo(0, 0);")
-
作用:获取商品列表中的具体信息。
- 细节:
-
使用
pyquery
解析HTML源码。 -
提取每个商品的信息。
-
下载并保存商品图片。
-
调用
save_to_csv
保存数据。
-
save_to_csv
函数
def save_to_csv(result):
try:
with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow([result['id'], result['name'], result['director'], result['country'], result['years'], result['leader'], result['d_rate_nums'], result['d_rate'], result['intro'], result['num'], result['orign_image_link'], result['image_link'], result['imdb_link']])
print('存储到CSV成功: ', result)
except Exception as e:
print('存储到CSV出错: ', result, e)
-
作用:将获取到的商品信息保存到CSV文件中。
- 细节:
-
将商品信息追加到CSV文件中。
-
输出保存成功的消息或错误消息。
-
main
函数
def main():
try:
# 检查文件是否存在,如果不存在则创建并写入标题行
if not os.path.exists(CSV_FILE):
with open(CSV_FILE, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow(['id', 'name', 'director', 'country', 'years', 'leader', 'd_rate_nums', 'd_rate', 'intro', 'num', 'orign_image_link', 'image_link', 'imdb_link'])
pageStart = int(input("输入您想开始爬取的页面数: "))
pageAll = int(input("输入您想爬取的总页面数: "))
search_goods(pageStart, pageAll)
except Exception as e:
print('main函数报错: ', e)
-
作用:主函数,用于启动爬虫并处理异常。
- 细节:
-
检查CSV文件是否存在,若不存在则创建并写入表头。
-
获取用户输入的起始页和总页数。
-
调用
search_goods
函数开始爬取。
-
主程序入口
if __name__ == '__main__':
main()
-
作用:程序的入口点。
-
细节:当直接运行此文件时,执行
main
函数。
以下是完整代码
from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.edge.options import Options
import time
import random
import csv
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pyquery import PyQuery as pq
import os
import requests
from datetime import datetime
import re
KEYWORD = '美妆'
CSV_FILE = 'DepartmentStore.csv'
id = 3538
dratenums = 5
drate = 5
num = 5
# 创建EdgeOptions对象
options = Options()
# 设置Edge WebDriver的路径
webdriver_path = 'G:\\大三下暑期实训基于电商商品推荐平台\\爬虫\\a\\msedgedriver.exe'
service = Service(webdriver_path)
# 初始化Edge WebDriver
driver = webdriver.Edge(service=service, options=options)
# 窗口最大化
driver.maximize_window()
wait = WebDriverWait(driver, 15)
def search_goods(start_page, total_pages):
print('正在搜索: ')
try:
driver.get('https://www.taobao.com')
time.sleep(10)
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument",
{"source": """Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"""})
input = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#q")))
submit = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '#J_TSearchForm > div.search-button > button')))
input.send_keys(KEYWORD)
submit.click()
time.sleep(10)
if start_page != 1:
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
random_sleep(1, 3)
pageInput = wait.until(EC.presence_of_element_located(
(By.XPATH, '//*[@id="root"]/div/div[3]/div[1]/div[1]/div[2]/div[4]/div/div/span[3]/input')))
pageInput.send_keys(start_page)
admit = wait.until(EC.element_to_be_clickable(
(By.XPATH, '//*[@id="root"]/div/div[3]/div[1]/div[1]/div[2]/div[4]/div/div/button[3]')))
admit.click()
get_goods()
for i in range(start_page + 1, start_page + total_pages):
page_turning(i)
except TimeoutException:
print("search_goods: error")
return search_goods(start_page, total_pages)
def page_turning(page_number):
print('正在翻页: ', page_number)
try:
submit = wait.until(
EC.element_to_be_clickable((By.XPATH, '//*[@id="sortBarWrap"]/div[1]/div[2]/div[2]/div[8]/div/button[2]')))
submit.click()
wait.until(EC.text_to_be_present_in_element(
(By.XPATH, '//*[@id="sortBarWrap"]/div[1]/div[2]/div[2]/div[8]/div/span/em'), str(page_number)))
get_goods()
except TimeoutException:
page_turning(page_number)
def random_sleep(timeS, timeE):
random_sleep_time = random.uniform(timeS, timeE)
time.sleep(random_sleep_time)
def scroll_smoothly(duration):
total_height = driver.execute_script("return document.body.scrollHeight")
viewport_height = driver.execute_script("return window.innerHeight")
steps = 30 # 分成30步
step_duration = duration / steps # 每步的时间
step_height = (total_height - viewport_height) / steps # 每步的高度
for i in range(steps):
driver.execute_script(f"window.scrollBy(0, {step_height});")
time.sleep(step_duration)
def get_goods():
global id, drate, dratenums, num # 声明使用全局变量 id
random_sleep(2, 4)
# 滑动浏览器滚轮,向下滑动
scroll_smoothly(4)
random_sleep(4, 4) # 等待页面加载
html = driver.page_source
doc = pq(html)
items = doc(
'div.PageContent--contentWrap--mep7AEm > div.LeftLay--leftWrap--xBQipVc > div.LeftLay--leftContent--AMmPNfB > div.Content--content--sgSCZ12 > div > div').items()
for item in items:
title = item.find('.Title--title--jCOPvpf span').text()
price_int = item.find('.Price--priceInt--ZlsSi_M').text()
price_float = item.find('.Price--priceFloat--h2RR0RK').text()
if price_int and price_float:
price = float(f"{price_int}{price_float}")
else:
price = 0.0
deal = item.find('.Price--realSales--FhTZc7U').text()
location = item.find('.Price--procity--_7Vt3mX').text()
shop = item.find('.ShopInfo--TextAndPic--yH0AZfx a').text()
postText = item.find('.SalesPoint--subIconWrapper--s6vanNY span').text()
result = 1 if "包邮" in postText else 0
image = item.find('.MainPic--mainPicWrapper--iv9Yv90 img').attr('src')
# 图片的URL
image_url = image
pattern = r"https:\/\/.*?\.jpg"
match = re.search(pattern, image_url)
if match:
print("Extracted URL:", match.group(0))
else:
print("No match found")
image_url = match.group(0)
# 保存图片的文件夹
save_folder = "shopping_cover"
# 确保保存路径存在
if not os.path.exists(save_folder):
os.makedirs(save_folder)
# 将图片文件名固定为 '1.jpg'
image_name = str(id) + ".jpg"
# 完整的保存路径
save_path = os.path.join(save_folder, image_name)
# 获取当前日期
current_date = datetime.now()
# 格式化日期为 'yyyy-mm-dd'
formatted_date = current_date.strftime('%Y-%m-%d')
imdblink = item.find('.Card--doubleCardWrapper--L2XFE73 a').attr('href')
# 下载图片并保存
try:
with requests.get(image_url, stream=True) as response:
response.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
print(f"图片已保存到: {save_path}")
product = {
'id': id,
'name': title,
'director': shop,
'country': location,
'years': formatted_date,
'leader': price,
'd_rate_nums': dratenums,
'd_rate': drate,
'intro': deal + str(result),
'num': num,
'orign_image_link': image,
'image_link': save_folder + '/' + image_name,
'imdb_link': imdblink
}
save_to_csv(product)
id += 1
except requests.RequestException as e:
print(f"下载图片失败:{e}")
# 直接滑动浏览器滚轮到最顶部
driver.execute_script("window.scrollTo(0, 0);")
def save_to_csv(result):
try:
with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow([result['id'], result['name'], result['director'], result['country'], result['years'], result['leader'], result['d_rate_nums'], result['d_rate'], result['intro'], result['num'], result['orign_image_link'], result['image_link'], result['imdb_link']])
print('存储到CSV成功: ', result)
except Exception as e:
print('存储到CSV出错: ', result, e)
def main():
try:
# 检查文件是否存在,如果不存在则创建并写入标题行
if not os.path.exists(CSV_FILE):
with open(CSV_FILE, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow(['id', 'name', 'director', 'country', 'years', 'leader', 'd_rate_nums', 'd_rate', 'intro', 'num', 'orign_image_link', 'image_link', 'imdb_link'])
pageStart = int(input("输入您想开始爬取的页面数: "))
pageAll = int(input("输入您想爬取的总页面数: "))
search_goods(pageStart, pageAll)
except Exception as e:
print('main函数报错: ', e)
if __name__ == '__main__':
main()
最后,如果觉得本项目对你有帮助的话,求求点个star🤩吧,谢谢了~
标签:
相关文章
最新发布
- 光流法结合深度学习神经网络的原理及应用(完整代码都有Python opencv)
- Python 图像处理进阶:特征提取与图像分类
- 大数据可视化分析-基于python的电影数据分析及可视化系统_9532dr50
- 【Python】入门(运算、输出、数据类型)
- 【Python】第一弹---解锁编程新世界:深入理解计算机基础与Python入门指南
- 华为OD机试E卷 --第k个排列 --24年OD统一考试(Java & JS & Python & C & C++)
- Python已安装包在import时报错未找到的解决方法
- 【Python】自动化神器PyAutoGUI —告别手动操作,一键模拟鼠标键盘,玩转微信及各种软件自动化
- Pycharm连接SQL Sever(详细教程)
- Python编程练习题及解析(49题)
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Anaconda版本和Python版本对应关系(持续更新...)
- Python与PyTorch的版本对应
- Windows上安装 Python 环境并配置环境变量 (超详细教程)
- Python pyinstaller打包exe最完整教程