Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 |

起因：前段时间，我们把通过 happybase 向 HBase 写数据的 put() 操作换成了 batch()，结果发现性能并没有提升。

阅读代码后，我发现 put() 的实现本身使用的就是批量插入：

    def put(self, row, data, timestamp=None, wal=True):
        """Store data in the table.

        This method stores the data in the `data` argument for the row
        specified by `row`. The `data` argument is a dictionary that maps
        columns to values. Column names must include a family and qualifier
        part, e.g. `cf:col`, though the qualifier part may be the empty
        string, e.g. `cf:`.

        Note that, in many situations, :py:meth:`batch()` is a more appropriate
        method to manipulate data.

        .. versionadded:: 0.7
           `wal` argument

        :param str row: the row key
        :param dict data: the data to store
        :param int timestamp: timestamp (optional)
        :param bool wal: whether to write to the WAL (optional)
        """
        # Clearly a batch operation: a single put is routed through the very
        # same batch machinery, so calling batch() directly instead of put()
        # does not by itself change how the data is written.
        with self.batch(timestamp=timestamp, wal=wal) as batch:
            batch.put(row, data)

class Batch(object):
    """Batch mutation class.

    This class cannot be instantiated directly; use :py:meth:`Table.batch`
    instead.

    Mutations are buffered per row key and handed to the server in one call
    by :py:meth:`send`.
    """

    def __init__(self, table, timestamp=None, batch_size=None,
                 transaction=False, wal=True):
        """Initialise a new Batch instance.

        :param table: the table the buffered mutations are sent to
        :param int timestamp: timestamp used when sending (optional)
        :param int batch_size: number of buffered mutations at which
            :py:meth:`put` sends the batch automatically (optional)
        :param bool transaction: stored on the instance; cannot be combined
            with `batch_size`
        :param bool wal: batch-wide default for writing to the WAL

        :raises TypeError: if `timestamp` is neither an integer nor `None`,
            or if `transaction` is used together with `batch_size`
        :raises ValueError: if `batch_size` is not greater than zero
        """
        if not (timestamp is None or isinstance(timestamp, Integral)):
            raise TypeError("'timestamp' must be an integer or None")

        if batch_size is not None:
            if transaction:
                raise TypeError("'transaction' cannot be used when "
                                "'batch_size' is specified")
            if not batch_size > 0:
                raise ValueError("'batch_size' must be > 0")

        self._table = table
        self._batch_size = batch_size
        self._timestamp = timestamp
        self._transaction = transaction
        self._wal = wal
        self._families = None
        # Create the (empty) mutation buffer and counter before first use.
        self._reset_mutations()

    def _reset_mutations(self):
        """Reset the internal mutation buffer."""
        # Maps each row key to the list of mutations queued for that row.
        self._mutations = defaultdict(list)
        self._mutation_count = 0

    def send(self):
        """Send the batch to the server.

        Does nothing when no mutations are buffered. After a successful
        send the internal buffer is emptied.
        """
        # items() rather than iteritems() so this runs on Python 2 and 3.
        bms = [BatchMutation(row, m) for row, m in self._mutations.items()]
        if not bms:
            return

        logger.debug("Sending batch for '%s' (%d mutations on %d rows)",
                     self._table.name, self._mutation_count, len(bms))
        if self._timestamp is None:
            self._table.connection.client.mutateRows(
                self._table.name, bms, {})
        else:
            self._table.connection.client.mutateRowsTs(
                self._table.name, bms, self._timestamp, {})

        self._reset_mutations()

    #
    # Mutation methods
    #

    def put(self, row, data, wal=None):
        """Store data in the table.

        See :py:meth:`Table.put` for a description of the `row`, `data`,
        and `wal` arguments. The `wal` argument should normally not be
        used; its only use is to override the batch-wide value passed to
        :py:meth:`Table.batch`.
        """
        if wal is None:
            wal = self._wal

        # Queue one mutation per column; nothing is sent to the server yet.
        self._mutations[row].extend(
            Mutation(
                isDelete=False,
                column=column,
                value=value,
                writeToWAL=wal)
            for column, value in data.items())

        self._mutation_count += len(data)
        # Data is only actually sent from here once the number of buffered
        # mutations reaches _batch_size; with no batch size set, put() only
        # buffers.
        if self._batch_size and self._mutation_count >= self._batch_size:
            self.send()


您的电子邮箱地址不会被公开。 必填项已用 * 标注