以考虑性能的方式删除Sidekiq的作业

首先

The native Chinese translation for “はじめに” is “首先” .

我认为很多人在处理Rails时,当需要因某种原因延迟处理或异步处理时,都会使用Active Job。它非常方便实用,但是当你想要删除排队的作业时,就会立刻遇到困难。

前提 -唯一需要一个选择,对以下内容进行中文本地化改写:

适配器使用了文章标题中提到的Sidekiq。

使用Sidekiq API

 

Sidekiq::ScheduledSet.new.find_job(job_id).delete 方法会从所有作业中查找并解析数据,检查是否与 jid(作业的id)匹配,所以速度较慢。
虽然还有其他的删除方法,但我暂时不清楚参数的含义,所以决定先查看 Sidekiq 如何使用 Redis。

Redis-> Redis 用于问答系统

在Sidekiq中,我们使用了一种名为zset的数据类型来将作业存储在Redis中。

zset型 -> 有序集合类型

zset是一种数据类型,它按照存储的值具备的分数(score)来进行排序。
来源:http://redis.shibu.jp/commandreference/sortedsets.html

在Sidekiq中的分数

分数是指执行的日期和时间的Unix时间。

我来看一下可以在zset中使用的命令。

似乎可以通过在zrem中直接指定值来删除它。

使用命令 $ docker-compose exec redis redis-cli 或其他类似的命令启动 redis-cli,
并尝试以下操作,确认可以成功删除:
redis> select 5
redis> zrange schedule 0 5 withscores (获取前五个带有分数的数据)
redis> zrangebyscore schedule 1597600801 1697600801 (获取 score 介于 1597600801 到 1697600801 之间的任务)
redis> zrem schedule “任务内容” (删除任务)

尝试直接通过Redis删除(某物)。

保存的工作信息存在哪里是个问题。

实际上,保存在Redis中的作业如下所示。

{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"TestJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"TestJob\",\"job_id\":\"482fb871-cdec-44ec-89e9-36ccccco9839\",\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"test_id\":1,\"_aj_symbol_keys\":[\"test_id\",]}],\"executions\":0,\"locale\":\"ja\"}],\"retry\":true,\"jid\":\"ef2f98642ac6560fea66b999\",\"created_at\":1597812804.077493}"

然而,我不知道可以通过哪个流程来查看这些信息。

尝试阅读一遍Sidekiq的文档

听说如果使用客户端中间件,可以在将任务排入队列时进行各种操作。
https://github.com/mperham/sidekiq/wiki/Middleware
在这里查看了作业的内容,发现是我想要的 JSON 数据,所以决定在这里保存它。

中间件可以查看作业,而存储在Redis中的作业的JSON格式有些不同的问题。

空白、nil、null和哈希的格式「:」到「->」等之間有一些不同。
雖然有點困難,但我決定通過job.except(‘at’).inspect.delete(‘ ‘).gsub(/nil/, ‘null’).gsub(/=>/, ‘:’)進行一些修改。

结果的代码

中间件

我們將在這裡將工作保存到關聯式數據庫中。

module ClientMiddleware
  class TestJobAttribute
    def call(_worker_class, job, _queue, _redis_pool)
      # 削除したいジョブのときだけ
      if job['args'].first['job_class'] == 'TestJob'
        test = Test.find(job['args'].first['arguments'].first)
        # Testモデルのjobカラムにジョブのデータをそのまま入れる
        return false unless test.update(job: adjust_for_redis(job))
      end
      yield
    end

    private

    def adjust_for_redis(job)
      job.except('at').inspect.delete(' ').gsub(/nil/, 'null').gsub(/=>/, ':')
    end
  end
end

# 省略

Sidekiq.configure_client do |config|
  config.redis = { url: url }
  config.client_middleware do |chain|
    chain.add ClientMiddleware::TestJobAttribute
  end
end

删除工作的客户端

class Sidekiq::RedisClient
  def remove_jobs!(jobs)
    return if jobs.blank?
    Sidekiq.redis do |conn|
      fixnum = conn.zrem('schedule', jobs)
      raise SidekiqRedisClientError.new(jobs.length, fixnum) if jobs.length != fixnum
    end
  end
end

class SidekiqRedisClientError < StandardError
  def initialize(len, fixnum)
    super("The jobs could not be deleted. jobs.length is #{len}. fixnum is #{fixnum}.")
  end
end

使用Sidekiq::RedisClient.new.remove_jobs!([Test.first.job])应该能够删除作业。

广告
将在 10 秒后关闭
bannerAds