The Spring Festival travel rush is here again, so here is an updated version of my scalper-catching script.

I finally managed to get my train tickets (not through Kuxun or a scalper, of course), so I rewrote the scalper-catching script from last year and am offering it to any programmers still waiting to buy tickets. I call it catching scalpers, but it naturally covers ordinary ticket resellers too. The principle is still polling the listings on the Kuxun site, with a few new features added:

  • Replaced SGMLParser with regular expressions from the re module for better performance
  • Multiple URLs can now be polled at once; for example, either Ji'an or Jinggangshan works for me, so I iterate over both searches
  • The redirect link is now printed directly on screen
  • Package-level support for Python 3 is provided, but because of changes around the re module the regular expressions do not run under Python 3, and I have not been in the mood to update them yet (see the note right after this list)
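
A note on that last point: I suspect the breakage is less about re itself than about bytes versus str. In Python 3, urlopen(...).read() returns bytes, and the str patterns used below cannot match against bytes. A minimal sketch of the likely fix, assuming the pages are UTF-8 (the real charset would need checking):

import re

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen         # Python 2

re_links = re.compile(r'<a.*?href=.*?</a>', re.I)

html = urlopen('http://piao.kuxun.cn/beijing-jian/').read()
if isinstance(html, bytes):
    # Python 3: read() returns bytes; decode before using str patterns.
    # 'utf-8' is an assumption about Kuxun's encoding.
    html = html.decode('utf-8', 'ignore')
print(len(re_links.findall(html)))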

Kuxun has launched its own instant-buy ("seckill") tool, but I am still not comfortable with it: first, it produces no output at all, so there is no telling whether it actually grabs anything; second, it is not cross-platform and cannot be used on Mac or Linux for now.

Patches are welcome. :-)

#!/usr/bin/python
# encoding: utf-8
#
# Catch the yellow cattle (ticket scalper) script
#
# Author: Xuqing Kuang <xuqingkuang@gmail.com>
# New features:
# * Use regexps instead of SGMLParser for performance
# * Poll multiple URLs at one time.
# * Print out the redirect URL.
# * Basic package-level compatibility with Python 3
# TODO:
# * Use one regexp to split the href and text of link
# * Update re package usage to be compatible with Python 3

import time
import re

try:
    import urllib2 as urllib
    HTTPError = urllib.HTTPError
except ImportError: # Python 3 compatible
    import urllib.request, urllib.error
    HTTPError = urllib.error.HTTPError

urls = (
    "http://piao.kuxun.cn/beijing-jinggangshan/",
    "http://piao.kuxun.cn/beijing-jian/",
)

keyword = '3张'
sequence = 60

class TrainTicket(object):
    """
    Catch the yellow cattle
    """
    def __init__(self, urls, keyword, sequence = 60):
        self.urls = urls
        self.keyword = keyword
        self.sequence = sequence
        self.cache = []
        self.html = ''
        self.links = []
        if hasattr(urllib, 'build_opener'):
            self.opener = urllib.build_opener()
        else: # Python 3 compatible
            self.opener = urllib.request.build_opener()
        self.result = []
        self.re_links = re.compile(r'<a.*?href=.*?</a>', re.I)
        # self.re_element = re.compile('', re.I) # Hardcode at following
        self.requests = []
        for url in urls:
            if hasattr(urllib, 'Request'):
                request = urllib.Request(url)
            else: # Python 3 compatible
                request = urllib.request.Request(url)

            request.add_header('User-Agent', 'Mozilla/5.0')
            self.requests.append(request)

    def get_page(self, request):
        """
        Open the page.
        """
        try:
            self.html = self.opener.open(request).read()
        except HTTPError:
            return False
        return self.html

    def get_links(self, html = ''):
        """
        Process the page, get all of links
        """
        if not html:
            html = self.html
        self.links = self.re_links.findall(html)
        return self.links

    def get_element(self, link = ''):
        """
        Process the link generated by self.get_links().
        Return list of the href and text
        """
        # FIXME: have no idea how to split the href and text with one regex,
        # so use two regexes as a temporary solution (see the sketch after
        # this script).
        href = re.findall('(?<=href=").*?(?=")', link) # Get the href attribute
        if not href:                                   # Process the no href attr
            href = ['']
        text = re.split('(<.*?>)', link)[2]            # Get the text of link a.
        href.append(text)                              # Append to the list.
        return href

    def get_ticket(self, request = None):
        """
        Generate the data structure of tickets for each URL.
        """
        if not request:
            request = self.requests[0]

        self.get_page(request)
        self.get_links()

        for i, raw_link in enumerate(self.links):
            link = self.get_element(raw_link)
            if not link:
                continue
            url = link[0]
            name = link[1]
            if name and name.find(self.keyword) >= 0 and url not in self.cache:
                self.result.append((
                    i, name, url,
                ))
                self.cache.append(url)
        return self.result

    def print_tickets(self):
        """
        Process all of URLS and print out the tickets information.
        """
        while 1:
            self.result = []
            try:
                print('Begin retrieving')
                for request in self.requests:
                    print('Begin scanning %s' % request.get_full_url())
                    self.get_ticket(request)
                    print('Found %s links.' % len(self.links))
                    for r in self.result:
                        print('Index: %s\nName: %s\nURL: %s\n' % (
                            r[0], r[1], r[2]
                        ))
                print('Scan finished, sleeping %s seconds' % self.sequence)
                time.sleep(self.sequence)
            except KeyboardInterrupt:
                exit()

if __name__ == '__main__':
    tt = TrainTicket(urls, keyword, sequence)
    tt.print_tickets()
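
Regarding the TODO above about splitting the href and the link text with one regex: a single pattern with two named capture groups should do it. A sketch, assuming the links use a double-quoted href attribute (which would need checking against Kuxun's live markup):

import re

# Group 'href' captures the link target, group 'text' the visible label.
re_link = re.compile(
    r'<a[^>]*?href="(?P<href>[^"]*)"[^>]*>(?P<text>.*?)</a>',
    re.I | re.S,
)

sample = '<a class="x" href="/ticket/123">3张 北京-井冈山</a>'
m = re_link.search(sample)
if m:
    print(m.group('href'))  # /ticket/123
    print(m.group('text'))  # 3张 北京-井冈山

If that holds up against the real pages, get_element() collapses to a single findall() and the FIXME goes away.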


Posted by K*K Tue, 18 Jan 2011 22:54:18 +0800


Wrote a Python program to monitor Kuxun train tickets

I can't take it any more. Regular tickets are impossible to buy, so I have had to start watching scalper tickets instead, but I can't sit there refreshing the page forever. I just missed a scalper ticket that had been posted for 20 minutes; by the time I called, the number was already unreachable... -_-#

So I wrote a program to deal with it. I pasted together a pile of code (borrowed from many places; this is the first time I've written something like this, so I hope the original authors don't mind) and it finally works. There may well be bugs; patches or better solutions are welcome.

You can change how the program runs by modifying the following parameters:

url = "http://piao.kuxun.cn/beijing-jinggangshan/" # 把火车票的搜索地址粘在这里,这里假设是北京到井冈山的
key = "2张"                                        # 搜索关键字,我得俩人啊。。。 
sequence = 60#60 * 5                               # 搜索间隔,给服务器压力别太大,每分钟一次就行了。


#!/usr/bin/python
# encoding: utf-8

import urllib2
import time
import re
from sgmllib import SGMLParser

class URLListName(SGMLParser):
    """Collect the text of every link (<a> element) in the page."""
    def reset(self):
        SGMLParser.reset(self)
        self.is_a = ""
        self.name = []
    def start_a(self, attrs):
        self.is_a = 1
    def end_a(self):
        self.is_a = ""
    def handle_data(self, text):
        if self.is_a:
            self.name.append(text)

url = "http://piao.kuxun.cn/beijing-jinggangshan/"
key = "2张"
sequence = 60  # 60 * 5

request = urllib2.Request(url)
request.add_header('User-Agent', 'Mozilla/5.0')
opener = urllib2.build_opener()
data = opener.open(request).read()

ticket_name = URLListName()
ticket_name.feed(data)
cache = []

while 1:
    try:
        print "beign retrive"
        data = opener.open(request).read()
        ticket_name.feed(data)
        print "beign scan"
        
        for result in ticket_name.name:
            if result and result.find(key) >= 0 and result not in cache:
                print "found: " + result
                cache.append(result)
        
        print "scan finished, begin sleep " + str(sequence) + " seconds."
        time.sleep(sequence)
    except:
        raise
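
One limitation of this version: URLListName only records the link text, never the link target, which is why the updated script above had to add URL printing. With SGMLParser the href is available from the attrs argument of start_a; a sketch (URLListHref is a hypothetical variant, not part of the script above):

from sgmllib import SGMLParser  # Python 2 only; removed in Python 3

class URLListHref(SGMLParser):
    """Variant of URLListName that records (href, text) pairs."""
    def reset(self):
        SGMLParser.reset(self)
        self.links = []
        self._href = None
    def start_a(self, attrs):
        # attrs is a list of (name, value) pairs for the <a ...> tag
        self._href = dict(attrs).get('href', '')
    def end_a(self):
        self._href = None
    def handle_data(self, text):
        if self._href is not None:
            self.links.append((self._href, text))

parser = URLListHref()
parser.feed('<a href="/ticket/1">2张 北京-井冈山</a>')
parser.close()
print parser.links  # [('/ticket/1', '2张 北京-井冈山')]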

Posted by K*K Fri, 29 Jan 2010 23:49:28 +0800