This post will explain you how one can get data out of Hive by using rxjs. The end result will be a live stream of hive operations. This tutorial requires a basic understanding of Javascript.
Installing dependencies
npm install rxjs @hivechain/dhive --save
Setting up the client
Before we can get any data out of hive we need to setup a client.
import { Client } from '@hivechain/dhive'
const client = new Client('https://api.hive.blog/')
With this client we can interact with the database. For instance if we want to get latest information about a block we can call:
client.database.getDynamicProperties()
The above function will return a promise which we can call .then on to get the actual result.
client.database.getDynamicProperties().then(result => {
console.log(result)
})
Introducing rxjs in the mix
RxJS is a library to perform reactive programming. It's a functional approach for dealing with events and works around the concept of Observables. That's in my opinion why RxJS is a super interesting approach to work with Hive.
Wouldn't it be cool if we could just do:
posts$.subscribe(post => {
console.log('new post', post)
})
The above subscribe will be the end goal of this tutorial 😎.
First observable
RxJS works by utilizing operators through an observable stream. Our observable stream will be created by using the from observable creator.
Why from? If you look at the docs you'll see there are many operators to create observables. We need an operator to create an observable from a promise. from is capable of doing just that.
const blocks$ = from(client.database.getDynamicGlobalProperties())
The above code does nothing, yet. As long as there are no subscribers NO code will be executed. This is a difference compared with promises!
Once we subscribe to the above Observable it will do the actual call to hive.
const blocks$ = from(client.database.getDynamicGlobalProperties())
blocks$.subscribe(result => {
console.log('first result from the blockchain, yey', result)
})
Repeating behavior
The above example worked great, but... we need to repeat this behavior endlessly. There are many ways to repeat code. While loop? Recursive function? ...
In pseudo code we want to solve our problems as follows:
fn getData()
data = hive.getData() # get actual data
process(data) # process the data
sleep(1000) # wait for one second
getData() # repeat - recursive
It turns out there is an RxJS operator we can use for recursive operations
const source$ = of(0).pipe(expand(val => of(val + 1)))
source$.subscribe(value => console.log(value))
The above function will log endlessly incremental values. Here we also introduced an operator called pipe. All observables can be chained with other operations. Here we use expand to recursively call itself.
Back to our example. We can update our code and use the expand code to recursively call hive.
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties()))
)
blocks$.subscribe(block => console.log(block))
Be careful with the above code, this will execute a new request once the previous one is finished without delay. Delay is something we need, or we will blow up our CPU 😉. I believe a reasonable time for Hive is a 1 second delay. We can apply the delay in our expand function.
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties())
.pipe(
delay(1000)
)
)
)
blocks$.subscribe(block => console.log(block))
By now we have a recursive function which calls itself every second. Next we want to make sure getDynamicGlobalProperties() actually returned something before proceeding. We can do this by filtering our stream and make sure it's not undefined.
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties())
.pipe(
delay(1000)
)
),
filter(property => property !== undefined)
)
blocks$.subscribe(block => console.log(block.head_block_number))
If you run the above code you'll see something like:
44881990
44881990
44881991
44881991
44881991
44881992
44881992
...
As you can see we emit every value we receive, what we really want is:
44881990
44881991
44881992
We don't want to emit double values in our event stream. We want the distinct values. To achieve this we can use the distinctUntilChanged operator by comparing the previous block number with the current block number.
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties())
.pipe(
delay(1000)
)
),
filter(property => property !== undefined),
distinctUntilChanged((prev, cur) => prev.head_block_number === cur.head_block_number),
)
blocks$.subscribe(block => console.log(block.head_block_number))
The property information is not what we really want. What we want is Post or Comment information! Hive has a function which we can call to get block information from the blockchain.
client.database.getBlock(blockNumber)
The above function also returns a promise. We already learned that we can convert a promise to an observable by using the from operator.
But.. we cannot simply add this to our event stream:
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties())
.pipe(
delay(1000)
)
),
filter(property => property !== undefined),
distinctUntilChanged((prev, cur) => prev.head_block_number === cur.head_block_number),
from(
client.database.getBlock(blockNumber)
)
)
blocks$.subscribe(block => console.log(block.head_block_number))
This will not work, we need to get the actual value from the inner observable. There are ways to solve this in RxJS, one is by using the switchMap operator. This is in general the most safe solution since this will complete the previous inner observable.
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties())
.pipe(
delay(1000)
)
),
filter(property => property !== undefined),
distinctUntilChanged((prev, cur) => prev.head_block_number === cur.head_block_number),
switchMap(block => from(client.database.getBlock(block.head_block_number))),
)
blocks$.subscribe(block => console.log(block.head_block_number))
Also here we want to make sure there are no undefined blocks. We can filter out the undefined values.
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties())
.pipe(
delay(1000)
)
),
filter(property => property !== undefined),
distinctUntilChanged((prev, cur) => prev.head_block_number === cur.head_block_number),
switchMap(block => from(client.database.getBlock(block.head_block_number))),
filter(x => x !== undefined),
)
blocks$.subscribe(block => console.log(block.head_block_number))
As last (for the blocks$) we want to make sure we share this stream. This is particularly important if we'd introduce side effects. Share will make sure these side effects are only executed once regardless of the amount of subscriptions.
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties())
.pipe(
delay(1000)
)
),
filter(property => property !== undefined),
distinctUntilChanged((prev, cur) => prev.head_block_number === cur.head_block_number),
switchMap(block => from(client.database.getBlock(block.head_block_number))),
filter(x => x !== undefined),
share()
)
blocks$.subscribe(block => console.log(block.head_block_number))
So, in 14 lines of code. We've managed to get relevant data out of the hive blockchain. Of course this is not yet production ready. This does not cover error handling or the fact that we missed a block.
The following code snippets expand on the blocks$ stream.
Operations
Emits all operations as a single event stream
const operations$ = blocks$.pipe(
map(x => x.transactions),
concatAll(), // flatten transactions and emit as stream
map(transaction => transaction.operations),
concatAll() // flatten operations and emit as stream
)
Comments & posts
const filterType = (type: Operation[0]) => (operation: Operation) => operation[0] === type
const comments$ = operations$.pipe(
filter(filterType('comment')),
map(x => x['1']), // get property '1'
filter(x => x.parent_author !== '')
)
const posts$ = operations$.pipe(
filter(filterType('comment')),
map(x => x['1']),
filter(x => x.parent_author === '')
)
posts$.subscribe(post => {
console.log('new post posted on hive', post)
})
posts$.subscribe(comment => {
console.log('new comment posted on a post', comment)
})
Final full code
import { from } from 'rxjs'
import { delay, expand, share, filter, distinctUntilChanged, switchMap, map, concatAll } from 'rxjs/operators'
import { Operation } from '@hivechain/dhive'
import { client } from './client'
const blocks$ = from(client.database.getDynamicGlobalProperties())
.pipe(
expand(() => from(client.database.getDynamicGlobalProperties()).pipe(delay(1000))),
filter(x => x !== undefined),
distinctUntilChanged((prev, cur) => prev.head_block_number === cur.head_block_number),
switchMap(block => from(client.database.getBlock(block.head_block_number))),
filter(x => x !== undefined),
share()
)
const operations$ = blocks$.pipe(
map(x => x.transactions),
concatAll(),
map(transaction => transaction.operations),
concatAll()
)
const filterType = (type: Operation[0]) => (operation: Operation) => operation[0] === type
const comments$ = operations$.pipe(
filter(filterType('comment')),
map(x => x['1']),
filter(x => x.parent_author !== '')
)
const posts$ = operations$.pipe(
filter(filterType('comment')),
map(x => x['1'])
)
export function start () {
comments$.subscribe((result) => {
console.log('comment received')
console.log(result)
console.log('end comment received')
})
posts$.subscribe((result) => {
console.log('post received')
console.log(result)
console.log('end post received')
})
console.log('Press any key to exit')
process.stdin.setRawMode(true)
process.stdin.resume()
process.stdin.on('data', process.exit.bind(process, 0))
}
start()