TCP 滑动窗口的大小

场景

我们在用 tcpdump 分析 tcp 数据段时, 数据中会有一个 win 变量, 如

20:59:17.766879 IP 192.168.113.89.50179 > www.test.com.http-alt: Flags [.], ack 21379872, win 32768, length 0

这个 win 代表了什么意思呢?

滑动窗口

TCP 协议为了达到更高效的数据发送效率, 会使用一个叫滑动窗口的机制来批量发送数据, 其形式如图

图中 1~1000 为一个 MSS (Max Segment Size) 大小, 如果窗口大小是 4 MSS, 表示一个窗口可以同时发送 4 个段的数据 (1~4000), 也就是说 1001~2000 的数据可以不必等到 1~1000 的 ack 就直接发出去. 同时, 下一次发送的数据, 将会直接移动到收到的 ack 的最大编号开始, 而不必管前面的编号数据有没有收到 ack. 如图

窗口的大小

那么, 窗口的大小是如何确定的呢?

窗口控制

tcp 的接收方可以根据自己对网络数据的消费能力, 给数据发送方发送一个窗口大小控制参数, 这个参数就是 tcpdump 中看到的 win. 当消费能力不足时, 有可能窗口大小会变成 0. 这样, 发送方便不会再给接收方发数据. 在这期间, 发送方会时不时发一个"窗口探测"数据段, 这个数据段用于获取接收方的最新窗口控制参数. 当窗口大小恢复时, 数据传输才得以继续.

拥塞控制

发送方在数据传输开始时, 如果刚开始就发送大量数据, 很可能会造成网络拥堵. 那么, 窗口大小是如何定义的呢?
tcp 有个机制叫慢启动. 一般来讲, 如果 MSS 为 1460 Bytes (以太网标准), 则窗口初始大小设置为 3 MSS 大小. 此后, 根据每次数据往返的 ack, 会逐渐增大这个值. 首次开始数据传输时, 拥塞窗口大小会以 1, 2, 4 这样的指数倍增长, 直到发生超时重发, 窗口会重新开始计算, 并设置一个慢启动阈值, 这个阈值为窗口大小的一半, 当窗口大小超过阈值时, 窗口的增长速率按照一个小于 MSS 大小的比例增长, 公式为

如果遇超时重传的情况, 则拥塞窗口大小会重新设置为 (慢启动阈值 + 3 MSS). 而阈值会重新设置为当前窗口的一般. 窗口变化如图

窗口大小的确定

那么, 最终的窗口大小其实是由接收方的窗口控制参数和拥塞窗口大小共同决定的, 实际发送的窗口数据量大小, 是两者中的较小值.

close_wait troubleshooting

问题背景

最近, 有好几个业务反馈试用 apache httpclient 时出现了问题, 使用的版本是 4.3.1, 表现出来的现象就是机器的连接监控出现大量的 close_wait. 只有在 gc 的时候, 这些大量的连接才会被回收释放.

问题分析

tcp 四次挥手

要分析 close_wait 形成的原因, 首先需要知道 tcp 连接在关闭时的流程, 如图:

tcp 连接的关闭可以由连接双方任何一方发起, 从图中可知, close_wait 的状态发生在被动关闭方收到 fin, 并发出 ack 之后. 而在被动关闭方发出 fin 以后, 状态则会转为 last_ack. close_wait 的状态可能会永远持续下去, 直到连接被 close 状态变为 last_ack.

由上我们已经知道, 造成这个现象的原因就是连接被对方关闭, 而自己没有发出 fin. 从 api 层面来讲, 就是没有调用 socket 的 close()

代码分析

查看业务代码, 发现业务的请求逻辑如下

public class RequestUtils {

    public static String get(String url) {
        CloseableHttpClient client = HttpClients.createDefault();
        HttpGet get = new HttpGet(url);
        CloseableHttpResponse response = null;
        try {
            response = client.execute(get);
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (response != null) {
                try {
                    response.close();
                } catch (IOException e1) {
                    // ignored

                }
            }
        }
    }
}

看起来似乎没有什么问题, 最后已经调用了 response.close(), 看起来连接应该已经被关闭了, 但真的是这样吗? 既然连接是处于 close_wait 状态, 那么说明代码并有调用 socket.close(). 跟踪 response.close() 的代码, 到底层后会发现, close 时有一个判断逻辑, 即 Response 的 header 中有 Connection: keep-alive, 则连接不会被关闭, 以便下次请求同一个网站的时候复用连接. 所以, 问题就出在这.

问题原因

连接没有主动关闭, 是造成 close_wait 的直接原因. 那么更本原因是什么? 以前我们总是会说, httpclient 要复用, 连接要复用, 到底是为什么, 不复用会造成什么潜在的问题, 其实就是这个原因. 以上业务代码的问题就在于每次调用这个方法都会创建一个新的 HttpClient, 他的问题在于:
1. 浪费资源, 每次需要重新初始化 client
2. 由于 keep-alive 的存在 (http/1.1 协议默认开启 keep-alive), 导致连接不会被关闭, 只会被回收
3. 局部方法内生成 client, 在 finally 内没有关闭 client, 导致连接的泄露
实际上, 上面出现 close_wait 的应该是在 server 那边 hang 住连接超过一定时间以后超时, server 才关闭连接的, 因为既然 server 返回 keep-alive, 说明 server 实际上也期待客户端能复用连接, 却没想到客户端把连接给泄露了.

解决问题

知道了问题原因, 如何改进业务代码? 改进之前, 应该清楚使用 httpclient 的一些原则:
1. 复用 HttpClient 避免重复初始化
2. 使用连接池管理连接, 避免每次新建连接
3. 设置合理的连接池大小, 以及连接存活时间
改进后的代码如下:

public class RequestUtils2 {

    private static final CloseableHttpClient CLIENT;

    static {
        PoolingHttpClientConnectionManager connectionManager = 
                new PoolingHttpClientConnectionManager(60, TimeUnit.SECONDS);
        connectionManager.setMaxTotal(1080);
        connectionManager.setDefaultMaxPerRoute(128);

        CLIENT = HttpClientBuilder.create()
                .setConnectionManager(connectionManager)
                .build();
    }

    public static String get(String url) {
        HttpGet get = new HttpGet(url);
        CloseableHttpResponse response = null;
        try {
            response = CLIENT.execute(get);
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (response != null) {
                try {
                    response.close();
                } catch (IOException e1) {
                    // ignored

                }
            }
        }
    }
}

测试代码

有想测试这个问题的同学, 可以用以下代码启动一个 httpserver

public class SimpleHttpServer {

    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8000), 0);
        server.createContext("/test", new MyHandler());
        server.setExecutor(null); // creates a default executor

        server.start();
    }

    static class MyHandler implements HttpHandler {

        @Override
        public void handle(HttpExchange t) throws IOException {
            String response = "This is the response";
            Headers headers = t.getResponseHeaders();
            headers.add("Connection", "keep-alive");
            t.sendResponseHeaders(200, response.length());
            OutputStream os = t.getResponseBody();
            os.write(response.getBytes());
            os.close();
        }
    }
}

然后使用错误代码请求

public class RequestUtilsTest {

    @Test
    public void testRequest() throws Exception {
        String s = RequestUtils.get("http://localhost:8000/test");
        System.out.println(s);
        Thread.sleep(Long.MAX_VALUE);
    }
}

等到请求结束后, 手动结束 httpserver, 再用 sudo netstat -antup (Mac 使用 lsof -i -P) 查看一下端口状态, 你会发现如下信息

java      45896     liuzhenwei   48u  IPv6 0x8c89b842b9402bf3      0t0    TCP localhost:52519->localhost:8000 (CLOSE_WAIT)

题外话

有人会疑问, server 端关闭连接后, client 需要手动关闭连接吗? 关于这一点, 可以参考写简单的 socket 业务逻辑时的代码, 如果被动关闭方需要发出 fin, 是需要调用 socket.close() 的. 例如, 一般我们会这么写:

    Socket socket = new Socket("localhost", 9000);
    InputStream in = socket.getInputStream();
    while (true) {
        int read = in.read();
        if (read == -1) { // 这个 -1 就是 server close 连接后发出的

            in.close();
            break;
        }
        System.out.println(read);
    }

也就是说, 主动关闭方如果 close 了, 那么被动关闭方这边状态会变为 close_wait, 同时收到 -1, 然后我们需要手动调用 socket.close().

0xFF 是什么?

我们经常会看到在代码的基本数据类型转换时出现这样的代码

byte b = -1;
int a = b & 0xFF;

这个 0xFF 的用意是什么呢, 跟 int a = b; 又有什么区别?

数值与数据

在计算机程序里, 数值与数据是两个不同的概念. 数值表示一个数表示的大小, 例如 int 的 -1 和 byte 的 -1, 代表的是数学上的数值大小. 但是作为数据来讲, 他们两个却是不一样的

int 的 -1 的二进制表示: 11111111111111111111111111111111
byte 的 -1 的二进制表示: 11111111

基本数据类型转换

那么回到刚才的问题, int a = b & 0xFF 是什么意思?

0xFF = 00000000000000000000000011111111

也就是说 0xFF 代表了一个 byte 位全为 1.
那么

int a = b & 0xFF = 00000000000000000000000011111111 & 00000000000000000000000011111111 = 00000000000000000000000011111111 = 255

int a = b = -1 = 11111111111111111111111111111111;

至此, 我们可以看出区别, 前者保证了数据一致性, 但是数值从 -1 变成了 255, 而后者则保证了数值一致性, 都是 -1, 但是数据发生了变化.

0xFF 的应用

所以, 0xFF 这个东西, 一般用于我们做小基本数据类型转大基本数据类型. 例如我们需要序列化及反序列化 int 数据

public static void writeFixedInt(OutputStream out, int i) {
    out.write((byte) (i >>> 24));
    out.write((byte) (i >>> 16));
    out.write((byte) (i >>> 8));
    out.write((byte) i);
}

在序列化 int 时, 我们将 int 从高位到低位的每个字节一次写入 stream

public static int readFixedInt(ByteData data, long position) {
    int s = 0;
    int len = 4;
    for (int i = 0; i < len; i++) {
        s |= (data.get(position + i) & 0xFF) << ((len - i - 1) * 8);
    }
    return s;
}

而在反序列化时, 则是一次将 int 的高位字节读出来, 因为读出来的是 byte 类型, 在转为 int 类形时就需要注意保持数据一致性, 所以需要用到 0xFF, 然后再把高位数据左移, 最后拼接出完整的 int.

同为 32 bit, 为什么 Float 比 Integer 表示的范围大?

Float 和 Integer 在 Java 中的都是 32 位, 为什么 Float 表示的范围要比 Integer 大这么多呢?

Float.MAX_VALUE = 340282346638528860000000000000000000000.000000
Integer.MAX_VALUE = 2147483647

换个角度想, 32 bit 可以表示的状态是有限的, 32 bit 最多就能表示 2^32 种状态, 而 Integer 用它们来表示了一段连续的整数. 那么 Float 呢? 显然, Float 也表示了 2^32 种状态, 只是那是与 Integer 不同的 2^32 种状态.

Float 的内部的 32 bit 存储格式, 为 S EEEEEEEE MMMMMMMMMMMMMMMMMMMMMMM, 其中 S 为符号位, E 为指数, M 为底数. 所以, 一个 float = M × 2^E. E 的范围是 -128~127

因此, Float 的最大值很大, 但它并不能表示这个范围内的所有值, 例如

public static void main(String[] args) {
    float f = 1999999991808f;
    System.out.printf("%f\n", f);
    System.out.printf("%f\n", f + 1);
    System.out.printf("%f\n", f + 2);
    System.out.printf("%f\n", f + 3);
}

输出

1999999991808.000000
1999999991808.000000
1999999991808.000000
1999999991808.000000

文本加密 (DES & RSA)

背景

项目中需要保护敏感信息, 为了保证信息安全性, 打算先加密, 然后再在网络中传输.

加密算法

说到加密, 会想到很多加密算法, 包括对称非对称等. 此处由于是客户端服务端通信, 使用对称加密显然不合适, 例如 DES, 因为只要客户端密钥泄露, 则会造成加密无效. 因此, 考虑使用非对称加密, 例如 RSA. 但是, RSA 会有一个问题, 即 RSA 加密的文本长度有限制, 1024 位的 key 最大能加密 84 bytes 的数据, 2048 的 key 最大能加密 214 bytes 的数据, 这对于我们要加密长文本来说显然是不合适的.

解决方法

因此, 为了解决长文本的问题, 我们需要另外的解决方案.

分片加密

我们可以对数据进行切分, 将数据切成小片, 例如每片 200 字节. 然后对每片进行 RSA 加密, 传输到服务端后再对数据进行解密并重新组合. 这种做法可行, 但有一定的缺点: 每一小片被切分的数据都会膨胀, 造成空间的浪费, 例如使用 2048 bits 的 key, 每 245 bytes 的数据都会膨胀为 256 bytes.

结合 DES

将数据用 DES 直接加密, DES 的 key 为每次随机生成, 并用 RSA 加密 DES 的 key. 这样每次收到加密数据后, 先用 RSA 解密 DES 的 key, 然后再用 DES 解密实际的内容.

附上 Java Code

EncryptionUtils
public class EncryptionUtils {

    public static PublicKey loadRSAPublicKey(String path) throws IOException, InvalidKeySpecException {
        byte[] bb = qunar.agile.Files.readBytes(new File(EncryptionUtils.class.getResource(path).getPath()));
        X509EncodedKeySpec spec = new X509EncodedKeySpec(qunar.codec.Base64.decode(bb));
        try {
            return KeyFactory.getInstance("RSA").generatePublic(spec);
        } catch (NoSuchAlgorithmException e) {
            // ignore;

            return null;
        }
    }

    public static PrivateKey loadRSAPrivateKey(String path) throws IOException, InvalidKeySpecException {
        byte[] bb = qunar.agile.Files.readBytes(new File(EncryptionUtils.class.getResource(path).getPath()));
        PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(qunar.codec.Base64.decode(bb));
        try {
            return KeyFactory.getInstance("RSA").generatePrivate(spec);
        } catch (NoSuchAlgorithmException e) {
            // ignore;

            return null;
        }
    }

    public static SecretKey loadDesKey(String path) throws InvalidKeySpecException, IOException, InvalidKeyException {
        String s = qunar.agile.Files.readString(new File(EncryptionUtils.class.getResource(path).getPath()), Charsets.UTF_8.name());
        DESKeySpec spec = new DESKeySpec(Base64.decode(s));
        try {
            return SecretKeyFactory.getInstance("DES").generateSecret(spec);
        } catch (NoSuchAlgorithmException e) {
            // ignore

            return null;
        }
    }

    public static KeyPair createKeyPair(String algorithm) throws NoSuchAlgorithmException, NoSuchProviderException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance(algorithm);
        generator.initialize(1024);
        return generator.generateKeyPair();
    }

    public static SecretKey createKey(String algorithm) throws NoSuchAlgorithmException, NoSuchProviderException {
        KeyGenerator generator = KeyGenerator.getInstance(algorithm);
        return generator.generateKey();
    }

    public static void serializeKey(String dstFile, Key key) throws IOException {
        Files.write(qunar.codec.Base64.encode(key.getEncoded()), new File(dstFile), Charsets.UTF_8);
    }

    public static String decryptDes(String data, String keyString) throws Encryption.DecryptException {
        try {
            Cipher cipher = Cipher.getInstance("DES/ECB/PKCS5Padding");
            DESKeySpec keySpec = new DESKeySpec(keyString.getBytes(Charsets.UTF_8));
            SecretKey key = SecretKeyFactory.getInstance("DES").generateSecret(keySpec);
            cipher.init(Cipher.DECRYPT_MODE, key);
            byte[] bytes = Base64.decode(data);
            return new String(cipher.doFinal(bytes), Charsets.UTF_8);
        } catch (Exception e) {
            throw new Encryption.DecryptException(e);
        }
    }
}

RSAEncryption
public class RSAEncryption implements Encryption {

    private static final String ALGORITHM = "RSA";

    private PublicKey publicKey;
    private PrivateKey privateKey;

    public RSAEncryption(String publicKeyPath, String privateKeyPath) throws IOException, ClassNotFoundException, InvalidKeyException, NoSuchPaddingException, NoSuchAlgorithmException, InvalidKeySpecException {
        this.publicKey = EncryptionUtils.loadRSAPublicKey(publicKeyPath);
        this.privateKey = EncryptionUtils.loadRSAPrivateKey(privateKeyPath);

        // fail fast

        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(Cipher.ENCRYPT_MODE, publicKey);
        cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(Cipher.DECRYPT_MODE, privateKey);

    }

    @Override
    public String encrypt(String source) throws EncryptException {
        Cipher cipher;
        try {
            cipher = Cipher.getInstance(ALGORITHM);
            cipher.init(Cipher.ENCRYPT_MODE, publicKey);
            byte[] bytes = source.getBytes(Charsets.UTF_8);
            return Base64.encode(cipher.doFinal(bytes));
        } catch (Exception e) {
            throw new EncryptException(e);
        }
    }

    @Override
    public String decrypt(String source) throws DecryptException {

        Cipher cipher;
        try {
            cipher = Cipher.getInstance(ALGORITHM);
            cipher.init(Cipher.DECRYPT_MODE, privateKey);
            return new String(cipher.doFinal(Base64.decode(source)), Charsets.UTF_8);
        } catch (Exception e) {
            throw new DecryptException(e);
        }
    }
}

IP地址分类与子网掩码

说起来很惭愧, 在工作了这么长时间, 居然一直没有弄清楚像 IP 地址分类子网掩码 这些基础概念. 原因应该跟我不喜欢系统的看书学习有关, 最近闲下来看了一些关于 tcp/ip 的书, 总算有了比较清晰的了解. 特此记录一下.

IP 地址类型

大部分人都知道 IPv4 地址有 A, B, C, D 四类地址, 实际上他们的区别就在于地址在以二进制表示时是以什么数字开头.

A 类地址

IP 地址以 0 开头的都是 A 类地址, 范围包括 0.0.0.0 ~ 127.0.0.0
默认情况下, A 类地址的前 8 位为网络标识位, 后 24 位为主机标识位
| 0 | 7位 | 8位 | 8位 | 8位 |
加粗部分为网络地址位

B 类地址

IP 地址以 10 开头的都是 B 类地址, 范围包括 128.0.0.1 ~ 191.255.0.0
默认情况下, B 类地址的前 16 位为网络标识位, 后 16 位为主机标识位
| 10 | 6位 | 8位 | 8位 | 8位 |
加粗部分为网络地址位

C 类地址

IP 地址以 110 开头的都是 C 类地址, 范围包括 192.168.0.0 ~ 239.255.255.0
默认情况下, C 类地址的前 24 位为网络地址标识位, 后 8 位为主机标识位
| 110 | 5位 | 8位 | 8位 | 8位 |
加粗部分为网络地址位

D 类地址

IP 地址以 1110 开头的都是 D 类地址, 范围包括 224.0.0.0 ~ 239.255.255.255
D 类地址没有主机标识位, 所有 32 位都是网络标志位, 一般用于多播
| 1110 | 4位 | 8位 | 8位 | 8位 |

特殊地址

  • 0.0.0.0 : 用于表示无法获取 ip 的地址
  • 255.255.255.255 : 全为 1 地址通常用作广播地址 因为, 我们在计算地址个数的时候通常要减去这两个特殊地址

子网掩码

子网掩码用于划分哪些 IP 属于同一个子网, 一般的表示形式如下
172.20.100.52/26
首先这是个 B 类地址, 这个地址表示他的子网掩码是 26, 26 代表的意思子网掩码前 26 位全为 1, 即为 11111111.11111111.11111111.11000000, 化为整数形式则是 255.255.255.192

那么, 如何确定子网内的主机呢.

IP 地址: 172 . 20 . 100 . 52 ( 10101100 . 00010100 . 01100100 . 00110100 )
子网掩码: 255 . 255 . 255 . 192 ( 11111111 . 11111111 . 11111111 . 11*000000* )
网络地址: 172 . 20 . 100 . 0 ( 10101100 . 00010100 . 01100100 . 00*000000* )

实际上网络地址的计算就是用二进制的 IP 地址与上二进制的子网掩码, 那么最后得出了网络地址, 网络地址中与子网掩中为 0 对应的位即为可变化的子网内主机 ip 的位, 所以对于 172.20.100.52/26 来说, 他的子网的主机应该包括了
172.20.100.0 (10101100.00010100.01100100.00*000000) ~ 172.20.100.63 (10101100.00010100.01100100.00111111*)

CIDR (Classless Inter-Domain Routing)

前面提到过默认情况下 B 类地址的网络标识位应该是 16 位, 那么上面提到的 172.20.100.52/26 却有 26 位是为什么呢?
这是因为默认情况下的固定了网络标识位, 也就固定了某个子网的主机数, 这样很有可能会造成 ip 资源的浪费, 也许你并不需要这么大的子网. 例如 B 类地址的子网包括了 65534 台主机, C 类地址只有 254 台主机, 而你的子网却可能是 500 台主机. 这样, 如果可以自定义子网掩码的长度, 也就是自定义网络标识位的长度, 则可以更有效的利用 ip 资源来构成子网. 这种采用任意长度切割 IP 地址子网的方式叫 CIDR, 意为 "无类型域间选路".

小结

实际上, 一个地址是什么类型的地址仅需要通过地址的前几位去判断就可以了, 与子网掩码并没有关系. 而一个 IP 的子网掩码是多少, 则是需要通过 ISP 去向区域互联网注册管理机构申请 CIDR 地址块, 也就是说一次性会申请一个段的 IP, 而同时也确定了这些 IP 的子网掩码. 换句话说, 并不可能会同时存在 172.20.100.52/26 和 172.20.100.52/25 两个 IP 相同, 子网掩码却不同的地址.

分布式限流

背景

限流是生产中经常遇到的一个场景, 目前现有的一个工具大部分是提供单机限流的能力, 例如 google 的 guava 中提供的 RateLimiter. 但是生产环境大部分是分布式环境, 在多台机器的环境下, 需要的是能对多台机器一起限流的分布式限流. 分布式限流依赖公共的后端存储, 所以还需要自己搭建.

算法

说到限流, 首先依赖的是限流的算法, 限流的算法很多包括令牌桶, 漏桶.

滑动窗口

滑动窗口算法的优点在于可以在滑动时间内计算出相对精确的限流数据. 想象一个简单的限流算法, 例如限制在一分钟内最多访问 10 次. 我们在后端存储的结构如下

假设每个框代表了一分钟, 框中存储了一分钟内的限流数据, 那么问题在于我们想要红色框的限流的数据时将无法计算, 也就是说我们的限流的时间节点的起止时间是固定的. 而滑动窗口之所以为"滑动", 则是为了解决这个问题诞生. 而实际上, 这个方法也是令牌桶的一种变相实现.

滑动实现的核心思想, 在于将时间块切分到更细的精度, 假如我们继续将 1 分钟切分为更小的维度, 例如 5 秒, 那么以后我们的频率计算的时间节点就可以变得更精确, 例如 10:11:00 ~ 10:12:00, 10:11:05 ~ 10:12:05, 10:11:10 ~ 10:12:10 ... 做到 5 秒的精度, 如图

从另一个角度来讲, 一分钟时间的间隔, 实际上也是一种滑动的特殊情况, 只不过精度一分钟.

后端存储

既然说到分布式实现, 则需要考虑公共的后端存储服务, 此处我们选择 redis, 因为 redis 提供了方便的数据结构供我们实现滑动窗口, 主要会用到 redis 中的 map. 具体实现可以参照代码.

实现

为了保证单次限流各种操作的原子性, 我们选择使用 lua 脚本执行限流逻辑, 最终会返回是否达到流量限制的结果. 参考 Part 1 Part 2

-- KEYS[1] map key

-- ARGV[1] current time

-- ARGV[2] duration

-- ARGV[3] limitation

-- ARGV[4] precision

-- ARGV[5] permits


local function clear(i1, i2, key, count_key, dele)
    local sum = 0
    for id = i1, i2 do
        local bkey = count_key .. ":" .. id;
        local bcount = redis.call('HGET', key, bkey)
        if bcount then
            sum = sum + tonumber(bcount)
            table.insert(dele, bkey)
        end
    end
    return sum
end

local count_key = "cnt"
local ts_key = "ts"

local key = KEYS[1]
local now = tonumber(ARGV[1])
local duration = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local precision = tonumber(ARGV[4])
local permits = tonumber(ARGV[5])

local blocks = math.ceil(duration / precision)
local block_id = math.floor(now / precision) % blocks
local last_ts = redis.call('HGET', key, ts_key)
last_ts = last_ts and tonumber(last_ts) or 0

if last_ts ~= 0 then
    local decr = 0;
    local dele = {}
    local last_id = math.floor(last_ts / precision) % blocks
    local elapsed = now - last_ts;

    if elapsed >= duration then
        -- clear all

        clear(0, blocks - 1, key, count_key, dele)
        if permits > 0 then
            redis.call('HSET', key, ts_key, now)
            redis.call('HINCRBY', key, count_key, permits)
            redis.call('HINCRBY', key, count_key .. ":" .. block_id, permits)
            redis.call('PEXPIRE', key, duration)
        end
        return false
    elseif block_id > last_id then
        decr = decr + clear(last_id + 1, block_id, key, count_key, dele)
    elseif block_id < last_id then
        decr = decr + clear(0, block_id, key, count_key, dele)
        decr = decr + clear(last_id + 1, blocks - 1, key, count_key, dele)
    end

    local cur
    if #dele > 0 then
        redis.call('HDEL', key, unpack(dele))
        cur = redis.call('HINCRBY', key, count_key, -decr)
    else
        cur = redis.call('HGET', key, count_key)
    end

    if tonumber(cur or '0') + permits > limit then
        return true
    end
end

if permits > 0 then
    redis.call('HSET', key, ts_key, now)
    redis.call('HINCRBY', key, count_key, permits)
    redis.call('HINCRBY', key, count_key .. ":" .. block_id, permits)
    redis.call('PEXPIRE', key, duration)
end
return false

参数解释

  • key : 限流记录的 key, 此处的 key 由外部传入, 一般根据我们需要限流的维度来生成. 例如如果是按 ip 对某个 url 做访问限流限制, 则 key 可能是 url:/test:ip:192.168.1.1
  • current time : 当前时间, 使用服务端 redis 时间, 为了保证分布式情况下时间的一致性, 这里的使用通过 redis.time 获取并传入 lua 脚本
  • duration : 限流的总时长, 例如 1 分钟则是 60 * 1000 ms
  • limitation : 最高流量限制, 例如每分钟 10 次, 则为 10
  • precision : 限流精度, 例如精度是 1s, 则为 1000 ms, 限流精度也是保证能实现上图红框内限流的关键, 精度越小, 限流越精确, block 数也越多, 占用的内存也越大. 实际上上图的简单限流即是 duration = precision 的一种特殊情况
  • permits : 本次需要增加多少流量, 对于频率来说一般是 1, 而对于流量来说则是数据流量的字节数

至此分步实现的后端关键实现已经基本完成, 剩下的是做好客户端调用流量的接口.

考虑的问题

redis 集群问题

由于 redis 是集群环境, 集群环境下实际上直接执行 lua 脚本是有问题的. 试想 lua 脚本内可能涉及到多个 key 的操作, 而 redis 实际执行节点的选择也是通过 key 来选择的. 在多 key 情况下可能会造成 lua 脚本内 key 的执行混乱, 所以我们需要先手动选择好 redis 节点. 此处我们可以先用限流的 key 将 redis 选择出来, 再将 lua 脚本传到某个 redis 节点执行. 也就是我们必须要可以通过限流 key 唯一确定一个 redis 节点, 例如 url:/test:ip:192.168.1.1 是可以确定使用某个 redis 节点的.

分布式时间问题

分布式系统需要考虑多客户端时间不一致问题, 此处使用 redis 时间解决

客户端性能问题

由于这是一个公用的限流服务, 也就是所有接入该服务的应用的每次请求都会调用该服务, 再加上所有接入服务的应用共用一个 redis, 显然如果客户端使用同步等待限流服务的返回结果并不太合适, 会影响客户端的服务调用性能. 所以我们可以使用一种折中策略, 即将限流结果保存到本地, 每次请求直接检查本地限流结果是否被限流, 同时使用异步的方式调用限流服务, 并在异步回调中更新限流结果. 这种做法会让限流数据略有延迟, 但是影响不大.

限流服务本身的负载

作为限流服务, 一个主要的作用是限制恶意流量对正常业务造成冲击, 但如果所有流量都需要经过限流服务, 当流量激增的时候, 谁来保证限流服务自己不被压垮? 我的建议是设定一个阈值, 当流量超过某个阈值(这个阈值可以设置为 机器数 * 限流阈值)时, 直接退化为本地限流.

谈谈 Maven 的 settings.xml

问题起因

来公司开始使用 maven 以后, 会配置一个叫 settings.xml 的 maven 配置文件. 这个文件的内容不多, 格式大概如下:

settings.xml
<?xml version="1.0"?>
<settings xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                            http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <servers>
        <server>
            <id>snapshots</id>
            <username>{username}</username>
            <password>{passwd}</password>
            <filePermissions>664</filePermissions>
            <directoryPermissions>775</directoryPermissions>
        </server>
    </servers>

    <profiles>
        <profile>
            <id>MyProfile</id>
            <repositories>
                <repository>
                    <id>Nexus</id>
                    <url>http://test.com/nexus/content/groups/public</url>
                    <releases>
                        <enabled>true</enabled>
                        <!-- always , daily (default), interval:X (where X is an integer in minutes) or never.-->
                        <updatePolicy>daily</updatePolicy>
                        <checksumPolicy>warn</checksumPolicy>
                    </releases>
                    <snapshots>
                        <updatePolicy>always</updatePolicy>
                    </snapshots>
                </repository>
            </repositories>
            <pluginRepositories>
                <pluginRepository>
                    <id>Nexus</id>
                    <url>http://test.com/nexus/content/groups/public</url>
                    <releases>
                        <enabled>true</enabled>
                        <checksumPolicy>warn</checksumPolicy>
                    </releases>
                    <snapshots>
                        <updatePolicy>always</updatePolicy>
                    </snapshots>
                </pluginRepository>
            </pluginRepositories>
        </profile>
    </profiles>

    <activeProfiles>
        <activeProfile>MyProfile</activeProfile>
    </activeProfiles>

</settings>

之前一直不太明白这个文件各部分都是什么意思, 查了文档 settings 配置指南pom 配置指南 才大概明白此处的各个节点的意思.

<server /> 节点

<server /> 中定义了一些敏感的账号密码信息, 这些信息用于在发布新 jar 包的时候使用. 一般来讲, 我们只会填写 snapshot 的 username & password. 因为本地一般只会发布 snapshot 用于测试, 而 release 版本发布都是走 bds 发布系统, 所以不会写在这里.

根据文档的解释, 这里的 <id> 应该是需要对应 repository 或 mirror 的 id 值的, 但是这里的配置没有叫 snapshots 的 repository. 那么这个对应的 id 到底是什么呢? 原因在于 settings.xml 与 pom.xml 是联用的. 而我们一般在自己项目的 pom 里会引用公司的 super pom. 去 super pom 中查看, 可以看到一个信息如下

super-pom.xml
<distributionManagement>

    <repository>
        <id>releases</id>
        <name>Nexus Releases</name>
        <url>${nexus.releases}</url>
    </repository>

    <snapshotRepository>
        <id>snapshots</id>
        <uniqueVersion>false</uniqueVersion>
        <name>Nexus Snapshots</name>
        <url>${nexus.snapshots}</url>
    </snapshotRepository>

</distributionManagement>

原来这个叫 snapshots 的 repository 定义在这里. 那么, distributionManagement 又是什么? 根据文档的解释, 这个是用来配置发布 jar 包的时候用的地址. 例如我们的 snapshot, 就会发布到 ${nexus.snapshots} 这个变量定义的地方. 而发布的时候验证的权限则是使用 servers 节点中定义的 username 与 password.

<profiles /> 节点

<profiles /> 节点中主要定义了我们 local repository 在获取依赖的时候访问的 url 地址. 我们可以发现它与 super-pom 的 profiles 定义类似.

super-pom.xml
<profiles>

    <profile>
        <id>local</id>
        <properties>
            <deploy.type>local</deploy.type>
        </properties>
    </profile>

    <profile>
        <id>dev</id>
        <properties>
            <deploy.type>dev</deploy.type>
        </properties>
    </profile>

    <profile>
        <id>beta</id>
        <properties>
            <deploy.type>beta</deploy.type>
        </properties>
    </profile>

    <profile>
        <id>prod</id>
        <properties>
            <deploy.type>prod</deploy.type>
        </properties>
    </profile>

</profiles>

他们的联系在于 settings.xml 中定义的 profile 值会覆盖 pom.xml 中相同的 id 的 profile 参数. 当然, 我们此处 pom 中并没有定义与 settings 中相同的 key. 所以此处可以理解为最后在使用 local profile 编译的时候, 会合并使用 settings 和 pom 中对 local profile 定义的参数.

小结

从配置上看, 我们的发布和编译实际上是使用不同的 url 的, 总共会涉及到获取依赖的 repository, 发布 jar 包的 repository 两部分 repository, 分别定义在 <profile> 和 <distributionManagement> 两个节点中.

另外, settings 文件分为两种:

  • The Maven install: $M2_HOME/conf/settings.xml
  • A user’s install: ${user.home}/.m2/settings.xml

当两个同时存在时, 会合并使用两个配置文件, 出现冲突时, 优先考虑用户目录下的 settings.xml

最后, 我之前在考虑为什么不把获取依赖的 repository 直接定义到 super-pom, 而是要每个人都去修改自己的 settings 文件, 定义到 settings 文件里呢? 我发现这其实是一个悖论, 因为 super-pom 也是通过我们定义的 repository 来获取的.

equals 与里氏代换原则

equals 的实现原则

自反性(Reflexive) : a.equals(a) 必须为 true
对称性(Symmetric) : a.equals(b) && b.equals(a) 必须为 true
传递性(Transitive) : a.equal(b) && b.equals(c) && a.equals(c) 必须为 true
一致性(Consistent) : 多次调用 a.equal(b) 结果不变

里氏代换原则

If S is a subtype of T, then objects of type T may be replaced with objects of type S (i.e., objects of type S may substitute objects of type T) without altering any of the desirable properties of that program (correctness, task performed, etc.)

如果 S 是 T 的子类, 那么 T 在被 S 替代的情况下, 不需要修改任何程序的代码.
例如:

pubic void process(List<Integer> list) {
    for (Integer i : list) {
    System.out.println(i);
  }
}

此处的 list 我们可以使用任意的 ArrayList 或者 LinkedList 来替换, 这就符合里氏代换原则

错误的范例

违反对称性原则

Human.java
public class Human {
    private String name;

    public Human(String name) {
        this.name = name;
    }
    
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Human)) return false;
        if (o == this) return true;
        Human h = (Human) o;
        return name != null ? name.equals(h.name) : h.name == null;
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }
}
Man.java
package com.qunar.fresh;

public class Man extends Human {

    private int length;

    public Man(String name, int length) {
        super(name);
        this.length = length;
    }
    
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Man)) return false;
        if (this == o) return true;
        Man m = (Man) o;
        return super.equals(o) && this.length == m.length;
    }

    @Override
    public int hashCode() {
        return super.hashCode() * 31 + length;
    }
}

试想如下代码的执行结果

Human h = new Human("Jack");
  Man m = new Man("Jack", 20);
  System.out.println("h.equals(m) : " + h.equals(m));
  System.out.println("m.equals(h) : " + m.equals(h));

h.equals(m) : true
m.equals(h) : false

这是在子类中新增属性经常会出现的错误, 一般来讲, 在子类的 equals 中, 如果可以, 则只使用接口或父类型进行比较, 例如 ArrayList 的 equals

equals in ArrayList
public boolean equals(Object o) {
    if (o == this)
        return true;
    if (!(o instanceof List))
        return false;

    ListIterator<E> e1 = listIterator();
    ListIterator e2 = ((List) o).listIterator();
    while (e1.hasNext() && e2.hasNext()) {
        E o1 = e1.next();
        Object o2 = e2.next();
        if (!(o1==null ? o2==null : o1.equals(o2)))
            return false;
    }
    return !(e1.hasNext() || e2.hasNext());
}

同样的, 这个例子中, 对于里氏代换原则来讲, 也不成立, 例如如下代码

Human h = new Human("Jack");
Man m = new Man("Jack", 20);
Set<Human> hSet = new HashSet<Human>();
hSet.add(h);
System.out.println("Set contains Human : " + hSet.contains(h));
System.out.println("Set contains Man : " + hSet.contains(m));

Set contains Human : true
Set contains Man : false

容易发现这里 h.equals(m) 为 true, 但是却不能用 m 替换掉 h 来做查询.
这个道理用在 compareTo 上也是一样的, 如果子类的 compareTo 覆盖了父类的 compareTo, 并且加入了新的元素作为比较元素, 则也会违反对称性原则, 导致 a.compareTo(b) > 0, b.compareTo(a) < 0. 同时也会违反里氏代换原则, 导致在 TreeSet 中插入 a 或 a 的子类的时候顺序发生变化.

Java 中不合原则的类

Timestamp 与 Date

Java 中, Timestamp 和 Date 是一个反例, 因为 Timestamp extands Date, 而 equals 方法中却使用了 Timestamp 独有的变量, 例如:

Date now = new Date();
Timestamp t = new Timestamp(now.getTime());
System.out.println("now.equals(t) : " + now.equals(t));
System.out.println("t.equals(now) : " + t.equals(now));

now.equals(t) : true
t.equals(now) : false

显然, Timestamp 的 equals 方法是违反对称性的.

java.net.URL

URL 类的 equals 方法会使用 InetAddress 将 host 转为 ip 来比较两个 ip 是否相等, 则就使得在使用 equals 的时候会访问网络来获取 ip. 然而, 获得 ip 与你是用的的 DNS 服务有关, 在不同的地点, 也许同一个 host 会获得不同的 ip. 这就让 URL 类违反了一致性原则.