前面五章一直都没有分析到主要的地方,今天有空,就写下sqlmap是如何检测注入,内在的检测逻辑是怎样的。
之前的分析也有说过,sqlmap整个检测的开始是start()函数,所以一切都要从start()函数开始,在
/sqlmap/lib/controller/controller.py
""" This function calls a function that performs checks on both URL stability and all GET, POST, Cookie and User-Agent parameters to check if they are dynamic and SQL injection affected 这个函数功能是检测两个URL的稳定性,然后根据GET、POST、Cookie、和User-Agent参数 去检测是否存在sql注入 """ # direct 对应 -d 命令,直接接管数据库 if conf.direct: initTargetEnv() setupTargetEnv() action() return True if conf.url and not any((conf.forms, conf.crawlDepth)): # 添加目标 kb.targets.add((conf.url, conf.method, conf.data, conf.cookie, None)) # 判断,没有目标就会报错 if conf.configFile and not kb.targets: errMsg = "you did not edit the configuration file properly, set " errMsg += "the target URL, list of targets or google dork" logger.error(errMsg) return False if kb.targets and len(kb.targets) > 1: infoMsg = "sqlmap got a total of %d targets" % len(kb.targets) logger.info(infoMsg)
函数比较长,我们挑选重要的分段讲解。上面代码的意思还是检测。后面太长就不列出来了,大概意义就是配置HTTP发包响应的各种参数。
如果用户有输入参数设置的话,会设置各种参数。这里一一忽略
# 该函数主要包含3个子功能: # 1.创建保存目标执行结果的目录和文件 # 2.将get或post发送的数据解析成字典形式,并保存到conf.paramDict中 # 3.读取session文件(如果存在的话),并提起文件中的数据,保存到kb变量中 setupTargetEnv() if not checkConnection(suppressOutput=conf.forms) or not checkString() or not checkRegexp(): continue checkWaf() # 检测waf if conf.identifyWaf: identifyWaf() # 识别waf
检测WAF,后面就测试网页的稳定性(查看参数是否是动态的)
后面根据level等级确定注入的位置
后面又经过了一大段判断代码,来到第一个判断注入的地方。
# 简单的检测下数字型注入和一些其他漏洞 check = heuristicCheckSqlInjection(place, parameter)这个函数翻译为中文为启发性注入检测,函数里面内容就是判断一下数字型注入,例如id=1这个参数会变成id=randINT+id-randINT,用两个页面的差异性来判断是否存在注入,如果两者相似度很高,则可以说明存在注入了。
当然,这个参数里面也会用一些payload 来检测XSS和文件包含漏洞。
接着来到了sqlmap检测的核心地方。
injection = checkSqlInjection(place, parameter, value)这里是整个程序最核心的检测函数,函数在
/lib/controller/checks.py
这个文件中。
在往后,就是保存信息hash之类的东西了。所以,为了找到sqlmap是如何检测注入的,有必要到checks.py文件中去一探究竟。
def checkSqlInjection(place, parameter, value): # Store here the details about boundaries and payload used to # successfully inject # 存储成功用过的boundaries 和 pyaload细节 injection = InjectionDict() # Localized thread data needed for some methods # 局部现成数据为后面方法 threadData = getCurrentThreadData() # Favoring non-string specific boundaries in case of digit-like parameter values # 在类似digit的参数值时,支持非特定于字符串的边界 if value.isdigit(): kb.cache.intBoundaries = kb.cache.intBoundaries or sorted(copy.deepcopy(conf.boundaries), key=lambda boundary: any(_ in (boundary.prefix or "") or _ in (boundary.suffix or "") for _ in ('"', '\''))) boundaries = kb.cache.intBoundaries elif value.isalpha(): kb.cache.alphaBoundaries = kb.cache.alphaBoundaries or sorted(copy.deepcopy(conf.boundaries), key=lambda boundary: not any(_ in (boundary.prefix or "") or _ in (boundary.suffix or "") for _ in ('"', '\''))) boundaries = kb.cache.alphaBoundaries else: boundaries = conf.boundaries # Set the flag for SQL injection test mode kb.testMode = True paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place tests = getSortedInjectionTests() # paramType是注入的类型,如GET。tests是要测试的列表,包含了每个测试项的名称,这些数据都是和/sqlmap/xml/payloads/目录下每个xml相对应的。 seenPayload = set() kb.data.setdefault("randomInt", str(randomInt(10))) kb.data.setdefault("randomStr", str(randomStr(10))) while tests: test = tests.pop(0) try: if kb.endDetection: break if conf.dbms is None: # If the DBMS has not yet been fingerprinted (via simple heuristic check # or via DBMS-specific payload) and boolean-based blind has been identified # then attempt to identify with a simple DBMS specific boolean-based # test what the DBMS may be # 如果数据库管理系统还没有指纹(通过简单的启发式检查) # 或通过dbms特定的有效负载)和基于booleande的盲人已经被识别。 # 然后尝试使用一个简单的基于sql的数据库管理系统 # 测试什么是DBMS if not injection.dbms and PAYLOAD.TECHNIQUE.BOOLEAN in injection.data: if not Backend.getIdentifiedDbms() and kb.heuristicDbms is None and not kb.droppingRequests: kb.heuristicDbms = heuristicCheckDbms(injection) # 启发式检测DBms # If the DBMS has already been fingerprinted (via DBMS-specific # error message, simple heuristic check or via DBMS-specific # payload), ask the user to limit the tests to the fingerprinted # DBMS # 这里DBMS已经被识别出来了,询问用户是否测试别的DBS if kb.reduceTests is None and not conf.testFilter and (intersect(Backend.getErrorParsedDBMSes(), SUPPORTED_DBMS, True) or kb.heuristicDbms or injection.dbms): msg = "it looks like the back-end DBMS is '%s'. " % (Format.getErrorParsedDBMSes() or kb.heuristicDbms or injection.dbms) msg += "Do you want to skip test payloads specific for other DBMSes? [Y/n]" kb.reduceTests = (Backend.getErrorParsedDBMSes() or [kb.heuristicDbms]) if readInput(msg, default='Y', boolean=True) else [] # If the DBMS has been fingerprinted (via DBMS-specific error # message, via simple heuristic check or via DBMS-specific # payload), ask the user to extend the tests to all DBMS-specific, # regardless of --level and --risk values provided # 如果数据库管理系统已被指纹(通过特定于DBMS的错误) # 消息,通过简单的启发式检查或通过特定于dbms # ,要求用户将测试扩展到所有特定于dbms的测试, # 不考虑所提供的级别和风险值 if kb.extendTests is None and not conf.testFilter and (conf.level < 5 or conf.risk < 3) and (intersect(Backend.getErrorParsedDBMSes(), SUPPORTED_DBMS, True) or kb.heuristicDbms or injection.dbms): msg = "for the remaining tests, do you want to include all tests " msg += "for '%s' extending provided " % (Format.getErrorParsedDBMSes() or kb.heuristicDbms or injection.dbms) msg += "level (%d)" % conf.level if conf.level < 5 else "" msg += " and " if conf.level < 5 and conf.risk < 3 else "" msg += "risk (%d)" % conf.risk if conf.risk < 3 else "" msg += " values? [Y/n]" if conf.level < 5 and conf.risk < 3 else " value? [Y/n]" kb.extendTests = (Backend.getErrorParsedDBMSes() or [kb.heuristicDbms]) if readInput(msg, default='Y', boolean=True) else [] title = test.title kb.testType = stype = test.stype clause = test.clause unionExtended = False trueCode, falseCode = None, None if conf.httpCollector is not None: conf.httpCollector.setExtendedArguments({ "_title": title, "_place": place, "_parameter": parameter, }) if stype == PAYLOAD.TECHNIQUE.UNION: # 会判断是不是union注入,这个stype就是payload文件夹下面xml文件中的stype,如果是union,就会进入,然后配置列的数量等。 configUnion(test.request.char) if "[CHAR]" in title: if conf.uChar is None: continue else: title = title.replace("[CHAR]", conf.uChar) elif "[RANDNUM]" in title or "(NULL)" in title: title = title.replace("[RANDNUM]", "random number") if test.request.columns == "[COLSTART]-[COLSTOP]": if conf.uCols is None: continue else: title = title.replace("[COLSTART]", str(conf.uColsStart)) title = title.replace("[COLSTOP]", str(conf.uColsStop)) elif conf.uCols is not None: debugMsg = "skipping test '%s' because the user " % title debugMsg += "provided custom column range %s" % conf.uCols logger.debug(debugMsg) continue match = re.search(r"(\d+)-(\d+)", test.request.columns) if match and injection.data: lower, upper = int(match.group(1)), int(match.group(2)) for _ in (lower, upper): if _ > 1: __ = 2 * (_ - 1) + 1 if _ == lower else 2 * _ unionExtended = True test.request.columns = re.sub(r"\b%d\b" % _, str(__), test.request.columns) title = re.sub(r"\b%d\b" % _, str(__), title) test.title = re.sub(r"\b%d\b" % _, str(__), test.title)
sqlmap会根据下面的漏洞类型来生成payload检测注入。
B: Boolean-based blind SQL injection(布尔型注入)
E: Error-based SQL injection(报错型注入)
U: UNION query SQL injection(可联合查询注入)
S: Stacked queries SQL injection(可多语句查询注入)
T: Time-based blind SQL injection(基于时间延迟注入)
Q: inline_query(内联查询)
到这里,sqlmap的生成payload流程已经清晰了,通过boundaries.xml来确定payload的前缀和后缀,然后通过上述技术从sqlmap/xml/payloads/中
根据level等级获取payload最后组合成sqlmap发包用的payload。
发表评论