XFusion API v1.3.0
载入中...
搜索中...
未找到
xf_task_mbus.c
浏览该文件的文档.
1
12/* ==================== [Includes] ========================================== */
13
14#include "xf_task_mbus.h"
15#include "xf_task_queue.h"
16
17/* ==================== [Defines] =========================================== */
18
19#if XF_TASK_MBUS_IS_ENABLE
20
21/* ==================== [Typedefs] ========================================== */
22
23typedef struct _xf_task_xtopic_t {
25 xf_list_t sub_list; // 订阅链表
26 xf_task_queue_t pub_queue; // 发布链表,有缓存有限用缓存,没缓存则创建
27 uint32_t id; // topic id
28 uint32_t size; // topic发布消息大小
30
31typedef struct _xf_task_xsub_t {
34 void *user_data; // 用户订阅回调参数
36
37/* ==================== [Static Prototypes] ================================= */
38
39static void xf_task_mbus_run(xf_task_mtopic_t *mtopic, void *data);
40static xf_err_t xf_task_mbus_find(uint32_t topic_id, xf_task_mtopic_t **topic);
41
42/* ==================== [Static Variables] ================================== */
43
45
46/* ==================== [Macros] ============================================ */
47
48#define TAG "mbus"
49#define DEFAULT_QUEUE_COUNT (2)
50
51/* ==================== [Global Functions] ================================== */
52
53xf_err_t xf_task_mbus_reg_topic(uint32_t topic_id, uint32_t size)
54{
55 XF_ASSERT(xf_task_mbus_find(topic_id, NULL), XF_ERR_INITED, TAG, "topic:%d is exists", (int)topic_id);
56
57 // 找到指定的manager后,注册mtopic
59 void *buf = (void *)((uint8_t *)mtopic + sizeof(xf_task_mtopic_t));
60
61 if (mtopic == NULL) {
62 XF_LOGE(TAG, "memory alloc failed!");
63 return XF_ERR_NO_MEM;
64 }
65
66 xf_list_init(&mtopic->node);
67 xf_list_init(&mtopic->sub_list);
69 mtopic->id = topic_id;
70 mtopic->size = size;
72
73 return XF_OK;
74}
75
77{
78 xf_task_mtopic_t *mtopic = NULL;
79 xf_task_msub_t *msub, *_msub;
80
81 if (xf_task_mbus_find(topic_id, &mtopic) == XF_ERR_NOT_FOUND) {
82 XF_LOGE(TAG, "topic:%d not found", (int)topic_id);
83 return XF_ERR_NOT_FOUND;
84 }
85
86 xf_list_for_each_entry_safe(msub, _msub, &mtopic->sub_list, xf_task_msub_t, node) {
87 xf_list_del_init(&msub->node);
88 xf_free(msub);
89 }
90 xf_list_del_init(&mtopic->node);
91 xf_free(mtopic);
92
93 return XF_OK;
94}
95
96xf_err_t xf_task_mbus_pub_async(uint32_t topic_id, void *data)
97{
98 XF_ASSERT(data, XF_ERR_INVALID_ARG, TAG, "data must not be NULL");
99
100 xf_task_mtopic_t *mtopic = NULL;
101
102 if (xf_task_mbus_find(topic_id, &mtopic) == XF_ERR_NOT_FOUND) {
103 XF_LOGE(TAG, "topic:%d not found", (int)topic_id);
104 return XF_ERR_NOT_FOUND;
105 }
106
108 return err;
109}
110
111xf_err_t xf_task_mbus_pub_sync(uint32_t topic_id, void *data)
112{
113 XF_ASSERT(data, XF_ERR_INVALID_ARG, TAG, "data must not be NULL");
114
115 xf_task_mtopic_t *mtopic = NULL;
116
117 if (xf_task_mbus_find(topic_id, &mtopic) == XF_ERR_NOT_FOUND) {
118 XF_LOGE(TAG, "topic:%d not found", (int)topic_id);
119 return XF_ERR_NOT_FOUND;
120 }
121
122 xf_task_mbus_run(mtopic, data);
123
124 return XF_OK;
125}
126
127xf_err_t xf_task_mbus_sub(uint32_t topic_id, xf_task_mbus_func_t mbus_cb, void *user_data)
128{
129 XF_ASSERT(mbus_cb, XF_ERR_INVALID_ARG, TAG, "mbus_cb must not be NULL");
130
131 xf_task_mtopic_t *mtopic = NULL;
132 xf_task_msub_t *msub = NULL;
133
134 if (xf_task_mbus_find(topic_id, &mtopic) == XF_ERR_NOT_FOUND) {
135 XF_LOGE(TAG, "topic:%d not found", (int)topic_id);
136 return XF_ERR_NOT_FOUND;
137 }
138
139 xf_list_for_each_entry(msub, &mtopic->sub_list, xf_task_msub_t, node) {
140 if (msub->mbus_cb == mbus_cb) {
141 XF_LOGD(TAG, "mbus_cb is exists!");
142 return XF_ERR_INITED;
143 }
144 }
145
146 // 如果没有重复注册,则创建并加入链表
147 msub = (xf_task_msub_t *)xf_malloc(sizeof(xf_task_msub_t));
148
149 if (msub == NULL) {
150 XF_LOGE(TAG, "memory alloc failed!");
151 return XF_ERR_NO_MEM;
152 }
153
154 xf_list_init(&msub->node);
155
156 msub->mbus_cb = mbus_cb;
157 msub->user_data = user_data;
158
159 xf_list_add_tail(&msub->node, &mtopic->sub_list);
160
161 return XF_OK;
162}
163
165{
166 XF_ASSERT(mbus_cb, XF_ERR_INVALID_ARG, TAG, "mbus_cb must not be NULL");
167
168 xf_task_mtopic_t *mtopic = NULL;
169 xf_task_msub_t *msub, *_msub;
170
171 if (xf_task_mbus_find(topic_id, &mtopic) == XF_ERR_NOT_FOUND) {
172 XF_LOGE(TAG, "topic:%d not found", (int)topic_id);
173 return XF_ERR_NOT_FOUND;
174 }
175
176 xf_list_for_each_entry_safe(msub, _msub, &mtopic->sub_list, xf_task_msub_t, node) {
177 if (msub->mbus_cb == mbus_cb) {
178 xf_list_del_init(&msub->node);
179 xf_free(msub);
180 return XF_OK;
181 }
182 }
183
184 XF_LOGE(TAG, "mbus_cb not found!");
185
186 return XF_ERR_NOT_FOUND;
187}
188
190{
191 xf_task_mtopic_t *mtopic = NULL;
192 xf_task_msub_t *msub, *_msub;
193
194 if (xf_task_mbus_find(topic_id, &mtopic) == XF_ERR_NOT_FOUND) {
195 XF_LOGE(TAG, "topic:%d not found", (int)topic_id);
196 return XF_ERR_NOT_FOUND;
197 }
198
199 xf_list_for_each_entry_safe(msub, _msub, &mtopic->sub_list, xf_task_msub_t, node) {
200 xf_list_del_init(&msub->node);
201 xf_free(msub);
202 }
203
204 return XF_OK;
205}
206
208{
209 // 循环执行订阅回调
210 xf_task_mtopic_t *mtopic;
212 while (!xf_task_queue_is_empty(&mtopic->pub_queue)) {
213 void *pub_data = xf_task_queue_peek(&mtopic->pub_queue);
214 xf_task_mbus_run(mtopic, pub_data);
216 }
217 }
218}
219
220/* ==================== [Static Functions] ================================== */
221
222static void xf_task_mbus_run(xf_task_mtopic_t *mtopic, void *data)
223{
224 xf_task_msub_t *msub;
225 xf_list_for_each_entry(msub, &mtopic->sub_list, xf_task_msub_t, node) {
226 msub->mbus_cb(data, msub->user_data);
227 }
228}
229
230static xf_err_t xf_task_mbus_find(uint32_t topic_id, xf_task_mtopic_t **topic)
231{
232 xf_task_mtopic_t *mtopic;
234 if (mtopic->id == topic_id) {
235 if (topic != NULL) {
236 *topic = mtopic;
237 }
238 return XF_OK;
239 }
240 }
241
242 return XF_ERR_NOT_FOUND;
243}
244
245#endif // XF_TASK_MBUS_IS_ENABLE
void(* xf_task_mbus_func_t)(const void *const data, void *user_data)
mbus 的数据订阅回调函数原型。
xf_err_t xf_task_mbus_unsub(uint32_t topic_id, xf_task_mbus_func_t mbus_cb)
解除订阅。
xf_err_t xf_task_mbus_reg_topic(uint32_t topic_id, uint32_t size)
注册 topic。
xf_err_t xf_task_mbus_pub_async(uint32_t topic_id, void *data)
异步发布指定的 topic ,不会阻塞代码运行。
xf_err_t xf_task_mbus_unsub_all(uint32_t topic_id)
解除 topic下所有订阅。
void xf_task_mbus_handle(void)
处理异步的消息。
xf_err_t xf_task_mbus_sub(uint32_t topic_id, xf_task_mbus_func_t mbus_cb, void *user_data)
订阅指定的 topic。
xf_err_t xf_task_mbus_pub_sync(uint32_t topic_id, void *data)
同步发布,直接执行订阅者的回调,执行速度快。
xf_err_t xf_task_mbus_unreg_topic(uint32_t topic_id)
注销 topic。
bool xf_task_queue_is_empty(const xf_task_queue_t *const queue)
判断队列是否为空。
xf_err_t xf_task_queue_init(xf_task_queue_t *const queue, void *data, const size_t size, const size_t count)
队列对象初始化。
xf_err_t xf_task_queue_remove_front(xf_task_queue_t *const queue)
从队列删除第一个元素。
void * xf_task_queue_peek(const xf_task_queue_t *const queue)
获取队列第一个元素。
xf_err_t xf_task_queue_send(xf_task_queue_t *const queue, void *item, const xf_task_queue_mode_t pos)
队列发送数据。
@ XF_TASK_QUEUE_SEND_TO_BACK
#define XF_ASSERT(condition, retval, tag, format,...)
xfusion 断言宏(条件 不成立 时则输出日志后返回)。
Definition xf_check.h:150
int32_t xf_err_t
整型错误类型。错误码具体值见 xf_err_code_t。
Definition xf_err.h:69
@ XF_ERR_INVALID_ARG
Definition xf_err.h:46
@ XF_ERR_INITED
Definition xf_err.h:55
@ XF_OK
Definition xf_err.h:43
@ XF_ERR_NOT_FOUND
Definition xf_err.h:50
@ XF_ERR_NO_MEM
Definition xf_err.h:45
static void xf_list_init(xf_list_t *list)
动态初始化链表.
Definition xf_list.h:97
static void xf_list_add_tail(xf_list_t *new_node, xf_list_t *head)
xf_list_add_tail - 在指定节点之前添加一个 new_node.
Definition xf_list.h:168
static void xf_list_del_init(xf_list_t *entry)
xf_list_del_init - 从链表中删除节点, 并重新初始化.
Definition xf_list.h:266
#define XF_LIST_HEAD_INIT(name)
静态定义时初始化链表。
Definition xf_list.h:82
#define xf_list_for_each_entry_safe(pos, n, head, type, member)
xf_list_for_each_entry_safe - 安全地迭代给定类型的链表,可删除链表节点。
Definition xf_list.h:876
#define xf_list_for_each_entry(pos, head, type, member)
xf_list_for_each_entry - 迭代给定类型的链表。
Definition xf_list.h:732
#define xf_malloc(x)
Definition xf_stdlib.h:38
#define xf_free(x)
Definition xf_stdlib.h:39
消息队列对象结构体。
xf_task_mbus_func_t mbus_cb
xf_task_queue_t pub_queue
双向链表结构体.
Definition xf_list.h:64
#define XF_LOGE(tag, format,...)
#define XF_LOGD(tag, format,...)
#define DEFAULT_QUEUE_COUNT
struct _xf_task_xsub_t xf_task_msub_t
static xf_list_t _topic_list
static xf_err_t xf_task_mbus_find(uint32_t topic_id, xf_task_mtopic_t **topic)
struct _xf_task_xtopic_t xf_task_mtopic_t
static void xf_task_mbus_run(xf_task_mtopic_t *mtopic, void *data)
#define TAG
消息总线(发布订阅)。
消息队列。