Rails + Active Job + Delayed::Jobにて、ジョブを作ってみたり、メールジョブと組み合わせてみたり、テストを書いてみたりしてみた

Rails + Active Job + Delayed::Jobな構成でジョブを作ってみた時に、色々調べたためメモを残します。

なお、記事が長いため、途中のソースコードは説明使う部分以外を省略しています。

必要に応じて、実際のソースコードGithub リポジトリでご確認ください。
https://github.com/thinkAmi-sandbox/rails_delayed_job-sample

 
目次

 

環境

 

環境構築

rails new

今回は、RailsAPIアプリ + Delayed::Job の組み合わせで色々試してみます。

rails new して bundle installします。

% bundle exec rails new rails_delayed_job_sample --api --skip-bundle

% cd rails_delayed_job_sample

% bundle install

 

Delayed::Jobのセットアップ

Delayed::JobのREADMEに従いセットアップを行います。

今回は Delayed::Jobのバックエンドに Active Record を使うため、Gemfileに delayed_job_active_record を追加し、bundle installします。

gem 'delayed_job_active_record'

 
続いてジェネレートとマイグレーションを実行します。

# ジェネレート
% bin/rails generate delayed_job:active_record
      create  bin/delayed_job
       chmod  bin/delayed_job
      create  db/migrate/20220703062311_create_delayed_jobs.rb

# DB作成
 % bin/rails db:migrate
== 20220703062311 CreateDelayedJobs: migrating ================================
-- create_table(:delayed_jobs)
   -> 0.0020s
-- add_index(:delayed_jobs, [:priority, :run_at], {:name=>"delayed_jobs_priority"})
   -> 0.0007s
== 20220703062311 CreateDelayedJobs: migrated (0.0029s) =======================

 
次に、Active Job の queue_adapter として使えるよう、 config/application.rb に設定を追加します。

config.active_job.queue_adapter = :delayed_job

 

APIアプリのモデルを作成

annotate を使う

モデルにスキーマを明示するため、 annotate gemをGemfileに追加し、bundle installします。
https://github.com/ctran/annotate_models

 
インストールが終わったら、ジェネレータを使って annotate の初期設定を行います。

% bin/rails g annotate:install

 

モデルを作成

今回は name 列だけを持つ apple モデルを使います。

bin/rails g model apple name:string
      invoke  active_record
      create    db/migrate/20220703065930_create_apples.rb
      create    app/models/apple.rb
      invoke    test_unit
      create      test/models/apple_test.rb
      create      test/fixtures/apples.yml

 
以上で準備ができました。

 

Delayed::Jobを試す

初めてのジョブを作る

実装

初めてのジョブとして

  1. コントローラで、HTTPリクエストボディを元に、 apple モデルへデータを保存する
  2. コントローラで、ジョブをキューに登録する
  3. HTTPレスポンスを返す

を作ってみます。

 
まずはジョブから作成します。

ジョブ生成用のジェネレータを使います。
3 ジョブを作成する | Active Job の基礎 - Railsガイド

% bin/rails g job apple_instance       
      invoke  test_unit
      create    test/jobs/apple_instance_job_test.rb
      create  app/jobs/apple_instance_job.rb

 
ジョブの雛形ができたので、次はコントローラを作成します。

なお、ジョブにモデルのインスタンスを直接渡せるのは、GlobalIDという仕組みのようです。
10 GlobalID | Active Job の基礎 - Railsガイド

class Api::PassingInstance::ApplesController < ApplicationController
  def create
    # モデルへの保存
    apple = Apple.new(name: params[:name])
    apple.save

    # モデルのインスタンスを渡してジョブをキューに登録する
    AppleInstanceJob.perform_later(apple)

    render json: { status: params[:name]}
  end
end

 
続いて、ジョブを編集します。

今回は、受け取ったモデルのインスタンスをログ出力します。

class AppleInstanceJob < ApplicationJob
  queue_as :default

  def perform(apple)
    puts "[instance job] (#{apple.name}) #{Time.zone.now} start ============>"
    puts "[instance job] (#{apple.name}) #{Time.zone.now} end   <============"
  end
end

 
最後に routes.rb にルーティングを追加します。

namespace :api do
  namespace :passing_instance do
    resources :apples, only: :create
  end
end

 

動作確認

Railsを起動後、curlAPIエンドポイントにHTTPリクエストします。

% curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/passing_instance/apples
{"status":"ふじ"}

 
delayed_jobs テーブルの中身を見ると、データが登録されています。

 
bin/rails jobs:work で Delayed Job Worker を起動し、ワーカーのログを確認します。

作成したジョブが動作したようです。

[Worker(host:*** pid:97322)] Starting job worker
[Worker(host:***pid:97322)] Job AppleInstanceJob [d93ef4e6-9162-4cf3-b805-a03eb751e4c6] from DelayedJob(default) with arguments: [{"_aj_globalid"=>"gid://rails-delayed-job-sample/Apple/23"}] (id=114) (queue=default) RUNNING
[instance job] (ふじ) 2022-07-10 08:15:21 +0900 start ============>
[instance job] (ふじ) 2022-07-10 08:15:21 +0900 end   <============
[Worker(host:*** pid:97322)] Job AppleInstanceJob [d93ef4e6-9162-4cf3-b805-a03eb751e4c6] from DelayedJob(default) with arguments: [{"_aj_globalid"=>"gid://rails-delayed-job-sample/Apple/23"}] (id=114) (queue=default) COMPLETED after 0.0272
[Worker(host:*** pid:97322)] 1 jobs processed at 15.4619 j/s, 0 failed

 

引数としてサポートされていない、OpenStruct 型の引数を試してみる

Active Job でサポートされる引数の型の一覧は以下にあります。
9 引数でサポートされる型 | Active Job の基礎 - Railsガイド

 
上記に書かれていない型を引数として渡すとどうなるかを試してみます。

今回は OpenStruct を使ってみます。
class OpenStruct (Ruby 3.1 リファレンスマニュアル)

 
ジョブを生成します。

% bin/rails g job open_struct
      invoke  test_unit
      create    test/jobs/open_struct_job_test.rb
      create  app/jobs/open_struct_job.rb

 
コントローラを実装します。

class Api::PassingOpenStruct::ApplesController < ApplicationController
  def create
    apple = OpenStruct.new(
      {
        id: 1,
        name: params[:name]
      }
    )

    # OpenStruct を渡す
    OpenStructJob.perform_later(apple)

    render json: { status: apple.name}
  end
end

 
ジョブを修正します。

class OpenStructJob < ApplicationJob
  queue_as :default

  def perform(apple)
    puts "[open_struct job] (#{apple.name}) #{Time.zone.now} start ============>"
    puts "[open_struct job] (#{apple.name}) #{Time.zone.now} end   <============"
  end
end

 
routes を追加し、Railsを起動した後に curl でアクセスすると、エラーになりました。

% curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/passing_open_struct/apples
{"status":500,"error":"Internal Server Error", ... }

 
Railsのログには OpenStruct 型がサポートされていない旨が出力されていました。

Started POST "/api/passing_open_struct/apples" for 127.0.0.1 at 2022-07-10 08:31:51 +0900
   (0.0ms)  SELECT sqlite_version(*)
Processing by Api::PassingOpenStruct::ApplesController#create as */*
  Parameters: {"name"=>"ふじ", "apple"=>{"name"=>"ふじ"}}
[ActiveJob] Failed enqueuing OpenStructJob to DelayedJob(default): ActiveJob::SerializationError (Unsupported argument type: OpenStruct)

 
OpenStruct 型をサポートするには、シリアライザを定義すれば良いようです。
10.1 シリアライザ | Active Job の基礎 - Railsガイド

 
ただ、Ruby 3.0 から OpenStruct の使用に警告が入ったこともあり、今後新規で OpenStruct を使うのは控えることになるのかなと思っています。

 
そこで、今回は OpenStruct のシリアライザ定義をがんばるのではなく、 to_h でハッシュにします。

 
コントローラを修正します。

class Api::PassingOpenStruct::ApplesController < ApplicationController
  def create
    apple = OpenStruct.new(
      {
        id: 1,
        name: params[:name]
      }
    )

    # ハッシュにするよう修正
    OpenStructJob.perform_later(apple.to_h)

    render json: { status: apple.name}
  end
end

 
ハッシュから取り出すよう、ジョブも修正します。

class OpenStructJob < ApplicationJob
  queue_as :default

  def perform(apple)
    puts "[open_struct job] (#{apple[:name]}) #{Time.zone.now} start ============>"
    puts "[open_struct job] (#{apple[:name]}) #{Time.zone.now} end   <============"
  end
end

 
再度 curl でアクセスすると、正常なレスポンスが返ってきました。

% curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/passing_open_struct/apples
{"status":"ふじ"}

ワーカーのログにも正常に終了したことが出力されています。

[open_struct job] (ふじ) 2022-07-10 09:21:39 +0900 start ============>
[open_struct job] (ふじ) 2022-07-10 09:21:39 +0900 end   <============

 

Delayed::Jobにて、モデルのインスタンスメソッド実行をジョブ化する

Delayed::Job の README には、 .delay.method(params) とメソッドの前に .delay をはさむことでメソッドをジョブ実行できる旨が記載されているため、試してみます。
https://github.com/collectiveidea/delayed_job#queuing-jobs

 
モデルにインスタンスメソッド (post_api) を生やします。

class Apple < ApplicationRecord
  def post_api
    # 外部APIを呼んでいるとする
    puts "[instance] (#{name}) #{Time.zone.now} start ============>"
    puts "[instance] (#{name}) #{Time.zone.now} end   <============"
  end
end

 
コントローラにて、 .delay 付きでモデルのインスタンスメソッドを呼んでみます。

class Api::InstanceMethod::ApplesController < ApplicationController
  def create
    apple = Apple.new(name: params[:name])
    apple.save

    # モデルのインスタンスメソッドをジョブ実行
    apple.delay.post_api

    render json: { status: params[:name]}
  end
end

 
ルーティングの追加とRailsの起動を行い、curlで動作確認します。

% curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/instance_method/apples    
{"status":"ふじ"}

 
ワーカーのログに実行結果が出力されていました。

[Worker(host:*** pid:97322)] Job Apple#post_api (id=116) RUNNING
[instance] (ふじ) 2022-07-10 09:30:22 +0900 start ============>
[instance] (ふじ) 2022-07-10 09:30:22 +0900 end   <============
[Worker(host:*** pid:97322)] Job Apple#post_api (id=116) COMPLETED after 0.0044

 

キューを振り分ける

準備

Delayed::Job では、名前付きキューを使うことでキューを振り分けることができることから、試してみます。
https://github.com/collectiveidea/delayed_job#named-queues

 
まずは、名前付きキューを使うジョブを3つ用意します。

キュー名:default

class DefaultQueueJob < ApplicationJob
  queue_as :default

  def perform(apple_name)
    puts "[default queue job] (#{apple_name}) #{Time.zone.now} start ============>"
    sleep 1
    puts "[default queue job] (#{apple_name}) #{Time.zone.now} end   <============"
  end
end

 
キュー名:custom

このジョブは重く、途中で10秒かかるものとします

class CustomQueueJob < ApplicationJob
  queue_as :custom

  def perform(apple_name)
    puts "[custom queue job] (#{apple_name}) #{Time.zone.now} start ============>"
    sleep 10
    puts "[custom queue job] (#{apple_name}) #{Time.zone.now} end   <============"
  end
end

 
キュー名:another

class AnotherQueueJob < ApplicationJob
  queue_as :another

  def perform(apple_name)
    puts "[another queue job] (#{apple_name}) #{Time.zone.now} start ============>"
    puts "[another queue job] (#{apple_name}) #{Time.zone.now} end   <============"
  end
end

 
続いて、これらのジョブを使うようなコントローラ、およびルーティングを追加します。

コントローラはこんな感じです。

class Api::QueueName::DefaultQueuesController < ApplicationController
  def create
    DefaultQueueJob.perform_later(params[:name])

    render json: { status: params[:name]}
  end
end

 
準備が終わったため、キューやワーカーの条件を変えて動作確認してみます。

 

環境変数 QUEUE や QUEUES を使い、Delayed::Job でワーカーが扱えるキューを指定する

Delayed::Job でワーカーが扱えるキューの設定方法を調べたところ、READMEに記載がありました。

If you want to just run all available jobs and exit you can use rake jobs:workoff

Work off queues by setting the QUEUE or QUEUES environment variable.

QUEUE=tracking rake jobs:work
QUEUES=mailers,tasks rake jobs:work

 

2つの名前付きキューに入れて、すべてのキューを扱えるワーカー1つを動かす

最初に、ワーカーを停止しておきます。

次に、キュー名 custom -> another -> custom の順でキューに積んでおきます。

# custom
curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/queue_name/custom_queues

# another
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/queue_name/another_queues

# もう一回、custom
curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/queue_name/custom_queues

 
ここで delayed_job テーブルを確認すると、3件のレコードが登録されています。

 
続いて bin/rails jobs:work でワーカーを起動します。

ワーカーには環境変数を設定していないため、すべてのキューを扱うことができました。

また、優先度も設定していないため、キューに入った順番に処理が実行されました。

[Worker(host:*** pid:2223)] Job CustomQueueJob [1f86142c-58ec-401e-ae7a-5b9c528ded85] from DelayedJob(custom) with arguments: ["ふじ"] (id=121) (queue=custom) RUNNING
[custom queue job] (ふじ) 2022-07-10 10:04:04 +0900 start ============>
[custom queue job] (ふじ) 2022-07-10 10:04:14 +0900 end   <============
[Worker(host:*** pid:2223)] Job CustomQueueJob [1f86142c-58ec-401e-ae7a-5b9c528ded85] from DelayedJob(custom) with arguments: ["ふじ"] (id=121) (queue=custom) COMPLETED after 10.0265
[Worker(host:*** pid:2223)] Job AnotherQueueJob [c55ea7b9-8a13-4d6d-aea3-b8954ea910fa] from DelayedJob(another) with arguments: ["秋映"] (id=122) (queue=another) RUNNING
[another queue job] (秋映) 2022-07-10 10:04:14 +0900 start ============>
[another queue job] (秋映) 2022-07-10 10:04:14 +0900 end   <============
[Worker(host:*** pid:2223)] Job AnotherQueueJob [c55ea7b9-8a13-4d6d-aea3-b8954ea910fa] from DelayedJob(another) with arguments: ["秋映"] (id=122) (queue=another) COMPLETED after 0.0141
[Worker(host:*** pid:2223)] Job CustomQueueJob [6f69a43e-3a4b-42ae-9a79-2be09659f5f9] from DelayedJob(custom) with arguments: ["ふじ"] (id=123) (queue=custom) RUNNING
[custom queue job] (ふじ) 2022-07-10 10:04:14 +0900 start ============>
[custom queue job] (ふじ) 2022-07-10 10:04:24 +0900 end   <============
[Worker(host:*** pid:2223)] Job CustomQueueJob [6f69a43e-3a4b-42ae-9a79-2be09659f5f9] from DelayedJob(custom) with arguments: ["ふじ"] (id=123) (queue=custom) COMPLETED after 10.0187
[Worker(host:*** pid:2223)] 3 jobs processed at 0.1491 j/s, 0 failed

 

3つの名前付きキューに入れて、指定のキューを扱えるワーカー2つを動かす

先ほどと同様、curlでアクセスしてキューにためておきます。

# default
curl -X POST -H "Content-Type: application/json" -d '{"name":"つがる"}' http://localhost:3000/api/queue_name/default_queues

# custom
curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/queue_name/custom_queues

# another
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/queue_name/another_queues

 
次に、ワーカーを起動します。

localで複数のワーカーを起動する場合、 bin/rails jobs:work を別ターミナルで起動することになります。今回は worker1worker2 を用意します。

その際、ターミナルごとに環境変数を設定して起動します。

  • worker1は QUEUE=default
  • worker2は QUEUES=custom,another

 
worker1のログを見ると、キュー名 default だけを扱っています。

[Worker(host:*** pid:3308)] Job DefaultQueueJob [6659c50b-0182-4d69-9d16-83dd95213812] from DelayedJob(default) with arguments: ["つがる"] (id=127) (queue=default) RUNNING
[default queue job] (つがる) 2022-07-10 10:17:22 +0900 start ============>
[default queue job] (つがる) 2022-07-10 10:17:23 +0900 end   <============
[Worker(host:*** pid:3308)] Job DefaultQueueJob [6659c50b-0182-4d69-9d16-83dd95213812] from DelayedJob(default) with arguments: ["つがる"] (id=127) (queue=default) COMPLETED after 1.0250
[Worker(host:*** pid:3308)] 1 jobs processed at 0.9412 j/s, 0 failed

 
worker2のログを見ると、キュー名 customanother を扱っています。

worker1は空いているにも関わらず、キュー another は worker2 で動いています。

[Worker(host:*** pid:3672)] Job CustomQueueJob [0271bd52-abca-401d-bcc3-668def1447b7] from DelayedJob(custom) with arguments: ["ふじ"] (id=128) (queue=custom) RUNNING
[custom queue job] (ふじ) 2022-07-10 10:17:25 +0900 start ============>
[custom queue job] (ふじ) 2022-07-10 10:17:35 +0900 end   <============
[Worker(host:*** pid:3672)] Job CustomQueueJob [0271bd52-abca-401d-bcc3-668def1447b7] from DelayedJob(custom) with arguments: ["ふじ"] (id=128) (queue=custom) COMPLETED after 10.0248
[Worker(host:*** pid:3672)] Job AnotherQueueJob [f4a407d3-b34f-4c81-8701-6c650c1003d6] from DelayedJob(another) with arguments: ["秋映"] (id=129) (queue=another) RUNNING
[another queue job] (秋映) 2022-07-10 10:17:35 +0900 start ============>
[another queue job] (秋映) 2022-07-10 10:17:35 +0900 end   <============
[Worker(host:*** pid:3672)] Job AnotherQueueJob [f4a407d3-b34f-4c81-8701-6c650c1003d6] from DelayedJob(another) with arguments: ["秋映"] (id=129) (queue=another) COMPLETED after 0.0133
[Worker(host:*** pid:3672)] 2 jobs processed at 0.1986 j/s, 0 failed

 

ネストしたジョブの扱いについて

親ジョブから子ジョブを生成する

Delayed::Jobでは、あるジョブ(親)から別のジョブ(子) を生成できます。

試してみます。

class ParentJob < ApplicationJob
  queue_as :default

  def perform(prefix)
    puts "[parent job] (#{prefix}) #{Time.zone.now} start ============>"
    
    # 子のジョブを生成する
    ChildJob.perform_later(prefix)
    
    puts "[parent job] (#{prefix}) #{Time.zone.now} end   <============"
  end
end

 

class ChildJob < ApplicationJob
  queue_as :default

  def perform(prefix)
    puts "[child job] (#{prefix}デリシャス) #{Time.zone.now} start ============>"
    puts "[child job] (#{prefix}デリシャス) #{Time.zone.now} end   <============"
  end
end

 
コントローラでは、親ジョブのみ生成します。

class Api::ParentChild::StarkingsController < ApplicationController
  def create
    ParentJob.perform_later(params[:prefix])

    render json: { status: params[:prefix]}
  end
end

 
ルーティングを作成し、Railsとワーカーを起動した後、curlでアクセスします。

% curl -X POST -H "Content-Type: application/json" -d '{"prefix":"スターキング"}' http://localhost:3000/api/parent_child/starkings

 
delayed_jobs テーブルには親のみ登録されています。

 
ワーカーを起動すると、ワーカーのログに以下が出力されました。親→子の順番で処理されたようです。

[Worker(host:*** pid:4569)] Job ParentJob [84349539-8a02-4dd3-8139-93a13c9b73a0] from DelayedJob(default) with arguments: ["スターキング"] (id=130) (queue=default) RUNNING
[parent job] (スターキング) 2022-07-10 10:27:54 +0900 start ============>
[parent job] (スターキング) 2022-07-10 10:27:54 +0900 end   <============
[Worker(host:*** pid:4569)] Job ParentJob [84349539-8a02-4dd3-8139-93a13c9b73a0] from DelayedJob(default) with arguments: ["スターキング"] (id=130) (queue=default) COMPLETED after 0.0250
[Worker(host:*** pid:4569)] Job ChildJob [b7bb8376-a19a-47d4-bbab-5d8e2012e5bb] from DelayedJob(default) with arguments: ["スターキング"] (id=131) (queue=default) RUNNING
[child job] (スターキングデリシャス) 2022-07-10 10:27:54 +0900 start ============>
[child job] (スターキングデリシャス) 2022-07-10 10:27:54 +0900 end   <============
[Worker(host:*** pid:4569)] Job ChildJob [b7bb8376-a19a-47d4-bbab-5d8e2012e5bb] from DelayedJob(default) with arguments: ["スターキング"] (id=131) (queue=default) COMPLETED after 0.0086
[Worker(host:*** pid:4569)] 2 jobs processed at 31.0791 j/s, 0 failed

 

親のジョブと別ジョブがキューに登録されていた場合の挙動

今度は

  • 親ジョブ
  • 別のジョブ

が登録されていた場合、どの順番で実行されるかを試してみます。

 

# 親のジョブ
% curl -X POST -H "Content-Type: application/json" -d '{"prefix":"スターキング"}' http://localhost:3000/api/parent_child/starkings

# 別のジョブ
% curl -X POST -H "Content-Type: application/json" -d '{"name":"つがる"}' http://localhost:3000/api/queue_name/default_queues

 
ワーカーを起動する前の delayed_jobs テーブルです。

2レコード登録され、2つ目が別のジョブになっています。

 
ワーカーを起動すると、ワーカーのログには 親 → 別 → 子 の順番で処理されました。

キューに入った順番でワーカーは処理するようです。

[Worker(host:*** pid:5208)] Job ParentJob [2276fe30-0102-4dfc-af2c-83fc77aa4169] from DelayedJob(default) with arguments: ["スターキング"] (id=132) (queue=default) RUNNING
[parent job] (スターキング) 2022-07-10 10:33:22 +0900 start ============>
[parent job] (スターキング) 2022-07-10 10:33:22 +0900 end   <============
[Worker(host:*** pid:5208)] Job ParentJob [2276fe30-0102-4dfc-af2c-83fc77aa4169] from DelayedJob(default) with arguments: ["スターキング"] (id=132) (queue=default) COMPLETED after 0.0241
[Worker(host:*** pid:5208)] Job DefaultQueueJob [65dbfba4-8047-4814-87ea-763ec390ef3a] from DelayedJob(default) with arguments: ["つがる"] (id=133) (queue=default) RUNNING
[default queue job] (つがる) 2022-07-10 10:33:22 +0900 start ============>
[default queue job] (つがる) 2022-07-10 10:33:23 +0900 end   <============
[Worker(host:*** pid:5208)] Job DefaultQueueJob [65dbfba4-8047-4814-87ea-763ec390ef3a] from DelayedJob(default) with arguments: ["つがる"] (id=133) (queue=default) COMPLETED after 1.0195
[Worker(host:*** pid:5208)] Job ChildJob [0bbda4f7-2bf8-4d34-8631-0fcacfe238c9] from DelayedJob(default) with arguments: ["スターキング"] (id=134) (queue=default) RUNNING
[child job] (スターキングデリシャス) 2022-07-10 10:33:24 +0900 start ============>
[child job] (スターキングデリシャス) 2022-07-10 10:33:24 +0900 end   <============
[Worker(host:*** pid:5208)] Job ChildJob [0bbda4f7-2bf8-4d34-8631-0fcacfe238c9] from DelayedJob(default) with arguments: ["スターキング"] (id=134) (queue=default) COMPLETED after 0.0128
[Worker(host:*** pid:5208)] 3 jobs processed at 2.7135 j/s, 0 failed

 

親のジョブと別ジョブがキューに登録され、かつ、別ジョブは子ジョブよりも優先度が低い場合の挙動

Active Jobでは、 queue_with_priority に値を設定することで、ジョブの優先度を付けることができます。
https://api.rubyonrails.org/v7.0/classes/ActiveJob/QueuePriority/ClassMethods.html

 
そこで、queue_with_priority に値を設定した別ジョブ LowPriorityJob を作成し、子のジョブより優先度を下げてみて、動作を確認してみます。

class LowPriorityJob < ApplicationJob
  queue_as :default
  queue_with_priority 30

  def perform(*args)
    puts "[low priority job] #{Time.zone.now} start ============>"
    puts "[low priority job] #{Time.zone.now} end   <============"
  end
end

 
コントローラーではジョブを呼ぶだけです。

class Api::ParentChild::LowPrioritiesController < ApplicationController
  def create
    LowPriorityJob.perform_later(params[:name])

    render json: { status: params[:name]}
  end
end

 
ルーティングを作成し、Railsとワーカーを起動した後、curlでアクセスします。

# 親ジョブ
% curl -X POST -H "Content-Type: application/json" -d '{"prefix":"スターキング"}' http://localhost:3000/api/parent_child/starkings

# 別ジョブ
% curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/parent_child/low_priorities

 
delayed_jobs テーブルを見ると、2つ目のジョブの priority が設定されていました。

 
この状態でワーカーを起動します。

ワーカーのログには 親 → 子 → 別 の順番で処理した結果が記録されていました。

[Worker(host:*** pid:6162)] Job ParentJob [ce97898f-c43d-43ba-bf5e-62b12de598cb] from DelayedJob(default) with arguments: ["スターキング"] (id=138) (queue=default) RUNNING
[parent job] (スターキング) 2022-07-10 10:44:22 +0900 start ============>
[parent job] (スターキング) 2022-07-10 10:44:22 +0900 end   <============
[Worker(host:*** pid:6162)] Job ParentJob [ce97898f-c43d-43ba-bf5e-62b12de598cb] from DelayedJob(default) with arguments: ["スターキング"] (id=138) (queue=default) COMPLETED after 0.0214
[Worker(host:*** pid:6162)] Job ChildJob [e18aa897-63c4-4e7d-8c84-c3c5d0c08172] from DelayedJob(default) with arguments: ["スターキング"] (id=140) (queue=default) RUNNING
[child job] (スターキングデリシャス) 2022-07-10 10:44:22 +0900 start ============>
[child job] (スターキングデリシャス) 2022-07-10 10:44:22 +0900 end   <============
[Worker(host:*** pid:6162)] Job ChildJob [e18aa897-63c4-4e7d-8c84-c3c5d0c08172] from DelayedJob(default) with arguments: ["スターキング"] (id=140) (queue=default) COMPLETED after 0.0079
[Worker(host:*** pid:6162)] Job LowPriorityJob [18f99e0e-f78d-493b-bd2e-ec335e4828f3] from DelayedJob(default) with arguments: ["国光"] (id=139) (queue=default) RUNNING
[low priority job] 2022-07-10 10:44:22 +0900 start ============>
[low priority job] 2022-07-10 10:44:22 +0900 end   <============
[Worker(host:*** pid:6162)] Job LowPriorityJob [18f99e0e-f78d-493b-bd2e-ec335e4828f3] from DelayedJob(default) with arguments: ["国光"] (id=139) (queue=default) COMPLETED after 0.0136
[Worker(host:*** pid:6162)] 3 jobs processed at 36.9590 j/s, 0 failed

 

トランザクションを使って、モデルの登録/更新とキューへの登録を同期する

今回の delayed_job_active_record では Delayed::Job のバックエンドにDBを使っています。

そのため、「モデルの登録/更新とキューへの登録を同期させる。モデルの更新が失敗したら、キューの登録も取り消す」という機能は、トランザクションを使うと簡単に実現できます。

 
ためしに以下のコントローラを作成し、動作を確認してみます。

今回は「ロールバックだけを行い、APIは正常終了する」よう、 ActiveRecord::Rollback を使ってロールバックを行います。
https://api.rubyonrails.org/classes/ActiveRecord/Rollback.html

class Api::Rollback::ApplesController < ApplicationController
  def create
    # トランザクションの開始
    ActiveRecord::Base.transaction do

      # モデルの作成
      Apple.create(name: params[:name])

      # キューへの登録
      DefaultQueueJob.perform_later(params[:name])

      # トランザクションロールバックの発生
      raise ActiveRecord::Rollback
    end

    render json: { status: params[:name]}
  end
end

 
処理前の状態を Rails console で確認したところ、両方とも0件でした。

>> Apple.count
  Apple Count (0.2ms)  SELECT COUNT(*) FROM "apples"
=> 0
>> Delayed::Job.count
  Delayed::Backend::ActiveRecord::Job Count (0.4ms)  SELECT COUNT(*) FROM "delayed_jobs"
=> 0

 
続いて curl でリクエストします。

% curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/rollback/apples

 
Railsの実行ログを見ると、トランザクションロールバックが発生しています。

Started POST "/api/rollback/apples" for 127.0.0.1 at 2022-07-10 11:00:43 +0900
   (0.2ms)  SELECT sqlite_version(*)
Processing by Api::Rollback::ApplesController#create as */*
  Parameters: {"name"=>"国光", "apple"=>{"name"=>"国光"}}

# トランザクションが開始された
  TRANSACTION (0.1ms)  begin transaction
  ↳ app/controllers/api/rollback/apples_controller.rb:7:in `block in create'

  Apple Create (0.3ms)  INSERT INTO "apples" ("name", "created_at", "updated_at") VALUES (?, ?, ?)  [["name", "国光"], ["created_at", "2022-07-10 02:00:44.074838"], ["updated_at", "2022-07-10 02:00:44.074838"]]
  ↳ app/controllers/api/rollback/apples_controller.rb:7:in `block in create'
[ActiveJob]   Delayed::Backend::ActiveRecord::Job Create (0.2ms)  INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)  [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper\njob_data:\n  job_class: DefaultQueueJob\n  job_id: 6a8f57c6-9b36-4479-a64a-30b5f4959cdd\n  provider_job_id:\n  queue_name: default\n  priority:\n  arguments:\n  - 国光\n  executions: 0\n  exception_executions: {}\n  locale: en\n  timezone: Tokyo\n  enqueued_at: '2022-07-10T02:00:44Z'\n"], ["last_error", nil], ["run_at", "2022-07-10 02:00:44.082236"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2022-07-10 02:00:44.082260"], ["updated_at", "2022-07-10 02:00:44.082260"]]
[ActiveJob]   ↳ app/controllers/api/rollback/apples_controller.rb:10:in `block in create'
[ActiveJob] Enqueued DefaultQueueJob (Job ID: 6a8f57c6-9b36-4479-a64a-30b5f4959cdd) to DelayedJob(default) with arguments: "国光"

# ここでトランザクションのロールバックが走った
  TRANSACTION (0.6ms)  rollback transaction
  ↳ app/controllers/api/rollback/apples_controller.rb:4:in `create'
Completed 200 OK in 15ms (Views: 0.3ms | ActiveRecord: 3.0ms | Allocations: 4673)

 
Rails console を確認しても、何も登録されていません。

>> Apple.count
  Apple Count (0.3ms)  SELECT COUNT(*) FROM "apples"
=> 0
>> Delayed::Job.count
  Delayed::Backend::ActiveRecord::Job Count (0.2ms)  SELECT COUNT(*) FROM "delayed_jobs"
=> 0

 

ジョブごとに Active Job のバックエンドを切り替える

ここまで、Active Job のバックエンドは Delayed::Job を使ってきました。

ただ、「一部のジョブだけ別の Active Job バックエンドを使いたい」というケースがあるかもしれません。

Railsガイドによると、一部だけ別のバックエンドにしたい場合は、ジョブで queue_adapter を定義すれば良さそうです。
4.2 バックエンドを設定する | Active Job の基礎 - Railsガイド

 
そこで今回は、一部のジョブだけ、Active Jobのデフォルトバックエンドである async への切り替えて実行してみます。
https://api.rubyonrails.org/v7.0/classes/ActiveJob/QueueAdapters/AsyncAdapter.html

class AsyncBackendJob < ApplicationJob
  # キュー名は、Delayed::Job と同じ
  queue_as :default

  # バックエンドだけ async
  self.queue_adapter = :async

  def perform(*args)
    puts "[async job] #{Time.zone.now} start ============>"
    puts "[async job] #{Time.zone.now} end   <============"
  end
end

 
ルーティングを追加し、Railsアプリを起動します。また、ワーカーは停止しておきます。

その状態で curl でリクエストします。

# Delayed::Job バックエンド
curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/queue_name/default_queues

# async バックエンド
curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/queue_backend/async_backends

 
Railsサーバのログを見ると、コントローラへのアクセスの後に、ジョブを実行していることが記録されていました。

Started POST "/api/queue_backend/async_backends" for 127.0.0.1 at 2022-07-10 11:11:19 +0900
Processing by Api::QueueBackend::AsyncBackendsController#create as */*
  Parameters: {"name"=>"国光", "async_backend"=>{"name"=>"国光"}}
[ActiveJob] Enqueued AsyncBackendJob (Job ID: 3a28fdf8-2b39-4293-ac2d-250b011ff660) to Async(default)
Completed 200 OK in 4ms (Views: 0.2ms | ActiveRecord: 0.0ms | Allocations: 848)


[ActiveJob] [AsyncBackendJob] [3a28fdf8-2b39-4293-ac2d-250b011ff660] Performing AsyncBackendJob (Job ID: 3a28fdf8-2b39-4293-ac2d-250b011ff660) from Async(default) enqueued at 2022-07-10T02:11:19Z
[async job] 2022-07-10 11:11:19 +0900 start ============>
[async job] 2022-07-10 11:11:19 +0900 end   <============
[ActiveJob] [AsyncBackendJob] [3a28fdf8-2b39-4293-ac2d-250b011ff660] Performed AsyncBackendJob (Job ID: 3a28fdf8-2b39-4293-ac2d-250b011ff660) from Async(default) in 3.85ms

 
なお、ワーカーを停止しているため、Delayed::Job バックエンドはキューに残ったままになっています。

 

Delayed::Job のワーカーの設定を変更する

Delayed::Job の README に、ワーカーの設定変更方法が記載されています。
https://github.com/collectiveidea/delayed_job#gory-details

 
そこで、 config/initializers/delayed_job_config.rb を作成し、設定を追加して試してみます。

また、動作確認用に、必ず落ちるジョブを作っておきます。

class AlwaysFailJob < ApplicationJob
  queue_as :default

  def perform(*args)
    puts "[fail job] #{Time.zone.now} start ============>"

    # 常にエラー
    raise StandardError

    puts "[fail job] #{Time.zone.now} end   <============"
  end
end

 
このジョブを起動するコントローラーはこちら。

class Api::AlwaysFailJobs::ApplesController < ApplicationController
  def create
    AlwaysFailJob.perform_later

    render json: { status: params[:name] }
  end
end

 

max_attempt (リトライ回数)

デフォルトは 25 のようです。

そこで、数値を色々変更し、動作を確認してみます。

なお、リトライ間隔については、等間隔での再実行ではなく、

On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts or using the job's defined reschedule_at method.

https://github.com/collectiveidea/delayed_job#gory-details

とのことです。

 

max_attempt = 2

設定します。

# config/initializers/delayed_job_config.rb
Delayed::Worker.max_attempts = 2

 
workerを再起動し、curlでリクエストします。

curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/always_fail_jobs/apples

 
2回実行されました。

2回目のエラー時には FAILED permanently because of 2 consecutive failures と出ています。

また、しばらく待ってもリトライはされませんでした。

[Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) RUNNING
[fail job] 2022-07-10 11:21:18 +0900 start ============>
[Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) FAILED (0 prior attempts) with StandardError: StandardError
[Worker(host:*** pid:9568)] 1 jobs processed at 30.6636 j/s, 1 failed
[Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) RUNNING
[fail job] 2022-07-10 11:21:29 +0900 start ============>
[Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) FAILED (1 prior attempts) with StandardError: StandardError
[Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) FAILED permanently because of 2 consecutive failures
[Worker(host:*** pid:9568)] 1 jobs processed at 59.3155 j/s, 1 failed

 

max_attempt = 1

1回だけ実行された後、 FAILED permanently because of 1 consecutive failures が表示されています。

[Worker(host:*** pid:10246)] Job AlwaysFailJob [0789bf63-34b3-439f-89f9-549ce98208d0] from DelayedJob(default) with arguments: [] (id=143) (queue=default) RUNNING
[fail job] 2022-07-10 11:27:10 +0900 start ============>
[Worker(host:*** pid:10246)] Job AlwaysFailJob [0789bf63-34b3-439f-89f9-549ce98208d0] from DelayedJob(default) with arguments: [] (id=143) (queue=default) FAILED (0 prior attempts) with StandardError: StandardError
[Worker(host:*** pid:10246)] Job AlwaysFailJob [0789bf63-34b3-439f-89f9-549ce98208d0] from DelayedJob(default) with arguments: [] (id=143) (queue=default) FAILED permanently because of 1 consecutive failures
[Worker(host:*** pid:10246)] 1 jobs processed at 34.0634 j/s, 1 failed

 

max_attempt = 0

1回だけ実行された後、 FAILED permanently because of 1 consecutive failures が表示されています。

[Worker(host:*** pid:10660)] Job AlwaysFailJob [78821bed-0214-42d2-a121-0a3e7bda59f5] from DelayedJob(default) with arguments: [] (id=144) (queue=default) RUNNING
[fail job] 2022-07-10 11:28:34 +0900 start ============>
[Worker(host:*** pid:10660)] Job AlwaysFailJob [78821bed-0214-42d2-a121-0a3e7bda59f5] from DelayedJob(default) with arguments: [] (id=144) (queue=default) FAILED (0 prior attempts) with StandardError: StandardError
[Worker(host:*** pid:10660)] Job AlwaysFailJob [78821bed-0214-42d2-a121-0a3e7bda59f5] from DelayedJob(default) with arguments: [] (id=144) (queue=default) FAILED permanently because of 1 consecutive failures
[Worker(host:*** pid:10660)] 1 jobs processed at 20.7310 j/s, 1 failed

 

destroy_failed_jobs = false (ジョブが失敗しても削除しない)

デフォルトでは、 destroy_failed_jobs = true になっているため、エラーになったジョブは delayed_jobs テーブルから削除されます。

「失敗時のジョブの情報を残したい」などの要件がある場合は、失敗したジョブも残すよう false へと変更します。

# config/initializers/delayed_job_config.rb

Delayed::Worker.destroy_failed_jobs = false

 
常に失敗するジョブのエンドポイントへ curl でリクエストします。

% curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/always_fail_jobs/apples

ジョブは失敗します。

[Worker(host:*** pid:10660)] Job AlwaysFailJob [cd497d7b-d58f-4c0a-b096-874b9c52d18c] from DelayedJob(default) with arguments: [] (id=145) (queue=default) RUNNING
[fail job] 2022-07-10 11:31:45 +0900 start ============>
[Worker(host:*** pid:10660)] Job AlwaysFailJob [cd497d7b-d58f-4c0a-b096-874b9c52d18c] from DelayedJob(default) with arguments: [] (id=145) (queue=default) FAILED (0 prior attempts) with StandardError: StandardError
[Worker(host:*** pid:10660)] Job AlwaysFailJob [cd497d7b-d58f-4c0a-b096-874b9c52d18c] from DelayedJob(default) with arguments: [] (id=145) (queue=default) FAILED permanently because of 1 consecutive failures
[Worker(host:*** pid:10660)] 1 jobs processed at 58.3737 j/s, 1 failed

 
delayed_jobs テーブルを見ると、エラーレコードが残ったままになっています。

失敗したジョブについては failed_at などが設定されています。

次のスクリーンショットの場合は、1行目のジョブが失敗しています。

 

max_run_time (ジョブの実行時間)

デフォルトは 4.hours です。

そこで、max_run_time を2秒に変更して、中で10秒sleepするジョブを実行してみます。

# config/initializers/delayed_job_config.rb

# ジョブがエラーになった時の繰り返し回数
# 0は1と同じ
Delayed::Worker.max_attempts = 2

# ジョブの実行時間
Delayed::Worker.max_run_time = 2.seconds

curlでリクエストします。

curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/queue_name/custom_queues

 
ワーカーのログを見ると、2秒経過したところでジョブが失敗していました。

[Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) RUNNING
[custom queue job] (秋映) 2022-07-10 11:40:01 +0900 start ============>
[Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) FAILED (0 prior attempts) with Delayed::WorkerTimeout: execution expired (Delayed::Worker.max_run_time is only 2 seconds)
[Worker(host:*** pid:11637)] 1 jobs processed at 0.4868 j/s, 1 failed
[Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) RUNNING
[custom queue job] (秋映) 2022-07-10 11:40:13 +0900 start ============>
[Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) FAILED (1 prior attempts) with Delayed::WorkerTimeout: execution expired (Delayed::Worker.max_run_time is only 2 seconds)
[Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) FAILED permanently because of 2 consecutive failures
[Worker(host:*** pid:11637)] 1 jobs processed at 0.4938 j/s, 1 failed

 

read_ahead 設定がDBエンジンによっては無視される

項目の説明として

The default behavior is to read 5 jobs from the queue when finding an available job. You can configure this by setting Delayed::Worker.read_ahead.

とありました。

この説明を軽く読んだ時、「 read_ahead の分だけジョブを読んで、その中の優先度が高いものを実行する」なのかもしれないと感じました。

 
そこで、

# config/initializers/delayed_job_config.rb

Delayed::Worker.read_ahead = 3

と先読みを3件にするよう設定した後、7個目に優先度が高いものがジョブがある時にどうなるかを見てみることにしました。

ジョブを2つ用意します。

優先度: 30 (低い)

class LowPriorityJob < ApplicationJob
  queue_as :default
  queue_with_priority 30

  def perform(*args)
    puts "[low priority job] #{Time.zone.now} start ============>"
    puts "[low priority job] #{Time.zone.now} end   <============"
  end
end

優先度:10 (高い)

class HighPriorityJob < ApplicationJob
  queue_as :default
  queue_with_priority 10

  def perform(*args)
    puts "[high priority job] #{Time.zone.now} start ============>"
    puts "[high priority job] #{Time.zone.now} end   <============"
  end
end

次に、これらのジョブに対応するAPIエンドポイントを用意します。

 
準備ができたため、動作を確認します。ワーカーを止めた状態で、

  • 優先度が低いジョブを6個キューに入れる
  • 優先度が高いジョブを1個キューに入れる

となるよう、curlでリクエストします。

# 6回実行
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities

# 1回実行
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/high_priorities

 
ワーカーを起動してログを確認したところ、優先度:高いから実行されているように見えました。

# 優先度:高が1回実行される
[Worker(host:*** pid:12532)] Job HighPriorityJob [a94294d8-5810-4902-87d7-120a1cd71b19] from DelayedJob(default) with arguments: [] (id=154) (queue=default) RUNNING
[high priority job] 2022-07-10 11:50:26 +0900 start ============>
[high priority job] 2022-07-10 11:50:26 +0900 end   <============
[Worker(host:*** pid:12532)] Job HighPriorityJob [a94294d8-5810-4902-87d7-120a1cd71b19] from DelayedJob(default) with arguments: [] (id=154) (queue=default) COMPLETED after 0.0183

# 続いて、優先度:低が6回実行される
[Worker(host:*** pid:12532)] Job LowPriorityJob [66de3de9-d61e-4fd6-a315-275bacd3a4d5] from DelayedJob(default) with arguments: [] (id=148) (queue=default) RUNNING
[low priority job] 2022-07-10 11:50:26 +0900 start ============>
[low priority job] 2022-07-10 11:50:26 +0900 end   <============
[Worker(host:*** pid:12532)] Job LowPriorityJob [66de3de9-d61e-4fd6-a315-275bacd3a4d5] from DelayedJob(default) with arguments: [] (id=148) (queue=default) COMPLETED after 0.0078
[Worker(host:*** pid:12532)] Job LowPriorityJob [db85ccb8-2fb6-4c27-84d1-c5e4afed5905] from DelayedJob(default) with arguments: [] (id=149) (queue=default) RUNNING
[low priority job] 2022-07-10 11:50:26 +0900 start ============>
[low priority job] 2022-07-10 11:50:26 +0900 end   <============
[Worker(host:*** pid:12532)] Job LowPriorityJob [db85ccb8-2fb6-4c27-84d1-c5e4afed5905] from DelayedJob(default) with arguments: [] (id=149) (queue=default) COMPLETED after 0.0075
[Worker(host:*** pid:12532)] Job LowPriorityJob [633dc530-94f4-43b6-aa6d-8257099c5110] from DelayedJob(default) with arguments: [] (id=150) (queue=default) RUNNING
[low priority job] 2022-07-10 11:50:26 +0900 start ============>
[low priority job] 2022-07-10 11:50:26 +0900 end   <============
[Worker(host:*** pid:12532)] Job LowPriorityJob [633dc530-94f4-43b6-aa6d-8257099c5110] from DelayedJob(default) with arguments: [] (id=150) (queue=default) COMPLETED after 0.0081
[Worker(host:*** pid:12532)] Job LowPriorityJob [9000f99a-6f67-4d55-ba96-558548245fd9] from DelayedJob(default) with arguments: [] (id=151) (queue=default) RUNNING
[low priority job] 2022-07-10 11:50:26 +0900 start ============>
[low priority job] 2022-07-10 11:50:26 +0900 end   <============
[Worker(host:*** pid:12532)] Job LowPriorityJob [9000f99a-6f67-4d55-ba96-558548245fd9] from DelayedJob(default) with arguments: [] (id=151) (queue=default) COMPLETED after 0.0080
[Worker(host:*** pid:12532)] Job LowPriorityJob [fa1401cf-5f9f-4a60-abac-90d13056376d] from DelayedJob(default) with arguments: [] (id=152) (queue=default) RUNNING
[low priority job] 2022-07-10 11:50:26 +0900 start ============>
[low priority job] 2022-07-10 11:50:26 +0900 end   <============
[Worker(host:*** pid:12532)] Job LowPriorityJob [fa1401cf-5f9f-4a60-abac-90d13056376d] from DelayedJob(default) with arguments: [] (id=152) (queue=default) COMPLETED after 0.0078
[Worker(host:*** pid:12532)] Job LowPriorityJob [1a71b2a8-39fb-4828-a063-33c29de9e503] from DelayedJob(default) with arguments: [] (id=153) (queue=default) RUNNING
[low priority job] 2022-07-10 11:50:26 +0900 start ============>
[low priority job] 2022-07-10 11:50:26 +0900 end   <============
[Worker(host:*** pid:12532)] Job LowPriorityJob [1a71b2a8-39fb-4828-a063-33c29de9e503] from DelayedJob(default) with arguments: [] (id=153) (queue=default) COMPLETED after 0.0075
[Worker(host:*** pid:12532)] 7 jobs processed at 56.1185 j/s, 0 failed

 
そのため、

read_ahead の分だけジョブを読んで、その中の優先度が高いものを実行する」

という目的の設定ではないと分かりました。

 
そこで、 read_ahead について詳しく知るため、 read_ahead についての記事を読みました。

 
次に、ソースコードを読んだところ、 reserve_with_scope_using_default_sql の中で read_ahead を使っていました。
https://github.com/collectiveidea/delayed_job_active_record/blob/v4.1.7/lib/delayed/backend/active_record.rb#L100-L167

そこではDBエンジンとして

あたりを使っていると reserve_with_scope_using_default_sql を使うロジックに入らないように見えました。

もし、上記DBを使っていて read_ahead を使う reserve_with_scope_using_default_sql ロジックに入りたい場合は、

Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy = :default_sql

な設定が必要そうでした。

 
今回試している環境のDBエンジンは SQLite なものの、自分の関係する本番環境では無視されるようだったため、 read_ahead についてはこれ以上追求することをやめました。

 

Action Mailer と Delayed::Job を一緒に使う

Delayed::Job の README によると、Delayed::Job は Action Mailer とともに使えるようでしたので、ためしてみます。
https://github.com/collectiveidea/delayed_job#rails-mailers

 
なお、実際にメールを送信すると確認が手間なので、今回は Letter Opener を使ってメールを受信してみます。
https://github.com/ryanb/letter_opener

Gemfileに追加して bundle install します。

group :development do
  # メールの受信
  gem "letter_opener"
end

 

Action Mailer と Delayed::Job を組み合わせてみる

使い方としては

の2パターンがあるようです。

今回は Active Job のインタフェースに合わせ、

Action Mailer の .deliver_later を使う

な方式で実装して試してみます。

 
まずは ActionMailer のジェネレータを実行します。

今回は HelloWorld という Mailer を作成します。

% bin/rails generate mailer HelloWorld
      create  app/mailers/hello_world_mailer.rb
      invoke  erb
      create    app/views/hello_world_mailer
      invoke  test_unit
      create    test/mailers/hello_world_mailer_test.rb
      create    test/mailers/previews/hello_world_mailer_preview.rb

 
Mailerを編集します。

# app/mailers/hello_world_mailer.rb

class HelloWorldMailer < ApplicationMailer
  def welcome_email
    @name = params[:name]
    mail(to: 'bar@example.com', subject: 'Hello, world!')
  end
end

 
メールのビュー (app/views/hello_world_mailer/welcome_email.text.erb) も作成します。

<%= @name %>

Hello, world!

 
コントローラーを作成します。

class Api::Email::OnlyEmailsController < ApplicationController
  def create
    HelloWorldMailer.with(name: params[:name]).welcome_email.deliver_later

    render json: { status: params[:name] }
  end
end

 
ルーティングを追加した後、Railsとジョブワーカーを起動し、curlでアクセスします。

% curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/email/only_emails

 
すると、ワーカーがメールのジョブを実行し、メールが受信できました。

メールジョブの設定を行う

キュー名や優先度をメールジョブ全体で設定

キュー名は、config/applicantion.rbconfig.action_mailer.deliver_later_queue_name にて設定できます。

 
一方、優先度については Mailer 用の initializer を作成し、その中で ActionMailer::MailDeliveryJob.priority を指定することになります。
How to set priority of Rails ActionMailer - Stack Overflow

 
それぞれ定義してみます。

キュー名

# config/application.rb

# ...
module RailsDelayedJobSample
  class Application < Rails::Application
    # Initialize configuration defaults for originally generated Rails version.
    config.load_defaults 7.0

    # ...

    # Action Mailerの設定
    # キュー名を設定
    # https://guides.rubyonrails.org/configuring.html#config-action-mailer-deliver-later-queue-name
    config.action_mailer.deliver_later_queue_name = 'mail_queue'
  end
end

 
優先度

# config/initializer/mail_delivery_job.rb

ActionMailer::MailDeliveryJob.priority = 30

 
準備ができたため、ワーカーを停止した後、Railsを起動して curl でリクエストします。

curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/email/only_emails

 
delayed_jobs テーブルを見ると、キュー名や優先度が指定されていました。

 
ワーカーを起動すると、メールが送信されました。

ログを見ると、ワーカーでメールを送信していることが分かります。

[Worker(host:*** pid:15576)] Job ActionMailer::MailDeliveryJob [e9f626e0-e971-4f15-978e-bb524a9c85ab] from DelayedJob(mail_queue) with arguments: ["HelloWorldMailer", "welcome_email", "deliver_now", {"params"=>{"name"=>"国光", "_aj_symbol_keys"=>["name"]}, "args"=>[], "_aj_ruby2_keywords"=>["params", "args"]}] (id=157) (queue=mail_queue) RUNNING
[Worker(host:*** pid:15576)] Job ActionMailer::MailDeliveryJob [e9f626e0-e971-4f15-978e-bb524a9c85ab] from DelayedJob(mail_queue) with arguments: ["HelloWorldMailer", "welcome_email", "deliver_now", {"params"=>{"name"=>"国光", "_aj_symbol_keys"=>["name"]}, "args"=>[], "_aj_ruby2_keywords"=>["params", "args"]}] (id=157) (queue=mail_queue) COMPLETED after 0.1572
[Worker(host:*** pid:15576)] 1 jobs processed at 5.1985 j/s, 0 failed

 

キューや優先度を送信するメールごとに設定

Action Mailer のdeliver_later() の引数にて指定することで、設定がオーバーライドされるようです。 https://api.rubyonrails.org/classes/ActionMailer/MessageDelivery.html#method-i-deliver_later

 
コントローラーで deliver_later の引数を設定します。

class Api::Email::DeliverOptionsController < ApplicationController
  def create
    HelloWorldMailer.with(name: params[:name]).welcome_email.deliver_later(
      queue: 'override_queue',
      priority: 45
    )

    render json: { status: params[:name] }
  end
end

 
ルーティングを追加、ワーカーを停止し、Railsを起動してから curl でリクエストします。

curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/email/deliver_options

 
delayed_jobs テーブルを見ると、データが入っています。

 
ジョブワーカーを起動するとメールも送信されました。

[Worker(host:*** pid:16625)] Job ActionMailer::MailDeliveryJob [6f78d26c-fc0d-4d67-9ec1-eb101707ed0d] from DelayedJob(override_queue) with arguments: ["HelloWorldMailer", "welcome_email", "deliver_now", {"params"=>{"name"=>"国光", "_aj_symbol_keys"=>["name"]}, "args"=>[], "_aj_ruby2_keywords"=>["params", "args"]}] (id=158) (queue=override_queue) RUNNING
[Worker(host:*** pid:16625)] Job ActionMailer::MailDeliveryJob [6f78d26c-fc0d-4d67-9ec1-eb101707ed0d] from DelayedJob(override_queue) with arguments: ["HelloWorldMailer", "welcome_email", "deliver_now", {"params"=>{"name"=>"国光", "_aj_symbol_keys"=>["name"]}, "args"=>[], "_aj_ruby2_keywords"=>["params", "args"]}] (id=158) (queue=override_queue) COMPLETED after 0.1779
[Worker(host:*** pid:16625)] 1 jobs processed at 4.5583 j/s, 0 failed

 

メールジョブと通常のジョブを一緒にロールバックする

メールジョブでも Delayed::Job を使う場合

  • モデルの保存
  • 一般のジョブをキューへ登録
  • メールジョブをキューへ登録

を1つのトランザクションとして処理できます。

そこで、トランザクション内で上記の各処理を実行し、最後にトランザクションロールバックするとどうなるかを確認してみます。

 
まずは、コントローラーでトランザクションを使って実装します。

class Api::Email::TransactionsController < ApplicationController
  def create
    ActiveRecord::Base.transaction do
      HelloWorldMailer.with(name: params[:name]).welcome_email.deliver_later

      Apple.create(name: params[:name])

      DefaultQueueJob.perform_later(params[:name])

      raise ActiveRecord::Rollback
    end

    render json: { status: params[:name] }
  end
end

 
Rails console を使って、事前のモデルの状況を確認します。いずれも登録されていません。

>> Apple.count
   (1.7ms)  SELECT sqlite_version(*)
  Apple Count (0.5ms)  SELECT COUNT(*) FROM "apples"
=> 0
>> Delayed::Job.count
  Delayed::Backend::ActiveRecord::Job Count (0.3ms)  SELECT COUNT(*) FROM "delayed_jobs"
=> 0

 
次に Rails を起動し、curlでリクエストします。

curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/email/transactions

 
Railsのログを見ると、トランザクションロールバックされています。

Started POST "/api/email/transactions" for 127.0.0.1 at 2022-07-10 13:16:35 +0900
Processing by Api::Email::TransactionsController#create as */*
  Parameters: {"name"=>"国光", "transaction"=>{"name"=>"国光"}}
   (0.1ms)  SELECT sqlite_version(*)
  ↳ app/controllers/api/email/transactions_controller.rb:3:in `create'

# トランザクション開始
[ActiveJob]   TRANSACTION (0.1ms)  begin transaction
[ActiveJob]   ↳ app/controllers/api/email/transactions_controller.rb:4:in `block in create'

[ActiveJob]   Delayed::Backend::ActiveRecord::Job Create (0.6ms)  INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)  [["priority", 30], ["attempts", 0], ["handler", "--- !ruby/object:ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper\njob_data:\n  job_class: ActionMailer::MailDeliveryJob\n  job_id: c3555602-bac5-47f5-9038-8246b3194306\n  provider_job_id:\n  queue_name: mail_queue\n  priority: 30\n  arguments:\n  - HelloWorldMailer\n  - welcome_email\n  - deliver_now\n  - params:\n      name: 国光\n      _aj_symbol_keys:\n      - name\n    args: []\n    _aj_ruby2_keywords:\n    - params\n    - args\n  executions: 0\n  exception_executions: {}\n  locale: en\n  timezone: Tokyo\n  enqueued_at: '2022-07-10T04:16:35Z'\n"], ["last_error", nil], ["run_at", "2022-07-10 04:16:35.040988"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "mail_queue"], ["created_at", "2022-07-10 04:16:35.041026"], ["updated_at", "2022-07-10 04:16:35.041026"]]
[ActiveJob]   ↳ app/controllers/api/email/transactions_controller.rb:4:in `block in create'
[ActiveJob] Enqueued ActionMailer::MailDeliveryJob (Job ID: c3555602-bac5-47f5-9038-8246b3194306) to DelayedJob(mail_queue) with arguments: "HelloWorldMailer", "welcome_email", "deliver_now", {:params=>{:name=>"国光"}, :args=>[]}
  Apple Create (0.2ms)  INSERT INTO "apples" ("name", "created_at", "updated_at") VALUES (?, ?, ?)  [["name", "国光"], ["created_at", "2022-07-10 04:16:35.052783"], ["updated_at", "2022-07-10 04:16:35.052783"]]
  ↳ app/controllers/api/email/transactions_controller.rb:6:in `block in create'
[ActiveJob]   Delayed::Backend::ActiveRecord::Job Create (0.1ms)  INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)  [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper\njob_data:\n  job_class: DefaultQueueJob\n  job_id: c5477611-3075-4c1f-a8a2-f22e7a2b049f\n  provider_job_id:\n  queue_name: default\n  priority:\n  arguments:\n  - 国光\n  executions: 0\n  exception_executions: {}\n  locale: en\n  timezone: Tokyo\n  enqueued_at: '2022-07-10T04:16:35Z'\n"], ["last_error", nil], ["run_at", "2022-07-10 04:16:35.057817"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2022-07-10 04:16:35.057840"], ["updated_at", "2022-07-10 04:16:35.057840"]]
[ActiveJob]   ↳ app/controllers/api/email/transactions_controller.rb:8:in `block in create'
[ActiveJob] Enqueued DefaultQueueJob (Job ID: c5477611-3075-4c1f-a8a2-f22e7a2b049f) to DelayedJob(default) with arguments: "国光"

# ロールバックが発生
  TRANSACTION (0.4ms)  rollback transaction
  ↳ app/controllers/api/email/transactions_controller.rb:3:in `create'
Completed 200 OK in 25ms (Views: 0.2ms | ActiveRecord: 2.0ms | Allocations: 8209)

 
また、各テーブルには何も登録されていません。

>> Apple.count
  Apple Count (0.2ms)  SELECT COUNT(*) FROM "apples"
=> 0
>> Delayed::Job.count
  Delayed::Backend::ActiveRecord::Job Count (0.2ms)  SELECT COUNT(*) FROM "delayed_jobs"
=> 0

 
以上より、

  • モデルの保存
  • 一般のジョブをキューへ登録
  • メールジョブをキューへ登録

を1つのトランザクションとして処理できました。

 

Delayed::Job + RSpec によるテストコードを書く

ここまでで各実装方法を見てきました。

ここからは Delayed::Job を使った時のテストコードを書いてみます。

なお、今回のテストコードは RSpec で書いてみますので、 rails-rspec をインストールしておきます。
https://github.com/rspec/rspec-rails

 

Job specを書く

キューに入ることを確認する

RSpechave_been_enqueued マッチャを使い、 perform_later を実行したら、キュー名 heavy に1回入ることを確認します。
have_been_enqueued matcher - Matchers - RSpec Rails - RSpec - Relish

RSpec.describe 'HeavyQueueJob', type: :job do
  describe '#perform_later' do
    context 'RSpecの have_enqueued_job マッチャを使う' do
      it '名前付きキューに1回入ること' do
      it '名前付きキューに1回入ること' do
        expect {
          HeavyQueueJob.perform_later
        }.to have_enqueued_job(HeavyQueueJob).exactly(:once).on_queue(:heavy)
      end
    end
  end
end

 
実行するとテストが落ちます。

StandardError: To use ActiveJob matchers set `ActiveJob::Base.queue_adapter = :test`

 
queue_adapter:delayed_job のままではエラーになるようです。

そのため、 before:test に変更し、 after で元に戻す (:delayed_job) ようにすると、テストがパスします。

# ...
context 'RSpecの have_enqueued_job マッチャを使う' do
  # 追加
  before do
    ActiveJob::Base.queue_adapter = :test
  end

  after do
    ActiveJob::Base.queue_adapter = :delayed_job
  end

  # ...

 

Delayed::Job テーブルにデータが存在することを確認する

こちらは change を使って、テーブルにデータが増えていることを確認します。

また、今回は追加された中身( priority )も確認するため、 have_attributes マッチャを使います。
have_attributes matcher - Built in matchers - RSpec Expectations - RSpec - Relish

なお、こちらの場合はDBの中身を確認することから、キューの中身が実行されないよう queue_adapter:delayed_job となっている必要があります。

context '自分でテーブルを調べる' do
  it 'job specの場合は、ジョブがキューに入ること' do
    expect {
      HeavyQueueJob.perform_later
    }.to change(Delayed::Job, :count).from(0).to(1)
  end

  it '優先度も設定されていること' do
    HeavyQueueJob.perform_later

    actual = Delayed::Job.first
    expect(actual).to have_attributes(priority: 10)
  end
end

 

ジョブが失敗の上限に達しても、Delayed::Job テーブルからジョブが削除されないことを確認する

今回のRailsアプリでは、ジョブが失敗の上限に達しても、Delayed::Job テーブルからジョブが削除されないよう

Delayed::Worker.destroy_failed_jobs = false

となっているのが仕様とします。

そのため、ジョブが削除されないことをテストで確認します。

なお、 Delayed::Job Workerには

  • work_off
  • run

の2つのメソッドがあります。
https://www.rubydoc.info/gems/delayed_job/Delayed/Worker#work_off-instance_method

ただ、wrok_off の場合、失敗が2回目以降だと Delayed::Job テーブルの attempts が更新されません。

そのため、今回は run を使ってワーカーによるジョブ実行を行っています。

require 'rails_helper'

RSpec.describe 'HeavyQueueJob', type: :job do
  describe '#perform_now' do
    context '失敗の上限になった場合でも、delayed_jobsテーブルにレコードが残ったままになっているか' do
      it 'キューにデータが残り続けていること' do
        expect {
          # キューに入れる
          AlwaysFailJob.perform_later

          # ジョブを2回実行する
          Delayed::Worker.new.run(Delayed::Job.first)
          Delayed::Worker.new.run(Delayed::Job.first)

        }.to change(Delayed::Job, :count).from(0).to(1) # テーブルには1件だけ追加されていること

        # ジョブが残っており、2回実行されたこと
        actual = Delayed::Job.first
        expect(actual).to have_attributes(attempts: 2)

        # なお、実際の Delayed::Job Worker と異なり、3回目も実行できる
        Delayed::Worker.new.run(Delayed::Job.first)
        expect(actual.reload).to have_attributes(attempts: 3)
      end
    end
  end
end

 

Request spec を書く

次は Request spec で動作を確認します。

 

【注意】 Rails6以降、Request spec では Active Job の queue_adapter が TestAdapter になる

Rails6以降、Request specのデフォルトだと、テストを実行する時のデフォルトの queue_adapterTestAdapter (:test) で固定されます。

固定された原因として、以下では ActionDispatch::SystemTestCase にて ActiveJob::TestHelper を include したため、 TestAdapter が :test に固定されるとあります。

 
では、RSpecの Request spec は何に相当するかを調べたところ、READMEによると ActionDispatch::IntegrationTest でした。
https://github.com/rspec/rspec-rails#what-tests-should-i-write

ActionDispatch::IntegrationTest については、以下の issue やプルリクで ActiveJob::TestHelper が include されるようになりました。

そのため、 ActionDispatch::SystemTestCase 同様、 queue_adapter:test に固定されることとなっているようです。

 
queue_adapter:test となることの影響として、テストコード内ではジョブがインラインで実行されるようになります。

デフォルトでは ActiveJob::TestCase がキューアダプタを :test に設定してジョブがインラインで実行されるようにします。

13.1 基本のテストケース | Rails テスティングガイド - Railsガイド

 
つまり、Request specの中で「 Delayed::Job のキューテーブルにレコードが存在すること」のようなテストコードを書いていると、デフォルトのままでは失敗してしまいます。

これについては、RSpecのマッチャ have_enqueued_mail などで代替できそうに見えます。
https://relishapp.com/rspec/rspec-rails/v/5-1/docs/matchers/have-enqueued-mail-matcher

しかし、そのマッチャの挙動が

  • キューに入った形跡があることは確認できる
  • Delayed::Job のレコードがあるかどうかは確認できない

となることから、「Delayed::Job のテーブルにレコードが存在すること」まで確認したい場合に困ってしまいます。

 
queue_adapter を差し替える方法としては

などが見つかりました。

ただ、RSpechave_been_enqueued マッチャを使っている場合、 queue_adapter:test でないと動作しなくなります。

そこで、局所的に差し替えられる、上記の一番最後の案

TestAdapterを差し替えたいところだけ、 before でキューを差し替える

にて今回は実装していきます。

 

指定したキューに登録されることを確認する

Job specの場合は have_enqueued_job を使いましたが、今回は have_been_enqueued を使ってみます。
have_been_enqueued matcher - Matchers - RSpec Rails - RSpec - Relish

なお、先程の注意にある通り、Request spec の queue_adapter:test になりますが、 have_been_enqueued マッチャは :test のままで良いです。

RSpec.describe 'Api::Priority::HighPrioritiesController', type: :request do
  describe 'POST /api/priority/high_priorities' do
    context 'RSpecの have_been_enqueued マッチャを使う' do
      it 'POSTすると、キュー default に1回入る' do
        post api_priority_high_priorities_path, params: { name: 'test' }

        expect(HighPriorityJob).to have_been_enqueued.exactly(:once).on_queue(:default)
      end
    end
  end
end

 

Delayed::Job テーブルにデータが存在することを確認する

今回は優先度が想定通りかをテーブルの中を見て確認します。

なお、Reqeust spec でテーブルの中身を使う場合は、 queue_adapter の差し替えを行わないよう設定します。

context '自分でテーブルを調べる' do
  before do
    queue_adapter_changed_jobs.each(&:disable_test_adapter)
  end

  it 'ジョブが delayed_job テーブルに登録され、優先度が10であること' do
    expect {
      post api_priority_high_priorities_path, params: { name: 'シナノスイート' }
    }.to change(Delayed::Job, :count).from(0).to(1)

    actual = Delayed::Job.first
    expect(actual).to have_attributes(priority: 10)
  end
end

 

ジョブが失敗の上限に達しても、Delayed::Job テーブルからジョブが削除されないことを確認する

Job spec同様にして確認できます。

なお、テーブルの中身を確認しているため、 queue_adapter:test へ差し替わらないようにします。

require 'rails_helper'

RSpec.describe 'Api::AlwaysFailJobs::ApplesController', type: :request do
  describe 'POST /api/always_fail_jobs/apples' do
    context '失敗回数の上限に達した場合' do
      before do
        queue_adapter_changed_jobs.each(&:disable_test_adapter)
      end

      it 'キューにデータが残り続けていること' do
        expect {
          post api_always_fail_jobs_apples_path, params: { name: '秋映' }
        }.to change(Delayed::Job, :count).from(0).to(1)

        Delayed::Worker.max_attempts.times do |i|
          # work_off だと2回目以降でエラーになった場合に Delayed::Job への更新が行われないっぽいので、
          # run() でジョブを明示的に指定して実行する
          Delayed::Worker.new.run(Delayed::Job.first)

          expect(Delayed::Job.first).to have_attributes(attempts: i + 1)
        end

        # ちなみに、実際のワーカーと異なり、runではもう一回実行することもできる。この場合は attempts がインクリメントされる
        Delayed::Worker.new.run(Delayed::Job.first)
        expect(Delayed::Job.first).to have_attributes(attempts: 3)
      end
    end
  end
end

 

メールと組み合わせた時のspecを書く

メールがキューに入るかを確認する

【注意】 have_enqueued_mail マッチャは、Rails7 + rspec-rails 5系の環境では動かない

RSpecでは have_enqueued_mail マッチャを使うことで、メールがキューに入ったかを確認できます。
have_enqueued_mail matcher - Matchers - RSpec Rails - RSpec - Relish

 
しかし、Rails7 + rspec-rails 5系の環境で have_enqueued_mail マッチャを使うとエラーになります。

NameError:
  uninitialized constant ActionMailer::DeliveryJob

    job[:job] <= ActionMailer::DeliveryJob
                                ^^^^^^^^^^^^^
  Did you mean?  ActionMailer::MailDeliveryJob

 
以下のissueに原因が記載されています。
uninitialized constant ActionMailer::DeliveryJob with latest rails/master · Issue #2531 · rspec/rspec-rails

issueの最後に

This has been released as 6.0.0.rc1 theres still potential for the breaking change to the mailer api to do with params / args matching, but those needing support from a tagged release can now get this via rc1.

Rails 7 support is only via version 6.x per our versioning strategy.

https://github.com/rspec/rspec-rails/issues/2531#issuecomment-1086959613

とあるように、Rails7 では rspec-rails 6系を使ったほうが良さそうです。

 

have_enqueued_mailマッチャを使ってメールのキュー登録を確認する

以下のようにすることで、キューに入る回数とキュー名を確認できます。

RSpec.describe 'Api::Email::DeliverOptionsController', type: :request do
  describe 'POST /api/email/deliver_options' do
    context 'キューに入るかを確認' do
      it 'キュー名「override_queue」というキューに1回だけ入る' do
        expect {
          post api_email_deliver_options_path, params: { name: '王林' }
        }.to have_enqueued_mail(HelloWorldMailer, :welcome_email)
               .exactly(1) # 回数
               .on_queue(:override_queue) # キュー名
      end
    end
  end
end

 

メールが送信されることを確認する

ActiveJob::TestHelperperform_enqueued_jobs を使うことで、キューに入ったメールを送信できます。 https://api.rubyonrails.org/v7.0.3/classes/ActiveJob/TestHelper.html#method-i-perform_enqueued_jobs

また、テストでメールを送信すると ActionMailer::Base.deliveries に中身が含まれます。
Action Mailer の基礎 - Railsガイド

そのため、ActionMailer::Base.deliveries から取り出して確認します。

context '送信されたメールを確認' do
  it 'メールが1通送信され、中身も想定した通りであること' do
    expect {
      perform_enqueued_jobs do
        post api_email_deliver_options_path, params: { name: 'シナノスイート' }
      end
    }.to change(ActionMailer::Base.deliveries, :count).from(0).to(1)

    actual = ActionMailer::Base.deliveries[0]

    # Rails全体の設定値 config.action_mailer.default_options = { from: 'no-reply@example.com' } ではなく
    # ApplicationMailer で設定した from@example.com が指定されていること
    expect(actual.from).to eq ['from@example.com']

    expect(actual.to).to eq ['bar@example.com']
    expect(actual.subject).to eq 'Hello, world!'

    expect(actual.body).to include('シナノスイート')
    expect(actual.body).to include('Hello, world!')

  end
end

 

トランザクションロールバックしたら、何もDBに入っていないか確認する

ここでは、プロダクションコードでトランザクション & ロールバックを行っているコントローラに対するテストを書きます。

class Api::Email::TransactionsController < ApplicationController
  def create
    ActiveRecord::Base.transaction do
      HelloWorldMailer.with(name: params[:name]).welcome_email.deliver_later

      Apple.create(name: params[:name])

      DefaultQueueJob.perform_later(params[:name])

      raise ActiveRecord::Rollback
    end

    render json: { status: params[:name] }
  end
end

 

RSpec::Matchers.define_negated_matcherについて

HelloWorldMailerDefaultQueueJob の両方がキューに登録されていないことを確認するため、

expect {
  post api_email_transactions_path, params: { name: '王林' }
}.not_to have_enqueued_mail(HelloWorldMailer, :welcome_email)
       .and have_enqueued_job

のように書きたくなりますが、 NotImplementedError になります。

NotImplementedError:
  `expect(...).not_to matcher.and matcher` is not supported, since it creates a bit of an ambiguity.
  Instead, define negated versions of whatever matchers you wish to negate with `RSpec::Matchers.define_negated_matcher`
  and use `expect(...).to matcher.and matcher`.

エラーメッセージにある通り、 RSpec::Matchers.define_negated_matcher を定義すれば良さそうです。

 

キューに入らないことを確認する (が、ロールバックしてもキューに登録された形跡あり)

ファイルの冒頭に RSpec::Matchers.define_negated_matcher を追加してコードを書きます。

RSpec::Matchers.define_negated_matcher :not_have_enqueued_mail, :have_enqueued_mail
RSpec::Matchers.define_negated_matcher :not_have_enqueued_job, :have_enqueued_job

RSpec.describe 'Api::Email::TransactionsController', type: :request do
  describe 'POST /api/email/transactions' do
    context 'トランザクションでロールバックした場合' do
      context 'キューの確認' do
        it 'メールとジョブを確認すると、キューに形跡はある模様' do
          expect {
            post api_email_transactions_path, params: { name: '王林' }
          }.to not_have_enqueued_mail(HelloWorldMailer, :welcome_email)
                 .and not_have_enqueued_job
        end
      end
    end
  end
end

 
しかし、このテストはパスせず、以下のメッセージが出力されます。

ロールバックしているはずなのに、キューに登録されたことは検知されているようです。

   expected not to enqueue HelloWorldMailer.welcome_email at least 1 time but enqueued 1

...and:

   expected not to enqueue at least 1 jobs, but enqueued 2

そのため、 pending を使って、落ちることが正しいようなテストにしておきます。

it 'メールとジョブを確認すると、キューに形跡はある模様' do
  pending('以下のspecがfailするので、キューには入った形跡がある')

  expect {
    post api_email_transactions_path, params: { name: '王林' }
  }.to not_have_enqueued_mail(HelloWorldMailer, :welcome_email)
         .and not_have_enqueued_job
end

 

テーブルにジョブが登録されていないことを確認する

たとえキューに入っていた形跡があったとしても、Delayed::Job テーブルに登録されていなければワーカーによってジョブは実行されません。

そこで、 queue_adapter:test としないよう設定した上で、テーブルにジョブが登録されていないことを確認すると、テストがパスします。

context 'テーブルを確認' do
  before do
    queue_adapter_changed_jobs.each(&:disable_test_adapter)
  end

  it 'ジョブが delayed_job テーブルに登録されていないこと' do
    expect {
      post api_email_transactions_path, params: { name: 'シナノスイート' }
    }.not_to change(Delayed::Job, :count)
  end
end

 
以上でテストコードでも一通りの挙動を確認できました。

 

ソースコード

Gtihubに上げました。
https://github.com/thinkAmi-sandbox/rails_delayed_job-sample

今回のプルリクはこちらです。
https://github.com/thinkAmi-sandbox/rails_delayed_job-sample/pull/1