Статьи

Mahout и Scalding для выявления покерных сговоров

Когда я читал очень яркую книгу о Mahout, Mahout In Action (которая также является хорошим практическим введением в машинное обучение), один из примеров привлек мое внимание.
Авторы книги, где использовался известный алгоритм кластеризации K-средних для поиска похожих игроков на stackoverflow.com , где критерием сходства был набор авторов вопросов / ответов, которые пользователи поднимали / понижали.

Одним словом, алгоритм K-средних итеративно находит кластеры точек / векторов, расположенных близко друг к другу, в многомерном пространстве. Применительно к проблеме поиска похожих игроков в StackOverflow, мы предполагаем, что каждая ось в многомерном пространстве является пользователем, где расстояние от нуля является суммой очков, присуждаемых вопросам / ответам, заданным другими игроками (теми, кто Размеры также часто называют «объектами», где расстояние — «вес объекта»).

Очевидно, что тот же подход может быть применен к одной из самых сложных проблем в многопользовательском онлайн-покере — обнаружению сговора. Мы делаем простое предположение, что если два или более игрока сыграли слишком много игр друг с другом (принимая во внимание, что любой из игроков мог просто быть активным игроком, который играл с кем-либо во многих играх), они могли бы быть в сговоре.

Мы разбиваем огромную группу игроков на маленькие узкие кластеры (желательно по 2-8 игроков в каждой), используя алгоритм кластеризации K-средних. В базовой реализации, которую мы пройдем дальше, каждый пользователь представлен в виде вектора, где осями являются другие игроки, с которыми он играл (а вес функции — это количество игр, сыгранных вместе).

Этап 1. Создание словаря

В качестве первого шага нам нужно создать словарь / список всех игроков, вовлеченных в поднабор истории рук, который мы анализируем:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// extract user ID from hand history record
val userId = (playerHistory: PlayerHandHistory) =>
  new Text(playerHistory.getUserId.toString)
 
// Builds basic dixtionary (enumeration, in fact) of all the players, participated in the selected subset of hand
// history records
class Builder(args: Args) extends Job(args) {
 
  // input tap is an HTable with hand history entries: hand history id -> hand history record, serialized with ProtoBuf
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  // output tap - plain text file with player IDs
  val output = TextLine(args("output"))
 
  input
    .read
    .flatMap('blob -> 'player) {
    // every hand history record contains the list of players, participated in the hand
    blob: Array[Byte] => // at the first stage, we simply extract the list of IDs, and add it to the flat list
      HandHistory.parseFrom(blob).getPlayerList.map(userId)
  }
    .unique('player) // remove duplicate user IDs
    .project('player) // leave only 'player column from the tuple
    .write(output)
 
}
1
2
3
4
5
6
1003
1004
1005
1006
1007
...

Этап 2. Добавление индексов в словарь

Во-вторых, мы сопоставляем идентификаторы пользователей с позицией / индексом игрока в векторе.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
class Indexer(args: Args) extends Job(args) {
 
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
    'userId -> 'idx)
 
  TextLine(args("input")).read
    .map(('offset -> 'line) -> ('userId -> 'idx)) {
    // dictionary lines are read with indices from TextLine source
    // out of the box. For some reason, in my case, indices were multiplied by 5, so I have had to divide them
    tuple: (Int, String) => (new Text(tuple._2.toString) -> new IntWritable((tuple._1 / 5)))
  }
    .project(('userId -> 'idx)) // only userId -> index tuple is passed to the output
    .write(output)
 
}
1
2
3
4
5
6
1003 0
1004 1
1005 2
1006 3
1007 4
...

Этап 3. Построение векторов

Мы строим векторы, которые будут переданы в качестве входа в алгоритм кластеризации K-средних. Как мы отмечали выше, каждая позиция в векторе соответствует другому игроку, с которым игрок играл:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * K-means clustering algorithm requires the input to be represented as vectors.
 * In out case, the vector, itself, represents the player, where other users, the player has played with, are
 * vector axises/features (the weigh of the feature is a number of games, played together)
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {
 
  import Dictionary._
 
  // initializes dictionary pipe
  val dictionary = TextLine(args("dictionary"))
    .read
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
    tuple: (Int, String) =>
      (tuple._2 -> tuple._1 / 5)
  }
    .project(('userId -> 'dictionaryIdx))
 
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
    'player1Id -> 'vector)
 
  input
    .read
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
    //builds a flat list of pairs of users that player together
    blob: Array[Byte] =>
      val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
      playerList.flatMap {
        playerId =>
          playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
      }
  }
    .joinWithSmaller('player2Id -> 'userId, dictionary) // joins the list of pairs of //user that played together with
    // the dictionary, so that the second member of the tuple (ID of the second //player) is replaced with th index
    //in the dictionary
    .groupBy('player1Id -> 'dictionaryIdx) {
    group => group.size // groups pairs of players, played together, counting the number of hands
  }
    .map(('player1Id, 'dictionaryIdx, 'size) ->('playerId, 'partialVector)) {
    tuple: (String, Int, Int) =>
      val partialVector = new NamedVector(
        new SequentialAccessSparseVector(args("dictionarySize").toInt), tuple._1)
// turns a tuple of two users
      // into a vector with one feature
      partialVector.set(tuple._2, tuple._3)
      (new Text(tuple._1), new VectorWritable(partialVector))
  }
    .groupBy('player1Id) {
    // combines partial vectors into one vector that represents the number of hands, //played with other players
    group => group.reduce('partialVector -> 'vector) {
      (left: VectorWritable, right: VectorWritable) =>
        new VectorWritable(left.get.plus(right.get))
    }
  }
    .write(output)
 
}
1
2
3
4
5
6
1003 {3:5.0,5:4.0,6:4.0,9:4.0}
1004 {2:4.0,4:4.0,8:4.0,37:4.0}
1005 {1:4.0,4:5.0,8:4.0,37:4.0}
1006 {0:5.0,5:4.0,6:4.0,9:4.0}
1007 {1:4.0,2:5.0,8:4.0,37:4.0}
...

Весь рабочий процесс, необходимый для векторизации ввода:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
val conf = new Configuration
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
  + "org.apache.hadoop.io.serializer.WritableSerialization")
 
// the path, where the vectors will be stored to
val vectorsPath = new Path("job/vectors")
// enumeration of all users involved in a selected subset of hand history records
val dictionaryPath = new Path("job/dictionary")
// text file with the dictionary size
val dictionarySizePath = new Path("job/dictionary-size")
// indexed dictionary (every user ID in the dictionary is mapped to an index, from 0)
val indexedDictionaryPath = new Path("job/indexed-dictionary")
 
println("Building dictionary...")
// extracts IDs of all the users, participating in selected subset of hand history records
Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs",
  "--hbasehost", "localhost", "--output", dictionaryPath.toString))
// adds index to the dictionary
Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", indexedDictionaryPath.toString))
// calculates dictionary size, and stores it to the FS
Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", dictionarySizePath.toString))
 
// reads dictionary size
val fs = FileSystem.get(dictionaryPath.toUri, conf)
val dictionarySize = new BufferedReader(
  new InputStreamReader(
    fs.open(new Path(dictionarySizePath, "part-00000"))
  )).readLine().toInt
 
println("Vectorizing...")
// builds vectors (player -> other players in the game)
// IDs of other players (in the vectors) are replaces with indices, taken from dictionary
Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs",
  "--dictionary", dictionaryPath.toString, "--hbasehost", "localhost",
  "--output", vectorsPath.toString, "--dictionarySize", dictionarySize.toString))

Этап 4. Генерация n-случайных кластеров

Случайные кластеры / центроиды — это точка входа для алгоритма K-средних:

1
2
3
4
5
6
7
//randomly selected cluster the will be passed as an input to K-means
     val inputClustersPath = new Path('jobinput-clusters')
     val distanceMeasure = new EuclideanDistanceMeasure
  
     println('Making random seeds...')
      //build 30 initial random clusterscentroids
     RandomSeedGenerator.buildRandom(conf, vectorsPath, inputClustersPath, 30, distanceMeasure)

Этап 5. Запуск алгоритмов K-средних

На каждой следующей итерации K-means будет находить лучшие центроиды и кластеры. В результате мы имеем 30 групп игроков, которые чаще всего играют друг с другом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
// clusterization results
val outputClustersPath = new Path("job/output-clusters")
// textual dump of clusterization results
val dumpPath = "job/dump"
 
println("Running K-means...")
// runs K-means algorithm with up to 20 iterations, to find clusters of colluding players (assumption of collusion is
// made on the basis of number hand player together with other player[s])
KMeansDriver.run(conf, vectorsPath, inputClustersPath, outputClustersPath,
  new CosineDistanceMeasure(), 0.01, 20, true, 0, false)
 
println("Printing results...")
 
// dumps clusters to a text file
val clusterizationResult = finalClusterPath(conf, outputClustersPath, 20)
val clusteredPoints = new Path(outputClustersPath, "clusteredPoints")
val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints)
clusterDumper.setNumTopFeatures(10)
clusterDumper.setOutputFile(dumpPath)
clusterDumper.setTermDictionary(new Path(indexedDictionaryPath, "part-00000").toString,
  "sequencefile")
clusterDumper.printClusters(null)

Полученные результаты

Теперь перейдем к «job / dump» — этот файл содержит текстовые дампы всех кластеров, сгенерированных K-means. Вот небольшой фрагмент файла:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
VL-0{n=5 c=[1003:3.400, 1006:3.400, 1008:3.200, 1009:3.200, 1012:3.200] r=[1003:1.744, 1006:1.744, 1008:1.600, 1009:1.600, 1012:1.600]}
 Top Terms:
  1006                                    =>                 3.4
  1003                                    =>                 3.4
  1012                                    =>                 3.2
  1009                                    =>                 3.2
  1008                                    =>                 3.2
 
VL-15{n=1 c=[1016:4.000, 1019:3.000, 1020:3.000, 1021:3.000, 1022:3.000, 1023:3.000, 1024:3.000, 1025:3.000] r=[]}
 Top Terms:
  1016                                    =>                 4.0
  1025                                    =>                 3.0
  1024                                    =>                 3.0
  1023                                    =>                 3.0
  1022                                    =>                 3.0
  1021                                    =>                 3.0
  1020                                    =>                 3.0
  1019                                    =>                 3.0

Как мы видим, было обнаружено 2 кластера игроков: один с 8 игроками, который провел много игр друг с другом, а второй с 4 игроками.

Ссылка: Обнаружение сговора в покере с Mahout и Scalding от нашего партнера JCG Василя Ременюка в блоге Васила Ременюка .