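The Spring Festival travel rush is here again, so here is an updated scalper-catching script.
Having finally secured my own train tickets (not through Kuxun or a scalper, of course), I rewrote the scalper-catching script I wrote last year and am offering it to any programmers still waiting to buy theirs. It is called a scalper catcher, but it naturally picks up ordinary people reselling tickets as well. It still works by polling the listings on the Kuxun site, with a few new features: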
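- Replaced SGMLParser with regular expressions from the re module, for efficiency
- Can now poll several URLs; for example, either Ji'an or Jinggangshan works for me as a destination, so I iterate over two URLs
- Prints the redirect link directly to the screen
- Package-level support for Python 3 (the imports work), but because of changes in the re module the regular expressions do not run under Python 3, and I am not in the mood to update them for now.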
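The script's TODO asks for a single regexp that yields both the href and the link text. The Python 3 failure mentioned in the last item most likely comes from `read()` returning bytes there, which a str pattern cannot search. Here is a minimal sketch of one way to handle both, written for Python 3. The names `LINK_RE` and `extract_links` and the UTF-8 default are assumptions for illustration, not part of the original script, and the page's real encoding should be checked.

```python
import re

# One pattern, two groups: the href value and the anchor text.
# Assumes double-quoted href attributes, as the original script does.
LINK_RE = re.compile(r'<a\b[^>]*?href="([^"]*)"[^>]*>(.*?)</a>', re.I | re.S)


def extract_links(html, encoding="utf-8"):
    """Return a list of (href, text) tuples for the links found in html."""
    if isinstance(html, bytes):
        # Decode first: a str pattern cannot be applied to bytes in Python 3.
        html = html.decode(encoding, "replace")
    return [(href, text.strip()) for href, text in LINK_RE.findall(html)]
```

Anchors without an href are skipped, and any markup nested inside a link is returned as part of the text. For anything beyond simple listing pages, a real HTML parser is usually the safer choice.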
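Kuxun has released its own ticket sniper tool, but I still have reservations about it: first, it produces no output at all, so who knows whether it can really snipe anything; second, it is not cross-platform and cannot be used on Mac or Linux for now.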
Patches are welcome. :-)
#!/usr/bin/python
# encoding: utf-8
#
# Catch the yellow cattle (ticket scalpers) script
#
# Author: Xuqing Kuang <xuqingkuang@gmail.com>
# New features:
# * Use regexps instead of SGMLParser for performance
# * Poll multiple URLs at one time
# * Print out the redirect URL
# * Basic package imports compatible with Python 3
# TODO:
# * Use one regexp to split the href and text of a link
# * Update re package usage to be compatible with Python 3
import time
import re
try:
    import urllib2 as urllib
    HTTPError = urllib.HTTPError
except ImportError:  # Python 3 compatible
    import urllib.request, urllib.error
    HTTPError = urllib.error.HTTPError

urls = (
    "http://piao.kuxun.cn/beijing-jinggangshan/",
    "http://piao.kuxun.cn/beijing-jian/",
)
keyword = '3张'
sequence = 60  # Polling interval in seconds


class TrainTicket(object):
    """
    Catch the yellow cattle
    """
    def __init__(self, urls, keyword, sequence=60):
        self.urls = urls
        self.keyword = keyword
        self.sequence = sequence
        self.cache = []  # URLs that have already been reported
        self.html = ''
        self.links = []
        if hasattr(urllib, 'build_opener'):
            self.opener = urllib.build_opener()
        else:  # Python 3 compatible
            self.opener = urllib.request.build_opener()
        self.result = []
        self.re_links = re.compile(r'<a.*?href=.*?</a>', re.I)
        # self.re_element = re.compile('', re.I) # Hardcoded below
        self.requests = []
        for url in urls:
            if hasattr(urllib, 'Request'):
                request = urllib.Request(url)
            else:  # Python 3 compatible
                request = urllib.request.Request(url)
            request.add_header('User-Agent', 'Mozilla/5.0')
            self.requests.append(request)

    def get_page(self, request):
        """
        Open the page. Return the HTML, or False on an HTTP error.
        """
        try:
            self.html = self.opener.open(request).read()
        except HTTPError:
            self.html = ''  # Do not rescan the previous page
            return False
        return self.html

    def get_links(self, html=''):
        """
        Process the page, get all of the links
        """
        if not html:
            html = self.html
        self.links = self.re_links.findall(html)
        return self.links

    def get_element(self, link=''):
        """
        Process a link generated by self.get_links().
        Return a list of the href and the text
        """
        # FIXME: have no idea how to split the href and text with one regex
        # So use two regexes as a temporary solution
        href = re.findall(r'(?<=href=").*?(?=")', link)  # Get the href attribute
        if not href:  # Handle a missing href attribute
            href = ['']
        text = re.split(r'(<.*?>)', link)[2]  # Get the text of the <a> element
        href.append(text)  # Append to the list
        return href

    def get_ticket(self, request=None):
        """
        Generate the data structure of tickets for each URL.
        """
        if not request:
            request = self.requests[0]
        self.get_page(request)
        self.get_links()
        # enumerate() avoids the endless loop the old while/continue could hit
        for i, raw_link in enumerate(self.links):
            link = self.get_element(raw_link)
            if not link:
                continue
            url = link[0]
            name = link[1]
            # Use self.keyword rather than the module-level global
            if name and name.find(self.keyword) >= 0 and url not in self.cache:
                self.result.append((
                    i, name, url,
                ))
                self.cache.append(url)
        return self.result

    def print_tickets(self):
        """
        Process all of the URLs and print out the ticket information.
        """
        while 1:
            self.result = []
            try:
                print('Begin retrieve')
                for request in self.requests:
                    print('Begin scan %s' % request.get_full_url())
                    self.get_ticket(request)
                    print('Found %s urls.' % len(self.links))
                for r in self.result:
                    print('Index: %s\nName: %s\nURL: %s\n' % (
                        r[0], r[1], r[2]
                    ))
                print('Scan finished, begin sleep %s seconds' % self.sequence)
                time.sleep(self.sequence)
            except KeyboardInterrupt:
                return  # Ctrl-C stops the polling loop


if __name__ == '__main__':
    tt = TrainTicket(urls, keyword, sequence)
    tt.print_tickets()
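The filtering step inside `get_ticket()` can be looked at on its own: keep only links whose text contains the keyword, and report each URL at most once across polls. Below is a small sketch of that idea, assuming the links are already available as `(href, text)` pairs. The helper name `new_matches` and the use of a `set` in place of the script's `cache` list are illustrative choices, not the original code.

```python
def new_matches(links, keyword, seen):
    """Return (index, text, href) for links that contain keyword and
    have not been reported before; newly reported hrefs are added to seen."""
    found = []
    for i, (href, text) in enumerate(links):
        if keyword in text and href not in seen:
            seen.add(href)  # Remember it so later polls stay quiet
            found.append((i, text, href))
    return found
```

Passing the same `seen` set on every poll gives the same behaviour as the script's cache, with constant-time membership checks instead of a list scan.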
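A Python program for monitoring Kuxun train tickets
I couldn't take it any more. I couldn't buy train tickets through normal channels, so I had to turn to scalper listings, but there is no way to keep refreshing the page all day. I just missed a scalper ticket that had been posted for 20 minutes; by the time I called, I could no longer get through... -_-#
So I wrote a program to solve the problem, pasted together from a pile of code (I drew on too many sources to list, and this is my first time writing this kind of thing, so I hope the original authors don't mind). It finally works. It may have bugs; patches or better solutions are welcome.
You can change how the program runs by editing the following parameters: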
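url = "http://piao.kuxun.cn/beijing-jinggangshan/"  # Paste the ticket search URL here; this one is Beijing to Jinggangshan
key = "2张"  # Search keyword ("2 tickets"); I need two, after all...
sequence = 60  # 60 * 5  # Search interval in seconds; go easy on the server, once a minute is enough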
#!/usr/bin/python
# encoding: utf-8
# Python 2 only: urllib2 and sgmllib are not available in Python 3.
import urllib2
import time
from sgmllib import SGMLParser


class URLListName(SGMLParser):
    """Collect the text of every <a> element."""
    def reset(self):
        # Called by SGMLParser.__init__, so state is kept per instance
        SGMLParser.reset(self)
        self.is_a = False
        self.name = []

    def start_a(self, attrs):
        self.is_a = True

    def end_a(self):
        self.is_a = False

    def handle_data(self, text):
        if self.is_a:
            self.name.append(text)


url = "http://piao.kuxun.cn/beijing-jinggangshan/"
key = "2张"
sequence = 60  # 60 * 5

request = urllib2.Request(url)
request.add_header('User-Agent', 'Mozilla/5.0')
opener = urllib2.build_opener()
cache = []  # Link texts that have already been reported

while 1:
    print "begin retrieve"
    data = opener.open(request).read()
    ticket_name = URLListName()  # Fresh parser, so old results do not pile up
    ticket_name.feed(data)
    ticket_name.close()
    print "begin scan"
    for result in ticket_name.name:
        if result and result.find(key) >= 0 and result not in cache:
            print "found:" + result
            cache.append(result)
    print "scan finished, begin sleep " + str(sequence) + " seconds."
    time.sleep(sequence)
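Since `sgmllib` was removed in Python 3, the same "collect the text of every link" idea can be sketched with the standard library's `html.parser` instead. This is a rough Python 3 equivalent of the `URLListName` class, not a drop-in port of the whole script; the names `AnchorTextParser` and `anchor_texts` are made up for this example, and the input is assumed to be already decoded to `str`.

```python
from html.parser import HTMLParser


class AnchorTextParser(HTMLParser):
    """Collect the text found inside <a> elements."""

    def __init__(self):
        super().__init__()
        self.in_a = False
        self.names = []  # Per-instance, so separate parses do not mix

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            self.in_a = True

    def handle_endtag(self, tag):
        if tag == "a":
            self.in_a = False

    def handle_data(self, data):
        if self.in_a:
            self.names.append(data)


def anchor_texts(html):
    """Parse html (a str) and return the list of link texts."""
    parser = AnchorTextParser()
    parser.feed(html)
    parser.close()
    return parser.names
```

Creating a new parser for each page keeps the result list from growing across polls, so the keyword check only ever sees the links of the page that was just fetched.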