Tuesday, December 13, 2016

Trying a workaround for SPARK-14927 with Scala/PySpark

Using the HDP 2.5.0 Sandbox. (SPARK-14927: saveAsTable creates RDD partitions but not Hive partitions; the workaround tried here is to create the partitioned table with Hive DDL first, then append to it.)

First, check whether it works in Scala.

1) Create a test table from spark-shell
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
//sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
import sqlContext.implicits._
sqlContext.sql("create external table if not exists default.partitiontest1(val string) partitioned by (year int)")
"nonstrict"を使わないとSparkException: Dynamic partition strict mode requires at least one static partition column.

Verify from Hive:
hive> show create table partitiontest1;
OK
CREATE EXTERNAL TABLE `partitiontest1`(
  `val` string)
PARTITIONED BY (
  `year` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/partitiontest1'
TBLPROPERTIES (
  'transient_lastDdlTime'='1481536557')
Time taken: 1.071 seconds, Fetched: 14 row(s)
hive>

2) Insert data
import org.apache.spark.sql.SaveMode
Seq(2012 -> "a", 2013 -> "b", 2014 -> "c").toDF("year", "val").write.partitionBy("year") .mode(SaveMode.Append).saveAsTable("default.partitiontest1")

Verify from (Spark and) Hive:
#sqlContext.sql("show partitions default.partitiontest1").show
hive> select * from partitiontest1;
OK
a 2012
b 2013
c 2014
Time taken: 6.325 seconds, Fetched: 3 row(s)
hive> show partitions partitiontest1;
OK
year=2012
year=2013
year=2014
Time taken: 0.706 seconds, Fetched: 3 row(s)
hive>

PySpark version:

from pyspark.sql import HiveContext
from pyspark.sql import Row
sqlContext = HiveContext(sc)
sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
...
sqlContext.sql("create external table if not exists default.partitiontest2(val string) partitioned by (year int)")
...
# Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
# TypeError: schema should be StructType or list or None
#sc.parallelize([{"2012":"a", "2013":"b","2014":"c"}]).toDF("year", "val").write.partitionBy("year") .mode("append").saveAsTable("default.partitiontest2")
#sqlContext.createDataFrame([{"2012":"a", "2013":"b","2014":"c"}])

http://nadbordrozd.github.io/blog/2016/05/22/one-weird-trick-that-will-fix-your-pyspark-schemas/

from pyspark.sql.types import StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
schema = StructType([StructField("year", IntegerType(), True), StructField("val", StringType(), True)])
# value order matters (e.g., year must come first to match the schema) and these positional Rows carry no field labels
record = [Row(2012, 'a'), Row(2013, 'b'), Row(2014, 'c')]
#sc.parallelize(record).toDF(schema).collect()
sc.parallelize(record).toDF(schema).write.partitionBy("year").mode("append").saveAsTable("default.partitiontest2")

sqlContext.sql("show partitions default.partitiontest2").show()


Try updating the table statistics:
sqlContext.sql("analyze table default.partitiontest1 compute statistics noscan")

# Table is partitioned and partition specification is needed
#sqlContext.sql("analyze table default.partitiontest1 compute statistics")

# ERROR ExecDriver: yarn
# java.lang.LinkageError: ClassCastException: attempting to castjar:file:/usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar!/javax/ws/rs/ext/RuntimeDelegate.classtojar:file:/usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar!/javax/ws/rs/ext/RuntimeDelegate.class
#sqlContext.sql("analyze table default.partitiontest1 partition(year) compute statistics")

# pyspark.sql.utils.AnalysisException: u"missing KW_STATISTICS at 'for' near '<EOF>'; line 1 pos 61"
# (the STATISTICS keyword is missing below; Hive expects "compute statistics for columns")
#sqlContext.sql("analyze table default.partitiontest1 partition(year) compute for columns")

# Don't see any stats change
# sqlContext.sql("MSCK REPAIR TABLE default.partitiontest2")


Misc.:
sqlContext.sql("select * from default.partitiontest1").collect()
...
[Row(val=u'a', year=2012), Row(val=u'b', year=2013), Row(val=u'c', year=2014)]

>>> sqlContext.sql("select year, val from default.partitiontest1").printSchema()
16/12/13 05:53:41 INFO ParseDriver: Parsing command: select year, val from default.partitiontest1
16/12/13 05:53:41 INFO ParseDriver: Parse Completed
root
 |-- year: integer (nullable = true)
 |-- val: string (nullable = true)

sqlContext.sql("set hive.stats.autogather").show()
...
+--------------------+-----+
|                 key|value|
+--------------------+-----+
|hive.stats.autoga...| true|
+--------------------+-----+
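If needed, the flag can be flipped the same way hive.exec.dynamic.partition.mode was set earlier (a sketch; not run in this session):

# Hypothetical: turn off automatic stats gathering for subsequent writes.
sqlContext.sql("set hive.stats.autogather=false")
# Confirm the new value the same way as above.
sqlContext.sql("set hive.stats.autogather").show()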

Checking for differences between HDFS JournalNodes

Occasionally an edits file gets corrupted and a JournalNode no longer starts; this is a memo on how to quickly check whether the journal directories differ between nodes.

I often forget the trailing / (slash); without it, rsync would transfer the current directory itself rather than its contents.

[root@node1 ~]# rsync -vncr --delete /hadoop/hdfs/journal/nnha/current/ root@node2.localdomain:/hadoop/hdfs/journal/nnha/current/
sending incremental file list
deleting edits_0000000000000847636-0000000000000847636
deleting edits_0000000000000847631-0000000000000847635
deleting edits_0000000000000847626-0000000000000847630
edits_inprogress_0000000000000847626

sent 525821 bytes  received 18 bytes  116853.11 bytes/sec
total size is 17011624  speedup is 32.35 (DRY RUN)
[root@node1 ~]#

What if rsync is not available?
http://stackoverflow.com/questions/20969124/how-to-diff-directories-over-ssh
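If rsync really is unavailable, the same quick comparison can be approximated with a few lines of Python (my sketch, not from the original post): run it on each node and diff the two outputs. Only the journal path is taken from the example above.

import hashlib
import os

# Print "md5  relative/path" for every file under the journal directory.
# Run this on both nodes, then diff the two outputs to spot differing edits files.
base = "/hadoop/hdfs/journal/nnha/current/"
for root, dirs, files in os.walk(base):
    for name in sorted(files):
        path = os.path.join(root, name)
        with open(path, "rb") as f:
            digest = hashlib.md5(f.read()).hexdigest()
        print("%s  %s" % (digest, os.path.relpath(path, base)))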

Monday, December 12, 2016

Quickly checking which HDP HUE files have changed from the version originally installed via yum/rpm

In the rpm -V output below, S means the file size differs, 5 means the MD5 digest differs, T means the mtime differs, and a leading "c" marks a config file; the grep keeps only checksum changes and missing files.

[root@sandbox hue]# for h in `rpm -qa hue*`; do echo "# Checking $h";rpm -V $h | grep -P '^..5|^missing'; done
# Checking hue-pig-2.6.1.2.5.0.0-1245.el6.x86_64
# Checking hue-common-2.6.1.2.5.0.0-1245.el6.x86_64
S.5....T.  c /etc/hue/conf.empty/hue.ini
S.5....T.  c /etc/hue/conf.empty/log.conf
S.5....T.    /usr/lib/hue/app.reg
S.5....T.    /usr/lib/hue/build/env/lib/python2.6/site-packages/hue.pth
S.5....T.    /var/lib/hue/desktop.db
# Checking hue-hcatalog-2.6.1.2.5.0.0-1245.el6.x86_64
# Checking hue-oozie-2.6.1.2.5.0.0-1245.el6.x86_64
# Checking hue-2.6.1.2.5.0.0-1245.el6.x86_64
# Checking hue-beeswax-2.6.1.2.5.0.0-1245.el6.x86_64
# Checking hue-server-2.6.1.2.5.0.0-1245.el6.x86_64