Condition Variables
Concept
条件变量基本上是一个线程队列,当某些执行状态不符合预期时,线程可以将自己放入该队列中。
k_condvar_wait()
:
- Releases the last acquired mutex.
- Puts the current thread in the condition variable queue.
其他某个线程在更改所述状态时,可以唤醒一个(或多个)等待线程,通过k_condvar_signal()
or k_condvar_broadcast()
:
- Re-acquires the mutex previously released.
- Returns from k_condvar_wait().
Implementation
Defining a Condition Variable
struct k_condvar my_condvar;
k_condvar_init(&my_condvar); // 或
K_CONDVAR_DEFINE(my_condvar);
Waiting on a Condition Variable
k_mutex_lock(&mutex, K_FOREVER);
/* block this thread until another thread signals cond. While
* blocked, the mutex is released, then re-acquired before this
* thread is woken up and the call returns.
*/
k_condvar_wait(&condvar, &mutex, K_FOREVER);
...
k_mutex_unlock(&mutex);
Signaling a Condition Variable
k_mutex_lock(&mutex, K_FOREVER);
/*
* Do some work and fulfill the condition
*/
k_condvar_signal(&condvar);
k_mutex_unlock(&mutex);
Zephyr源码分析
Init函数初始化条件变量的wait queue。
struct k_condvar {
_wait_q_t wait_q;
};
int z_impl_k_condvar_init(struct k_condvar *condvar)
{
z_waitq_init(&condvar->wait_q);
return 0;
}
int z_impl_k_condvar_wait(struct k_condvar *condvar, struct k_mutex *mutex, k_timeout_t timeout)
{
k_spinlock_key_t key;
int ret;
key = k_spin_lock(&lock);
k_mutex_unlock(mutex);
//将当前线程加入wait_q,
ret = z_pend_curr(&lock, key, &condvar->wait_q, timeout);
k_mutex_lock(mutex, K_FOREVER);
return ret;
}
int z_impl_k_condvar_signal(struct k_condvar *condvar)
{
k_spinlock_key_t key = k_spin_lock(&lock);
// 唤醒wait_q优先级最高的线程
struct k_thread *thread = z_unpend_first_thread(&condvar->wait_q);
if (thread != NULL) {
arch_thread_return_value_set(thread, 0);
z_ready_thread(thread);
// 重新调度被唤醒的这个线程
z_reschedule(&lock, key);
} else {
k_spin_unlock(&lock, key);
}
return 0;
}
唤醒所有等待线程:
int z_impl_k_condvar_broadcast(struct k_condvar *condvar)
Semaphores
Concept
信号量有下面两个关键的属性:
A count that indicates the number of times the semaphore can be taken. A count of zero indicates that the semaphore is unavailable.
A limit that indicates the maximum value the semaphore’s count can reach.
Implementation
Defining a Semaphore
struct k_sem my_sem;
k_sem_init(&my_sem, 0, 1); // 或
K_SEM_DEFINE(my_sem, 0, 1);
Giving a Semaphore
k_sem_give(&my_sem);
Taking a Semaphore
if (k_sem_take(&my_sem, K_MSEC(50)) != 0) {
printk("Input data not available!");
} else {
/* fetch available data */
}
Zephyr源码分析
struct k_sem {
_wait_q_t wait_q;
unsigned int count;
unsigned int limit;
sys_dlist_t poll_events;
};
int z_impl_k_sem_init(struct k_sem *sem, unsigned int initial_count,
unsigned int limit)
{
/*
* Limit cannot be zero and count cannot be greater than limit
*/
CHECKIF(limit == 0U || limit > K_SEM_MAX_LIMIT || initial_count > limit) {
return -EINVAL;
}
sem->count = initial_count;
sem->limit = limit;
z_waitq_init(&sem->wait_q);
#if defined(CONFIG_POLL)
sys_dlist_init(&sem->poll_events);
#endif
return 0;
}
int z_impl_k_sem_take(struct k_sem *sem, k_timeout_t timeout)
{
int ret = 0;
__ASSERT(((arch_is_in_isr() == false) ||
K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");
k_spinlock_key_t key = k_spin_lock(&lock);
// 成功拿到semaphore
if (likely(sem->count > 0U)) {
sem->count--;
k_spin_unlock(&lock, key);
ret = 0;
goto out;
}
// timeout为NO_WAIT的话直接返回BUSY
if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
k_spin_unlock(&lock, key);
ret = -EBUSY;
goto out;
}
// 将当前线程移除kernel ready queue, 加入wait_q,进入pending状态等待其他线程唤醒。
ret = z_pend_curr(&lock, key, &sem->wait_q, timeout);
out:
return ret;
}
void z_impl_k_sem_give(struct k_sem *sem)
{
k_spinlock_key_t key = k_spin_lock(&lock);
struct k_thread *thread;
bool resched = true;
// 唤醒wait queue中优先级最高的线程
thread = z_unpend_first_thread(&sem->wait_q);
if (thread != NULL) {
arch_thread_return_value_set(thread, 0);
// 重新加入调度
z_ready_thread(thread);
} else {
// 如果没有线程在等待semaphore,count++
sem->count += (sem->count != sem->limit) ? 1U : 0U;
resched = handle_poll_events(sem);
}
if (resched) {
z_reschedule(&lock, key);
} else {
k_spin_unlock(&lock, key);
}
}
唤醒所有等待线程,并将semaphore的count值设为0。void z_impl_k_sem_reset(struct k_sem *sem)