StarkerSong Get Busy Living

基于scrapy-redis分布式网络爬虫存储数据分析(二)


本系列文章主要在简书上发布,图片链接如果有问题可直接访问我的简书页面: 基于scrapy-redis分布式网络爬虫存储数据分析

文件目录结构

在Windows的命令窗口中输入tree /f dqd命令,出现以下文件目录结构:

C:\Python27\Scripts>tree /f  dqd
文件夹 PATH 列表
卷序列号为 A057-81B6
C:\PYTHON27\SCRIPTS\DQD
  docker-compose.yml
  Dockerfile
  mongodb2mysql.py
  process_items.py
  scrapy.cfg

├─.idea
      dqd.iml
      misc.xml
      modules.xml
      workspace.xml

├─dqd
    image_pipelines.py
    image_pipelines.pyc
    items.py
    mongo_pipelines.py
    mongo_pipelines.pyc
    mysql_pipelines.py
    mysql_pipelines.pyc
    redis_pipelines.py
    redis_pipelines.pyc
    settings.py
    settings.pyc
    __init__.py
    __init__.pyc
  
  └─spiders
          dqdspider.py
          dqdspider.pyc
          __init__.py
          __init__.pyc

└─Image
    └─full
          full.rar
        
        └─女球迷采访:由萌yolanda
                480-150605104925433.jpg
                480-150605104940P1.jpg
                480-15060510522UT.jpg
                480-150605105242F9.jpg
                480-15060510525X18.jpg
                480-150605105312V0.jpg

下载和存储管理

settings.py设置

BOT_NAME = 'dqd'

SPIDER_MODULES = ['dqd.spiders']
NEWSPIDER_MODULE = 'dqd.spiders'

USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)'
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
SCHEDULER_PERSIST = True
SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderPriorityQueue"
#SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderQueue"
#SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderStack"

ITEM_PIPELINES = {
    # 'dqd.image_pipelines.DownloadImagesPipeline':1,   #下载图片
    'dqd.redis_pipelines.DqdPipeline': 200,
    'scrapy_redis.pipelines.RedisPipeline': 300,
    'dqd.mongo_pipelines.MongoDBPipeline':400,
    'dqd.mysql_pipelines.MySQLPipeline': 1
}
IMAGES_STORE='.\Image'

# redis 在process_items.py文件中进行设置

#################    MONGODB     #############################
MONGODB_SERVER='localhost'
MONGODB_PORT=27017
MONGODB_DB='dqd_db'
MONGODB_COLLECTION='dqd_collection'

####################    MYSQL      #############################
MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'dqd_database'
MYSQL_USER = 'root'
MYSQL_PASSWD = '******'

LOG_LEVEL = 'DEBUG'
DEPTH_LIMIT=1
# Introduce an artifical delay to make use of parallelism. to speed up the
# crawl.
DOWNLOAD_DELAY = 0.2

当Item在Spider中被收集之后,它将会被传递到Item Pipeline,一些组件会按照一定的顺序执行对Item的处理。

每个item pipeline组件(有时称之为“Item Pipeline”)是实现了简单方法的Python类。他们接收到Item并通过它执行一些行为,同时也决定此Item是否继续通过pipeline,或是被丢弃而不再进行处理。

  • 清理HTML数据
  • 验证爬取的数据(检查item包含某些字段)
  • 查重(并丢弃)
  • 将爬取结果保存到数据库中

image_pipelines.py

# -*- coding: utf-8 -*-
import scrapy
from scrapy.pipelines.images import ImagesPipeline
from scrapy.exceptions import DropItem
from scrapy import Request
import codecs

class DownloadImagesPipeline(ImagesPipeline):
    def get_media_requests(self,item,info): #下载图片
        for image_url in item['image_urls']:
            yield Request(image_url,meta={'item':item,'index':item['image_urls'].index(image_url)}) #添加meta是为了下面重命名文件名使用

    def file_path(self,request,response=None,info=None):
        item=request.meta['item'] #通过上面的meta传递过来item
        index=request.meta['index'] #通过上面的index传递过来列表中当前下载图片的下标

        #图片文件名 
        image_guid = request.url.split('/')[-1]
        #图片下载目录  
        filename = u'full/{0}/{1}'.format(item['news_title'], image_guid)
        return filename

以下图片为下载内容

redis_pipelines.py

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
from datetime import datetime

class DqdPipeline(object):
    def process_item(self, item, spider):
        item["crawled"] = datetime.utcnow()
        item["spider"] = spider.name
        return item

在截图中,dqdspider中应该有3个队列,但是因为我已经下载完毕,所以dqdspider:request队列自动删除了。

  • dqdspider:request待爬队列
  • dqdspider:dupefilter用来过滤重复的请求
  • dqdspider:items爬取的信息内容

mongo_pipelines.py

# -*- coding:utf-8 -*-
import pymongo
from scrapy.exceptions import DropItem
from scrapy.conf import settings
# from scrapy import log


class MongoDBPipeline(object):
    #Connect to the MongoDB database
    def __init__(self):
        connection = pymongo.MongoClient(settings['MONGODB_SERVER'], settings['MONGODB_PORT'])
        db = connection[settings['MONGODB_DB']]
        self.collection = db[settings['MONGODB_COLLECTION']]



    def process_item(self, item, spider):
        valid=True
        for data in item:
            if not data:
                valid=False
                raise DropItem('Missing{0}!'.format(data))
        if valid:

            self.collection.insert(dict(item))
            log.msg('question added to mongodb database!',
                    level=log.DEBUG,spider=spider)
        return item

为了展示MongoDB中的数据内容使用了管理工具Robomongo查看爬取的内容。

mysql_pipelines.py

# -*- coding:utf-8 -*-
from scrapy.conf import settings
import MySQLdb

_DEBUG=True

class MySQLPipeline(object):
    #Connect to the MySQL database
    def __init__(self):
        self.conn =  MySQLdb.connect(
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWD'],
            db=settings['MYSQL_DBNAME'],
            host=settings['MONGODB_SERVER'],
            charset='utf8',
            use_unicode = True
        )
        self.cursor=self.conn.cursor()
        #清空表:注意区分和delete的区别
        self.cursor.execute("truncate table news_main;") #清空表的信息
        self.cursor.execute("truncate table news_comment;") #清空表的信息
        self.conn.commit()

    def process_item(self, item, spider):
        try:
            self.insert_news(item)     #将文章信息插入到数据库中
            self.insert_comment(item,item["source_url"])     # 将评论信息信息插入到数据库中
            self.conn.commit()

        except MySQLdb.Error as e:
                print (("Error %d: %s") % (e.args[0],e.args[1]))
        return item

    #将文章信息插入到数据库中
    def insert_news(self,item):
        args = (item["source_url"], item["news_title"], item["news_author"],
                item["news_time"], item["news_content"],item["news_source"],
                item["news_allCommentAllCount"],  item["news_hotCommentHotCount"])

        newsSqlText = "insert into news_main(" \
                      "news_url,news_title,news_author,news_time,news_content,news_source," \
                      "news_commentAllCount,news_commentHotCount) " \
                      "values ('%s','%s','%s','%s','%s','%s','%s','%s')" % args
        self.cursor.execute(newsSqlText)
        self.conn.commit()

    # 将评论信息信息插入到数据库中
    def insert_comment(self, item,url):
        #因为评论是列表,以下为并列迭代
        for comment_content,comment_author,comment_time,comment_likeCount \
                in zip(item["news_hotCommentContent"],item["news_hotCommentAuthor"],
                     item["news_hotCommentTime"],item["news_hotCommentLikeCount"]):
            newsSqlText = "insert into news_comment(comment_content,comment_author,comment_time,comment_likeCount,source_url) " \
                          "values (\"%s\",'%s','%s','%s','%s')" % (comment_content,comment_author,comment_time,comment_likeCount[2:-1],url)
            # #加入调试代码 监视newsSqlText的取值
            # if _DEBUG == True:
            #     import pdb
            #     pdb.set_trace()
            # self.cursor.execute(newsSqlText.encode().decode("unicode-escape").replace('\\','').replace('\']','').replace('[u\'','').strip())
            self.cursor.execute(newsSqlText.encode().decode("unicode-escape").replace('\\',''))
            self.conn.commit()

在这里将爬取的信息进行清洗和转储。

总结

本部分主要介绍了数据的存储,主要为图片以及文章内容部分的存储,涉及到的知识点包括以下部分:

  • mysql建表、数据查询、数据操纵、索引优化
  • redis存储队列
  • mongodb存储
  • tree命令

Comments