"""Pymodbus Server Payload Example.
This example shows how to initialize a server with a
complicated memory layout using builder.
"""
import asyncio
import logging
from pymodbus.constants import Endian
from pymodbus.datastore import (
ModbusSequentialDataBlock,
ModbusServerContext,
ModbusSlaveContext,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.payload import BinaryPayloadBuilder
from pymodbus.server import StartAsyncTcpServer
# --------------------------------------------------------------------------- #
# import the library version reported in the device identification
# --------------------------------------------------------------------------- #
from pymodbus.version import version
# Lower the root logger's threshold to DEBUG so that debug-level records
# are no longer filtered out at the root.
_root_logger = logging.getLogger()
_root_logger.setLevel(logging.DEBUG)
async def run_payload_server():
    """Build a demo payload and serve it over Modbus TCP.

    A ``BinaryPayloadBuilder`` configured for little-endian byte and word
    order is filled with a string, a bit field, and signed/unsigned integers
    and floats of several widths. The resulting registers are stored in one
    sequential data block starting at address 1, and that same block backs
    the discrete inputs, coils, holding registers and input registers of a
    single slave context.

    The coroutine then creates a TCP server bound to ``0.0.0.0:5020`` and
    awaits ``server.serve_forever()``; it returns only when that await
    completes.
    """
    # ----------------------------------------------------------------------- #
    # build your payload
    # ----------------------------------------------------------------------- #
    builder = BinaryPayloadBuilder(byteorder=Endian.Little, wordorder=Endian.Little)
    builder.add_string("abcdefgh")
    builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])
    builder.add_8bit_int(-0x12)
    builder.add_8bit_uint(0x12)
    builder.add_16bit_int(-0x5678)
    builder.add_16bit_uint(0x1234)
    builder.add_32bit_int(-0x1234)
    builder.add_32bit_uint(0x12345678)
    builder.add_16bit_float(12.34)
    builder.add_16bit_float(-12.34)
    builder.add_32bit_float(22.34)
    builder.add_32bit_float(-22.34)
    builder.add_64bit_int(-0xDEADBEEF)
    builder.add_64bit_uint(0x12345678DEADBEEF)
    builder.add_64bit_uint(0xDEADBEEFDEADBEED)
    builder.add_64bit_float(123.45)
    builder.add_64bit_float(-123.45)

    # ----------------------------------------------------------------------- #
    # use that payload in the data store
    # Here we use the same reference block for each underlying store, so all
    # four tables share one object rather than holding independent copies.
    # ----------------------------------------------------------------------- #
    block = ModbusSequentialDataBlock(1, builder.to_registers())
    store = ModbusSlaveContext(di=block, co=block, hr=block, ir=block)
    context = ModbusServerContext(slaves=store, single=True)

    # ----------------------------------------------------------------------- #
    # initialize the server information
    # If you don't set this or any fields, they are defaulted to empty strings.
    # ----------------------------------------------------------------------- #
    identity = ModbusDeviceIdentification(
        info_name={
            "VendorName": "Pymodbus",
            "ProductCode": "PM",
            "VendorUrl": "https://github.com/riptideio/pymodbus/",
            "ProductName": "Pymodbus Server",
            "ModelName": "Pymodbus Server",
            "MajorMinorRevision": version.short(),
        }
    )

    # ----------------------------------------------------------------------- #
    # create and run the server
    # NOTE(review): defer_start=True presumably makes the call return the
    # server object without serving yet - confirm against the installed
    # pymodbus version. Serving is started explicitly by the await below.
    # ----------------------------------------------------------------------- #
    server = await StartAsyncTcpServer(
        context,
        identity=identity,
        address=("0.0.0.0", 5020),
        allow_reuse_address=True,
        defer_start=True,
    )
    await server.serve_forever()
if __name__ == "__main__":
    # Script entry point: create the server coroutine and let asyncio.run()
    # drive it to completion.
    server_coro = run_payload_server()
    asyncio.run(server_coro)