`
ahua186186
  • 浏览: 554390 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

oceanus-58总体框架理解

 
阅读更多
1.总体思路:
通过约定的XML规则(分表分库规则)和 封装jdbc的Connection和PreparedStatement来实现SQL解析,sql路由和sql重写。

2.  3个核心类:ConnectionWrapper(JDBC Connection包装),PreparedStatementWrapper( JDBC PreparedStatement包装),SimpleExecutor(sql执行器,类似mybatis的SimpleExecutor)

3.  3个上下文传参数:ConnectionContext,StatementContext,transactionContext。

4.  真正干活的类:

(1)DefaultStatementContextBuilder类:解析SQL并保存BatchItem到StatementContext,--作者貌似直接用的mycat里面的sql解析的代码,直接拿来主义实现价值啊。
public StatementContext build(String sql, StatementContext context)
			throws SQLException {
		if (context == null) {
			context = new StatementContext();
			StatementContext.setContext(context);
			if (logger.isDebugEnabled()) {
				logger.debug("create context!sql=" + sql);
			}
		}
		if (context.getCurrentBatch().getSql() == null) {
			context.getCurrentBatch().setSql(sql);
		}

		StatementContextHandler handler = null;
		if (context.isBatch()) {
			handler = HandlerFactory.create(StatementType.BATCH);
			StatementContext resultContext = handler.handle(sql, context);
			processPreparedValues(resultContext);
			return resultContext;
		}
		
		TrackerExecutor.trackBegin(TrackPoint.PARSE_SQL, sql);
		SQLParser parser = StatementHelper.createSQLParser();
		try {
			DMLStatementNode statementNode = (DMLStatementNode) parser
					.parseStatement(sql);
			switch (statementNode.getNodeType()) {
			case NodeTypes.CURSOR_NODE:
				handler = HandlerFactory.create(StatementType.SELECT);
				break;
			case NodeTypes.DELETE_NODE:
				handler = HandlerFactory.create(StatementType.DELETE);
				break;
			case NodeTypes.UPDATE_NODE:
				handler = HandlerFactory.create(StatementType.UPDATE);
				break;
			case NodeTypes.INSERT_NODE:
				handler = HandlerFactory.create(StatementType.INSERT);
				break;
			case NodeTypes.CALL_STATEMENT_NODE:
				handler = HandlerFactory.create(StatementType.CALLABLE);
				break;
			}
			
			StatementContext resultContext = handler.handle(statementNode,
					context);
			
			TrackerExecutor.trackEnd(TrackPoint.PARSE_SQL);
			
			processPreparedValues(resultContext);
			return resultContext;

		} catch (StandardException se) {
			System.out.println("sql parse error, sql:"+sql);
			se.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
		processPreparedValues(context);
		return context;
	}

(2)DefaultTargetDispatcher:根据batchItem和batchItem中的tableInfo获取路由信息和重写SQL,达到路由到指定的分库分表的目的

Set<RouteTarget> getSpecifyTargets(TableInfo tableInfo, BatchItem batchItem) {
		Set<RouteTarget> targetSet = new LinkedHashSet<RouteTarget>();
		Configurations configurations = Configurations.getInstance();

		/**
		 * 解析where中符合分库分表字段的值
		 */
		Map<String, List<TableColumn>> resolveColumns = RouteHelper
				.getResolveColumns(tableInfo.getOrgName(),
						batchItem.getAnalyzeResult());
		List<Map<String, Object>> parameters = RouteHelper
				.getParameterValues(resolveColumns);
		Set<Integer> indexs = new HashSet<Integer>();
		TableDescription desc = configurations.getTableDescription(tableInfo
				.getOrgName());
		List<NameNodeHolder> nameNodes = desc.getNameNodes();
		Function func = desc.getFunction();
//分库分表函数,本质就是获取table节点中namenode节点的序号
		for (Map<String, Object> item : parameters) {
			checkParameters(item, batchItem);
			int i = func.execute(nameNodes.size(), item);
			indexs.add(i);
		}

		if (indexs.size() > 0) {
			AnalyzeResult analyzeResult = batchItem.getAnalyzeResult();
			HavingInfo havingInfo = analyzeResult.getHavingInfo();
			if (havingInfo != null) {
				AnalyzerCallback callback = havingInfo.getCallback();
				if (callback != null) {
					callback.call();
				}
			}
		}
		if ((!batchItem.getAnalyzeResult().getAppendResultColumns().isEmpty() || batchItem
				.getAnalyzeResult().getLimit() != null) && indexs.size() > 1) {// 存在limit或者avg等聚集函数,需要重新生成sql
			Collection<AnalyzerCallback> analyzerCallbacks = batchItem
					.getAnalyzeResult().getAnalyzerCallbacks();
			if (batchItem.getAnalyzeResult().getLimit() != null) {
				SqlValueItem limitItem = batchItem.getAnalyzeResult()
						.getLimit();
				SqlValueItem offsetItem = batchItem.getAnalyzeResult()
						.getOffset();
				if(offsetItem==null){
					offsetItem=new SqlValueItem();
					offsetItem.setValue(0);
				}
				if (limitItem.getParameterIndex() > 0
						&& offsetItem.getParameterIndex() > 0) {// limit ?,?
					Integer limitSize = limitItem.getValue()
							+ offsetItem.getValue();
					batchItem.getCallback(limitItem.getParameterIndex())
							.setParameter(limitSize);
					batchItem.getCallback(offsetItem.getParameterIndex())
							.setParameter(0);
					
					if(limitItem.getParameterIndex() > offsetItem.getParameterIndex()){
						batchItem.getCallback(offsetItem.getParameterIndex())
							.setParameterIndex(limitItem.getParameterIndex());
						
						batchItem.getCallback(limitItem.getParameterIndex())
							.setParameterIndex(offsetItem.getParameterIndex());
					}
				} else if (limitItem.getParameterIndex() > 0) {// limit 1,?
					Integer limitSize = limitItem.getValue()
							+ offsetItem.getValue();
					batchItem.getCallback(limitItem.getParameterIndex())
							.setParameter(limitSize);
				} else if (offsetItem.getParameterIndex() > 0) {// limit ?,10
					batchItem.getCallback(offsetItem.getParameterIndex())
							.setParameter(0);
				}
			}
			for (AnalyzerCallback callback : analyzerCallbacks) {
				callback.call();
			}
		} else {// 在单库路由的情况下,如果检测到是limit ?,?,就置换Parameter顺序
			SqlValueItem limitItem = batchItem.getAnalyzeResult()
					.getLimit();
			SqlValueItem offsetItem = batchItem.getAnalyzeResult()
					.getOffset();
			
			if(limitItem !=null && offsetItem !=null && 
					limitItem.getParameterIndex() > offsetItem.getParameterIndex()){
				
				batchItem.getCallback(offsetItem.getParameterIndex())
					.setParameterIndex(limitItem.getParameterIndex());
				
				batchItem.getCallback(limitItem.getParameterIndex())
					.setParameterIndex(offsetItem.getParameterIndex());
			}
		}
		for (Integer i : indexs) {// 生成target
			NameNode nameNode = configurations.getNameNode(
					tableInfo.getOrgName(), i);
			DefaultRouteTarget target = this.createTarget(batchItem, nameNode, tableInfo);
			targetSet.add(target);
		}

		
		for (RouteTarget item : targetSet) {
			DefaultRouteTarget target = (DefaultRouteTarget) item;
			SqlExecuteInfo info = new SqlExecuteInfo();
			info.setCallbacks(new LinkedHashSet<ParameterCallback<?>>(batchItem
					.getCallbacks()));

			if (desc.isDifferentName()) {
				info.setExecuteSql(configurations.getGenerator().generate(
						(NameNodeHolder) target.getNameNode(),
						batchItem.getAnalyzeResult()));
			} else if ((!batchItem.getAnalyzeResult().getAppendResultColumns().isEmpty() || batchItem
					.getAnalyzeResult().getLimit() != null) && nameNodes.size() > 1) {// 存在limit或者avg等聚集函数,需要重新生成sql,必须要超过1个路由结果
				info.setExecuteSql(configurations.getLimitAvgGenerator()
						.generate((NameNodeHolder) target.getNameNode(),
								batchItem.getAnalyzeResult()));
			} else {
				info.setExecuteSql(batchItem.getSql());
			}
			target.setExecuteInfo(info);
		}
		return targetSet;
	}


(3)SimpleExecutor 和 HandlerFactory:

根据StatementContext的RouteTarget(路由数据),

新建事务并获取数据库连接,   实际执行JDBC curd操作的类.

兴趣点:发现doUpdate的时候有用同步工具类:

CyclicBarrier barrier = new CyclicBarrier(n);



@SuppressWarnings({ "rawtypes", "unchecked" })
public class SimpleExecutor implements Executor {
	static Logger logger = LoggerFactory.getLogger(SimpleExecutor.class);
	static final ExecuteHandler<Integer> deleteHandler = new DeleteExecuteHandler();
	static final ExecuteHandler<Integer> insertHandler = new InsertExecuteHandler();
	static final ExecuteHandler<Integer> updateHandler = new UpdateExecuteHandler();
	static final ExecuteHandler<ResultSet> queryHandler = new QueryExecuteHandler();

	@Override
	public Object execute(StatementContext context, ExecuteCallback callback)
			throws SQLException {

		switch (context.getCurrentBatch().getAnalyzeResult().getStatementType()) {
		case SELECT:
			return this.doQuery(context, callback);
		case INSERT:
		case UPDATE:
		case DELETE:
			return doUpdate(context, callback);
		default:
			break;
		}
		return null;
	}

	ExecuteHandler<?> getHandler(StatementType statementType) {
		switch (statementType) {
		case SELECT:
			return queryHandler;
		case INSERT:
			return insertHandler;
		case UPDATE:
			return updateHandler;
		case DELETE:
			return deleteHandler;
		default:
			break;
		}
		return null;
	}
...


5.集成Mybatis:

因为Mybatis获取连接是通过PooledDataSource或UnpooledDataSource获取的,所以写个插件:包装下DataSource,把oceanus的connentionWrap包装进去即可实现整合。
分享到:
评论

相关推荐

    PyPI 官网下载 | tencentcloud-sdk-python-oceanus-3.0.448.tar.gz

    资源来自pypi官网。 资源全名:tencentcloud-sdk-python-oceanus-3.0.448.tar.gz

    Python库 | tencentcloud-sdk-python-oceanus-3.0.544.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.544.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.386.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.386.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.547.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.547.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.507.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.507.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.385.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.385.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.357.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.357.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.330.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.330.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    oceanus架构

    58同城的分布式数据库中间件架构设计与实现的介绍

    58同城数据库中间件-58同城数据库中间件

    58同城数据库中间件 关于DB中间件 在DB存储需求中,尽管业务不同,技术难点还是类似的,开源世界有很多DB中间件,解决方案也以通用方案为主,满足业务需要为前提,支持各种类型的需求。 Oceanus致力于打造一个功能...

    Oceanus 使用文档1

    1引言1.1 编写目的Oceanus 是 58 同城的数据库中间件,这篇文档介绍 Oceanus 的使用方法和注意事项,由于底层服务极为重要,请严格按照文档要求

    oceanus

    依赖环境PHP 5.6.0版本及以上从腾讯云控制台开通相应产品获取SecretID,SecretKey以及调用地址(端点),端点为oceanus.tencentcloudapi.com,具体参考各产品说明。获取安装通过Composer安装通过Composer获取安装是...

    snow-ball:杂录

    snow ball 希望自己在日常工作学习中学会总结经验教训,提升专业技能 ...https://github.com/58code/Oceanus https://github.com/esimakin/twbs-pagination https://github.com/mtjs/mt https://github.com/

    腾讯基于Flink的实时流计算平台演进之路

    这次分享主要包含四个议题,会首先阐述一下腾讯在实时计算中使用Flink的历程,然后会简单介绍一下腾讯围绕Flink的产品化实践:我们打造了一个Oceanus平台,同时腾讯云也早已提供基于Flink的实时流计算服务,接着我们...

    2018美团点评技术文章-后台篇

    Oceanus:美团HTTP流量定制化路由的实践 ...................................................................... 47 UAS-点评侧用户行为检索系统 ................................................................

    2023云原生峰会(公开)PPT汇总(24份).zip

    2023云原生峰会(公开)PPT汇总...云 Oceanus 对 Flink 云原生演进的实践与思考 助力构建低成本数据湖分析的最佳实践 7、云原生大数据AI一体化论坛 开源一站式云原生机器学习平台 云原生AI Paas平台的实践 ……等等。

Global site tag (gtag.js) - Google Analytics