Mutex

Zephyr Mutex有下面几个重要特性:

  • lock count。该线程lock了mutux的次数。为0的时候表示unlock。
  • owning thread。

当一个线程拥有锁,其他线程在都在等待锁释放。一旦锁释放后,等待时间最长优先级最高的线程将被调度。

Zephyr中ISRs中不允许使用Mutex。

Reetrant Locking

Zephyr Mutex是可重入锁,可多次加锁。

这样拥有锁的线程可以访问相关资源,不用管是否上锁。

可重入锁的目的就是防止死锁,导致同一个线程不可重入上锁代码段,目的就是让同一个线程可以重新进入上锁代码段。

Priority Inheritance

优先级继承。如果当前线程拥有锁,此时来了一个优先级更高的线程想获取锁,该线程会开始block等待。

此时系统会把拥有锁的线程优先级提高到和等待的线程相同,来尽快完成临界区的任务。一旦释放了锁,线程优先级又会调回去。

CONFIG_PRIORITY_CEILING用来配置最高提高到的优先级等级,默认为0为无限制。

Implementation

Defining a Mutex

struct k_mutex my_mutex;

k_mutex_init(&my_mutex);
//或
K_MUTEX_DEFINE(my_mutex);

Locking a Mutex

k_mutex_lock(&my_mutex, K_FOREVER); // 无限阻塞等待获取锁

if (k_mutex_lock(&my_mutex, K_MSEC(100)) == 0) { // 等待100ms,没获取到返回
    /* mutex successfully locked */
} else {
    printf("Cannot lock XYZ display\n");
}

Unlocking a Mutex

k_mutex_unlock(&my_mutex);

Futex

Futex(fast userspace mutex)是比Mutex更轻量级的互斥访问原语。是一种用于用户空间应用程序的通用同步工具。

int k_futex_wait(struct k_futex *futex, int expected, k_timeout_t timeout)
int k_futex_wake(struct k_futex *futex, bool wake_all)

API Reference

User Space Mutex API 把k_xxx前缀换成sys_xxx,比如sys_mutex_lock()

在没打开CONFIG_USERSPACE的情况下,sys_mutex_lock()会等同于k_mutex_lock()

源码分析

去除了一些kobject,以及tracing相关的代码。

kernel.h mutex结构体:

struct k_mutex {
	/** Mutex wait queue */
	_wait_q_t wait_q;
	/** Mutex owner */
	struct k_thread *owner;
	/** Current lock count */
	uint32_t lock_count;
	/** Original thread priority */
	int owner_orig_prio;
};

mutex.c

int z_impl_k_mutex_init(struct k_mutex *mutex)
{
	mutex->owner = NULL;
	mutex->lock_count = 0U;

	z_waitq_init(&mutex->wait_q); // 初始化wait queue

	return 0;
}
int z_impl_k_mutex_lock(struct k_mutex *mutex, k_timeout_t timeout)
{
	int new_prio;
	k_spinlock_key_t key;
	bool resched = false;

	// zephyr在中断中不允许使用锁,因为zephyr的mutex并没有禁止中断,如果在中断中使用mutex,会导致死锁。
	__ASSERT(!arch_is_in_isr(), "mutexes cannot be used inside ISRs");

	key = k_spin_lock(&lock); // 一个全局spin lock,用来保护改变mutex结构体中的变量的临界区。

	// 对应第一次拿到锁和重入锁的两种情况
	if (likely((mutex->lock_count == 0U) || (mutex->owner == _current))) {
		// 如果线程是第一次拿到锁,mutex->owner_orig_prio为当前线程的优先级。
		// 如果是重入锁(lock_count!=0),mutex->owner_orig_prio为之前保存的线程优先级。
		mutex->owner_orig_prio = (mutex->lock_count == 0U) ?
					_current->base.prio :
					mutex->owner_orig_prio;

		mutex->lock_count++;
		mutex->owner = _current; // owner为当前线程

		k_spin_unlock(&lock, key);

		return 0;
	}

	// 如果传入的timeout为0,且没拿到锁,返回-EBUSY
	if (unlikely(K_TIMEOUT_EQ(timeout, K_NO_WAIT))) {
		k_spin_unlock(&lock, key);

		return -EBUSY;
	}
	// 如果当前线程没拿到锁,返回当前线程和拥有mutex线程两者中更高的优先级
	new_prio = new_prio_for_inheritance(_current->base.prio,
					    mutex->owner->base.prio);
	// 如果拥有mutex线程的优先级比当前线程低,提高拥有mutex线程的优先级到和当前线程一样,让其尽快执行,并返回resched=True。
	if (z_is_prio_higher(new_prio, mutex->owner->base.prio)) {
		resched = adjust_owner_prio(mutex, new_prio);
	}
	//将当前线程从内核ready queue删除,加入到mutex->wait queue(所有等待线程以优先级排序),并且设置为pending状态,等待timeout时间后再设置为ready状态查看其他线程是否将z_pend_curr的返回值设置为0了(在unlock函数中通过arch_thread_return_value_set)
	//如果timeout传入的是forever,会在这里切换线程,并且当前线程不会再被调度,只有等待unlock来唤醒。z_pend_curr可以参考reference。
	int got_mutex = z_pend_curr(&lock, key, &mutex->wait_q, timeout);
	// 如果等待timeout时间后拿到了锁,返回0
	if (got_mutex == 0) {
		return 0;
	}

	/* timed out */
	// 这里只有timeout设置了一定时间,并且到了时间仍然没拿到锁才会走到这。
	LOG_DBG("%p timeout on mutex %p", _current, mutex);

	key = k_spin_lock(&lock);

	/*
	 * Check if mutex was unlocked after this thread was unpended.
	 * If so, skip adjusting owner's priority down.
	 */
	if (likely(mutex->owner != NULL)) {
		struct k_thread *waiter = z_waitq_head(&mutex->wait_q);
		// 将当前线程优先级设置为等待队列中头部最高waiter的优先级。
		new_prio = (waiter != NULL) ?
			new_prio_for_inheritance(waiter->base.prio, mutex->owner_orig_prio) :
			mutex->owner_orig_prio;

		LOG_DBG("adjusting prio down on mutex %p", mutex);
		/* 如果调整后的优先级 new_prio 和Mutex持有者优先级不一致,说明继承的优先级更高,
		 * 或者前面提高了持有者的优先级,现在上锁超时,需要将优先级恢复至原有值,
		 * 这两种情况会改变持有者优先级,在此必须进行一次上下文切换,使新的优先级立即生效
		 */
		resched = adjust_owner_prio(mutex, new_prio) || resched;
	}

	if (resched) {
		z_reschedule(&lock, key);
	} else {
		k_spin_unlock(&lock, key);
	}

	return -EAGAIN;
}
int z_impl_k_mutex_unlock(struct k_mutex *mutex)
{
	struct k_thread *new_owner;

	__ASSERT(!arch_is_in_isr(), "mutexes cannot be used inside ISRs");

	/*
	 * Attempt to unlock a mutex which is unlocked. mutex->lock_count
	 * cannot be zero if the current thread is equal to mutex->owner,
	 * therefore no underflow check is required. Use assert to catch
	 * undefined behavior.
	 */
	__ASSERT_NO_MSG(mutex->lock_count > 0U);

	LOG_DBG("mutex %p lock_count: %d", mutex, mutex->lock_count);

	/*
	 * If we are the owner and count is greater than 1, then decrement
	 * the count and return and keep current thread as the owner.
	 */
	if (mutex->lock_count > 1U) {
		mutex->lock_count--;
		goto k_mutex_unlock_return;
	}

	k_spinlock_key_t key = k_spin_lock(&lock);

	adjust_owner_prio(mutex, mutex->owner_orig_prio);

	/* Get the new owner, if any */
	// 唤醒等待队列中在mutex_lock中进入睡眠优先级最高的线程
	new_owner = z_unpend_first_thread(&mutex->wait_q);

	mutex->owner = new_owner;

	LOG_DBG("new owner of mutex %p: %p (prio: %d)",
		mutex, new_owner, new_owner ? new_owner->base.prio : -1000);

	if (new_owner != NULL) {
		/*
		 * new owner is already of higher or equal prio than first
		 * waiter since the wait queue is priority-based: no need to
		 * adjust its priority
		 */
		mutex->owner_orig_prio = new_owner->base.prio;
		// 设置好切换回mutex_lock线程的返回值
		arch_thread_return_value_set(new_owner, 0);
		// 重新加入kernel ready_q中,等待调度
		z_ready_thread(new_owner);
		// 调度在睡眠中的线程
		z_reschedule(&lock, key);
	} else {
		mutex->lock_count = 0U;
		k_spin_unlock(&lock, key);
	}


k_mutex_unlock_return:

	return 0;
}

Reference

Zephyr线程阻塞和超时机制分析