Hadoop权威指南: I/O操作序 - 列化

[TOC]

序列化和反序列化

序列化 是指将结构化对象转化为字节流以便在网络上传输或写到磁盘进行永久储存的过程

反序列化 是指将字节流转回结构化对象的逆过程

序列化在分布式数据处理的两大领域经常出现:进程间通讯永久存储

Hadoop中的序列化

  • 在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用(remote procedure call, RPC)"是实现的.

  • RPC协议将信息序列化成二进制流后发送到远程节点, 远程节点接着将二进制流反序列化成原始消息

  • RPC序列化格式四大理想属性:

  • 紧凑: 高效使用储存空间

  • 快速: 读写数据的额外开销比较小

  • 可扩展: 可以透明得读取老数据

  • 支持互操作: 可以使用不同语言读写永久储存的数据

  • Hadoop使用自己的序列化格式Writable, 它绝对紧凑,速度快,但不太容易用Java以外的语言进行扩展和使用

  • Avro是克服了Writable部分不足的序列化系统

Writable接口

Writable接口定义了两个方法

  • 一个将其状态写入DataOutput二进制流

  • 另一个从DataOutput二进制流读取状态

package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOExcepion;

public interface Writable {
    void write(DataOutput out) throws IOExcepiont;
      void readFileds(DataInput in) throws IOException;
}

Writable接口示例

序列化

IntWritable writable = new IntWritable();
writable.set(163);

或者:

IntWritable writable = new IntWritable(163);

为了检查IntWritable的序列化形式,我们在java.io.DataOutputStream(java.io.Dataoutput的一个实现)中加入了一个帮助函数来封装java.io.ByteArrayOutputStream, 以便在序列化流中捕捉字节

public static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
      DataOutputStream dataOut = new DataOutputStream(out);
      writable.write(dataOut);
      dataOut.close();
      return out.toByteArray();
}

每个字节是按照大端顺序写入的(按照java.io.DataOutput接口中的声明,最重要的字节先写到流)

反序列化

public static byte[] deseralize(Writable writable, byte[] bytes) throws  IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
      DataInputStream dataIn = new DataInputStream(in);
      writable.readFileds(dataIn);
      dataIn.close();
      return bytes;
}

调用deserialize()方法从我们刚才写入的输出数据中读取数据

IntWritable newWritable = new IntWritale();
deserialize(newWritable, bytes);

最后,通过get()方法获取原始值:

newWritable.get()

WritableComparable接口和comparator

IntWritable接口实现了原始WritableComarable接口:

package org.apache.hadoop.io;
public interface WritableComparable<T> extends Writable, Comparable<T> { }

对于MapReduce来说, 类型比较非常重要

Hadoop提供的一个优化接口是继承自Java Comparator的RawComparator接口:

package org.apache.hadoop.io;
import java.util.Comparator;
public interface RawComparator<T> extends Comparator<T> {
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

该接口允许其实现直接比较数据流中的记录,无需先把数据流反序列化为对象, 避免了新建对象的额外开销

WritableComparator是对继承自WritableComparable类的RawComparator类的一个通用实现,它提供两个主要功能

  • 第一, 提供了对原始compare()方法的一个默认实现,该方法能够反序列化将在流中进行比较的对象,并调用对象的compare()方法

  • 第二, 它充当的是RawCompatator实例的工厂(已注册Writable的实现)

两个功能示例如下:

首先获得IntWritablecomparator, 直接调用:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

(第一个功能, 这个comparator可以用于比较两个IntWritable对象):

IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
System.out.println(comparator.compare(w1, w2))

(第二个功能,比较序列化对象):

byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
System.out.println(comparator.compare(b1, 0, b1.length, b2, 0, b2.length));

Writable类

  • Hadoop自带的org.apache.hadoop.io包中有广泛的Writable类可供选择

Java基本类型的Writable封装器

Java基本类型

Writable实现

序列化大小(字节)

boolean

BooleanWritable

1

byte

ByteWritable

1

short

ShortWritable

2

int

IntWritable

4

VintWritable

1-5

float

FloatWritable

4

long

LongWritable

8

VlongWritable

1-9

double

DoubleWritable

8

Text类型

  • Text是针对UTF-8序列的Writable

  • Text使用标准UTF-8编码

索引

Text类和Java String类之间存在一定的差别.

  • 对Text类的索引是根据编码后字节序列中的位置实现的,并非字符串中的Unicode字符, 也不是Java char的编码单

  • 对于ASCII字符串, 这三个索引位置的概念是一致的

charAt()方法的使用

Text t = new Text("hadoop");
assertThat(t.getLength(), is(6));
assertThat(t.getBytes().length, is(6));

assertThat(t.charAt(2), is ((int)'d'));
assertThat("Out of bounds", t.charAt(100), is(-1));

注意charAt()方法返回的是一个表示Unicode编码位置的int类型值, 而String返回一个char类型值

Text还有一个find()方法,该方法类似于String的indexOf()方法:

Text t = new Text("hadoop");
assertThat("Find a substring", t.find("do"), is(2));
assertThat("Finds first 'o'", t.find("o"), is(3));
assertThat("Find 'o' from position 4 or later", t.find("o", 4), is(4));
assertThat("No match", t.find("pig"), is(-1));

Test与String的区别

所有字符(除了U+10400, 它是一个候补字符,需要两个Java char表示,称为字符代理对)都可以使用单个Java char类型来表示.当一个字符需要多个字节来编码时,Text和String的区别就会显现

Unicode字符表

Unicode编码点

U+0041

U+00DF

U+6711

U+10400

名称

A(拉丁大写字母)

拉丁小写字母SHARP S

无(统一表示的汉子)

DESERET CAPITAL LETTER CLONG I

UTF-8编码单元

41

c39f

e69db1

F0909080

Java表示

\u0041

\u00DF

\u6771

\uD801\uDC00

验证String和Text的差异性测试

import org.apache.hadoop.io.Text;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

public class StringTextComparisonTest {
    @Test
    public void string() throws UnsupportedEncodingException {
        String s = "\u0041\u00DF\u6771\uD801\uDC00";

        assertThat(s.length(), is(5));
        assertThat(s.getBytes("UTF-8").length, is(10));

        assertThat(s.indexOf("\u0041"), is(0));
        assertThat(s.indexOf("\u00DF"), is(1));
        assertThat(s.indexOf("\u6771"), is(2));
        assertThat(s.indexOf("\uD801\uDC00"), is(3));

        assertThat(s.charAt(0), is('\u0041'));
        assertThat(s.charAt(1), is('\u00DF'));
        assertThat(s.charAt(2), is('\u6771'));
        assertThat(s.charAt(3), is('\uD801'));
        assertThat(s.charAt(4), is('\uDC00'));

        assertThat(s.codePointAt(0), is(0x0041));
        assertThat(s.codePointAt(1), is(0x00DF));
        assertThat(s.codePointAt(2), is(0x6771));
        assertThat(s.codePointAt(3), is(0x10400));
    }

    @Test
    public void text() {
        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");

        assertThat(t.getLength(), is(10));

        assertThat(t.find("\u0041"), is(0));
        assertThat(t.find("\u00DF"), is(1));
        assertThat(t.find("\u6771"), is(3));
        assertThat(t.find("\uD801\uDC00"), is(6));

        assertThat(t.charAt(0), is(0x0041));
        assertThat(t.charAt(1), is(0x00DF));
        assertThat(t.charAt(3), is(0x6771));
        assertThat(t.charAt(6), is(0x10400));
    }
}
  • 测试证明,String的长度是其所含char编码单元的个数,但Text对象的长度却是其UTF-8编码的字节数(10=1+2+3+4)

  • String类的indexOf()方法返回char编码单元中的索引位置,Text类的find()方法则返回字节偏移量

遍历Text

import org.apache.hadoop.io.Text;

import java.nio.ByteBuffer;

public class TextIterator {
    public static void main(String args[]) {
        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");

        ByteBuffer buf = ByteBuffer.wrap(t.getBytes(), 0, t.getLength());
        int cp;
        while(buf.hasRemaining() && (cp=Text.bytesToCodePoint(buf)) != -1) {
            System.out.println(Integer.toHexString(cp));
        }
    }
}

可变性

String想比,Text的另一个区别在于它是可变的,可以通过调用其中一个set()方法来重用Text实例

Text t = new Text("hadoop");
t.set("pig");
assertThat(t.getLength(), is(3));
assertThat(t.getBytes().length, is(3));

BytesWritable

BytesWritable是对二进制数据数组的封装.它的序列化为一个指定所含数据字节数的整数域(4字节),后跟数据内容本身.

BytesWritable b = new BytesWritable(new byte[]{3, 5});
byte[] bytes = serialize(b);
assertThat(StringUtils.byteToHexString(bytes), is("000000020305"));
  • BytesWritable是可变的,可以通过set()方法进行修改

  • BytesWirtable类的getBytes()方法返回的字节数组长度是容量, 可能无法体现所储存数据的实际大小

  • 可以通过getLength()方法来确定BytesWritable变量的大小

    b.setCapacity(11);
    assertThat(b.getLength, is(2));
    assertThat(b.getBytes().length, is(11));

NullWritable

  • NullWritableWritable的特殊类型,它的序列化长度为0

  • 充当占位符,不从数据流中读取数据,也不写入数据

  • 是不可变的单实例类型,通过调用NullWritable.get()方法可以获取

ObjectWritable和GenericWritable

ObjectWritable

ObjectWritable是对Java基本类型(String, enum, Writable, null或这些类型的数组)的一个通用封装,在Hadoop RPC中用于对方法的参数和返回类型进行封装和解封装.

GenericWritable

  • 使用静态类型的数组,并使用对序列化后的类型的引用加入位置索引来提高性能

  • 适用封装的类型比较少,而且能够提前知道

Writable集合类

org.apache.hadoop.io包中一共有6个Writable集合类:ArrayWritable,ArrayPrimitiveWritable,TwoDArrayWritable,MapWritable,SortedMapWritable和EnumMapWritable

  • ArrayWritableTowDArrayWritable是对Writable的数组和两维数组的实现,所有元素必须是同一类型的实例,如下

    ArrayWritable writable = new ArrayWritable(Text.class);

  • ArrayPrimitiveWritable是对java基本数组类型的一个封装,调用set()方法时可以识别相应组件类型,因此无需通过继承该类来设置类型

  • MapWritableSortedMapWritable分别实现了java.util.Map<Writable, Writable>java.util.SortedMap<WritableComparable, Writable>

实现定制的Writable集合

  • 可以完全控制二进制表示和排序顺序

储存一对Text对象的Writable

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}

用于比较TextPair字节表示的RawComparator

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

import java.io.IOException;

public class Comparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public Comparator() {
        super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
            int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            if (cmp != 0) {
                return cmp;
            }
            return TEXT_COMPARATOR.compare(b1, s1+firstL1, l1-firstL1, b2, s2+firstL2, l2-firstL2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    static {
        WritableComparator.define(TextPair.class, new Comparator());
    }
}
  • firstL1firstL2表示两个每个字节流中第一个Text字段的长度

  • WritableUtilsdecodeVIntSize()方法返回变长整数的长度

  • readVInt()返回编码值

定制的RawComparator用于比较TextPair对象字节表示的第一个字段

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

import java.io.IOException;

public class FirstComparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    public FirstComparator() {
        super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
            return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        if (a instanceof TextPair && b instanceof TextPair) {
            return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
        }
        return super.compare(a, b);
    }
}

序列化框架

  • 尽管大多是MapReduce程序使用的都是Writable类型的键值对,但这不是MapReduce API强制使用的.事实上,可以使用任何类型,只要能有一种机制对每个类型与其二进制表示来回转换

  • 为了支持上述机制,Hadoop有一个针对可替换序列化框架的API.序列化框架用一个Serialization实现(org.apache.hadoop.io.serializer包)来实现

  • Serialization对象定义了从类型到Serializer实例(将对象转换为字节流)和Deserializer实例的映射方式

  • io.serialization属性设置为一个逗号分隔的类名列表,即可注册Serialization实现,它的默认值包括org.apache.hadoop.io.serizlizer.WritableSerialization和Avro指定序列化和反序列化类, 这意味着只有Writable对象和Avro对象才可以在外部序列化和反序列化

  • Hadoop包含一个名为JavaSerialization的类,该类使用Java Object Serialization.一般不用, 不满足先前列出的序列化格式标准:精简,快速,可扩展,支持互操作

序列化IDL

  • 接口定义语言(Interface Description Language, IDL)以不依赖于具体语言的方式进行声明

Avro

最后更新于

这有帮助吗?