https://github.com/HaleyLeoZhang/message_queue
注意:当前时间戳,在执行以下操作前,就已经获取到了,后面的几步只是复用这个值
队列数据 = 队列唯一编号+队列内容+重试次数
{"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}
zadd 延迟队列名 执行分数(延迟秒数+当前时间戳[整型]) redis队列数据)
"ZADD" "queues:echo_log_job:delayed" "1489802332" {"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}
zrangebyscore 延迟队列名 最小分值(负无穷) 最大分值(当前时间戳)
rpush 队列名 队列数据
"zrangebyscore" "queues:echo_log_job:delayed" "-inf" "1489802333"
负无穷到当前时间戳的分值
"zremrangebyrank" "queues:echo_log_job:delayed" "0" 刚刚取出来的任务数量-1(因为下标是从0开始算)
"zremrangebyrank" "queues:echo_log_job:delayed" "0" "0"
rpush 队列名 队列数据
"rpush" "queues:echo_log_job" {"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}
lpop 队列名
"lpop" "queues:echo_log_job"
获取当前执行的时间戳+超时时间,zadd插入到对应队列的reserved数据结构中,用以任务失败后,重新放入队列
"zadd" "queues:echo_log_job:reserved" "1489802343" {"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}
zrem "queues:echo_log_job:reserved" {"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}
程序端,for循环拉取,如果任务很多,考虑CPU占用
可以在处理完一次数据后,指定等待时间挂起进程再拉取新的任务
- TODO
- TODO
- TODO
下篇文章云天河将会讲解 RabbitMQ
的一些运作原理,及其与 Redis 实现的消息队列的优缺点,此外会引入下 Kafka
与 RabbitMQ
各自适用的场景
评论列表点此评论