eventdev: relax SMP barriers with C11 atomics

The impl_opaque field is shared between the timer arm and cancel
operations, and the state flag acts as a guard variable that keeps
updates of impl_opaque synchronized. The original code used the full
barriers rte_smp_wmb()/rte_smp_rmb() to achieve this. This patch uses
C11 atomics with explicit one-way (acquire/release) ordering instead,
which avoids unnecessary full barriers on aarch64.
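
As an illustration of the pairing this relies on, here is a minimal,
self-contained sketch; the names (example_timer, example_arm,
example_cancel) are hypothetical and not part of the adapter code:

#include <stdint.h>

/* Hypothetical stand-ins for the adapter fields; names are illustrative. */
struct example_timer {
	uint64_t impl_opaque;	/* payload guarded by 'state' */
	int state;		/* 0 = not armed, 1 = armed */
};

/* Arm side: write the payload first, then release-store the guard. */
void
example_arm(struct example_timer *t, uint64_t payload)
{
	t->impl_opaque = payload;
	/* RELEASE: the payload store cannot be reordered after this store. */
	__atomic_store_n(&t->state, 1, __ATOMIC_RELEASE);
}

/* Cancel side: acquire-load the guard before touching the payload. */
int
example_cancel(struct example_timer *t, uint64_t *payload)
{
	/* ACQUIRE: pairs with the release store in example_arm(). */
	if (__atomic_load_n(&t->state, __ATOMIC_ACQUIRE) != 1)
		return -1;
	*payload = t->impl_opaque;	/* sees the value written before arming */
	__atomic_store_n(&t->state, 0, __ATOMIC_RELEASE);
	return 0;
}

The release store in the arm path guarantees that any thread whose
acquire load observes the armed state also observes the payload
written before it, which is the property the patch preserves while
dropping the full barriers.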

Since compilers generate the same instructions for volatile and
non-volatile variables when the C11 __atomic built-ins are used, keep
the volatile keyword in front of the state enum to avoid an ABI break.
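
For reference, the __atomic built-ins accept volatile-qualified
objects, so keeping the qualifier does not conflict with the new code.
A minimal sketch with hypothetical names (example_state, example_hdr),
not the eventdev ABI:

enum example_state { EXAMPLE_NOT_ARMED, EXAMPLE_ARMED };

/* The field keeps its volatile qualifier, mirroring the public struct. */
struct example_hdr {
	volatile enum example_state state;
};

enum example_state
example_load_state(struct example_hdr *h)
{
	/* __atomic_load_n() works on a volatile-qualified lvalue; the
	 * generated access matches the non-volatile case.
	 */
	return __atomic_load_n(&h->state, __ATOMIC_ACQUIRE);
}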

Cc: stable@dpdk.org

Signed-off-by: Phil Yang <phil.yang@arm.com>
Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
Acked-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>

Author: Phil Yang, 2020-07-07 23:54:53 +08:00 (committed by Thomas Monjalon)
parent e84d9c62c6
commit 030c216411

@@ -629,7 +629,8 @@ swtim_callback(struct rte_timer *tim)
 		sw->expired_timers[sw->n_expired_timers++] = tim;
 		sw->stats.evtim_exp_count++;
 
-		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
+		__atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED,
+				__ATOMIC_RELEASE);
 	}
 
 	if (event_buffer_batch_ready(&sw->buffer)) {
@@ -1020,6 +1021,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 	int n_lcores;
 	/* Timer list for this lcore is not in use. */
 	uint16_t exp_state = 0;
+	enum rte_event_timer_state n_state;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1060,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 	}
 
 	for (i = 0; i < nb_evtims; i++) {
 		/* Don't modify the event timer state in these cases */
-		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
+		n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
+		if (n_state == RTE_EVENT_TIMER_ARMED) {
 			rte_errno = EALREADY;
 			break;
-		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
-			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+		} else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED ||
+			     n_state == RTE_EVENT_TIMER_CANCELED)) {
 			rte_errno = EINVAL;
 			break;
 		}
 
 		ret = check_timeout(evtims[i], adapter);
 		if (unlikely(ret == -1)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR_TOOLATE,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		} else if (unlikely(ret == -2)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR_TOOEARLY,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		}
 
 		if (unlikely(check_destination_event_queue(evtims[i],
 							   adapter) < 0)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		}
@@ -1099,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 					  SINGLE, lcore_id, NULL, evtims[i]);
 		if (ret < 0) {
 			/* tim was in RUNNING or CONFIG state */
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR,
+					__ATOMIC_RELEASE);
 			break;
 		}
 
-		rte_smp_wmb();
 		EVTIM_LOG_DBG("armed an event timer");
-		evtims[i]->state = RTE_EVENT_TIMER_ARMED;
+		/* RELEASE ordering guarantees the adapter specific value
+		 * changes observed before the update of state.
+		 */
+		__atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED,
+				__ATOMIC_RELEASE);
 	}
 
 	if (i < nb_evtims)
@@ -1132,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 	struct rte_timer *timp;
 	uint64_t opaque;
 	struct swtim *sw = swtim_pmd_priv(adapter);
+	enum rte_event_timer_state n_state;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1143,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 
 	for (i = 0; i < nb_evtims; i++) {
 		/* Don't modify the event timer state in these cases */
-		if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
+		/* ACQUIRE ordering guarantees the access of implementation
+		 * specific opaque data under the correct state.
+		 */
+		n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
+		if (n_state == RTE_EVENT_TIMER_CANCELED) {
 			rte_errno = EALREADY;
 			break;
-		} else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) {
+		} else if (n_state != RTE_EVENT_TIMER_ARMED) {
 			rte_errno = EINVAL;
 			break;
 		}
 
-		rte_smp_rmb();
-
 		opaque = evtims[i]->impl_opaque[0];
 		timp = (struct rte_timer *)(uintptr_t)opaque;
 		RTE_ASSERT(timp != NULL);
@@ -1166,9 +1182,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 
 		rte_mempool_put(sw->tim_pool, (void **)timp);
 
-		evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
-
-		rte_smp_wmb();
+		/* The RELEASE ordering here pairs with atomic ordering
+		 * to make sure the state update data observed between
+		 * threads.
+		 */
+		__atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED,
+				__ATOMIC_RELEASE);
 	}
 
 	return i;