以考虑性能的方式删除Sidekiq的作业
首先
The native Chinese translation for “はじめに” is “首先” .
我认为很多人在处理Rails时,当需要因某种原因延迟处理或异步处理时,都会使用Active Job。它非常方便实用,但是当你想要删除排队的作业时,就会立刻遇到困难。
前提 -唯一需要一个选择,对以下内容进行中文本地化改写:
适配器使用了文章标题中提到的Sidekiq。
使用Sidekiq API
Sidekiq::ScheduledSet.new.find_job(job_id).delete 方法会从所有作业中查找并解析数据,检查是否与 jid(作业的id)匹配,所以速度较慢。
虽然还有其他的删除方法,但我暂时不清楚参数的含义,所以决定先查看 Sidekiq 如何使用 Redis。
Redis-> Redis 用于问答系统
在Sidekiq中,我们使用了一种名为zset的数据类型来将作业存储在Redis中。
zset型 -> 有序集合类型
zset是一种数据类型,它按照存储的值具备的分数(score)来进行排序。
来源:http://redis.shibu.jp/commandreference/sortedsets.html
在Sidekiq中的分数
分数是指执行的日期和时间的Unix时间。
我来看一下可以在zset中使用的命令。
似乎可以通过在zrem中直接指定值来删除它。
使用命令 $ docker-compose exec redis redis-cli 或其他类似的命令启动 redis-cli,
并尝试以下操作,确认可以成功删除:
redis> select 5
redis> zrange schedule 0 5 withscores (获取前五个带有分数的数据)
redis> zrangebyscore schedule 1597600801 1697600801 (获取 score 介于 1597600801 到 1697600801 之间的任务)
redis> zrem schedule “任务内容” (删除任务)
尝试直接通过Redis删除(某物)。
保存的工作信息存在哪里是个问题。
实际上,保存在Redis中的作业如下所示。
{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"TestJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"TestJob\",\"job_id\":\"482fb871-cdec-44ec-89e9-36ccccco9839\",\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"test_id\":1,\"_aj_symbol_keys\":[\"test_id\",]}],\"executions\":0,\"locale\":\"ja\"}],\"retry\":true,\"jid\":\"ef2f98642ac6560fea66b999\",\"created_at\":1597812804.077493}"
然而,我不知道可以通过哪个流程来查看这些信息。
尝试阅读一遍Sidekiq的文档
听说如果使用客户端中间件,可以在将任务排入队列时进行各种操作。
https://github.com/mperham/sidekiq/wiki/Middleware
在这里查看了作业的内容,发现是我想要的 JSON 数据,所以决定在这里保存它。
中间件可以查看作业,而存储在Redis中的作业的JSON格式有些不同的问题。
空白、nil、null和哈希的格式「:」到「->」等之間有一些不同。
雖然有點困難,但我決定通過job.except(‘at’).inspect.delete(‘ ‘).gsub(/nil/, ‘null’).gsub(/=>/, ‘:’)進行一些修改。
结果的代码
中间件
我們將在這裡將工作保存到關聯式數據庫中。
module ClientMiddleware
class TestJobAttribute
def call(_worker_class, job, _queue, _redis_pool)
# 削除したいジョブのときだけ
if job['args'].first['job_class'] == 'TestJob'
test = Test.find(job['args'].first['arguments'].first)
# Testモデルのjobカラムにジョブのデータをそのまま入れる
return false unless test.update(job: adjust_for_redis(job))
end
yield
end
private
def adjust_for_redis(job)
job.except('at').inspect.delete(' ').gsub(/nil/, 'null').gsub(/=>/, ':')
end
end
end
# 省略
Sidekiq.configure_client do |config|
config.redis = { url: url }
config.client_middleware do |chain|
chain.add ClientMiddleware::TestJobAttribute
end
end
删除工作的客户端
class Sidekiq::RedisClient
def remove_jobs!(jobs)
return if jobs.blank?
Sidekiq.redis do |conn|
fixnum = conn.zrem('schedule', jobs)
raise SidekiqRedisClientError.new(jobs.length, fixnum) if jobs.length != fixnum
end
end
end
class SidekiqRedisClientError < StandardError
def initialize(len, fixnum)
super("The jobs could not be deleted. jobs.length is #{len}. fixnum is #{fixnum}.")
end
end
使用Sidekiq::RedisClient.new.remove_jobs!([Test.first.job])应该能够删除作业。