啥是P站??

萌咖大佬写的,明晚考完试后学习一下里面的多线程和命令行

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Author:  MoeClub.org

from urllib import request
import threading
import queue
import re
import os


class pixiv:
    def __init__(self):
        self.folder = 'PixivImage'
        self.web_coding = 'utf-8'
        self.root = os.path.dirname(os.path.abspath(__file__))
        self.DefaultHeader = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0",
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3",
            "Accept-Encoding": "",
            "Connection": "keep-alive",
        }
        self.data_low = []
        self.num = 0

    def _http(self, url, headers, Obj=False):
        res = request.urlopen(request.Request(url, headers=headers, method='GET'))
        if Obj:
            return res
        else:
            return res.read().decode(self.web_coding, "ignore")

    def data_image(self, url_id):
        _header = self.DefaultHeader.copy()
        _header["Referer"] = "https://www.pixiv.net/member_illust.php?mode=medium&illust_id={}".format(url_id)
        _url_data = "https://www.pixiv.net/touch/ajax/illust/details?illust_id={}".format(url_id)
        _data_details = self._http(_url_data, _header)
        data_url = self.sort_data(re.findall('"url_big":"[^"]*"', _data_details))
        data_uid = str(str(str(re.findall('"user_id":"[^"]*"', _data_details)[0]).split(':', 1)[-1]).strip('"'))
        return data_url, _header, data_uid

    def sort_data(self, data):
        _data = []
        for item in data:
            if item not in _data:
                _data.append(item)
        return [str(str(item).replace('\\', '').split(':', 1)[-1]).strip('"') for item in _data]

    def get_item(self, UserID=None):
        if not UserID:
            UserID = 'https://www.pixiv.net/ranking.php?mode=male'
        if '://' in str(UserID):
            Mode_ID = False
        else:
            Mode_ID = True
        if Mode_ID:
            _url = "https://www.pixiv.net/ajax/user/{}/profile/all".format(str(UserID))
            page = self._http(_url, self.DefaultHeader, True)
            if page.code != 200:
                raise Exception("Pixiv Page:", page.code)
            _data = re.findall('"[0-9]+":null', page.read().decode(self.web_coding, "ignore"))
            self.data_low = [str(str(item).split(":")[0]).strip('"') for item in _data if ':null' in str(item)]
        else:
            page = self._http(UserID, self.DefaultHeader, True)
            if page.code != 200:
                raise Exception("Pixiv Page:", page.code)
            _data = re.findall('data-src="[^"]*"', page.read().decode(self.web_coding, "ignore"))
            self.data_low = [str(str(str(str(str(item).split("=", 1)[-1]).strip('"')).rsplit('/', 1)[-1]).split('_')[0]) for item in _data if '/img-master/img/' in str(item)]
        self.fliter_item()

    def fliter_item(self):
        folder = os.path.join(self.root, self.folder)
        if not os.path.exists(folder):
            return None
        _split = "_"
        _exist = {}.fromkeys([str(str(item).split(_split)[1]) for item in os.listdir(folder) if _split in item]).keys()
        print("Exist Item:", len(_exist))
        for _item in self.data_low.copy():
            if _item in _exist:
                self.data_low.remove(_item)

    def get_data_by_item(self, item):
        data = self.data_image(item)
        for data_url in data[0]:
            image = self._http(data_url, data[1], True)
            if image.code != 200:
                raise Exception("Pixiv Image: [{} | {}]".format(image.code, data[0]))
            self.write(str("{}_{}").format(str(data[2]), str(str(data_url).rsplit('/', 1)[-1])), image.read())

    def get_data(self, data_list=None):
        if not data_list:
            data_list = self.data_low
        for item in data_list:
            self.get_data_by_item(item)
        print("\nTotal Image: ", self.num)

    def write(self, name, data):
        folder = os.path.join(self.root, self.folder)
        if not os.path.exists(folder):
            os.mkdir(folder)
        file = os.path.join(folder, str(name))
        fp = open(file, 'wb')
        fp.write(data)
        fp.close()
        self.num += 1
        print("Pixiv Image: [ OK | {} ]".format(file))

    def add_queue(self, _queue, data_list=None):
        for item in data_list:
            _item = str(item).strip()
            if item and _item:
                _queue.put(_item)

    def multi_data(self, data_list=None, max=25):
        if not data_list:
            data_list = self.data_low
        print("New Item:", len(data_list))
        _threads = []
        _queue = queue.Queue(maxsize=max)
        task_main = threading.Thread(target=self.add_queue, args=(_queue, data_list))
        task_main.setName("TaskMain")
        task_main.setDaemon(True)
        task_main.start()
        while _queue.qsize() > 0:
            if len(_threads) >= max:
                for _item in _threads.copy():
                    if not _item.is_alive():
                        _threads.remove(_item)
                continue
            item = _queue.get()
            task = threading.Thread(target=self.get_data_by_item, args=(item,))
            task.setDaemon(True)
            task.start()
            _threads.append(task)
        for _task in _threads:
            _task.join()
        print("\nTotal Image: ", self.num)


if __name__ == '__main__':
    try:
        task = os.sys.argv[1]
    except:
        task = None
    p = pixiv()
    p.get_item(task)
    p.multi_data(max=25)
Last modification:September 9th, 2020 at 03:26 pm