Concept
Zbus实现了一种线程间多对多的消息通信机制。
有三种类型的观察者:
- Listener,event dispatcher每次发布或通知通道时,都会执行Listener的回调函数。是同步的。
- Subscriber,内部依赖于消息队列,event dispatcher每次发布或通知通道时,都会在其中放置更改的channel的引用。注意,这种观察者本身并不接收消息。收到通知后应主动从通道中读取消息。Subscriber线程中需要主动从channel中读出当前的message。是异步的。
- Message subscribers,event dispatcher每次发布或通知通道时,都会copy message到buf中再挂入Message subscribers的message fifo。Message subscribers线程中只需要从message fifo中取出message。是异步的。
For example,在下图中,Timer是Channel Trigger的Publisher,Sensor Thread是Subscriber,Blink是Listener。
当timer发布message后,会执行Blink的回调函数来闪烁,Sensor thread可以获取sensor数据。
Sensor thread处理完数据后,又是Sensor data channel的publisher,发布message后,core thread接收到message,来处理sensor data。
Core thread处理完后,通知LoRa thread,在最后一个channel处理完成后,再次调用Blink的回调函数来闪烁。
可以打开或关闭某个订阅者,某个订阅者也可以选择打开或关闭某个channel。
Virtual Distributed Event Dispatcher
假设一个场景,有一个channelA。T1为publisher,S1为Subscriber,L1,L2为Listener,MS1,MS2为Message Subscriber。
The VDED execution总是发生在publisher’s context, 基本流程为:
- 给channel上锁
- channel通过直接拷贝(memcpy)把new message拷贝过去。
- VDED执行listener的回调函数,把message拷贝给message subscriber,把channel reference加入到subscriber的notification message queue。Listener可以通过
zbus_chan_const_msg()
直接获取message的reference。 - 给channel解锁
如下图演示了执行流程,假设线程优先级为T1>MS1>MS2>S1:
可以看到Listener只直接引用channel里的message而不用拷贝,而Publisher会把message拷贝到Message subscriber。Subscriber则是当调度到的时候自己去拷贝message。
如果线程优先级为T1<MS1<MS2<S1:
注意此时的执行顺序,在f时,T1发布message,会立刻调度到MS1, 在g时,会立刻调度到MS2。
HLP priority boost
Zbus实现了自动提高publisher线程优先级的操作,选项为CONFIG_ZBUS_PRIORITY_BOOST
,是默认自动打开的。这样能publisher线程不会像上面一样被打断,可以提高执行效率。
为了使用该特性,需要将observer attach到一个线程:
ZBUS_SUBSCRIBER_DEFINE(s1, 4);
void s1_thread(void *ptr1, void *ptr2, void *ptr3)
{
const struct zbus_channel *chan;
zbus_obs_attach_to_thread(&s1); // 将s1 attach到线程
while (1) {
zbus_sub_wait(&s1, &chan, K_FOREVER);
/* Subscriber implementation */
}
}
K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0);
Limitation
考虑到Zbus的benchmark,不适用于传输高速流数据。
密集字节流传输考虑用message queue和pipes。
ZBus使用network buffers,需要考虑CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE 和 CONFIG_HEAP_MEM_POOL_SIZE的大小
如果在Subscriber读取channel之前publisher发布两次message到channel,第二次的数据会覆盖第一次。 Subscriber会收到两个notifications,但读出来的都是第二个数据,导致数据丢失。
这个问题目前只能给subscriber足够的时间处理?不要过快publish channel。
Message subscriber是可以保证数据不会丢失的,因为在publish的过程中把message挂入了fifo
源码分析
应用层
API
// 如果observer和channel是在别的文件定义的,需要先声明
#define ZBUS_OBS_DECLARE(...)
#define ZBUS_CHAN_DECLARE(...)
// 定义channel
#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val)
// 定义subscriber
#define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size)
// 定义listener
#define ZBUS_LISTENER_DEFINE(_name, _cb)
// 定义message subscriber
#define ZBUS_MSG_SUBSCRIBER_DEFINE(_name)
// 发布channel
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout);
// read channel
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout);
// 相当于channel的锁,同一时间只能有一个人能拥有channel
int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout);
// 与claim一起使用,释放channel
int zbus_chan_finish(const struct zbus_channel *chan);
// 和zbus_chan_pub类似,不过不会更改message,只会通知observers。
// 在claim和finish后可能有用?
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout);
// listener中用来获取channel中的message
static inline void *zbus_chan_msg(const struct zbus_channel *chan)
static inline const void *zbus_chan_const_msg(const struct zbus_channel *chan)
// runtime运行中动态增加observer
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout);
// runtime运行中动态删除observer
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout);
// 设置observer开关
int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled);
// 判断observer是否enable
static inline int zbus_obs_is_enabled(struct zbus_observer *obs, bool *enable)
// mask observer需要观察的channel, 参考Figure2
int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
const struct zbus_channel *chan, bool masked);
// 判断是否channel是否被mask
int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
const struct zbus_channel *chan, bool *masked);
// 设置当前observer优先级为当前线程的优先级。只有当打开了CONFIG_ZBUS_PRIORITY_BOOST,需要设置observer的线程优先级。
int zbus_obs_attach_to_thread(const struct zbus_observer *obs);
// 恢复当前observer到默认优先级
int zbus_obs_detach_from_thread(const struct zbus_observer *obs);
// subscriber等channel发布的block函数
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout);
//
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, k_timeout_t timeout);
Sample
samples/subsys/zbus/hello_world/
首先需要定义传递的message type以及channel。
// 第一个channel传递的message类型
struct version_msg {
uint8_t major;
uint8_t minor;
uint16_t build;
};
// 第二个channel传递的message类型
struct acc_msg {
int x;
int y;
int z;
};
// 定义第一个channel, 当publisher发布message到channel需要经过validator函数验证,这里为NULL。
ZBUS_CHAN_DEFINE(version_chan, /* Name */
struct version_msg, /* Message type */
NULL, /* Validator */
NULL, /* User data */
ZBUS_OBSERVERS_EMPTY, /* observers */
ZBUS_MSG_INIT(.major = 0, .minor = 1,
.build = 2) /* Initial value major 0, minor 1, build 2 */
);
//定义第二个channel
ZBUS_CHAN_DEFINE(acc_data_chan, /* Name */
struct acc_msg, /* Message type */
NULL, /* Validator */
NULL, /* User data */
ZBUS_OBSERVERS(foo_lis, bar_sub), /* observers */
ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0) /* Initial value */
);
接着定义channel中observer的类型:
ZBUS_LISTENER_DEFINE(foo_lis, listener_callback_example);
ZBUS_SUBSCRIBER_DEFINE(bar_sub, 4);
实现层
Publisher发布channel
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
{
int err;
if (k_is_in_isr()) { // 中断中需要立即publish message
timeout = K_NO_WAIT;
}
//publish发布的限制时间
k_timepoint_t end_time = sys_timepoint_calc(timeout);
// 发布的message需要通过channel的validator函数
if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
return -ENOMSG;
}
int context_priority = ZBUS_MIN_THREAD_PRIORITY;
// 拷贝message到channel->message以及vded执行需要上锁,防止竞态
err = chan_lock(chan, timeout, &context_priority);
if (err) {
return err;
}
// 拷贝数据
memcpy(chan->message, msg, chan->message_size);
//VDED开始执行,这个函数分析在下面
err = _zbus_vded_exec(chan, end_time);
chan_unlock(chan, context_priority);
return err;
}
static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time)
{
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
buf = _zbus_create_net_buf(&_zbus_msg_subscribers_pool, zbus_chan_msg_size(chan),
sys_timepoint_timeout(end_time));
// 这里把channel中的数据拷贝到buf中
net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan));
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
// 通知这个channel的observers,根据不同类型执行不同流程。
_zbus_notify_observer(chan, obs, end_time, buf);
}
static inline int _zbus_notify_observer(const struct zbus_channel *chan,
const struct zbus_observer *obs, k_timepoint_t end_time,
struct net_buf *buf)
{
switch (obs->type) {
// listener直接执行回调函数
case ZBUS_OBSERVER_LISTENER_TYPE: {
obs->callback(chan);
break;
}
// subsriber是把channel的的地址传递到msgq中
case ZBUS_OBSERVER_SUBSCRIBER_TYPE: {
return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
}
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: {
// 克隆一个buf,用来传到message subscriber的message fifo中
struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time));
// 在这里把该channel地址复制到buf->userdata中
memcpy(net_buf_user_data(cloned_buf), &chan, sizeof(struct zbus_channel *));
// 把buf挂入message subsriber的message_fifo中
net_buf_put(obs->message_fifo, cloned_buf);
break;
}
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
return 0;
}
}
Subscriber 接收channel
// subscriber接收线程:
static void subscriber_task(void)
{
const struct zbus_channel *chan;
while (!zbus_sub_wait(&bar_sub, &chan, K_FOREVER)) {
struct acc_msg acc;
// 确保只有一个线程在操作channel, 此时其他线程无法执行pub/read/notify...
if (zbus_chan_claim(chan, K_FOREVER) != 0) {
k_oops();
}
if (&acc_data_chan == chan) {
zbus_chan_read(&acc_data_chan, &acc, K_MSEC(500));
}
zbus_chan_finish(chan);
}
}
// 这个函数只能给subscriber使用:
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout)
{
// 中断中不能睡眠等待
_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs");
_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER");
// 等待publisher发布channel
return k_msgq_get(sub->queue, chan, timeout);
}
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
{
if (k_is_in_isr()) {
timeout = K_NO_WAIT;
}
int err = k_sem_take(&chan->data->sem, timeout);
// 读出channel中保存的message
memcpy(msg, chan->message, chan->message_size);
k_sem_give(&chan->data->sem);
return 0;
}
Message Subscriber接收channel
// Message subscriber接收线程:
static void msg_subscriber_task(void *sub)
{
const struct zbus_channel *chan;
struct acc_msg acc;
const struct zbus_observer *subscriber = sub;
while (!zbus_sub_wait_msg(subscriber, &chan, &acc, K_FOREVER)) {
if (&acc_data_chan != chan) {
LOG_ERR("Wrong channel %p!", chan);
continue;
}
LOG_INF("From msg subscriber %s -> Acc x=%d, y=%d, z=%d", zbus_obs_name(subscriber),
acc.x, acc.y, acc.z);
}
}
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
k_timeout_t timeout)
{
_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs");
_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
"sub must be a MSG_SUBSCRIBER");
_ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required");
// 从publisher保存到message_fifo的Buf中读数据
struct net_buf *buf = net_buf_get(sub->message_fifo, timeout);
// 读出buf->user_data,在publish中保存的对应channel
*chan = *((struct zbus_channel **)net_buf_user_data(buf));
// copy buf中保存的message
memcpy(msg, net_buf_remove_mem(buf, zbus_chan_msg_size(*chan)), zbus_chan_msg_size(*chan));
net_buf_unref(buf);
return 0;
}
Priority boost
// Todo:
Reference
https://docs.zephyrproject.org/latest/services/zbus/index.html
https://docs.zephyrproject.org/latest/samples/subsys/zbus/zbus.html
https://docs.zephyrproject.org/latest/samples/subsys/zbus/hello_world/README.html