Статьи

Использование «sstable» Casandra для массовой загрузки данных в облаке


‘Sstableloader’, представленный начиная с Apache Cassandra 0.8.1 и далее, обеспечивает мощный способ загрузки огромных объемов данных в кластер Cassandra.
Если вы переходите от облачного кластера к выделенному кластеру или наоборот, или из другой базы данных в Cassandra, вас заинтересует этот инструмент. Как показано ниже, в любом случае, если вы можете сгенерировать ‘sstable’ из данных, загружаемых в Cassandra, вы можете загрузить его в кластер, используя ‘sstableloader’. Я попробовал это в версии 1.1.2 здесь.


В этой статье я поделюсь своим опытом, когда я создал sstables из файла .csv и загрузил его в экземпляр Cassandra, работающий на том же компьютере, который здесь действует как кластер.
  1. стабильное поколение
  2. Массовая загрузка Cassandra с помощью sstableloader
  3. Использование JMX

«стабильное» поколение


Для создания ‘SSTableSimpleUnsortedWriter’ файл ‘cassandra.yaml’ должен присутствовать в пути к классам.
В Intellij Idea вы можете сделать это в меню «Выполнить» -> «Редактировать конфигурации» -> «Приложение» -> «Конфигурация» -> «Параметры виртуальной машины». Там вы должны указать путь к cassandra.yaml следующим образом.
-Dcassandra-foreground -Dcassandra.config=file:///<path to/apache-cassandra-1.1.2/conf/cassandra.yaml> -ea -Xmx1G

Вот простой код для генерации sstables в соответствии с контекстом, который я пробовал, со ссылкой на документацию по Datastax. С помощью всего лишь нескольких модификаций вы можете использовать его. Я постараюсь объяснить код немного ниже.
 
SSTableSimpleUnsortedWriter eventWriter = new SSTableSimpleUnsortedWriter(directory, partitioner, keySpace, "Events", AsciiType.instance,null, 64);

Этот писатель не принимает никакого порядка в строках.
Вместо этого он буферизует строки в памяти и записывает их в отсортированном порядке. Вы можете определить порог для количества строк, подлежащих буферизации, чтобы избежать загрузки всего набора данных в памяти. Каждый раз, когда достигается порог, создается один sstable и буфер отдыхает.
каталог — каталог для записи sstables
Partitioner — стратегия для распределения данных по узлам. Я использовал RandomPartitioner, который использует хеш-значение MD5 для распространения данных. Этот
пост может помочь вам решить, что использовать в соответствии с вашим контекстом от RandomPartitioner и OrderPreservingPartitioner. Доступны еще два разделителя.
keySpace — имя Keyspace
«События» — название семейства столбцов
AsciiType.instance — компаратор семейства столбцов
null — для subComparator задано значение null, поскольку это не семейство суперколонок
64 — размер буфера в МБ. Это должно решаться в зависимости от контекста для достижения максимальной производительности.

С помощью следующего кода мы создаем строки и добавляем столбцы каждой строки в соответствии с записью, считанной из файла .csv. Что касается Cassandra wiki, то в одной строке может быть до 2 миллиардов столбцов, подобных этой.

 
eventWriter.newRow(uuid);
eventWriter.addColumn(bytes("sourceAdd"),bytes(entry.sourceAdd), timestamp);
eventWriter.addColumn(bytes("sourceChannelType"),bytes(entry.sourceChannelType), timestamp);

Статический вложенный класс CsvEntry используется для чтения только соответствующих полей из строки csv.

Как только вы запустите код, указывающий на файл csv, в каталоге, указанном вами как каталог, будет создан каталог. Внутри вы найдете нечто похожее на следующее, которое содержит созданные sstables.

Массовая загрузка Cassandra с помощью sstableloader

Внутри каталога bin Cassandra вы можете найти этот инструмент sstableloader. Вы можете запустить его через командную строку, указывая на сгенерированные выше sstables. Хорошее руководство по этому вопросу можно найти в
Datastax и в этом
блоге . Также вы можете напрямую использовать класс org.apache.cassandra.tools.Bulkloader в java-коде для загрузки sstables в кластер Cassandra.

Если вы тестируете все это в localhost, необходимо выполнить следующие шаги, чтобы попробовать sstableloader.

  • Получить копию работающего экземпляра Cassandra
  • Установите другой адрес обратной связи. В Linux вы можете сделать это, используя

sudo ifconfig lo: 0 127.0.0.2 маска сети 255.0.0.0 up

  • Установите адрес rpc и адрес прослушивания скопированного /conf/casandra.yaml равным 127.0.0.2. Конечно, вы можете установить адрес rpc на 0.0.0.0, если вы хотите слушать все интерфейсы.
  • Затем из скопированного Cassandra запуска sstableloader мы запускаем sstableloader из командной строки следующим образом

./sstableloader -d 127.0.0.2 <путь к сгенерированным sstables>

  • Следует отметить, что путь должен заканчиваться как / keyspace_name / columnfamily_name (например: ….. / CDRs / Events для приведенного выше снимка экрана)

Использование массовой загрузки JMX


Вы также можете использовать следующий код для массовой загрузки Cassandra из сгенерированных sstables.
Я получил это из списка рассылки пользователей Cassandra от Brian Jeltema. Основной метод должен быть запущен, давая путь к сгенерированным sstables как выше, в качестве аргумента.
import java.io.IOException;>
import java.util.HashMap;
import java.util.Map;

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.cassandra.service.StorageServiceMBean;


public class JmxBulkLoader {
    private JMXConnector connector;
    private StorageServiceMBean storageBean;


    public JmxBulkLoader(String host, int port) throws Exception    {
        connect(host, port);
    }
    private void connect(String host, int port) throws IOException, MalformedObjectNameException    {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port));
        Map<String,Object> env = new HashMap<String,Object>();
        connector = JMXConnectorFactory.connect(jmxUrl, env);
        MBeanServerConnection mbeanServerConn = connector.getMBeanServerConnection();
        ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageService");
        storageBean = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
    }

    public void close() throws IOException    {
        connector.close();
    }

    
    public void bulkLoad(String path) {
        storageBean.bulkLoad(path);
    }

    
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException("usage: paths to bulk files");
        }
        JmxBulkLoader np = new JmxBulkLoader("127.0.0.1", 7199);
        for (String arg : args) {
            np.bulkLoad(arg);
        }
        np.close();
    }
}