博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
阅读sqlmap源代码,编写burpsuite插件--sqlmapapi(二)
阅读量:6031 次
发布时间:2019-06-20

本文共 10537 字,大约阅读时间需要 35 分钟。

burpsuite插件编写---sql injection

0x00 概要

在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如xss、sql injection、cf、xxe、Arbitrary file existence disclosure in Act、明文传输等。

说到sql injection,测试人员都会有一种想法是否存在一款自动化工具,可以将某一网站的所有链接都去尝试一边,尽可能的发现所有的sql injection。有了这种想法后大家会去想解决方案,有一种解决方案是编写burpsuite插件。
本文接上一篇文章,上一文记录到继承IHttpListener接口编写插件,影响测试效率,本文将介绍另一种方式,将sqlmapapi 和burpsuite进行对比

0x01 继承IScannerCheck接口(方法2)

先上插件代码:

> from burp import IBurpExtender> from burp import IScannerCheck> from java.io import PrintWriter> import re> import urllib> import urllib2> import time> import json> from threading import Thread> import requests> > > > > class BurpExtender(IBurpExtender, IScannerCheck):> >     #>     #implement IBurpExtender>     #>     def    registerExtenderCallbacks(self, callbacks):>         # keep a reference to our callbacks object>         self._callbacks = callbacks> >         # set our extension name>         callbacks.setExtensionName("fanyingjie")> >         # obtain our output stream>         self._stdout = PrintWriter(callbacks.getStdout(), True)> >         self._helpers  = callbacks.getHelpers()> >         # register ourselves as an>         callbacks.registerScannerCheck(self)> > >     def doActiveScan(self, baseRequestResponse, insertionPoint):>         pass>     def doPassiveScan(self, baseRequestResponse):>         a=self._helpers.analyzeRequest(baseRequestResponse)>         method=a.getMethod()>         url=str(a.getUrl())>         if(("?" in url) and (method=="GET")):>             self._stdout.println("start")>             t=AutoSqli(target=url,stdout=self._stdout,method=method)>             t.run()> >     def consolidateDuplicateIssues(self, existingIssue, newIssue):>         pass> >             > > class AutoSqli(Thread):>     def __init__(self,target,stdout,method):>         self.server="http://192.168.159.134:8775">         self.taskid = ''>         self.target=target>         self.method=method>         self._stdout=stdout>         self.start_time = time.time()> >     def task_new(self):>         self.taskid = json.loads(urllib2.urlopen(self.server + '/task/new').read())['taskid']>         self._stdout.println('Created new task: ' + self.taskid )>         if len(self.taskid) > 0:>             return True>         return False> >     def task_delete(self):>         if json.loads(urllib2.urlopen(self.server + '/task/' + self.taskid + '/delete').read())['success']:>             self._stdout.println('[%s] Deleted task' % (self.taskid))>             return True>         return False> > >     def scan_start(self):>         headers = {'Content-Type': 'application/json'}>         payload = {'url':self.target}>         url = self.server + '/scan/' + self.taskid + '/start'>         #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)>         >         req=urllib2.Request(url,data=json.dumps(payload),headers=headers)>         t=json.loads(urllib2.urlopen(req).read())>         self._stdout.println("start "+ self.taskid)> >         if len(str(t['engineid'])) > 0 and t['success']:>             return True>         return False> >     def scan_status(self):>         status = json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/status').read())['status']>         if status == 'running':>             return 'running'>         if status == 'terminated':>             return 'terminated'>         return "error"> >     def scan_data(self):>         >         data = json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/data').read())['data']>         if len(data) == 0:>             self._stdout.println('not injection:\t' + self.target)>             return False>         else:>             self._stdout.println('injection:\t' + self.target)>             return True> >     def scan_kill(self):>         json.loads(rurllib2.urlopen(self.server + '/scan/' + self.taskid + '/kill').read())['success']>         self._stdout.println("%s kill")%(self.taskid)> > >     def scan_stop(self):>         json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/stop').read())['success']>         self._stdout.println("%s stop")%(self.taskid)> >     def run(self):>         try:>             if not self.task_new():>                 return False>             if not self.scan_start():>                 return False>             while True:>                 if self.scan_status() == 'running':>                     time.sleep(10)>                 elif self.scan_status() == 'terminated':>                     break>                 else:>                     break>                 #print self.target + ":\t" + str(time.time() - self.start_time)>                 if time.time() - self.start_time > 500:>                     self.scan_stop()>                     self.scan_kill()>                     break>             self.scan_data()>             #self.task_delete()> >         except Exception as e:>             pass>

使用burp scanner模块和上面写的插件对http://172.16.173.136/sqli-labs/Less-8/?id=234 进行sql注入测试,burp scanner模块可以测试出存在注入,但是插件无法测试出;

因此插件进行改写,修改默认配置,在插件中添加以下代码,我只修改了level和risk

def optionSet(self):        headers = {'Content-Type': 'application/json'}        payload={"level":"5","risk":"3"}        url = self.server + '/option/' + self.taskid + '/set'        req = urllib2.Request(url, data=json.dumps(payload), headers=headers)        t = json.loads(urllib2.urlopen(req).read())        self._stdout.println("set option " + self.taskid)

然后在新建扫描后,调用配置设置函数,修改后的代码:

from burp import IBurpExtenderfrom burp import IScannerCheckfrom java.io import PrintWriterimport reimport urllibimport urllib2import timeimport jsonfrom threading import Threadimport requestsclass BurpExtender(IBurpExtender, IScannerCheck):    #    # implement IBurpExtender    #    def registerExtenderCallbacks(self, callbacks):        # keep a reference to our callbacks object        self._callbacks = callbacks        # set our extension name        callbacks.setExtensionName("fanyingjie")        # obtain our output stream        self._stdout = PrintWriter(callbacks.getStdout(), True)        self._helpers = callbacks.getHelpers()        # register ourselves as an        callbacks.registerScannerCheck(self)    def doActiveScan(self, baseRequestResponse, insertionPoint):        pass    def doPassiveScan(self, baseRequestResponse):        a = self._helpers.analyzeRequest(baseRequestResponse)        method = a.getMethod()        url = str(a.getUrl())        if (("?" in url) and (method == "GET")):            self._stdout.println("start")            t = AutoSqli(target=url, stdout=self._stdout, method=method)            t.run()    def consolidateDuplicateIssues(self, existingIssue, newIssue):        passclass AutoSqli(Thread):    def __init__(self, target, stdout, method):        self.server = "http://172.16.173.136:8775"        self.taskid = ''        self.target = target        self.method = method        self._stdout = stdout        self.start_time = time.time()    def task_new(self):        self.taskid = json.loads(urllib2.urlopen(self.server + '/task/new').read())['taskid']        self._stdout.println('Created new task: ' + self.taskid)        if len(self.taskid) > 0:            return True        return False    def task_delete(self):        if json.loads(urllib2.urlopen(self.server + '/task/' + self.taskid + '/delete').read())['success']:            self._stdout.println('[%s] Deleted task' % (self.taskid))            return True        return False    def optionSet(self):        headers = {'Content-Type': 'application/json'}        payload={"level":"5","risk":"3"}        url = self.server + '/option/' + self.taskid + '/set'        req = urllib2.Request(url, data=json.dumps(payload), headers=headers)        t = json.loads(urllib2.urlopen(req).read())        self._stdout.println("set option " + self.taskid)    def scan_start(self):        headers = {'Content-Type': 'application/json'}        payload = {'url': self.target}        url = self.server + '/scan/' + self.taskid + '/start'        # t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)        req = urllib2.Request(url, data=json.dumps(payload), headers=headers)        t = json.loads(urllib2.urlopen(req).read())        self._stdout.println("start " + self.taskid)        if len(str(t['engineid'])) > 0 and t['success']:            return True        return False    def scan_status(self):        status = json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/status').read())['status']        if status == 'running':            return 'running'        if status == 'terminated':            return 'terminated'        return "error"    def scan_data(self):        data = json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/data').read())['data']        if len(data) == 0:            self._stdout.println('not injection:\t' + self.target)            return False        else:            self._stdout.println('injection:\t' + self.target)            return True    def scan_kill(self):        json.loads(rurllib2.urlopen(self.server + '/scan/' + self.taskid + '/kill').read())['success']        self._stdout.println("%s kill") % (self.taskid)    def scan_stop(self):        json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/stop').read())['success']        self._stdout.println("%s stop") % (self.taskid)    def run(self):        try:            if not self.task_new():                return False            self.optionSet()            if not self.scan_start():                return False            while True:                if self.scan_status() == 'running':                    time.sleep(10)                elif self.scan_status() == 'terminated':                    break                else:                    break                # print self.target + ":\t" + str(time.time() - self.start_time)                if time.time() - self.start_time > 500:                    self.scan_stop()                    self.scan_kill()                    break            self.scan_data()            # self.task_delete()        except Exception as e:            pass

再次对上面代码进行测试,可以测试出存在注入

插件的输出:

start

Created new task: 42f685742c94a985
set option 42f685742c94a985
start 42f685742c94a985
injection:


比较sqlmapapi和burpsuite scanner模块:

通过修改sqlmapapi默认设置可以提高注入检测的准确率,但是修改默认配置后sqlmapapi不见的比burpsuite scanner模块更厉害,而且sqlmapapi只能检测get请求类型,且不能检测登录后的get请求,因此看来在安全测试过程中,如果目的只是发现网站是否存在sql注入,还是使用burpsuite scanner 模块比较好用;在知道存在sql注入后,可以使用sqlmap来进行后续的操作。

转载于:https://blog.51cto.com/13770310/2130522

你可能感兴趣的文章
32位系统和64位系统的选择
查看>>
01配置管理过程指南
查看>>
python 搭配 及目录结构
查看>>
设计模式总结
查看>>
os/exec
查看>>
iOS后台任务,争取一段时间处理后事
查看>>
如何在linux下修改组权限
查看>>
把jpg转换成pdf软件
查看>>
RestTemplate
查看>>
build
查看>>
nutch2.1+mysql报错及解决
查看>>
spring boot打jar包发布
查看>>
《JavaScript高级程序设计》节点层次和DOM操作技术
查看>>
form 提交多个对象及springMVC接收
查看>>
jstl格式化时间
查看>>
一则关于运算符的小例
查看>>
centos7 ambari2.6.1.5+hdp2.6.4.0 大数据集群安装部署
查看>>
cronexpression 详解
查看>>
一周小程序学习 第1天
查看>>
小孩的linux
查看>>