# The FluMapper Data Pipeline Spatiotemporal transformation of social media geostreams: a case study of Twitter for flu risk analysis (Myunghwa Hwang, Shaowen Wang, Zhenhua Zhang, IWGS '13 Proceedings of the 4th ACM SIGSPATIAL International Workshop on GeoStreaming Pages 12-21) ###### tags: `ソーシャルメディアデータ` `ジオストリム` `データアナリティクス` `データストリーム` `Social Media and Location-Based Services` ## 背景 - ジオストリームベースの疾病監視を達成する要求は、大量のジオストリームを分析手法がすぐに適用できる時空間的に編成されたデータにリアルタイムで変換できるスケーラブルなデータインフラストラクチャを確立すること。 - 疾患監視の目的でソーシャルメディアジオストリームの時空間分析を可能にする問題を調査した。 ## 問題 - 大量の空間データの連続的なストリーミング( the continuous streaming of massive spatial data)  - データ内容のテキストによる非構造化表現(the textual, unstructured representation of data contents)  - 対話型データ探索の必要性(the need for interactive data exploration)  - 空間的および時間的スケールに対する感度を含む( sensitivity to spatial and temporal scales)  ## 概要 - この研究では、ソーシャルメディアのジオストリームをほぼリアルタイムで収集、処理、および集計するために複数の計算コンポーネントを統合するデータパイプラインソリューションを開発した。   - ソーシャルメディアジオストリーム、Twitterデータストリーム、および1つのタイプの病気の流行病、インフルエンザに焦点を当てる。  - **data pipeline solution**: ソーシャルメディアジオストリームを時空間データエンティティに変換するためのデータパイプラインソリューション **メリット:** 1. ソーシャルメディアにおける疾患関連会話の探索的時空間分析のためのジオストリームデータのほぼリアルタイムのマルチスケール時空間的編成をサポートする 2. このソリューションは他の種類のSMGにも広く適用できる。 * **FluMapper data pipeline**: ツイートメッセージを継続的に処理し、スライディングウィンドウ方式でTwitterユーザーのフローを計算し、階層的なマルチスケール形式でフローデータを集約することによって、インフルエンザの流行のほぼリアルタイムの時空間監視を容易にする。 ## The FluMapper Data Pipeline ### 要求: 1. ツイートデータのリアルタイム収集(インフルエンザのリスクとツイーターの機動性の時空間パターンの異常な変化を早期に検出するため) 2. ILIに関連するツイートメッセージの識別(インフルエンザのリスクとTwitterユーザーのインフルエンザ関連の流れを推定するため)  3. 非構造化テキストツイートを空間データエンティティに変換し → リスク推定とフローマッピングの分析方法を適用できる  4. スケーラブルなデータストレージシステムを使用する(Twitterのデータストリームの膨大さと、インフルエンザのリスクとツイーターの動きを過去から監視する必要があるため) 5. ツイートメッセージの前処理済みデータへのサービスベースのアクセスを提供する ### アーキテクチャ: - 4つのソフトウェアコンポーネント  - Twitterからの生のツイートをキャッシュするテキストファイルのコーパス  - ツイートとツイーターのフローの空間データエンティティを管理するための分散データベースのセット  ### コンポーネント ![](https://i.imgur.com/VJsAw0u.png) - Crawler 生ツイートをテキストファイルとしてキャッシュする(Twitterからツイートメッセージを収集し、履歴データ分析を可能にするため) crawlerはTwitterのStreaming API(FluMDPの主なデータソース)にアクセスし、それとの長期にわたる接続を維持する - Tweet Importer 個々のツイッタを解析し、retweet、空間的な足跡の情報はなく、 アメリカの外に場合はそれらを除外 keyword一致を使用して、各ツイートメッセージのILIとの関連性を判断(flurelatedとしてマークされる) keyword: “flu”, "tamiflu", "h1n1", "swine",... > 注意点: > - 地理座標、各ツイートが投稿されたセルが階層空間グリッドの最高レベルで識別され、そのセルのIDがツイートの属性として追加され、後でTwitterユーザーのflow統計を計算する > 問題:flowの計算 > ![](https://i.imgur.com/DTKt71R.png) > 収集された軌跡データを存在に従って個々のツイーターの流れに集約した > - 空間グリッドが𝑅行と𝐾列からなる𝐺_0である > 𝑚_(𝑖,𝑡):第r行k列のセル   (𝑟−1)∗𝐾+𝑘 : 𝑚_(𝑖,𝑡)のセルID 𝐿_𝑖の連続した位置は線分𝑆_(𝑖,𝑡,𝑡+1) と呼ばれる **𝑆_(𝑖,𝑡,𝑡+1)={𝑚_(𝑖,𝑡),𝑚_(𝑖,𝑡+1) }={𝑐(𝑟,𝑘),𝑐(𝑟^′,𝑘^′ )}={(𝑟−1)∗𝐾+𝑘,(𝑟^′−1)∗𝐾+𝑘^′}** r≠𝑟^′ または𝑘≠𝑘^′ 場合:**フローが生じる** - 𝐿_𝑖の更新:   Tweet Importerは軌跡がG0内の2つ異なるセルにまたがっている場合は、ツイートDBのFlowテーブルに新しいレコードを追加する ![](https://i.imgur.com/J2M5faI.png) > - TweetDBはMongoDBの使用を通してスケーラブルなデータストレージをサポートする > ‐リアルタイムのサーベイランスのために、DBのレコードは現在のsliding-time windowのデータのみを保持するように増分更新される - Cube generator 個々のレベル(individual-level)のデータは空間グリッドの階層システムに沿って集約される ツイートとクエリの開始時刻から最近7日間のデータをクエリする   →ツイート、インフルエンザ関連のツイート、ツイーター、アウトフロー、インフローの数を数える(3時間ごとに集計する) →2時間の新しいデータコレクションとしてSpace-time Cube DBに保存されたため、ツイータの動きの時間的ダイナミクスを3時間間隔でスムーズに調べる - Data filter service ユーザー定義の期間内に各ツイーターによって投稿された最初のツイートを返す 時間的照会のデフォルト期間は、照会開始時刻から1週間  1人のユーザーがILIについて投稿した重複ツイートを排除するために、data filter serviceは最初のツイートのみを返す ### データに関する #### パイプラインでは7日間の長さのsliding windowを使用した  原因: これまでの実験では、この長さよりも小さいウィンドウサイズでも、さまざまな空間スケールでツイーターの動きに大きな変化が見られない #### 毎日のボリュームのツイートメッセージを示す - cralerによって収集され、DBにインポートされ、つぶやき投稿の場所の地理座標を持ち、ILI関連の症状と名前の1つ以上のキーワードを含む ![](https://i.imgur.com/h0RefPc.png) > 北米地域で作成された約270万のツイート: > - 25% リツイート、不適切な地理参照、または米国外への投稿 > - DBのツイートコレクションには平均約1400万件のレコードが保存される > - 約92.6%がツイート位置の正確な地理座標の情報を持って > - 1.2%だけがILI関連である - ツイーターの軌跡と動きの統計は、FluMapper data pipeline の機能を実証する ![](https://i.imgur.com/IBG6nzD.png) > - 1日の平均増分は18,927に達す > - 1 km² 付近から別の場所へのTwitterユーザーの流れが2億2500万回 - ツイーターのモビリティのマルチスケール時空間探索が大きなサイズの構造化空間データを要求し、FluMapperデータパイプラインの設計がそのような要求に対処するのに効率的であったこと ![](https://i.imgur.com/Y7i8BSz.png) > - 1週間に少なくとも1つツイートが投稿されたセルの平均数はグリッドのすべてのレベルで約130万 > - レベル0グリッドのセルは、キューブ内のドキュメント全体の50%以上を占めた >- Spatial-time cube DBのデータテーブルで観察された ## まとめ - Flumapper data pipelineの効率性と拡張性のある対策を提供した - pipelineのデータ容量とパフォーマンスのさらなる強化する必要がある