Hive 事务配置

Hive 不开启事务情况下开启 Concurrency

  1. Concurrency 配置
<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>

SET hive.support.concurrency = true;
  1. 配置完成后重启 hiveserver2

  2. 创建一个普通的 Hive 表

> create table test_notransaction(user_id Int,name String);
> create table test(name string, id int);
  1. 准备测试数据,向表中插入数据
> insert into test_notransaction values(1,'peach1'),
> (2,'peach2'),
> (3, 'peach3'),
> (4, 'peach4');
  1. 开启 Concurrency 测试用例
    1) 对 catalog_sales 表进行并发 select 操作
> select count(*) from catalog_sales;
> select count(*) from catalog_sales;

2)对 test 表进行并发 insert 操作

> insert into test values('test11aaa1',1252);
> insert into test values('test1',52);

3)对 test 表执行 select 的同时执行 insert 操作

> select count(*) from test;
> insert into test values("test123",123);

4)对 test 表执行 insert 的同时执行 select 操作

> insert into test values("test123",123);
> select count(*) from test;

5)对 test 表进行 update 和 delete 操作

> update test set name='aaaa' where id=1252;
> delete test set name='bbbb' where id=123;

Hive 不开启事务情况下关闭 Concurrency

  1. Concurrency 配置
<property>
    <name>hive.support.concurrency</name>
    <value>false</value>
</property>

> SET hive.support.concurrency = false;
  1. 配置完成后重启 hiveserver2

  2. 关闭 Concurrency 测试用例
    1)执行 insert 操作的同时执行 select 操作

> insert into test_notransaction values(1,'peach1'),(2,'peach2'),(3, 'peach3'),(4, 'peach4');
> select count(*) from test_notransaction;

2)执行 select 操作的同时执行 insert 操作

> select count(*) from test_notransaction;
> insert into test_notransaction values(1,'peach1'),(2,'peach2'),(3, 'peach3'),(4, 'peach4');

3)同时执行多条 insert 操作

> insert into test_notransaction values(1,'peach1'),(2,'peach2'),(3, 'peach3'),(4, 'peach4');
> insert into test_notransaction values(1,'peach1'),(2,'peach2'),(3, 'peach3'),(4, 'peach4');

4)执行 update 操作,将表中 user_id 为2的用户名修改为peach22

> update test_notransaction set name='peach22' where user_id=2;

5)执行 delete 操作,将表中 user_id 为1信息删除

> delete from test_notransaction where user_id=1;

6)查看表获取锁类型

show locks;

Hive 开启事务

  1. 事务配置
<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>
<property>
    <name>hive.enforce.bucketing</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
</property>
<property>
    <name>hive.compactor.worker.threads </name>
    <value>1</value>
</property>
  1. 配置完成后重启 hiveserver2

  2. hive 事务建表语句

> create table test_trancaction (user_id Int,name String) 
> clustered by (user_id) into 3 buckets stored as orc TBLPROPERTIES ('transactional'='true');

修改表名:alter table test_trancaction rename to test_transaction;

  1. 准备测试数据,向表中插入数据
> insert into test_transaction values(1,'peach'),(2,'peach2'),(3,'peach3'),(4,'peach4'),(5,'peach5');
  1. hive 事务测试用例
    1)执行 update 操作,将 user_id 的name修改为peach_update
> update test_transaction set name='peach_update' where user_id=1;

2)同时修改同一条数据,将 user_id 为1的用户名字修改为peach,另一条sql将名字修改为peach_

> update test_transaction set name='peach' where user_id=1;
> update test_transaction set name='peach_' where user_id=1;

3)同时修改不同数据,修改id为2的name为peachtest,修改id为3的name为peach_test

> update test_transaction set name='peachtest' where user_id=2;
> update test_transaction set name='peach_test' where user_id=3;

4)执行 select 操作的同时执行 insert 操作

> select count(*) from test_transaction;
> insert into test_transaction values(3,'peach3');

5)update 同一条数据的同时 select 该条数据

> update test_transaction set name='peach_update' where user_id=1;
> select * from test_transaction where user_id=1;

6)执行 delete 操作,将 user_id 为3的数据删除

> delete from test_transaction where user_id=3;

7)同时 delete 同一条数据

> delete from test_transaction where user_id=3;
> delete from test_transaction where user_id=3;

8)同时 delete 两条不同的数据

> delete from test_transaction where user_id=1;
> delete from test_transaction where user_id=5;

9)执行 delete 的同时对删除的数据进行 update 操作

> delete from test_transaction where user_id=3;
> update test_transaction set name='test' where user_id=3;

10)执行 delete 的同时对不同的数据进行 update 操作

> delete from test_transaction where user_id=2;
> update test_transaction set name='test' where user_id=4;

11)执行 delete 的同时执行 select 操作

> delete from test_transaction where user_id=4;
> select count(*) from test_transaction;

对比总结

下表中,C表示并行执行,S表示串行执行,N表示不支持

操作 Hive Hive Hive MySQL
关闭 Concurrency 开启 Concurrency 开启 Transaction
并发执行 select 操作 C C C C
并发执行 insert 操作 C S S S
执行 insert 的同时执行 select 操作 C S C C
执行 select 的同时执行 insert 操作 C S C C
执行 delete 操作 N N S C
执行 update 操作 N N S C
同时 delete 同一条数据 N N S C
同时 delete 两条不同的数据 N N S C
同时 update 同一条数据 N N S C
同时 update 不同的数据 N N S C
update 的同时 select 该条数据 N N C C
delete 的同时对该数据进行 update 操作 N N S C
delete 的同时对不同的数据进行 update 操作 N N S C
执行 delete 的同时执行 select 操作 N N C C

Hive 事务使用

  1. Hive 对事务支持自动事务,因此 Hive 无法通过显示事务的方式对一个操作序列进行事务控制。
  2. Hive 不支持 ROLLBACK。
  3. Hive 对事务无法做到完全并发控制, 多个操作均需要获取 WRITE 的时候则这些操作为串行模式执行(delete 同一条数据的同时 update 该数据,操作是串行的且操作完成后数据未被删除且数据被修改)未保证数据一致性。
  4. Hive 的事务功能尚属于实验室功能,并不建议用户直接上生产系统,因为目前它还有诸多的限制,如只支持 ORC 文件格式,建表必须分桶等,使用起来没有那么方便,另外该功能的稳定性还有待进一步验证。
  5. CDH 默认开启了 Hive 的 Concurrency 功能,主要是对并发读写的的时候通过锁进行了控制。所以为了防止用户在使用 Hive 的时候,报错提示该表已经被 lock,对于用户来说不友好,建议在业务侧控制一下写入和读取,比如写入同一个 table 或者 partition 的时候保证是单任务写入,其他写入需控制写完第一个任务了,后面才继续写,并且控制在写的时候不让用户进行查询。另外需要控制在查询的时候不要允许有写入操作。
  6. 如果对于数据一致性不在乎,可以完全关闭 Hive 的 Concurrency 功能关闭,即设置 hive.support.concurrency 为 false,这样 Hive 的并发读写将没有任何限制。

References:
https://www.infoq.cn/article/guide-of-hive-transaction-management

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

18 − 8 =