Статьи

Как я получил данные Twitter на SQL Server

Я смотрел на то, как можно было бы перенести данные из Twitter в SQL Server.

Вы можете спросить, почему ????

А почему бы не ? Это скорее упражнение в том, как это можно сделать с помощью доступных инструментов.

Я прошел несколько шагов, и я почти уверен, что может быть лучше, и если вы можете подумать о каких-либо улучшениях, не стесняйтесь использовать раздел комментариев ниже.

Шаг 1 — Получение твитов

Для начала нам нужно получить данные из Twitter. Существует множество способов сделать это, однако самый простой способ, который я нашел, — это использовать продукт под названием cURL (доступен здесь: http://curl.haxx.se/download.html ).

Я видел это, на которое ссылались при исследовании Microsoft Hadoop на сайте Azure ( https://www.windowsazure.com/en-us/develop/net/tutorials/hadoop-social-web-data/ ), который использовался для извлечения данных для подачи в базу данных Hive.

Получение данных из Твиттера с помощью cURL состоит из трех частей.

Часть 1 — Получить CURL, вы можете скачать это, используя ссылку выше. Я использовал версию бинарного SSL Win64.

Часть 2. Создание файла параметров. Как показано в приведенной выше ссылке MS, файл параметров действует как фильтр для получения нужных данных из ленты Twitter. Хотя можно фильтровать данные по хештегам, я хотел получить более обобщенный набор данных. Для этого я поместил следующий фильтр в файл параметров. Это эффективно фильтрует данные по любым твитам с геотегами.

locations=-180,-90,180,90

Часть 3. Создание командного файла для запуска задания. Созданный командный файл фактически такой же, как тот, на который ссылается ссылка MS. Файл называется GetTwitterStream.cmd и содержит следующий текст. Вам нужно заменить <twitterusername> и <twitterpassword> своими учетными данными в твиттере.

curl -d @twitter_params.txt -k https://stream.twitter.com/1/statuses/filter.json –u<twitterusername>:<twitterpassword> >>twitter_stream_seq.txt

Когда вы запускаете файл GetTwitterStream.cmd, он запускает cURL и начинает получать данные из общедоступного потокового API Twitter, как показано ниже.

образ

Это дает нам файл, содержащий канал JSON из Twitter.

Шаг 2. Загрузка данных JSON в Twitter в SQL

Далее нам нужно получить данные JSON из Twitter в SQL. Для этого я создал таблицу Load со следующей структурой:

CREATE TABLE [dbo].[TweetJSON](
[JSONData] [varchar](8000) NULL,
[ID] [int] IDENTITY(1,1) NOT NULL,
[Processed] [char](1) NULL
) ON [PRIMARY]

Затем мы можем загрузить файл JSON, созданный из cURL на шаге 1, используя BULK INSERT . Для этого нам нужен файл формата , показанный ниже и называемый BIFormatFile.txt

9.0
1
1 SQLCHAR 0 8000 “\r\n” 1 [JSONData] “”

Затем данные могут быть загружены с помощью задачи «Массовая вставка»:

BULK INSERT [dbo].[TweetJSON]
from ‘c:\BigData\TwitterData\twitter_stream_seq.txt’
with (CODEPAGE=’RAW’, FORMATFILE=’C:\BigData\twitterdata\BIFormatFile.txt’)

Итак, теперь у нас есть таблица с данными JSON и столбец Identity, чтобы дать нам идентификатор, на который мы можем ссылаться.

Шаг 3 — Разбор JSON

Фил Фактор написал отличную статью (здесь http://www.simple-talk.com/sql/t-sql-programming/consuming-json-strings-in-sql-server/ ), которая посвящена анализу JSON в T- SQL. Я использовал функцию parseJSON из этой статьи, чтобы извлечь необходимые поля из таблицы Load.

Я создал промежуточный стол:

CREATE TABLE [dbo].[TweetJSONStaging](
    [Country] [varchar](200) NULL,
    [id_str] [varchar](200) NULL,
    [followers_count] [int] NULL,
    [profile_image_url] [varchar](200) NULL,
    [statuses_count] [int] NULL,
    [profile_background_image_url] [varchar](200) NULL,
    [created_at] [datetime] NULL,
    [friends_count] [int] NULL,
    [location] [varchar](200) NULL,
    [name] [varchar](200) NULL,
    [lang] [varchar](200) NULL,
    [screen_name] [varchar](200) NULL,

    [varchar](200) NULL,
    [geo_lat] [varchar](200) NULL,
    [geo_long] [varchar](200) NULL
    ) ON [Staging]

Затем использовал следующий процесс, чтобы перебрать данные и получить их в нужном формате. Далее следует процесс создания курсора (я вернусь к этому через минуту) с изменяемыми записями и вызова против него функции ParseJSON, чтобы разделить поля, а затем получить нужные поля и вставить их в стол. Затем мы устанавливаем флаг Обработано и повторяем процесс до тех пор, пока не останется больше записей для обработки.

declare @JSON NVARCHAR(MAX), @ID int

    declare jsCursor CURSOR FOR
    select JSONData, ID from tweetJson where Processed is null

    open jsCursor

    FETCH NEXT from jsCursor into @JSON, @ID
    while @@FETCH_STATUS=0
    BEGIN
    begin try
    insert into TweetJSONStaging ( Country, id_str, followers_count,
    profile_image_url,statuses_count,profile_background_image_url,created_at,
    friends_count,location,name,lang, screen_name, source, geo_lat, geo_long)
    select
    max(case when NAME=’country’ then StringValue end) as Country,
    max(case when NAME=’id_str’ then StringValue end) as id_str,
    max(case when NAME=’followers_count’ then convert (int,StringValue) end)
    as followers_count,
    max(case when NAME=’profile_image_url’ then StringValue end)
    as profile_image_url,
    max(case when NAME=’statuses_count’ then convert(int,StringValue) end)
    as statuses_count,
    max(case when NAME=’profile_background_image_url’ then StringValue end)
    as profile_background_image_url,
    max(case when NAME=’created_at’ then convert(datetime,
    (substring (StringValue,9,2)+’ ‘+substring (StringValue,5,3)+’ ‘+
    substring (StringValue,27,4) +’ ‘+substring (StringValue,12,2) +’:'+
    substring (StringValue,15,2)+’:'+substring (StringValue,18,2) ) ) end)
    as created_at,
    max(case when NAME=’friends_count’ then convert(int,StringValue) end)
    as friends_count,
    max(case when NAME=’location’ then StringValue end) as location,
    max(case when NAME=’name’ then StringValue end) as name,
    max(case when NAME=’lang’ then StringValue end) as lang,
    max(case when NAME=’screen_name’ then StringValue end) as screen_name,
    max(case when NAME=’source’ then StringValue end) as source,
    max(case when element_id=’1′ then StringValue end) as geo_lat,
    max(case when element_id=’2′ then StringValue end) as geo_long
    from dbo.parseJSON( @JSON)

    update tweetJSON
    set Processed = ‘Y’
    where ID=@ID

    end try
    begin catch
    update tweetJSON
    set Processed = ‘X’
    where ID=@ID
    end catch
    FETCH NEXT from jsCursor into @JSON, @ID

    end
    close jsCursor
    deallocate jsCursor

Чтобы этот процесс мог выполняться за разумное время, я создал пару индексов для таблицы загрузки (tweetJSON). Индексы находятся в поле идентификатора (Clustered Index) и во флаге Processed.

CREATE UNIQUE CLUSTERED INDEX CI_ID ON [dbo].[TweetJSON]
    ( [ID] ASC ) ON [PRIMARY]

    CREATE NONCLUSTERED INDEX NCI_Processed ON [dbo].[TweetJSON]
    ( [Processed] ASC ) ON [PRIMARY]

Запуск этого процесса занял ок. 26 секунд для загрузки 1000 записей, поэтому ок. 38 записей в секунду.

Итак, я подумал, что попробую это с условием «В то время как», а не с курсором, и, что интересно, на 1000 записей потребовалось столько же времени для запуска.

Обновление: как было сказано Дейвом Баллантайном ( @davebally ), это показывает, что предложение While фактически делает то же самое, что и Cursor, поскольку процесс по-прежнему работает над записями одна за другой. (Дополнительную информацию можно найти здесь ).

declare @JSON varchar(8000), @ID int, @count int

    while 1=1
    BEGIN
    select top 1 @JSON = JSONData, @ID=ID from tweetJson where Processed =’N’
    begin try
    insert into TweetJSONStaging ( Country, id_str, followers_count,
    profile_image_url,statuses_count,profile_background_image_url,created_at,
    friends_count,location,name,lang, screen_name, source, geo_lat, geo_long)
    select
    max(case when NAME=’country’ then StringValue end) as Country,
    max(case when NAME=’id_str’ then StringValue end) as id_str,
    max(case when NAME=’followers_count’ then convert (int,StringValue) end)
    as followers_count,
    max(case when NAME=’profile_image_url’ then StringValue end)
    as profile_image_url,
    max(case when NAME=’statuses_count’ then convert(int,StringValue) end)
    as statuses_count,
    max(case when NAME=’profile_background_image_url’ then StringValue end)
    as profile_background_image_url,
    max(case when NAME=’created_at’ then convert(datetime,
    (substring (StringValue,9,2)+’ ‘+substring (StringValue,5,3)+’ ‘+
    substring (StringValue,27,4) +’ ‘+substring (StringValue,12,2) +’:'+
    substring (StringValue,15,2)+’:'+substring (StringValue,18,2) ) ) end)
    as created_at,
    max(case when NAME=’friends_count’ then convert(int,StringValue) end)
    as friends_count,
    max(case when NAME=’location’ then StringValue end) as location,
    max(case when NAME=’name’ then StringValue end) as name,
    max(case when NAME=’lang’ then StringValue end) as lang,
    max(case when NAME=’screen_name’ then StringValue end) as screen_name,
    max(case when NAME=’source’ then StringValue end) as source,
    max(case when element_id=’1′ then StringValue end) as geo_lat,
    max(case when element_id=’2′ then StringValue end) as geo_long
    from dbo.parseJSON( @JSON)

    update tweetJSON
    set Processed = ‘Y’
    where ID=@ID

    end try
    begin catch
    update tweetJSON
    set Processed = ‘X’
    where ID=@ID
    end catch

    select @count=count(1) from tweetJson where Processed =’N’

    if @count=0
    break
    else
    continue
    end

Спасибо за чтение! Я добавлю обновление, когда внесу изменения, чтобы сделать его более производительным.