mongo-connector原理及改造

By 刘志军 , 2014-10-24, 分类: python

mongodb

这段时间因为项目的某个需求需要改造mongo-connector,改造开源产品首先要读懂别人的代码,于是总结记录一下自己的分析过程。这里假设你对Solr有一定的了解。

mongo-connctor是一款用于同步MongoDB数据到其他系统组件,比如它能同步数据到Solr、ElasticSearch或者其他MongoDB集群中去。它的实现原理是依据MongoDB的Replica Set复制模式,通过分析oplog日志文件达到最终的同步目的。安装配置启动过程可参考官方文档。这篇文章具体来分析一下mongo-connetor的执行过程是怎样的。

oplog

首先需要了解oplog的格式,它的格式在不同版本的mongodb上有所区别,大致是:

PRIMARY> db.version()
2.2.2
PRIMARY> db.oplog.rs.findOne()
{
    "ts" : Timestamp(1364186197000, 58),
    "h" : NumberLong("-7878220425718087654"),
    "v" : 2,
    "op" : "u",
    "ns" : "exaitem_gmsbatchtask.jdgmsbatchtask",
    "o2" : {
        "_id" : "83f09a98-6a41-497b-a988-99ba5399d296"
    },
    "o" : {
        "_id" : "83f09a98-6a41-497b-a988-99ba5399d296",
        "status" : 2,
        "content" : "",
        "type" : 17,
        "business" : "832722",
        "optype" : 2,
        "addDate" : ISODate("2013-03-25T04:36:38.511Z"),
        "modifyDate" : ISODate("2013-03-25T04:36:39.131Z"),
        "source" : 5
    }

mongo-connector

mongo-connetor的核心目录结构是:

├── mongo_connector
   ├── __init__.py
   ├── compat.py
   ├── config.txt
   ├── connector.py
   ├── constants.py
   ├── doc_managers
      ├── __init__.py
      ├── config.txt
      ├── doc_manager_simulator.py
      ├── elastic_doc_manager.py
      ├── formatters.py
      ├── mongo_doc_manager.py
      ├── schema.xml
      ├── solr_doc_manager.py
      └── solr_doc_manager.pyc
   ├── errors.py
   ├── locking_dict.py
   ├── oplog_manager.py
   └── util.py

程序的入口就是connector.py,main方法通过从命令行中接收参数信息,参数信息可以用mongo-conntor --help查看,根据参数信息构建connector对象,connetor继承Thread,具体的执行流程如下:
mongo-connetor

基本的执行流程理解之后,我有这个一个需求,同mongodb同步到solr中的document中的部分field是手动加上去的,比如:用户的粉丝数量在mongodb中没有存储,而是放在redis中,此时有会通过其他方式把粉丝数同步到solr中去,同步之后,就会遇到一个问题,如果mongodb中的那条记录有更新操作,比如:该纪录修改了usename字段,但是他用的是mongodb中的赋值操作,而不是修改器$set,此时mongo-conntor会把这条记录的所有字段更新过去,相当于把对应solr中的那个document删除,重新索引,这是粉丝数就没有了。具体的细节可以通过查看solr_doc_manager.py文件来验证。也可以看看我从官方fork的一份代码中查看,这份代码做了些注视。

通过在apply_update方法中添加逻辑:

 #solr中有的字段但mongodb中没有的字段继续保留在solr中
 for key in doc:
     if key not in update_spec:
         update_spec[key] = doc[key]

就可以解决该问题了。


关注公众号「Python之禅」,回复「1024」免费获取Python资源

python之禅