Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

起因:前段时间,我们把通过happybase向hbase 写数据的操作put() 操作换成了batch() 结果发现性能并没有提升

阅读代码,我发现put() 实现使用的就是批量插入
table.py

    def put(self, row, data, timestamp=None, wal=True):
        """Store data in the table.

        This method stores the data in the `data` argument for the row
        specified by `row`. The `data` argument is dictionary that maps columns
        to values. Column names must include a family and qualifier part, e.g.
        `cf:col`, though the qualifier part may be the empty string, e.g.
        `cf:`.

        Note that, in many situations, :py:meth:`batch()` is a more appropriate
        method to manipulate data.

        .. versionadded:: 0.7
           `wal` argument

        :param str row: the row key
        :param dict data: the data to store
        :param int timestamp: timestamp (optional)
        :param wal bool: whether to write to the WAL (optional)
        """
        with self.batch(timestamp=timestamp, wal=wal) as batch:
            batch.put(row, data)  # 很明显是批量操作

batch.py

class Batch(object):
    """Batch mutation class.

    This class cannot be instantiated directly; use :py:meth:`Table.batch`
    instead.
    """
    def __init__(self, table, timestamp=None, batch_size=None,
                 transaction=False, wal=True):
        """Initialise a new Batch instance."""
        if not (timestamp is None or isinstance(timestamp, Integral)):
            raise TypeError("'timestamp' must be an integer or None")

        if batch_size is not None:
            if transaction:
                raise TypeError("'transaction' cannot be used when "
                                "'batch_size' is specified")
            if not batch_size > 0:
                raise ValueError("'batch_size' must be > 0")

        self._table = table
        self._batch_size = batch_size
        self._timestamp = timestamp
        self._transaction = transaction
        self._wal = wal
        self._families = None
        self._reset_mutations()

    def _reset_mutations(self):
        """Reset the internal mutation buffer."""
        self._mutations = defaultdict(list)
        self._mutation_count = 0

    def send(self):
        """Send the batch to the server."""
        bms = [BatchMutation(row, m) for row, m in self._mutations.iteritems()]
        if not bms:
            return

        logger.debug("Sending batch for '%s' (%d mutations on %d rows)",
                     self._table.name, self._mutation_count, len(bms))
        if self._timestamp is None:
            self._table.connection.client.mutateRows(self._table.name, bms, {})
        else:
            self._table.connection.client.mutateRowsTs(
                self._table.name, bms, self._timestamp, {})

        self._reset_mutations()

    #
    # Mutation methods
    #

    def put(self, row, data, wal=None):
        """Store data in the table.

        See :py:meth:`Table.put` for a description of the `row`, `data`,
        and `wal` arguments. The `wal` argument should normally not be
        used; its only use is to override the batch-wide value passed to
        :py:meth:`Table.batch`.
        """
        if wal is None:
            wal = self._wal

        self._mutations[row].extend(
            Mutation(
                isDelete=False,
                column=column,
                value=value,
                writeToWAL=wal)
            for column, value in data.iteritems())

        self._mutation_count += len(data)
        if self._batch_size and self._mutation_count >= self._batch_size: # 只有大于_batch_size 才会真正发送数据
            self.send()

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注