diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index a939332f4c..30bf54ddf0 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -1315,6 +1315,21 @@ static void DeallocateCq(ibv_cq* cq) {
     LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err);
 }
 
+// Drain all completion entries left on |cq|. Resetting a QP does not remove
+// completions already queued on its CQs, so poll them off before any reuse.
+// Returns 0 on success, or the negative ibv_poll_cq error on failure.
+static int DrainCq(ibv_cq* cq) {
+    if (NULL == cq) {
+        return 0;
+    }
+    ibv_wc wc[16];  // batch drain: reap up to 16 entries per verbs call
+    int ret;
+    do {
+        ret = ibv_poll_cq(cq, 16, wc);
+    } while (ret > 0);
+    LOG_IF(ERROR, ret < 0) << "drain CQ failed: " << ret;
+    return ret;
+}
+
 void RdmaEndpoint::DeallocateResources() {
     if (!_resource) {
         return;
@@ -1341,6 +1356,7 @@ void RdmaEndpoint::DeallocateResources() {
     }
 
     bool remove_consumer = true;
+_reclaim:
     if (!move_to_rdma_resource_list) {
         if (NULL != _resource->qp) {
             int err = IbvDestroyQp(_resource->qp);
@@ -1384,6 +1400,24 @@ void RdmaEndpoint::DeallocateResources() {
     }
 
     if (move_to_rdma_resource_list) {
+        // When a QP is moved to the RESET state, all associated send and
+        // receive queues are flushed: any outstanding WRs are effectively
+        // abandoned by the hardware.
+        //
+        // However, the CQs associated with that QP are NOT cleared
+        // automatically; they may still contain entries for WRs that
+        // completed before the reset. Poll the CQs empty here so the next
+        // user of these recycled resources never observes completions left
+        // over from this connection.
+        int ret = DrainCq(_resource->polling_cq);
+        ret += DrainCq(_resource->send_cq);
+        ret += DrainCq(_resource->recv_cq);
+        if (ret < 0) {
+            // Draining failed: recycling is unsafe, destroy the resources.
+            move_to_rdma_resource_list = false;
+            goto _reclaim;
+        }
+
         BAIDU_SCOPED_LOCK(*g_rdma_resource_mutex);
         _resource->next = g_rdma_resource_list;
         g_rdma_resource_list = _resource;