ros_acomms_net_db subpackage#

Submodules#

ros_acomms_net_db.index_list module#

ros_acomms_net_db.index_list.compact(index_list)#
ros_acomms_net_db.index_list.get_intersection_indices(list_a, list_b)#
ros_acomms_net_db.index_list.get_touching_block(list_of_blocks, segment_to_touch)#
ros_acomms_net_db.index_list.invert_index_list(index_list: list, limits: list) list#

ros_acomms_net_db.net_database module#

net_database#

Database model definitions for ros_acomms_net

class ros_acomms_net_db.net_database.AppResponse(*args, **kwargs)#

Bases: BaseModel

DoesNotExist#

alias of AppResponseDoesNotExist

encoded_packet = <BlobField: AppResponse.encoded_packet>#
encoded_packet_size_in_bits = <IntegerField: AppResponse.encoded_packet_size_in_bits>#
id = <AutoField: AppResponse.id>#
net_message = <ForeignKeyField: AppResponse.net_message>#
net_message_id = <ForeignKeyField: AppResponse.net_message>#
payload = <BlobField: AppResponse.payload>#
payload_size_in_bits = <IntegerField: AppResponse.payload_size_in_bits>#
pending_transmit = <BooleanField: AppResponse.pending_transmit>#
tags#
class ros_acomms_net_db.net_database.AppResponseTag(*args, **kwargs)#

Bases: BaseModel

DoesNotExist#

alias of AppResponseTagDoesNotExist

app_response = <ForeignKeyField: AppResponseTag.app_response>#
app_response_id = <ForeignKeyField: AppResponseTag.app_response>#
id = <AutoField: AppResponseTag.id>#
tag = <CharField: AppResponseTag.tag>#
class ros_acomms_net_db.net_database.BaseModel(*args, **kwargs)#

Bases: Model

base model for all database models

DoesNotExist#

alias of BaseModelDoesNotExist

id = <AutoField: BaseModel.id>#
class ros_acomms_net_db.net_database.LinkPath(*args, **kwargs)#

Bases: BaseModel

DoesNotExist#

alias of LinkPathDoesNotExist

id = <AutoField: LinkPath.id>#
modem = <CharField: LinkPath.modem>#
net_destination = <IntegerField: LinkPath.net_destination>#
net_message = <ForeignKeyField: LinkPath.net_message>#
net_message_id = <ForeignKeyField: LinkPath.net_message>#
net_source = <IntegerField: LinkPath.net_source>#
rx_time = <DateTimeField: LinkPath.rx_time>#
class ros_acomms_net_db.net_database.MessageTag(*args, **kwargs)#

Bases: BaseModel

DoesNotExist#

alias of MessageTagDoesNotExist

id = <AutoField: MessageTag.id>#
net_message = <ForeignKeyField: MessageTag.net_message>#
net_message_id = <ForeignKeyField: MessageTag.net_message>#
tag = <CharField: MessageTag.tag>#
class ros_acomms_net_db.net_database.NetMessage(*args, **kwargs)#

Bases: BaseModel

DoesNotExist#

alias of NetMessageDoesNotExist

ack_type = <IntegerField: NetMessage.ack_type>#
all_fragments_acked_up_to(ack_type: int)#
all_fragments_at_state(state: int)#
property all_fragments_available#
property all_fragments_endpoint_acked#
property all_fragments_hop_acked#
property all_fragments_sent#
app_active = <BooleanField: NetMessage.app_active>#
app_responses#
append_received_fragment(start_idx: int, length_in_blocks: int, fragment_payload: bytes)#
application_state = <IntegerField: NetMessage.application_state>#
block_size_in_bits = <IntegerField: NetMessage.block_size_in_bits>#
comms_active = <BooleanField: NetMessage.comms_active>#
comms_state = <IntegerField: NetMessage.comms_state>#
created_at = <DateTimeField: NetMessage.created_at>#
encoded_packet = <BlobField: NetMessage.encoded_packet>#
encoded_packet_size_in_bits = <IntegerField: NetMessage.encoded_packet_size_in_bits>#
fragmentation_allowed = <BooleanField: NetMessage.fragmentation_allowed>#
fragments_status#
get_available_fragment_list() list#
get_fragment_status_list(status: int) list#
get_inverse_indices(ack_type: int) -> (<class 'int'>, <class 'int'>)#
get_next_fragment(max_size_bits: int, ack_type: int) Tuple[int, int, bytes]#
get_num_blocks_in_state(fragment_state: int | None)#
get_pending_ack_fragment_list(ack_type: int | None = None) list#
get_queued_ack_fragment_list(ack_type: int) list#
id = <AutoField: NetMessage.id>#
is_delivered_to_me = <BooleanField: NetMessage.is_delivered_to_me>#
is_fragmented = <BooleanField: NetMessage.is_fragmented>#
mark_fragment_acked(start_idx: int, fragment_size_in_blocks: int, ack_type)#
mark_fragment_pending_send(start_idx: int, end_idx: int)#
mark_fragment_state(start_idx: int, end_idx: int, fragment_state: int)#
mark_full_payload_available()#
mark_pending_fragments_as_sent()#
net_destination = <IntegerField: NetMessage.net_destination>#
net_priority = <IntegerField: NetMessage.net_priority>#
net_source = <IntegerField: NetMessage.net_source>#
num_acks_forwarded = <IntegerField: NetMessage.num_acks_forwarded>#
num_app_responses_forwarded = <IntegerField: NetMessage.num_app_responses_forwarded>#
property num_available_blocks#
originator_uuid = <CharField: NetMessage.originator_uuid>#
payload = <BlobField: NetMessage.payload>#
payload_size_in_bits = <IntegerField: NetMessage.payload_size_in_bits>#
property payload_size_in_blocks#
pending_acks#
send_an_ack = <BooleanField: NetMessage.send_an_ack>#
tags#
transaction_id = <IntegerField: NetMessage.transaction_id>#
transmit_events#
tx_tries_remaining = <IntegerField: NetMessage.tx_tries_remaining>#
unknown_content = <BooleanField: NetMessage.unknown_content>#
update_queued_ack_fragments(start_idx: int, length_in_blocks: int, ack_type: int | None)#
class ros_acomms_net_db.net_database.NetMessageFragmentStatus(*args, **kwargs)#

Bases: BaseModel

DoesNotExist#

alias of NetMessageFragmentStatusDoesNotExist

fragment_end_index = <IntegerField: NetMessageFragmentStatus.fragment_end_index>#
fragment_start_index = <IntegerField: NetMessageFragmentStatus.fragment_start_index>#
fragment_state = <IntegerField: NetMessageFragmentStatus.fragment_state>#
id = <AutoField: NetMessageFragmentStatus.id>#
net_message = <ForeignKeyField: NetMessageFragmentStatus.net_message>#
net_message_id = <ForeignKeyField: NetMessageFragmentStatus.net_message>#
class ros_acomms_net_db.net_database.PendingAckToTransmit(*args, **kwargs)#

Bases: BaseModel

DoesNotExist#

alias of PendingAckToTransmitDoesNotExist

ack_type = <IntegerField: PendingAckToTransmit.ack_type>#
encoded_packet = <BlobField: PendingAckToTransmit.encoded_packet>#
encoded_packet_size_in_bits = <IntegerField: PendingAckToTransmit.encoded_packet_size_in_bits>#
end_index = <IntegerField: PendingAckToTransmit.end_index>#
id = <AutoField: PendingAckToTransmit.id>#
is_fragment = <BooleanField: PendingAckToTransmit.is_fragment>#
net_message = <ForeignKeyField: PendingAckToTransmit.net_message>#
net_message_id = <ForeignKeyField: PendingAckToTransmit.net_message>#
property size_in_blocks#
start_index = <IntegerField: PendingAckToTransmit.start_index>#
updated_at = <DateTimeField: PendingAckToTransmit.updated_at>#
class ros_acomms_net_db.net_database.TransmitEvent(*args, **kwargs)#

Bases: BaseModel

DoesNotExist#

alias of TransmitEventDoesNotExist

id = <AutoField: TransmitEvent.id>#
interface = <CharField: TransmitEvent.interface>#
net_message = <ForeignKeyField: TransmitEvent.net_message>#
net_message_id = <ForeignKeyField: TransmitEvent.net_message>#
transmit_time = <DateTimeField: TransmitEvent.transmit_time>#
ros_acomms_net_db.net_database.on_save_handler(model_class, instance, created)#

ros_acomms_net_db.net_database_handler module#

net_database#

Database setup

class ros_acomms_net_db.net_database_handler.NetDatabaseHandler(init_db: bool = False)#

Bases: object

Class to handle database setup and reset

models = [<Model: NetMessage>, <Model: PendingAckToTransmit>, <Model: LinkPath>, <Model: AppResponse>, <Model: NetMessageFragmentStatus>, <Model: TransmitEvent>, <Model: MessageTag>, <Model: AppResponseTag>]#
reset_db()#

Function to reset the database