mc_rtc::Configuration 設定mc_rtc コントローラは通常、厳格なタイミング要件を持つリアルタイムスレッド内で動作しています。つまり、MCGlobalController::run() は計算 Timestep(通常は 2ms または 5ms)よりも一貫して速く結果を提供することが求められます。このタイムステップよりも遅くなった計算実行を「ミスイテレーション(missed iteration)」と呼びます。これは、ロボットのハードウェア上で動作する場合、低レベルハードウェアの扱い方によっては重大な影響を及ぼす可能性があります。ミスイテレーションの一般的な症状は、ジッター、関節のノイズ、ぎこちない動きなどです。イテレーションがミスされると、ロボットに送信されるコマンドは前回のイテレーションのものがそのまま維持されます:
さらに、多くのアルゴリズムはセンサデータが timestep で定義された一定のレートで受信されることを前提としています。
いずれにせよ、特に連続して複数回のイテレーションをミスすることは非常に悪いことであり、絶対に避けるべきです。ミスイテレーションの主な原因を見てみましょう:
timestep よりも十分に速い必要があります。平均計算時間は timestep の75%未満を目指しましょう。上記の例から明らかなように、複雑なコントローラを書く場合、何らかのスレッド化は避けられません。しかし、これはコードを大幅に複雑化させ、誤解や誤管理がバグ(多くは深刻かつ予測不能な未定義動作)につながりやすい側面です。本チュートリアルの残りでは以下を扱います:
mc_rtc におけるスレッド化:安全なこと/危険なことmc_rtc が提供するスレッド化支援ツールの紹介スレッドとは、プログラム内の独立した実行フローです。mc_rtc コントローラの文脈では、スレッドはログ記録、ファイルI/O、ビジョン処理など、リアルタイム制御ループには遅すぎる/予測できない処理をオフロードするためによく使われます。これにより、メイン制御ループは厳格なタイミング要件を維持しつつ、遅いタスクは並列で実行できます。
しかし、スレッドを使うとデータ同期の課題が生じます。複数のスレッドが共有データ(例:センサ値や制御コマンド)にアクセス・変更する必要がある場合、そのアクセスを適切に調整しなければなりません。これを怠ると、スレッドの実行タイミングに依存して結果が変わる「競合状態(race condition)」が発生し、データの不整合や破損につながります。ミューテックスやロックなどの同期プリミティブが役立ちますが、注意が必要です:リアルタイムスレッドでロックを保持すると、他のスレッドがロックを保持している間に遅延やミスイテレーションが発生する可能性があります。
また、スレッドの生成にはコストがあります。新しいスレッドを作成するにはOSがリソースを割り当て、スケジューリングを管理する必要があり、制御ループのタイムステップと比べてかなりの時間がかかります(通常は数十マイクロ秒程度ですが、カーネル依存で予測不能です)。そのため、スレッドは事前に作成し再利用するべきで、制御ループ内で頻繁に生成・破棄すべきではありません。代替案としてスレッドプールを使うと、複数のタスクで事前生成したスレッドを再利用できます。
複数のスレッドが共有データにアクセスする場合、同時にデータを変更しない、または読み書きが干渉しないようにしなければなりません。これをデータ同期と呼びます。適切な同期がないと、競合状態が発生し、プログラムの動作がスレッドの予測不能なタイミングに依存するバグが生じます。リアルタイム制御では、これがセンサデータの破損、不正なコマンド、さらにはロボットの危険な動作につながることもあります。
C++11で最も一般的な同期ツールは std::mutex です。ミューテックス(相互排他)は、同時に1つのスレッドだけがクリティカルセクションに入れるようにします。ミューテックスを使うには、共有データにアクセスする前にロックし、アクセス後にアンロックします。C++11の std::lock_guard を使うとロック/アンロックを自動管理できます。注意:ミューテックスは魔法の道具ではありません。保護したいデータを読み書きするすべてのスレッドでロックする必要があります(コントローラ、プラグインなど両方で)。
例えば、バックグラウンドスレッドが更新し、リアルタイム制御スレッドが読み取る共有変数 shared_value があるとします。
1. ミューテックスを使った複雑な共有オブジェクト
#include <mutex>
#include <thread>
#include <iostream>
struct SensorData
{
double position;
double velocity;
};
SensorData shared_sensor_data{0.0, 0.0};
std::mutex mtx;
void background_thread()
{
for(int i = 0; i < 10; ++i)
{
{
std::lock_guard<std::mutex> lock(mtx);
shared_sensor_data.position += 1.0;
shared_sensor_data.velocity += 0.5;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void control_thread()
{
for(int i = 0; i < 10; ++i)
{
SensorData local_copy;
{
std::lock_guard<std::mutex> lock(mtx);
local_copy = shared_sensor_data;
}
std::cout << "Position: " << local_copy.position
<< ", Velocity: " << local_copy.velocity << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
}
2. 単純なデータにはアトミック型を使う
整数やブーリアンなど単純な型には、ロックフリー同期のために std::atomic を使えます:
#include <atomic>
#include <thread>
#include <iostream>
std::atomic<int> shared_counter{0};
void background_thread()
{
for(int i = 0; i < 10; ++i)
{
++shared_counter; // アトミックインクリメント
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void control_thread()
{
for(int i = 0; i < 10; ++i)
{
int value = shared_counter.load();
std::cout << "Counter: " << value << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
}
注意: アトミック型は整数やブーリアンなど単純なデータにのみ適しています。複雑なオブジェクトにはミューテックスや専用のロックフリー構造体を使いましょう。
mc_rtc のデータ構造は基本的にスレッドセーフではありません。つまり、MCGlobalController、Robot、Task などの mc_rtc データに複数スレッドからアクセス・変更する場合は、未定義動作を避けるために適切な同期が必要です。
以下は、メインスレッドがロボットデータを変更している間に別スレッドがロボットデータを読み取る危険な例です:
#include <mc_rbdyn/Robot.h>
#include <thread>
#include <iostream>
void unsafe_read(mc_rbdyn::Robot * robot)
{
// この関数は別スレッドでロボットデータを読み取る
for(int i = 0; i < 100; ++i)
{
// 危険:他のスレッドが同時に robot->posW() を変更している可能性あり
// 結果は未定義動作。正常に動く場合もあれば、クラッシュやゴミ値になることも
std::cout << "Robot position: " << robot->posW().translation().transpose() << std::endl;
}
}
void unsafe_example(mc_rbdyn::Robot * robot)
{
// ロボットデータを読み取るスレッドを開始
std::thread t(unsafe_read, robot);
// 一方、メインスレッドはロボットデータを変更
for(int i = 0; i < 100; ++i)
{
// 危険:ロボットの位置を変更
robot->posW(sva::PTransformd{sva::RotZ(i * mc_rtc::constants::PI), Eigen::Vector3d{i/10., 0, 0}});
}
t.join();
}
警告:
この例は、両方のスレッドが同期なしで同じ mc_rbdyn::Robot オブジェクトにアクセス・変更しているため危険です。データ競合や未定義動作の原因となります。
今度は、std::mutex を使って mc_rbdyn::Robot オブジェクトへのアクセスを同期した安全なバージョンです。これにより同時の読み書きを防ぎ、データ競合を回避します。
#include <mc_rbdyn/Robot.h>
#include <thread>
#include <mutex>
#include <iostream>
std::mutex robot_mutex;
void safe_read(mc_rbdyn::Robot * robot)
{
for(int i = 0; i < 100; ++i)
{
std::lock_guard<std::mutex> lock(robot_mutex);
std::cout << "Robot position: " << robot->posW().translation().transpose() << std::endl;
}
}
void safe_example(mc_rbdyn::Robot * robot)
{
std::thread t(safe_read, robot);
for(int i = 0; i < 100; ++i)
{
std::lock_guard<std::mutex> lock(robot_mutex);
robot->posW(sva::PTransformd{sva::RotZ(i * mc_rtc::constants::PI), Eigen::Vector3d{i/10., 0, 0}});
}
t.join();
}
これで、robot オブジェクトへのすべてのアクセスがミューテックスで保護され、スレッドセーフになります。
デッドロックとは、2つ以上のスレッドが互いにリソース(ミューテックスなど)の解放を待ち続け、全スレッドが永久に待機状態になることです。これは、複数のミューテックスを異なる順序でロックする場合によく発生します。
以下の例は、ミューテックスの誤った使い方でデッドロックが発生する様子を示しています:
#include <mutex>
#include <thread>
#include <iostream>
std::mutex mutexA;
std::mutex mutexB;
void thread1()
{
std::lock_guard<std::mutex> lockA(mutexA);
// 作業のシミュレーション
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard<std::mutex> lockB(mutexB); // mutexBを待つ
std::cout << "Thread 1 acquired both mutexes\n";
}
void thread2()
{
std::lock_guard<std::mutex> lockB(mutexB);
// 作業のシミュレーション
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard<std::mutex> lockA(mutexA); // mutexAを待つ
std::cout << "Thread 2 acquired both mutexes\n";
}
void deadlock_example()
{
std::thread t1(thread1);
std::thread t2(thread2);
t1.join();
t2.join();
}
フレームワーク内でよくあるケースに対応するため、mc_rtc はスレッド化やデータ同期を支援するユーティリティを提供しています:
mc_rtc::threading::AsyncJob: ジョブを別スレッドで非同期実行する便利な方法を提供します。std::async ジョブを作成・管理します。mc_rtc::threading::AsyncJob クラスは、ジョブを別スレッドで非同期実行する便利な方法を提供します。内部的には std::async を使い、ジョブの完了確認、結果取得、例外処理などのメソッドを提供します。
利点:
欠点:
AsyncJob クラスは CRTP(Curiously Recurring Template Pattern)ベースのクラスで、mc_rtc で非同期計算を実行するために使います。ジョブの状態管理、結果取得、ログやGUIとの統合も可能です。
使い方の手順:
MyInput クラスを作成 ジョブに必要なすべてのデータをコピーで保持します。
MyResult クラスを作成 ジョブが生成するすべてのデータをコピーで保持します。
ジョブクラスを定義
mc_rtc::threading::MakeAsyncJob<YourJob, InputType, ResultType> を継承し、ResultType computeJob() メソッドを実装します。
(オプション)ログやGUIの追加
void addToLoggerImpl() や void addToGUIImpl() を実装してカスタムログ/GUI要素を追加できます。
ジョブの作成と利用
startAsync() で計算を開始checkResult() を呼び、状態更新と結果取得lastResult() で結果にアクセス注意:
running() == false)行うことcheckResult() を呼び、結果処理と管理を行うこと基本例:
// MyAsyncJob.h
struct MyInput
{
double data; // ジョブへの入力値
};
struct MyResult
{
double value; // 計算結果
};
struct MyAsyncJob : public mc_rtc::threading::MakeAsyncJob<MyAsyncJob, MyInput, MyResult>
{
MyResult computeJob()
{
MyResult res;
res.value = input_.data * 2;
return res;
}
void addToLoggerImpl()
{
logger_->addLogEntry(loggerPrefix_ + "_my_job_value", this, [this]() { return lastResult_->value; });
}
void addToGUIImpl()
{
gui_->addElement(this, guiCategory_, mc_rtc::gui::Label("Result", [this]() { return lastResult_->value; }));
}
};
MyAsyncJob を使ったFSMステートの例:
// SimpleAsyncState.h
#include <mc_control/fsm/State.h>
#include <mc_rtc/threading/AsyncJob.h>
#include <mc_rtc/gui/StateBuilder.h>
#include <mc_rtc/log/Logger.h>
#include "MyAsyncJob.h"
// シンプルなAsyncJobを使ったFSMステート。各結果ごとに再実行
struct SimpleAsyncState : public mc_control::fsm::State
{
MyAsyncJob job_; // ジョブクラスの生成(ジョブはまだ開始しない)
int counter_ = 0;
int maxIterations_ = 5; // 5回で終了
void configure(const mc_rtc::Configuration & config) override
{
// async jobの初期入力をセット
config("input", job_.input().data);
config("maxIterations", maxIterations_);
}
void start(mc_control::fsm::Controller & ctl) override
{
// 最初のasync jobを開始。結果はrun()内でjob_.checkResult()で取得
job_.startAsync();
job_.addToLogger(ctl.logger(), name());
job_.addToGUI(ctl.gui(), {name()});
counter_ = 0;
}
bool run(mc_control::fsm::Controller & ctl) override
{
if(job_.checkResult())
{ // 前回のasync jobが完了したので結果を取得
const auto & result = *job_.lastResult();
mc_rtc::log::info("[{}] Async result {}: {}", name(), counter_, result.value);
counter_++;
// ジョブが実行中でないので、次の入力を安全に更新できる
job_.input().data += 1.0;
if(counter_ < maxIterations_)
{
job_.startAsync(); // 次のasync jobを開始
return false; // 継続
}
else
{
output("OK"); // maxIterations_回で終了
return true;
}
}
return false;
}
void teardown(mc_control::fsm::Controller & ctl) override
{
// AsyncJobのデストラクタでクリーンアップ
}
};