## Overview

The [[Node|Cellframe Node]] extensions provide tools for interacting with [[Network|networks]], [[Chain|chains]], [[Ledger|ledgers]], and [[Mempool|mempool]]. These extensions allow you to retrieve network data, manage states, handle [[Data element (Datum)|datums]] and [[Atom|atoms]], process [[5. Transactions Structure|transactions]], and enable notifications for various events. More details about networks can be found in the [[Architecture Overview|architecture overview]].

### Basic Functionality

This example demonstrates how to use these Python extensions to:

- Retrieve network information using the `Net` class.
- Process atoms and datums from a chain.
- Interact with the ledger to get transaction and emission information.

Below is the complete code for these operations:

```python
from CellFrame.Network import Net
from DAP.Core import logIt
from CellFrame.Consensus import DAG, Block
from CellFrame.Common import DatumEmission
from CellFrame.Chain import ChainAtomPtr, Chain
from itertools import islice, chain as itertools_chain

net_name = "riemann"
net = Net.byName(net_name)
NUM_OF_DATUMS_ATOMS = 5


def net_info(net) -> None:
    """Log the general parameters of the given network."""
    logIt.notice(f"The Network {str(net)} data:")
    logIt.notice(f"My addr: {net.getCurAddr()}")
    # Since net.id represents a NetID instance,
    # use its "long" member to see the value.
    logIt.notice(f"ID: {str(net.id.long)}")
    logIt.notice(f"Chains: {[str(chain) for chain in net.chains]}")
    logIt.notice(f"GDB group alias: {net.gdb_group_alias}")
    # The fee data can also be output using the command:
    #   cellframe-node-cli net -net <name of network> get fee
    # If the network is offline, the following values will be
    # shown as zeros or Nones.
    logIt.notice(f"Transaction fee: {net.txFee}")
    logIt.notice(f"Transaction fee address: {net.txFeeAddr}")
    logIt.notice(f"The average validator fee: {net.validatorAverageFee}")
    logIt.notice(f"The maximum validator fee: {net.validatorMaxFee}")
    logIt.notice(f"The minimum validator fee: {net.validatorMinFee}")
    logIt.notice(f"The native ticker: {net.nativeTicker}")


def get_atom_hash(atom: ChainAtomPtr) -> str | None:
    """Return the atom hash as a string, or None if it cannot be resolved.

    A failure is logged as an error instead of being raised.
    """
    # To get an atom hash, transform the atom into a `Block` or `DAG`
    # object using the `fromAtom` method. The DAG conversion is only
    # attempted when the atom is not a block.
    try:
        block = Block.fromAtom(atom)
        if block:
            return str(block.hash)
        event = DAG.fromAtom(atom)
        if event:
            return str(event.hash)
        raise ValueError("Not found block or event from atom")
    except ValueError as e:
        logIt.error(str(e))
        return None


def get_first_n_atoms(chain: Chain, n: int):
    """Return a list with up to `n` (atom, size) pairs from the chain start."""
    # Create an atom iterator.
    # Setting with_threshold to False means the iterator will
    # not include thresholds, which are temporary storage for
    # events that do not yet meet the criteria to be included
    # in the main list.
    atom_iter = chain.createAtomIter(False)

    def atoms_generator():
        # Get the first atom
        atom, size = chain.atomIterGetFirst(atom_iter)
        while atom and size:
            yield atom, size
            # Get the next atom
            atom, size = chain.atomIterGetNext(atom_iter)

    # Use islice to get the first n atoms.
    # For more details:
    # https://docs.python.org/3/library/itertools.html#itertools.islice
    first_n_atoms = list(islice(atoms_generator(), n))
    if not first_n_atoms:
        logIt.notice("No atoms found.")
    return first_n_atoms


def _iter_datums(chain, atoms):
    """Lazily iterate over all datums stored in the given (atom, size) pairs."""
    # `or ()` keeps the iteration alive if an atom yields no datum list.
    return itertools_chain.from_iterable(
        chain.atomGetDatums(atom) or () for atom, _size in atoms
    )


def get_first_n_datums(chain, n):
    """Return up to `n` datums taken from the first `n` atoms of the chain."""
    atoms = get_first_n_atoms(chain, n)
    # Use islice to take only the first n datums of the combined iterator.
    first_n_datums = list(islice(_iter_datums(chain, atoms), n))
    if not first_n_datums:
        logIt.notice("No datums found.")
    return first_n_datums


def get_first_n_transactions(chain, n):
    """Return up to `n` transactions found in the first `n` atoms of the chain."""
    atoms = get_first_n_atoms(chain, n)
    # Keep only the datums that carry a transaction.
    tx_iter = (
        datum.getDatumTX()
        for datum in _iter_datums(chain, atoms)
        if datum.isDatumTX()
    )
    # islice stops after exactly n transactions.
    transactions = list(islice(tx_iter, n))
    if not transactions:
        logIt.notice("No transactions found.")
    return transactions


def get_datum_emission(chain, max_atoms: int = 100) -> DatumEmission | None:
    """Return the first token emission found in the first `max_atoms` atoms.

    Returns None when none of the inspected atoms contains an emission datum.
    """
    atoms = get_first_n_atoms(chain, max_atoms)
    for datum in _iter_datums(chain, atoms):
        if datum.isDatumTokenEmission():
            return datum.getDatumTokenEmission()  # DatumEmission
    return None


def get_transaction_info(tx) -> None:
    """Log the hash and the creation date of a transaction."""
    tx_hash = tx.hash
    created_at = tx.dateCreated
    logIt.notice(f"Transaction with hash: {tx_hash}, created at {created_at}")


def chain_info(chain, num) -> None:
    """Log general chain data and its first `num` atoms, datums and transactions."""
    logIt.notice(f"The Chain {str(chain)} data")
    logIt.notice(f"Consensus type: {chain.getCSName()}")
    logIt.notice(f"Number of atoms in chain: {chain.countAtom()}")

    # There are two ways to get atoms.
    # The first is to use the method
    # getAtoms(count: int, page: int, reverse: bool):
    #   count   - the number of atoms per page;
    #   page    - the number of the page from which the atoms are received;
    #   reverse - the order of the atoms output.
    # All arguments must be positional, not keywords.
    logIt.notice(f"First {num} atoms:")
    logIt.notice(f"Next {num} atoms received by `getAtoms` method")
    for atom in chain.getAtoms(num, 1, False):
        logIt.notice(f"Atom hash: {get_atom_hash(atom)}")

    # The second way is to iterate across all atoms.
    # See the implementation in the get_first_n_atoms function.
    logIt.notice(f"Next {num} atoms received by using iterations")
    for atom, size in get_first_n_atoms(chain, num):
        logIt.notice(f"Atom hash: {get_atom_hash(atom)}, size: {size}")

    logIt.notice(f"First {num} datums:")
    for datum in get_first_n_datums(chain, num):
        logIt.notice(f"Datum hash: {datum.hash}")

    logIt.notice(f"First {num} transactions:")
    for tx in get_first_n_transactions(chain, num):
        get_transaction_info(tx)


def ledger_info(chain, ledger, num) -> None:
    """Log ledger transactions, token signature data and an emission check."""
    logIt.notice("The Ledger data")
    # Request one page of `num` transactions.
    # NOTE(review): the first three positional arguments are assumed to
    # mirror getAtoms (count, page, reverse); confirm the meaning of the
    # fourth flag against the API reference.
    transactions = ledger.getTransactions(num, 1, False, True)
    logIt.notice("Transactions from Ledger")
    if not transactions:
        logIt.notice("No transactions found.")
        return

    for tx in transactions[:num]:
        get_transaction_info(tx)
        # One call returns both the main ticker and the ledger response.
        ticker_and_rc = ledger.txGetMainTickerAndLedgerRc(tx)
        ledger_tx_ticker = ticker_and_rc[0]
        ledger_tx_response = ticker_and_rc[1]
        logIt.notice(
            f"Ticker: {ledger_tx_ticker}, Ledger response: {ledger_tx_response}"
        )

    # Retrieve token signature information for the ticker of one
    # specific transaction from the ledger.
    one_tx = transactions[0]
    ticker = ledger.txGetMainTickerAndLedgerRc(one_tx)[0]
    pkeys_hashes = ledger.tokenAuthPkeysHashes(ticker)  # list[HashFast]
    signs_valid = ledger.tokenAuthSignsValid(ticker)  # int
    auth_signs_total = ledger.tokenAuthSignsTotal(ticker)  # int

    logIt.notice(f"Check for signature signs for {one_tx.hash}")
    logIt.notice(f"The number of valid signature signs: {signs_valid}")
    logIt.notice(f"The number of total signature signs: {auth_signs_total}")
    logIt.notice("The hashes of public keys for signing ticker emission:")
    for pkey_hash in pkeys_hashes:
        logIt.notice(f"{pkey_hash}")

    emission = get_datum_emission(chain)
    if emission is None:
        logIt.notice("No emission found")
    else:
        logIt.notice(f"Emission with hash: {emission.hash}")
        if ledger.has_emission(emission.hash):
            logIt.notice("exists in the ledger")
        else:
            logIt.notice("does not exist in the ledger")


def init():
    """Plugin entry point: log network, chain and ledger data, then return 0."""
    # The network is looked up again here so that the lookup happens
    # at the moment init() is called.
    riemann_net = Net.byName(net_name)
    # It is possible to get a Ledger in several ways.
    # Use the getLedger() method of the specified Net instance:
    riemann_ledger = riemann_net.getLedger()
    # Or use the static method ledgerByNetName:
    # foobar_ledger = Net.ledgerByNetName("foobar")
    main_chain = riemann_net.getChainByName("main")
    zero_chain = riemann_net.getChainByName("zerochain")

    net_info(riemann_net)
    chain_info(main_chain, NUM_OF_DATUMS_ATOMS)
    chain_info(zero_chain, NUM_OF_DATUMS_ATOMS)
    ledger_info(main_chain, riemann_ledger, NUM_OF_DATUMS_ATOMS)
    return 0
```