kaleido 编译与测试


git clone https://github.com/kaleidochain/kaleido.git
cd kaleido/


sudo apt-get update
sudo apt-get install -y git build-essential autotools-dev libtool autoconf automake pkg-config cmake libboost-all-dev

创建go mod

rm -rf vendor/
go mod init
go mod tidy


make kalgo


Done building.
Run "/kaleido源代码目录/build/bin/kalgo" to launch kalgo.

以太坊索引器通过 ETH 地址获取交易列表


# Indexer for Ethereum to get transaction list by ETH address
# https://github.com/Adamant-im/ETH-transactions-storage
# 2021 ADAMANT Foundation (devs@adamant.im), Francesco Bonanno (mibofra@parrotsec.org),
# Guénolé de Cadoudal (guenoledc@yahoo.fr), Drew Wells (drew.wells00@gmail.com)
# 2020-2021 ADAMANT Foundation (devs@adamant.im): Aleksei Lebedev
# 2017-2020 ADAMANT TECH LABS LP (pr@adamant.im): Artem Brunov, Aleksei Lebedev
# v2.0

from os import environ
from web3 import Web3
from web3.middleware import geth_poa_middleware
import psycopg2
import time
import sys
import logging
#from systemd.journal import JournalHandler

# Get env variables or set to default
dbname = environ.get("DB_NAME")
startBlock = environ.get("START_BLOCK") or "1"
confirmationBlocks = environ.get("CONFIRMATIONS_BLOCK") or "0"
nodeUrl = environ.get("ETH_URL")
pollingPeriod = environ.get("PERIOD") or "20"

if dbname == None:
    print('Add postgre database in env var DB_NAME')

if nodeUrl == None:
    print('Add eth url in env var ETH_URL')

# Connect to Ethereum node
if nodeUrl.startswith("http"):
    web3 = Web3(Web3.HTTPProvider(nodeUrl))
if nodeUrl.startswith("ws"):
    web3 = Web3(Web3.WebsocketProvider(nodeUrl)) # "ws://publicnode:8546"
if web3 == None:
    web3 = Web3(Web3.IPCProvider(nodeUrl)) # "/home/parity/.local/share/openethereum/jsonrpc.ipc"

web3.middleware_onion.inject(geth_poa_middleware, layer=0)

# Start logger
#logger = logging.getLogger("EthIndexerLog")
logger = logging.getLogger("eth-sync")

# File logger
#lfh = logging.FileHandler("/var/log/ethindexer.log")
lfh = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# Systemd logger, if we want to user journalctl logs
# Install systemd-python and 
# decomment "#from systemd.journal import JournalHandler" up
#ljc = JournalHandler()
#formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    logger.info("Trying to connect to "+ dbname)
    conn = psycopg2.connect(dbname)
    conn.autocommit = True
    logger.info("Connected to the database")
    logger.error("Unable to connect to database")

# Delete last block as it may be not imparted in full
cur = conn.cursor()
cur.execute('DELETE FROM public.ethtxs WHERE block = (SELECT Max(block) from public.ethtxs)')

# Wait for the node to be in sync before indexing
logger.info("Waiting Ethereum node to be in sync...")

while web3.eth.syncing != False:
    # Change with the time, in second, do you want to wait
    # before cheking again, default is 5 minutes

logger.info("Ethereum node is synced!")

# Adds all transactions from Ethereum block
def insertion(blockid, tr):
    time = web3.eth.getBlock(blockid)['timestamp']
    for x in range(0, tr):
        trans = web3.eth.getTransactionByBlock(blockid, x)
        # Save also transaction status, should be null if pre byzantium blocks
        status = bool(web3.eth.get_transaction_receipt(trans['hash']).status)
        txhash = trans['hash'].hex()
        value = trans['value']
        inputinfo = trans['input']
        # Check if transaction is a contract transfer
        if (value == 0 and not inputinfo.startswith('0xa9059cbb')):
        fr = trans['from']
        to = trans['to']
        gasprice = trans['gasPrice']
        gas = web3.eth.getTransactionReceipt(trans['hash'])['gasUsed']
        contract_to = ''
        contract_value = ''
        # Check if transaction is a contract transfer
        if inputinfo.startswith('0xa9059cbb'):
            contract_to = inputinfo[10:-64]
            contract_value = inputinfo[74:]
        # Correct contract transfer transaction represents '0x' + 4 bytes 'a9059cbb' + 32 bytes (64 chars) for contract address and 32 bytes for its value
        # Some buggy txs can break up Indexer, so we'll filter it
        if len(contract_to) > 128:
            logger.info('Skipping ' + str(txhash) + ' tx. Incorrect contract_to length: ' + str(len(contract_to)))
            contract_to = ''
            contract_value = ''
            'INSERT INTO public.ethtxs(time, txfrom, txto, value, gas, gasprice, block, txhash, contract_to, contract_value, status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
            (time, fr, to, value, gas, gasprice, blockid, txhash, contract_to, contract_value, status))

# Fetch all of new (not in index) Ethereum blocks and add transactions to index
while True:
        conn = psycopg2.connect(dbname)
        conn.autocommit = True
        logger.error("Unable to connect to database")

    cur = conn.cursor()

    cur.execute('SELECT Max(block) from public.ethtxs')
    maxblockindb = cur.fetchone()[0]
    # On first start, we index transactions from a block number you indicate
    if maxblockindb is None:
        maxblockindb = int(startBlock)

    endblock = int(web3.eth.blockNumber) - int(confirmationBlocks)

    logger.info('Current best block in index: ' + str(maxblockindb) + '; in Ethereum chain: ' + str(endblock))

    for block in range(maxblockindb + 1, endblock):
        transactions = web3.eth.getBlockTransactionCount(block)
        if transactions > 0:
            insertion(block, transactions)
            logger.debug('Block ' + str(block) + ' does not contain transactions')



// SPDX-License-Identifier: MIT

pragma solidity 0.8.3;

 * @title An Ether or token balance scanner
 * @author Maarten Zuidhoorn
 * @author Luit Hollander
contract BalanceScanner {
  struct Result {
    bool success;
    bytes data;

   * @notice Get the Ether balance for all addresses specified
   * @param addresses The addresses to get the Ether balance for
   * @return results The Ether balance for all addresses in the same order as specified
  function etherBalances(address[] calldata addresses) external view returns (Result[] memory results) {
    results = new Result[](addresses.length);

    for (uint256 i = 0; i < addresses.length; i++) {
      results[i] = Result(true, abi.encode(addresses[i].balance));

   * @notice Get the ERC-20 token balance of `token` for all addresses specified
   * @dev This does not check if the `token` address specified is actually an ERC-20 token
   * @param addresses The addresses to get the token balance for
   * @param token The address of the ERC-20 token contract
   * @return results The token balance for all addresses in the same order as specified
  function tokenBalances(address[] calldata addresses, address token) external view returns (Result[] memory results) {
    results = new Result[](addresses.length);

    for (uint256 i = 0; i < addresses.length; i++) {
      bytes memory data = abi.encodeWithSignature("balanceOf(address)", addresses[i]);
      results[i] = staticCall(token, data, 20000);

   * @notice Get the ERC-20 token balance from multiple contracts for a single owner
   * @param owner The address of the token owner
   * @param contracts The addresses of the ERC-20 token contracts
   * @return results The token balances in the same order as the addresses specified
  function tokensBalance(address owner, address[] calldata contracts) external view returns (Result[] memory results) {
    results = new Result[](contracts.length);

    bytes memory data = abi.encodeWithSignature("balanceOf(address)", owner);
    for (uint256 i = 0; i < contracts.length; i++) {
      results[i] = staticCall(contracts[i], data, 20000);

   * @notice Call multiple contracts with the provided arbitrary data
   * @param contracts The contracts to call
   * @param data The data to call the contracts with
   * @return results The raw result of the contract calls
  function call(address[] calldata contracts, bytes[] calldata data) external view returns (Result[] memory results) {
    return call(contracts, data, gasleft());

   * @notice Call multiple contracts with the provided arbitrary data
   * @param contracts The contracts to call
   * @param data The data to call the contracts with
   * @param gas The amount of gas to call the contracts with
   * @return results The raw result of the contract calls
  function call(
    address[] calldata contracts,
    bytes[] calldata data,
    uint256 gas
  ) public view returns (Result[] memory results) {
    require(contracts.length == data.length, "Length must be equal");
    results = new Result[](contracts.length);

    for (uint256 i = 0; i < contracts.length; i++) {
      results[i] = staticCall(contracts[i], data[i], gas);

   * @notice Static call a contract with the provided data
   * @param target The address of the contract to call
   * @param data The data to call the contract with
   * @param gas The amount of gas to forward to the call
   * @return result The result of the contract call
  function staticCall(
    address target,
    bytes memory data,
    uint256 gas
  ) private view returns (Result memory) {
    uint256 size = codeSize(target);

    if (size > 0) {
      (bool success, bytes memory result) = target.staticcall{ gas: gas }(data);
      if (success) {
        return Result(success, result);

    return Result(false, "");

   * @notice Get code size of address
   * @param _address The address to get code size from
   * @return size Unsigned 256-bits integer
  function codeSize(address _address) private view returns (uint256 size) {
    // solhint-disable-next-line no-inline-assembly
    assembly {
      size := extcodesize(_address)













  1. 让订阅者订阅、取消订阅某类事件。
  2. 让发布者能够发布某个事件,并且把事件送到每个订阅者。






  1. 采用多对多结构:多个事件对多个订阅者。
  2. 采用推模式,把事件/消息推送给订阅者,就像信件一样,会被送到你的信箱,你在信箱里取信就行了。
  3. 是一个同步事件框架。这也是它的缺点所在,举个例子就是:邮递员要给小红、小明送信,只有信箱里的信被小红取走后,邮递员才去给小明送信,如果小红旅游去了无法取信,邮递员就一直等在小红家,而小明一直收不到信,小明很无辜无辜啊!


  1. 订阅和取消订阅。订阅通过函数TypeMux.Subscribe(),入参为要订阅的事件类型,会返回TypeMuxSubscription给订阅者,订阅者可通过此控制订阅,通过TypeMuxSubscription.Unsubscribe() 可以取消订阅。
  2. 发布事件和传递事件。TypeMux.Post(),入参为事件类型,根据订阅表找出该事件的订阅者列表,遍历列表,依次向每个订阅者传递事件,如果前一个没有传递完成进入阻塞,会导致后边的订阅者不能及时收到事件。



// A TypeMux dispatches events to registered receivers. Receivers can be
// registered to handle events of certain type. Any operation
// called after mux is stopped will return ErrMuxClosed.
// The zero value is ready to use.
// Deprecated: use Feed
// 本质:哈希列表,每个事件的订阅者都存到对于的列表里
type TypeMux struct {
    mutex   sync.RWMutex // 锁
    subm    map[reflect.Type][]*TypeMuxSubscription // 订阅表:所有事件类型的所有订阅者
    stopped bool


// Subscribe creates a subscription for events of the given types. The
// subscription's channel is closed when it is unsubscribed
// or the mux is closed.
// 订阅者只传入订阅的事件类型,然后TypeMux会返回给它一个订阅对象
func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
    sub := newsub(mux)
    defer mux.mutex.Unlock()
    if mux.stopped {
        // set the status to closed so that calling Unsubscribe after this
        // call will short circuit.
        sub.closed = true
    } else {
        if mux.subm == nil {
            mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
        for _, t := range types {
            rtyp := reflect.TypeOf(t)
            // 在同一次订阅中,不要重复订阅同一个类型的事件
            oldsubs := mux.subm[rtyp]
            if find(oldsubs, sub) != -1 {
                panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
            subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
            copy(subs, oldsubs)
            subs[len(oldsubs)] = sub
            mux.subm[rtyp] = subs
    return sub


func (s *TypeMuxSubscription) Unsubscribe() {


// Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped.
// 遍历map,找到所有订阅的人,向它们传递event,同一个event对象,非拷贝,运行在调用者goroutine
func (mux *TypeMux) Post(ev interface{}) error {
    event := &TypeMuxEvent{
        Time: time.Now(),
        Data: ev,
    rtyp := reflect.TypeOf(ev)
    if mux.stopped {
        return ErrMuxClosed
    subs := mux.subm[rtyp]
    for _, sub := range subs {
    return nil

func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
    // Short circuit delivery if stale event
    // 不发送过早(老)的消息
    if s.created.After(event.Time) {
    // Otherwise deliver the event
    defer s.postMu.RUnlock()

    select {
    case s.postC <- event:
    case <-s.closing:


func newsub(mux *TypeMux) *TypeMuxSubscription {
    c := make(chan *TypeMuxEvent) // 无缓冲通道,同步读写
    return &TypeMuxSubscription{
        mux:     mux,
        created: time.Now(),
        readC:   c,
        postC:   c,
        closing: make(chan struct{}),





  1. 公司要放中秋节,HR给所有同事都发了一封邮件,有些同事读了,有些同事没读,要到国庆节了HR又给所有同事发了一封邮件,这些邮件又进入到每个人的邮箱,不会因为任何一个人没有读邮件,导致剩下的同事收不到邮件。
  2. 你在朋友圈给朋友旅行的照片点了个赞,每当你们共同朋友点赞或者评论的时候,你都会收到提醒,无论你看没看这些提醒,这些提醒都会不断的发过来。
  3. 你微博关注了苍井空,苍井空发了个搞笑的视频,你刷微博的时候就收到了,但也有很多人根本没刷微博,你不会因为别人没有刷,你就收不到苍井空的动态。



  1. 订阅和取消订阅:Feed.Subscribe(),入参是一个通道,通常是有缓冲的,就算是无缓存也不会造成Feed阻塞,Feed会校验这个通道的类型和本Feed管理的事件类型是否一致,然后把通道保存下来,返回给订阅者一个Subscription,可以通过它取消订阅和读取通道错误。
  2. 发布事件和传递事件。Feed.Send()入参是一个事件,加锁确保本类型事件只有一个发送协程正在进行,然后校验事件类型是否匹配,Feed会尝试给每个订阅者发送事件,如果订阅者阻塞,Feed就继续尝试给下一个订阅者发送,直到给每个订阅者发送事件,返回发送该事件的数量。



// Feed implements one-to-many subscriptions where the carrier of events is a channel.
// Values sent to a Feed are delivered to all subscribed channels simultaneously.
// Feeds can only be used with a single type. The type is determined by the first Send or
// Subscribe operation. Subsequent calls to these methods panic if the type does not
// match.
// The zero value is ready to use.
// 一对多的事件订阅管理:每个feed对象,当别人调用send的时候,会发送给所有订阅者
// 每种事件类型都有一个自己的feed,一个feed内订阅的是同一种类型的事件,得用某个事件的feed才能订阅该事件
type Feed struct {
    once      sync.Once        // ensures that init only runs once
    sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases. 这个锁确保了只有一个协程在使用go routine
    removeSub chan interface{} // interrupts Send
    sendCases caseList         // the active set of select cases used by Send,订阅的channel列表,这些channel是活跃的

    // The inbox holds newly subscribed channels until they are added to sendCases.
    mu     sync.Mutex
    inbox  caseList // 不活跃的在这里
    etype  reflect.Type
    closed bool


// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
// until the subscription is canceled. All channels added must have the same element type.
// The channel should have ample buffer space to avoid blocking other subscribers.
// Slow subscribers are not dropped.
// 订阅者传入接收事件的通道,feed将通道保存为case,然后返回给订阅者订阅对象
func (f *Feed) Subscribe(channel interface{}) Subscription {

    // 通道和通道类型检查
    chanval := reflect.ValueOf(channel)
    chantyp := chanval.Type()
    if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
    sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

    defer f.mu.Unlock()
    if !f.typecheck(chantyp.Elem()) {
        panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})

    // 把通道保存到case
    // Add the select case to the inbox.
    // The next Send will add it to f.sendCases.
    cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
    f.inbox = append(f.inbox, cas)
    return sub


// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
// 同时向所有的订阅者发送事件,返回订阅者的数量
func (f *Feed) Send(value interface{}) (nsent int) {
    rvalue := reflect.ValueOf(value)

    <-f.sendLock // 获取发送锁

    // Add new cases from the inbox after taking the send lock.
    // 从inbox加入到sendCases,不能订阅的时候直接加入到sendCases,因为可能其他协程在调用发送
    f.sendCases = append(f.sendCases, f.inbox...)
    f.inbox = nil

    // 类型检查:如果该feed不是要发送的值的类型,释放锁,并且执行panic
    if !f.typecheck(rvalue.Type()) {
        f.sendLock <- struct{}{}
        panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})

    // Set the sent value on all channels.
    // 把发送的值关联到每个case/channel,每一个事件都有一个feed,所以这里全是同一个事件的
    for i := firstSubSendCase; i < len(f.sendCases); i++ {
        f.sendCases[i].Send = rvalue

    // Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
    // of sendCases. When a send succeeds, the corresponding case moves to the end of
    // 'cases' and it shrinks by one element.
    // 所有case仍然保留在sendCases,只是用过的会移动到最后面
    cases := f.sendCases
    for {
        // Fast path: try sending without blocking before adding to the select set.
        // This should usually succeed if subscribers are fast enough and have free
        // buffer space.
        // 使用非阻塞式发送,如果不能发送就及时返回
        for i := firstSubSendCase; i < len(cases); i++ {
            // 如果发送成功,把这个case移动到末尾,所以i这个位置就是没处理过的,然后大小减1
            if cases[i].Chan.TrySend(rvalue) {
                cases = cases.deactivate(i)

        // 如果这个地方成立,代表所有订阅者都不阻塞,都发送完了
        if len(cases) == firstSubSendCase {

        // Select on all the receivers, waiting for them to unblock.
        // 返回一个可用的,直到不阻塞。
        chosen, recv, _ := reflect.Select(cases)
        if chosen == 0 /* <-f.removeSub */ {
            // 这个接收方要删除了,删除并缩小sendCases
            index := f.sendCases.find(recv.Interface())
            f.sendCases = f.sendCases.delete(index)
            if index >= 0 && index < len(cases) {
                // Shrink 'cases' too because the removed case was still active.
                cases = f.sendCases[:len(cases)-1]
        } else {
            // reflect已经确保数据已经发送,无需再尝试发送
            cases = cases.deactivate(chosen)

    // 把sendCases中的send都标记为空
    // Forget about the sent value and hand off the send lock.
    for i := firstSubSendCase; i < len(f.sendCases); i++ {
        f.sendCases[i].Send = reflect.Value{}
    f.sendLock <- struct{}{}
    return nsent
