Concept

Zbus实现了一种线程间多对多的消息通信机制。

有三种类型的观察者:

  • Listener,event dispatcher每次发布或通知通道时,都会执行Listener的回调函数。是同步的。
  • Subscriber,内部依赖于消息队列,event dispatcher每次发布或通知通道时,都会在其中放置更改的channel的引用。注意,这种观察者本身并不接收消息。收到通知后应主动从通道中读取消息。Subscriber线程中需要主动从channel中读出当前的message。是异步的。
  • Message subscribers,event dispatcher每次发布或通知通道时,都会copy message到buf中再挂入Message subscribers的message fifo。Message subscribers线程中只需要从message fifo中取出message。是异步的。

For example,在下图中,Timer是Channel Trigger的Publisher,Sensor Thread是Subscriber,Blink是Listener。
当timer发布message后,会执行Blink的回调函数来闪烁,Sensor thread可以获取sensor数据。
Sensor thread处理完数据后,又是Sensor data channel的publisher,发布message后,core thread接收到message,来处理sensor data。
Core thread处理完后,通知LoRa thread,在最后一个channel处理完成后,再次调用Blink的回调函数来闪烁。

Figure 1


可以打开或关闭某个订阅者,某个订阅者也可以选择打开或关闭某个channel。

Figure 2

Virtual Distributed Event Dispatcher

假设一个场景,有一个channelA。T1为publisher,S1为Subscriber,L1,L2为Listener,MS1,MS2为Message Subscriber。

The VDED execution总是发生在publisher’s context, 基本流程为:

  • 给channel上锁
  • channel通过直接拷贝(memcpy)把new message拷贝过去。
  • VDED执行listener的回调函数,把message拷贝给message subscriber,把channel reference加入到subscriber的notification message queue。Listener可以通过zbus_chan_const_msg()直接获取message的reference。
  • 给channel解锁

如下图演示了执行流程,假设线程优先级为T1>MS1>MS2>S1:

可以看到Listener只直接引用channel里的message而不用拷贝,而Publisher会把message拷贝到Message subscriber。Subscriber则是当调度到的时候自己去拷贝message。

如果线程优先级为T1<MS1<MS2<S1:

注意此时的执行顺序,在f时,T1发布message,会立刻调度到MS1, 在g时,会立刻调度到MS2。

HLP priority boost

Zbus实现了自动提高publisher线程优先级的操作,选项为CONFIG_ZBUS_PRIORITY_BOOST,是默认自动打开的。这样能publisher线程不会像上面一样被打断,可以提高执行效率。

为了使用该特性,需要将observer attach到一个线程:

ZBUS_SUBSCRIBER_DEFINE(s1, 4);
void s1_thread(void *ptr1, void *ptr2, void *ptr3)
{
        const struct zbus_channel *chan;

        zbus_obs_attach_to_thread(&s1); // 将s1 attach到线程

        while (1) {
                zbus_sub_wait(&s1, &chan, K_FOREVER);

                /* Subscriber implementation */

        }
}
K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0);

Limitation

考虑到Zbus的benchmark,不适用于传输高速流数据。

密集字节流传输考虑用message queue和pipes。

ZBus使用network buffers,需要考虑CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE 和 CONFIG_HEAP_MEM_POOL_SIZE的大小

如果在Subscriber读取channel之前publisher发布两次message到channel,第二次的数据会覆盖第一次。 Subscriber会收到两个notifications,但读出来的都是第二个数据,导致数据丢失。

这个问题目前只能给subscriber足够的时间处理?不要过快publish channel。

Message subscriber是可以保证数据不会丢失的,因为在publish的过程中把message挂入了fifo

源码分析

应用层

API

// 如果observer和channel是在别的文件定义的,需要先声明
#define ZBUS_OBS_DECLARE(...)
#define ZBUS_CHAN_DECLARE(...)
// 定义channel
#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val)
// 定义subscriber
#define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size)
// 定义listener
#define ZBUS_LISTENER_DEFINE(_name, _cb)
// 定义message subscriber
#define ZBUS_MSG_SUBSCRIBER_DEFINE(_name)
// 发布channel
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout);
// read channel
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout);
// 相当于channel的锁,同一时间只能有一个人能拥有channel
int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout);
// 与claim一起使用,释放channel
int zbus_chan_finish(const struct zbus_channel *chan);
// 和zbus_chan_pub类似,不过不会更改message,只会通知observers。
// 在claim和finish后可能有用?
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout);
// listener中用来获取channel中的message
static inline void *zbus_chan_msg(const struct zbus_channel *chan)
static inline const void *zbus_chan_const_msg(const struct zbus_channel *chan)
// runtime运行中动态增加observer
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout);
// runtime运行中动态删除observer
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout);
// 设置observer开关
int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled);
// 判断observer是否enable
static inline int zbus_obs_is_enabled(struct zbus_observer *obs, bool *enable)
// mask observer需要观察的channel, 参考Figure2
int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
					const struct zbus_channel *chan, bool masked);
// 判断是否channel是否被mask
int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
					 const struct zbus_channel *chan, bool *masked);
// 设置当前observer优先级为当前线程的优先级。只有当打开了CONFIG_ZBUS_PRIORITY_BOOST,需要设置observer的线程优先级。
int zbus_obs_attach_to_thread(const struct zbus_observer *obs);
// 恢复当前observer到默认优先级
int zbus_obs_detach_from_thread(const struct zbus_observer *obs);
// subscriber等channel发布的block函数
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout);
//
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, k_timeout_t timeout);

Sample

samples/subsys/zbus/hello_world/

首先需要定义传递的message type以及channel。

// 第一个channel传递的message类型
struct version_msg {
	uint8_t major;
	uint8_t minor;
	uint16_t build;
};

// 第二个channel传递的message类型
struct acc_msg {
	int x;
	int y;
	int z;
};

// 定义第一个channel, 当publisher发布message到channel需要经过validator函数验证,这里为NULL。
ZBUS_CHAN_DEFINE(version_chan,       /* Name */
		 struct version_msg, /* Message type */
		 NULL,                 /* Validator */
		 NULL,                 /* User data */
		 ZBUS_OBSERVERS_EMPTY, /* observers */
		 ZBUS_MSG_INIT(.major = 0, .minor = 1,
			       .build = 2) /* Initial value major 0, minor 1, build 2 */
);

//定义第二个channel
ZBUS_CHAN_DEFINE(acc_data_chan,  /* Name */
		 struct acc_msg, /* Message type */
		 NULL,                                 /* Validator */
		 NULL,                                 /* User data */
		 ZBUS_OBSERVERS(foo_lis, bar_sub),     /* observers */
		 ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0) /* Initial value */
);

接着定义channel中observer的类型:

ZBUS_LISTENER_DEFINE(foo_lis, listener_callback_example);
ZBUS_SUBSCRIBER_DEFINE(bar_sub, 4);

实现层

Publisher发布channel

int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
{
	int err;

	if (k_is_in_isr()) { // 中断中需要立即publish message
		timeout = K_NO_WAIT;
	}

	//publish发布的限制时间
	k_timepoint_t end_time = sys_timepoint_calc(timeout);

	// 发布的message需要通过channel的validator函数
	if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
		return -ENOMSG;
	}

	int context_priority = ZBUS_MIN_THREAD_PRIORITY;

	// 拷贝message到channel->message以及vded执行需要上锁,防止竞态
	err = chan_lock(chan, timeout, &context_priority);
	if (err) {
		return err;
	}
	// 拷贝数据
	memcpy(chan->message, msg, chan->message_size);
	//VDED开始执行,这个函数分析在下面
	err = _zbus_vded_exec(chan, end_time);

	chan_unlock(chan, context_priority);

	return err;
}

static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time)
{
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
	buf = _zbus_create_net_buf(&_zbus_msg_subscribers_pool, zbus_chan_msg_size(chan),
				   sys_timepoint_timeout(end_time));
	// 这里把channel中的数据拷贝到buf中
	net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan));
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

	// 通知这个channel的observers,根据不同类型执行不同流程。
	_zbus_notify_observer(chan, obs, end_time, buf);
}

static inline int _zbus_notify_observer(const struct zbus_channel *chan,
					const struct zbus_observer *obs, k_timepoint_t end_time,
					struct net_buf *buf)
{
	switch (obs->type) {
	// listener直接执行回调函数
	case ZBUS_OBSERVER_LISTENER_TYPE: {
		obs->callback(chan);
		break;
	}
	// subsriber是把channel的的地址传递到msgq中
	case ZBUS_OBSERVER_SUBSCRIBER_TYPE: {
		return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
	}
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
	case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: {
		// 克隆一个buf,用来传到message subscriber的message fifo中
		struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time));
		// 在这里把该channel地址复制到buf->userdata中
		memcpy(net_buf_user_data(cloned_buf), &chan, sizeof(struct zbus_channel *));
		// 把buf挂入message subsriber的message_fifo中
		net_buf_put(obs->message_fifo, cloned_buf);

		break;
	}
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

	return 0;
	}
}

Subscriber 接收channel

// subscriber接收线程:
static void subscriber_task(void)
{
	const struct zbus_channel *chan;

	while (!zbus_sub_wait(&bar_sub, &chan, K_FOREVER)) {
		struct acc_msg acc;
		// 确保只有一个线程在操作channel, 此时其他线程无法执行pub/read/notify...
		if (zbus_chan_claim(chan, K_FOREVER) != 0) {
			k_oops();
		}
		if (&acc_data_chan == chan) {
			zbus_chan_read(&acc_data_chan, &acc, K_MSEC(500));
		}
		zbus_chan_finish(chan);
	}
}

// 这个函数只能给subscriber使用:
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout)
{
	// 中断中不能睡眠等待
	_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs");
	_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER");
	// 等待publisher发布channel
	return k_msgq_get(sub->queue, chan, timeout);
}

int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
{
	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	int err = k_sem_take(&chan->data->sem, timeout);
	// 读出channel中保存的message
	memcpy(msg, chan->message, chan->message_size);

	k_sem_give(&chan->data->sem);

	return 0;
}

Message Subscriber接收channel

// Message subscriber接收线程:
static void msg_subscriber_task(void *sub)
{
	const struct zbus_channel *chan;
	struct acc_msg acc;
	const struct zbus_observer *subscriber = sub;

	while (!zbus_sub_wait_msg(subscriber, &chan, &acc, K_FOREVER)) {
		if (&acc_data_chan != chan) {
			LOG_ERR("Wrong channel %p!", chan);
			continue;
		}
		LOG_INF("From msg subscriber %s -> Acc x=%d, y=%d, z=%d", zbus_obs_name(subscriber),
			acc.x, acc.y, acc.z);
	}
}

int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
		      k_timeout_t timeout)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs");
	_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
		     "sub must be a MSG_SUBSCRIBER");
	_ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required");

	// 从publisher保存到message_fifo的Buf中读数据
	struct net_buf *buf = net_buf_get(sub->message_fifo, timeout);
	// 读出buf->user_data,在publish中保存的对应channel
	*chan = *((struct zbus_channel **)net_buf_user_data(buf));
	// copy buf中保存的message
	memcpy(msg, net_buf_remove_mem(buf, zbus_chan_msg_size(*chan)), zbus_chan_msg_size(*chan));

	net_buf_unref(buf);

	return 0;
}

Priority boost

// Todo:

Reference

https://docs.zephyrproject.org/latest/services/zbus/index.html
https://docs.zephyrproject.org/latest/samples/subsys/zbus/zbus.html
https://docs.zephyrproject.org/latest/samples/subsys/zbus/hello_world/README.html