Victor's Code Journey
Victor's Code Journey

Flink:Buffer框架

Flink是使用 JVM 的大数据开源计算框架,基于 JVM 的数据分析引擎都需要面对将大量数据存到内存中,这就不得不面对 JVM 存在的几个问题:

  1. Java 对象存储密度低。一个只包含 boolean 属性的对象占用了16个字节内存: 对象头占了8个,boolean 属性占了1个,对齐填充占了7个。而实际上只需要一个bit就够了。
  • Full GC 会极大地影响性能, 尤其是为了处理更大数据而开了很大内存空间的JVM来说, GC 会达到秒级甚至分钟级。
  • OOM 问题影响稳定性。OutOfMemoryError是分布式计算框架经常会遇到的问题, 当JVM中所有对象大小超过分配给JVM的内存大小时, 就会发生OutOfMemoryError错误, 导致JVM崩溃, 分布式框架的健壮性和性能都会受到影响。

对于第一个问题,如果采用基类存储就可以解决。而第二个问题,可以考虑是使用直接内存和内存池来解决 Full GC 的问题。OOM 问题需要支持内存数据溢写到磁盘,即支持内存数据的序列化和反序列化。这里不使用 JDK 原始 buffer 的原因是 JDK Buffer只支持存储相同固定类型的实例数据,而实际上流式数据处理的总是一行数据,且数据要支持可扩展的类系统。

因此,Flink 选择了实现自己管理的内存单元和可扩展的类型系统,也就是接下来介绍的 Buffer框架(Memory Segment) 和对应的 TypeSerializer。

算法之CRC

循环冗余校验(英语:Cyclic redundancy check,通称“CRC”)是一种根据网络数据包或电脑文件等数据产生简短固定位数校验码的一种散列函数,主要用来检测或校验数据传输或者保存后可能出现的错误。生成的数字在传输或者存储之前计算出来并且附加到数据后面,然后接收方进行检验确定数据是否发生变化1

MySQL参数 kill_idle_transaction

最近遇到一个问题,在执行长事务任务的过程中,频繁出现异常 com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.即,事务提交时发现连接已经失效。一开始以为是连接超时设置的有问题,但是这个异常重复出现,并且,数据库连接池设置了testOnBorrow。所以应该不是连接超时导致。后来发现,出现报错时,事务开启都刚好超过了5S。

经过和RDS同学的沟通。他们设置了kill_idle_transaction 这个参数,并且默认为5S

在线上遇到5.7.26的锁问题,需要解决idle事务长时间挂起的问题。同时也调研了现有的mysql timeout机制,以确保其和现有的timeout机制可以吻合。Percona从5.1.59-13.0引入了innodb_kill_idle_transaction,用于解决长事务场景,即对idle事务设定一个超时时间,对超过该时间的事务所在的用户连接进行断开。

docker容器中的Jdk-availableProcessors

最近在线上环境遇到一个问题,nacos客户端线程池中有96个线程在等待.一开始以为是哪里配置有误,于是检查了nacos的配置。没有发现问题。于是只能看nacos源码了.

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        // Initialize the timeout parameter

        init(properties);

        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

如上面的代码,nacos长轮询线程池在初始化时使用了Runtime.getRuntime().availableProcessors().而宿主机恰好是48核*2。因此判断JVM获取可用核数错误,拿到的是宿主机核数而非容器可用核数1