## Overview
The [[Node|Cellframe Node]] extensions provide tools for interacting with [[Network|networks]], [[Chain|chains]], [[Ledger|ledgers]], and [[Mempool|mempool]]. These extensions allow you to retrieve network data, manage states, handle [[Data element (Datum)|datums]] and [[Atom|atoms]], process [[5. Transactions Structure|transactions]], and enable notifications for various events. More details about networks can be found in the [[Architecture Overview|architecture overview]].
### Basic Functionality
This example demonstrates how to use these Python extensions to:
- Retrieve network information using the `Net` class.
- Process atoms and datums from a chain.
- Interact with the ledger to get transaction and emission information.
Below is the complete code for these operations:
```python
from CellFrame.Network import Net
from DAP.Core import logIt
from CellFrame.Consensus import DAG, Block
from CellFrame.Common import DatumEmission
from CellFrame.Chain import ChainAtomPtr, Chain
from itertools import islice, chain as itertools_chain
# Name of the network this example works with.
net_name = "riemann"
# Look the network up by name; this runs once, when the module is loaded.
net = Net.byName(net_name)
# Sample size for atoms/datums. Note that init() assigns its own local
# variable with the same name and value.
NUM_OF_DATUMS_ATOMS = 5
def net_info(net) -> None:
    """Write the general properties of *net* to the node log.

    Logged, in order: the network's string form, the current address,
    the numeric network ID, the chain names, the GDB group alias and the
    fee-related fields.
    """
    notice = logIt.notice

    notice(f"The Network {net!s} data:")
    notice(f"My addr: {net.getCurAddr()}")
    # net.id is a NetID instance; its `long` attribute exposes the value.
    notice(f"ID: {net.id.long!s}")
    notice(f"Chains: {list(map(str, net.chains))}")
    notice(f"GDB group alias: {net.gdb_group_alias}")

    # The same fee data is available from the CLI:
    #   cellframe-node-cli net -net <name of network> get fee
    # When the network is offline these fields are reported as zeros
    # or Nones.
    fee_fields = (
        ("Transaction fee", "txFee"),
        ("Transaction fee address", "txFeeAddr"),
        ("The average validator fee", "validatorAverageFee"),
        ("The maximum validator fee", "validatorMaxFee"),
        ("The minimum validator fee", "validatorMinFee"),
        ("The native ticker", "nativeTicker"),
    )
    for label, attr_name in fee_fields:
        notice(f"{label}: {getattr(net, attr_name)}")
def get_atom_hash(atom: ChainAtomPtr):
    """Return the hash of *atom* as a string.

    The atom is converted with both `Block.fromAtom` and `DAG.fromAtom`;
    the block is preferred when both conversions yield a truthy object.
    If neither does, or a ValueError is raised along the way, the error
    is logged and None is returned.
    """
    try:
        as_block = Block.fromAtom(atom)
        as_event = DAG.fromAtom(atom)
        resolved = as_block or as_event
        if not resolved:
            raise ValueError("Not found block or event from atom")
        return str(resolved.hash)
    except ValueError as err:
        logIt.error(str(err))
        return None
def get_first_n_atoms(chain: Chain, n: int):
    """Return the first *n* `(atom, size)` pairs of *chain* as a list.

    Returns an empty list (and logs a notice) when the chain yields no
    atoms.
    """
    # Passing False leaves thresholds out of the iteration. Thresholds
    # are temporary storage for events that do not yet meet the criteria
    # to be included in the main list.
    cursor = chain.createAtomIter(False)

    def walk():
        # Start from the first atom and advance until the iterator
        # returns a falsy atom or a falsy size.
        entry = chain.atomIterGetFirst(cursor)
        while True:
            atom, size = entry
            if not (atom and size):
                return
            yield atom, size
            entry = chain.atomIterGetNext(cursor)

    # islice stops pulling from the generator after n items.
    # See https://docs.python.org/3/library/itertools
    collected = list(islice(walk(), n))
    if collected:
        return collected

    logIt.notice("No atoms found.")
    return []
def get_first_n_datums(chain, n):
    """Return up to *n* datums found in the first *n* atoms of *chain*.

    The datums of each atom are read with `chain.atomGetDatums` and
    concatenated in atom order. An empty list is returned (and a notice
    is logged) when no datum is found.
    """
    atoms = get_first_n_atoms(chain, n)
    # Flatten the per-atom datum lists into one lazy stream.
    datum_stream = itertools_chain.from_iterable(
        chain.atomGetDatums(atom) for atom, _size in atoms
    )
    # islice stops reading the stream after n datums.
    first_n_datums = list(islice(datum_stream, n))
    if not first_n_datums:
        # The message names datums: that is what this function returns.
        logIt.notice("No datums found.")
    return first_n_datums
def get_first_n_transactions(chain, n):
    """Return at most *n* transactions found in the first *n* atoms of *chain*.

    Only datums for which `isDatumTX()` is true are considered; each is
    converted with `getDatumTX()`. An empty list is returned (and a
    notice is logged) when no transaction is found.
    """
    atoms = get_first_n_atoms(chain, n)
    # Flatten the per-atom datum lists into one lazy stream.
    datum_stream = itertools_chain.from_iterable(
        chain.atomGetDatums(atom) for atom, _size in atoms
    )
    tx_stream = (
        datum.getDatumTX() for datum in datum_stream if datum.isDatumTX()
    )
    # islice enforces the limit exactly (never n + 1 items) and stops
    # scanning as soon as n transactions have been collected.
    transactions = list(islice(tx_stream, n))
    if not transactions:
        logIt.notice("No transactions found.")
    return transactions
def get_datum_emission(chain, max_atoms: int = 100) -> DatumEmission | None:
    """Return the first token emission found near the start of *chain*.

    Args:
        chain: The chain whose atoms are scanned.
        max_atoms: How many atoms from the start of the chain to inspect.
            Defaults to 100.

    Returns:
        The result of `getDatumTokenEmission()` for the first datum whose
        `isDatumTokenEmission()` is true, or None when the inspected
        atoms contain no such datum.
    """
    for atom, _size in get_first_n_atoms(chain, max_atoms):
        # Datums are read atom by atom, so the scan stops at the first
        # emission without loading the datums of the remaining atoms.
        for datum in chain.atomGetDatums(atom):
            if datum.isDatumTokenEmission():
                return datum.getDatumTokenEmission()  # DatumEmission
    return None
def get_transaction_info(tx):
    """Log the hash and the creation date of the transaction *tx*."""
    tx_hash, created_at = tx.hash, tx.dateCreated
    logIt.notice(f"Transaction with hash: {tx_hash}, created at {created_at}")
def chain_info(chain, num) -> None:
    """Log general data about *chain* and samples of its contents.

    Args:
        chain: The chain to describe.
        num: How many atoms, datums and transactions to sample.
    """
    logIt.notice(f"The Chain {str(chain)} data")
    logIt.notice(f"Consensus type: {chain.getCSName()}")
    logIt.notice(f"Number of atoms in chain: {chain.countAtom()}")

    # There are two ways to get atoms.
    # The first is the method getAtoms(count: int, page: int, reverse: bool):
    #   count   - the number of atoms per page;
    #   page    - the number of the page the atoms are taken from;
    #   reverse - the order of the atoms in the output.
    # All arguments must be positional, not keyword.
    logIt.notice(f"First {num} atoms:")
    logIt.notice("Received by `getAtoms` method:")
    for atom in chain.getAtoms(num, 1, False):
        logIt.notice(f"Atom hash: {get_atom_hash(atom)}")

    # The second way is to iterate over the atoms one by one;
    # see the implementation of get_first_n_atoms.
    logIt.notice("Received by using iterations:")
    for atom, size in get_first_n_atoms(chain, num):
        logIt.notice(f"Atom hash: {get_atom_hash(atom)}, size: {size}")

    logIt.notice(f"First {num} datums:")
    for datum in get_first_n_datums(chain, num):
        logIt.notice(f"Datum hash: {datum.hash}")

    logIt.notice(f"First {num} transactions:")
    for tx in get_first_n_transactions(chain, num):
        get_transaction_info(tx)
def ledger_info(chain, ledger, num):
    """Log transaction, token-signature and emission data from *ledger*.

    Args:
        chain: The chain that is searched for a token emission datum.
        ledger: The ledger to query.
        num: Maximum number of ledger transactions to print.

    Returns:
        An empty list when the ledger returns no transactions, otherwise
        None (kept as in the original example).
    """
    logIt.notice("The Ledger data")
    # NOTE(review): the arguments are presumably (count, page, reverse,
    # unspent-only) - confirm against the SDK reference.
    transactions = ledger.getTransactions(3, 1, False, True)
    logIt.notice("Transactions from Ledger")
    if not transactions:
        logIt.notice("No transactions found.")
        return []

    for tx in transactions[:num]:
        get_transaction_info(tx)
        # Query the ledger once per transaction and read both fields
        # from the same result.
        ticker_and_rc = ledger.txGetMainTickerAndLedgerRc(tx)
        tx_ticker, tx_response = ticker_and_rc[0], ticker_and_rc[1]
        logIt.notice(f"Ticker: {tx_ticker}, Ledger response: {tx_response}")

    # The token-signature data below is looked up by the ticker of the
    # first transaction.
    one_tx = transactions[0]
    ticker = ledger.txGetMainTickerAndLedgerRc(one_tx)[0]
    pkeys_hashes = ledger.tokenAuthPkeysHashes(ticker)  # list[HashFast]
    signs_valid = ledger.tokenAuthSignsValid(ticker)  # int
    auth_signs_total = ledger.tokenAuthSignsTotal(ticker)  # int

    # Report the hash of the transaction the ticker was taken from, not
    # the loop variable left over from the loop above.
    logIt.notice(f"Check for signature signs for {one_tx.hash}")
    logIt.notice(f"The number of valid signature signs: {signs_valid}")
    logIt.notice(f"The number of total signature signs: {auth_signs_total}")
    logIt.notice("The hashes of public keys for signing ticker emission:")
    for pkey_hash in pkeys_hashes:
        logIt.notice(f"{pkey_hash}")

    emission = get_datum_emission(chain)
    if emission is None:
        logIt.notice("No emission found")
        return None

    logIt.notice(f"Emission with hash: {emission.hash}")
    if ledger.has_emission(emission.hash):
        logIt.notice("exists in the ledger")
    else:
        logIt.notice("does not exist in the ledger")
    return None
def init():
    """Run the example: log network, chain and ledger data for "riemann".

    Returns 0 once everything has been logged.
    """
    sample_size = 5
    network = Net.byName("riemann")

    # A ledger can be obtained in several ways. Here the getLedger()
    # method of the Net instance is used; the static method
    # Net.ledgerByNetName("<network name>") is an alternative.
    ledger = network.getLedger()

    main_chain, zero_chain = (
        network.getChainByName(chain_name)
        for chain_name in ("main", "zerochain")
    )

    net_info(network)
    for current_chain in (main_chain, zero_chain):
        chain_info(current_chain, sample_size)
    ledger_info(main_chain, ledger, sample_size)
    return 0
```