Авторы книги, где использовался известный алгоритм кластеризации K-средних для поиска похожих игроков на stackoverflow.com , где критерием сходства был набор авторов вопросов / ответов, которые пользователи поднимали / понижали.
Одним словом, алгоритм K-средних итеративно находит кластеры точек / векторов, расположенных близко друг к другу, в многомерном пространстве. Применительно к проблеме поиска похожих игроков в StackOverflow, мы предполагаем, что каждая ось в многомерном пространстве является пользователем, где расстояние от нуля является суммой очков, присуждаемых вопросам / ответам, заданным другими игроками (теми, кто Размеры также часто называют «объектами», где расстояние — «вес объекта»).
Очевидно, что тот же подход может быть применен к одной из самых сложных проблем в многопользовательском онлайн-покере — обнаружению сговора. Мы делаем простое предположение, что если два или более игрока сыграли слишком много игр друг с другом (принимая во внимание, что любой из игроков мог просто быть активным игроком, который играл с кем-либо во многих играх), они могли бы быть в сговоре.
Мы разбиваем огромную группу игроков на маленькие узкие кластеры (желательно по 2-8 игроков в каждой), используя алгоритм кластеризации K-средних. В базовой реализации, которую мы пройдем дальше, каждый пользователь представлен в виде вектора, где осями являются другие игроки, с которыми он играл (а вес функции — это количество игр, сыгранных вместе).
Этап 1. Создание словаря
В качестве первого шага нам нужно создать словарь / список всех игроков, вовлеченных в поднабор истории рук, который мы анализируем:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// extract user ID from hand history record val userId = (playerHistory : PlayerHandHistory) = > new Text(playerHistory.getUserId.toString) // Builds basic dixtionary (enumeration, in fact) of all the players, participated in the selected subset of hand // history records class Builder(args : Args) extends Job(args) { // input tap is an HTable with hand history entries: hand history id -> hand history record, serialized with ProtoBuf val input = new HBaseSource( "hand" , args( "hbasehost" ), 'handId, Array("d"), Array(' blob)) // output tap - plain text file with player IDs val output = TextLine(args( "output" )) input .read .flatMap( 'blob -> ' player) { // every hand history record contains the list of players, participated in the hand blob : Array[Byte] = > // at the first stage, we simply extract the list of IDs, and add it to the flat list HandHistory.parseFrom(blob).getPlayerList.map(userId) } .unique( 'player) // remove duplicate user IDs .project(' player) // leave only 'player column from the tuple .write(output) } |
1
2
3
4
5
6
|
1003 1004 1005 1006 1007 ... |
Этап 2. Добавление индексов в словарь
Во-вторых, мы сопоставляем идентификаторы пользователей с позицией / индексом игрока в векторе.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
class Indexer(args : Args) extends Job(args) { val output = WritableSequenceFile(args( "output" ), classOf[Text], classOf[IntWritable], 'userId -> ' idx) TextLine(args( "input" )).read .map(( 'offset -> ' line) -> ( 'userId -> ' idx)) { // dictionary lines are read with indices from TextLine source // out of the box. For some reason, in my case, indices were multiplied by 5, so I have had to divide them tuple : (Int, String) = > ( new Text(tuple. _ 2 .toString) -> new IntWritable((tuple. _ 1 / 5 ))) } .project(( 'userId -> ' idx)) // only userId -> index tuple is passed to the output .write(output) } |
1
2
3
4
5
6
|
1003 0 1004 1 1005 2 1006 3 1007 4 ... |
Этап 3. Построение векторов
Мы строим векторы, которые будут переданы в качестве входа в алгоритм кластеризации K-средних. Как мы отмечали выше, каждая позиция в векторе соответствует другому игроку, с которым игрок играл:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
/** * K-means clustering algorithm requires the input to be represented as vectors. * In out case, the vector, itself, represents the player, where other users, the player has played with, are * vector axises/features (the weigh of the feature is a number of games, played together) * User: remeniuk */ class VectorBuilder(args : Args) extends Job(args) { import Dictionary. _ // initializes dictionary pipe val dictionary = TextLine(args( "dictionary" )) .read .map(( 'offset -> ' line) -> ( 'userId -> ' dictionaryIdx)) { tuple : (Int, String) = > (tuple. _ 2 -> tuple. _ 1 / 5 ) } .project(( 'userId -> ' dictionaryIdx)) val input = new HBaseSource( "hand" , args( "hbasehost" ), 'handId, Array("d"), Array(' blob)) val output = WritableSequenceFile(args( "output" ), classOf[Text], classOf[VectorWritable], 'player1Id -> ' vector) input .read .flatMap( 'blob -> (' player 1 Id -> 'player2Id)) { //builds a flat list of pairs of users that player together blob: Array[Byte] => val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId) playerList.flatMap { playerId => playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString)) } } .joinWithSmaller(' player 2 Id -> 'userId, dictionary) // joins the list of pairs of //user that played together with // the dictionary, so that the second member of the tuple (ID of the second //player) is replaced with th index //in the dictionary .groupBy(' player 1 Id -> 'dictionaryIdx) { group => group.size // groups pairs of players, played together, counting the number of hands } .map((' player 1 Id, 'dictionaryIdx, ' size) ->( 'playerId, ' partialVector)) { tuple : (String, Int, Int) = > val partialVector = new NamedVector( new SequentialAccessSparseVector(args( "dictionarySize" ).toInt), tuple. _ 1 ) // turns a tuple of two users // into a vector with one feature partialVector.set(tuple. _ 2 , tuple. _ 3 ) ( new Text(tuple. _ 1 ), new VectorWritable(partialVector)) } .groupBy( 'player1Id) { // combines partial vectors into one vector that represents the number of hands, //played with other players group => group.reduce(' partialVector -> 'vector) { (left : VectorWritable, right : VectorWritable) = > new VectorWritable(left.get.plus(right.get)) } } .write(output) } |
1
2
3
4
5
6
|
1003 { 3 : 5.0 , 5 : 4.0 , 6 : 4.0 , 9 : 4.0 } 1004 { 2 : 4.0 , 4 : 4.0 , 8 : 4.0 , 37 : 4.0 } 1005 { 1 : 4.0 , 4 : 5.0 , 8 : 4.0 , 37 : 4.0 } 1006 { 0 : 5.0 , 5 : 4.0 , 6 : 4.0 , 9 : 4.0 } 1007 { 1 : 4.0 , 2 : 5.0 , 8 : 4.0 , 37 : 4.0 } ... |
Весь рабочий процесс, необходимый для векторизации ввода:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
val conf = new Configuration conf.set( "io.serializations" , "org.apache.hadoop.io.serializer.JavaSerialization," + "org.apache.hadoop.io.serializer.WritableSerialization" ) // the path, where the vectors will be stored to val vectorsPath = new Path( "job/vectors" ) // enumeration of all users involved in a selected subset of hand history records val dictionaryPath = new Path( "job/dictionary" ) // text file with the dictionary size val dictionarySizePath = new Path( "job/dictionary-size" ) // indexed dictionary (every user ID in the dictionary is mapped to an index, from 0) val indexedDictionaryPath = new Path( "job/indexed-dictionary" ) println( "Building dictionary..." ) // extracts IDs of all the users, participating in selected subset of hand history records Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs" , "--hbasehost" , "localhost" , "--output" , dictionaryPath.toString)) // adds index to the dictionary Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs" , "--input" , dictionaryPath.toString, "--output" , indexedDictionaryPath.toString)) // calculates dictionary size, and stores it to the FS Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs" , "--input" , dictionaryPath.toString, "--output" , dictionarySizePath.toString)) // reads dictionary size val fs = FileSystem.get(dictionaryPath.toUri, conf) val dictionarySize = new BufferedReader( new InputStreamReader( fs.open( new Path(dictionarySizePath, "part-00000" )) )).readLine().toInt println( "Vectorizing..." ) // builds vectors (player -> other players in the game) // IDs of other players (in the vectors) are replaces with indices, taken from dictionary Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs" , "--dictionary" , dictionaryPath.toString, "--hbasehost" , "localhost" , "--output" , vectorsPath.toString, "--dictionarySize" , dictionarySize.toString)) |
Этап 4. Генерация n-случайных кластеров
Случайные кластеры / центроиды — это точка входа для алгоритма K-средних:
1
2
3
4
5
6
7
|
//randomly selected cluster the will be passed as an input to K-means val inputClustersPath = new Path( 'jobinput-clusters' ) val distanceMeasure = new EuclideanDistanceMeasure println( 'Making random seeds...' ) //build 30 initial random clusterscentroids RandomSeedGenerator.buildRandom(conf, vectorsPath, inputClustersPath, 30 , distanceMeasure) |
Этап 5. Запуск алгоритмов K-средних
На каждой следующей итерации K-means будет находить лучшие центроиды и кластеры. В результате мы имеем 30 групп игроков, которые чаще всего играют друг с другом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// clusterization results val outputClustersPath = new Path( "job/output-clusters" ) // textual dump of clusterization results val dumpPath = "job/dump" println( "Running K-means..." ) // runs K-means algorithm with up to 20 iterations, to find clusters of colluding players (assumption of collusion is // made on the basis of number hand player together with other player[s]) KMeansDriver.run(conf, vectorsPath, inputClustersPath, outputClustersPath, new CosineDistanceMeasure(), 0.01 , 20 , true , 0 , false ) println( "Printing results..." ) // dumps clusters to a text file val clusterizationResult = finalClusterPath(conf, outputClustersPath, 20 ) val clusteredPoints = new Path(outputClustersPath, "clusteredPoints" ) val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints) clusterDumper.setNumTopFeatures( 10 ) clusterDumper.setOutputFile(dumpPath) clusterDumper.setTermDictionary( new Path(indexedDictionaryPath, "part-00000" ).toString, "sequencefile" ) clusterDumper.printClusters( null ) |
Полученные результаты
Теперь перейдем к «job / dump» — этот файл содержит текстовые дампы всех кластеров, сгенерированных K-means. Вот небольшой фрагмент файла:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
VL-0{n=5 c=[1003:3.400, 1006:3.400, 1008:3.200, 1009:3.200, 1012:3.200] r=[1003:1.744, 1006:1.744, 1008:1.600, 1009:1.600, 1012:1.600]} Top Terms: 1006 => 3.4 1003 => 3.4 1012 => 3.2 1009 => 3.2 1008 => 3.2 VL-15{n=1 c=[1016:4.000, 1019:3.000, 1020:3.000, 1021:3.000, 1022:3.000, 1023:3.000, 1024:3.000, 1025:3.000] r=[]} Top Terms: 1016 => 4.0 1025 => 3.0 1024 => 3.0 1023 => 3.0 1022 => 3.0 1021 => 3.0 1020 => 3.0 1019 => 3.0 |
Как мы видим, было обнаружено 2 кластера игроков: один с 8 игроками, который провел много игр друг с другом, а второй с 4 игроками.
Ссылка: Обнаружение сговора в покере с Mahout и Scalding от нашего партнера JCG Василя Ременюка в блоге Васила Ременюка .