本文共 10537 字,大约阅读时间需要 35 分钟。
在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如XSS、SQL injection、CSRF、XXE、Arbitrary file existence disclosure in Act、明文传输等。
说到sql injection,测试人员都会有一种想法:是否存在一款自动化工具,可以将某一网站的所有链接都去尝试一遍,尽可能地发现所有的sql injection。有了这种想法后大家会去想解决方案,有一种解决方案是编写burpsuite插件。本文接上一篇文章,上一文记录到继承IHttpListener接口编写插件,影响测试效率;本文将介绍另一种方式,并将sqlmapapi和burpsuite进行对比。先上插件代码:
from burp import IBurpExtender
from burp import IScannerCheck
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests


class BurpExtender(IBurpExtender, IScannerCheck):
    """Burp scanner check that submits GET URLs with a query string to sqlmapapi.

    Each matching request seen by the passive scanner is handed to an
    ``AutoSqli`` worker thread; results are written to the extension's
    output stream rather than reported as Burp issues.
    """

    #
    # implement IBurpExtender
    #
    def registerExtenderCallbacks(self, callbacks):
        """Store the Burp callbacks/helpers and register this scanner check."""
        # keep a reference to our callbacks object
        self._callbacks = callbacks

        # set our extension name
        callbacks.setExtensionName("fanyingjie")

        # obtain our output stream
        self._stdout = PrintWriter(callbacks.getStdout(), True)

        self._helpers = callbacks.getHelpers()

        # register ourselves as a custom scanner check
        callbacks.registerScannerCheck(self)

    def doActiveScan(self, baseRequestResponse, insertionPoint):
        """No active checks are implemented; report no issues."""
        return None

    def doPassiveScan(self, baseRequestResponse):
        """Start a sqlmapapi scan for GET requests whose URL contains '?'.

        Always returns None (no Burp issues); findings are only printed to
        the extension output by the worker.
        """
        info = self._helpers.analyzeRequest(baseRequestResponse)
        method = info.getMethod()
        url = str(info.getUrl())
        if "?" in url and method == "GET":
            self._stdout.println("start")
            worker = AutoSqli(target=url, stdout=self._stdout, method=method)
            # Run in the background: a scan can take up to ~500 seconds and
            # calling run() directly would block Burp's passive scanner.
            worker.setDaemon(True)
            worker.start()
        return None

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        """Keep both issues (0); this check never reports issues itself."""
        return 0


class AutoSqli(Thread):
    """Worker thread that runs one sqlmapapi scan against a single URL.

    Parameters:
        target: URL to scan.
        stdout: object with a ``println`` method used for progress output.
        method: HTTP method of the original request (stored, not used).
        server: base URL of the sqlmapapi REST server.
    """

    def __init__(self, target, stdout, method,
                 server="http://192.168.159.134:8775"):
        # Thread must be initialised, otherwise start() cannot be used.
        Thread.__init__(self)
        self.server = server
        self.taskid = ''
        self.target = target
        self.method = method
        self._stdout = stdout
        self.start_time = time.time()

    def _api(self, path, payload=None):
        """Call ``self.server + path`` and return the decoded JSON reply.

        Sends a GET when ``payload`` is None, otherwise POSTs the payload
        as JSON. The HTTP response is always closed.
        """
        url = self.server + path
        if payload is None:
            request = urllib2.Request(url)
        else:
            request = urllib2.Request(
                url,
                data=json.dumps(payload),
                headers={'Content-Type': 'application/json'})
        response = urllib2.urlopen(request)
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def task_new(self):
        """Create a task on the server; return True if a task id was issued."""
        self.taskid = self._api('/task/new')['taskid']
        self._stdout.println('Created new task: ' + self.taskid)
        return len(self.taskid) > 0

    def task_delete(self):
        """Delete the current task; return True if the server reports success."""
        if self._api('/task/' + self.taskid + '/delete')['success']:
            self._stdout.println('[%s] Deleted task' % (self.taskid))
            return True
        return False

    def scan_start(self):
        """Start scanning ``self.target``; return True if the scan was accepted."""
        reply = self._api('/scan/' + self.taskid + '/start',
                          {'url': self.target})
        self._stdout.println("start " + self.taskid)
        return bool(len(str(reply['engineid'])) > 0 and reply['success'])

    def scan_status(self):
        """Return 'running', 'terminated', or 'error' for any other status."""
        status = self._api('/scan/' + self.taskid + '/status')['status']
        if status in ('running', 'terminated'):
            return status
        return "error"

    def scan_data(self):
        """Print whether the scan produced data; return True if it did."""
        data = self._api('/scan/' + self.taskid + '/data')['data']
        if not data:
            self._stdout.println('not injection:\t' + self.target)
            return False
        self._stdout.println('injection:\t' + self.target)
        return True

    def scan_kill(self):
        """Ask the server to kill the running scan."""
        self._api('/scan/' + self.taskid + '/kill')
        # Format first, then print: println() returns None.
        self._stdout.println("%s kill" % (self.taskid))

    def scan_stop(self):
        """Ask the server to stop the running scan."""
        self._api('/scan/' + self.taskid + '/stop')
        self._stdout.println("%s stop" % (self.taskid))

    def run(self):
        """Create a task, scan the target, wait for completion, print the result.

        Returns False if the task or scan could not be started or an error
        occurred, otherwise None.
        """
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                # Poll once per iteration so both branches see the same status.
                if self.scan_status() != 'running':
                    break
                time.sleep(10)
                if time.time() - self.start_time > 500:
                    # Give up on scans that exceed the time budget.
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            # NOTE: the task is intentionally left on the server
            # (task_delete() is not called), as in the original plugin.
        except Exception as e:
            # Report the failure instead of hiding it; never raise into Burp.
            self._stdout.println('[%s] scan failed: %s' % (self.taskid, e))
            return False
使用burp scanner模块和上面写的插件对http://172.16.173.136/sqli-labs/Less-8/?id=234 进行sql注入测试,burp scanner模块可以测试出存在注入,但是插件无法测试出;
因此对插件进行改写,修改默认配置,在插件中添加以下代码,我只修改了level和risk:

    def optionSet(self):
        headers = {'Content-Type': 'application/json'}
        payload={"level":"5","risk":"3"}
        url = self.server + '/option/' + self.taskid + '/set'
        req = urllib2.Request(url, data=json.dumps(payload), headers=headers)
        t = json.loads(urllib2.urlopen(req).read())
        self._stdout.println("set option " + self.taskid)
然后在新建扫描后,调用配置设置函数,修改后的代码:
from burp import IBurpExtender
from burp import IScannerCheck
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests


class BurpExtender(IBurpExtender, IScannerCheck):
    """Burp scanner check that submits GET URLs with a query string to sqlmapapi.

    Each matching request seen by the passive scanner is handed to an
    ``AutoSqli`` worker thread; results are written to the extension's
    output stream rather than reported as Burp issues.
    """

    #
    # implement IBurpExtender
    #
    def registerExtenderCallbacks(self, callbacks):
        """Store the Burp callbacks/helpers and register this scanner check."""
        # keep a reference to our callbacks object
        self._callbacks = callbacks

        # set our extension name
        callbacks.setExtensionName("fanyingjie")

        # obtain our output stream
        self._stdout = PrintWriter(callbacks.getStdout(), True)

        self._helpers = callbacks.getHelpers()

        # register ourselves as a custom scanner check
        callbacks.registerScannerCheck(self)

    def doActiveScan(self, baseRequestResponse, insertionPoint):
        """No active checks are implemented; report no issues."""
        return None

    def doPassiveScan(self, baseRequestResponse):
        """Start a sqlmapapi scan for GET requests whose URL contains '?'.

        Always returns None (no Burp issues); findings are only printed to
        the extension output by the worker.
        """
        info = self._helpers.analyzeRequest(baseRequestResponse)
        method = info.getMethod()
        url = str(info.getUrl())
        if "?" in url and method == "GET":
            self._stdout.println("start")
            worker = AutoSqli(target=url, stdout=self._stdout, method=method)
            # Run in the background: a scan can take up to ~500 seconds and
            # calling run() directly would block Burp's passive scanner.
            worker.setDaemon(True)
            worker.start()
        return None

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        """Keep both issues (0); this check never reports issues itself."""
        return 0


class AutoSqli(Thread):
    """Worker thread that runs one sqlmapapi scan against a single URL.

    Parameters:
        target: URL to scan.
        stdout: object with a ``println`` method used for progress output.
        method: HTTP method of the original request (stored, not used).
        server: base URL of the sqlmapapi REST server.
    """

    def __init__(self, target, stdout, method,
                 server="http://172.16.173.136:8775"):
        # Thread must be initialised, otherwise start() cannot be used.
        Thread.__init__(self)
        self.server = server
        self.taskid = ''
        self.target = target
        self.method = method
        self._stdout = stdout
        self.start_time = time.time()

    def _api(self, path, payload=None):
        """Call ``self.server + path`` and return the decoded JSON reply.

        Sends a GET when ``payload`` is None, otherwise POSTs the payload
        as JSON. The HTTP response is always closed.
        """
        url = self.server + path
        if payload is None:
            request = urllib2.Request(url)
        else:
            request = urllib2.Request(
                url,
                data=json.dumps(payload),
                headers={'Content-Type': 'application/json'})
        response = urllib2.urlopen(request)
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def task_new(self):
        """Create a task on the server; return True if a task id was issued."""
        self.taskid = self._api('/task/new')['taskid']
        self._stdout.println('Created new task: ' + self.taskid)
        return len(self.taskid) > 0

    def task_delete(self):
        """Delete the current task; return True if the server reports success."""
        if self._api('/task/' + self.taskid + '/delete')['success']:
            self._stdout.println('[%s] Deleted task' % (self.taskid))
            return True
        return False

    def optionSet(self):
        """Raise the scan's level and risk options above the defaults."""
        self._api('/option/' + self.taskid + '/set',
                  {"level": "5", "risk": "3"})
        self._stdout.println("set option " + self.taskid)

    def scan_start(self):
        """Start scanning ``self.target``; return True if the scan was accepted."""
        reply = self._api('/scan/' + self.taskid + '/start',
                          {'url': self.target})
        self._stdout.println("start " + self.taskid)
        return bool(len(str(reply['engineid'])) > 0 and reply['success'])

    def scan_status(self):
        """Return 'running', 'terminated', or 'error' for any other status."""
        status = self._api('/scan/' + self.taskid + '/status')['status']
        if status in ('running', 'terminated'):
            return status
        return "error"

    def scan_data(self):
        """Print whether the scan produced data; return True if it did."""
        data = self._api('/scan/' + self.taskid + '/data')['data']
        if not data:
            self._stdout.println('not injection:\t' + self.target)
            return False
        self._stdout.println('injection:\t' + self.target)
        return True

    def scan_kill(self):
        """Ask the server to kill the running scan."""
        self._api('/scan/' + self.taskid + '/kill')
        # Format first, then print: println() returns None.
        self._stdout.println("%s kill" % (self.taskid))

    def scan_stop(self):
        """Ask the server to stop the running scan."""
        self._api('/scan/' + self.taskid + '/stop')
        self._stdout.println("%s stop" % (self.taskid))

    def run(self):
        """Create a task, set options, scan the target and print the result.

        Returns False if the task or scan could not be started or an error
        occurred, otherwise None.
        """
        try:
            if not self.task_new():
                return False
            # Options must be set after the task exists and before the scan starts.
            self.optionSet()
            if not self.scan_start():
                return False
            while True:
                # Poll once per iteration so both branches see the same status.
                if self.scan_status() != 'running':
                    break
                time.sleep(10)
                if time.time() - self.start_time > 500:
                    # Give up on scans that exceed the time budget.
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            # NOTE: the task is intentionally left on the server
            # (task_delete() is not called), as in the original plugin.
        except Exception as e:
            # Report the failure instead of hiding it; never raise into Burp.
            self._stdout.println('[%s] scan failed: %s' % (self.taskid, e))
            return False
再次对上面代码进行测试,可以测试出存在注入
插件的输出:

    start
    Created new task: 42f685742c94a985
    set option 42f685742c94a985
    start 42f685742c94a985
    injection:

比较sqlmapapi和burpsuite scanner模块:通过修改sqlmapapi默认设置可以提高注入检测的准确率,但是修改默认配置后sqlmapapi不见得比burpsuite scanner模块更厉害,而且sqlmapapi只能检测get请求类型,且不能检测登录后的get请求。因此看来在安全测试过程中,如果目的只是发现网站是否存在sql注入,还是使用burpsuite scanner模块比较好用;在知道存在sql注入后,可以使用sqlmap来进行后续的操作。
转载于:https://blog.51cto.com/13770310/2130522