Mqagent 是为了适应分布式 memcachq 的需求,在 magent 基础上进行二次开发形成,因为其时专门为 memcacheq 服务的,因此我命名为 mqagent 。
其作为代理层,隔离了开发者和后端的多个 memcachq server ,使得后端对开发者透明,开发人员只需要和代理层,依据 memcache 的协议进行消息队列的 get/set 操作即可。
1 、开发者申请一个消息队列名称,例如 msg_deliver_email, 该消息队列主要用来处理异步发送邮件。
2 、 mqagent 的维护者,需要根据开发者申请的消息队列的名称配置使用的后端的 memcacheq 的服务器的地址。例如:下面代码是 mqagent 配置文件中的【 tasks 】部分,主要配置 消息队列的名称和后端 memcachq 之间的映射关系。
[tasks]
sync_pre_process = 127.0.0.1:22201,127.0.0.1:22203
advance_sync_pre_process = 127.0.0.1:22201,127.0.0.1:22203
msg_deliver_email = 127.0.0.1:22024
此时 kill -1 xxx, xxx 为 mqagent 的 pid 号码,这样就可以 reload 新的配置了。
3 、设置完成后, mqagent 维护者返回给开发者配置好的 mqagent 的地址和端口号,及消息队列名称,二者基于下面介绍的协议就可以通信了。
总体来说 mqagent 协议是基于 memcache 协议的,但是基于项目的需求
1 、为了保证对数据的一致性,对同一 jid 的操作需要保证时序性。
2 、分布式的 memcacheq 。
基于上面两点,我们改造了 set 的协议,其设计思想是,对比非分布式的 memcacheq ,分布式的 memcacheq 系统需要支持对同一 jid 的操作的时序性,因此需要将同一 jid 的消息放入同一个 memcacheq 中去,同时还需要按照传统的消息队列的格式指明消息队列的名称。简而言之,需要开发者传递两个参数, 1 、消息队列的名称, 2 、 Mqagent 进行一致性 hash 计算所需要的 key 值(这里为 jid )。
对与 get 的改动不大,我们只需要对传递消息队列的名称即可。
<command name> <key> <flags> <exptime> <bytes>/r/n <data block>/r/n
- <command name> 是 set
* set 意思是 “储存此数据”,这里是队列的 push 操作。
- <key> 是接下来的客户端所要求储存的数据的键值,此时的 key 值为 jid ,主要是作为一致性 hash 计算的 key 值,而非消息队列的名称。
- <flags> 是在取回内容时,与数据和发送块一同保存服务器上的任意 16 位无符号整形(用十进制来书写)。客户端可以用它作为“位域”来存储一些特定的信息;它对服务器是不透明的。
- <bytes> 是随后的数据区块的字节长度,不包括用于分野的“ /r/n ”。它可以是 0 (这时后面跟随一个空的数据区块)。
在这一行以后,客户端发送数据区块。
<data block>/r/n
主要是对 <data block> 做了改动, mqagent 将 <data block> 修改为 mq_name/<data block> 即 data block 实际上包含了两部分,两部分用“ / ”分割,前半部分代表的是消息队列的名称,后半部分是消息队列的实体。
例如: "sync_pre_process/ger@gozap.com:13488860631, [13488860637, add]" 该 block data 标示的是队列名称为 sync_pre_process ,队列内容为 ger@gozap.com:13488860631, [13488860637, add] 。
get <key>/r/n
其中 key 为消息队列的名称。
[General]
#max keep alive connections for one memcached server
maxidle=20
#max connection from all clients
maxconns=4096
#mqagent port;
port=11215
#use ketama consistence hash
useketama=true
#daemon mode
daemon_mode=true
#logfile
logdir=/home/saint
[tasks]
sync_pre_process = 127.0.0.1:22201,127.0.0.1:22203
advance_sync_pre_process = 127.0.0.1:22201,127.0.0.1:22203
sync_link_deal = 127.0.0.1:22202
davance_sync_link_deal = 127.0.0.1:22204
其中配置文件中【 General 】模块是继承了 magent 来实现的,而【 tasks 】模块是 mqagent 独有的,主要配置 消息队列的名称和后端 memcacheq 之间的映射关系。
读取配置文件,将每一个 tasks 的 item ,作为一个 key-value 键值对存入到 hash table 中,使用者通过传递进来的 mq_name 就可以知道要使用哪些后端的机器。
Set 命令,会解析出 block data 中的 task_name, 然后根据传入的 key 值,执行一致性 hash 算法,取得对应 hash table 中 key 为该 task_name ,且序号为 hash 算法得出的 value 的值的 memcacheq 服务器的地址。依据 memcache 协议,重写 set 命令,传递到真正的 memcacheq 服务器中去。
Get 命令,相对比较简单,使用的是 round-robin 算法,每个 get 传入的 key 值为消息队列的名称。通过 key 找到 hash table 中的后端的 memcacheq group ,然后轮询的执行 get 命令。
1、 仅支持memcached 的协议下的get ,set 命令。
2、 Get 相对于队列的pop 操作,set 相当于队列的push 操作。
3、 可以借助现有的 memcached 的 client 的库来实现。
#include "mt.h" #include<stdio.h> #include<string.h> #include<stdlib.h> #include<sys/types.h> #include<libmemcached/memcached.h> int main(int argc,char *argv[]) { memcached_st *memc; memcached_return rc; memcached_server_st *servers; memc = memcached_create(NULL); memcached_behavior_set(memc,MEMCACHED_BEHAVIOR_KETAMA,1); servers = memcached_server_list_append_with_weight(NULL,"localhost",11215,1,&rc); rc = memcached_server_push(memc,servers); memcached_server_free(servers); if(rc == MEMCACHED_SUCCESS) fprintf(stdout,"puse success/n"); char *key = "wangminghua@gozap.com1"; size_t lenval,rtv; uint32_t rtf; char *val = "advance_sync_pre_process/ger@gozap.com, [13488860631,add]"; rc = memcached_set(memc,key,strlen(key),val,strlen(val),(time_t)0,(uint32_t)0); char * testobj = memcached_get(memc,key,strlen(key),&rtv,&rtf,&rc); if(rc != MEMCACHED_SUCCESS){ fprintf(stderr,"can't find!/n"); exit(2); } //fprintf(stdout,"age =%d,id=%d,len %d/n",testobj->age,testobj->id,rtv); fprintf(stdout,"%s len is %d/n", testobj,rtv); testobj = memcached_get(memc,key,strlen(key),&rtv,&rtf,&rc); if(rc != MEMCACHED_SUCCESS){ fprintf(stderr,"can't find!/n"); exit(2); } fprintf(stdout,"%s len is %d/n", testobj,rtv); free(testobj); memcached_free(memc); }
http://svn.gozap.com/svn/mqagent