Я смотрел на то, как можно было бы перенести данные из Twitter в SQL Server.
Вы можете спросить, почему ????
А почему бы не ? Это скорее упражнение в том, как это можно сделать с помощью доступных инструментов.
Я прошел несколько шагов, и я почти уверен, что может быть лучше, и если вы можете подумать о каких-либо улучшениях, не стесняйтесь использовать раздел комментариев ниже.
Шаг 1 — Получение твитов
Для начала нам нужно получить данные из Twitter. Существует множество способов сделать это, однако самый простой способ, который я нашел, — это использовать продукт под названием cURL (доступен здесь: http://curl.haxx.se/download.html ).
Я видел это, на которое ссылались при исследовании Microsoft Hadoop на сайте Azure ( https://www.windowsazure.com/en-us/develop/net/tutorials/hadoop-social-web-data/ ), который использовался для извлечения данных для подачи в базу данных Hive.
Получение данных из Твиттера с помощью cURL состоит из трех частей.
Часть 1 — Получить CURL, вы можете скачать это, используя ссылку выше. Я использовал версию бинарного SSL Win64.
Часть 2. Создание файла параметров. Как показано в приведенной выше ссылке MS, файл параметров действует как фильтр для получения нужных данных из ленты Twitter. Хотя можно фильтровать данные по хештегам, я хотел получить более обобщенный набор данных. Для этого я поместил следующий фильтр в файл параметров. Это эффективно фильтрует данные по любым твитам с геотегами.
locations=-180,-90,180,90
Часть 3. Создание командного файла для запуска задания. Созданный командный файл фактически такой же, как тот, на который ссылается ссылка MS. Файл называется GetTwitterStream.cmd и содержит следующий текст. Вам нужно заменить <twitterusername> и <twitterpassword> своими учетными данными в твиттере.
curl -d @twitter_params.txt -k https://stream.twitter.com/1/statuses/filter.json –u<twitterusername>:<twitterpassword> >>twitter_stream_seq.txt
Когда вы запускаете файл GetTwitterStream.cmd, он запускает cURL и начинает получать данные из общедоступного потокового API Twitter, как показано ниже.
Это дает нам файл, содержащий канал JSON из Twitter.
Шаг 2. Загрузка данных JSON в Twitter в SQL
Далее нам нужно получить данные JSON из Twitter в SQL. Для этого я создал таблицу Load со следующей структурой:
CREATE TABLE [dbo].[TweetJSON]( [JSONData] [varchar](8000) NULL, [ID] [int] IDENTITY(1,1) NOT NULL, [Processed] [char](1) NULL ) ON [PRIMARY]
Затем мы можем загрузить файл JSON, созданный из cURL на шаге 1, используя BULK INSERT . Для этого нам нужен файл формата , показанный ниже и называемый BIFormatFile.txt
9.0 1 1 SQLCHAR 0 8000 “\r\n” 1 [JSONData] “”
Затем данные могут быть загружены с помощью задачи «Массовая вставка»:
BULK INSERT [dbo].[TweetJSON] from ‘c:\BigData\TwitterData\twitter_stream_seq.txt’ with (CODEPAGE=’RAW’, FORMATFILE=’C:\BigData\twitterdata\BIFormatFile.txt’)
Итак, теперь у нас есть таблица с данными JSON и столбец Identity, чтобы дать нам идентификатор, на который мы можем ссылаться.
Шаг 3 — Разбор JSON
Фил Фактор написал отличную статью (здесь http://www.simple-talk.com/sql/t-sql-programming/consuming-json-strings-in-sql-server/ ), которая посвящена анализу JSON в T- SQL. Я использовал функцию parseJSON из этой статьи, чтобы извлечь необходимые поля из таблицы Load.
Я создал промежуточный стол:
CREATE TABLE [dbo].[TweetJSONStaging]( [Country] [varchar](200) NULL, [id_str] [varchar](200) NULL, [followers_count] [int] NULL, [profile_image_url] [varchar](200) NULL, [statuses_count] [int] NULL, [profile_background_image_url] [varchar](200) NULL, [created_at] [datetime] NULL, [friends_count] [int] NULL, [location] [varchar](200) NULL, [name] [varchar](200) NULL, [lang] [varchar](200) NULL, [screen_name] [varchar](200) NULL, [varchar](200) NULL, [geo_lat] [varchar](200) NULL, [geo_long] [varchar](200) NULL ) ON [Staging]
Затем использовал следующий процесс, чтобы перебрать данные и получить их в нужном формате. Далее следует процесс создания курсора (я вернусь к этому через минуту) с изменяемыми записями и вызова против него функции ParseJSON, чтобы разделить поля, а затем получить нужные поля и вставить их в стол. Затем мы устанавливаем флаг Обработано и повторяем процесс до тех пор, пока не останется больше записей для обработки.
declare @JSON NVARCHAR(MAX), @ID int declare jsCursor CURSOR FOR select JSONData, ID from tweetJson where Processed is null open jsCursor FETCH NEXT from jsCursor into @JSON, @ID while @@FETCH_STATUS=0 BEGIN begin try insert into TweetJSONStaging ( Country, id_str, followers_count, profile_image_url,statuses_count,profile_background_image_url,created_at, friends_count,location,name,lang, screen_name, source, geo_lat, geo_long) select max(case when NAME=’country’ then StringValue end) as Country, max(case when NAME=’id_str’ then StringValue end) as id_str, max(case when NAME=’followers_count’ then convert (int,StringValue) end) as followers_count, max(case when NAME=’profile_image_url’ then StringValue end) as profile_image_url, max(case when NAME=’statuses_count’ then convert(int,StringValue) end) as statuses_count, max(case when NAME=’profile_background_image_url’ then StringValue end) as profile_background_image_url, max(case when NAME=’created_at’ then convert(datetime, (substring (StringValue,9,2)+’ ‘+substring (StringValue,5,3)+’ ‘+ substring (StringValue,27,4) +’ ‘+substring (StringValue,12,2) +’:'+ substring (StringValue,15,2)+’:'+substring (StringValue,18,2) ) ) end) as created_at, max(case when NAME=’friends_count’ then convert(int,StringValue) end) as friends_count, max(case when NAME=’location’ then StringValue end) as location, max(case when NAME=’name’ then StringValue end) as name, max(case when NAME=’lang’ then StringValue end) as lang, max(case when NAME=’screen_name’ then StringValue end) as screen_name, max(case when NAME=’source’ then StringValue end) as source, max(case when element_id=’1′ then StringValue end) as geo_lat, max(case when element_id=’2′ then StringValue end) as geo_long from dbo.parseJSON( @JSON) update tweetJSON set Processed = ‘Y’ where ID=@ID end try begin catch update tweetJSON set Processed = ‘X’ where ID=@ID end catch FETCH NEXT from jsCursor into @JSON, @ID end close jsCursor deallocate jsCursor
Чтобы этот процесс мог выполняться за разумное время, я создал пару индексов для таблицы загрузки (tweetJSON). Индексы находятся в поле идентификатора (Clustered Index) и во флаге Processed.
CREATE UNIQUE CLUSTERED INDEX CI_ID ON [dbo].[TweetJSON] ( [ID] ASC ) ON [PRIMARY] CREATE NONCLUSTERED INDEX NCI_Processed ON [dbo].[TweetJSON] ( [Processed] ASC ) ON [PRIMARY]
Запуск этого процесса занял ок. 26 секунд для загрузки 1000 записей, поэтому ок. 38 записей в секунду.
Итак, я подумал, что попробую это с условием «В то время как», а не с курсором, и, что интересно, на 1000 записей потребовалось столько же времени для запуска.
Обновление: как было сказано Дейвом Баллантайном ( @davebally ), это показывает, что предложение While фактически делает то же самое, что и Cursor, поскольку процесс по-прежнему работает над записями одна за другой. (Дополнительную информацию можно найти здесь ).
declare @JSON varchar(8000), @ID int, @count int while 1=1 BEGIN select top 1 @JSON = JSONData, @ID=ID from tweetJson where Processed =’N’ begin try insert into TweetJSONStaging ( Country, id_str, followers_count, profile_image_url,statuses_count,profile_background_image_url,created_at, friends_count,location,name,lang, screen_name, source, geo_lat, geo_long) select max(case when NAME=’country’ then StringValue end) as Country, max(case when NAME=’id_str’ then StringValue end) as id_str, max(case when NAME=’followers_count’ then convert (int,StringValue) end) as followers_count, max(case when NAME=’profile_image_url’ then StringValue end) as profile_image_url, max(case when NAME=’statuses_count’ then convert(int,StringValue) end) as statuses_count, max(case when NAME=’profile_background_image_url’ then StringValue end) as profile_background_image_url, max(case when NAME=’created_at’ then convert(datetime, (substring (StringValue,9,2)+’ ‘+substring (StringValue,5,3)+’ ‘+ substring (StringValue,27,4) +’ ‘+substring (StringValue,12,2) +’:'+ substring (StringValue,15,2)+’:'+substring (StringValue,18,2) ) ) end) as created_at, max(case when NAME=’friends_count’ then convert(int,StringValue) end) as friends_count, max(case when NAME=’location’ then StringValue end) as location, max(case when NAME=’name’ then StringValue end) as name, max(case when NAME=’lang’ then StringValue end) as lang, max(case when NAME=’screen_name’ then StringValue end) as screen_name, max(case when NAME=’source’ then StringValue end) as source, max(case when element_id=’1′ then StringValue end) as geo_lat, max(case when element_id=’2′ then StringValue end) as geo_long from dbo.parseJSON( @JSON) update tweetJSON set Processed = ‘Y’ where ID=@ID end try begin catch update tweetJSON set Processed = ‘X’ where ID=@ID end catch select @count=count(1) from tweetJson where Processed =’N’ if @count=0 break else continue end
Спасибо за чтение! Я добавлю обновление, когда внесу изменения, чтобы сделать его более производительным.