Mqagent-1.0说明文档

    技术2024-12-03  24

    1        Mqagent 的用途

    Mqagent 是为了适应分布式 memcachq 的需求,在 magent 基础上进行二次开发形成,因为其时专门为 memcacheq 服务的,因此我命名为 mqagent

    其作为代理层,隔离了开发者和后端的多个 memcachq server ,使得后端对开发者透明,开发人员只需要和代理层,依据 memcache 的协议进行消息队列的 get/set 操作即可。

    2        Mqagent 使用 流程

    1 、开发者申请一个消息队列名称,例如 msg_deliver_email, 该消息队列主要用来处理异步发送邮件。

    2 mqagent 的维护者,需要根据开发者申请的消息队列的名称配置使用的后端的 memcacheq 的服务器的地址。例如:下面代码是 mqagent 配置文件中的【 tasks 】部分,主要配置 消息队列的名称和后端 memcachq 之间的映射关系。

          

    [tasks]

    sync_pre_process = 127.0.0.1:22201,127.0.0.1:22203

    advance_sync_pre_process = 127.0.0.1:22201,127.0.0.1:22203

           msg_deliver_email = 127.0.0.1:22024

           此时 kill -1 xxx, xxx mqagent pid 号码,这样就可以 reload 新的配置了。

           3 、设置完成后, mqagent 维护者返回给开发者配置好的 mqagent 的地址和端口号,及消息队列名称,二者基于下面介绍的协议就可以通信了。

    3        Mqagent 协议的设计

    总体来说 mqagent 协议是基于 memcache 协议的,但是基于项目的需求

    1 、为了保证对数据的一致性,对同一 jid 的操作需要保证时序性。

    2 、分布式的 memcacheq

    基于上面两点,我们改造了 set 的协议,其设计思想是,对比非分布式的 memcacheq ,分布式的 memcacheq 系统需要支持对同一 jid 的操作的时序性,因此需要将同一 jid 的消息放入同一个 memcacheq 中去,同时还需要按照传统的消息队列的格式指明消息队列的名称。简而言之,需要开发者传递两个参数, 1 、消息队列的名称, 2 Mqagent 进行一致性 hash 计算所需要的 key 值(这里为 jid )。

           对与 get 的改动不大,我们只需要对传递消息队列的名称即可。

    3.1      Set 协议

           <command name> <key> <flags> <exptime> <bytes>/r/n <data block>/r/n

    - <command name> set

        * set 意思是 “储存此数据”,这里是队列的 push 操作。

    - <key> 是接下来的客户端所要求储存的数据的键值,此时的 key 值为 jid ,主要是作为一致性 hash 计算的 key 值,而非消息队列的名称。

    - <flags> 是在取回内容时,与数据和发送块一同保存服务器上的任意 16 位无符号整形(用十进制来书写)。客户端可以用它作为“位域”来存储一些特定的信息;它对服务器是不透明的。

           - <bytes> 是随后的数据区块的字节长度,不包括用于分野的“ /r/n ”。它可以是 0 (这时后面跟随一个空的数据区块)。

    在这一行以后,客户端发送数据区块。    

    <data block>/r/n

    主要是对 <data block> 做了改动, mqagent <data block> 修改为 mq_name/<data block> data block 实际上包含了两部分,两部分用“ / ”分割,前半部分代表的是消息队列的名称,后半部分是消息队列的实体。

    例如: "sync_pre_process/ger@gozap.com:13488860631, [13488860637, add]"  block data 标示的是队列名称为 sync_pre_process ,队列内容为 ger@gozap.com:13488860631, [13488860637, add]

    3.2      Get 协议

    get <key>/r/n

    其中 key 为消息队列的名称。

    4        Mqagent 的程序实现

    4.1      配置文件

    [General]

    #max keep alive connections for one memcached server

    maxidle=20

     

    #max connection from  all clients

    maxconns=4096

     

    #mqagent port;

    port=11215

     

    #use ketama consistence hash

    useketama=true

     

    #daemon mode

    daemon_mode=true

     

    #logfile

    logdir=/home/saint

     

    [tasks]

    sync_pre_process = 127.0.0.1:22201,127.0.0.1:22203

    advance_sync_pre_process = 127.0.0.1:22201,127.0.0.1:22203

    sync_link_deal = 127.0.0.1:22202

    davance_sync_link_deal = 127.0.0.1:22204

    其中配置文件中【 General 】模块是继承了 magent 来实现的,而【 tasks 】模块是 mqagent 独有的,主要配置 消息队列的名称和后端 memcacheq 之间的映射关系。

    4.2      设计原理   

    读取配置文件,将每一个 tasks item ,作为一个 key-value 键值对存入到 hash table 中,使用者通过传递进来的 mq_name 就可以知道要使用哪些后端的机器。

    Set 命令,会解析出 block data 中的 task_name, 然后根据传入的 key 值,执行一致性 hash 算法,取得对应 hash table key 为该 task_name ,且序号为 hash 算法得出的 value 的值的 memcacheq 服务器的地址。依据 memcache 协议,重写 set 命令,传递到真正的 memcacheq 服务器中去。

    Get 命令,相对比较简单,使用的是 round-robin 算法,每个 get 传入的 key 值为消息队列的名称。通过 key 找到 hash table 中的后端的 memcacheq group ,然后轮询的执行 get 命令。

    5        客户端编写示例 (c)

    1、  仅支持memcached 的协议下的getset 命令。

    2、  Get 相对于队列的pop 操作,set 相当于队列的push 操作。

    3、  可以借助现有的 memcached client 的库来实现。

    #include "mt.h" #include<stdio.h> #include<string.h> #include<stdlib.h> #include<sys/types.h> #include<libmemcached/memcached.h> int main(int argc,char *argv[]) { memcached_st *memc; memcached_return rc; memcached_server_st *servers; memc = memcached_create(NULL); memcached_behavior_set(memc,MEMCACHED_BEHAVIOR_KETAMA,1); servers = memcached_server_list_append_with_weight(NULL,"localhost",11215,1,&rc); rc = memcached_server_push(memc,servers); memcached_server_free(servers); if(rc == MEMCACHED_SUCCESS) fprintf(stdout,"puse success/n"); char *key = "wangminghua@gozap.com1"; size_t lenval,rtv; uint32_t rtf; char *val = "advance_sync_pre_process/ger@gozap.com, [13488860631,add]"; rc = memcached_set(memc,key,strlen(key),val,strlen(val),(time_t)0,(uint32_t)0); char * testobj = memcached_get(memc,key,strlen(key),&rtv,&rtf,&rc); if(rc != MEMCACHED_SUCCESS){ fprintf(stderr,"can't find!/n"); exit(2); } //fprintf(stdout,"age =%d,id=%d,len %d/n",testobj->age,testobj->id,rtv); fprintf(stdout,"%s len is %d/n", testobj,rtv); testobj = memcached_get(memc,key,strlen(key),&rtv,&rtf,&rc); if(rc != MEMCACHED_SUCCESS){ fprintf(stderr,"can't find!/n"); exit(2); } fprintf(stdout,"%s len is %d/n", testobj,rtv); free(testobj); memcached_free(memc); }

    6        地址

    http://svn.gozap.com/svn/mqagent

    最新回复(0)