UPDATE: 2022-10-28 20:05:42

はじめに

ポーリング処理的な処理をRでつくったときのメモ。

メモ

バッチ処理を一般的にシステムが稼働していない夜間や休日、サーバーの稼働が低いときに定期的に実行されるものとするならば、ポーリング処理は、処理を同期するために、複数の機器やプログラムに対して順番に定期的に問い合わせを行い、一定の条件を満たした場合に送受信や処理を行う処理方式のことで、それっぽいのをRで作ったという話。そもそもバッチ処理、ポーリング処理の理解が誤っている可能性がある。

この仕組を作れれば、フォルダでもサイトでもなんでもいいので(Xとする)、Xの中で変更があれば変更されたことを検知して(サーバーないし監視先が通知できないので)、Rでデータを処理して、通知したり、DBやフォルダに連携したりできる(Yとする)。つまりX⇔R→Yみたいなことができる。LambdaてきなことをRでしたい。サーバレスでない時点でめちゃくちゃではあるが…。なので、監視先からメッセージ通知がきて、それをトリガに処理を起動できるのであればそれが良い。

ここでは、フォルダに定期、非定期でcsvデータがインポートされたら、Rでcsvを読み込み、集約処理を行い、JSONにしてエクスポートするという流れを想定する。sparklyrパッケージのstream_read_csv()のような感じ。

定期的なデータのエクスポートはMySQLのストアドルーチンとイベントスケジュラーで環境構築し、非定期のデータインポートは手動で対応した。DBの部分は[https://mysql.hatenablog.jp/:title]を参照。

conv <- function(data) {
  res <- data %>%
    dplyr::filter(flg != 1) %>%
    dplyr::summarise(
      start_id = min(id),
      end_id = max(id),
      sum_val1 = sum(value1),
      sum_val2 = sum(value2),
      cnt = n(),
      cnt_flg = (nrow(data) - cnt)
    )
  
  return(jsonlite::toJSON(res))
}

library(dplyr)
library(stringr)

input_dir = "~/Desktop/test_in"
output_dir ="~/Desktop/test_out"

monitor <- function(input_dir, output_dir, log_display) {
  
  data_list <- list.files(path = input_dir, full.names = TRUE)
  data_list <- str_replace(string = data_list, pattern = path.expand("~/"), replacement = "~/")
  record_path <- paste0(input_dir, "/.records.csv")
   
  if (!file.exists(record_path)) {
    write.csv(x = data_list, file = record_path)
  }
  
  current_data <- read.csv(file = record_path, stringsAsFactors = FALSE)$x
  
  for (i in seq_along(data_list)) {
    
    if (!(data_list[[i]] %in% current_data)) {
      if(log_display) message(paste0("Imported:", data_list[[i]]))
      data <- read.csv(file = data_list[[i]])
      
      name <- str_replace(str_extract(pattern = "(?<=/)(?!.*/).+", string = data_list[[i]]), pattern = "\\.csv", replacement = "")
      export_path <- paste0(output_dir, "/", name, "_aggregated.json")
      
      if(log_display) message("Data Processing.....")
      
      jsonlite::write_json(x = conv(data), path = export_path)
      if(log_display) message(paste0("Exported:", export_path))
    }
  }
  write.csv(x = data_list, file = record_path)
}

polling <- function(input_dir, output_dir, poll_every, log_display){
  repeat {
    monitor(input_dir, output_dir, log_display)
    Sys.sleep(poll_every)
  }
}

polling(input_dir = "~/Desktop/test_in",
       output_dir ="~/Desktop/test_out",
       poll_every = 1,
       log_display = TRUE)

なんだか、全てがいまいちである。パスの加工とか、ファイルの差分を管理するところとか・・・。実行するとこうなる。

polling(input_dir = "~/Desktop/test_in",
       output_dir ="~/Desktop/test_out",
       poll_every = 1,
       log_display = TRUE)

Imported:~/Desktop/test_in/mysql2csv_logs_20201031_223443.csv
Data Processing.....
Exported:~/Desktop/test_out/mysql2csv_logs_20201031_223443_aggregated.json
Imported:~/Desktop/test_in/mysql2csv_logs_20201031_223458.csv
Data Processing.....
Exported:~/Desktop/test_out/mysql2csv_logs_20201031_223458_aggregated.json
Imported:~/Desktop/test_in/mysql2csv_logs_20201031_223513.csv
Data Processing.....
Exported:~/Desktop/test_out/mysql2csv_logs_20201031_223513_aggregated.json
【略】
Imported:~/Desktop/test_in/mysql2csv_logs_20201031_223643.csv
Data Processing.....
Exported:~/Desktop/test_out/mysql2csv_logs_20201031_223643_aggregated.json
Imported:~/Desktop/test_in/mysql2csv_logs_20201031_223658.csv
Data Processing.....
Exported:~/Desktop/test_out/mysql2csv_logs_20201031_223658_aggregated.json

あとはコマンドラインからRスクリプトとして、使えるようにすれば一旦完成。今回は、ポーリングの間に一時停止してループするバックグラウンドジョブRで実行する方針をとったが、cronなんかでスケジューリングして、Rスクリプトを定期的に実行するとかでもよいと思う。この手の仕組の知識や技術がないので、よい練習になりました。素直にS3とLambdaを使おう。