Skip to content

WarpMQTT2Api API Documentation

mqtt_client

Created on 2025-05-09

@author: wf

MqttClient

MQTT client

Source code in warp/mqtt_client.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class MqttClient:
    """MQTT client that subscribes to the configured topic and forwards messages"""

    def __init__(self, mqtt_config: MqttConfig, callback):
        """
        Create the underlying client and register the event handlers.

        Args:
            mqtt_config: connection settings (broker, port, topic, credentials)
            callback: callable that is invoked with every received MQTT message
        """
        self.mqtt_config = mqtt_config
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        client.on_connect = self.on_connect
        client.on_message = self.on_message
        self.client = client
        self.callback = callback

    def on_connect(self, client, userdata, flags, rc, properties=None):
        """
        Handle the result of a connection attempt.

        Subscribes to the configured topic when the return code is 0,
        otherwise the return code is logged as an error.
        """
        # part of the callback signature but not needed here
        _ = (userdata, flags, properties)

        connected = rc == 0
        if not connected:
            logger.error(f"Failed to connect, return code {rc}")
            return

        logger.info(f"Connected to MQTT broker at {self.mqtt_config.mqtt_broker}")
        client.subscribe(self.mqtt_config.mqtt_topic)
        logger.info(f"Subscribed to {self.mqtt_config.mqtt_topic}")

    def on_message(self, client, userdata, msg):
        """
        Forward a received message to the callback.

        Objects that are not an mqtt.MQTTMessage are ignored.
        """
        # part of the callback signature but not needed here
        _ = (client, userdata)
        if not isinstance(msg, mqtt.MQTTMessage):
            return
        logger.debug(f"Message received: {msg.topic}")
        self.callback(msg)

    def run(self):
        """
        Connect to the broker and process messages until the loop ends.

        Returns:
            False if the connection could not be established, True once the
            loop has ended (also after an interrupt or a logged loop error).
        """
        config = self.mqtt_config
        # credentials are only applied when both user name and password are set
        if config.mqtt_username and config.mqtt_password:
            self.client.username_pw_set(config.mqtt_username, config.mqtt_password)

        try:
            self.client.connect(config.mqtt_broker, config.mqtt_port)
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            return False

        try:
            logger.info("Starting MQTT loop")
            self.client.loop_forever()
        except KeyboardInterrupt:
            logger.info("Stopping client")
        except Exception as e:
            logger.error(f"Error in loop: {e}")
        finally:
            # always close the connection, however the loop ended
            self.client.disconnect()

        return True

__init__(mqtt_config, callback)

Initialize with configurations

Source code in warp/mqtt_client.py
18
19
20
21
22
23
24
def __init__(self, mqtt_config: MqttConfig, callback):
    """
    Create the underlying client and register the event handlers.

    Args:
        mqtt_config: connection settings (broker, port, topic, credentials)
        callback: callable that is invoked with every received MQTT message
    """
    self.mqtt_config = mqtt_config
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = self.on_connect
    client.on_message = self.on_message
    self.client = client
    self.callback = callback

on_connect(client, userdata, flags, rc, properties=None)

Connection callback

Source code in warp/mqtt_client.py
26
27
28
29
30
31
32
33
34
35
36
def on_connect(self, client, userdata, flags, rc, properties=None):
    """
    Handle the result of a connection attempt.

    Subscribes to the configured topic when the return code is 0,
    otherwise the return code is logged as an error.
    """
    # part of the callback signature but not needed here
    _ = (userdata, flags, properties)

    connected = rc == 0
    if not connected:
        logger.error(f"Failed to connect, return code {rc}")
        return

    logger.info(f"Connected to MQTT broker at {self.mqtt_config.mqtt_broker}")
    client.subscribe(self.mqtt_config.mqtt_topic)
    logger.info(f"Subscribed to {self.mqtt_config.mqtt_topic}")

on_message(client, userdata, msg)

Message callback

Source code in warp/mqtt_client.py
38
39
40
41
42
43
44
def on_message(self, client, userdata, msg):
    """
    Forward a received message to the callback.

    Objects that are not an mqtt.MQTTMessage are ignored.
    """
    # part of the callback signature but not needed here
    _ = (client, userdata)
    if not isinstance(msg, mqtt.MQTTMessage):
        return
    logger.debug(f"Message received: {msg.topic}")
    self.callback(msg)

run()

Run the client loop

Source code in warp/mqtt_client.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def run(self):
    """
    Connect to the broker and process messages until the loop ends.

    Returns:
        False if the connection could not be established, True once the
        loop has ended (also after an interrupt or a logged loop error).
    """
    config = self.mqtt_config
    # credentials are only applied when both user name and password are set
    if config.mqtt_username and config.mqtt_password:
        self.client.username_pw_set(config.mqtt_username, config.mqtt_password)

    try:
        self.client.connect(config.mqtt_broker, config.mqtt_port)
    except Exception as e:
        logger.error(f"Connection failed: {e}")
        return False

    try:
        logger.info("Starting MQTT loop")
        self.client.loop_forever()
    except KeyboardInterrupt:
        logger.info("Stopping client")
    except Exception as e:
        logger.error(f"Error in loop: {e}")
    finally:
        # always close the connection, however the loop ended
        self.client.disconnect()

    return True

mqtt_config

Created on 09.05.2025

@author: wf

MqttConfig

Configuration for the MQTT to Warp3 middleware

Source code in warp/mqtt_config.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@lod_storable
class MqttConfig:
    """Settings for the MQTT side of the MQTT to Warp3 middleware"""

    mqtt_broker: str = "mqtt.bitplan.com"  # broker host name
    mqtt_port: int = 1883  # broker port
    mqtt_topic: str = "tele/data"  # topic to subscribe to
    mqtt_username: Optional[str] = None  # optional credentials
    mqtt_password: Optional[str] = None
    dry_run: bool = False  # run without updating the wallbox

    @classmethod
    def ofArgs(cls, args: Namespace = None):
        """
        Build a configuration from parsed command line arguments.

        Args:
            args: parsed arguments providing one attribute per configuration
                field. If None, a configuration with default values is created.

        Returns:
            MqttConfig: Configuration object
        """
        if args is None:
            return cls()

        field_names = (
            "mqtt_broker",
            "mqtt_port",
            "mqtt_topic",
            "mqtt_username",
            "mqtt_password",
            "dry_run",
        )
        return cls(**{name: getattr(args, name) for name in field_names})

    @classmethod
    def addArgs(cls, parser):
        """
        Register the MqttConfig command line options on the given parser.

        The class level field values serve as the option defaults.

        Args:
            parser: The argument parser to add arguments to
        """
        options = (
            ("--mqtt-broker", {"help": "MQTT broker address", "default": cls.mqtt_broker}),
            ("--mqtt-port", {"type": int, "help": "MQTT broker port", "default": cls.mqtt_port}),
            ("--mqtt-topic", {"help": "MQTT topic to subscribe to", "default": cls.mqtt_topic}),
            ("--mqtt-username", {"help": "MQTT username", "default": cls.mqtt_username}),
            ("--mqtt-password", {"help": "MQTT password", "default": cls.mqtt_password}),
            ("--dry-run", {"action": "store_true", "help": "Run without updating the wallbox"}),
        )
        for flag, settings in options:
            parser.add_argument(flag, **settings)

    @classmethod
    def ofYaml(cls, yaml_path):
        """
        Load a configuration from the YAML file at the given path.

        Delegates to load_from_yaml_file, which is not defined in this class
        (presumably supplied by the lod_storable decorator - verify).
        """
        return cls.load_from_yaml_file(yaml_path)

addArgs(parser) classmethod

Add command line arguments for MqttConfig to the given parser.

Parameters:

Name Type Description Default
parser

The argument parser to add arguments to

required
Source code in warp/mqtt_config.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@classmethod
def addArgs(cls, parser):
    """
    Register the MqttConfig command line options on the given parser.

    The class level field values serve as the option defaults.

    Args:
        parser: The argument parser to add arguments to
    """
    options = (
        ("--mqtt-broker", {"help": "MQTT broker address", "default": cls.mqtt_broker}),
        ("--mqtt-port", {"type": int, "help": "MQTT broker port", "default": cls.mqtt_port}),
        ("--mqtt-topic", {"help": "MQTT topic to subscribe to", "default": cls.mqtt_topic}),
        ("--mqtt-username", {"help": "MQTT username", "default": cls.mqtt_username}),
        ("--mqtt-password", {"help": "MQTT password", "default": cls.mqtt_password}),
        ("--dry-run", {"action": "store_true", "help": "Run without updating the wallbox"}),
    )
    for flag, settings in options:
        parser.add_argument(flag, **settings)

ofArgs(args=None) classmethod

Create a configuration from command line arguments.

Parameters:

Name Type Description Default
args Namespace

Parsed command line arguments (an argparse Namespace). If None, a configuration with default values is created.

None

Returns:

Name Type Description
MqttConfig

Configuration object

Source code in warp/mqtt_config.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@classmethod
def ofArgs(cls, args: Namespace = None):
    """
    Build a configuration from parsed command line arguments.

    Args:
        args: parsed arguments providing one attribute per configuration
            field. If None, a configuration with default values is created.

    Returns:
        MqttConfig: Configuration object
    """
    if args is None:
        return cls()

    field_names = (
        "mqtt_broker",
        "mqtt_port",
        "mqtt_topic",
        "mqtt_username",
        "mqtt_password",
        "dry_run",
    )
    return cls(**{name: getattr(args, name) for name in field_names})

warp3

Created on 2025-05-09

@author: wf

MeterReading dataclass

Source code in warp/warp3.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@dataclass
class MeterReading:
    """A single reading of the energy counters of a meter"""

    kWh_in: float  # energy input counter (kWh)
    kWh_out: float  # energy output counter (kWh)
    time_stamp: str  # time of the reading, formatted as %Y-%m-%dT%H:%M:%S

    @property
    def time(self) -> datetime:
        """Convert timestamp string to datetime object"""
        return datetime.strptime(self.time_stamp, "%Y-%m-%dT%H:%M:%S")

    def active_power(self, prev: "MeterReading") -> float:
        """
        Calculate the average active power since a previous reading.

        Args:
            prev: the earlier reading to compare with

        Returns:
            The power in watts; positive when more energy was taken in than
            put out. 0.0 if both readings carry the same timestamp, since no
            power can be derived without an elapsed interval.
        """
        # Time difference in hours
        time_delta = (self.time - prev.time).total_seconds() / 3600
        if time_delta == 0:
            # avoid a ZeroDivisionError for readings with identical timestamps
            return 0.0
        # Net energy change (kWh): input minus output
        energy_delta = (self.kWh_in - prev.kWh_in) - (self.kWh_out - prev.kWh_out)

        # kWh per hour is kW - convert to watts
        return (energy_delta * 1000) / time_delta

time: datetime property

Convert timestamp string to datetime object

active_power(prev)

calculate the active power

Source code in warp/warp3.py
33
34
35
36
37
38
39
40
41
42
43
44
def active_power(self, prev: "MeterReading") -> float:
    """
    Calculate the average active power since a previous reading.

    Args:
        prev: the earlier reading to compare with

    Returns:
        The power in watts; positive when more energy was taken in than
        put out. 0.0 if both readings carry the same timestamp, since no
        power can be derived without an elapsed interval.
    """
    # Time difference in hours
    time_delta = (self.time - prev.time).total_seconds() / 3600
    if time_delta == 0:
        # avoid a ZeroDivisionError for readings with identical timestamps
        return 0.0
    # Net energy change (kWh): input minus output
    energy_delta = (self.kWh_in - prev.kWh_in) - (self.kWh_out - prev.kWh_out)

    # kWh per hour is kW - convert to watts
    return (energy_delta * 1000) / time_delta

PowerMeter

Active power meter for Warp3 Wallbox

Source code in warp/warp3.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
class PowerMeter:
    """Active power meter for Warp3 Wallbox"""

    def __init__(self):
        """
        constructor

        The attributes args, mqtt_config and wallbox_config are not set here;
        they are assigned by maininstance().
        """
        self.logger = logging.getLogger(__name__)
        self.warp3_api = None
        self._timer = None
        # latest power value received via MQTT, guarded by the lock below
        self._latest_active_power = None
        self._active_power_lock = threading.Lock()

    def setup_logging(self, debug=False):
        """
        Configure logging to stdout.

        Args:
            debug: if True the DEBUG level is used, otherwise INFO
        """
        level = logging.DEBUG if debug else logging.INFO
        logging.basicConfig(
            level=level,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[logging.StreamHandler(sys.stdout)],
        )
        self.logger = logging.getLogger(__name__)

    def schedule_update(self):
        """Schedule next update using a timer"""
        self._timer = threading.Timer(self.wallbox_config.update_interval, self.send_update)
        self._timer.daemon = True
        self._timer.start()

    def check_warp3_availability(self) -> bool:
        """
        Check availability of Warp3 API by verifying firmware version and meter configuration.

        Returns:
            True if both firmware info and meter config are available, else False.
        """
        # Check version
        version_info = self.warp3_api.get_version()
        if not version_info:
            self.logger.error("❌ Cannot connect to Warp3 API")
            available = False
        else:
            firmware = version_info.get("firmware", "unknown")
            self.logger.info(f"✅ Connected to Warp3 - Firmware version: {firmware}")

            # Check meter
            meter_id = self.wallbox_config.meter_id
            meter_config = self.warp3_api.get_meter_config(meter_id)
            if not meter_config:
                self.logger.error(f"❌ Meter {meter_id} not available")
                available = False
            else:
                description = self.warp3_api.describe_meter(meter_config[1])
                self.logger.info(f"✅ {description}")
                available = True

        return available

    def handle_message(self, msg):
        """
        Handle incoming MQTT message

        The JSON payload is decoded and handed to the wallbox configuration
        to calculate the power. Errors are logged and never raised.

        Args:
            msg: message object providing the raw JSON in its payload attribute
        """
        try:
            payload = json.loads(msg.payload.decode())
            active_power = self.wallbox_config.calcPower(payload)
            # None means "no value available"; 0 W is a valid reading and
            # must be stored, otherwise a stale value keeps being sent
            if active_power is not None:
                self.update_wallbox(active_power)
        except json.JSONDecodeError as jde:
            self.logger.error(f"JSON decode error: {str(jde)}")
        except Exception as e:
            self.logger.error(f"Error handling message: {str(e)}")

    def update_wallbox(self, power_value):
        """
        Store the latest power value.

        The value is not sent here; send_update() picks it up.

        Args:
            power_value: the power value to remember
        """
        self.logger.info(f"Power value: {power_value}W")
        with self._active_power_lock:
            self._latest_active_power = power_value

    def send_update(self):
        """
        Send latest stored power value to wallbox
        this is called regularly as per the scheduled interval

        A failing update is logged; the next update is scheduled in any case
        so that a single error does not stop the periodic updates.
        """
        try:
            with self._active_power_lock:
                power_value = self._latest_active_power
            if power_value is not None:
                self.warp3_api.update_meter(power_value, self.wallbox_config.meter_id)
        except Exception as e:
            self.logger.error(f"Error sending update: {str(e)}")
        finally:
            # re-schedule
            self.schedule_update()

    def start(self):
        """
        Start the power meter

        Exits the process with status 1 if the Warp3 API is not available.
        Otherwise runs the MQTT client, which blocks until its loop ends.
        """
        # Set up logging
        self.setup_logging(self.args.debug)

        # Initialize Warp3 API
        self.warp3_api = Warp3Api(self.wallbox_config.wallbox_host)

        # Log configuration
        self.logger.info("Starting MQTT to Warp3 middleware")
        self.logger.info(f"MQTT broker: {self.mqtt_config.mqtt_broker}")
        self.logger.info(f"MQTT topic: {self.mqtt_config.mqtt_topic}")
        self.logger.info(f"Wallbox host: {self.wallbox_config.wallbox_host}")
        self.logger.info(f"Power tag: {self.wallbox_config.power_tag}")
        self.logger.info(f"Meter ID: {self.wallbox_config.meter_id}")

        # Check API availability
        if not self.check_warp3_availability():
            self.logger.error("Cannot connect to Warp3 - exiting")
            sys.exit(1)

        # in dry run mode values are only received and logged, never sent
        if not self.mqtt_config.dry_run:
            self.schedule_update()

        # Create and run client
        client = MqttClient(self.mqtt_config, callback=self.handle_message)
        client.run()

    def maininstance(self):
        """
        Main instance setup and execution

        Parses the command line, creates the configurations either from the
        YAML file given by --config-path or from the command line options,
        and then starts the power meter.
        """
        # Parse arguments
        self.parser = argparse.ArgumentParser(
            description="MQTT to Warp3 Wallbox Middleware"
        )
        self.parser.add_argument(
            "--config-path", help="Path to YAML configuration file"
        )
        MqttConfig.addArgs(self.parser)
        WallboxConfig.addArgs(self.parser)
        self.parser.add_argument(
            "--debug", action="store_true", help="Enable debug logging"
        )
        self.args = self.parser.parse_args()

        # Create configurations
        if self.args.config_path:
            self.logger = logging.getLogger(__name__)
            self.logger.info(f"Loading configuration from {self.args.config_path}")
            try:
                self.mqtt_config = MqttConfig.ofYaml(self.args.config_path)
                self.wallbox_config = WallboxConfig.ofYaml(self.args.config_path)
            except Exception as e:
                self.logger.error(f"Failed to load configuration from YAML: {e}")
                sys.exit(1)
        else:
            self.mqtt_config = MqttConfig.ofArgs(self.args)
            self.wallbox_config = WallboxConfig.ofArgs(self.args)

        self.start()

__init__()

constructor

Source code in warp/warp3.py
182
183
184
185
186
187
188
189
190
def __init__(self):
    """
    Set up the logger and the initial, empty state.

    The API client and the timer are created later; the latest power
    value starts as None and is guarded by its own lock.
    """
    self._active_power_lock = threading.Lock()
    self._latest_active_power = None
    self._timer = None
    self.warp3_api = None
    self.logger = logging.getLogger(__name__)

check_warp3_availability()

Check availability of Warp3 API by verifying firmware version and meter configuration.

Returns:

Type Description
bool

True if both firmware info and meter config are available, else False.

Source code in warp/warp3.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def check_warp3_availability(self) -> bool:
    """
    Verify that the Warp3 API answers and that the configured meter exists.

    The firmware version is requested first; only if that succeeds is the
    configuration of the configured meter looked up. Both results are logged.

    Returns:
        True if both firmware info and meter config are available, else False.
    """
    api = self.warp3_api

    # step 1: firmware version
    version_info = api.get_version()
    if not version_info:
        self.logger.error("❌ Cannot connect to Warp3 API")
        return False
    firmware = version_info.get("firmware", "unknown")
    self.logger.info(f"✅ Connected to Warp3 - Firmware version: {firmware}")

    # step 2: meter configuration
    meter_id = self.wallbox_config.meter_id
    meter_config = api.get_meter_config(meter_id)
    if not meter_config:
        self.logger.error(f"❌ Meter {meter_id} not available")
        return False

    # the second entry of the meter config is what gets described
    description = api.describe_meter(meter_config[1])
    self.logger.info(f"✅ {description}")
    return True

handle_message(msg)

Handle incoming MQTT message

Source code in warp/warp3.py
236
237
238
239
240
241
242
243
244
245
246
def handle_message(self, msg):
    """
    Handle incoming MQTT message

    The JSON payload is decoded and handed to the wallbox configuration
    to calculate the power. Errors are logged and never raised.

    Args:
        msg: message object providing the raw JSON in its payload attribute
    """
    try:
        payload = json.loads(msg.payload.decode())
        active_power = self.wallbox_config.calcPower(payload)
        # None means "no value available"; 0 W is a valid reading and
        # must be stored, otherwise a stale value keeps being sent
        if active_power is not None:
            self.update_wallbox(active_power)
    except json.JSONDecodeError as jde:
        self.logger.error(f"JSON decode error: {str(jde)}")
    except Exception as e:
        self.logger.error(f"Error handling message: {str(e)}")

maininstance()

Main instance setup and execution

Source code in warp/warp3.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def maininstance(self):
    """
    Parse the command line, create the configurations and start.

    With --config-path both configurations are loaded from that YAML file
    (a load failure is logged and ends the process with status 1);
    otherwise they are created from the command line options.
    """
    # command line: --config-path, the options of both configs, --debug
    parser = argparse.ArgumentParser(description="MQTT to Warp3 Wallbox Middleware")
    self.parser = parser
    parser.add_argument("--config-path", help="Path to YAML configuration file")
    MqttConfig.addArgs(parser)
    WallboxConfig.addArgs(parser)
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    self.args = parser.parse_args()

    if not self.args.config_path:
        # no YAML file given - use the command line options
        self.mqtt_config = MqttConfig.ofArgs(self.args)
        self.wallbox_config = WallboxConfig.ofArgs(self.args)
    else:
        self.logger = logging.getLogger(__name__)
        self.logger.info(f"Loading configuration from {self.args.config_path}")
        try:
            self.mqtt_config = MqttConfig.ofYaml(self.args.config_path)
            self.wallbox_config = WallboxConfig.ofYaml(self.args.config_path)
        except Exception as e:
            self.logger.error(f"Failed to load configuration from YAML: {e}")
            sys.exit(1)

    self.start()

schedule_update()

Schedule next update using a timer

Source code in warp/warp3.py
201
202
203
204
205
def schedule_update(self):
    """
    Arm a daemon timer that calls send_update once the configured
    update interval has elapsed.
    """
    interval = self.wallbox_config.update_interval
    timer = threading.Timer(interval, self.send_update)
    self._timer = timer
    # daemon thread: a pending timer does not keep the process alive
    timer.daemon = True
    timer.start()

send_update()

Send the latest stored power value to the wallbox. This is called regularly, as per the scheduled update interval.

Source code in warp/warp3.py
254
255
256
257
258
259
260
261
262
263
264
def send_update(self):
    """
    Send latest stored power value to wallbox
    this is called regularly as per the scheduled interval

    A failing update is logged; the next update is scheduled in any case
    so that a single error does not stop the periodic updates.
    """
    try:
        with self._active_power_lock:
            power_value = self._latest_active_power
        # nothing to send as long as no power value has been stored
        if power_value is not None:
            self.warp3_api.update_meter(power_value, self.wallbox_config.meter_id)
    except Exception as e:
        self.logger.error(f"Error sending update: {str(e)}")
    finally:
        # re-schedule
        self.schedule_update()

start()

Start the power meter

Source code in warp/warp3.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def start(self):
    """
    Set up logging and the Warp3 API, then run the MQTT client.

    The process exits with status 1 if the Warp3 API check fails. Periodic
    updates are only scheduled when dry_run is not set. The call blocks
    while the MQTT client is running.
    """
    self.setup_logging(self.args.debug)
    self.warp3_api = Warp3Api(self.wallbox_config.wallbox_host)

    # report the effective configuration
    info = self.logger.info
    info("Starting MQTT to Warp3 middleware")
    info(f"MQTT broker: {self.mqtt_config.mqtt_broker}")
    info(f"MQTT topic: {self.mqtt_config.mqtt_topic}")
    info(f"Wallbox host: {self.wallbox_config.wallbox_host}")
    info(f"Power tag: {self.wallbox_config.power_tag}")
    info(f"Meter ID: {self.wallbox_config.meter_id}")

    available = self.check_warp3_availability()
    if not available:
        self.logger.error("Cannot connect to Warp3 - exiting")
        sys.exit(1)

    dry_run = self.mqtt_config.dry_run
    if not dry_run:
        self.schedule_update()

    # blocks until the client loop ends
    MqttClient(self.mqtt_config, callback=self.handle_message).run()

update_wallbox(power_value)

Log and store the latest power value; send_update() later sends the stored value to the wallbox.

Source code in warp/warp3.py
248
249
250
251
252
def update_wallbox(self, power_value):
    """
    Log the given power value and remember it as the latest one.

    Nothing is sent here; the value is only stored under the lock.
    """
    self.logger.info(f"Power value: {power_value}W")
    lock = self._active_power_lock
    with lock:
        self._latest_active_power = power_value

WallboxConfig

Configuration for the Warp3 wallbox

Source code in warp/warp3.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
@lod_storable
class WallboxConfig:
    """Configuration for the Warp3 wallbox"""

    wallbox_host: str = "http://warp3.mydomain"
    # example Tasmota reading
    power_tag: str = "eHZ"        # json tag for the payload content
    power_field: str = "Power2"   # active power (always positive)
    in_field: str = "E_in"        # field for energy input
    out_field: str = "E_out"      # field for energy output
    time_field: str = "Time"      # field for timestamp
    meter_id: int = 2             # id of the meter configured
    update_interval: float = 1.0  # how often the API should be fed (seconds)

    @classmethod
    def ofArgs(cls, args: Namespace = None):
        """
        Create a configuration from command line arguments.

        Args:
            args: parsed arguments providing one attribute per configuration
                field. If None, a configuration with default values is created.

        Returns:
            WallboxConfig: Configuration object
        """
        if args is None:
            config = cls()
        else:
            config = cls(
                wallbox_host=args.wallbox_host,
                power_tag=args.power_tag,
                power_field=args.power_field,
                in_field=args.in_field,
                out_field=args.out_field,
                time_field=args.time_field,
                meter_id=args.meter_id,
                update_interval=args.update_interval,
            )
        return config

    @classmethod
    def addArgs(cls, parser):
        """
        Add command line arguments for WallboxConfig to the given parser.

        The class level field values serve as the option defaults.

        Args:
            parser: The argument parser to add arguments to
        """
        parser.add_argument(
            "--wallbox-host", help="Wallbox host URL", default=cls.wallbox_host
        )
        parser.add_argument(
            "--power-tag",
            help="Tag in MQTT data containing power information",
            default=cls.power_tag,
        )
        parser.add_argument(
            "--power-field",
            help="Field name in MQTT data containing active power value",
            default=cls.power_field,
        )
        parser.add_argument(
            "--in-field",
            help="Field name in MQTT data containing energy input",
            default=cls.in_field,
        )
        parser.add_argument(
            "--out-field",
            help="Field name in MQTT data containing energy output",
            default=cls.out_field,
        )
        parser.add_argument(
            "--time-field",
            help="Field name in MQTT data containing timestamp",
            default=cls.time_field,
        )
        parser.add_argument(
            "--meter-id", type=int, help="Meter ID to use", default=cls.meter_id
        )
        parser.add_argument(
            "--update-interval",
            type=float,
            help="Minimum update interval in seconds",
            default=cls.update_interval,
        )

    @classmethod
    def ofYaml(cls, yaml_path):
        """
        Load a configuration from the YAML file at the given path.

        Delegates to load_from_yaml_file, which is not defined in this class
        (presumably supplied by the lod_storable decorator - verify).
        """
        config = cls.load_from_yaml_file(yaml_path)
        return config

    def calcPower(self, payload) -> float:
        """
        Calculate power from payload using MeterReading class.

        The sign of the result is derived from the change of the energy
        counters since the previous reading: positive if the input counter
        grew at least as much as the output counter, else negative.

        Args:
            payload: The decoded JSON payload from MQTT message

        Returns:
            The signed power in watts, rounded to an integer. None if the
            payload lacks the timestamp or the energy counters, or if there
            is no previous reading to compare with yet.
        """
        # get the timestamp
        timestamp_str = payload.get(self.time_field)
        # Get the data from the payload
        data = payload.get(self.power_tag, {})
        if not isinstance(data, dict):
            return None

        # Extract values
        e_in = data.get(self.in_field)
        e_out = data.get(self.out_field)
        power_magnitude = data.get(self.power_field)

        # An incomplete payload must not be stored as the last reading:
        # every later calculation against it would fail.
        if timestamp_str is None or e_in is None or e_out is None:
            return None

        # Create current reading
        current = MeterReading(kWh_in=e_in, kWh_out=e_out, time_stamp=timestamp_str)
        # the last reading is kept as a plain attribute, not as a config field
        previous = getattr(self, "_last_reading", None)
        if previous is None:
            # first reading - nothing to compare with yet
            active_power = None
        elif power_magnitude is not None:
            # a precise magnitude is available (0 W included): only the sign
            # has to be derived from the counters
            delta_in = e_in - previous.kWh_in
            delta_out = e_out - previous.kWh_out
            sign = 1 if delta_in >= delta_out else -1
            active_power = round(sign * power_magnitude)
        else:
            # calc power roughly from meter reading
            active_power = round(current.active_power(previous))

        # Update stored reading
        self._last_reading = current
        return active_power

addArgs(parser) classmethod

Add command line arguments for WallboxConfig to the given parser.

Parameters:

Name Type Description Default
parser

The argument parser to add arguments to

required
Source code in warp/warp3.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@classmethod
def addArgs(cls, parser):
    """
    Register the WallboxConfig command line options on the given parser.

    The class level field values serve as the option defaults.

    Args:
        parser: The argument parser to add arguments to
    """
    options = (
        ("--wallbox-host", {"help": "Wallbox host URL", "default": cls.wallbox_host}),
        (
            "--power-tag",
            {
                "help": "Tag in MQTT data containing power information",
                "default": cls.power_tag,
            },
        ),
        (
            "--power-field",
            {
                "help": "Field name in MQTT data containing active power value",
                "default": cls.power_field,
            },
        ),
        (
            "--in-field",
            {
                "help": "Field name in MQTT data containing energy input",
                "default": cls.in_field,
            },
        ),
        (
            "--out-field",
            {
                "help": "Field name in MQTT data containing energy output",
                "default": cls.out_field,
            },
        ),
        (
            "--time-field",
            {
                "help": "Field name in MQTT data containing timestamp",
                "default": cls.time_field,
            },
        ),
        ("--meter-id", {"type": int, "help": "Meter ID to use", "default": cls.meter_id}),
        (
            "--update-interval",
            {
                "type": float,
                "help": "Minimum update interval in seconds",
                "default": cls.update_interval,
            },
        ),
    )
    for flag, settings in options:
        parser.add_argument(flag, **settings)

calcPower(payload)

Calculate power from payload using MeterReading class.

Parameters:

Name Type Description Default
payload

The decoded JSON payload from MQTT message

required

Returns:

Name Type Description
float float

Calculated power in watts

Source code in warp/warp3.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def calcPower(self, payload) -> float:
    """
    Calculate power from payload using MeterReading class.

    The sign of the result is derived from the change of the energy
    counters since the previous reading: positive if the input counter
    grew at least as much as the output counter, else negative.

    Args:
        payload: The decoded JSON payload from MQTT message

    Returns:
        The signed power in watts, rounded to an integer. None if the
        payload lacks the timestamp or the energy counters, or if there
        is no previous reading to compare with yet.
    """
    # get the timestamp
    timestamp_str = payload.get(self.time_field)
    # Get the data from the payload
    data = payload.get(self.power_tag, {})
    if not isinstance(data, dict):
        return None

    # Extract values
    e_in = data.get(self.in_field)
    e_out = data.get(self.out_field)
    power_magnitude = data.get(self.power_field)

    # An incomplete payload must not be stored as the last reading:
    # every later calculation against it would fail.
    if timestamp_str is None or e_in is None or e_out is None:
        return None

    # Create current reading
    current = MeterReading(kWh_in=e_in, kWh_out=e_out, time_stamp=timestamp_str)
    # the last reading is kept as a plain attribute, not as a config field
    previous = getattr(self, "_last_reading", None)
    if previous is None:
        # first reading - nothing to compare with yet
        active_power = None
    elif power_magnitude is not None:
        # a precise magnitude is available (0 W included): only the sign
        # has to be derived from the counters
        delta_in = e_in - previous.kWh_in
        delta_out = e_out - previous.kWh_out
        sign = 1 if delta_in >= delta_out else -1
        active_power = round(sign * power_magnitude)
    else:
        # calc power roughly from meter reading
        active_power = round(current.active_power(previous))

    # Update stored reading
    self._last_reading = current
    return active_power

ofArgs(args=None) classmethod

Create a configuration from command line arguments.

Parameters:

Name Type Description Default
args Namespace

Parsed command line arguments (an argparse Namespace). If None, a configuration with default values is created.

None

Returns:

Name Type Description
WallboxConfig

Configuration object

Source code in warp/warp3.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@classmethod
def ofArgs(cls, args: Namespace = None):
    """
    Build a configuration from parsed command line arguments.

    Args:
        args: parsed arguments providing one attribute per configuration
            field. If None, a configuration with default values is created.

    Returns:
        WallboxConfig: Configuration object
    """
    if args is None:
        return cls()

    field_names = (
        "wallbox_host",
        "power_tag",
        "power_field",
        "in_field",
        "out_field",
        "time_field",
        "meter_id",
        "update_interval",
    )
    return cls(**{name: getattr(args, name) for name in field_names})

main()

Main entry point

Source code in warp/warp3.py
327
328
329
330
def main():
    """Main entry point: create a PowerMeter and run its maininstance()."""
    PowerMeter().maininstance()

warp3_api

Created on 2025-05-09

@author: wf

Warp3Api

API client for TinkerForge/Warp3 Wallbox

Source code in warp/warp3_api.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class Warp3Api:
    """
    API client for TinkerForge/Warp3 Wallbox

    Wraps the HTTP endpoints used to read the firmware version and the
    meter configuration and to push meter values to the wallbox.
    """

    # Explanations (in German) for the known meter value ids.
    # Kept at class level so the table is built once instead of on
    # every explain_value_id call.
    _VALUE_ID_EXPLANATIONS = {
        1: "Spannung L1-N",
        2: "Spannung L2-N",
        3: "Spannung L3-N",
        4: "Spannung L1-L2",
        5: "Spannung L2-L3",
        6: "Spannung L3-L1",
        7: "Durchschnittliche Phasenspannung",
        8: "Durchschnitt Spannung L1-L2, L2-L3, L3-L1",
        13: "Strom (Bezug + Einspeisung)",
        17: "Strom (Bezug + Einspeisung)",
        21: "Strom (Bezug + Einspeisung)",
        25: "Neutralleiterstrom",
        29: "Durchschnitt der Phasenströme",
        33: "Summe der Phasenströme",
        39: "Wirkleistung (Bezug - Einspeisung)",
        48: "Wirkleistung (Bezug - Einspeisung)",
        57: "Wirkleistung (Bezug - Einspeisung)",
        74: "Summe der Phasenwirkleistungen (Bezug - Einspeisung)",
        83: "Blindleistung (induktiv - kapazitiv)",
        91: "Blindleistung (induktiv - kapazitiv)",
        99: "Blindleistung (induktiv - kapazitiv)",
        115: "Summe der Phasenblindleistungen",
        122: "Scheinleistung (Bezug + Einspeisung)",
        130: "Scheinleistung (Bezug + Einspeisung)",
        138: "Scheinleistung (Bezug + Einspeisung)",
        154: "Summe der Phasenscheinleistungen",
        209: "Wirkenergie Bezug (seit Herstellung)",
        210: "Wirkenergie Bezug (seit letztem Zurücksetzen)",
        211: "Wirkenergie Einspeisung (seit Herstellung)",
        212: "Wirkenergie Einspeisung (seit letztem Zurücksetzen)",
        213: "Wirkenergie Bezug + Einspeisung (seit Herstellung)",
        214: "Wirkenergie Bezug + Einspeisung (seit letztem Zurücksetzen)",
        277: "Blindenergie induktiv + kapazitiv (seit Herstellung)",
        353: "Leistungsfaktor (gerichtet)",
        354: "Leistungsfaktor (gerichtet)",
        355: "Leistungsfaktor (gerichtet)",
        356: "Summe der gerichteten Leistungsfaktoren",
        364: "Netzfrequenz",
    }

    def __init__(self, host):
        """
        Initialize with wallbox host

        Args:
            host: Base URL of the wallbox; trailing slashes are removed
        """
        self.host = host.rstrip("/")
        self.logger = logging.getLogger(__name__)
        # last successfully transmitted value per meter id
        self.meter_values = {}

    def api_get(self, cmd):
        """
        Call the wallbox API with the given command and return the
        parsed JSON result

        Args:
            cmd: API command (path relative to the host)

        Returns:
            The decoded JSON response, or None if the request or the
            decoding failed (the error is logged)
        """
        # NOTE(review): the request is issued without a timeout
        api_response = None
        try:
            http_response = requests.get(f"{self.host}/{cmd}")
            http_response.raise_for_status()
            api_response = http_response.json()
        except Exception as e:
            self.logger.error(f"API GET error: {e}")
        return api_response

    def get_version(self):
        """Get wallbox firmware version"""
        version_info = self.api_get("info/version")
        return version_info

    def get_meter_config(self, meter_id=1):
        """Get meter configuration"""
        meter_config = self.api_get(f"meters/{meter_id}/config")
        return meter_config

    def update_meter(self, value, meter_id=1):
        """
        Update meter value

        Args:
            value: Power value in Watts
            meter_id: Meter ID (default 1)

        Returns:
            True if successful, False otherwise
        """
        update_success = False
        try:
            url = f"{self.host}/meters/{meter_id}/update"
            http_response = requests.post(url, data=f"[{value}]")
            # success is signalled by status 200 with an empty body
            if http_response.status_code == 200 and not http_response.text:
                prev = self.meter_values.get(meter_id)
                # Log only the first update and real changes. The check
                # is against None explicitly so that a stored 0 W is not
                # mistaken for "no previous value" and logged each time.
                if prev is None or prev != value:
                    msg = f"✅ {value} Watt set for meter {meter_id}"
                    self.logger.info(msg)
                self.meter_values[meter_id] = value
                update_success = True
            else:
                self.logger.error(f"❌ Failed to update: {http_response.text}")
        except Exception as e:
            self.logger.error(f"Error updating meter: {e}")
        return update_success

    def describe_meter(self, meter: dict) -> str:
        """
        Describe the meter configuration using value_id explanations.

        Args:
            meter: The meter configuration dictionary.

        Returns:
            A human-readable description string.
        """
        name = meter.get("display_name", "Unknown")
        location = meter.get("location", "N/A")
        value_ids = meter.get("value_ids", [])
        values_explained = ", ".join(
            f"{vid}: {self.explain_value_id(vid)}" for vid in value_ids
        )
        description = (
            f"Meter '{name}' at location {location} measures: {values_explained}"
        )
        return description

    def explain_value_id(self, value_id: int) -> str:
        """
        Look up the explanation text for a meter value id.

        Args:
            value_id: The numeric value id

        Returns:
            The explanation text, or "Unknown value_id" when the id is
            not in the table.
        """
        return self._VALUE_ID_EXPLANATIONS.get(value_id, "Unknown value_id")

__init__(host)

Initialize with wallbox host

Source code in warp/warp3_api.py
15
16
17
18
19
def __init__(self, host):
    """
    Set up the client for the given wallbox host.

    Args:
        host: Base URL of the wallbox; trailing slashes are removed
    """
    base_url = host.rstrip("/")
    self.host = base_url
    # last value stored per meter id, empty until the first update
    self.meter_values = dict()
    self.logger = logging.getLogger(__name__)

api_get(cmd)

Call the wallbox API with the given command and return the parsed JSON result

Parameters:

Name Type Description Default
cmd

API command

required

Returns:

Type Description

API response

Source code in warp/warp3_api.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def api_get(self, cmd):
    """
    Issue a GET request for the given command against the wallbox host.

    Args:
        cmd: API command (path relative to the host)

    Returns:
        The decoded JSON response, or None when the request, the status
        check or the JSON decoding raised (the error is logged).
    """
    try:
        reply = requests.get(f"{self.host}/{cmd}")
        # turn HTTP error statuses into exceptions
        reply.raise_for_status()
        return reply.json()
    except Exception as problem:
        self.logger.error(f"API GET error: {problem}")
    return None

describe_meter(meter)

Describe the meter configuration using value_id explanations.

Parameters:

Name Type Description Default
meter dict

The meter configuration dictionary.

required

Returns:

Type Description
str

A human-readable description string.

Source code in warp/warp3_api.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def describe_meter(self, meter: dict) -> str:
    """
    Build a one-line description of a meter configuration.

    Args:
        meter: The meter configuration dictionary; the keys
            'display_name', 'location' and 'value_ids' are read,
            each with a fallback when missing.

    Returns:
        A human-readable description string listing every value id
        together with its explanation.
    """
    label = meter.get('display_name', 'Unknown')
    place = meter.get('location', 'N/A')

    # one "<id>: <explanation>" entry per configured value id
    entries = []
    for vid in meter.get('value_ids', []):
        entries.append(f"{vid}: {self.explain_value_id(vid)}")
    listing = ', '.join(entries)

    return f"Meter '{label}' at location {place} measures: {listing}"

get_meter_config(meter_id=1)

Get meter configuration

Source code in warp/warp3_api.py
46
47
48
49
def get_meter_config(self, meter_id=1):
    """
    Fetch the configuration of a meter via api_get.

    Args:
        meter_id: Meter ID (default 1)

    Returns:
        Whatever api_get returns for "meters/<meter_id>/config"
    """
    return self.api_get(f"meters/{meter_id}/config")

get_version()

Get wallbox firmware version

Source code in warp/warp3_api.py
41
42
43
44
def get_version(self):
    """
    Fetch the wallbox firmware version via api_get.

    Returns:
        Whatever api_get returns for "info/version"
    """
    return self.api_get("info/version")

update_meter(value, meter_id=1)

Update meter value

Parameters:

Name Type Description Default
value

Power value in Watts

required
meter_id

Meter ID (default 1)

1

Returns:

Type Description

True if successful, False otherwise

Source code in warp/warp3_api.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def update_meter(self, value, meter_id=1):
    """
    Update meter value

    Posts the value as a one-element list to the meter's update
    endpoint and remembers it in self.meter_values on success.

    Args:
        value: Power value in Watts
        meter_id: Meter ID (default 1)

    Returns:
        True if successful, False otherwise
    """
    update_success = False
    try:
        url = f"{self.host}/meters/{meter_id}/update"
        http_response = requests.post(url, data=f"[{value}]")
        # success is signalled by status 200 with an empty body
        if http_response.status_code == 200 and not http_response.text:
            prev = self.meter_values.get(meter_id)
            # Log only the first update and real changes. The check is
            # against None explicitly so that a stored 0 W is not
            # mistaken for "no previous value" and logged every time.
            if prev is None or prev != value:
                msg = f"✅ {value} Watt set for meter {meter_id}"
                self.logger.info(msg)
            self.meter_values[meter_id] = value
            update_success = True
        else:
            self.logger.error(f"❌ Failed to update: {http_response.text}")
    except Exception as e:
        # any request failure is logged and reported as False
        self.logger.error(f"Error updating meter: {e}")
    return update_success