- 浏览: 554390 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
jiang2011jiang:
mybatis3源码核心类1--Configuration -
tuyf_hs:
同求 图片
zookeeper+dubbo+dubbo管理集群的简要配置[单机] -
安静听歌:
请问图片还能找的会吗?你的图片和原文的图片都挂了,,,如果有图 ...
zookeeper+dubbo+dubbo管理集群的简要配置[单机] -
ahua186186:
yngwiet 写道楼主,有一个地方不太明白,为什么要用“ge ...
ListView中getChildAt(index)的使用注意事项 -
yngwiet:
楼主,有一个地方不太明白,为什么要用“getChildAt(p ...
ListView中getChildAt(index)的使用注意事项
1.总体思路:
通过约定的XML规则(分表分库规则)和 封装jdbc的Connection和PreparedStatement来实现SQL解析,sql路由和sql重写。
2. 3个核心类:ConnectionWrapper(JDBC Connection包装),PreparedStatementWrapper( JDBC PreparedStatement包装),SimpleExecutor(sql执行器,类似mybatis的SimpleExecutor)
3. 3个上下文传参数:ConnectionContext,StatementContext,transactionContext。
4. 真正干活的类:
(1)DefaultStatementContextBuilder类:解析SQL并保存BatchItem到StatementContext,--作者貌似直接用的mycat里面的sql解析的代码,直接拿来主义实现价值啊。
(2)DefaultTargetDispatcher:根据batchItem和batchItem中的tableInfo获取路由信息和重写SQL,达到路由到指定的分库分表的目的
(3)SimpleExecutor 和 HandlerFactory:
根据StatementContext的RouteTarget(路由数据),
新建事务并获取数据库连接, 实际执行JDBC curd操作的类.
兴趣点:发现doUpdate的时候有用同步工具类:
CyclicBarrier barrier = new CyclicBarrier(n);
5.集成Mybatis:
因为Mybatis获取连接是通过PooledDataSource或UnpooledDataSource获取的,所以写个插件:包装下DataSource,把oceanus的connentionWrap包装进去即可实现整合。
通过约定的XML规则(分表分库规则)和 封装jdbc的Connection和PreparedStatement来实现SQL解析,sql路由和sql重写。
2. 3个核心类:ConnectionWrapper(JDBC Connection包装),PreparedStatementWrapper( JDBC PreparedStatement包装),SimpleExecutor(sql执行器,类似mybatis的SimpleExecutor)
3. 3个上下文传参数:ConnectionContext,StatementContext,transactionContext。
4. 真正干活的类:
(1)DefaultStatementContextBuilder类:解析SQL并保存BatchItem到StatementContext,--作者貌似直接用的mycat里面的sql解析的代码,直接拿来主义实现价值啊。
public StatementContext build(String sql, StatementContext context) throws SQLException { if (context == null) { context = new StatementContext(); StatementContext.setContext(context); if (logger.isDebugEnabled()) { logger.debug("create context!sql=" + sql); } } if (context.getCurrentBatch().getSql() == null) { context.getCurrentBatch().setSql(sql); } StatementContextHandler handler = null; if (context.isBatch()) { handler = HandlerFactory.create(StatementType.BATCH); StatementContext resultContext = handler.handle(sql, context); processPreparedValues(resultContext); return resultContext; } TrackerExecutor.trackBegin(TrackPoint.PARSE_SQL, sql); SQLParser parser = StatementHelper.createSQLParser(); try { DMLStatementNode statementNode = (DMLStatementNode) parser .parseStatement(sql); switch (statementNode.getNodeType()) { case NodeTypes.CURSOR_NODE: handler = HandlerFactory.create(StatementType.SELECT); break; case NodeTypes.DELETE_NODE: handler = HandlerFactory.create(StatementType.DELETE); break; case NodeTypes.UPDATE_NODE: handler = HandlerFactory.create(StatementType.UPDATE); break; case NodeTypes.INSERT_NODE: handler = HandlerFactory.create(StatementType.INSERT); break; case NodeTypes.CALL_STATEMENT_NODE: handler = HandlerFactory.create(StatementType.CALLABLE); break; } StatementContext resultContext = handler.handle(statementNode, context); TrackerExecutor.trackEnd(TrackPoint.PARSE_SQL); processPreparedValues(resultContext); return resultContext; } catch (StandardException se) { System.out.println("sql parse error, sql:"+sql); se.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } processPreparedValues(context); return context; }
(2)DefaultTargetDispatcher:根据batchItem和batchItem中的tableInfo获取路由信息和重写SQL,达到路由到指定的分库分表的目的
Set<RouteTarget> getSpecifyTargets(TableInfo tableInfo, BatchItem batchItem) { Set<RouteTarget> targetSet = new LinkedHashSet<RouteTarget>(); Configurations configurations = Configurations.getInstance(); /** * 解析where中符合分库分表字段的值 */ Map<String, List<TableColumn>> resolveColumns = RouteHelper .getResolveColumns(tableInfo.getOrgName(), batchItem.getAnalyzeResult()); List<Map<String, Object>> parameters = RouteHelper .getParameterValues(resolveColumns); Set<Integer> indexs = new HashSet<Integer>(); TableDescription desc = configurations.getTableDescription(tableInfo .getOrgName()); List<NameNodeHolder> nameNodes = desc.getNameNodes(); Function func = desc.getFunction(); //分库分表函数,本质就是获取table节点中namenode节点的序号 for (Map<String, Object> item : parameters) { checkParameters(item, batchItem); int i = func.execute(nameNodes.size(), item); indexs.add(i); } if (indexs.size() > 0) { AnalyzeResult analyzeResult = batchItem.getAnalyzeResult(); HavingInfo havingInfo = analyzeResult.getHavingInfo(); if (havingInfo != null) { AnalyzerCallback callback = havingInfo.getCallback(); if (callback != null) { callback.call(); } } } if ((!batchItem.getAnalyzeResult().getAppendResultColumns().isEmpty() || batchItem .getAnalyzeResult().getLimit() != null) && indexs.size() > 1) {// 存在limit或者avg等聚集函数,需要重新生成sql Collection<AnalyzerCallback> analyzerCallbacks = batchItem .getAnalyzeResult().getAnalyzerCallbacks(); if (batchItem.getAnalyzeResult().getLimit() != null) { SqlValueItem limitItem = batchItem.getAnalyzeResult() .getLimit(); SqlValueItem offsetItem = batchItem.getAnalyzeResult() .getOffset(); if(offsetItem==null){ offsetItem=new SqlValueItem(); offsetItem.setValue(0); } if (limitItem.getParameterIndex() > 0 && offsetItem.getParameterIndex() > 0) {// limit ?,? Integer limitSize = limitItem.getValue() + offsetItem.getValue(); batchItem.getCallback(limitItem.getParameterIndex()) .setParameter(limitSize); batchItem.getCallback(offsetItem.getParameterIndex()) .setParameter(0); if(limitItem.getParameterIndex() > offsetItem.getParameterIndex()){ batchItem.getCallback(offsetItem.getParameterIndex()) .setParameterIndex(limitItem.getParameterIndex()); batchItem.getCallback(limitItem.getParameterIndex()) .setParameterIndex(offsetItem.getParameterIndex()); } } else if (limitItem.getParameterIndex() > 0) {// limit 1,? Integer limitSize = limitItem.getValue() + offsetItem.getValue(); batchItem.getCallback(limitItem.getParameterIndex()) .setParameter(limitSize); } else if (offsetItem.getParameterIndex() > 0) {// limit ?,10 batchItem.getCallback(offsetItem.getParameterIndex()) .setParameter(0); } } for (AnalyzerCallback callback : analyzerCallbacks) { callback.call(); } } else {// 在单库路由的情况下,如果检测到是limit ?,?,就置换Parameter顺序 SqlValueItem limitItem = batchItem.getAnalyzeResult() .getLimit(); SqlValueItem offsetItem = batchItem.getAnalyzeResult() .getOffset(); if(limitItem !=null && offsetItem !=null && limitItem.getParameterIndex() > offsetItem.getParameterIndex()){ batchItem.getCallback(offsetItem.getParameterIndex()) .setParameterIndex(limitItem.getParameterIndex()); batchItem.getCallback(limitItem.getParameterIndex()) .setParameterIndex(offsetItem.getParameterIndex()); } } for (Integer i : indexs) {// 生成target NameNode nameNode = configurations.getNameNode( tableInfo.getOrgName(), i); DefaultRouteTarget target = this.createTarget(batchItem, nameNode, tableInfo); targetSet.add(target); } for (RouteTarget item : targetSet) { DefaultRouteTarget target = (DefaultRouteTarget) item; SqlExecuteInfo info = new SqlExecuteInfo(); info.setCallbacks(new LinkedHashSet<ParameterCallback<?>>(batchItem .getCallbacks())); if (desc.isDifferentName()) { info.setExecuteSql(configurations.getGenerator().generate( (NameNodeHolder) target.getNameNode(), batchItem.getAnalyzeResult())); } else if ((!batchItem.getAnalyzeResult().getAppendResultColumns().isEmpty() || batchItem .getAnalyzeResult().getLimit() != null) && nameNodes.size() > 1) {// 存在limit或者avg等聚集函数,需要重新生成sql,必须要超过1个路由结果 info.setExecuteSql(configurations.getLimitAvgGenerator() .generate((NameNodeHolder) target.getNameNode(), batchItem.getAnalyzeResult())); } else { info.setExecuteSql(batchItem.getSql()); } target.setExecuteInfo(info); } return targetSet; }
(3)SimpleExecutor 和 HandlerFactory:
根据StatementContext的RouteTarget(路由数据),
新建事务并获取数据库连接, 实际执行JDBC curd操作的类.
兴趣点:发现doUpdate的时候有用同步工具类:
CyclicBarrier barrier = new CyclicBarrier(n);
@SuppressWarnings({ "rawtypes", "unchecked" }) public class SimpleExecutor implements Executor { static Logger logger = LoggerFactory.getLogger(SimpleExecutor.class); static final ExecuteHandler<Integer> deleteHandler = new DeleteExecuteHandler(); static final ExecuteHandler<Integer> insertHandler = new InsertExecuteHandler(); static final ExecuteHandler<Integer> updateHandler = new UpdateExecuteHandler(); static final ExecuteHandler<ResultSet> queryHandler = new QueryExecuteHandler(); @Override public Object execute(StatementContext context, ExecuteCallback callback) throws SQLException { switch (context.getCurrentBatch().getAnalyzeResult().getStatementType()) { case SELECT: return this.doQuery(context, callback); case INSERT: case UPDATE: case DELETE: return doUpdate(context, callback); default: break; } return null; } ExecuteHandler<?> getHandler(StatementType statementType) { switch (statementType) { case SELECT: return queryHandler; case INSERT: return insertHandler; case UPDATE: return updateHandler; case DELETE: return deleteHandler; default: break; } return null; } ...
5.集成Mybatis:
因为Mybatis获取连接是通过PooledDataSource或UnpooledDataSource获取的,所以写个插件:包装下DataSource,把oceanus的connentionWrap包装进去即可实现整合。
发表评论
-
shiro落地的设计复杂度(最后总结)
2018-06-19 17:22 547经过1周的源码研究,终于对shiro的原理有了深刻的理解,基于 ... -
shiro login成功后保存了哪些数据
2018-06-19 17:05 1421shiro login成功后 保存了Principals 和 ... -
shiro 会话原理分析
2018-06-19 12:40 15541、从哪里获取sessionid每次请求都会尝试获取ses ... -
shiro内部原理分析
2018-06-15 17:07 2350一句话总结:会话域Context一路收集principal ... -
Zookeeper入门-001 源码环境搭建
2018-03-15 11:47 8771.到github下载源码:https://github.c ... -
dubbo服务治理之路由规则研究
2018-01-31 15:50 21801.今天没太多事情,挤 ... -
shiro SecurityUtils.getSubject()深度分析
2018-01-12 17:38 488441.总的来说,SecurityUtils.getSubject ... -
@Async核心实现1 --------AsyncExecutionAspectSupport
2017-12-27 10:34 1946基本原理: 通过spring的扩展接口AbstractBea ... -
从零开始玩转JMX(1):简介和 Standard MBean
2017-08-23 15:20 0http://www.importnew.com/22299. ... -
java基础回顾
2017-08-15 11:21 0http://www.cnblogs.com/skywang1 ... -
mybatis-generator-maven-plugin 插件扩展 增加自定义方法
2017-08-10 16:50 0https://my.oschina.net/alexgaoy ... -
解决了DeferredResult请求长时间占用数据库连接的问题
2017-08-04 09:55 2321最近看了看开源项目appllo配置中心的源码,发现一个很有意思 ... -
httpclient发送webservice
2017-05-03 23:25 0http://aperise.iteye.com/blog/2 ... -
与大师面对面交流:Chris Richardson 来华布道微服务架构
2016-11-28 21:28 778http://www.daocloud.io/microser ... -
eclipse反编译
2016-11-23 20:48 0http://jingyan.baidu.com/articl ... -
spring security
2016-09-12 19:28 0http://www.importnew.com/5641.h ... -
浅析JPA中EntityManager无法remove entity的问题
2016-07-18 21:50 0http://rickqin.blog.51cto.com/1 ... -
Permission Denied(publickey) 解决
2016-07-14 19:18 27361.生成公钥和私钥放到C:\Users\itservice\. ... -
权限管理系统
2016-04-18 13:29 0http://git.oschina.net/ketayao/ ... -
Java工程师成神之路--面试必须复习的基础
2015-12-28 17:20 0http://www.open-open.com/news/v ...
相关推荐
资源来自pypi官网。 资源全名:tencentcloud-sdk-python-oceanus-3.0.448.tar.gz
资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.544.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.386.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.547.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.507.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.385.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.357.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.330.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
58同城的分布式数据库中间件架构设计与实现的介绍
58同城数据库中间件 关于DB中间件 在DB存储需求中,尽管业务不同,技术难点还是类似的,开源世界有很多DB中间件,解决方案也以通用方案为主,满足业务需要为前提,支持各种类型的需求。 Oceanus致力于打造一个功能...
1引言1.1 编写目的Oceanus 是 58 同城的数据库中间件,这篇文档介绍 Oceanus 的使用方法和注意事项,由于底层服务极为重要,请严格按照文档要求
依赖环境PHP 5.6.0版本及以上从腾讯云控制台开通相应产品获取SecretID,SecretKey以及调用地址(端点),端点为oceanus.tencentcloudapi.com,具体参考各产品说明。获取安装通过Composer安装通过Composer获取安装是...
snow ball 希望自己在日常工作学习中学会总结经验教训,提升专业技能 ...https://github.com/58code/Oceanus https://github.com/esimakin/twbs-pagination https://github.com/mtjs/mt https://github.com/
这次分享主要包含四个议题,会首先阐述一下腾讯在实时计算中使用Flink的历程,然后会简单介绍一下腾讯围绕Flink的产品化实践:我们打造了一个Oceanus平台,同时腾讯云也早已提供基于Flink的实时流计算服务,接着我们...
Oceanus:美团HTTP流量定制化路由的实践 ...................................................................... 47 UAS-点评侧用户行为检索系统 ................................................................
2023云原生峰会(公开)PPT汇总...云 Oceanus 对 Flink 云原生演进的实践与思考 助力构建低成本数据湖分析的最佳实践 7、云原生大数据AI一体化论坛 开源一站式云原生机器学习平台 云原生AI Paas平台的实践 ……等等。