Communication

Description

Module: geocompy.communication

Implementations of connection methods.

Functions

  • get_dummy_logger

  • open_serial

  • open_socket

  • crc16_bitwise

  • crc16_bytewise

Constants

  • DUMMYLOGGER

Types

  • Connection

  • SerialConnection

  • SocketConnection

Definitions

class Connection[source]

Interface definition for connection implementations.

abstractmethod close() None[source]
abstractmethod exchange(cmd: str) str[source]
abstractmethod exchange_binary(data: bytes) bytes[source]
abstractmethod is_open() bool[source]
abstractmethod receive() str[source]
abstractmethod receive_binary() bytes[source]
abstractmethod reset() None[source]
abstractmethod send(message: str) None[source]
abstractmethod send_binary(data: bytes) None[source]
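
To illustrate the shape of the interface, the sketch below defines an in-memory class offering the same method names. It is not part of geocompy and does not subclass Connection. The class name EchoConnection, its echo behaviour and the encoding parameter are assumptions made only for this example.

```python
from collections import deque


class EchoConnection:
    """Hypothetical stand-in that mirrors the Connection method names.

    Every sent block is queued and handed back by the next receive call.
    """

    def __init__(self, encoding: str = "ascii") -> None:
        self._queue = deque()  # pending blocks of bytes
        self._encoding = encoding
        self._open = True

    def is_open(self) -> bool:
        return self._open

    def close(self) -> None:
        self._open = False

    def reset(self) -> None:
        # Drop any pending data, similar to clearing a buffer.
        self._queue.clear()

    def send_binary(self, data: bytes) -> None:
        if not self._open:
            raise ConnectionError("connection is closed")
        self._queue.append(data)

    def receive_binary(self) -> bytes:
        if not self._open:
            raise ConnectionError("connection is closed")
        if not self._queue:
            # Nothing arrived: stands in for a timed out read.
            raise TimeoutError("nothing to receive")
        return self._queue.popleft()

    def exchange_binary(self, data: bytes) -> bytes:
        self.send_binary(data)
        return self.receive_binary()

    def send(self, message: str) -> None:
        self.send_binary(message.encode(self._encoding))

    def receive(self) -> str:
        return self.receive_binary().decode(self._encoding)

    def exchange(self, cmd: str) -> str:
        self.send(cmd)
        return self.receive()
```

In this sketch the text methods are thin wrappers around the binary ones, which keeps the open-state check in one place. Whether the real implementations are layered this way is not stated in this reference.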
class SerialConnection(port: Serial, *, eom: str = '\r\n', eoa: str = '\r\n', sync_after_timeout: bool = False, logger: Logger | None = None)[source]

Connection wrapping an open serial port.

The passed serial port should be configured and opened in advance. Take care to set the appropriate speed (baud), data bits, timeout, etc. For most instruments, a speed of 9600 baud with 8 data bits and 1 stop bit is correct. A suitable timeout for total stations might be 15 seconds. (A timeout that is too short may result in unexpected errors when waiting for a slower, motorized function.)

Examples

Setting up a basic serial connection:

>>> from serial import Serial
>>> import geocompy as gc
>>> port = Serial("COM4", timeout=15)
>>> conn = gc.communication.SerialConnection(port)
>>> # message exchanges
>>> conn.close()

Using as a context manager:

>>> from serial import Serial
>>> import geocompy as gc
>>> port = Serial("COM4", timeout=15)
>>> with gc.communication.SerialConnection(port) as conn:
...     pass  # message exchanges
...
>>> port.is_open
False
>>> # port is automatically closed when the context is exited
Parameters:
port: Serial

Serial port to communicate on.

eom: str = '\r\n'

EndOfMessage sequence, by default "\r\n"

eoa: str = '\r\n'

EndOfAnswer sequence, by default "\r\n"

sync_after_timeout: bool = False

Attempt to re-sync the message-response queue if a timeout occurred in the previous exchange, by default False

logger: Logger | None = None

Logger instance used to log connection-related events. A dummy logger is used when None is given, by default None

Notes

If the serial port is not already open, the opening will be attempted. This might raise an exception if the port cannot be opened.

Warning

The queue syncing is attempted by repeatedly reading from the receiving buffer, once for every timeout that was previously detected. This can only solve issues if the connection target was just slow, and not completely unresponsive. If the target became truly unresponsive, but came back online later, the sync attempt can cause further problems. Use with caution!

(Timeouts should be avoided when possible. Use a temporary override on exchanges that are expected to take longer.)
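
The re-sync idea described in the warning can be pictured with a small simulation. This is only a sketch of the described behaviour, not geocompy's code. The SyncingClient class, its deliver_now flag and the use of None to signal a timeout are all hypothetical.

```python
from collections import deque


class SyncingClient:
    """Hypothetical client that discards one stale answer per earlier timeout."""

    def __init__(self) -> None:
        self.inbox = deque()  # answers waiting in the receive buffer
        self.missed = 0       # timeouts seen since the last sync

    def read(self):
        # Return the oldest buffered answer, or None to stand in for a timeout.
        return self.inbox.popleft() if self.inbox else None

    def late_delivery(self, cmd: str) -> None:
        # The slow device finally answers after the client gave up waiting.
        self.inbox.append(f"answer to {cmd}")

    def exchange(self, cmd: str, deliver_now: bool = True):
        # Re-sync: read and throw away as many answers as timeouts were seen.
        for _ in range(self.missed):
            self.read()
        self.missed = 0

        if deliver_now:
            self.inbox.append(f"answer to {cmd}")
        answer = self.read()
        if answer is None:
            self.missed += 1
        return answer


client = SyncingClient()
first = client.exchange("A", deliver_now=False)  # times out
client.late_delivery("A")                        # stale answer shows up later
second = client.exchange("B")                    # stale answer is discarded first
```

Without the discard loop, the second exchange would return the late answer to "A" and every later response would be off by one. In this simulation an empty buffer simply yields nothing during the sync, whereas a real read would block for a full timeout period each time.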

close() None[source]

Closes the serial port.

exchange(cmd: str) str[source]

Writes a message to the serial line, and receives the corresponding response.

Parameters:
cmd: str

Message to send.

Returns:

Response to the sent message

Return type:

str

Raises:
  • ConnectionError – If the serial port is not open.

  • TimeoutError – If the connection timed out before receiving the EndOfAnswer sequence for one of the responses.

exchange_binary(data: bytes) bytes[source]

Writes a block of data to the serial line, and receives the corresponding response.

Parameters:
data: bytes

Data to send.

Returns:

Response to the sent data

Return type:

bytes

Raises:
  • ConnectionError – If the serial port is not open.

  • TimeoutError – If the connection timed out before receiving the EndOfAnswer sequence for one of the responses.

is_open() bool[source]

Checks if the serial port is currently open.

Returns:

State of the port.

Return type:

bool

receive() str[source]

Reads a single message from the serial line.

Returns:

Received message.

Return type:

str

Raises:
  • ConnectionError – If the serial port is not open.

  • TimeoutError – If the connection timed out before receiving the EndOfAnswer sequence.
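
A message is considered complete once the EndOfAnswer sequence arrives. The helper below sketches that framing over any byte stream. The read_until function and the use of an exhausted stream to stand in for a timeout are assumptions for illustration, not the library's implementation.

```python
import io


def read_until(stream: io.BufferedIOBase, terminator: bytes = b"\r\n") -> bytes:
    """Read byte by byte until the terminator is seen; return the payload only.

    Raises TimeoutError if the stream runs dry first, which stands in for
    a serial read that timed out before the terminator arrived.
    """
    buffer = bytearray()
    while not buffer.endswith(terminator):
        chunk = stream.read(1)
        if not chunk:
            raise TimeoutError("terminator not received")
        buffer += chunk
    # Strip the terminator from the end of the collected bytes.
    return bytes(buffer[: -len(terminator)])


stream = io.BytesIO(b"first\r\nsecond\r\npartial")
one = read_until(stream)
two = read_until(stream)
```

A third call on this stream raises TimeoutError, because the remaining bytes never end with the terminator.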

receive_binary() bytes[source]

Reads a single binary data block from the serial line.

Returns:

Received data.

Return type:

bytes

Raises:
  • ConnectionError – If the serial port is not open.

  • TimeoutError – If the connection timed out before receiving the EndOfAnswer sequence.

reset() None[source]

Resets the connection by clearing the incoming and outgoing buffers, and resetting the internal state. This could be used to recover from a desync (possibly caused by a timeout).

Warning

Trying to recover from repeated timeouts with a hard reset can cause further issues if the reset is attempted while responses are finally being received. It is recommended to wait some time after the last command was sent before resetting.

send(message: str) None[source]

Writes a single message to the serial line.

Parameters:
message: str

Message to send.

Raises:

ConnectionError – If the serial port is not open.

send_binary(data: bytes) None[source]

Writes a single block of binary data to the serial line.

Parameters:
data: bytes

Data to send.

Raises:

ConnectionError – If the serial port is not open.

timeout_override(timeout: int | None) Generator[None, None, None][source]

Context manager that temporarily overrides connection parameters.

Parameters:
timeout: int | None

Temporary timeout in seconds. Set to None to wait indefinitely.

Returns:

Context manager generator object.

Return type:

Generator

Warning

An indefinite timeout might leave the connection in a perpetual waiting state if the instrument became unresponsive in the meantime (e.g. it powered off due to low battery charge).

Example

>>> from serial import Serial
>>> from geocompy.communication import SerialConnection
>>>
>>> port = Serial("COM1", timeout=5)
>>> with SerialConnection(port) as com:
...     # normal operation
...
...     # potentially long operation
...     with com.timeout_override(20):
...         answer = com.exchange("message")
...
...     # continue normal operation
...
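
The override pattern itself can be sketched with contextlib. The FakePort class and the free function below are hypothetical stand-ins. The point shown is that the previous timeout is restored even when the body raises.

```python
from contextlib import contextmanager
from typing import Iterator, Optional


class FakePort:
    """Hypothetical object with a mutable timeout, standing in for a port."""

    def __init__(self, timeout: Optional[float]) -> None:
        self.timeout = timeout


@contextmanager
def timeout_override(port: FakePort, timeout: Optional[float]) -> Iterator[None]:
    saved = port.timeout
    port.timeout = timeout
    try:
        yield
    finally:
        # Restore the original value even if the body raised.
        port.timeout = saved


port = FakePort(5)
with timeout_override(port, 20):
    inside = port.timeout  # the temporary value is active here
after = port.timeout       # the original value is back
```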
class SocketConnection(sock: socket, *, eom: str = '\r\n', eoa: str = '\r\n', sync_after_timeout: bool = False, logger: Logger | None = None)[source]

Connection wrapping an open socket connection.

The passed socket should be configured and opened in advance. The socket can use any protocol that the target instrument supports. A suitable timeout for total stations might be 15 seconds. (A timeout that is too short may result in unexpected errors when waiting for a slower, motorized function.)

Examples

Setting up a basic TCP connection:

>>> import socket
>>> from geocompy.communication import SocketConnection
>>> soc = socket.socket(
...     socket.AF_INET,
...     socket.SOCK_STREAM,
...     socket.IPPROTO_TCP
... )
>>> soc.connect(("192.168.0.1", 1212))
>>> conn = SocketConnection(soc)
>>> # message exchanges
>>> conn.close()

Using as a context manager:

>>> import socket
>>> from geocompy.communication import SocketConnection
>>> soc = socket.socket(
...     socket.AF_INET,
...     socket.SOCK_STREAM,
...     socket.IPPROTO_TCP
... )
>>> soc.connect(("192.168.0.1", 1212))
>>> with SocketConnection(soc) as conn:
...     pass  # message exchanges
...
>>> conn.is_open()
False
>>> # socket is automatically closed when the context is exited
Parameters:
sock: socket

Socket to communicate on.

eom: str = '\r\n'

EndOfMessage sequence, by default "\r\n"

eoa: str = '\r\n'

EndOfAnswer sequence, by default "\r\n"

sync_after_timeout: bool = False

Attempt to re-sync the message-response queue if a timeout occurred in the previous exchange, by default False

logger: Logger | None = None

Logger instance used to log connection-related events. A dummy logger is used when None is given, by default None

Warning

The queue syncing is attempted by repeatedly reading from the receiving buffer, once for every timeout that was previously detected. This can only solve issues if the connection target was just slow, and not completely unresponsive. If the target became truly unresponsive, but came back online later, the sync attempt can cause further problems. Use with caution!

(Timeouts should be avoided when possible. Use a temporary override on exchanges that are expected to take longer.)

close() None[source]

Shuts down and closes the socket.

exchange(cmd: str) str[source]

Sends a message through the socket, and receives the corresponding response.

Parameters:
cmd: str

Message to send.

Returns:

Response to the sent message.

Return type:

str

Raises:
  • ConnectionError – The socket is not connected or closed to writing or reading.

  • TimeoutError – Data was not received within the timeout period.

exchange_binary(data: bytes) bytes[source]

Sends a block of data through the socket, and receives the corresponding response.

Parameters:
data: bytes

Data to send.

Returns:

Response to the sent data.

Return type:

bytes

Raises:
  • ConnectionError – The socket is not connected or closed to writing or reading.

  • TimeoutError – Data was not received within the timeout period.

is_open() bool[source]

Checks if the socket is currently open and connected.

Returns:

State of the socket.

Return type:

bool

receive() str[source]

Receives a single message from the socket.

Returns:

Received message.

Return type:

str

Raises:
  • ConnectionError – The socket is not connected or closed to writing.

  • TimeoutError – Data was not received within the timeout period.

receive_binary() bytes[source]

Receives a single binary data block from the socket.

Returns:

Received data.

Return type:

bytes

Raises:
  • ConnectionError – The socket is not connected or closed to writing.

  • TimeoutError – Data was not received within the timeout period.

reset() None[source]

Resets the connection by closing the socket and opening a new one, and resetting the internal state. This could be used to recover from a desync (possibly caused by a timeout).

Warning

Trying to recover from repeated timeouts with a hard reset can cause further issues if the reset is attempted while responses are finally being received. It is recommended to wait some time after the last command was sent before resetting.

send(message: str) None[source]

Sends a single message through the socket.

Parameters:
message: str

Message to send.

Raises:

ConnectionError – The socket is not connected or closed to writing.

send_binary(data: bytes) None[source]

Sends a single block of binary data through the socket.

Parameters:
data: bytes

Data to send.

Raises:

ConnectionError – The socket is not connected or closed to writing.

timeout_override(timeout: int | None) Generator[None, None, None][source]

Context manager that temporarily overrides connection parameters.

Parameters:
timeout: int | None

Temporary timeout in seconds. Set to None to wait indefinitely.

Returns:

Context manager generator object.

Return type:

Generator

Warning

An indefinite timeout might leave the connection in a perpetual waiting state if the instrument became unresponsive in the meantime (e.g. it powered off due to low battery charge).

Example

>>> import socket
>>> from geocompy.communication import SocketConnection
>>> soc = socket.socket(
...     socket.AF_INET,
...     socket.SOCK_STREAM,
...     socket.IPPROTO_TCP
... )
>>> soc.connect(("192.168.0.1", 1212))
>>> with SocketConnection(soc) as conn:
...     # normal operation
...
...     # potentially long operation
...     with conn.timeout_override(20):
...         answer = conn.exchange("message")
...
...     # continue normal operation
...
crc16_bitwise(value: str | bytes) int[source]

Bitwise CRC-16/ARC calculation method.

Adapted from: https://stackoverflow.com/a/68095008

Parameters:
value: str | bytes

Data to digest.

Returns:

CRC-16 checksum

Return type:

int
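
For reference, a bitwise CRC-16/ARC computation can be sketched as follows. CRC-16/ARC uses the reflected polynomial 0xA001 with an initial value of zero and no final XOR. The function name crc16_arc_bitwise is hypothetical, the sketch accepts bytes only, and it is not necessarily identical to the library's code.

```python
def crc16_arc_bitwise(data: bytes) -> int:
    """Compute CRC-16/ARC one bit at a time (illustrative sketch)."""
    crc = 0x0000
    for byte in data:
        crc ^= byte
        for _ in range(8):
            # Shift right; apply the reflected polynomial when a 1 bit falls out.
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


# Standard check input for CRC catalogues.
checksum = crc16_arc_bitwise(b"123456789")
```

The published check value of CRC-16/ARC for the ASCII string "123456789" is 0xBB3D, which gives a quick sanity test for any implementation.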

crc16_bytewise(value: str | bytes) int[source]

Bytewise CRC-16/ARC calculation method.

The method uses a precomputed lookup table to reduce the number of required operations to calculate a checksum.

Parameters:
value: str | bytes

Data to digest.

Returns:

CRC-16 checksum

Return type:

int
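
The lookup-table approach can be sketched like this: the 256 table entries are the bitwise CRC of each possible byte, so the inner eight-step loop is replaced by one table access per input byte. The names CRC16_ARC_TABLE and crc16_arc_bytewise are hypothetical, and the table is built at import time here rather than hard-coded.

```python
from typing import List


def _build_table() -> List[int]:
    """Precompute the CRC-16/ARC remainder for every possible byte value."""
    table = []
    for byte in range(256):
        crc = byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
        table.append(crc)
    return table


CRC16_ARC_TABLE = _build_table()


def crc16_arc_bytewise(data: bytes) -> int:
    """Compute CRC-16/ARC with one table lookup per byte (illustrative sketch)."""
    crc = 0x0000
    for byte in data:
        # The low byte of the running CRC, mixed with the input, selects the entry.
        crc = (crc >> 8) ^ CRC16_ARC_TABLE[(crc ^ byte) & 0xFF]
    return crc


checksum = crc16_arc_bytewise(b"123456789")
```

The result matches the bitwise method for any input. The table trades 256 stored integers for roughly eight times fewer loop iterations.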

get_dummy_logger(name: str = 'geocompy.dummy') Logger[source]

Utility function that sets up a dummy logger instance that does not propagate records and logs to the null device.

Parameters:
name: str = 'geocompy.dummy'

Logger name.

Returns:

Dummy logger.

Return type:

logging.Logger
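
A logger with these properties can be built from the standard logging module alone. The sketch below is an assumption about one possible construction, using logging.NullHandler to discard records. The function name and the default logger name are hypothetical, and the library may set up its dummy logger differently.

```python
import logging


def make_silent_logger(name: str = "example.dummy") -> logging.Logger:
    """Return a logger that neither propagates nor emits any records."""
    logger = logging.getLogger(name)
    logger.propagate = False  # keep records away from the root logger
    if not logger.handlers:
        # NullHandler accepts every record and does nothing with it.
        logger.addHandler(logging.NullHandler())
    return logger


log = make_silent_logger()
log.warning("this record is silently discarded")
```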

open_serial(port: str, *, speed: int = 9600, databits: int = 8, stopbits: int = 1, parity: str = 'N', timeout: int = 15, eom: str = '\r\n', eoa: str = '\r\n', sync_after_timeout: bool = False, attempts: int = 1, logger: Logger | None = None) SerialConnection[source]

Constructs a SerialConnection with the given communication parameters.

Parameters:
port: str

Name of the port to use (e.g. COM1 or /dev/ttyUSB0).

speed: int = 9600

Communication speed (baud), by default 9600

databits: int = 8

Number of data bits, by default 8

stopbits: int = 1

Number of stop bits, by default 1

parity: str = 'N'

Parity bit behavior, by default PARITY_NONE

timeout: int = 15

Communication timeout threshold, by default 15

eom: str = '\r\n'

EndOfMessage sequence, by default "\r\n"

eoa: str = '\r\n'

EndOfAnswer sequence, by default "\r\n"

sync_after_timeout: bool = False

Attempt to re-sync the message-response queue if a timeout occurred in the previous exchange, by default False

attempts: int = 1

Number of attempts at opening the connection, by default 1

logger: Logger | None = None

Logger instance used to log connection-related events. A dummy logger is used when None is given, by default None

Return type:

SerialConnection

Raises:

ConnectionRefusedError – Serial port could not be opened.

Warning

The syncing feature should be used with caution! See SerialConnection for more information!

Examples

Opening a serial connection similar to a file:

>>> conn = open_serial("COM1", speed=19200, timeout=5)
>>> # do operations
>>> conn.close()

Using as a context manager:

>>> with open_serial("COM1", timeout=20) as conn:
...     conn.send("test")
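
The attempts parameter suggests a retry loop around the opening step. The sketch below shows one way such a loop could behave. The open_with_retries helper, the opener callable and the choice to catch OSError are assumptions for illustration. Only the final ConnectionRefusedError follows the documented behaviour.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def open_with_retries(opener: Callable[[], T], attempts: int = 1) -> T:
    """Call opener up to `attempts` times and return its first successful result."""
    last_error = None
    for _ in range(attempts):
        try:
            return opener()
        except OSError as exc:
            # Remember the failure and try again if attempts remain.
            last_error = exc
    raise ConnectionRefusedError("could not open connection") from last_error


calls = {"count": 0}


def flaky_opener() -> str:
    # Hypothetical opener that fails twice before succeeding.
    calls["count"] += 1
    if calls["count"] < 3:
        raise OSError("port busy")
    return "connection"


result = open_with_retries(flaky_opener, attempts=3)
```

With attempts=1 the same opener would fail on its first call and the helper would raise ConnectionRefusedError, chaining the original error as its cause.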
open_socket(address: str, port: int, protocol: 'rfcomm' | 'tcp', *, timeout: int = 15, eom: str = '\r\n', eoa: str = '\r\n', sync_after_timeout: bool = False, attempts: int = 1, logger: Logger | None = None) SocketConnection[source]

Constructs a SocketConnection with the given communication parameters.

Parameters:
address: str

Address of the target device (MAC for RFCOMM Bluetooth, IP for TCP)

port: int

Connection port/channel (RFCOMM channel or TCP port).

protocol: 'rfcomm' | 'tcp'

Protocol to use for connection.

timeout: int = 15

Communication timeout threshold, by default 15

eom: str = '\r\n'

EndOfMessage sequence, by default "\r\n"

eoa: str = '\r\n'

EndOfAnswer sequence, by default "\r\n"

sync_after_timeout: bool = False

Attempt to re-sync the message-response queue if a timeout occurred in the previous exchange, by default False

attempts: int = 1

Number of attempts at opening the connection, by default 1

logger: Logger | None = None

Logger instance used to log connection-related events. A dummy logger is used when None is given, by default None

Return type:

SocketConnection

Raises:

ConnectionRefusedError – Socket could not be opened.

Warning

The syncing feature should be used with caution! See SocketConnection for more information!

Examples

Opening a socket connection through WLAN similar to a file:

>>> conn = open_socket("192.168.0.1", 1212, "tcp", timeout=5)
>>> # do operations
>>> conn.close()

Using as a context manager:

>>> with open_socket("192.168.0.1", 1212, "tcp", timeout=20) as conn:
...     conn.send("test")
DUMMYLOGGER = <Logger geocompy.dummy (WARNING)>[source]

Dummy logger instance to use when no logger is actually needed.