python socket 服务:ipv4、ipv6

507次阅读
没有评论

基本

  • 判断是ipv4、ipv6
class IPvStatus():

    def __init__(self, ip):
        self.ip = ip
    def ipv4(self):
        try:
            socket.inet_pton(socket.AF_INET, self.ip)
        except AttributeError:
            try:
                socket.inet_aton(self.ip)
            except socket.error:
                return False
            return self.ip.count('.') == 3
        except socket.error:
            return False
        return True

    def ipv6(self):
        try:
            socket.inet_pton(socket.AF_INET6, self.ip)
        except socket.error:
            return False
        return True
    
    def is_ip(self): # 是否是ip
        if self.ipv4() or self.ipv6():
            return True
        else:
            return False

客户端

import json
import struct

host = "127.0.0.1"
ip_status = IPvStatus(host)
if ip_status.ipv6():
    # ipv6
    mySocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    port = 4256
else:
    # ipv4
    mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    port = 5506
try:
    mySocket.connect((host, port))
except:
    pass
dic_data = {}
# 数据总长度
size = len(json.dumps(dic_data).encode("utf-8"))
# 模拟报头
f = struct.pack("l", size)
# 先发送报头
mySocket.send(f)
# 再发数据
mySocket.send(json.dumps(dic_data).encode("utf-8"))
msg = mySocket.recv(10240)
if msg:
    msg = json.loads(str(msg, encoding="utf-8"))
    mySocket.close()
else:
    mySocket.close()

服务端

  • ipv4
import socket
import json
import struct

# 创建套接字
mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置IP和端口
host = '0.0.0.0'
port = 5506
block_size = 512
recv_data_size = 0
# bind绑定该端口
mySocket.bind((host, port))
# 监听
mySocket.listen(10)
while True:
    client, address = mySocket.accept()
    while True:
        # 接受定长报头
        data_header = client.recv(struct.calcsize("l"))
        total_size = struct.unpack("l", data_header)
        # 真正数据总长度
        size = total_size[0]
        data_bytes = b''
        while recv_data_size < size:
            data = client.recv(block_size)
            data_bytes += data
            recv_data_size += len(data)
        if data_bytes:
            data_dic = str(data_bytes, encoding='utf-8')
            # 真实数据
            data_dic = json.loads(data_dic)
        break
  • ipv6
import socket
import json
import struct

# 创建套接字
mySocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# 设置IP和端口
host = ''
port = 4256
block_size = 512
recv_data_size = 0
# bind绑定该端口
mySocket.bind((host, port))
# 监听
mySocket.listen(10)
while True:
    client, address = mySocket.accept()
    while True:
        # 接受定长报头
        data_header = client.recv(struct.calcsize("l"))
        total_size = struct.unpack("l", data_header)
        # 真正数据总长度
        size = total_size[0]
        data_bytes = b''
        while recv_data_size < size:
            data = client.recv(block_size)
            data_bytes += data
            recv_data_size += len(data)
        if data_bytes:
            data_dic = str(data_bytes, encoding='utf-8')
            # 真实数据
            data_dic = json.loads(data_dic)
        break

注意:

​ ipv6服务端的host绑定空字符串

  • 优化完整代码
import socket
import json
import struct


class SocketMain():
    def __init__(self, ipv4_port=5506, ipv6_port=4256, block_size=512, host=None, port=None):
        self.ipv4_host = "127.0.0.1"
        self.ipv6_host = ""
        self.ipv4_port = ipv4_port
        self.ipv6_port = ipv6_port
        self.block_size = block_size
        self.recv_data_size = 0
        self.host = host
        self.port = port
		# 服务端
    def socket_server(self, is_ipv4=True):
        if is_ipv4:
            host = self.ipv4_host
            port = self.ipv4_port
            mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            host = self.ipv6_host
            port = self.ipv6_port
            mySocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        mySocket.bind((host, port))
        # 监听
        mySocket.listen(10)
        while True:
            client, address = mySocket.accept()
            while True:
                # 接受定长报头
                data_header = client.recv(struct.calcsize("l"))
                total_size = struct.unpack("l", data_header)
                # 真正数据总长度
                size = total_size[0]
                data_bytes = b''
                while self.recv_data_size < size:
                    data = client.recv(self.block_size)
                    data_bytes += data
                    self.recv_data_size += len(data)
                if data_bytes:
                    data_dic = str(data_bytes, encoding='utf-8')
                    # 真实数据
                    data_dic = json.loads(data_dic)
                    if data_dic:
                        return data_dic
                break
		# 客户端
    def Socket_client(self):
        if not (self.host and self.host):
            return False
        ip_status = IPvStatus(self.host)
        if ip_status.ipv6():
            # ipv6
            mySocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            port = 4256
        else:
            # ipv4
            mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            port = 5506
        try:
            mySocket.connect((self.host, port))
        except:
            pass
        dic_data = {}
        # 数据总长度
        size = len(json.dumps(dic_data).encode("utf-8"))
        # 模拟报头
        f = struct.pack("l", size)
        # 先发送报头
        mySocket.send(f)
        # 再发数据
        mySocket.send(json.dumps(dic_data).encode("utf-8"))
        msg = mySocket.recv(self.block_size)
        if msg:
            msg = json.loads(str(msg, encoding="utf-8"))
            mySocket.close()
        else:
            mySocket.close()
正文完
可以使用微信扫码关注公众号(ID:xzluomor)
post-qrcode
 
评论(没有评论)