Rails + Active Job + Delayed::Jobな構成でジョブを作ってみた時に、色々調べたためメモを残します。
なお、記事が長いため、途中のソースコードは説明使う部分以外を省略しています。
必要に応じて、実際のソースコードを Github リポジトリでご確認ください。
https://github.com/thinkAmi-sandbox/rails_delayed_job-sample
目次
- 環境
- Delayed::Jobを試す
- Delayed::Job + RSpec によるテストコードを書く
- ソースコード
環境
- Rails 7.0.3
- delayed_job 4.1.10
- delayed_job_active_record 4.1.7
- rspec-rails 6.0.0.rc1
環境構築
rails new
今回は、Rails製APIアプリ + Delayed::Job の組み合わせで色々試してみます。
rails new
して bundle install
します。
% bundle exec rails new rails_delayed_job_sample --api --skip-bundle % cd rails_delayed_job_sample % bundle install
Delayed::Jobのセットアップ
Delayed::JobのREADMEに従いセットアップを行います。
今回は Delayed::Jobのバックエンドに Active Record を使うため、Gemfileに delayed_job_active_record
を追加し、bundle installします。
gem 'delayed_job_active_record'
続いてジェネレートとマイグレーションを実行します。
# ジェネレート % bin/rails generate delayed_job:active_record create bin/delayed_job chmod bin/delayed_job create db/migrate/20220703062311_create_delayed_jobs.rb # DB作成 % bin/rails db:migrate == 20220703062311 CreateDelayedJobs: migrating ================================ -- create_table(:delayed_jobs) -> 0.0020s -- add_index(:delayed_jobs, [:priority, :run_at], {:name=>"delayed_jobs_priority"}) -> 0.0007s == 20220703062311 CreateDelayedJobs: migrated (0.0029s) =======================
次に、Active Job の queue_adapter
として使えるよう、 config/application.rb
に設定を追加します。
config.active_job.queue_adapter = :delayed_job
APIアプリのモデルを作成
annotate を使う
モデルにスキーマを明示するため、 annotate
gemをGemfileに追加し、bundle installします。
https://github.com/ctran/annotate_models
インストールが終わったら、ジェネレータを使って annotate の初期設定を行います。
% bin/rails g annotate:install
モデルを作成
今回は name
列だけを持つ apple
モデルを使います。
bin/rails g model apple name:string invoke active_record create db/migrate/20220703065930_create_apples.rb create app/models/apple.rb invoke test_unit create test/models/apple_test.rb create test/fixtures/apples.yml
以上で準備ができました。
Delayed::Jobを試す
初めてのジョブを作る
実装
初めてのジョブとして
- コントローラで、HTTPリクエストボディを元に、
apple
モデルへデータを保存する - コントローラで、ジョブをキューに登録する
- HTTPレスポンスを返す
を作ってみます。
まずはジョブから作成します。
ジョブ生成用のジェネレータを使います。
3 ジョブを作成する | Active Job の基礎 - Railsガイド
% bin/rails g job apple_instance invoke test_unit create test/jobs/apple_instance_job_test.rb create app/jobs/apple_instance_job.rb
ジョブの雛形ができたので、次はコントローラを作成します。
なお、ジョブにモデルのインスタンスを直接渡せるのは、GlobalIDという仕組みのようです。
10 GlobalID | Active Job の基礎 - Railsガイド
class Api::PassingInstance::ApplesController < ApplicationController def create # モデルへの保存 apple = Apple.new(name: params[:name]) apple.save # モデルのインスタンスを渡してジョブをキューに登録する AppleInstanceJob.perform_later(apple) render json: { status: params[:name]} end end
続いて、ジョブを編集します。
今回は、受け取ったモデルのインスタンスをログ出力します。
class AppleInstanceJob < ApplicationJob queue_as :default def perform(apple) puts "[instance job] (#{apple.name}) #{Time.zone.now} start ============>" puts "[instance job] (#{apple.name}) #{Time.zone.now} end <============" end end
最後に routes.rb にルーティングを追加します。
namespace :api do namespace :passing_instance do resources :apples, only: :create end end
動作確認
Railsを起動後、curlでAPIエンドポイントにHTTPリクエストします。
% curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/passing_instance/apples {"status":"ふじ"}
delayed_jobs
テーブルの中身を見ると、データが登録されています。
bin/rails jobs:work
で Delayed Job Worker を起動し、ワーカーのログを確認します。
作成したジョブが動作したようです。
[Worker(host:*** pid:97322)] Starting job worker [Worker(host:***pid:97322)] Job AppleInstanceJob [d93ef4e6-9162-4cf3-b805-a03eb751e4c6] from DelayedJob(default) with arguments: [{"_aj_globalid"=>"gid://rails-delayed-job-sample/Apple/23"}] (id=114) (queue=default) RUNNING [instance job] (ふじ) 2022-07-10 08:15:21 +0900 start ============> [instance job] (ふじ) 2022-07-10 08:15:21 +0900 end <============ [Worker(host:*** pid:97322)] Job AppleInstanceJob [d93ef4e6-9162-4cf3-b805-a03eb751e4c6] from DelayedJob(default) with arguments: [{"_aj_globalid"=>"gid://rails-delayed-job-sample/Apple/23"}] (id=114) (queue=default) COMPLETED after 0.0272 [Worker(host:*** pid:97322)] 1 jobs processed at 15.4619 j/s, 0 failed
引数としてサポートされていない、OpenStruct 型の引数を試してみる
Active Job でサポートされる引数の型の一覧は以下にあります。
9 引数でサポートされる型 | Active Job の基礎 - Railsガイド
上記に書かれていない型を引数として渡すとどうなるかを試してみます。
今回は OpenStruct を使ってみます。
class OpenStruct (Ruby 3.1 リファレンスマニュアル)
ジョブを生成します。
% bin/rails g job open_struct invoke test_unit create test/jobs/open_struct_job_test.rb create app/jobs/open_struct_job.rb
コントローラを実装します。
class Api::PassingOpenStruct::ApplesController < ApplicationController def create apple = OpenStruct.new( { id: 1, name: params[:name] } ) # OpenStruct を渡す OpenStructJob.perform_later(apple) render json: { status: apple.name} end end
ジョブを修正します。
class OpenStructJob < ApplicationJob queue_as :default def perform(apple) puts "[open_struct job] (#{apple.name}) #{Time.zone.now} start ============>" puts "[open_struct job] (#{apple.name}) #{Time.zone.now} end <============" end end
routes を追加し、Railsを起動した後に curl でアクセスすると、エラーになりました。
% curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/passing_open_struct/apples {"status":500,"error":"Internal Server Error", ... }
Railsのログには OpenStruct 型がサポートされていない旨が出力されていました。
Started POST "/api/passing_open_struct/apples" for 127.0.0.1 at 2022-07-10 08:31:51 +0900 (0.0ms) SELECT sqlite_version(*) Processing by Api::PassingOpenStruct::ApplesController#create as */* Parameters: {"name"=>"ふじ", "apple"=>{"name"=>"ふじ"}} [ActiveJob] Failed enqueuing OpenStructJob to DelayedJob(default): ActiveJob::SerializationError (Unsupported argument type: OpenStruct)
OpenStruct 型をサポートするには、シリアライザを定義すれば良いようです。
10.1 シリアライザ | Active Job の基礎 - Railsガイド
ただ、Ruby 3.0 から OpenStruct の使用に警告が入ったこともあり、今後新規で OpenStruct を使うのは控えることになるのかなと思っています。
- Caveats | class OpenStruct - RDoc Documentation
- 【Rails】RuboCop1.23でOpenStructの使用を監視するCopが追加された話|TechTechMedia
そこで、今回は OpenStruct のシリアライザ定義をがんばるのではなく、 to_h
でハッシュにします。
コントローラを修正します。
class Api::PassingOpenStruct::ApplesController < ApplicationController def create apple = OpenStruct.new( { id: 1, name: params[:name] } ) # ハッシュにするよう修正 OpenStructJob.perform_later(apple.to_h) render json: { status: apple.name} end end
ハッシュから取り出すよう、ジョブも修正します。
class OpenStructJob < ApplicationJob queue_as :default def perform(apple) puts "[open_struct job] (#{apple[:name]}) #{Time.zone.now} start ============>" puts "[open_struct job] (#{apple[:name]}) #{Time.zone.now} end <============" end end
再度 curl でアクセスすると、正常なレスポンスが返ってきました。
% curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/passing_open_struct/apples {"status":"ふじ"}
ワーカーのログにも正常に終了したことが出力されています。
[open_struct job] (ふじ) 2022-07-10 09:21:39 +0900 start ============> [open_struct job] (ふじ) 2022-07-10 09:21:39 +0900 end <============
Delayed::Jobにて、モデルのインスタンスメソッド実行をジョブ化する
Delayed::Job の README には、 .delay.method(params)
とメソッドの前に .delay
をはさむことでメソッドをジョブ実行できる旨が記載されているため、試してみます。
https://github.com/collectiveidea/delayed_job#queuing-jobs
モデルにインスタンスメソッド (post_api
) を生やします。
class Apple < ApplicationRecord def post_api # 外部APIを呼んでいるとする puts "[instance] (#{name}) #{Time.zone.now} start ============>" puts "[instance] (#{name}) #{Time.zone.now} end <============" end end
コントローラにて、 .delay
付きでモデルのインスタンスメソッドを呼んでみます。
class Api::InstanceMethod::ApplesController < ApplicationController def create apple = Apple.new(name: params[:name]) apple.save # モデルのインスタンスメソッドをジョブ実行 apple.delay.post_api render json: { status: params[:name]} end end
ルーティングの追加とRailsの起動を行い、curlで動作確認します。
% curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/instance_method/apples {"status":"ふじ"}
ワーカーのログに実行結果が出力されていました。
[Worker(host:*** pid:97322)] Job Apple#post_api (id=116) RUNNING [instance] (ふじ) 2022-07-10 09:30:22 +0900 start ============> [instance] (ふじ) 2022-07-10 09:30:22 +0900 end <============ [Worker(host:*** pid:97322)] Job Apple#post_api (id=116) COMPLETED after 0.0044
キューを振り分ける
準備
Delayed::Job では、名前付きキューを使うことでキューを振り分けることができることから、試してみます。
https://github.com/collectiveidea/delayed_job#named-queues
まずは、名前付きキューを使うジョブを3つ用意します。
キュー名:default
class DefaultQueueJob < ApplicationJob queue_as :default def perform(apple_name) puts "[default queue job] (#{apple_name}) #{Time.zone.now} start ============>" sleep 1 puts "[default queue job] (#{apple_name}) #{Time.zone.now} end <============" end end
キュー名:custom
このジョブは重く、途中で10秒かかるものとします
class CustomQueueJob < ApplicationJob queue_as :custom def perform(apple_name) puts "[custom queue job] (#{apple_name}) #{Time.zone.now} start ============>" sleep 10 puts "[custom queue job] (#{apple_name}) #{Time.zone.now} end <============" end end
キュー名:another
class AnotherQueueJob < ApplicationJob queue_as :another def perform(apple_name) puts "[another queue job] (#{apple_name}) #{Time.zone.now} start ============>" puts "[another queue job] (#{apple_name}) #{Time.zone.now} end <============" end end
続いて、これらのジョブを使うようなコントローラ、およびルーティングを追加します。
コントローラはこんな感じです。
class Api::QueueName::DefaultQueuesController < ApplicationController def create DefaultQueueJob.perform_later(params[:name]) render json: { status: params[:name]} end end
準備が終わったため、キューやワーカーの条件を変えて動作確認してみます。
環境変数 QUEUE や QUEUES を使い、Delayed::Job でワーカーが扱えるキューを指定する
Delayed::Job でワーカーが扱えるキューの設定方法を調べたところ、READMEに記載がありました。
If you want to just run all available jobs and exit you can use
rake jobs:workoff
Work off queues by setting the
QUEUE
orQUEUES
environment variable.QUEUE=tracking rake jobs:work QUEUES=mailers,tasks rake jobs:work
2つの名前付きキューに入れて、すべてのキューを扱えるワーカー1つを動かす
最初に、ワーカーを停止しておきます。
次に、キュー名 custom -> another -> custom
の順でキューに積んでおきます。
# custom curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/queue_name/custom_queues # another curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/queue_name/another_queues # もう一回、custom curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/queue_name/custom_queues
ここで delayed_job テーブルを確認すると、3件のレコードが登録されています。
続いて bin/rails jobs:work
でワーカーを起動します。
ワーカーには環境変数を設定していないため、すべてのキューを扱うことができました。
また、優先度も設定していないため、キューに入った順番に処理が実行されました。
[Worker(host:*** pid:2223)] Job CustomQueueJob [1f86142c-58ec-401e-ae7a-5b9c528ded85] from DelayedJob(custom) with arguments: ["ふじ"] (id=121) (queue=custom) RUNNING [custom queue job] (ふじ) 2022-07-10 10:04:04 +0900 start ============> [custom queue job] (ふじ) 2022-07-10 10:04:14 +0900 end <============ [Worker(host:*** pid:2223)] Job CustomQueueJob [1f86142c-58ec-401e-ae7a-5b9c528ded85] from DelayedJob(custom) with arguments: ["ふじ"] (id=121) (queue=custom) COMPLETED after 10.0265 [Worker(host:*** pid:2223)] Job AnotherQueueJob [c55ea7b9-8a13-4d6d-aea3-b8954ea910fa] from DelayedJob(another) with arguments: ["秋映"] (id=122) (queue=another) RUNNING [another queue job] (秋映) 2022-07-10 10:04:14 +0900 start ============> [another queue job] (秋映) 2022-07-10 10:04:14 +0900 end <============ [Worker(host:*** pid:2223)] Job AnotherQueueJob [c55ea7b9-8a13-4d6d-aea3-b8954ea910fa] from DelayedJob(another) with arguments: ["秋映"] (id=122) (queue=another) COMPLETED after 0.0141 [Worker(host:*** pid:2223)] Job CustomQueueJob [6f69a43e-3a4b-42ae-9a79-2be09659f5f9] from DelayedJob(custom) with arguments: ["ふじ"] (id=123) (queue=custom) RUNNING [custom queue job] (ふじ) 2022-07-10 10:04:14 +0900 start ============> [custom queue job] (ふじ) 2022-07-10 10:04:24 +0900 end <============ [Worker(host:*** pid:2223)] Job CustomQueueJob [6f69a43e-3a4b-42ae-9a79-2be09659f5f9] from DelayedJob(custom) with arguments: ["ふじ"] (id=123) (queue=custom) COMPLETED after 10.0187 [Worker(host:*** pid:2223)] 3 jobs processed at 0.1491 j/s, 0 failed
3つの名前付きキューに入れて、指定のキューを扱えるワーカー2つを動かす
先ほどと同様、curlでアクセスしてキューにためておきます。
# default curl -X POST -H "Content-Type: application/json" -d '{"name":"つがる"}' http://localhost:3000/api/queue_name/default_queues # custom curl -X POST -H "Content-Type: application/json" -d '{"name":"ふじ"}' http://localhost:3000/api/queue_name/custom_queues # another curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/queue_name/another_queues
次に、ワーカーを起動します。
localで複数のワーカーを起動する場合、 bin/rails jobs:work
を別ターミナルで起動することになります。今回は worker1
と worker2
を用意します。
その際、ターミナルごとに環境変数を設定して起動します。
- worker1は
QUEUE=default
- worker2は
QUEUES=custom,another
worker1のログを見ると、キュー名 default
だけを扱っています。
[Worker(host:*** pid:3308)] Job DefaultQueueJob [6659c50b-0182-4d69-9d16-83dd95213812] from DelayedJob(default) with arguments: ["つがる"] (id=127) (queue=default) RUNNING [default queue job] (つがる) 2022-07-10 10:17:22 +0900 start ============> [default queue job] (つがる) 2022-07-10 10:17:23 +0900 end <============ [Worker(host:*** pid:3308)] Job DefaultQueueJob [6659c50b-0182-4d69-9d16-83dd95213812] from DelayedJob(default) with arguments: ["つがる"] (id=127) (queue=default) COMPLETED after 1.0250 [Worker(host:*** pid:3308)] 1 jobs processed at 0.9412 j/s, 0 failed
worker2のログを見ると、キュー名 custom
と another
を扱っています。
worker1は空いているにも関わらず、キュー another
は worker2 で動いています。
[Worker(host:*** pid:3672)] Job CustomQueueJob [0271bd52-abca-401d-bcc3-668def1447b7] from DelayedJob(custom) with arguments: ["ふじ"] (id=128) (queue=custom) RUNNING [custom queue job] (ふじ) 2022-07-10 10:17:25 +0900 start ============> [custom queue job] (ふじ) 2022-07-10 10:17:35 +0900 end <============ [Worker(host:*** pid:3672)] Job CustomQueueJob [0271bd52-abca-401d-bcc3-668def1447b7] from DelayedJob(custom) with arguments: ["ふじ"] (id=128) (queue=custom) COMPLETED after 10.0248 [Worker(host:*** pid:3672)] Job AnotherQueueJob [f4a407d3-b34f-4c81-8701-6c650c1003d6] from DelayedJob(another) with arguments: ["秋映"] (id=129) (queue=another) RUNNING [another queue job] (秋映) 2022-07-10 10:17:35 +0900 start ============> [another queue job] (秋映) 2022-07-10 10:17:35 +0900 end <============ [Worker(host:*** pid:3672)] Job AnotherQueueJob [f4a407d3-b34f-4c81-8701-6c650c1003d6] from DelayedJob(another) with arguments: ["秋映"] (id=129) (queue=another) COMPLETED after 0.0133 [Worker(host:*** pid:3672)] 2 jobs processed at 0.1986 j/s, 0 failed
ネストしたジョブの扱いについて
親ジョブから子ジョブを生成する
Delayed::Jobでは、あるジョブ(親)から別のジョブ(子) を生成できます。
試してみます。
親
class ParentJob < ApplicationJob queue_as :default def perform(prefix) puts "[parent job] (#{prefix}) #{Time.zone.now} start ============>" # 子のジョブを生成する ChildJob.perform_later(prefix) puts "[parent job] (#{prefix}) #{Time.zone.now} end <============" end end
子
class ChildJob < ApplicationJob queue_as :default def perform(prefix) puts "[child job] (#{prefix}デリシャス) #{Time.zone.now} start ============>" puts "[child job] (#{prefix}デリシャス) #{Time.zone.now} end <============" end end
コントローラでは、親ジョブのみ生成します。
class Api::ParentChild::StarkingsController < ApplicationController def create ParentJob.perform_later(params[:prefix]) render json: { status: params[:prefix]} end end
ルーティングを作成し、Railsとワーカーを起動した後、curlでアクセスします。
% curl -X POST -H "Content-Type: application/json" -d '{"prefix":"スターキング"}' http://localhost:3000/api/parent_child/starkings
delayed_jobs
テーブルには親のみ登録されています。
ワーカーを起動すると、ワーカーのログに以下が出力されました。親→子の順番で処理されたようです。
[Worker(host:*** pid:4569)] Job ParentJob [84349539-8a02-4dd3-8139-93a13c9b73a0] from DelayedJob(default) with arguments: ["スターキング"] (id=130) (queue=default) RUNNING [parent job] (スターキング) 2022-07-10 10:27:54 +0900 start ============> [parent job] (スターキング) 2022-07-10 10:27:54 +0900 end <============ [Worker(host:*** pid:4569)] Job ParentJob [84349539-8a02-4dd3-8139-93a13c9b73a0] from DelayedJob(default) with arguments: ["スターキング"] (id=130) (queue=default) COMPLETED after 0.0250 [Worker(host:*** pid:4569)] Job ChildJob [b7bb8376-a19a-47d4-bbab-5d8e2012e5bb] from DelayedJob(default) with arguments: ["スターキング"] (id=131) (queue=default) RUNNING [child job] (スターキングデリシャス) 2022-07-10 10:27:54 +0900 start ============> [child job] (スターキングデリシャス) 2022-07-10 10:27:54 +0900 end <============ [Worker(host:*** pid:4569)] Job ChildJob [b7bb8376-a19a-47d4-bbab-5d8e2012e5bb] from DelayedJob(default) with arguments: ["スターキング"] (id=131) (queue=default) COMPLETED after 0.0086 [Worker(host:*** pid:4569)] 2 jobs processed at 31.0791 j/s, 0 failed
親のジョブと別ジョブがキューに登録されていた場合の挙動
今度は
- 親ジョブ
- 別のジョブ
が登録されていた場合、どの順番で実行されるかを試してみます。
# 親のジョブ % curl -X POST -H "Content-Type: application/json" -d '{"prefix":"スターキング"}' http://localhost:3000/api/parent_child/starkings # 別のジョブ % curl -X POST -H "Content-Type: application/json" -d '{"name":"つがる"}' http://localhost:3000/api/queue_name/default_queues
ワーカーを起動する前の delayed_jobs
テーブルです。
2レコード登録され、2つ目が別のジョブになっています。
ワーカーを起動すると、ワーカーのログには 親 → 別 → 子
の順番で処理されました。
キューに入った順番でワーカーは処理するようです。
[Worker(host:*** pid:5208)] Job ParentJob [2276fe30-0102-4dfc-af2c-83fc77aa4169] from DelayedJob(default) with arguments: ["スターキング"] (id=132) (queue=default) RUNNING [parent job] (スターキング) 2022-07-10 10:33:22 +0900 start ============> [parent job] (スターキング) 2022-07-10 10:33:22 +0900 end <============ [Worker(host:*** pid:5208)] Job ParentJob [2276fe30-0102-4dfc-af2c-83fc77aa4169] from DelayedJob(default) with arguments: ["スターキング"] (id=132) (queue=default) COMPLETED after 0.0241 [Worker(host:*** pid:5208)] Job DefaultQueueJob [65dbfba4-8047-4814-87ea-763ec390ef3a] from DelayedJob(default) with arguments: ["つがる"] (id=133) (queue=default) RUNNING [default queue job] (つがる) 2022-07-10 10:33:22 +0900 start ============> [default queue job] (つがる) 2022-07-10 10:33:23 +0900 end <============ [Worker(host:*** pid:5208)] Job DefaultQueueJob [65dbfba4-8047-4814-87ea-763ec390ef3a] from DelayedJob(default) with arguments: ["つがる"] (id=133) (queue=default) COMPLETED after 1.0195 [Worker(host:*** pid:5208)] Job ChildJob [0bbda4f7-2bf8-4d34-8631-0fcacfe238c9] from DelayedJob(default) with arguments: ["スターキング"] (id=134) (queue=default) RUNNING [child job] (スターキングデリシャス) 2022-07-10 10:33:24 +0900 start ============> [child job] (スターキングデリシャス) 2022-07-10 10:33:24 +0900 end <============ [Worker(host:*** pid:5208)] Job ChildJob [0bbda4f7-2bf8-4d34-8631-0fcacfe238c9] from DelayedJob(default) with arguments: ["スターキング"] (id=134) (queue=default) COMPLETED after 0.0128 [Worker(host:*** pid:5208)] 3 jobs processed at 2.7135 j/s, 0 failed
親のジョブと別ジョブがキューに登録され、かつ、別ジョブは子ジョブよりも優先度が低い場合の挙動
Active Jobでは、 queue_with_priority
に値を設定することで、ジョブの優先度を付けることができます。
https://api.rubyonrails.org/v7.0/classes/ActiveJob/QueuePriority/ClassMethods.html
そこで、queue_with_priority
に値を設定した別ジョブ LowPriorityJob
を作成し、子のジョブより優先度を下げてみて、動作を確認してみます。
class LowPriorityJob < ApplicationJob queue_as :default queue_with_priority 30 def perform(*args) puts "[low priority job] #{Time.zone.now} start ============>" puts "[low priority job] #{Time.zone.now} end <============" end end
コントローラーではジョブを呼ぶだけです。
class Api::ParentChild::LowPrioritiesController < ApplicationController def create LowPriorityJob.perform_later(params[:name]) render json: { status: params[:name]} end end
ルーティングを作成し、Railsとワーカーを起動した後、curlでアクセスします。
# 親ジョブ % curl -X POST -H "Content-Type: application/json" -d '{"prefix":"スターキング"}' http://localhost:3000/api/parent_child/starkings # 別ジョブ % curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/parent_child/low_priorities
delayed_jobs
テーブルを見ると、2つ目のジョブの priority
が設定されていました。
この状態でワーカーを起動します。
ワーカーのログには 親 → 子 → 別
の順番で処理した結果が記録されていました。
[Worker(host:*** pid:6162)] Job ParentJob [ce97898f-c43d-43ba-bf5e-62b12de598cb] from DelayedJob(default) with arguments: ["スターキング"] (id=138) (queue=default) RUNNING [parent job] (スターキング) 2022-07-10 10:44:22 +0900 start ============> [parent job] (スターキング) 2022-07-10 10:44:22 +0900 end <============ [Worker(host:*** pid:6162)] Job ParentJob [ce97898f-c43d-43ba-bf5e-62b12de598cb] from DelayedJob(default) with arguments: ["スターキング"] (id=138) (queue=default) COMPLETED after 0.0214 [Worker(host:*** pid:6162)] Job ChildJob [e18aa897-63c4-4e7d-8c84-c3c5d0c08172] from DelayedJob(default) with arguments: ["スターキング"] (id=140) (queue=default) RUNNING [child job] (スターキングデリシャス) 2022-07-10 10:44:22 +0900 start ============> [child job] (スターキングデリシャス) 2022-07-10 10:44:22 +0900 end <============ [Worker(host:*** pid:6162)] Job ChildJob [e18aa897-63c4-4e7d-8c84-c3c5d0c08172] from DelayedJob(default) with arguments: ["スターキング"] (id=140) (queue=default) COMPLETED after 0.0079 [Worker(host:*** pid:6162)] Job LowPriorityJob [18f99e0e-f78d-493b-bd2e-ec335e4828f3] from DelayedJob(default) with arguments: ["国光"] (id=139) (queue=default) RUNNING [low priority job] 2022-07-10 10:44:22 +0900 start ============> [low priority job] 2022-07-10 10:44:22 +0900 end <============ [Worker(host:*** pid:6162)] Job LowPriorityJob [18f99e0e-f78d-493b-bd2e-ec335e4828f3] from DelayedJob(default) with arguments: ["国光"] (id=139) (queue=default) COMPLETED after 0.0136 [Worker(host:*** pid:6162)] 3 jobs processed at 36.9590 j/s, 0 failed
トランザクションを使って、モデルの登録/更新とキューへの登録を同期する
今回の delayed_job_active_record
では Delayed::Job のバックエンドにDBを使っています。
そのため、「モデルの登録/更新とキューへの登録を同期させる。モデルの更新が失敗したら、キューの登録も取り消す」という機能は、トランザクションを使うと簡単に実現できます。
ためしに以下のコントローラを作成し、動作を確認してみます。
今回は「ロールバックだけを行い、APIは正常終了する」よう、 ActiveRecord::Rollback
を使ってロールバックを行います。
https://api.rubyonrails.org/classes/ActiveRecord/Rollback.html
class Api::Rollback::ApplesController < ApplicationController def create # トランザクションの開始 ActiveRecord::Base.transaction do # モデルの作成 Apple.create(name: params[:name]) # キューへの登録 DefaultQueueJob.perform_later(params[:name]) # トランザクションロールバックの発生 raise ActiveRecord::Rollback end render json: { status: params[:name]} end end
処理前の状態を Rails console で確認したところ、両方とも0件でした。
>> Apple.count Apple Count (0.2ms) SELECT COUNT(*) FROM "apples" => 0 >> Delayed::Job.count Delayed::Backend::ActiveRecord::Job Count (0.4ms) SELECT COUNT(*) FROM "delayed_jobs" => 0
% curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/rollback/apples
Railsの実行ログを見ると、トランザクションのロールバックが発生しています。
Started POST "/api/rollback/apples" for 127.0.0.1 at 2022-07-10 11:00:43 +0900 (0.2ms) SELECT sqlite_version(*) Processing by Api::Rollback::ApplesController#create as */* Parameters: {"name"=>"国光", "apple"=>{"name"=>"国光"}} # トランザクションが開始された TRANSACTION (0.1ms) begin transaction ↳ app/controllers/api/rollback/apples_controller.rb:7:in `block in create' Apple Create (0.3ms) INSERT INTO "apples" ("name", "created_at", "updated_at") VALUES (?, ?, ?) [["name", "国光"], ["created_at", "2022-07-10 02:00:44.074838"], ["updated_at", "2022-07-10 02:00:44.074838"]] ↳ app/controllers/api/rollback/apples_controller.rb:7:in `block in create' [ActiveJob] Delayed::Backend::ActiveRecord::Job Create (0.2ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper\njob_data:\n job_class: DefaultQueueJob\n job_id: 6a8f57c6-9b36-4479-a64a-30b5f4959cdd\n provider_job_id:\n queue_name: default\n priority:\n arguments:\n - 国光\n executions: 0\n exception_executions: {}\n locale: en\n timezone: Tokyo\n enqueued_at: '2022-07-10T02:00:44Z'\n"], ["last_error", nil], ["run_at", "2022-07-10 02:00:44.082236"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2022-07-10 02:00:44.082260"], ["updated_at", "2022-07-10 02:00:44.082260"]] [ActiveJob] ↳ app/controllers/api/rollback/apples_controller.rb:10:in `block in create' [ActiveJob] Enqueued DefaultQueueJob (Job ID: 6a8f57c6-9b36-4479-a64a-30b5f4959cdd) to DelayedJob(default) with arguments: "国光" # ここでトランザクションのロールバックが走った TRANSACTION (0.6ms) rollback transaction ↳ app/controllers/api/rollback/apples_controller.rb:4:in `create' Completed 200 OK in 15ms (Views: 0.3ms | ActiveRecord: 3.0ms | Allocations: 4673)
Rails console を確認しても、何も登録されていません。
>> Apple.count Apple Count (0.3ms) SELECT COUNT(*) FROM "apples" => 0 >> Delayed::Job.count Delayed::Backend::ActiveRecord::Job Count (0.2ms) SELECT COUNT(*) FROM "delayed_jobs" => 0
ジョブごとに Active Job のバックエンドを切り替える
ここまで、Active Job のバックエンドは Delayed::Job
を使ってきました。
ただ、「一部のジョブだけ別の Active Job バックエンドを使いたい」というケースがあるかもしれません。
Railsガイドによると、一部だけ別のバックエンドにしたい場合は、ジョブで queue_adapter
を定義すれば良さそうです。
4.2 バックエンドを設定する | Active Job の基礎 - Railsガイド
そこで今回は、一部のジョブだけ、Active Jobのデフォルトバックエンドである async
への切り替えて実行してみます。
https://api.rubyonrails.org/v7.0/classes/ActiveJob/QueueAdapters/AsyncAdapter.html
class AsyncBackendJob < ApplicationJob # キュー名は、Delayed::Job と同じ queue_as :default # バックエンドだけ async self.queue_adapter = :async def perform(*args) puts "[async job] #{Time.zone.now} start ============>" puts "[async job] #{Time.zone.now} end <============" end end
ルーティングを追加し、Railsアプリを起動します。また、ワーカーは停止しておきます。
# Delayed::Job バックエンド curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/queue_name/default_queues # async バックエンド curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/queue_backend/async_backends
Railsサーバのログを見ると、コントローラへのアクセスの後に、ジョブを実行していることが記録されていました。
Started POST "/api/queue_backend/async_backends" for 127.0.0.1 at 2022-07-10 11:11:19 +0900 Processing by Api::QueueBackend::AsyncBackendsController#create as */* Parameters: {"name"=>"国光", "async_backend"=>{"name"=>"国光"}} [ActiveJob] Enqueued AsyncBackendJob (Job ID: 3a28fdf8-2b39-4293-ac2d-250b011ff660) to Async(default) Completed 200 OK in 4ms (Views: 0.2ms | ActiveRecord: 0.0ms | Allocations: 848) [ActiveJob] [AsyncBackendJob] [3a28fdf8-2b39-4293-ac2d-250b011ff660] Performing AsyncBackendJob (Job ID: 3a28fdf8-2b39-4293-ac2d-250b011ff660) from Async(default) enqueued at 2022-07-10T02:11:19Z [async job] 2022-07-10 11:11:19 +0900 start ============> [async job] 2022-07-10 11:11:19 +0900 end <============ [ActiveJob] [AsyncBackendJob] [3a28fdf8-2b39-4293-ac2d-250b011ff660] Performed AsyncBackendJob (Job ID: 3a28fdf8-2b39-4293-ac2d-250b011ff660) from Async(default) in 3.85ms
なお、ワーカーを停止しているため、Delayed::Job バックエンドはキューに残ったままになっています。
Delayed::Job のワーカーの設定を変更する
Delayed::Job の README に、ワーカーの設定変更方法が記載されています。
https://github.com/collectiveidea/delayed_job#gory-details
そこで、 config/initializers/delayed_job_config.rb
を作成し、設定を追加して試してみます。
また、動作確認用に、必ず落ちるジョブを作っておきます。
class AlwaysFailJob < ApplicationJob queue_as :default def perform(*args) puts "[fail job] #{Time.zone.now} start ============>" # 常にエラー raise StandardError puts "[fail job] #{Time.zone.now} end <============" end end
このジョブを起動するコントローラーはこちら。
class Api::AlwaysFailJobs::ApplesController < ApplicationController def create AlwaysFailJob.perform_later render json: { status: params[:name] } end end
max_attempt (リトライ回数)
デフォルトは 25
のようです。
そこで、数値を色々変更し、動作を確認してみます。
なお、リトライ間隔については、等間隔での再実行ではなく、
On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts or using the job's defined reschedule_at method.
とのことです。
max_attempt = 2
設定します。
# config/initializers/delayed_job_config.rb Delayed::Worker.max_attempts = 2
curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/always_fail_jobs/apples
2回実行されました。
2回目のエラー時には FAILED permanently because of 2 consecutive failures
と出ています。
また、しばらく待ってもリトライはされませんでした。
[Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) RUNNING [fail job] 2022-07-10 11:21:18 +0900 start ============> [Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) FAILED (0 prior attempts) with StandardError: StandardError [Worker(host:*** pid:9568)] 1 jobs processed at 30.6636 j/s, 1 failed [Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) RUNNING [fail job] 2022-07-10 11:21:29 +0900 start ============> [Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) FAILED (1 prior attempts) with StandardError: StandardError [Worker(host:*** pid:9568)] Job AlwaysFailJob [4a7ac3b1-4d1f-4884-a481-7484fbf10af1] from DelayedJob(default) with arguments: [] (id=142) (queue=default) FAILED permanently because of 2 consecutive failures [Worker(host:*** pid:9568)] 1 jobs processed at 59.3155 j/s, 1 failed
max_attempt = 1
1回だけ実行された後、 FAILED permanently because of 1 consecutive failures
が表示されています。
[Worker(host:*** pid:10246)] Job AlwaysFailJob [0789bf63-34b3-439f-89f9-549ce98208d0] from DelayedJob(default) with arguments: [] (id=143) (queue=default) RUNNING [fail job] 2022-07-10 11:27:10 +0900 start ============> [Worker(host:*** pid:10246)] Job AlwaysFailJob [0789bf63-34b3-439f-89f9-549ce98208d0] from DelayedJob(default) with arguments: [] (id=143) (queue=default) FAILED (0 prior attempts) with StandardError: StandardError [Worker(host:*** pid:10246)] Job AlwaysFailJob [0789bf63-34b3-439f-89f9-549ce98208d0] from DelayedJob(default) with arguments: [] (id=143) (queue=default) FAILED permanently because of 1 consecutive failures [Worker(host:*** pid:10246)] 1 jobs processed at 34.0634 j/s, 1 failed
max_attempt = 0
1回だけ実行された後、 FAILED permanently because of 1 consecutive failures
が表示されています。
[Worker(host:*** pid:10660)] Job AlwaysFailJob [78821bed-0214-42d2-a121-0a3e7bda59f5] from DelayedJob(default) with arguments: [] (id=144) (queue=default) RUNNING [fail job] 2022-07-10 11:28:34 +0900 start ============> [Worker(host:*** pid:10660)] Job AlwaysFailJob [78821bed-0214-42d2-a121-0a3e7bda59f5] from DelayedJob(default) with arguments: [] (id=144) (queue=default) FAILED (0 prior attempts) with StandardError: StandardError [Worker(host:*** pid:10660)] Job AlwaysFailJob [78821bed-0214-42d2-a121-0a3e7bda59f5] from DelayedJob(default) with arguments: [] (id=144) (queue=default) FAILED permanently because of 1 consecutive failures [Worker(host:*** pid:10660)] 1 jobs processed at 20.7310 j/s, 1 failed
destroy_failed_jobs = false (ジョブが失敗しても削除しない)
デフォルトでは、 destroy_failed_jobs = true
になっているため、エラーになったジョブは delayed_jobs
テーブルから削除されます。
「失敗時のジョブの情報を残したい」などの要件がある場合は、失敗したジョブも残すよう false
へと変更します。
# config/initializers/delayed_job_config.rb Delayed::Worker.destroy_failed_jobs = false
常に失敗するジョブのエンドポイントへ curl でリクエストします。
% curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/always_fail_jobs/apples
ジョブは失敗します。
[Worker(host:*** pid:10660)] Job AlwaysFailJob [cd497d7b-d58f-4c0a-b096-874b9c52d18c] from DelayedJob(default) with arguments: [] (id=145) (queue=default) RUNNING [fail job] 2022-07-10 11:31:45 +0900 start ============> [Worker(host:*** pid:10660)] Job AlwaysFailJob [cd497d7b-d58f-4c0a-b096-874b9c52d18c] from DelayedJob(default) with arguments: [] (id=145) (queue=default) FAILED (0 prior attempts) with StandardError: StandardError [Worker(host:*** pid:10660)] Job AlwaysFailJob [cd497d7b-d58f-4c0a-b096-874b9c52d18c] from DelayedJob(default) with arguments: [] (id=145) (queue=default) FAILED permanently because of 1 consecutive failures [Worker(host:*** pid:10660)] 1 jobs processed at 58.3737 j/s, 1 failed
delayed_jobs
テーブルを見ると、エラーレコードが残ったままになっています。
失敗したジョブについては failed_at
などが設定されています。
次のスクリーンショットの場合は、1行目のジョブが失敗しています。
max_run_time (ジョブの実行時間)
デフォルトは 4.hours
です。
そこで、max_run_time
を2秒に変更して、中で10秒sleepするジョブを実行してみます。
# config/initializers/delayed_job_config.rb # ジョブがエラーになった時の繰り返し回数 # 0は1と同じ Delayed::Worker.max_attempts = 2 # ジョブの実行時間 Delayed::Worker.max_run_time = 2.seconds
curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/queue_name/custom_queues
ワーカーのログを見ると、2秒経過したところでジョブが失敗していました。
[Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) RUNNING [custom queue job] (秋映) 2022-07-10 11:40:01 +0900 start ============> [Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) FAILED (0 prior attempts) with Delayed::WorkerTimeout: execution expired (Delayed::Worker.max_run_time is only 2 seconds) [Worker(host:*** pid:11637)] 1 jobs processed at 0.4868 j/s, 1 failed [Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) RUNNING [custom queue job] (秋映) 2022-07-10 11:40:13 +0900 start ============> [Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) FAILED (1 prior attempts) with Delayed::WorkerTimeout: execution expired (Delayed::Worker.max_run_time is only 2 seconds) [Worker(host:*** pid:11637)] Job CustomQueueJob [742d6182-d3ab-4942-b9da-763bc4dfed69] from DelayedJob(custom) with arguments: ["秋映"] (id=147) (queue=custom) FAILED permanently because of 2 consecutive failures [Worker(host:*** pid:11637)] 1 jobs processed at 0.4938 j/s, 1 failed
read_ahead 設定がDBエンジンによっては無視される
項目の説明として
The default behavior is to read 5 jobs from the queue when finding an available job. You can configure this by setting Delayed::Worker.read_ahead.
とありました。
この説明を軽く読んだ時、「 read_ahead
の分だけジョブを読んで、その中の優先度が高いものを実行する」なのかもしれないと感じました。
そこで、
# config/initializers/delayed_job_config.rb Delayed::Worker.read_ahead = 3
と先読みを3件にするよう設定した後、7個目に優先度が高いものがジョブがある時にどうなるかを見てみることにしました。
ジョブを2つ用意します。
優先度: 30 (低い)
class LowPriorityJob < ApplicationJob queue_as :default queue_with_priority 30 def perform(*args) puts "[low priority job] #{Time.zone.now} start ============>" puts "[low priority job] #{Time.zone.now} end <============" end end
優先度:10 (高い)
class HighPriorityJob < ApplicationJob queue_as :default queue_with_priority 10 def perform(*args) puts "[high priority job] #{Time.zone.now} start ============>" puts "[high priority job] #{Time.zone.now} end <============" end end
次に、これらのジョブに対応するAPIエンドポイントを用意します。
準備ができたため、動作を確認します。ワーカーを止めた状態で、
- 優先度が低いジョブを6個キューに入れる
- 優先度が高いジョブを1個キューに入れる
# 6回実行 curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/low_priorities # 1回実行 curl -X POST -H "Content-Type: application/json" -d '{"name":"秋映"}' http://localhost:3000/api/priority/high_priorities
ワーカーを起動してログを確認したところ、優先度:高いから実行されているように見えました。
# 優先度:高が1回実行される [Worker(host:*** pid:12532)] Job HighPriorityJob [a94294d8-5810-4902-87d7-120a1cd71b19] from DelayedJob(default) with arguments: [] (id=154) (queue=default) RUNNING [high priority job] 2022-07-10 11:50:26 +0900 start ============> [high priority job] 2022-07-10 11:50:26 +0900 end <============ [Worker(host:*** pid:12532)] Job HighPriorityJob [a94294d8-5810-4902-87d7-120a1cd71b19] from DelayedJob(default) with arguments: [] (id=154) (queue=default) COMPLETED after 0.0183 # 続いて、優先度:低が6回実行される [Worker(host:*** pid:12532)] Job LowPriorityJob [66de3de9-d61e-4fd6-a315-275bacd3a4d5] from DelayedJob(default) with arguments: [] (id=148) (queue=default) RUNNING [low priority job] 2022-07-10 11:50:26 +0900 start ============> [low priority job] 2022-07-10 11:50:26 +0900 end <============ [Worker(host:*** pid:12532)] Job LowPriorityJob [66de3de9-d61e-4fd6-a315-275bacd3a4d5] from DelayedJob(default) with arguments: [] (id=148) (queue=default) COMPLETED after 0.0078 [Worker(host:*** pid:12532)] Job LowPriorityJob [db85ccb8-2fb6-4c27-84d1-c5e4afed5905] from DelayedJob(default) with arguments: [] (id=149) (queue=default) RUNNING [low priority job] 2022-07-10 11:50:26 +0900 start ============> [low priority job] 2022-07-10 11:50:26 +0900 end <============ [Worker(host:*** pid:12532)] Job LowPriorityJob [db85ccb8-2fb6-4c27-84d1-c5e4afed5905] from DelayedJob(default) with arguments: [] (id=149) (queue=default) COMPLETED after 0.0075 [Worker(host:*** pid:12532)] Job LowPriorityJob [633dc530-94f4-43b6-aa6d-8257099c5110] from DelayedJob(default) with arguments: [] (id=150) (queue=default) RUNNING [low priority job] 2022-07-10 11:50:26 +0900 start ============> [low priority job] 2022-07-10 11:50:26 +0900 end <============ [Worker(host:*** pid:12532)] Job LowPriorityJob [633dc530-94f4-43b6-aa6d-8257099c5110] from DelayedJob(default) with arguments: [] (id=150) (queue=default) COMPLETED after 0.0081 [Worker(host:*** pid:12532)] Job LowPriorityJob [9000f99a-6f67-4d55-ba96-558548245fd9] from DelayedJob(default) with arguments: [] (id=151) (queue=default) RUNNING [low priority job] 2022-07-10 11:50:26 +0900 start ============> [low priority job] 2022-07-10 11:50:26 +0900 end <============ [Worker(host:*** pid:12532)] Job LowPriorityJob [9000f99a-6f67-4d55-ba96-558548245fd9] from DelayedJob(default) with arguments: [] (id=151) (queue=default) COMPLETED after 0.0080 [Worker(host:*** pid:12532)] Job LowPriorityJob [fa1401cf-5f9f-4a60-abac-90d13056376d] from DelayedJob(default) with arguments: [] (id=152) (queue=default) RUNNING [low priority job] 2022-07-10 11:50:26 +0900 start ============> [low priority job] 2022-07-10 11:50:26 +0900 end <============ [Worker(host:*** pid:12532)] Job LowPriorityJob [fa1401cf-5f9f-4a60-abac-90d13056376d] from DelayedJob(default) with arguments: [] (id=152) (queue=default) COMPLETED after 0.0078 [Worker(host:*** pid:12532)] Job LowPriorityJob [1a71b2a8-39fb-4828-a063-33c29de9e503] from DelayedJob(default) with arguments: [] (id=153) (queue=default) RUNNING [low priority job] 2022-07-10 11:50:26 +0900 start ============> [low priority job] 2022-07-10 11:50:26 +0900 end <============ [Worker(host:*** pid:12532)] Job LowPriorityJob [1a71b2a8-39fb-4828-a063-33c29de9e503] from DelayedJob(default) with arguments: [] (id=153) (queue=default) COMPLETED after 0.0075 [Worker(host:*** pid:12532)] 7 jobs processed at 56.1185 j/s, 0 failed
そのため、
「
read_ahead
の分だけジョブを読んで、その中の優先度が高いものを実行する」
という目的の設定ではないと分かりました。
そこで、 read_ahead
について詳しく知るため、 read_ahead
についての記事を読みました。
- ruby on rails - How does priority interact with read_ahead in delayed_job? - Stack Overflow
- PostgreSQL バックエンドの delayed_job でワーカーが一度に複数のジョブを予約する問題 - Qiita
次に、ソースコードを読んだところ、 reserve_with_scope_using_default_sql
の中で read_ahead
を使っていました。
https://github.com/collectiveidea/delayed_job_active_record/blob/v4.1.7/lib/delayed/backend/active_record.rb#L100-L167
そこではDBエンジンとして
あたりを使っていると reserve_with_scope_using_default_sql
を使うロジックに入らないように見えました。
もし、上記DBを使っていて read_ahead
を使う reserve_with_scope_using_default_sql
ロジックに入りたい場合は、
Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy = :default_sql
な設定が必要そうでした。
今回試している環境のDBエンジンは SQLite なものの、自分の関係する本番環境では無視されるようだったため、 read_ahead
についてはこれ以上追求することをやめました。
Action Mailer と Delayed::Job を一緒に使う
Delayed::Job の README によると、Delayed::Job は Action Mailer とともに使えるようでしたので、ためしてみます。
https://github.com/collectiveidea/delayed_job#rails-mailers
なお、実際にメールを送信すると確認が手間なので、今回は Letter Opener
を使ってメールを受信してみます。
https://github.com/ryanb/letter_opener
Gemfileに追加して bundle install
します。
group :development do # メールの受信 gem "letter_opener" end
Action Mailer と Delayed::Job を組み合わせてみる
使い方としては
.delay
を間に挟んで使う- Action Mailer の
.deliver_later
を使う
の2パターンがあるようです。
今回は Active Job のインタフェースに合わせ、
Action Mailer の
.deliver_later
を使う
な方式で実装して試してみます。
まずは ActionMailer のジェネレータを実行します。
今回は HelloWorld
という Mailer を作成します。
% bin/rails generate mailer HelloWorld create app/mailers/hello_world_mailer.rb invoke erb create app/views/hello_world_mailer invoke test_unit create test/mailers/hello_world_mailer_test.rb create test/mailers/previews/hello_world_mailer_preview.rb
Mailerを編集します。
# app/mailers/hello_world_mailer.rb class HelloWorldMailer < ApplicationMailer def welcome_email @name = params[:name] mail(to: 'bar@example.com', subject: 'Hello, world!') end end
メールのビュー (app/views/hello_world_mailer/welcome_email.text.erb
) も作成します。
<%= @name %> Hello, world!
コントローラーを作成します。
class Api::Email::OnlyEmailsController < ApplicationController def create HelloWorldMailer.with(name: params[:name]).welcome_email.deliver_later render json: { status: params[:name] } end end
ルーティングを追加した後、Railsとジョブワーカーを起動し、curlでアクセスします。
% curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/email/only_emails
すると、ワーカーがメールのジョブを実行し、メールが受信できました。
メールジョブの設定を行う
キュー名や優先度をメールジョブ全体で設定
キュー名は、config/applicantion.rb
の config.action_mailer.deliver_later_queue_name
にて設定できます。
- 3.12.15 config.action_mailer.deliver_later_queue_name | Rails アプリケーションを設定する - Railsガイド
- 3.13.15 config.action_mailer.deliver_later_queue_name | Configuring Rails Applications — Ruby on Rails Guides
一方、優先度については Mailer 用の initializer を作成し、その中で ActionMailer::MailDeliveryJob.priority
を指定することになります。
How to set priority of Rails ActionMailer - Stack Overflow
それぞれ定義してみます。
キュー名
# config/application.rb # ... module RailsDelayedJobSample class Application < Rails::Application # Initialize configuration defaults for originally generated Rails version. config.load_defaults 7.0 # ... # Action Mailerの設定 # キュー名を設定 # https://guides.rubyonrails.org/configuring.html#config-action-mailer-deliver-later-queue-name config.action_mailer.deliver_later_queue_name = 'mail_queue' end end
優先度
# config/initializer/mail_delivery_job.rb ActionMailer::MailDeliveryJob.priority = 30
準備ができたため、ワーカーを停止した後、Railsを起動して curl でリクエストします。
curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/email/only_emails
delayed_jobs
テーブルを見ると、キュー名や優先度が指定されていました。
ワーカーを起動すると、メールが送信されました。
ログを見ると、ワーカーでメールを送信していることが分かります。
[Worker(host:*** pid:15576)] Job ActionMailer::MailDeliveryJob [e9f626e0-e971-4f15-978e-bb524a9c85ab] from DelayedJob(mail_queue) with arguments: ["HelloWorldMailer", "welcome_email", "deliver_now", {"params"=>{"name"=>"国光", "_aj_symbol_keys"=>["name"]}, "args"=>[], "_aj_ruby2_keywords"=>["params", "args"]}] (id=157) (queue=mail_queue) RUNNING [Worker(host:*** pid:15576)] Job ActionMailer::MailDeliveryJob [e9f626e0-e971-4f15-978e-bb524a9c85ab] from DelayedJob(mail_queue) with arguments: ["HelloWorldMailer", "welcome_email", "deliver_now", {"params"=>{"name"=>"国光", "_aj_symbol_keys"=>["name"]}, "args"=>[], "_aj_ruby2_keywords"=>["params", "args"]}] (id=157) (queue=mail_queue) COMPLETED after 0.1572 [Worker(host:*** pid:15576)] 1 jobs processed at 5.1985 j/s, 0 failed
キューや優先度を送信するメールごとに設定
Action Mailer のdeliver_later()
の引数にて指定することで、設定がオーバーライドされるようです。
https://api.rubyonrails.org/classes/ActionMailer/MessageDelivery.html#method-i-deliver_later
コントローラーで deliver_later
の引数を設定します。
class Api::Email::DeliverOptionsController < ApplicationController def create HelloWorldMailer.with(name: params[:name]).welcome_email.deliver_later( queue: 'override_queue', priority: 45 ) render json: { status: params[:name] } end end
ルーティングを追加、ワーカーを停止し、Railsを起動してから curl でリクエストします。
curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/email/deliver_options
delayed_jobs
テーブルを見ると、データが入っています。
ジョブワーカーを起動するとメールも送信されました。
[Worker(host:*** pid:16625)] Job ActionMailer::MailDeliveryJob [6f78d26c-fc0d-4d67-9ec1-eb101707ed0d] from DelayedJob(override_queue) with arguments: ["HelloWorldMailer", "welcome_email", "deliver_now", {"params"=>{"name"=>"国光", "_aj_symbol_keys"=>["name"]}, "args"=>[], "_aj_ruby2_keywords"=>["params", "args"]}] (id=158) (queue=override_queue) RUNNING [Worker(host:*** pid:16625)] Job ActionMailer::MailDeliveryJob [6f78d26c-fc0d-4d67-9ec1-eb101707ed0d] from DelayedJob(override_queue) with arguments: ["HelloWorldMailer", "welcome_email", "deliver_now", {"params"=>{"name"=>"国光", "_aj_symbol_keys"=>["name"]}, "args"=>[], "_aj_ruby2_keywords"=>["params", "args"]}] (id=158) (queue=override_queue) COMPLETED after 0.1779 [Worker(host:*** pid:16625)] 1 jobs processed at 4.5583 j/s, 0 failed
メールジョブと通常のジョブを一緒にロールバックする
メールジョブでも Delayed::Job を使う場合
- モデルの保存
- 一般のジョブをキューへ登録
- メールジョブをキューへ登録
を1つのトランザクションとして処理できます。
そこで、トランザクション内で上記の各処理を実行し、最後にトランザクションをロールバックするとどうなるかを確認してみます。
まずは、コントローラーでトランザクションを使って実装します。
class Api::Email::TransactionsController < ApplicationController def create ActiveRecord::Base.transaction do HelloWorldMailer.with(name: params[:name]).welcome_email.deliver_later Apple.create(name: params[:name]) DefaultQueueJob.perform_later(params[:name]) raise ActiveRecord::Rollback end render json: { status: params[:name] } end end
Rails console を使って、事前のモデルの状況を確認します。いずれも登録されていません。
>> Apple.count (1.7ms) SELECT sqlite_version(*) Apple Count (0.5ms) SELECT COUNT(*) FROM "apples" => 0 >> Delayed::Job.count Delayed::Backend::ActiveRecord::Job Count (0.3ms) SELECT COUNT(*) FROM "delayed_jobs" => 0
curl -X POST -H "Content-Type: application/json" -d '{"name":"国光"}' http://localhost:3000/api/email/transactions
Railsのログを見ると、トランザクションがロールバックされています。
Started POST "/api/email/transactions" for 127.0.0.1 at 2022-07-10 13:16:35 +0900 Processing by Api::Email::TransactionsController#create as */* Parameters: {"name"=>"国光", "transaction"=>{"name"=>"国光"}} (0.1ms) SELECT sqlite_version(*) ↳ app/controllers/api/email/transactions_controller.rb:3:in `create' # トランザクション開始 [ActiveJob] TRANSACTION (0.1ms) begin transaction [ActiveJob] ↳ app/controllers/api/email/transactions_controller.rb:4:in `block in create' [ActiveJob] Delayed::Backend::ActiveRecord::Job Create (0.6ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) [["priority", 30], ["attempts", 0], ["handler", "--- !ruby/object:ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper\njob_data:\n job_class: ActionMailer::MailDeliveryJob\n job_id: c3555602-bac5-47f5-9038-8246b3194306\n provider_job_id:\n queue_name: mail_queue\n priority: 30\n arguments:\n - HelloWorldMailer\n - welcome_email\n - deliver_now\n - params:\n name: 国光\n _aj_symbol_keys:\n - name\n args: []\n _aj_ruby2_keywords:\n - params\n - args\n executions: 0\n exception_executions: {}\n locale: en\n timezone: Tokyo\n enqueued_at: '2022-07-10T04:16:35Z'\n"], ["last_error", nil], ["run_at", "2022-07-10 04:16:35.040988"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "mail_queue"], ["created_at", "2022-07-10 04:16:35.041026"], ["updated_at", "2022-07-10 04:16:35.041026"]] [ActiveJob] ↳ app/controllers/api/email/transactions_controller.rb:4:in `block in create' [ActiveJob] Enqueued ActionMailer::MailDeliveryJob (Job ID: c3555602-bac5-47f5-9038-8246b3194306) to DelayedJob(mail_queue) with arguments: "HelloWorldMailer", "welcome_email", "deliver_now", {:params=>{:name=>"国光"}, :args=>[]} Apple Create (0.2ms) INSERT INTO "apples" ("name", "created_at", "updated_at") VALUES (?, ?, ?) [["name", "国光"], ["created_at", "2022-07-10 04:16:35.052783"], ["updated_at", "2022-07-10 04:16:35.052783"]] ↳ app/controllers/api/email/transactions_controller.rb:6:in `block in create' [ActiveJob] Delayed::Backend::ActiveRecord::Job Create (0.1ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper\njob_data:\n job_class: DefaultQueueJob\n job_id: c5477611-3075-4c1f-a8a2-f22e7a2b049f\n provider_job_id:\n queue_name: default\n priority:\n arguments:\n - 国光\n executions: 0\n exception_executions: {}\n locale: en\n timezone: Tokyo\n enqueued_at: '2022-07-10T04:16:35Z'\n"], ["last_error", nil], ["run_at", "2022-07-10 04:16:35.057817"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2022-07-10 04:16:35.057840"], ["updated_at", "2022-07-10 04:16:35.057840"]] [ActiveJob] ↳ app/controllers/api/email/transactions_controller.rb:8:in `block in create' [ActiveJob] Enqueued DefaultQueueJob (Job ID: c5477611-3075-4c1f-a8a2-f22e7a2b049f) to DelayedJob(default) with arguments: "国光" # ロールバックが発生 TRANSACTION (0.4ms) rollback transaction ↳ app/controllers/api/email/transactions_controller.rb:3:in `create' Completed 200 OK in 25ms (Views: 0.2ms | ActiveRecord: 2.0ms | Allocations: 8209)
また、各テーブルには何も登録されていません。
>> Apple.count Apple Count (0.2ms) SELECT COUNT(*) FROM "apples" => 0 >> Delayed::Job.count Delayed::Backend::ActiveRecord::Job Count (0.2ms) SELECT COUNT(*) FROM "delayed_jobs" => 0
以上より、
- モデルの保存
- 一般のジョブをキューへ登録
- メールジョブをキューへ登録
を1つのトランザクションとして処理できました。
Delayed::Job + RSpec によるテストコードを書く
ここまでで各実装方法を見てきました。
ここからは Delayed::Job を使った時のテストコードを書いてみます。
なお、今回のテストコードは RSpec で書いてみますので、 rails-rspec
をインストールしておきます。
https://github.com/rspec/rspec-rails
Job specを書く
キューに入ることを確認する
RSpec の have_been_enqueued
マッチャを使い、 perform_later
を実行したら、キュー名 heavy
に1回入ることを確認します。
have_been_enqueued matcher - Matchers - RSpec Rails - RSpec - Relish
RSpec.describe 'HeavyQueueJob', type: :job do describe '#perform_later' do context 'RSpecの have_enqueued_job マッチャを使う' do it '名前付きキューに1回入ること' do it '名前付きキューに1回入ること' do expect { HeavyQueueJob.perform_later }.to have_enqueued_job(HeavyQueueJob).exactly(:once).on_queue(:heavy) end end end end
実行するとテストが落ちます。
StandardError: To use ActiveJob matchers set `ActiveJob::Base.queue_adapter = :test`
queue_adapter
が :delayed_job
のままではエラーになるようです。
そのため、 before
で :test
に変更し、 after
で元に戻す (:delayed_job
) ようにすると、テストがパスします。
# ... context 'RSpecの have_enqueued_job マッチャを使う' do # 追加 before do ActiveJob::Base.queue_adapter = :test end after do ActiveJob::Base.queue_adapter = :delayed_job end # ...
Delayed::Job テーブルにデータが存在することを確認する
こちらは change
を使って、テーブルにデータが増えていることを確認します。
また、今回は追加された中身( priority
)も確認するため、 have_attributes
マッチャを使います。
have_attributes
matcher - Built in matchers - RSpec Expectations - RSpec - Relish
なお、こちらの場合はDBの中身を確認することから、キューの中身が実行されないよう queue_adapter
は :delayed_job
となっている必要があります。
context '自分でテーブルを調べる' do it 'job specの場合は、ジョブがキューに入ること' do expect { HeavyQueueJob.perform_later }.to change(Delayed::Job, :count).from(0).to(1) end it '優先度も設定されていること' do HeavyQueueJob.perform_later actual = Delayed::Job.first expect(actual).to have_attributes(priority: 10) end end
ジョブが失敗の上限に達しても、Delayed::Job テーブルからジョブが削除されないことを確認する
今回のRailsアプリでは、ジョブが失敗の上限に達しても、Delayed::Job テーブルからジョブが削除されないよう
Delayed::Worker.destroy_failed_jobs = false
となっているのが仕様とします。
そのため、ジョブが削除されないことをテストで確認します。
なお、 Delayed::Job Workerには
- work_off
- run
の2つのメソッドがあります。
https://www.rubydoc.info/gems/delayed_job/Delayed/Worker#work_off-instance_method
ただ、wrok_off
の場合、失敗が2回目以降だと Delayed::Job テーブルの attempts
が更新されません。
そのため、今回は run
を使ってワーカーによるジョブ実行を行っています。
require 'rails_helper' RSpec.describe 'HeavyQueueJob', type: :job do describe '#perform_now' do context '失敗の上限になった場合でも、delayed_jobsテーブルにレコードが残ったままになっているか' do it 'キューにデータが残り続けていること' do expect { # キューに入れる AlwaysFailJob.perform_later # ジョブを2回実行する Delayed::Worker.new.run(Delayed::Job.first) Delayed::Worker.new.run(Delayed::Job.first) }.to change(Delayed::Job, :count).from(0).to(1) # テーブルには1件だけ追加されていること # ジョブが残っており、2回実行されたこと actual = Delayed::Job.first expect(actual).to have_attributes(attempts: 2) # なお、実際の Delayed::Job Worker と異なり、3回目も実行できる Delayed::Worker.new.run(Delayed::Job.first) expect(actual.reload).to have_attributes(attempts: 3) end end end end
Request spec を書く
次は Request spec で動作を確認します。
【注意】 Rails6以降、Request spec では Active Job の queue_adapter が TestAdapter になる
Rails6以降、Request specのデフォルトだと、テストを実行する時のデフォルトの queue_adapter
は TestAdapter (:test)
で固定されます。
固定された原因として、以下では ActionDispatch::SystemTestCase
にて ActiveJob::TestHelper
を include したため、 TestAdapter が :test
に固定されるとあります。
- Rails 6 inconsistently overrides ActiveJob queue_adapter setting with TestAdapter · Issue #37270 · rails/rails
- Rails6でActiveJobのQueueAdapterがTestAdapterに上書きされてしまい非同期で実行されてしまう - Qiita
では、RSpecの Request spec は何に相当するかを調べたところ、READMEによると ActionDispatch::IntegrationTest
でした。
https://github.com/rspec/rspec-rails#what-tests-should-i-write
ActionDispatch::IntegrationTest
については、以下の issue やプルリクで ActiveJob::TestHelper
が include されるようになりました。
- ActionDispatch::IntegrationTest should include ActiveJob::TestHelper, ActionMailer::TestHelper by default · Issue #33838 · rails/rails
- Include test helpers in ActionDispatch::IntegrationTest by ricardotk002 · Pull Request #33849 · rails/rails
- https://github.com/rails/rails/blob/v7.0.3/activejob/lib/active_job/railtie.rb#L41
- Rails 7.0.3 で include されている箇所
そのため、 ActionDispatch::SystemTestCase
同様、 queue_adapter
が :test
に固定されることとなっているようです。
queue_adapter
が :test
となることの影響として、テストコード内ではジョブがインラインで実行されるようになります。
デフォルトでは
ActiveJob::TestCase
がキューアダプタを:test
に設定してジョブがインラインで実行されるようにします。
つまり、Request specの中で「 Delayed::Job
のキューテーブルにレコードが存在すること」のようなテストコードを書いていると、デフォルトのままでは失敗してしまいます。
これについては、RSpecのマッチャ have_enqueued_mail
などで代替できそうに見えます。
https://relishapp.com/rspec/rspec-rails/v/5-1/docs/matchers/have-enqueued-mail-matcher
しかし、そのマッチャの挙動が
- キューに入った形跡があることは確認できる
- Delayed::Job のレコードがあるかどうかは確認できない
となることから、「Delayed::Job のテーブルにレコードが存在すること」まで確認したい場合に困ってしまいます。
queue_adapter を差し替える方法としては
- 各ジョブのテストコードの先頭に
queue_adapter_for_test
メソッドを定義する spec_helper
のbefore
に TestAdapter を使えないよう設定する- TestAdapterを差し替えたいところだけ、
before
でキューを差し替える
などが見つかりました。
ただ、RSpecの have_been_enqueued
マッチャを使っている場合、 queue_adapter
が :test
でないと動作しなくなります。
そこで、局所的に差し替えられる、上記の一番最後の案
TestAdapterを差し替えたいところだけ、
before
でキューを差し替える
にて今回は実装していきます。
指定したキューに登録されることを確認する
Job specの場合は have_enqueued_job
を使いましたが、今回は have_been_enqueued
を使ってみます。
have_been_enqueued matcher - Matchers - RSpec Rails - RSpec - Relish
なお、先程の注意にある通り、Request spec の queue_adapter
は :test
になりますが、 have_been_enqueued
マッチャは :test
のままで良いです。
RSpec.describe 'Api::Priority::HighPrioritiesController', type: :request do describe 'POST /api/priority/high_priorities' do context 'RSpecの have_been_enqueued マッチャを使う' do it 'POSTすると、キュー default に1回入る' do post api_priority_high_priorities_path, params: { name: 'test' } expect(HighPriorityJob).to have_been_enqueued.exactly(:once).on_queue(:default) end end end end
Delayed::Job テーブルにデータが存在することを確認する
今回は優先度が想定通りかをテーブルの中を見て確認します。
なお、Reqeust spec でテーブルの中身を使う場合は、 queue_adapter
の差し替えを行わないよう設定します。
context '自分でテーブルを調べる' do before do queue_adapter_changed_jobs.each(&:disable_test_adapter) end it 'ジョブが delayed_job テーブルに登録され、優先度が10であること' do expect { post api_priority_high_priorities_path, params: { name: 'シナノスイート' } }.to change(Delayed::Job, :count).from(0).to(1) actual = Delayed::Job.first expect(actual).to have_attributes(priority: 10) end end
ジョブが失敗の上限に達しても、Delayed::Job テーブルからジョブが削除されないことを確認する
Job spec同様にして確認できます。
なお、テーブルの中身を確認しているため、 queue_adapter
は :test
へ差し替わらないようにします。
require 'rails_helper' RSpec.describe 'Api::AlwaysFailJobs::ApplesController', type: :request do describe 'POST /api/always_fail_jobs/apples' do context '失敗回数の上限に達した場合' do before do queue_adapter_changed_jobs.each(&:disable_test_adapter) end it 'キューにデータが残り続けていること' do expect { post api_always_fail_jobs_apples_path, params: { name: '秋映' } }.to change(Delayed::Job, :count).from(0).to(1) Delayed::Worker.max_attempts.times do |i| # work_off だと2回目以降でエラーになった場合に Delayed::Job への更新が行われないっぽいので、 # run() でジョブを明示的に指定して実行する Delayed::Worker.new.run(Delayed::Job.first) expect(Delayed::Job.first).to have_attributes(attempts: i + 1) end # ちなみに、実際のワーカーと異なり、runではもう一回実行することもできる。この場合は attempts がインクリメントされる Delayed::Worker.new.run(Delayed::Job.first) expect(Delayed::Job.first).to have_attributes(attempts: 3) end end end end
メールと組み合わせた時のspecを書く
メールがキューに入るかを確認する
【注意】 have_enqueued_mail マッチャは、Rails7 + rspec-rails 5系の環境では動かない
RSpecでは have_enqueued_mail
マッチャを使うことで、メールがキューに入ったかを確認できます。
have_enqueued_mail matcher - Matchers - RSpec Rails - RSpec - Relish
しかし、Rails7 + rspec-rails 5系の環境で have_enqueued_mail
マッチャを使うとエラーになります。
NameError: uninitialized constant ActionMailer::DeliveryJob job[:job] <= ActionMailer::DeliveryJob ^^^^^^^^^^^^^ Did you mean? ActionMailer::MailDeliveryJob
以下のissueに原因が記載されています。
uninitialized constant ActionMailer::DeliveryJob with latest rails/master · Issue #2531 · rspec/rspec-rails
issueの最後に
This has been released as 6.0.0.rc1 theres still potential for the breaking change to the mailer api to do with params / args matching, but those needing support from a tagged release can now get this via rc1.
Rails 7 support is only via version 6.x per our versioning strategy.
https://github.com/rspec/rspec-rails/issues/2531#issuecomment-1086959613
とあるように、Rails7 では rspec-rails 6系を使ったほうが良さそうです。
have_enqueued_mailマッチャを使ってメールのキュー登録を確認する
以下のようにすることで、キューに入る回数とキュー名を確認できます。
RSpec.describe 'Api::Email::DeliverOptionsController', type: :request do describe 'POST /api/email/deliver_options' do context 'キューに入るかを確認' do it 'キュー名「override_queue」というキューに1回だけ入る' do expect { post api_email_deliver_options_path, params: { name: '王林' } }.to have_enqueued_mail(HelloWorldMailer, :welcome_email) .exactly(1) # 回数 .on_queue(:override_queue) # キュー名 end end end end
メールが送信されることを確認する
ActiveJob::TestHelper
の perform_enqueued_jobs
を使うことで、キューに入ったメールを送信できます。
https://api.rubyonrails.org/v7.0.3/classes/ActiveJob/TestHelper.html#method-i-perform_enqueued_jobs
また、テストでメールを送信すると ActionMailer::Base.deliveries
に中身が含まれます。
Action Mailer の基礎 - Railsガイド
そのため、ActionMailer::Base.deliveries
から取り出して確認します。
context '送信されたメールを確認' do it 'メールが1通送信され、中身も想定した通りであること' do expect { perform_enqueued_jobs do post api_email_deliver_options_path, params: { name: 'シナノスイート' } end }.to change(ActionMailer::Base.deliveries, :count).from(0).to(1) actual = ActionMailer::Base.deliveries[0] # Rails全体の設定値 config.action_mailer.default_options = { from: 'no-reply@example.com' } ではなく # ApplicationMailer で設定した from@example.com が指定されていること expect(actual.from).to eq ['from@example.com'] expect(actual.to).to eq ['bar@example.com'] expect(actual.subject).to eq 'Hello, world!' expect(actual.body).to include('シナノスイート') expect(actual.body).to include('Hello, world!') end end
トランザクションをロールバックしたら、何もDBに入っていないか確認する
ここでは、プロダクションコードでトランザクション & ロールバックを行っているコントローラに対するテストを書きます。
class Api::Email::TransactionsController < ApplicationController def create ActiveRecord::Base.transaction do HelloWorldMailer.with(name: params[:name]).welcome_email.deliver_later Apple.create(name: params[:name]) DefaultQueueJob.perform_later(params[:name]) raise ActiveRecord::Rollback end render json: { status: params[:name] } end end
RSpec::Matchers.define_negated_matcherについて
HelloWorldMailer
と DefaultQueueJob
の両方がキューに登録されていないことを確認するため、
expect { post api_email_transactions_path, params: { name: '王林' } }.not_to have_enqueued_mail(HelloWorldMailer, :welcome_email) .and have_enqueued_job
のように書きたくなりますが、 NotImplementedError
になります。
NotImplementedError: `expect(...).not_to matcher.and matcher` is not supported, since it creates a bit of an ambiguity. Instead, define negated versions of whatever matchers you wish to negate with `RSpec::Matchers.define_negated_matcher` and use `expect(...).to matcher.and matcher`.
エラーメッセージにある通り、 RSpec::Matchers.define_negated_matcher
を定義すれば良さそうです。
- Define negated matcher - RSpec Expectations - RSpec - Relish
- RSpecのDefine negated matcherが地味に便利 - Qiita
キューに入らないことを確認する (が、ロールバックしてもキューに登録された形跡あり)
ファイルの冒頭に RSpec::Matchers.define_negated_matcher
を追加してコードを書きます。
RSpec::Matchers.define_negated_matcher :not_have_enqueued_mail, :have_enqueued_mail RSpec::Matchers.define_negated_matcher :not_have_enqueued_job, :have_enqueued_job RSpec.describe 'Api::Email::TransactionsController', type: :request do describe 'POST /api/email/transactions' do context 'トランザクションでロールバックした場合' do context 'キューの確認' do it 'メールとジョブを確認すると、キューに形跡はある模様' do expect { post api_email_transactions_path, params: { name: '王林' } }.to not_have_enqueued_mail(HelloWorldMailer, :welcome_email) .and not_have_enqueued_job end end end end end
しかし、このテストはパスせず、以下のメッセージが出力されます。
ロールバックしているはずなのに、キューに登録されたことは検知されているようです。
expected not to enqueue HelloWorldMailer.welcome_email at least 1 time but enqueued 1 ...and: expected not to enqueue at least 1 jobs, but enqueued 2
そのため、 pending
を使って、落ちることが正しいようなテストにしておきます。
it 'メールとジョブを確認すると、キューに形跡はある模様' do pending('以下のspecがfailするので、キューには入った形跡がある') expect { post api_email_transactions_path, params: { name: '王林' } }.to not_have_enqueued_mail(HelloWorldMailer, :welcome_email) .and not_have_enqueued_job end
テーブルにジョブが登録されていないことを確認する
たとえキューに入っていた形跡があったとしても、Delayed::Job テーブルに登録されていなければワーカーによってジョブは実行されません。
そこで、 queue_adapter
を :test
としないよう設定した上で、テーブルにジョブが登録されていないことを確認すると、テストがパスします。
context 'テーブルを確認' do before do queue_adapter_changed_jobs.each(&:disable_test_adapter) end it 'ジョブが delayed_job テーブルに登録されていないこと' do expect { post api_email_transactions_path, params: { name: 'シナノスイート' } }.not_to change(Delayed::Job, :count) end end
以上でテストコードでも一通りの挙動を確認できました。
ソースコード
Gtihubに上げました。
https://github.com/thinkAmi-sandbox/rails_delayed_job-sample
今回のプルリクはこちらです。
https://github.com/thinkAmi-sandbox/rails_delayed_job-sample/pull/1