今天给大家分享一篇,DPDK高性能无锁队列的实现,这才是真正实用,非常考验大家的工程能力的高级数据结构,很多人说算法和数据结构没有用,可能你只知道做算法题里面那些理想数据结构,不知道工作中的各种框架,虚拟机,标准库,操作系统为了高性能帮你做好了这一切。
一、dpdk的rte_ring简介
rte_ring的实质是FIFO的无锁环形队列,无锁队列的出队入队操作是rte_ring实现的关键。常用于多线程/多进程之间的通信。
ring的特点:
使用方法:
1.创建一个ring对象。
接口:structrte_ring *rte_ring_create(constchar *name,unsignedcount,intsocket_id,unsignedflags)其中:name:ring的namecount:ring队列的长度必须是2的幂次方socket_id:ring位于的socketflags:指定创建的ring的属性:单/多生产者、单/多消费者两者之间的组合;0表示使用默认属性(多生产者、多消费者),不同的属性出入队的操作会有所不同例如:struct rte_ring *r = rte_ring_create(“MY_RING”,1024,rte_socket_id(),0);
2.出入队
有不同的出入队方式(单、bulk、burst)都在rte_ring.h中。例如:rte_ring_enqueue和rte_ring_dequeue
这种数据结构与链表队列相比:
优点如下:
缺点如下:
二、rte_ring结构体分析
无锁环形队列的结构体如下:
struct{rte_ringTAILQ_ENTRY(rte_ring) next;/** Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI* compatibility requirements, it could be changed to RTE_RING_NAMESIZE* next time the ABI changes*//**charname[RTE_MEMZONE_NAMESIZE];/**intflags;/**conststructrte_memzone*memzone;/**/** Ring producer status. */structprod{uint32_twatermark;/**uint32_tsp_enqueue;/**uint32_tsize;/**uint32_tmask;/**// 生产者头尾指针,生产完成后都指向队尾volatileuint32_thead;/**volatiletail;uint32_t} prod __rte_cache_aligned;/**/** Ring consumer status. */structcons{uint32_tsc_dequeue;/**uint32_tsize;/**uint32_tmask;/**// 消费者头尾指针,生产完成后都指向队头volatileuint32_thead;/**volatileuint32_ttail;/**#ifdef RTE_RING_SPLIT_PROD_CONS} cons __rte_cache_aligned;#else} cons;#endif#ifdef RTE_LIBRTE_RING_DEBUGstructrte_ring_debug_statsstats*ring[] __rte_cache_aligned;[RTE_MAX_LCORE];#endif// 队列中保存的所有对象void};/*** not volatile so need to be careful* about compiler re-ordering */
dpdk在rte_ring_list链表中创建一个rte_tailq_entry节点,在memzone中根据队列的大小count申请一块内存(rte_ring的大小加上count*sizeof(void *))。紧邻着rte_ring结构的void *数组用于放置入队的对象(单纯的赋值指针值)。rte_ring结构中有生产者结构prod、消费者结构cons,初始化参数之后,把rte_tailq_entry的data节点指向rte_ring结构地址。
可以注意到cons.head、cons.tail、prod.head、prod.tail的类型都是uint32_t。除此之外,队列的大小count被限制为2的幂次方。这两个条件放到一起构成了一个很巧妙的情景。因为队列的大小一般不会有2的32次方那么大,所以,把队列取为32位的一个窗口,当窗口的大小是2的幂次方,则32位包含整数个窗口。
这样,用来存放ring对象的void *指针数组空间就可只申请一个窗口大小即可。根据二进制的回环性,可以直接用(uint32_t)( prod_tail \- cons_tail)计算队列中有多少生产的产品(即使溢出了也不会出错,如(uint32_t)5-65535 = 6)。
三、rte_ring实现多进程间通信
rte_ring需要与rte_mempool配合使用,通过rte_mempool来共享内存。dpdk多进程示例解读(examples/multi_process/simple_mp),实现进程之间的master和slave线程互发字串 :
intmain(intargc,char**argv){constunsignedflags =0;constunsignedring_size =64;constunsignedpool_size =1024;constunsignedpool_cache =32;constunsigned(rte_eal_process_type() == RTE_PROC_PRIMARY){ send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags); recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags); message_pool = rte_mempool_create(_MSG_POOL, pool_size, STR_TOKEN_SIZE, pool_cache, priv_data_sz,priv_data_sz =0;intret;unsignedlcore_id; ret = rte_eal_init(argc, argv);if(ret 0) rte_exit(EXIT_FAILURE,"Cannot init EAL\n");//首先primary进程创建ring和mempoolif}RTE_LCORE_FOREACH_SLAVE(lcore_id) { rte_eal_remote_launch(lcore_recv,NULL,NULL,NULL,NULL, rte_socket_id(), flags);//secondary进程在primary进程启动后,通过rte_ring_lookup和rte_mempool_lookup来获取ring和mempool的地址//发送和接受队列的顺序跟primary相反else{ recv_ring = rte_ring_lookup(_PRI_2_SEC); send_ring = rte_ring_lookup(_SEC_2_PRI); message_pool = rte_mempool_lookup(_MSG_POOL); }if(send_ring ==NULL) rte_exit(EXIT_FAILURE,"Problem getting sending ring\n");if(recv_ring ==NULL) rte_exit(EXIT_FAILURE,"Problem getting receiving ring\n");if(message_pool ==NULL) rte_exit(EXIT_FAILURE,"Problem getting message pool\n"); RTE_LOG(INFO, APP,"Finished Process Init.\n");/* call lcore_recv() on every slave lcore */NULL, lcore_id); }/* call cmd prompt on master lcore */structcmdline*cl=cmdline_stdin_new(simple_mp_ctx, "\nsimple_mp> ");if(cl ==NULL) rte_exit(EXIT_FAILURE,"Cannot create cmdline instance\n"); cmdline_interact(cl); cmdline_stdin_exit(cl); rte_eal_mp_wait_lcore();return0;}
使用时,rte_mempool_get从mempool中获取一个对象,然后使用rte_ring_enqueue入队列,另一个进程通过rte_ring_dequeue来出队列,使用完成后需要rte_mempool_put将对象放回mempool:
send:
staticvoidcmd_send_parsed*(void*parsed_result, __attribute__((unused)) struct cmdline *cl, __attribute__((unused))void*data){void*msg =NULL;structcmd_send_result"Failed to get message buffer\n"res=parsed_result;if); strlcpy((char*)msg, res->message, STR_TOKEN_SIZE);if(rte_ring_enqueue(send_ring, msg) 0) {printf("Failed to send message - message discarded\n"); rte_mempool_put(message_pool, msg); }}
receive:
staticlcore_recvint5(__attribute__((unused))void*arg){unsignedlcore_id = rte_lcore_id();printf("Starting core %u\n", lcore_id);while(!quit){void*msg;if);continue; }printf("core %u: Received '%s'\n", lcore_id, (char*)msg); rte_mempool_put(message_pool, msg); }return0;}
四、实现多生产/消费者同时生产/消费(同时出入队)
1、多生产者入队流程:
当该消费者消费结束,且在此之前的消费也都结束后,移动cons.tail表示实际消费的位置
同样,移动cons.head表示消费者预定的消费数量
当该生产者生产结束,且在此之前的生产也都结束后,移动prod.tail表示实际生产的位置
移动prod.head表示生产者预定的生产数量
许多环在内存方面的成本比链表列表的成本更高。空环至少包含N个指针。
大小固定
CAS(Compare and Swap)是个原子操作
适用于批量入队/出队操作。因为指针存储在表中,多个对象出队并不会像链表队列那样产生大量的缓存未命中,此外,多个对象批量出队不会比单个对象出队开销大
比完全无锁队列简单
更快:比较void *大小的数据,只需要执行单次Compare-And-Swap指令,而不需要执行2次Compare-And-Swap指令
多消费/生产者同时出入队
无锁出入队(除了cas(compare and swap)操作)
4000520066 欢迎批评指正
All Rights Reserved 新浪公司 版权所有