Статьи

Змеиный суп

Одним из (немногих) определяющих моментов Web 2.0 является использование удаленных данных и сервисов. Это замечательно, если ваш провайдер услуг — Amazon, Yahoo или Google, но не очень хорошо, если это ваши региональные выборные представители , которые, возможно, только что пришли в Web 1.0. Возможность добывать такие сайты для данных становится все более и более важной частью повседневной веб-разработки.

В любом случае, размышляя над тем, чего не хватает форумной матрице или вики-матрице , я решил, что это хороший повод, чтобы принять BeautifulSoup за спин; «Синтаксический анализатор Python HTML / XML, разработанный для быстрых поворотных проектов, таких как очистка экрана», один из лучших (если не самый лучший, по мнению экспертов ) инструментов такого рода (обратите внимание, что тот же автор также разработал RubyfulSoup ).

Beautiful Soup способен обрабатывать в значительной степени худший HTML, который вы можете бросить на него, и при этом предоставить вам полезную структуру данных. Например, учитывая некоторые HTML, как;

 <Я> <б> Aargh! </ I> </ B>

… и бегать по Прекрасному Супу как;

from BeautifulSoup import BeautifulSoup print BeautifulSoup('<i><b>Aargh!</i></b>').prettify() 

…Я получил;

 <Я>
  <Б>
   Aargh!
  </ B>
 </ I>

… обратите внимание, как изменился порядок тегов. Эта очистка позволяет мне получить доступ к внутреннему тексту, как;

from BeautifulSoup import BeautifulSoup soup = BeautifulSoup('<i><b>Aargh!</i></b>') print soup.ibstring
from BeautifulSoup import BeautifulSoup soup = BeautifulSoup('<i><b>Aargh!</i></b>') print soup.ibstring 

Это не полный учебник — документация обширная и превосходная. Еще одна ссылка, о которой вам следует знать, это urllib2 — Руководство по отсутствующим документам , в котором описана библиотека pyllons urllib2 (помимо прочего, имеется HTTP-клиент).

В любом случае, миссия состояла в том, чтобы разыскать MARC для рассылки консультативных рассылок Secunia , чтобы ускорить оценку записей безопасности.

MARC предоставляет интерфейс поиска, который отображает результаты на страницах до 30 одновременно. Несмотря на то, что все это легко можно получить с помощью HTTP GET-запросов, MARC, похоже, не подвергается регулярным изменениям HTML (все равно выглядит так же, как я помню, и эти теги <font /> — пустая вещь), что, как мы надеемся, означает что-нибудь полезное это HTML не будет «сломан» в ближайшем будущем.

Результат в advisories.py ;

#!/usr/bin/python """ Pulls out secunia security advisories from http://marc.theaimsgroup.com/?l=secunia-sec-adv DO NOT overuse! Make sure you read the following: http://marc.theaimsgroup.com/?q=about#Robots Also be aware that secunia _may_ feel you may be making inappropriate use of their advisories. For example they have strict rules regarding content _on_ their site (http://secunia.com/terms_and_conditions/) but this may not applying to the mailing list announcements License on the script is GPL: http://www.gnu.org/copyleft/gpl.html """ import urllib2, re, time from urllib import urlencode from BeautifulSoup import BeautifulSoup def fetchPage(application, page = 1): """ Fetches a page of advisories, using the marc search interface """ url = 'http://marc.theaimsgroup.com/?l=secunia-sec-adv&%s&%s' % (urlencode({'s':application}), urlencode({'r':page})) return urllib2.urlopen(url) def fetchMessage(mid): """ Fetches a single advisory, given it's marc message id """ url = 'http://marc.theaimsgroup.com/?l=secunia-sec-adv&%s&q=raw' % (urlencode({'m':mid})) return urllib2.urlopen(url).read() class LastPage(Exception): """ Used to flag that there are no pages of advisories to process """ pass class FlyInMySoup(Exception): """ Used to indicate the HTML being passed varies wildly from what was expected. """ pass class NotModified(Exception): """ Used to indicate there are no new advisories """ pass class Advisories: """ Controls BeautifulSoup, pulling out relevant information from a page of advisories and 'crawling' for additional pages as needed """ maxpages = 10 # If there are more than this num pages, give up requestdelay = 1 # Delay between successive requests - be kind to marc! __nohits = re.compile('^No hits found.*') __addate = re.compile('.*[0-9]+. ([0-9]{4}-[0-9]{2}-[0-9]{2}).*', re.DOTALL) __messageid = re.compile('.*m=([0-9]+).*') def __init__(self, application, lastMsgId = None): self.__application = application self.__lastMsgId = lastMsgId self.__advisories = [] self.__pages = [] self.__loaded = 0 def __loadPage(self, page = 0): """ Load a page and store it in mem as BeautifulSoup instance """ self.__pages.append(BeautifulSoup(fetchPage(self.__application, page+1))) time.sleep(self.requestdelay) def __hasAdvisories(self, page = 0): """ Test whether page has advisors. To be regarded as not having advisories, it must contain a font tag with the words "No hits found". Other input raises FlyInMySoup and will typically mean something is badly broken """ font = self.__pages[page].body.find(name='font', size='+1') if not font: if self.__pages[page].body.pre is None: raise FlyInMySoup, "body > pre tag ? advisories?n%s" % self.__pages[page].prettify return True if self.__nohits.match(font.string) == None: raise FlyInMySoup, "Nosir - dont like that font tag?n%s" % font.prettify return False def __hasAnotherPage(self, page = 0): """ Hunts for a img src = 'images/arrright.gif' (Next) in the advisories page and if found returns a page number to make another request with. Other raises a LastPage exception """ if page >= self.maxpages: raise LastPage; pre = self.__pages[page].body.pre imgs = pre.findAll(name='img', src='images/arrright.gif', limit=5) if len(imgs) > 0: return page + 1 raise LastPage def __fetchAdvisories(self, page = 0): """ Fetches a page of advisories, recursing if more pages of advisories were found """ self.__loadPage(page) if self.__hasAdvisories(page): advisory = {} in_advisory = 0 pre = self.__pages[page].body.pre for child in pre: if not in_advisory: m = self.__addate.match(str(child)) if m is not None: in_advisory = 1 advisory['date'] = m.group(1) else: try: advisory['mid'] = self.__messageid.match(child['href']).group(1) advisory['desc'] = child.string.strip() self.__advisories.append(advisory) advisory = {} in_advisory = 0 except: pass # Some sanity checks... if len(self.__advisories) == 0: raise FlyInMySoup, "No advisories in body > pre!n%s" % pre if in_advisory: raise FlyInMySoup, "Still looking for the last advisory" # More protection for marc if self.__lastMsgId and self.__advisories[0]['mid'] == str(self.__lastMsgId): raise NotModified, "Not modified - last message id: %s" % self.__lastMsgId try: nextpage = self.__hasAnotherPage(page) except: return self.__fetchAdvisories(nextpage) def __lazyFetch(self): """ Fetch advisories but only when needed """ if not self.__loaded: self.__fetchAdvisories() self.__loaded = 1 def __iter__(self): self.__lazyFetch() return self.__advisories.__iter__() def __len__(self): self.__lazyFetch() return len(self.__advisories) if __name__ == '__main__': import getopt, sys, csv from os import getcwd from os.path import isdir, isfile, realpath, join def usage(): """ advisories.py [-p=proxy_url] [-f] [-d=target_dir] <application> Pulls a list of security advisories for a given <application> Puts a summary list in <application>.csv and raw text in <application>_<msgid>.txt options: -d, --directory= (directory to write csv and raw msgs to) -f, --fetchmsgs (fetch raw messages announcements as well) -h, --help (display this message) -p, --proxy=http://user:[email protected]:8080 """ print usage.__doc__ def lastMsgId(csvfile): """ Pull out the last message id from the csvfile. Used to test for changes if the advisories page """ if not isfile(csvfile): return None try: fh = open(csvfile, 'rb') csvreader = csv.reader(fh, dialect='excel') csvreader.next() id = csvreader.next()[1] fh.close() return id except: return None app = None proxy = None fetchMsgs = 0 dir = getcwd() try: opts, args = getopt.getopt(sys.argv[1:], "fhp:d:", ["help", "fetchmsgs", "proxy=", "directory="]) for o, v in opts: if o in ("-h", "--help"): usage() sys.exit(0) if o in ("-f", "--fetchmsgs"): fetchMsgs = 1 elif o in ("-p", "--proxy"): proxy = v elif o in ("-d", "--directory"): if isdir(realpath(v)): dir = realpath(v) else: raise "Invalid dir %s" % v if len(args) == 1: app = args[0] else: raise getopt.error("Supply an app name to fetch advisories for!") except getopt.error, msg: print msg print "for help use --help" sys.exit(2) if proxy: # Use the explicit proxy passed as a CLI option proxy_support = urllib2.ProxyHandler({"http" : proxy}) else: # Prevent urllib2 from attempting to auto detect a proxy proxy_support = urllib2.ProxyHandler({}) opener = urllib2.build_opener(proxy_support, urllib2.HTTPHandler) urllib2.install_opener(opener) csvfile = join(dir,app+'.csv') advs = Advisories(app, lastMsgId(csvfile)) if len(advs) > 0: fh=open(csvfile, 'wb') csvwriter=csv.writer(fh, dialect='excel') csvwriter.writerow(('date','mid','desc')) for a in advs: csvwriter.writerow((a['date'], a['mid'], a['desc'])) if fetchMsgs: mfh=open(join(dir, "%s_%s.txt" % (app, a['mid'])), 'wb') mfh.write(fetchMessage(a['mid'])) mfh.close() fh.close() print "%s advisories found for %s" % (len(advs), app) else: print "No advisories found for %s" % app
#!/usr/bin/python """ Pulls out secunia security advisories from http://marc.theaimsgroup.com/?l=secunia-sec-adv DO NOT overuse! Make sure you read the following: http://marc.theaimsgroup.com/?q=about#Robots Also be aware that secunia _may_ feel you may be making inappropriate use of their advisories. For example they have strict rules regarding content _on_ their site (http://secunia.com/terms_and_conditions/) but this may not applying to the mailing list announcements License on the script is GPL: http://www.gnu.org/copyleft/gpl.html """ import urllib2, re, time from urllib import urlencode from BeautifulSoup import BeautifulSoup def fetchPage(application, page = 1): """ Fetches a page of advisories, using the marc search interface """ url = 'http://marc.theaimsgroup.com/?l=secunia-sec-adv&%s&%s' % (urlencode({'s':application}), urlencode({'r':page})) return urllib2.urlopen(url) def fetchMessage(mid): """ Fetches a single advisory, given it's marc message id """ url = 'http://marc.theaimsgroup.com/?l=secunia-sec-adv&%s&q=raw' % (urlencode({'m':mid})) return urllib2.urlopen(url).read() class LastPage(Exception): """ Used to flag that there are no pages of advisories to process """ pass class FlyInMySoup(Exception): """ Used to indicate the HTML being passed varies wildly from what was expected. """ pass class NotModified(Exception): """ Used to indicate there are no new advisories """ pass class Advisories: """ Controls BeautifulSoup, pulling out relevant information from a page of advisories and 'crawling' for additional pages as needed """ maxpages = 10 # If there are more than this num pages, give up requestdelay = 1 # Delay between successive requests - be kind to marc! __nohits = re.compile('^No hits found.*') __addate = re.compile('.*[0-9]+. ([0-9]{4}-[0-9]{2}-[0-9]{2}).*', re.DOTALL) __messageid = re.compile('.*m=([0-9]+).*') def __init__(self, application, lastMsgId = None): self.__application = application self.__lastMsgId = lastMsgId self.__advisories = [] self.__pages = [] self.__loaded = 0 def __loadPage(self, page = 0): """ Load a page and store it in mem as BeautifulSoup instance """ self.__pages.append(BeautifulSoup(fetchPage(self.__application, page+1))) time.sleep(self.requestdelay) def __hasAdvisories(self, page = 0): """ Test whether page has advisors. To be regarded as not having advisories, it must contain a font tag with the words "No hits found". Other input raises FlyInMySoup and will typically mean something is badly broken """ font = self.__pages[page].body.find(name='font', size='+1') if not font: if self.__pages[page].body.pre is None: raise FlyInMySoup, "body > pre tag ? advisories?n%s" % self.__pages[page].prettify return True if self.__nohits.match(font.string) == None: raise FlyInMySoup, "Nosir - dont like that font tag?n%s" % font.prettify return False def __hasAnotherPage(self, page = 0): """ Hunts for a img src = 'images/arrright.gif' (Next) in the advisories page and if found returns a page number to make another request with. Other raises a LastPage exception """ if page >= self.maxpages: raise LastPage; pre = self.__pages[page].body.pre imgs = pre.findAll(name='img', src='images/arrright.gif', limit=5) if len(imgs) > 0: return page + 1 raise LastPage def __fetchAdvisories(self, page = 0): """ Fetches a page of advisories, recursing if more pages of advisories were found """ self.__loadPage(page) if self.__hasAdvisories(page): advisory = {} in_advisory = 0 pre = self.__pages[page].body.pre for child in pre: if not in_advisory: m = self.__addate.match(str(child)) if m is not None: in_advisory = 1 advisory['date'] = m.group(1) else: try: advisory['mid'] = self.__messageid.match(child['href']).group(1) advisory['desc'] = child.string.strip() self.__advisories.append(advisory) advisory = {} in_advisory = 0 except: pass # Some sanity checks... if len(self.__advisories) == 0: raise FlyInMySoup, "No advisories in body > pre!n%s" % pre if in_advisory: raise FlyInMySoup, "Still looking for the last advisory" # More protection for marc if self.__lastMsgId and self.__advisories[0]['mid'] == str(self.__lastMsgId): raise NotModified, "Not modified - last message id: %s" % self.__lastMsgId try: nextpage = self.__hasAnotherPage(page) except: return self.__fetchAdvisories(nextpage) def __lazyFetch(self): """ Fetch advisories but only when needed """ if not self.__loaded: self.__fetchAdvisories() self.__loaded = 1 def __iter__(self): self.__lazyFetch() return self.__advisories.__iter__() def __len__(self): self.__lazyFetch() return len(self.__advisories) if __name__ == '__main__': import getopt, sys, csv from os import getcwd from os.path import isdir, isfile, realpath, join def usage(): """ advisories.py [-p=proxy_url] [-f] [-d=target_dir] <application> Pulls a list of security advisories for a given <application> Puts a summary list in <application>.csv and raw text in <application>_<msgid>.txt options: -d, --directory= (directory to write csv and raw msgs to) -f, --fetchmsgs (fetch raw messages announcements as well) -h, --help (display this message) -p, --proxy=http://user:[email protected]:8080 """ print usage.__doc__ def lastMsgId(csvfile): """ Pull out the last message id from the csvfile. Used to test for changes if the advisories page """ if not isfile(csvfile): return None try: fh = open(csvfile, 'rb') csvreader = csv.reader(fh, dialect='excel') csvreader.next() id = csvreader.next()[1] fh.close() return id except: return None app = None proxy = None fetchMsgs = 0 dir = getcwd() try: opts, args = getopt.getopt(sys.argv[1:], "fhp:d:", ["help", "fetchmsgs", "proxy=", "directory="]) for o, v in opts: if o in ("-h", "--help"): usage() sys.exit(0) if o in ("-f", "--fetchmsgs"): fetchMsgs = 1 elif o in ("-p", "--proxy"): proxy = v elif o in ("-d", "--directory"): if isdir(realpath(v)): dir = realpath(v) else: raise "Invalid dir %s" % v if len(args) == 1: app = args[0] else: raise getopt.error("Supply an app name to fetch advisories for!") except getopt.error, msg: print msg print "for help use --help" sys.exit(2) if proxy: # Use the explicit proxy passed as a CLI option proxy_support = urllib2.ProxyHandler({"http" : proxy}) else: # Prevent urllib2 from attempting to auto detect a proxy proxy_support = urllib2.ProxyHandler({}) opener = urllib2.build_opener(proxy_support, urllib2.HTTPHandler) urllib2.install_opener(opener) csvfile = join(dir,app+'.csv') advs = Advisories(app, lastMsgId(csvfile)) if len(advs) > 0: fh=open(csvfile, 'wb') csvwriter=csv.writer(fh, dialect='excel') csvwriter.writerow(('date','mid','desc')) for a in advs: csvwriter.writerow((a['date'], a['mid'], a['desc'])) if fetchMsgs: mfh=open(join(dir, "%s_%s.txt" % (app, a['mid'])), 'wb') mfh.write(fetchMessage(a['mid'])) mfh.close() fh.close() print "%s advisories found for %s" % (len(advs), app) else: print "No advisories found for %s" % app 

Предполагая, что у вас установлена ​​последняя версия python и Beautiful Soup 3.x + (загрузите tarball , распакуйте его куда-нибудь и запустите $ setup.py install для установки в вашу библиотеку Python), вы можете запустить этот скрипт из командной строки (он предназначен для cron ) лайк;

$ advisories.py phpbb 

… И он создаст файл phpbb.csv содержащий все найденные рекомендации. Есть несколько других функций, таких как поддержка прокси и возможность загрузки необработанных рекомендаций, о которых вы можете прочитать, запустив $ advisories.py --help . Убедитесь, что вы прочитали предупреждения в начале скрипта!

Итак, миссия в основном завершена. Интересная часть — выяснить, где поставить проверки в коде. В то время как Beautiful Soup позволяет читать практически все, что похоже на SGML , изменение структуры HTML-тегов MARC нарушило бы этот сценарий (в конце концов, это не официальный API), поэтому, надеюсь, он будет вызывать исключения в нужных местах. требуется вмешательство.

В противном случае, если вы попадаете в майнинг HTML, другим проектом, который нужно исследовать, является webstemmer (снова Python), который в некоторых случаях (например, новостной сайт) может быть достаточно умным, чтобы получить то, что вы хотите, с минимальными усилиями.